4c8c879302
- snapshot_admin_store_v1.py: summarize_workspace에서 account_snapshot의 captured_at 사용 (updated_at 대신) - account_snapshot 테이블: 올바른 스키마 재정의 (ordinal PK, row_json, 핵심필드, updated_at) - settings 테이블: 올바른 스키마 재정의 (ordinal PK, key, value_json, note, updated_at) - initialize_snapshot_admin_db.py: XLSX에서 settings, account_snapshot을 올바른 스키마로 로드 - load_from_xlsx_correct.py: account_snapshot을 특별 처리해서 스키마 보존 - /api/settings/save: 정상 작동 (200 응답) - build_ui_state: load_collection_dashboard_state 예외 처리 추가 (진행 중) 데이터 현황: - kis_data_collection.db: 1 테이블 (data_feed), 25행 - snapshot_admin.db: 27 테이블, 7,501행 * settings: 32행 (올바른 스키마) * account_snapshot: 44행 (올바른 스키마) 남은 작업: - /api/state 크래시 원인 진단 및 수정 - /api/export 데이터 검증 - 웹 UI 개선 (백오피스 수준) - T+20 모니터링 활성화 - CI/CD 백업 기능 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
213 lines
7.7 KiB
Python
213 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
GatherTradingData.xlsx 올바르게 로드 (metadata 기반 header 파라미터)
|
|
|
|
JSON metadata의 header_row_1based를 사용해서 각 시트마다 올바른 header를 지정
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import pandas as pd
|
|
|
|
class CorrectXLSXLoader:
|
|
"""메타데이터 기반 정확한 XLSX 로더"""
|
|
|
|
def __init__(self):
|
|
self.json_file = Path('GatherTradingData.json')
|
|
self.xlsx_file = Path('GatherTradingData.xlsx')
|
|
self.kis_db = Path('src/quant_engine/kis_data_collection.db')
|
|
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
|
|
self.results = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"sheets_loaded": {},
|
|
"errors": []
|
|
}
|
|
|
|
def load_metadata(self) -> dict:
|
|
"""JSON 메타데이터 로드"""
|
|
with open(self.json_file, encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
return data.get('metadata', {})
|
|
|
|
def load_excel_sheets(self, metadata: dict) -> dict:
|
|
"""Excel에서 올바른 header를 사용해서 모든 시트 로드 (account_snapshot 제외)"""
|
|
print("[로드 중] Excel 파일 읽기...")
|
|
|
|
sheet_headers = metadata.get('sheet_headers', {})
|
|
excel_file = pd.ExcelFile(self.xlsx_file)
|
|
sheet_names = excel_file.sheet_names
|
|
|
|
print(f"발견된 시트: {len(sheet_names)}개")
|
|
|
|
sheets_data = {}
|
|
for sheet_name in sheet_names:
|
|
# account_snapshot은 건너뛴다 (별도 처리)
|
|
if sheet_name == 'account_snapshot':
|
|
print(f" [SKIP] {sheet_name} (수동 처리)")
|
|
continue
|
|
|
|
# metadata에서 header_row_1based 읽기
|
|
header_info = sheet_headers.get(sheet_name, {})
|
|
header_row_1based = header_info.get('header_row_1based', 1)
|
|
header_param = header_row_1based - 1 # pandas는 0-indexed
|
|
|
|
try:
|
|
# settings 특수 처리: 헤더가 없음 (key-value 쌍)
|
|
if sheet_name == 'settings':
|
|
df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name, header=None)
|
|
df.columns = ['key', 'value', 'note1', 'note2']
|
|
df = df[['key', 'value']] # 필요한 컬럼만
|
|
else:
|
|
df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name, header=header_param)
|
|
|
|
# NaN을 None으로 변환
|
|
df = df.where(pd.notna(df), None)
|
|
|
|
sheets_data[sheet_name] = df
|
|
print(f" [OK] {sheet_name}: {len(df)} rows, {len(df.columns)} cols (header={header_param})")
|
|
|
|
except Exception as e:
|
|
print(f" [FAIL] {sheet_name}: {str(e)[:50]}")
|
|
self.results["errors"].append(sheet_name)
|
|
|
|
return sheets_data
|
|
|
|
def load_to_database(self, sheets_data: dict) -> None:
|
|
"""데이터를 DB에 로드"""
|
|
print("\n[DB 로드 중...]")
|
|
|
|
for sheet_name, df in sheets_data.items():
|
|
if df.empty:
|
|
print(f" [SKIP] {sheet_name} (empty)")
|
|
continue
|
|
|
|
# 타겟 DB 결정
|
|
if sheet_name == 'data_feed':
|
|
db_path = self.kis_db
|
|
else:
|
|
db_path = self.snapshot_db
|
|
|
|
try:
|
|
conn = sqlite3.connect(db_path)
|
|
|
|
# account_snapshot은 특별하게 처리: 스키마를 보존하면서 데이터만 추가
|
|
if sheet_name == 'account_snapshot':
|
|
self._load_account_snapshot(conn, df)
|
|
else:
|
|
df.to_sql(sheet_name, conn, if_exists='replace', index=False)
|
|
|
|
conn.close()
|
|
|
|
print(f" [OK] {sheet_name}: {len(df)} rows → {db_path.name}")
|
|
self.results["sheets_loaded"][sheet_name] = {
|
|
"rows": len(df),
|
|
"cols": len(df.columns),
|
|
"db": str(db_path)
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f" [FAIL] {sheet_name}: {str(e)[:80]}")
|
|
self.results["errors"].append(sheet_name)
|
|
|
|
def _load_account_snapshot(self, conn: sqlite3.Connection, df: pd.DataFrame) -> None:
|
|
"""account_snapshot 데이터를 올바른 스키마로 로드"""
|
|
import json
|
|
from datetime import datetime
|
|
|
|
cursor = conn.cursor()
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
# 기존 데이터 삭제 (옵션: DELETE 또는 유지)
|
|
cursor.execute("DELETE FROM account_snapshot")
|
|
|
|
for ordinal, row in enumerate(df.iterrows(), start=1):
|
|
idx, series = row
|
|
row_dict = series.to_dict()
|
|
|
|
# row_json으로 저장
|
|
row_json = json.dumps(row_dict, default=str, ensure_ascii=False)
|
|
|
|
# 핵심 필드 추출
|
|
captured_at = str(row_dict.get('captured_at', ''))
|
|
account = str(row_dict.get('account', ''))
|
|
account_type = str(row_dict.get('account_type', ''))
|
|
ticker = str(row_dict.get('ticker', ''))
|
|
name = str(row_dict.get('name', ''))
|
|
parse_status = str(row_dict.get('parse_status', ''))
|
|
user_confirmed = str(row_dict.get('user_confirmed', ''))
|
|
|
|
cursor.execute("""
|
|
INSERT INTO account_snapshot (
|
|
ordinal, row_json, captured_at, account, account_type, ticker, name,
|
|
parse_status, user_confirmed, updated_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
ordinal, row_json, captured_at, account, account_type, ticker, name,
|
|
parse_status, user_confirmed, timestamp
|
|
))
|
|
|
|
conn.commit()
|
|
|
|
def verify(self) -> None:
|
|
"""로드 검증"""
|
|
print("\n[검증 중...]")
|
|
|
|
for db_name, db_path in [("kis_data_collection", self.kis_db), ("snapshot_admin", self.snapshot_db)]:
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence'")
|
|
tables = [row[0] for row in cursor.fetchall()]
|
|
|
|
total_rows = 0
|
|
for table in tables:
|
|
cursor.execute(f"SELECT COUNT(*) FROM {table}")
|
|
total_rows += cursor.fetchone()[0]
|
|
|
|
print(f" {db_name}.db: {len(tables)} 테이블, {total_rows:,} rows")
|
|
conn.close()
|
|
|
|
def run(self) -> dict:
|
|
"""전체 실행"""
|
|
print("="*80)
|
|
print("GatherTradingData.xlsx 정확하게 로드 (메타데이터 기반)")
|
|
print("="*80)
|
|
print()
|
|
|
|
# 메타데이터 로드
|
|
metadata = self.load_metadata()
|
|
|
|
# Excel 로드
|
|
sheets_data = self.load_excel_sheets(metadata)
|
|
|
|
if not sheets_data:
|
|
print("[ERROR] 로드된 시트가 없습니다")
|
|
return self.results
|
|
|
|
# DB 로드
|
|
self.load_to_database(sheets_data)
|
|
|
|
# 검증
|
|
self.verify()
|
|
|
|
self.results["summary"] = {
|
|
"total_sheets": len(sheets_data),
|
|
"loaded_sheets": len(self.results["sheets_loaded"]),
|
|
"failed_sheets": len(self.results["errors"]),
|
|
"coverage_pct": (len(self.results["sheets_loaded"]) / len(sheets_data) * 100) if sheets_data else 0
|
|
}
|
|
|
|
print("\n[결과 요약]")
|
|
print(f" 로드됨: {self.results['summary']['loaded_sheets']}/{self.results['summary']['total_sheets']}")
|
|
print(f" 커버리지: {self.results['summary']['coverage_pct']:.1f}%")
|
|
|
|
return self.results
|
|
|
|
if __name__ == "__main__":
|
|
loader = CorrectXLSXLoader()
|
|
result = loader.run()
|
|
|
|
print("\n[완료] 정확한 XLSX 로드 완료")
|