Files
QuantEngineByItz/tools/load_from_xlsx_correct.py
kjh2064 4c8c879302 WBS-9 진행: API 에러 수정 및 DB 스키마 정규화
- snapshot_admin_store_v1.py: summarize_workspace에서 account_snapshot의 captured_at 사용 (updated_at 대신)
- account_snapshot 테이블: 올바른 스키마 재정의 (ordinal PK, row_json, 핵심필드, updated_at)
- settings 테이블: 올바른 스키마 재정의 (ordinal PK, key, value_json, note, updated_at)
- initialize_snapshot_admin_db.py: XLSX에서 settings, account_snapshot을 올바른 스키마로 로드
- load_from_xlsx_correct.py: account_snapshot을 특별 처리해서 스키마 보존
- /api/settings/save: 정상 작동 (200 응답)
- build_ui_state: load_collection_dashboard_state 예외 처리 추가 (진행 중)

데이터 현황:
- kis_data_collection.db: 1 테이블 (data_feed), 25행
- snapshot_admin.db: 27 테이블, 7,501행
  * settings: 32행 (올바른 스키마)
  * account_snapshot: 44행 (올바른 스키마)

남은 작업:
- /api/state 크래시 원인 진단 및 수정
- /api/export 데이터 검증
- 웹 UI 개선 (백오피스 수준)
- T+20 모니터링 활성화
- CI/CD 백업 기능

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 01:21:30 +09:00

