#!/usr/bin/env python3 """ GatherTradingData.xlsx에서 직접 추출해서 DB에 로드 """ import sqlite3 from pathlib import Path from datetime import datetime import pandas as pd class XLSXDataLoader: """XLSX 직접 로더""" def __init__(self): self.xlsx_file = Path('GatherTradingData.xlsx') self.kis_db = Path('src/quant_engine/kis_data_collection.db') self.snapshot_db = Path('src/quant_engine/snapshot_admin.db') self.results = { "timestamp": datetime.now().isoformat(), "sheets_loaded": {}, "errors": [] } def load_excel_sheets(self) -> dict: """Excel에서 모든 시트 로드""" print("[로드 중] Excel 파일 읽기...") try: # 모든 시트 이름 먼저 얻기 excel_file = pd.ExcelFile(self.xlsx_file) sheet_names = excel_file.sheet_names print(f"발견된 시트: {len(sheet_names)}개") for i, sheet in enumerate(sheet_names, 1): print(f" {i}. {sheet}") # 각 시트 로드 sheets_data = {} for sheet_name in sheet_names: try: df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name) sheets_data[sheet_name] = df print(f" [OK] {sheet_name}: {len(df)} rows, {len(df.columns)} cols") except Exception as e: print(f" [FAIL] {sheet_name}: {str(e)[:50]}") self.results["errors"].append(sheet_name) return sheets_data except Exception as e: print(f"[ERROR] Excel 로드 실패: {e}") return {} def load_to_database(self, sheets_data: dict) -> None: """데이터를 DB에 로드""" print("\n[DB 로드 중...]") for sheet_name, df in sheets_data.items(): if df.empty: print(f" [SKIP] {sheet_name} (empty)") continue # 타겟 DB 결정 if sheet_name == 'data_feed': db_path = self.kis_db else: db_path = self.snapshot_db try: # NaN을 None으로 변환 df = df.where(pd.notna(df), None) # DB에 로드 (기존 테이블 교체) conn = sqlite3.connect(db_path) df.to_sql(sheet_name, conn, if_exists='replace', index=False) conn.close() print(f" [OK] {sheet_name}: {len(df)} rows loaded to {db_path.name}") self.results["sheets_loaded"][sheet_name] = { "rows": len(df), "cols": len(df.columns), "db": str(db_path) } except Exception as e: print(f" [FAIL] {sheet_name}: {str(e)[:80]}") self.results["errors"].append(sheet_name) def verify_load(self) -> None: """로드 검증""" print("\n[검증 중...]") for db_name, db_path in [("kis_data_collection", self.kis_db), ("snapshot_admin", self.snapshot_db)]: if not db_path.exists(): continue conn = sqlite3.connect(db_path) cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence' ORDER BY name") tables = [row[0] for row in cursor.fetchall()] total_rows = 0 for table in tables: cursor.execute(f"SELECT COUNT(*) FROM {table}") count = cursor.fetchone()[0] total_rows += count print(f" {db_name}.db: {len(tables)} 테이블, {total_rows:,} rows") conn.close() def run(self) -> dict: """전체 실행""" print("="*80) print("GatherTradingData.xlsx 직접 로드") print("="*80) print() # Excel 로드 sheets_data = self.load_excel_sheets() if not sheets_data: print("[ERROR] 로드된 시트가 없습니다") return self.results # DB 로드 self.load_to_database(sheets_data) # 검증 self.verify_load() self.results["summary"] = { "total_sheets": len(sheets_data), "loaded_sheets": len(self.results["sheets_loaded"]), "failed_sheets": len(self.results["errors"]), "coverage_pct": (len(self.results["sheets_loaded"]) / len(sheets_data) * 100) if sheets_data else 0 } print("\n[결과 요약]") print(f" 로드됨: {self.results['summary']['loaded_sheets']}/{self.results['summary']['total_sheets']}") print(f" 커버리지: {self.results['summary']['coverage_pct']:.1f}%") if self.results["errors"]: print(f" 실패: {', '.join(self.results['errors'][:5])}") return self.results if __name__ == "__main__": loader = XLSXDataLoader() result = loader.run() print("\n[완료]")