diff --git a/.gitea/workflows/kis_data_collection.yml b/.gitea/workflows/kis_data_collection.yml index 7b398f3..5311129 100644 --- a/.gitea/workflows/kis_data_collection.yml +++ b/.gitea/workflows/kis_data_collection.yml @@ -215,6 +215,40 @@ jobs: conn.close() PY + - name: Backup SQLite Database (WBS-9.7) + if: always() + run: | + BACKUP_BASE="/volume1/gitea/backups/kis_data_collection" + mkdir -p "$BACKUP_BASE" + + TIMESTAMP=$(date +%Y%m%d_%H%M%S) + SOURCE_DB="outputs/kis_data_collection/kis_data_collection.db" + BACKUP_DIR="$BACKUP_BASE/$TIMESTAMP" + BACKUP_DB="$BACKUP_DIR/kis_data_collection.db" + + if [ -f "$SOURCE_DB" ]; then + mkdir -p "$BACKUP_DIR" + cp "$SOURCE_DB" "$BACKUP_DB" + echo "Backup created: $BACKUP_DB" + + # 메타데이터 저장 (backup manifest) + cat > "$BACKUP_DIR/manifest.json" </dev/null || true + else + echo "::warning::Source DB not found: $SOURCE_DB" + fi + - name: Notify Run Result if: always() run: | diff --git a/src/quant_engine/kis_data_collection.db b/src/quant_engine/kis_data_collection.db index f68a557..f2c5fce 100644 Binary files a/src/quant_engine/kis_data_collection.db and b/src/quant_engine/kis_data_collection.db differ diff --git a/src/quant_engine/snapshot_admin.db b/src/quant_engine/snapshot_admin.db index b86eb30..25ecc8b 100644 Binary files a/src/quant_engine/snapshot_admin.db and b/src/quant_engine/snapshot_admin.db differ diff --git a/tools/load_from_xlsx_correct.py b/tools/load_from_xlsx_correct.py new file mode 100644 index 0000000..3eae912 --- /dev/null +++ b/tools/load_from_xlsx_correct.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python3 +""" +GatherTradingData.xlsx 올바르게 로드 (metadata 기반 header 파라미터) + +JSON metadata의 header_row_1based를 사용해서 각 시트마다 올바른 header를 지정 +""" + +import json +import sqlite3 +from pathlib import Path +from datetime import datetime +import pandas as pd + +class CorrectXLSXLoader: + """메타데이터 기반 정확한 XLSX 로더""" + + def __init__(self): + self.json_file = Path('GatherTradingData.json') + self.xlsx_file = Path('GatherTradingData.xlsx') + self.kis_db = Path('src/quant_engine/kis_data_collection.db') + self.snapshot_db = Path('src/quant_engine/snapshot_admin.db') + self.results = { + "timestamp": datetime.now().isoformat(), + "sheets_loaded": {}, + "errors": [] + } + + def load_metadata(self) -> dict: + """JSON 메타데이터 로드""" + with open(self.json_file, encoding='utf-8') as f: + data = json.load(f) + return data.get('metadata', {}) + + def load_excel_sheets(self, metadata: dict) -> dict: + """Excel에서 올바른 header를 사용해서 모든 시트 로드""" + print("[로드 중] Excel 파일 읽기...") + + sheet_headers = metadata.get('sheet_headers', {}) + excel_file = pd.ExcelFile(self.xlsx_file) + sheet_names = excel_file.sheet_names + + print(f"발견된 시트: {len(sheet_names)}개") + + sheets_data = {} + for sheet_name in sheet_names: + # metadata에서 header_row_1based 읽기 + header_info = sheet_headers.get(sheet_name, {}) + header_row_1based = header_info.get('header_row_1based', 1) + header_param = header_row_1based - 1 # pandas는 0-indexed + + try: + df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name, header=header_param) + + # NaN을 None으로 변환 + df = df.where(pd.notna(df), None) + + sheets_data[sheet_name] = df + print(f" [OK] {sheet_name}: {len(df)} rows, {len(df.columns)} cols (header={header_param})") + + except Exception as e: + print(f" [FAIL] {sheet_name}: {str(e)[:50]}") + self.results["errors"].append(sheet_name) + + return sheets_data + + def load_to_database(self, sheets_data: dict) -> None: + """데이터를 DB에 로드""" + print("\n[DB 로드 중...]") + + for sheet_name, df in sheets_data.items(): + if df.empty: + print(f" [SKIP] {sheet_name} (empty)") + continue + + # 타겟 DB 결정 + if sheet_name == 'data_feed': + db_path = self.kis_db + else: + db_path = self.snapshot_db + + try: + conn = sqlite3.connect(db_path) + df.to_sql(sheet_name, conn, if_exists='replace', index=False) + conn.close() + + print(f" [OK] {sheet_name}: {len(df)} rows → {db_path.name}") + self.results["sheets_loaded"][sheet_name] = { + "rows": len(df), + "cols": len(df.columns), + "db": str(db_path) + } + + except Exception as e: + print(f" [FAIL] {sheet_name}: {str(e)[:80]}") + self.results["errors"].append(sheet_name) + + def verify(self) -> None: + """로드 검증""" + print("\n[검증 중...]") + + for db_name, db_path in [("kis_data_collection", self.kis_db), ("snapshot_admin", self.snapshot_db)]: + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence'") + tables = [row[0] for row in cursor.fetchall()] + + total_rows = 0 + for table in tables: + cursor.execute(f"SELECT COUNT(*) FROM {table}") + total_rows += cursor.fetchone()[0] + + print(f" {db_name}.db: {len(tables)} 테이블, {total_rows:,} rows") + conn.close() + + def run(self) -> dict: + """전체 실행""" + print("="*80) + print("GatherTradingData.xlsx 정확하게 로드 (메타데이터 기반)") + print("="*80) + print() + + # 메타데이터 로드 + metadata = self.load_metadata() + + # Excel 로드 + sheets_data = self.load_excel_sheets(metadata) + + if not sheets_data: + print("[ERROR] 로드된 시트가 없습니다") + return self.results + + # DB 로드 + self.load_to_database(sheets_data) + + # 검증 + self.verify() + + self.results["summary"] = { + "total_sheets": len(sheets_data), + "loaded_sheets": len(self.results["sheets_loaded"]), + "failed_sheets": len(self.results["errors"]), + "coverage_pct": (len(self.results["sheets_loaded"]) / len(sheets_data) * 100) if sheets_data else 0 + } + + print("\n[결과 요약]") + print(f" 로드됨: {self.results['summary']['loaded_sheets']}/{self.results['summary']['total_sheets']}") + print(f" 커버리지: {self.results['summary']['coverage_pct']:.1f}%") + + return self.results + +if __name__ == "__main__": + loader = CorrectXLSXLoader() + result = loader.run() + + print("\n[완료] 정확한 XLSX 로드 완료") diff --git a/tools/wbs95_sector_flow_reliability.py b/tools/wbs95_sector_flow_reliability.py new file mode 100644 index 0000000..ebdba7d --- /dev/null +++ b/tools/wbs95_sector_flow_reliability.py @@ -0,0 +1,305 @@ +#!/usr/bin/env python3 +""" +WBS-9.5: Sector Flow Reliability Measurement + +섹터 흐름 신뢰도 측정 +- 데이터 커버리지: 섹터별 데이터 가용도 +- 신선도: 최신 데이터의 타이밍 +- 일관성: 데이터 품질 및 이상치 감지 +""" + +import sqlite3 +from pathlib import Path +from datetime import datetime, timedelta +import json + +class SectorFlowReliability: + """섹터 흐름 신뢰도 측정""" + + def __init__(self): + self.snapshot_db = Path('src/quant_engine/snapshot_admin.db') + self.results = { + "timestamp": datetime.now().isoformat(), + "measurements": {} + } + + def measure_data_coverage(self) -> dict: + """데이터 커버리지 측정""" + print("\n[1. 데이터 커버리지]") + + conn = sqlite3.connect(self.snapshot_db) + cursor = conn.cursor() + + # 테이블 확인 + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sector_flow_history'") + if not cursor.fetchone(): + print(" [!] sector_flow_history 테이블이 없음") + return {} + + # 총 행 수 + cursor.execute("SELECT COUNT(*) FROM sector_flow_history") + total_rows = cursor.fetchone()[0] + + # 섹터별 행 수 + cursor.execute(""" + SELECT Sector, COUNT(*) as count + FROM sector_flow_history + GROUP BY Sector + ORDER BY count DESC + """) + sector_counts = cursor.fetchall() + + coverage = { + "total_rows": total_rows, + "sectors": len(sector_counts), + "sector_distribution": {} + } + + print(f" 총 행: {total_rows}") + print(f" 섹터 수: {len(sector_counts)}") + print(f" 섹터별 분포:") + + for sector, count in sector_counts[:10]: + pct = (count / total_rows * 100) if total_rows > 0 else 0 + coverage["sector_distribution"][sector] = { + "count": count, + "percentage": round(pct, 1) + } + print(f" {sector}: {count} ({pct:.1f}%)") + + conn.close() + return coverage + + def measure_data_freshness(self) -> dict: + """데이터 신선도 측정""" + print("\n[2. 데이터 신선도]") + + conn = sqlite3.connect(self.snapshot_db) + cursor = conn.cursor() + + # 최신 날짜 확인 + cursor.execute(""" + SELECT MIN(Snapshot_Date) as earliest, MAX(Snapshot_Date) as latest + FROM sector_flow_history + """) + earliest, latest = cursor.fetchone() + + freshness = { + "earliest_date": earliest, + "latest_date": latest, + "age_days": 0 + } + + if latest: + try: + latest_dt = datetime.fromisoformat(latest) + age = (datetime.now() - latest_dt).days + freshness["age_days"] = age + print(f" 최신 데이터: {latest} ({age}일 전)") + except: + print(f" 최신 데이터: {latest}") + + if earliest: + print(f" 가장 오래된 데이터: {earliest}") + + # 시간대별 분포 + cursor.execute(""" + SELECT + DATE(Snapshot_Date) as date, + COUNT(*) as count + FROM sector_flow_history + GROUP BY DATE(Snapshot_Date) + ORDER BY date DESC + LIMIT 10 + """) + + date_dist = cursor.fetchall() + print(f" 최근 10일 분포:") + for date, count in date_dist: + print(f" {date}: {count}") + + conn.close() + return freshness + + def measure_data_consistency(self) -> dict: + """데이터 일관성 측정""" + print("\n[3. 데이터 일관성]") + + conn = sqlite3.connect(self.snapshot_db) + cursor = conn.cursor() + + consistency = { + "null_values": 0, + "outliers": 0, + "warnings": [] + } + + # NULL 값 확인 + cursor.execute(""" + SELECT + COUNT(*) as null_count, + COUNT(DISTINCT Sector) as sectors_with_nulls + FROM sector_flow_history + WHERE + Sector IS NULL + OR Snapshot_Date IS NULL + OR Sector_Score IS NULL + """) + + null_count, sectors_null = cursor.fetchone() + consistency["null_values"] = null_count + if null_count > 0: + consistency["warnings"].append(f"NULL 값 발견: {null_count}개") + print(f" [!] NULL 값: {null_count}") + + # 이상치 감지 (극단값) + cursor.execute(""" + SELECT + Sector, + MIN(Sector_Score) as min_val, + MAX(Sector_Score) as max_val, + AVG(Sector_Score) as avg_val + FROM sector_flow_history + WHERE Sector_Score IS NOT NULL + GROUP BY Sector + """) + + anomalies = 0 + for sector, min_val, max_val, avg_val in cursor.fetchall(): + if min_val is None or max_val is None: + continue + + # 극단값 감지 (평균의 5배 이상) + if avg_val and max_val > avg_val * 5: + anomalies += 1 + consistency["warnings"].append(f"{sector}: 극단값 감지 ({max_val})") + + consistency["outliers"] = anomalies + if anomalies > 0: + print(f" [!] 이상치: {anomalies}개 섹터") + + # 데이터 완정성 (중요 컬럼) + cursor.execute(""" + SELECT + (COUNT(*) - COUNT(Sector)) as missing_sectors, + (COUNT(*) - COUNT(Snapshot_Date)) as missing_dates, + (COUNT(*) - COUNT(Sector_Score)) as missing_values + FROM sector_flow_history + """) + + missing_sectors, missing_dates, missing_values = cursor.fetchone() + print(f" 누락 데이터: sector={missing_sectors}, date={missing_dates}, value={missing_values}") + + conn.close() + return consistency + + def calculate_reliability_score(self, coverage: dict, freshness: dict, consistency: dict) -> float: + """종합 신뢰도 점수 계산""" + print("\n[4. 종합 신뢰도]") + + scores = { + "coverage_score": 0, + "freshness_score": 0, + "consistency_score": 0 + } + + # 커버리지 점수 (0-100) + if coverage.get("total_rows", 0) > 0: + sector_count = coverage.get("sectors", 0) + # 10개 이상 섹터: 100점 + # 1개 미만: 0점 + scores["coverage_score"] = min(100, sector_count * 10) + print(f" 커버리지: {scores['coverage_score']:.1f}/100 ({coverage.get('sectors', 0)} 섹터)") + + # 신선도 점수 (0-100) + age_days = freshness.get("age_days", 9999) + if age_days <= 1: + scores["freshness_score"] = 100 # 1일 이내 + elif age_days <= 7: + scores["freshness_score"] = 80 # 1주일 이내 + elif age_days <= 30: + scores["freshness_score"] = 50 # 1개월 이내 + else: + scores["freshness_score"] = 20 # 오래됨 + print(f" 신선도: {scores['freshness_score']:.1f}/100 ({age_days}일 전)") + + # 일관성 점수 (0-100) + null_violations = consistency.get("null_values", 0) + outlier_count = consistency.get("outliers", 0) + warnings = len(consistency.get("warnings", [])) + + consistency_score = 100 + if null_violations > 0: + consistency_score -= min(20, null_violations / 10) + if outlier_count > 0: + consistency_score -= min(30, outlier_count * 3) + consistency_score = max(0, consistency_score) + scores["consistency_score"] = consistency_score + print(f" 일관성: {scores['consistency_score']:.1f}/100 ({warnings} 경고)") + + # 종합 점수 (가중 평균) + # 커버리지 30%, 신선도 40%, 일관성 30% + overall = ( + scores["coverage_score"] * 0.3 + + scores["freshness_score"] * 0.4 + + scores["consistency_score"] * 0.3 + ) + + print(f"\n 종합 신뢰도: {overall:.1f}/100") + + return overall + + def run(self) -> dict: + """전체 실행""" + print("="*80) + print("WBS-9.5: Sector Flow Reliability Measurement") + print("="*80) + + # 측정 + coverage = self.measure_data_coverage() + freshness = self.measure_data_freshness() + consistency = self.measure_data_consistency() + + # 신뢰도 점수 + reliability_score = self.calculate_reliability_score(coverage, freshness, consistency) + + # 결과 저장 + self.results["measurements"] = { + "coverage": coverage, + "freshness": freshness, + "consistency": consistency, + "reliability_score": reliability_score + } + + # 신뢰도 판정 + if reliability_score >= 80: + status = "HIGH (신뢰 가능)" + elif reliability_score >= 60: + status = "MEDIUM (주의 필요)" + else: + status = "LOW (개선 필요)" + + print(f"\n[판정]") + print(f" 신뢰도 상태: {status}") + print(f" 권장사항: {'데이터 사용 가능' if reliability_score >= 60 else '데이터 보완 필요'}") + + self.results["summary"] = { + "status": status, + "reliability_score": reliability_score, + "measurement_date": datetime.now().isoformat() + } + + return self.results + +if __name__ == "__main__": + measurer = SectorFlowReliability() + result = measurer.run() + + # 결과 저장 + output_file = Path("Temp/wbs95_sector_flow_reliability.json") + output_file.parent.mkdir(parents=True, exist_ok=True) + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(result, f, indent=2, ensure_ascii=False) + + print(f"\n[저장] {output_file}") + print("[완료] WBS-9.5 Sector Flow Reliability Measurement 완료")