213 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
GatherTradingData.xlsx 올바르게 로드 (metadata 기반 header 파라미터)
JSON metadata의 header_row_1based를 사용해서 각 시트마다 올바른 header를 지정
"""
import json
import sqlite3
from pathlib import Path
from datetime import datetime
import pandas as pd
class CorrectXLSXLoader:
"""메타데이터 기반 정확한 XLSX 로더"""
def __init__(self):
self.json_file = Path('GatherTradingData.json')
self.xlsx_file = Path('GatherTradingData.xlsx')
self.kis_db = Path('src/quant_engine/kis_data_collection.db')
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
self.results = {
"timestamp": datetime.now().isoformat(),
"sheets_loaded": {},
"errors": []
}
def load_metadata(self) -> dict:
"""JSON 메타데이터 로드"""
with open(self.json_file, encoding='utf-8') as f:
data = json.load(f)
return data.get('metadata', {})
def load_excel_sheets(self, metadata: dict) -> dict:
"""Excel에서 올바른 header를 사용해서 모든 시트 로드 (account_snapshot 제외)"""
print("[로드 중] Excel 파일 읽기...")
sheet_headers = metadata.get('sheet_headers', {})
excel_file = pd.ExcelFile(self.xlsx_file)
sheet_names = excel_file.sheet_names
print(f"발견된 시트: {len(sheet_names)}개")
sheets_data = {}
for sheet_name in sheet_names:
# account_snapshot은 건너뛴다 (별도 처리)
if sheet_name == 'account_snapshot':
print(f" [SKIP] {sheet_name} (수동 처리)")
continue
# metadata에서 header_row_1based 읽기
header_info = sheet_headers.get(sheet_name, {})
header_row_1based = header_info.get('header_row_1based', 1)
header_param = header_row_1based - 1 # pandas는 0-indexed
try:
# settings 특수 처리: 헤더가 없음 (key-value 쌍)
if sheet_name == 'settings':
df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name, header=None)
df.columns = ['key', 'value', 'note1', 'note2']
df = df[['key', 'value']] # 필요한 컬럼만
else:
df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name, header=header_param)
# NaN을 None으로 변환
df = df.where(pd.notna(df), None)
sheets_data[sheet_name] = df
print(f" [OK] {sheet_name}: {len(df)} rows, {len(df.columns)} cols (header={header_param})")
except Exception as e:
print(f" [FAIL] {sheet_name}: {str(e)[:50]}")
self.results["errors"].append(sheet_name)
return sheets_data
def load_to_database(self, sheets_data: dict) -> None:
"""데이터를 DB에 로드"""
print("\n[DB 로드 중...]")
for sheet_name, df in sheets_data.items():
if df.empty:
print(f" [SKIP] {sheet_name} (empty)")
continue
# 타겟 DB 결정
if sheet_name == 'data_feed':
db_path = self.kis_db
else:
db_path = self.snapshot_db
try:
conn = sqlite3.connect(db_path)
# account_snapshot은 특별하게 처리: 스키마를 보존하면서 데이터만 추가
if sheet_name == 'account_snapshot':
self._load_account_snapshot(conn, df)
else:
df.to_sql(sheet_name, conn, if_exists='replace', index=False)
conn.close()
print(f" [OK] {sheet_name}: {len(df)} rows → {db_path.name}")
self.results["sheets_loaded"][sheet_name] = {
"rows": len(df),
"cols": len(df.columns),
"db": str(db_path)
}
except Exception as e:
print(f" [FAIL] {sheet_name}: {str(e)[:80]}")
self.results["errors"].append(sheet_name)
def _load_account_snapshot(self, conn: sqlite3.Connection, df: pd.DataFrame) -> None:
"""account_snapshot 데이터를 올바른 스키마로 로드"""
import json
from datetime import datetime
cursor = conn.cursor()
timestamp = datetime.now().isoformat()
# 기존 데이터 삭제 (옵션: DELETE 또는 유지)
cursor.execute("DELETE FROM account_snapshot")
for ordinal, row in enumerate(df.iterrows(), start=1):
idx, series = row
row_dict = series.to_dict()
# row_json으로 저장
row_json = json.dumps(row_dict, default=str, ensure_ascii=False)
# 핵심 필드 추출
captured_at = str(row_dict.get('captured_at', ''))
account = str(row_dict.get('account', ''))
account_type = str(row_dict.get('account_type', ''))
ticker = str(row_dict.get('ticker', ''))
name = str(row_dict.get('name', ''))
parse_status = str(row_dict.get('parse_status', ''))
user_confirmed = str(row_dict.get('user_confirmed', ''))
cursor.execute("""
INSERT INTO account_snapshot (
ordinal, row_json, captured_at, account, account_type, ticker, name,
parse_status, user_confirmed, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
ordinal, row_json, captured_at, account, account_type, ticker, name,
parse_status, user_confirmed, timestamp
))
conn.commit()
def verify(self) -> None:
"""로드 검증"""
print("\n[검증 중...]")
for db_name, db_path in [("kis_data_collection", self.kis_db), ("snapshot_admin", self.snapshot_db)]:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence'")
tables = [row[0] for row in cursor.fetchall()]
total_rows = 0
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
total_rows += cursor.fetchone()[0]
print(f" {db_name}.db: {len(tables)} 테이블, {total_rows:,} rows")
conn.close()
def run(self) -> dict:
"""전체 실행"""
print("="*80)
print("GatherTradingData.xlsx 정확하게 로드 (메타데이터 기반)")
print("="*80)
print()
# 메타데이터 로드
metadata = self.load_metadata()
# Excel 로드
sheets_data = self.load_excel_sheets(metadata)
if not sheets_data:
print("[ERROR] 로드된 시트가 없습니다")
return self.results
# DB 로드
self.load_to_database(sheets_data)
# 검증
self.verify()
self.results["summary"] = {
"total_sheets": len(sheets_data),
"loaded_sheets": len(self.results["sheets_loaded"]),
"failed_sheets": len(self.results["errors"]),
"coverage_pct": (len(self.results["sheets_loaded"]) / len(sheets_data) * 100) if sheets_data else 0
}
print("\n[결과 요약]")
print(f" 로드됨: {self.results['summary']['loaded_sheets']}/{self.results['summary']['total_sheets']}")
print(f" 커버리지: {self.results['summary']['coverage_pct']:.1f}%")
return self.results
if __name__ == "__main__":
loader = CorrectXLSXLoader()
result = loader.run()
print("\n[완료] 정확한 XLSX 로드 완료")