Files
QuantEngineByItz/tools/load_all_trading_data.py
kjh2064 a7c28f240d WBS-8.1/9.2/9.6: 데이터 로드 및 웹 UI 테스트 완료
### 데이터 로드 (100% 커버리지)
- GatherTradingData.xlsx에서 20개 시트 추출 (7,495 rows)
- kis_data_collection.db: data_feed 26 rows
- snapshot_admin.db: 26개 테이블 (settings, account_snapshot, alpha_history 등)

### 로더 스크립트 (5개)
- load_from_xlsx.py: XLSX 전체 로드 (pandas 기반)
- load_settings_properly.py: settings key-value 형태 정확히 로드
- load_all_trading_data.py: JSON 부분 로드 (초기)
- load_complete_trading_data.py: JSON 완전 로드 시도 (구조 문제)
- verify_table_coverage.py: 테이블 커버리지 검증

### 웹 UI 테스트
- 서버: 포트 5000에서 실행 중
- API /api/table_rows: settings 조회 완료
- 수정 기능: DB 직접 수정 확인 (orbit_start_asset_krw 250M→300M)

### WBS 완료 현황
✓ WBS-8.1: T+20 모니터링 (2,711개 거래 기록, 목표 30개 초과 달성)
✓ WBS-9.2: 성능 최적화 (WAL 모드, 5개 인덱스)
✓ WBS-9.6: LLM Radar Phase 3-5 (5단계 구현)

### 주요 설정값 (현재)
- 포트폴리오 시작 자산: 300,000,000 (수정됨)
- 포트폴리오 목표 자산: 600,000,000 (수정됨)
- 현재 총자산: 431,266,207
- 예수금: 13,213,373

### 다음 단계
- WBS-9.3: NULL 정책 강제 (20% 추가)
- WBS-9.7: Gitea CI/CD 백업 (20% 추가)
- API 편집 기능: /api/settings/save 구현 (500 에러 해결 필요)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 00:52:26 +09:00

184 lines
6.4 KiB
Python

#!/usr/bin/env python3
"""
GatherTradingData.json 전체 데이터를 SQLite에 로드
"""
import json
import sqlite3
from pathlib import Path
from datetime import datetime
class GatherTradingDataLoader:
"""전체 거래 데이터 로더"""
def __init__(self):
self.json_file = Path('GatherTradingData.json')
self.kis_db = Path('src/quant_engine/kis_data_collection.db')
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
self.results = {
"timestamp": datetime.now().isoformat(),
"tables_loaded": {},
"errors": []
}
def load_json_data(self) -> tuple:
"""JSON 로드 - (metadata, data) 반환"""
try:
with open(self.json_file, encoding='utf-8') as f:
full_data = json.load(f)
metadata = full_data.get('metadata', {})
data = full_data.get('data', {})
return metadata, data
except:
with open(self.json_file, encoding='euc-kr') as f:
full_data = json.load(f)
metadata = full_data.get('metadata', {})
data = full_data.get('data', {})
return metadata, data
def infer_column_types(self, data: list) -> dict:
"""컬럼 타입 추론"""
if not data:
return {}
first_row = data[0]
types = {}
for col, val in first_row.items():
if val is None:
types[col] = "TEXT"
elif isinstance(val, bool):
types[col] = "INTEGER"
elif isinstance(val, int):
types[col] = "INTEGER"
elif isinstance(val, float):
types[col] = "REAL"
else:
types[col] = "TEXT"
return types
def create_and_load_table(self, db_path: Path, table_name: str, data: list) -> dict:
"""테이블 생성 및 데이터 로드"""
if not data:
return {"table": table_name, "status": "EMPTY", "rows": 0}
try:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 컬럼 타입 추론
column_types = self.infer_column_types(data)
columns = list(column_types.keys())
# CREATE TABLE
col_defs = ", ".join([f"{col} {column_types[col]}" for col in columns])
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
cursor.execute(f"CREATE TABLE {table_name} ({col_defs})")
# INSERT 데이터
placeholders = ", ".join(["?" for _ in columns])
insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
for row in data:
values = [row.get(col) for col in columns]
cursor.execute(insert_sql, values)
conn.commit()
conn.close()
return {
"table": table_name,
"status": "SUCCESS",
"rows": len(data),
"columns": len(columns)
}
except Exception as e:
return {
"table": table_name,
"status": "ERROR",
"error": str(e)
}
def run(self) -> dict:
"""전체 실행"""
print("="*80)
print("GatherTradingData.json 전체 로드")
print("="*80)
# JSON 로드
metadata, data = self.load_json_data()
sheets = metadata.get('sheets_included', [])
print(f"\n[발견된 시트] {len(sheets)}개")
for sheet in sheets:
print(f" - {sheet}")
# 각 시트를 테이블로 로드
print("\n[로드 중...]")
for sheet_name in sheets:
sheet_data = data.get(sheet_name, [])
if not sheet_data:
print(f" [{sheet_name}] SKIP (empty)")
continue
# 타겟 DB 결정
# kis_data_collection.db: data_feed만
# snapshot_admin.db: settings, account_snapshot, 그 외 모든 것
if sheet_name == 'data_feed':
db_path = self.kis_db
else:
db_path = self.snapshot_db
# 테이블 생성
result = self.create_and_load_table(db_path, sheet_name, sheet_data)
if result['status'] == 'SUCCESS':
print(f" [{sheet_name}] OK ({result['rows']} rows, {result['columns']} cols)")
self.results["tables_loaded"][sheet_name] = result
else:
print(f" [{sheet_name}] FAIL: {result.get('error', 'unknown')}")
self.results["errors"].append(sheet_name)
# 최종 검증
print("\n[최종 검증]")
for db_name, db_path in [("kis_data_collection", self.kis_db), ("snapshot_admin", self.snapshot_db)]:
if not db_path.exists():
continue
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence'")
tables = [row[0] for row in cursor.fetchall()]
conn.close()
print(f" {db_name}.db: {len(tables)} 테이블")
for table in tables:
cursor = sqlite3.connect(db_path).cursor()
cursor.execute(f"SELECT COUNT(*) FROM {table}")
count = cursor.fetchone()[0]
print(f" - {table}: {count} rows")
self.results["summary"] = {
"total_sheets": len(sheets),
"loaded_sheets": len(self.results["tables_loaded"]),
"failed_sheets": len(self.results["errors"]),
"coverage_pct": (len(self.results["tables_loaded"]) / len(sheets) * 100) if sheets else 0
}
print(f"\n[결과]")
print(f" 커버리지: {self.results['summary']['coverage_pct']:.1f}%")
print(f" 로드됨: {self.results['summary']['loaded_sheets']}/{self.results['summary']['total_sheets']}")
return self.results
if __name__ == "__main__":
loader = GatherTradingDataLoader()
result = loader.run()
print(f"\n[완료] GatherTradingData.json → DB 로드 완료")
print(f"파일 크기: kis_data_collection.db = {Path('src/quant_engine/kis_data_collection.db').stat().st_size/1024:.1f}KB")
print(f"파일 크기: snapshot_admin.db = {Path('src/quant_engine/snapshot_admin.db').stat().st_size/1024:.1f}KB")