4c8c879302
- snapshot_admin_store_v1.py: summarize_workspace에서 account_snapshot의 captured_at 사용 (updated_at 대신) - account_snapshot 테이블: 올바른 스키마 재정의 (ordinal PK, row_json, 핵심필드, updated_at) - settings 테이블: 올바른 스키마 재정의 (ordinal PK, key, value_json, note, updated_at) - initialize_snapshot_admin_db.py: XLSX에서 settings, account_snapshot을 올바른 스키마로 로드 - load_from_xlsx_correct.py: account_snapshot을 특별 처리해서 스키마 보존 - /api/settings/save: 정상 작동 (200 응답) - build_ui_state: load_collection_dashboard_state 예외 처리 추가 (진행 중) 데이터 현황: - kis_data_collection.db: 1 테이블 (data_feed), 25행 - snapshot_admin.db: 27 테이블, 7,501행 * settings: 32행 (올바른 스키마) * account_snapshot: 44행 (올바른 스키마) 남은 작업: - /api/state 크래시 원인 진단 및 수정 - /api/export 데이터 검증 - 웹 UI 개선 (백오피스 수준) - T+20 모니터링 활성화 - CI/CD 백업 기능 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
111 lines
3.9 KiB
Python
111 lines
3.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
account_snapshot 테이블을 올바른 스키마로 마이그레이션
|
|
현재: captured_at, account, ticker, ... (XLSX 스키마)
|
|
목표: ordinal (PK), row_json, captured_at, account, ... , updated_at
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
def fix_account_snapshot_v2():
|
|
"""account_snapshot 테이블 마이그레이션"""
|
|
|
|
db_path = Path('src/quant_engine/snapshot_admin.db')
|
|
conn = sqlite3.connect(db_path)
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
|
|
# 현재 데이터 백업
|
|
cursor.execute("SELECT * FROM account_snapshot")
|
|
old_rows = cursor.fetchall()
|
|
print(f"현재 account_snapshot: {len(old_rows)}개 행")
|
|
|
|
# 기존 컬럼 확인
|
|
cursor.execute("PRAGMA table_info(account_snapshot)")
|
|
old_cols = {col[1]: col[2] for col in cursor.fetchall()}
|
|
print(f"현재 컬럼: {len(old_cols)}개")
|
|
|
|
# 기존 테이블 백업
|
|
cursor.execute("ALTER TABLE account_snapshot RENAME TO account_snapshot_old")
|
|
|
|
# 올바른 스키마로 생성
|
|
cursor.execute("""
|
|
CREATE TABLE account_snapshot (
|
|
ordinal INTEGER PRIMARY KEY,
|
|
row_json TEXT NOT NULL,
|
|
captured_at TEXT NOT NULL DEFAULT '',
|
|
account TEXT NOT NULL DEFAULT '',
|
|
account_type TEXT NOT NULL DEFAULT '',
|
|
ticker TEXT NOT NULL DEFAULT '',
|
|
name TEXT NOT NULL DEFAULT '',
|
|
parse_status TEXT NOT NULL DEFAULT '',
|
|
user_confirmed TEXT NOT NULL DEFAULT '',
|
|
updated_at TEXT NOT NULL
|
|
)
|
|
""")
|
|
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_captured_at ON account_snapshot(captured_at)")
|
|
cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_ticker ON account_snapshot(ticker)")
|
|
|
|
print("\n새 스키마 생성:")
|
|
print(" ordinal INTEGER PRIMARY KEY")
|
|
print(" row_json TEXT NOT NULL")
|
|
print(" captured_at TEXT NOT NULL DEFAULT ''")
|
|
print(" ... (8개 핵심 컬럼) ...")
|
|
print(" updated_at TEXT NOT NULL")
|
|
|
|
# 데이터 마이그레이션
|
|
timestamp = datetime.now().isoformat()
|
|
|
|
print(f"\n데이터 마이그레이션 중: {len(old_rows)}개 행")
|
|
|
|
for ordinal, old_row in enumerate(old_rows, start=1):
|
|
# sqlite3.Row를 dict로 변환
|
|
row_dict = dict(old_row)
|
|
|
|
# 필요한 필드 추출
|
|
captured_at = str(row_dict.get('captured_at') or '')
|
|
account = str(row_dict.get('account') or '')
|
|
account_type = str(row_dict.get('account_type') or '')
|
|
ticker = str(row_dict.get('ticker') or '')
|
|
name = str(row_dict.get('name') or '')
|
|
parse_status = str(row_dict.get('parse_status') or '')
|
|
user_confirmed = str(row_dict.get('user_confirmed') or '')
|
|
|
|
# 전체 행을 row_json으로 저장
|
|
row_json = json.dumps(row_dict, default=str, ensure_ascii=False)
|
|
|
|
cursor.execute("""
|
|
INSERT INTO account_snapshot (
|
|
ordinal, row_json, captured_at, account, account_type, ticker, name,
|
|
parse_status, user_confirmed, updated_at
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
ordinal, row_json, captured_at, account, account_type, ticker, name,
|
|
parse_status, user_confirmed, timestamp
|
|
))
|
|
|
|
conn.commit()
|
|
|
|
# 검증
|
|
cursor.execute("SELECT COUNT(*) FROM account_snapshot")
|
|
count = cursor.fetchone()[0]
|
|
print(f"마이그레이션된 account_snapshot: {count}개 행")
|
|
|
|
cursor.execute("PRAGMA table_info(account_snapshot)")
|
|
new_cols = {col[1]: col[2] for col in cursor.fetchall()}
|
|
print(f"새 컬럼: {list(new_cols.keys())}")
|
|
|
|
# 이전 테이블 삭제
|
|
cursor.execute("DROP TABLE account_snapshot_old")
|
|
|
|
conn.close()
|
|
|
|
print(f"\n[OK] account_snapshot 테이블 마이그레이션 완료")
|
|
|
|
if __name__ == "__main__":
|
|
fix_account_snapshot_v2()
|