#!/usr/bin/env python3 """ account_snapshot 테이블을 올바른 스키마로 마이그레이션 현재: captured_at, account, ticker, ... (XLSX 스키마) 목표: ordinal (PK), row_json, captured_at, account, ... , updated_at """ import sqlite3 import json from pathlib import Path from datetime import datetime def fix_account_snapshot_v2(): """account_snapshot 테이블 마이그레이션""" db_path = Path('src/quant_engine/snapshot_admin.db') conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # 현재 데이터 백업 cursor.execute("SELECT * FROM account_snapshot") old_rows = cursor.fetchall() print(f"현재 account_snapshot: {len(old_rows)}개 행") # 기존 컬럼 확인 cursor.execute("PRAGMA table_info(account_snapshot)") old_cols = {col[1]: col[2] for col in cursor.fetchall()} print(f"현재 컬럼: {len(old_cols)}개") # 기존 테이블 백업 cursor.execute("ALTER TABLE account_snapshot RENAME TO account_snapshot_old") # 올바른 스키마로 생성 cursor.execute(""" CREATE TABLE account_snapshot ( ordinal INTEGER PRIMARY KEY, row_json TEXT NOT NULL, captured_at TEXT NOT NULL DEFAULT '', account TEXT NOT NULL DEFAULT '', account_type TEXT NOT NULL DEFAULT '', ticker TEXT NOT NULL DEFAULT '', name TEXT NOT NULL DEFAULT '', parse_status TEXT NOT NULL DEFAULT '', user_confirmed TEXT NOT NULL DEFAULT '', updated_at TEXT NOT NULL ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_captured_at ON account_snapshot(captured_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_ticker ON account_snapshot(ticker)") print("\n새 스키마 생성:") print(" ordinal INTEGER PRIMARY KEY") print(" row_json TEXT NOT NULL") print(" captured_at TEXT NOT NULL DEFAULT ''") print(" ... (8개 핵심 컬럼) ...") print(" updated_at TEXT NOT NULL") # 데이터 마이그레이션 timestamp = datetime.now().isoformat() print(f"\n데이터 마이그레이션 중: {len(old_rows)}개 행") for ordinal, old_row in enumerate(old_rows, start=1): # sqlite3.Row를 dict로 변환 row_dict = dict(old_row) # 필요한 필드 추출 captured_at = str(row_dict.get('captured_at') or '') account = str(row_dict.get('account') or '') account_type = str(row_dict.get('account_type') or '') ticker = str(row_dict.get('ticker') or '') name = str(row_dict.get('name') or '') parse_status = str(row_dict.get('parse_status') or '') user_confirmed = str(row_dict.get('user_confirmed') or '') # 전체 행을 row_json으로 저장 row_json = json.dumps(row_dict, default=str, ensure_ascii=False) cursor.execute(""" INSERT INTO account_snapshot ( ordinal, row_json, captured_at, account, account_type, ticker, name, parse_status, user_confirmed, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( ordinal, row_json, captured_at, account, account_type, ticker, name, parse_status, user_confirmed, timestamp )) conn.commit() # 검증 cursor.execute("SELECT COUNT(*) FROM account_snapshot") count = cursor.fetchone()[0] print(f"마이그레이션된 account_snapshot: {count}개 행") cursor.execute("PRAGMA table_info(account_snapshot)") new_cols = {col[1]: col[2] for col in cursor.fetchall()} print(f"새 컬럼: {list(new_cols.keys())}") # 이전 테이블 삭제 cursor.execute("DROP TABLE account_snapshot_old") conn.close() print(f"\n[OK] account_snapshot 테이블 마이그레이션 완료") if __name__ == "__main__": fix_account_snapshot_v2()