#!/usr/bin/env python3
"""
WBS-9.5: Sector Flow Reliability Measurement

Measures how reliable the ``sector_flow_history`` table is:
- Coverage: how much data is available per sector
- Freshness: how recent the latest snapshot is
- Consistency: data quality and outlier detection
"""

import json
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path

# Age (in days) assumed when the age of the newest snapshot is unknown.
# It lands in the "stale" bucket of the freshness score.
_UNKNOWN_AGE_DAYS = 9999


class SectorFlowReliability:
    """Measure the reliability of the sector flow history data."""

    def __init__(self, snapshot_db=None):
        """Set up the measurer.

        Args:
            snapshot_db: Path to the SQLite snapshot database. When omitted,
                ``src/quant_engine/snapshot_admin.db`` is used.
        """
        if snapshot_db is None:
            snapshot_db = 'src/quant_engine/snapshot_admin.db'
        self.snapshot_db = Path(snapshot_db)
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "measurements": {}
        }

    def _open_history(self):
        """Open the snapshot DB if ``sector_flow_history`` is available.

        Returns:
            An open ``sqlite3.Connection`` (the caller must close it), or
            ``None`` after printing a warning when the database file or the
            table does not exist.
        """
        # Check the file first: sqlite3.connect() would otherwise create an
        # empty database file as a side effect of a read-only measurement.
        if not self.snapshot_db.exists():
            print(" [!] sector_flow_history 테이블이 없음")
            return None

        conn = sqlite3.connect(self.snapshot_db)
        try:
            found = conn.execute(
                "SELECT name FROM sqlite_master "
                "WHERE type='table' AND name='sector_flow_history'"
            ).fetchone()
        except Exception:
            conn.close()
            raise

        if found is None:
            conn.close()
            print(" [!] sector_flow_history 테이블이 없음")
            return None
        return conn

    def measure_data_coverage(self) -> dict:
        """Measure data coverage.

        Returns:
            A dict with ``total_rows``, ``sectors`` (number of distinct
            sectors) and ``sector_distribution`` (count and percentage for
            the ten largest sectors). An empty dict is returned when the
            table is not available.
        """
        print("\n[1. 데이터 커버리지]")

        conn = self._open_history()
        if conn is None:
            return {}

        with closing(conn):
            cursor = conn.cursor()

            # Total number of rows
            cursor.execute("SELECT COUNT(*) FROM sector_flow_history")
            total_rows = cursor.fetchone()[0]

            # Rows per sector, largest first
            cursor.execute("""
                SELECT Sector, COUNT(*) as count
                FROM sector_flow_history
                GROUP BY Sector
                ORDER BY count DESC
            """)
            sector_counts = cursor.fetchall()

        coverage = {
            "total_rows": total_rows,
            "sectors": len(sector_counts),
            "sector_distribution": {}
        }

        print(f" 총 행: {total_rows}")
        print(f" 섹터 수: {len(sector_counts)}")
        print(" 섹터별 분포:")

        # Only the ten largest sectors are reported in detail.
        for sector, count in sector_counts[:10]:
            pct = (count / total_rows * 100) if total_rows > 0 else 0
            coverage["sector_distribution"][sector] = {
                "count": count,
                "percentage": round(pct, 1)
            }
            print(f" {sector}: {count} ({pct:.1f}%)")

        return coverage

    def measure_data_freshness(self) -> dict:
        """Measure data freshness.

        Returns:
            A dict with ``earliest_date``, ``latest_date`` and ``age_days``.
            ``age_days`` is ``None`` when the age cannot be determined
            (missing table, empty table, or a date that is not ISO formatted).
        """
        print("\n[2. 데이터 신선도]")

        freshness = {
            "earliest_date": None,
            "latest_date": None,
            "age_days": None
        }

        conn = self._open_history()
        if conn is None:
            return freshness

        with closing(conn):
            cursor = conn.cursor()

            # Oldest and newest snapshot dates
            cursor.execute("""
                SELECT
                    MIN(Snapshot_Date) as earliest,
                    MAX(Snapshot_Date) as latest
                FROM sector_flow_history
            """)
            earliest, latest = cursor.fetchone()
            freshness["earliest_date"] = earliest
            freshness["latest_date"] = latest

            if latest:
                try:
                    latest_dt = datetime.fromisoformat(latest)
                    # Take "now" in the snapshot's own timezone (naive when the
                    # snapshot is naive) so the subtraction is always valid.
                    age = (datetime.now(latest_dt.tzinfo) - latest_dt).days
                except (TypeError, ValueError):
                    # Not an ISO date string: the age stays unknown.
                    print(f" 최신 데이터: {latest}")
                else:
                    freshness["age_days"] = age
                    print(f" 최신 데이터: {latest} ({age}일 전)")

            if earliest:
                print(f" 가장 오래된 데이터: {earliest}")

            # Row counts for the ten most recent dates
            cursor.execute("""
                SELECT
                    DATE(Snapshot_Date) as date,
                    COUNT(*) as count
                FROM sector_flow_history
                GROUP BY DATE(Snapshot_Date)
                ORDER BY date DESC
                LIMIT 10
            """)
            date_dist = cursor.fetchall()

        print(" 최근 10일 분포:")
        for day, count in date_dist:
            print(f" {day}: {count}")

        return freshness

    def measure_data_consistency(self) -> dict:
        """Measure data consistency.

        Returns:
            A dict with ``null_values`` (rows with a NULL in a key column),
            ``outliers`` (number of sectors flagged) and ``warnings``
            (human-readable messages).
        """
        print("\n[3. 데이터 일관성]")

        consistency = {
            "null_values": 0,
            "outliers": 0,
            "warnings": []
        }

        conn = self._open_history()
        if conn is None:
            consistency["warnings"].append("sector_flow_history 테이블이 없음")
            return consistency

        with closing(conn):
            cursor = conn.cursor()

            # Rows with a NULL in any key column
            cursor.execute("""
                SELECT COUNT(*) as null_count
                FROM sector_flow_history
                WHERE Sector IS NULL OR Snapshot_Date IS NULL OR Sector_Score IS NULL
            """)
            null_count = cursor.fetchone()[0]

            consistency["null_values"] = null_count
            if null_count > 0:
                consistency["warnings"].append(f"NULL 값 발견: {null_count}개")
                print(f" [!] NULL 값: {null_count}")

            # Outlier detection (extreme values) per sector
            cursor.execute("""
                SELECT
                    Sector,
                    MIN(Sector_Score) as min_val,
                    MAX(Sector_Score) as max_val,
                    AVG(Sector_Score) as avg_val
                FROM sector_flow_history
                WHERE Sector_Score IS NOT NULL
                GROUP BY Sector
            """)

            anomalies = 0
            for sector, min_val, max_val, avg_val in cursor.fetchall():
                if min_val is None or max_val is None:
                    continue

                # Extreme value: the maximum exceeds five times the average.
                # NOTE(review): for a negative average this comparison is
                # almost always true - confirm whether scores can be negative.
                if avg_val and max_val > avg_val * 5:
                    anomalies += 1
                    consistency["warnings"].append(f"{sector}: 극단값 감지 ({max_val})")

            consistency["outliers"] = anomalies
            if anomalies > 0:
                print(f" [!] 이상치: {anomalies}개 섹터")

            # Completeness of the key columns
            cursor.execute("""
                SELECT
                    (COUNT(*) - COUNT(Sector)) as missing_sectors,
                    (COUNT(*) - COUNT(Snapshot_Date)) as missing_dates,
                    (COUNT(*) - COUNT(Sector_Score)) as missing_values
                FROM sector_flow_history
            """)
            missing_sectors, missing_dates, missing_values = cursor.fetchone()

        print(f" 누락 데이터: sector={missing_sectors}, date={missing_dates}, value={missing_values}")

        return consistency

    def calculate_reliability_score(self, coverage: dict, freshness: dict, consistency: dict) -> float:
        """Compute the overall reliability score.

        Args:
            coverage: Result of ``measure_data_coverage``.
            freshness: Result of ``measure_data_freshness``.
            consistency: Result of ``measure_data_consistency``.

        Returns:
            The weighted score in the range 0-100
            (coverage 30%, freshness 40%, consistency 30%).
        """
        print("\n[4. 종합 신뢰도]")

        scores = {
            "coverage_score": 0,
            "freshness_score": 0,
            "consistency_score": 0
        }

        # Coverage score (0-100): 10 points per sector, capped at 100.
        if coverage.get("total_rows", 0) > 0:
            sector_count = coverage.get("sectors", 0)
            scores["coverage_score"] = min(100, sector_count * 10)

        print(f" 커버리지: {scores['coverage_score']:.1f}/100 ({coverage.get('sectors', 0)} 섹터)")

        # Freshness score (0-100). An unknown age (missing or None) is
        # treated as stale rather than as "just updated".
        age_days = freshness.get("age_days")
        if age_days is None:
            age_days = _UNKNOWN_AGE_DAYS

        if age_days <= 1:
            scores["freshness_score"] = 100  # within one day
        elif age_days <= 7:
            scores["freshness_score"] = 80  # within one week
        elif age_days <= 30:
            scores["freshness_score"] = 50  # within one month
        else:
            scores["freshness_score"] = 20  # stale

        print(f" 신선도: {scores['freshness_score']:.1f}/100 ({age_days}일 전)")

        # Consistency score (0-100): start from 100 and subtract penalties.
        null_violations = consistency.get("null_values", 0)
        outlier_count = consistency.get("outliers", 0)
        warnings = len(consistency.get("warnings", []))

        consistency_score = 100
        if null_violations > 0:
            consistency_score -= min(20, null_violations / 10)
        if outlier_count > 0:
            consistency_score -= min(30, outlier_count * 3)

        consistency_score = max(0, consistency_score)
        scores["consistency_score"] = consistency_score

        print(f" 일관성: {scores['consistency_score']:.1f}/100 ({warnings} 경고)")

        # Overall score: weighted average
        # (coverage 30%, freshness 40%, consistency 30%).
        overall = (
            scores["coverage_score"] * 0.3 +
            scores["freshness_score"] * 0.4 +
            scores["consistency_score"] * 0.3
        )

        print(f"\n 종합 신뢰도: {overall:.1f}/100")

        return overall

    def run(self) -> dict:
        """Run every measurement and return the collected results.

        Returns:
            ``self.results`` with the ``measurements`` and ``summary``
            sections filled in.
        """
        print("="*80)
        print("WBS-9.5: Sector Flow Reliability Measurement")
        print("="*80)

        # Measurements
        coverage = self.measure_data_coverage()
        freshness = self.measure_data_freshness()
        consistency = self.measure_data_consistency()

        # Reliability score
        reliability_score = self.calculate_reliability_score(coverage, freshness, consistency)

        # Store the results
        self.results["measurements"] = {
            "coverage": coverage,
            "freshness": freshness,
            "consistency": consistency,
            "reliability_score": reliability_score
        }

        # Reliability verdict
        if reliability_score >= 80:
            status = "HIGH (신뢰 가능)"
        elif reliability_score >= 60:
            status = "MEDIUM (주의 필요)"
        else:
            status = "LOW (개선 필요)"

        print("\n[판정]")
        print(f" 신뢰도 상태: {status}")
        print(f" 권장사항: {'데이터 사용 가능' if reliability_score >= 60 else '데이터 보완 필요'}")

        self.results["summary"] = {
            "status": status,
            "reliability_score": reliability_score,
            "measurement_date": datetime.now().isoformat()
        }

        return self.results


def main() -> None:
    """Run the measurement and write the report as JSON."""
    measurer = SectorFlowReliability()
    result = measurer.run()

    # Save the results
    output_file = Path("Temp/wbs95_sector_flow_reliability.json")
    output_file.parent.mkdir(parents=True, exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2, ensure_ascii=False)

    print(f"\n[저장] {output_file}")
    print("[완료] WBS-9.5 Sector Flow Reliability Measurement 완료")


if __name__ == "__main__":
    main()