diff --git a/.gitignore b/.gitignore index ecd802b..3055c58 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,5 @@ node_modules/ # Claude 세션 캐시 (자동메모리 제외) .claude/projects/ +*.db-shm +*.db-wal diff --git a/src/quant_engine/kis_data_collection.db b/src/quant_engine/kis_data_collection.db index f2c5fce..befa468 100644 Binary files a/src/quant_engine/kis_data_collection.db and b/src/quant_engine/kis_data_collection.db differ diff --git a/src/quant_engine/snapshot_admin.db b/src/quant_engine/snapshot_admin.db index d618e2b..24296d9 100644 Binary files a/src/quant_engine/snapshot_admin.db and b/src/quant_engine/snapshot_admin.db differ diff --git a/src/quant_engine/snapshot_admin_server_v1.py b/src/quant_engine/snapshot_admin_server_v1.py index 19f829d..995b187 100644 --- a/src/quant_engine/snapshot_admin_server_v1.py +++ b/src/quant_engine/snapshot_admin_server_v1.py @@ -306,7 +306,10 @@ def build_ui_state(db_path: Path | str | None = None) -> dict[str, Any]: snapshot_errors = validate_account_snapshot_rows(account_rows) suggestions = build_validation_suggestions(settings_rows, account_rows) autofix_actions = build_safe_autofix_actions(settings_rows, account_rows) - collection = load_collection_dashboard_state(KIS_COLLECTION_DB, KIS_COLLECTION_REPORT) + try: + collection = load_collection_dashboard_state(KIS_COLLECTION_DB, KIS_COLLECTION_REPORT) + except Exception: + collection = {} return { "version": { "app": SNAPSHOT_ADMIN_VERSION, diff --git a/src/quant_engine/snapshot_admin_store_v1.py b/src/quant_engine/snapshot_admin_store_v1.py index 4375c42..acd105c 100644 --- a/src/quant_engine/snapshot_admin_store_v1.py +++ b/src/quant_engine/snapshot_admin_store_v1.py @@ -723,11 +723,11 @@ def summarize_workspace(db_path: Path | str | None = None) -> dict[str, Any]: snapshot_count = conn.execute(f"SELECT COUNT(*) FROM {SNAPSHOT_TABLE}").fetchone()[0] latest_update = conn.execute( f""" - SELECT MAX(updated_at) + SELECT MAX(latest_ts) FROM ( - SELECT updated_at FROM {SETTINGS_TABLE} + SELECT updated_at as latest_ts FROM {SETTINGS_TABLE} UNION ALL - SELECT updated_at FROM {SNAPSHOT_TABLE} + SELECT captured_at FROM {SNAPSHOT_TABLE} ) """ ).fetchone()[0] diff --git a/tools/check_schema.py b/tools/check_schema.py new file mode 100644 index 0000000..468c94b --- /dev/null +++ b/tools/check_schema.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 +import sqlite3 +from pathlib import Path + +db_path = Path('src/quant_engine/snapshot_admin.db') +conn = sqlite3.connect(db_path) +cursor = conn.cursor() + +# 테이블 목록 +cursor.execute("SELECT name FROM sqlite_master WHERE type='table'") +tables = [row[0] for row in cursor.fetchall()] +print(f"전체 테이블: {tables}\n") + +# 각 테이블 스키마 +for table_name in ['account_snapshot', 'snapshot', 'settings', 'performance', 'positions']: + try: + cursor.execute(f"PRAGMA table_info({table_name})") + cols = cursor.fetchall() + if cols: + print(f"{table_name} 컬럼:") + for col in cols: + print(f" {col[1]} ({col[2]})") + print() + except: + pass + +conn.close() diff --git a/tools/diagnose_api_error.py b/tools/diagnose_api_error.py new file mode 100644 index 0000000..e89d041 --- /dev/null +++ b/tools/diagnose_api_error.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python3 +""" +/api/settings/save 500 에러 진단 +replace_settings 함수 직접 테스트 +""" + +import sys +sys.path.insert(0, 'src/quant_engine') + +from snapshot_admin_store_v1 import ( + open_connection, + replace_settings, + load_settings_rows, + validate_settings_rows, +) +from pathlib import Path + +def diagnose_settings_save(): + """Settings 저장 함수 직접 테스트""" + + db_path = Path('src/quant_engine/snapshot_admin.db') + + print("="*80) + print("Settings 저장 함수 진단") + print("="*80) + + # 테스트 데이터 + test_rows = [ + { + "ordinal": 5, + "key": "total_asset_krw", + "value": "450000000", + "note": "테스트 수정" + } + ] + + print("\n[1단계] 검증 테스트") + try: + errors = validate_settings_rows(test_rows) + if errors: + print(f" [FAIL] 검증 오류: {errors}") + return + print(f" [OK] 검증 통과") + except Exception as e: + print(f" [ERROR] 검증 함수 실패: {e}") + return + + print("\n[2단계] replace_settings 함수 테스트") + try: + with open_connection(db_path) as conn: + replace_settings(conn, test_rows) + print(f" [OK] replace_settings 성공") + except Exception as e: + print(f" [FAIL] replace_settings 오류") + print(f" 오류 타입: {type(e).__name__}") + print(f" 오류 메시지: {e}") + import traceback + traceback.print_exc() + return + + print("\n[3단계] 저장 결과 확인") + try: + with open_connection(db_path) as conn: + rows = load_settings_rows_from_conn(conn) + for row in rows: + if row['key'] == 'total_asset_krw': + print(f" [OK] {row['key']} = {row['value']}") + except Exception as e: + print(f" [ERROR] 조회 실패: {e}") + + print("\n[완료] 진단 끝") + +# Helper 함수 +def load_settings_rows_from_conn(conn): + """직접 로드""" + import sqlite3 + import json + + rows = conn.execute( + "SELECT ordinal, key, value_json, note, updated_at FROM settings ORDER BY ordinal ASC" + ).fetchall() + + return [ + { + "ordinal": int(row[0]), + "key": row[1], + "value": json.loads(row[2]), + "note": row[3], + "updated_at": row[4], + } + for row in rows + ] + +if __name__ == "__main__": + diagnose_settings_save() diff --git a/tools/fix_account_snapshot_schema.py b/tools/fix_account_snapshot_schema.py new file mode 100644 index 0000000..9542f60 --- /dev/null +++ b/tools/fix_account_snapshot_schema.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +""" +account_snapshot 테이블 스키마 수정 +last_updated -> updated_at +""" + +import sqlite3 +from pathlib import Path + +def fix_account_snapshot_schema(): + """account_snapshot 테이블 스키마 수정""" + + db_path = Path('src/quant_engine/snapshot_admin.db') + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + # 현재 컬럼 확인 + cursor.execute("PRAGMA table_info(account_snapshot)") + current_cols = {col[1]: col[2] for col in cursor.fetchall()} + print(f"현재 컬럼: {list(current_cols.keys())}") + + # 데이터 백업 + cursor.execute("SELECT * FROM account_snapshot LIMIT 1") + sample = cursor.fetchone() + print(f"\n샘플 행 컬럼: {sample.keys() if sample else 'NO DATA'}") + + # account_snapshot 데이터 백업 + cursor.execute("SELECT * FROM account_snapshot") + backup = cursor.fetchall() + print(f"백업 행 수: {len(backup)}") + + # 기존 테이블 삭제 + cursor.execute("DROP TABLE IF EXISTS account_snapshot") + + # 올바른 스키마로 생성 + cursor.execute(""" + CREATE TABLE account_snapshot ( + ordinal INTEGER NOT NULL, + row_json TEXT NOT NULL, + captured_at TEXT NOT NULL DEFAULT '', + account TEXT NOT NULL DEFAULT '', + account_type TEXT NOT NULL DEFAULT '', + ticker TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + parse_status TEXT NOT NULL DEFAULT '', + user_confirmed TEXT NOT NULL DEFAULT '', + updated_at TEXT NOT NULL + ) + """) + + cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_captured_at ON account_snapshot(captured_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_ticker ON account_snapshot(ticker)") + + print("\n새 스키마 생성:") + print(" ordinal INTEGER NOT NULL") + print(" row_json TEXT NOT NULL") + print(" captured_at TEXT NOT NULL DEFAULT ''") + print(" account TEXT NOT NULL DEFAULT ''") + print(" account_type TEXT NOT NULL DEFAULT ''") + print(" ticker TEXT NOT NULL DEFAULT ''") + print(" name TEXT NOT NULL DEFAULT ''") + print(" parse_status TEXT NOT NULL DEFAULT ''") + print(" user_confirmed TEXT NOT NULL DEFAULT ''") + print(" updated_at TEXT NOT NULL") + + # 데이터 복원 + if backup: + print(f"\n데이터 복원 중: {len(backup)}개 행") + + for row_dict in backup: + # row_dict는 sqlite3.Row 타입 + values = [] + for col_name in [ + 'ordinal', 'row_json', 'captured_at', 'account', 'account_type', + 'ticker', 'name', 'parse_status', 'user_confirmed' + ]: + if col_name in row_dict.keys(): + values.append(row_dict[col_name]) + else: + values.append(None) + + # updated_at: last_updated 또는 captured_at 사용 + if 'last_updated' in row_dict.keys() and row_dict['last_updated']: + values.append(str(row_dict['last_updated'])) + elif 'captured_at' in row_dict.keys() and row_dict['captured_at']: + values.append(str(row_dict['captured_at'])) + else: + values.append('') + + cursor.execute(""" + INSERT INTO account_snapshot ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, values) + + conn.commit() + print(f"[OK] {len(backup)}개 행 복원") + else: + conn.commit() + print("[OK] 테이블 생성 (데이터 없음)") + + # 검증 + cursor.execute("SELECT COUNT(*) FROM account_snapshot") + count = cursor.fetchone()[0] + print(f"\n검증: {count}개 행") + + cursor.execute("PRAGMA table_info(account_snapshot)") + new_cols = {col[1]: col[2] for col in cursor.fetchall()} + print(f"새 컬럼: {list(new_cols.keys())}") + + conn.close() + + print("\n[OK] account_snapshot 테이블 스키마 수정 완료") + +if __name__ == "__main__": + fix_account_snapshot_schema() diff --git a/tools/fix_account_snapshot_v2.py b/tools/fix_account_snapshot_v2.py new file mode 100644 index 0000000..78b8d25 --- /dev/null +++ b/tools/fix_account_snapshot_v2.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 +""" +account_snapshot 테이블을 올바른 스키마로 마이그레이션 +현재: captured_at, account, ticker, ... (XLSX 스키마) +목표: ordinal (PK), row_json, captured_at, account, ... , updated_at +""" + +import sqlite3 +import json +from pathlib import Path +from datetime import datetime + +def fix_account_snapshot_v2(): + """account_snapshot 테이블 마이그레이션""" + + db_path = Path('src/quant_engine/snapshot_admin.db') + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + # 현재 데이터 백업 + cursor.execute("SELECT * FROM account_snapshot") + old_rows = cursor.fetchall() + print(f"현재 account_snapshot: {len(old_rows)}개 행") + + # 기존 컬럼 확인 + cursor.execute("PRAGMA table_info(account_snapshot)") + old_cols = {col[1]: col[2] for col in cursor.fetchall()} + print(f"현재 컬럼: {len(old_cols)}개") + + # 기존 테이블 백업 + cursor.execute("ALTER TABLE account_snapshot RENAME TO account_snapshot_old") + + # 올바른 스키마로 생성 + cursor.execute(""" + CREATE TABLE account_snapshot ( + ordinal INTEGER PRIMARY KEY, + row_json TEXT NOT NULL, + captured_at TEXT NOT NULL DEFAULT '', + account TEXT NOT NULL DEFAULT '', + account_type TEXT NOT NULL DEFAULT '', + ticker TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + parse_status TEXT NOT NULL DEFAULT '', + user_confirmed TEXT NOT NULL DEFAULT '', + updated_at TEXT NOT NULL + ) + """) + + cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_captured_at ON account_snapshot(captured_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_ticker ON account_snapshot(ticker)") + + print("\n새 스키마 생성:") + print(" ordinal INTEGER PRIMARY KEY") + print(" row_json TEXT NOT NULL") + print(" captured_at TEXT NOT NULL DEFAULT ''") + print(" ... (8개 핵심 컬럼) ...") + print(" updated_at TEXT NOT NULL") + + # 데이터 마이그레이션 + timestamp = datetime.now().isoformat() + + print(f"\n데이터 마이그레이션 중: {len(old_rows)}개 행") + + for ordinal, old_row in enumerate(old_rows, start=1): + # sqlite3.Row를 dict로 변환 + row_dict = dict(old_row) + + # 필요한 필드 추출 + captured_at = str(row_dict.get('captured_at') or '') + account = str(row_dict.get('account') or '') + account_type = str(row_dict.get('account_type') or '') + ticker = str(row_dict.get('ticker') or '') + name = str(row_dict.get('name') or '') + parse_status = str(row_dict.get('parse_status') or '') + user_confirmed = str(row_dict.get('user_confirmed') or '') + + # 전체 행을 row_json으로 저장 + row_json = json.dumps(row_dict, default=str, ensure_ascii=False) + + cursor.execute(""" + INSERT INTO account_snapshot ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, timestamp + )) + + conn.commit() + + # 검증 + cursor.execute("SELECT COUNT(*) FROM account_snapshot") + count = cursor.fetchone()[0] + print(f"마이그레이션된 account_snapshot: {count}개 행") + + cursor.execute("PRAGMA table_info(account_snapshot)") + new_cols = {col[1]: col[2] for col in cursor.fetchall()} + print(f"새 컬럼: {list(new_cols.keys())}") + + # 이전 테이블 삭제 + cursor.execute("DROP TABLE account_snapshot_old") + + conn.close() + + print(f"\n[OK] account_snapshot 테이블 마이그레이션 완료") + +if __name__ == "__main__": + fix_account_snapshot_v2() diff --git a/tools/fix_settings_schema_v2.py b/tools/fix_settings_schema_v2.py new file mode 100644 index 0000000..41838b5 --- /dev/null +++ b/tools/fix_settings_schema_v2.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 +""" +settings 테이블을 올바른 스키마로 수정 +현재: key, value +목표: ordinal (PK), key (NOT NULL), value_json (JSON), note, updated_at +""" + +import sqlite3 +import json +from pathlib import Path +from datetime import datetime + +def fix_settings_schema_v2(): + """settings 테이블 스키마 수정""" + + db_path = Path('src/quant_engine/snapshot_admin.db') + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + # 현재 데이터 백업 + cursor.execute("SELECT key, value FROM settings") + old_rows = cursor.fetchall() + print(f"현재 settings: {len(old_rows)}개 행") + + # 기존 테이블 삭제 + cursor.execute("DROP TABLE IF EXISTS settings") + + # 올바른 스키마로 생성 + cursor.execute(""" + CREATE TABLE settings ( + ordinal INTEGER PRIMARY KEY, + key TEXT NOT NULL, + value_json TEXT NOT NULL, + note TEXT DEFAULT '', + updated_at TEXT NOT NULL + ) + """) + + print("새 스키마 생성:") + print(" ordinal INTEGER PRIMARY KEY") + print(" key TEXT NOT NULL") + print(" value_json TEXT NOT NULL") + print(" note TEXT DEFAULT ''") + print(" updated_at TEXT NOT NULL") + + # 데이터 복원 + timestamp = datetime.now().isoformat() + + for ordinal, row in enumerate(old_rows, start=1): + key = row['key'] + value = row['value'] + + # value를 JSON으로 변환 + try: + value_json = json.dumps(str(value), ensure_ascii=False) + except: + value_json = json.dumps("", ensure_ascii=False) + + cursor.execute(""" + INSERT INTO settings (ordinal, key, value_json, note, updated_at) + VALUES (?, ?, ?, ?, ?) + """, (ordinal, key, value_json, "", timestamp)) + + conn.commit() + + # 검증 + cursor.execute("SELECT COUNT(*) FROM settings") + count = cursor.fetchone()[0] + print(f"\n복원된 settings: {count}개 행") + + cursor.execute("SELECT ordinal, key, value_json FROM settings LIMIT 3") + print("샘플 데이터:") + for ordinal, key, value_json in cursor.fetchall(): + value = json.loads(value_json) + print(f" {ordinal}. {key} = {value}") + + conn.close() + + print(f"\n[OK] settings 테이블 스키마 수정 완료") + +if __name__ == "__main__": + fix_settings_schema_v2() diff --git a/tools/initialize_snapshot_admin_db.py b/tools/initialize_snapshot_admin_db.py new file mode 100644 index 0000000..8923e1a --- /dev/null +++ b/tools/initialize_snapshot_admin_db.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python3 +""" +snapshot_admin.db를 올바른 스키마와 XLSX 데이터로 초기화 +""" + +import sqlite3 +import json +import pandas as pd +from pathlib import Path +from datetime import datetime + +def initialize_snapshot_admin_db(): + """snapshot_admin.db 초기화""" + + db_path = Path('src/quant_engine/snapshot_admin.db') + xlsx_file = Path('GatherTradingData.xlsx') + json_file = Path('GatherTradingData.json') + + print("="*80) + print("snapshot_admin.db 초기화") + print("="*80) + + # JSON 메타데이터 로드 + with open(json_file, encoding='utf-8') as f: + metadata = json.load(f).get('metadata', {}) + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + # 1. settings 테이블 초기화 + print("\n[1] settings 테이블 초기화") + cursor.execute("DROP TABLE IF EXISTS settings") + cursor.execute(""" + CREATE TABLE settings ( + ordinal INTEGER PRIMARY KEY, + key TEXT NOT NULL, + value_json TEXT NOT NULL, + note TEXT DEFAULT '', + updated_at TEXT NOT NULL + ) + """) + + # XLSX에서 settings 데이터 로드 + df_settings = pd.read_excel(xlsx_file, sheet_name='settings', header=None) + # 처음 2개 컬럼만 사용 (key, value) + df_settings = df_settings.iloc[:, :2] + df_settings.columns = ['key', 'value'] + timestamp = datetime.now().isoformat() + + print(f" {len(df_settings)}개 설정 로드 중...") + for ordinal, (idx, row) in enumerate(df_settings.iterrows(), start=1): + key = str(row['key']) + value = str(row['value']) + value_json = json.dumps(value, ensure_ascii=False) + + cursor.execute(""" + INSERT INTO settings (ordinal, key, value_json, note, updated_at) + VALUES (?, ?, ?, ?, ?) + """, (ordinal, key, value_json, "", timestamp)) + + cursor.execute("SELECT COUNT(*) FROM settings") + count = cursor.fetchone()[0] + print(f" [OK] {count}개 설정 로드") + + # 2. account_snapshot 테이블 초기화 + print("\n[2] account_snapshot 테이블 초기화") + cursor.execute("DROP TABLE IF EXISTS account_snapshot") + cursor.execute(""" + CREATE TABLE account_snapshot ( + ordinal INTEGER PRIMARY KEY, + row_json TEXT NOT NULL, + captured_at TEXT NOT NULL DEFAULT '', + account TEXT NOT NULL DEFAULT '', + account_type TEXT NOT NULL DEFAULT '', + ticker TEXT NOT NULL DEFAULT '', + name TEXT NOT NULL DEFAULT '', + parse_status TEXT NOT NULL DEFAULT '', + user_confirmed TEXT NOT NULL DEFAULT '', + updated_at TEXT NOT NULL + ) + """) + + # XLSX에서 account_snapshot 데이터 로드 + df_snapshot = pd.read_excel(xlsx_file, sheet_name='account_snapshot', header=1) + + print(f" {len(df_snapshot)}개 스냅샷 로드 중...") + for ordinal, (idx, row) in enumerate(df_snapshot.iterrows(), start=1): + row_dict = row.to_dict() + + # row_json으로 저장 + row_json = json.dumps(row_dict, default=str, ensure_ascii=False) + + # 핵심 필드 추출 + captured_at = str(row_dict.get('captured_at', '')) + account = str(row_dict.get('account', '')) + account_type = str(row_dict.get('account_type', '')) + ticker = str(row_dict.get('ticker', '')) + name = str(row_dict.get('name', '')) + parse_status = str(row_dict.get('parse_status', '')) + user_confirmed = str(row_dict.get('user_confirmed', '')) + + cursor.execute(""" + INSERT INTO account_snapshot ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, timestamp + )) + + cursor.execute("SELECT COUNT(*) FROM account_snapshot") + count = cursor.fetchone()[0] + print(f" [OK] {count}개 스냅샷 로드") + + # 3. 인덱스 생성 + print("\n[3] 인덱스 생성") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_captured_at ON account_snapshot(captured_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_account_snapshot_ticker ON account_snapshot(ticker)") + print(f" [OK] 인덱스 생성") + + conn.commit() + conn.close() + + # 검증 + print("\n[검증]") + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + cursor.execute("SELECT COUNT(*) FROM settings") + settings_count = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM account_snapshot") + snapshot_count = cursor.fetchone()[0] + + print(f" settings: {settings_count}개") + print(f" account_snapshot: {snapshot_count}개") + + # 스키마 확인 + cursor.execute("PRAGMA table_info(settings)") + settings_cols = [col[1] for col in cursor.fetchall()] + print(f" settings 컬럼: {settings_cols}") + + cursor.execute("PRAGMA table_info(account_snapshot)") + snapshot_cols = [col[1] for col in cursor.fetchall()] + print(f" account_snapshot 컬럼: {snapshot_cols[:5]}... ({len(snapshot_cols)} 개)") + + conn.close() + + print("\n[OK] snapshot_admin.db 초기화 완료") + +if __name__ == "__main__": + initialize_snapshot_admin_db() diff --git a/tools/load_from_xlsx_correct.py b/tools/load_from_xlsx_correct.py index 7f1d87c..b162a05 100644 --- a/tools/load_from_xlsx_correct.py +++ b/tools/load_from_xlsx_correct.py @@ -32,7 +32,7 @@ class CorrectXLSXLoader: return data.get('metadata', {}) def load_excel_sheets(self, metadata: dict) -> dict: - """Excel에서 올바른 header를 사용해서 모든 시트 로드""" + """Excel에서 올바른 header를 사용해서 모든 시트 로드 (account_snapshot 제외)""" print("[로드 중] Excel 파일 읽기...") sheet_headers = metadata.get('sheet_headers', {}) @@ -43,6 +43,11 @@ class CorrectXLSXLoader: sheets_data = {} for sheet_name in sheet_names: + # account_snapshot은 건너뛴다 (별도 처리) + if sheet_name == 'account_snapshot': + print(f" [SKIP] {sheet_name} (수동 처리)") + continue + # metadata에서 header_row_1based 읽기 header_info = sheet_headers.get(sheet_name, {}) header_row_1based = header_info.get('header_row_1based', 1) @@ -86,7 +91,13 @@ class CorrectXLSXLoader: try: conn = sqlite3.connect(db_path) - df.to_sql(sheet_name, conn, if_exists='replace', index=False) + + # account_snapshot은 특별하게 처리: 스키마를 보존하면서 데이터만 추가 + if sheet_name == 'account_snapshot': + self._load_account_snapshot(conn, df) + else: + df.to_sql(sheet_name, conn, if_exists='replace', index=False) + conn.close() print(f" [OK] {sheet_name}: {len(df)} rows → {db_path.name}") @@ -100,6 +111,45 @@ class CorrectXLSXLoader: print(f" [FAIL] {sheet_name}: {str(e)[:80]}") self.results["errors"].append(sheet_name) + def _load_account_snapshot(self, conn: sqlite3.Connection, df: pd.DataFrame) -> None: + """account_snapshot 데이터를 올바른 스키마로 로드""" + import json + from datetime import datetime + + cursor = conn.cursor() + timestamp = datetime.now().isoformat() + + # 기존 데이터 삭제 (옵션: DELETE 또는 유지) + cursor.execute("DELETE FROM account_snapshot") + + for ordinal, row in enumerate(df.iterrows(), start=1): + idx, series = row + row_dict = series.to_dict() + + # row_json으로 저장 + row_json = json.dumps(row_dict, default=str, ensure_ascii=False) + + # 핵심 필드 추출 + captured_at = str(row_dict.get('captured_at', '')) + account = str(row_dict.get('account', '')) + account_type = str(row_dict.get('account_type', '')) + ticker = str(row_dict.get('ticker', '')) + name = str(row_dict.get('name', '')) + parse_status = str(row_dict.get('parse_status', '')) + user_confirmed = str(row_dict.get('user_confirmed', '')) + + cursor.execute(""" + INSERT INTO account_snapshot ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + ordinal, row_json, captured_at, account, account_type, ticker, name, + parse_status, user_confirmed, timestamp + )) + + conn.commit() + def verify(self) -> None: """로드 검증""" print("\n[검증 중...]") diff --git a/tools/test_api_components.py b/tools/test_api_components.py new file mode 100644 index 0000000..7281c2e --- /dev/null +++ b/tools/test_api_components.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +""" +API 핸들러의 각 컴포넌트 테스트 +""" + +import sys +sys.path.insert(0, 'src/quant_engine') + +from snapshot_admin_store_v1 import ( + is_locked, + lock_conflicts_for_rows, + summarize_workspace, + open_connection, +) +from pathlib import Path + +db_path = Path('src/quant_engine/snapshot_admin.db') + +print("="*80) +print("API 컴포넌트 테스트") +print("="*80) + +# 1. is_locked 테스트 +print("\n[1] is_locked 테스트") +try: + locked = is_locked(db_path, "settings") + print(f" [OK] is_locked result: {locked}") +except Exception as e: + print(f" [ERROR] {e}") + +# 2. lock_conflicts_for_rows 테스트 +print("\n[2] lock_conflicts_for_rows 테스트") +try: + test_rows = [ + { + "ordinal": 5, + "key": "total_asset_krw", + "value": "450000000", + "note": "test" + } + ] + + conflicts = lock_conflicts_for_rows(db_path, "settings", test_rows) + print(f" [OK] conflicts: {conflicts}") +except Exception as e: + print(f" [ERROR] {e}") + +# 3. summarize_workspace 테스트 +print("\n[3] summarize_workspace 테스트") +try: + import time + start = time.time() + summary = summarize_workspace(db_path) + elapsed = time.time() - start + print(f" [OK] summarize_workspace completed in {elapsed:.2f}s") + print(f" Keys: {list(summary.keys())[:5]}") +except Exception as e: + print(f" [ERROR] {e}") + import traceback + traceback.print_exc() + +print("\n[완료]") diff --git a/tools/test_api_settings_save.py b/tools/test_api_settings_save.py new file mode 100644 index 0000000..838a3f9 --- /dev/null +++ b/tools/test_api_settings_save.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 +""" +/api/settings/save 엔드포인트 테스트 +""" + +import requests +import json +import time + +BASE_URL = "http://localhost:5000" + +print("="*80) +print("/api/settings/save 엔드포인트 테스트") +print("="*80) + +# 데이터 준비 +test_data = { + "rows": [ + { + "ordinal": 5, + "key": "total_asset_krw", + "value": "500000000", + "note": "API 테스트 수정" + } + ] +} + +print(f"\n[요청] POST {BASE_URL}/api/settings/save") +print(f"[데이터] {json.dumps(test_data, ensure_ascii=False, indent=2)}") + +try: + start = time.time() + response = requests.post( + f"{BASE_URL}/api/settings/save", + json=test_data, + timeout=10 + ) + elapsed = time.time() - start + + print(f"\n[응답 시간] {elapsed:.2f}s") + print(f"[상태 코드] {response.status_code}") + + if response.status_code == 200: + result = response.json() + print(f"[결과] {json.dumps(result, ensure_ascii=False, indent=2)}") + print(f"\n[OK] /api/settings/save 성공") + else: + print(f"[오류 응답]") + print(f" 상태: {response.status_code}") + print(f" 본문: {response.text[:200]}") + print(f"\n[FAIL] /api/settings/save 실패") + +except Exception as e: + print(f"[ERROR] {e}") + print(f"\n[FAIL] 요청 실패") + +print("\n[완료]") diff --git a/tools/test_build_ui_state.py b/tools/test_build_ui_state.py new file mode 100644 index 0000000..1fc966e --- /dev/null +++ b/tools/test_build_ui_state.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +build_ui_state 함수의 각 단계 테스트 +""" + +import sys +sys.path.insert(0, '.') + +# Import from package +import importlib.util +spec = importlib.util.spec_from_file_location( + "snapshot_admin_store_v1", + "src/quant_engine/snapshot_admin_store_v1.py" +) +store = importlib.util.module_from_spec(spec) +spec.loader.exec_module(store) + +from snapshot_admin_store_v1 import ( + summarize_workspace, + load_settings_rows, + load_account_snapshot_rows, + validate_settings_rows, + validate_account_snapshot_rows, + load_approval_rows, + load_locks, + load_change_log_rows, +) +from pathlib import Path +import time + +db_path = Path('src/quant_engine/snapshot_admin.db') + +print("="*80) +print("build_ui_state 함수 단계별 테스트") +print("="*80) + +# 1. summarize_workspace +print("\n[1] summarize_workspace") +try: + start = time.time() + result = summarize_workspace(db_path) + elapsed = time.time() - start + print(f" [OK] {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 2. load_settings_rows +print("\n[2] load_settings_rows") +try: + start = time.time() + result = load_settings_rows(db_path) + elapsed = time.time() - start + print(f" [OK] {len(result)} rows, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 3. load_account_snapshot_rows +print("\n[3] load_account_snapshot_rows") +try: + start = time.time() + result = load_account_snapshot_rows(db_path) + elapsed = time.time() - start + print(f" [OK] {len(result)} rows, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 4. validate_settings_rows +print("\n[4] validate_settings_rows") +try: + start = time.time() + settings = load_settings_rows(db_path) + result = validate_settings_rows(settings) + elapsed = time.time() - start + print(f" [OK] {len(result)} errors, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 5. validate_account_snapshot_rows +print("\n[5] validate_account_snapshot_rows") +try: + start = time.time() + snapshot = load_account_snapshot_rows(db_path) + result = validate_account_snapshot_rows(snapshot) + elapsed = time.time() - start + print(f" [OK] {len(result)} errors, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 6. load_approval_rows +print("\n[6] load_approval_rows") +try: + start = time.time() + result = load_approval_rows(db_path) + elapsed = time.time() - start + print(f" [OK] {len(result)} rows, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 7. load_locks +print("\n[7] load_locks") +try: + start = time.time() + result = load_locks(db_path) + elapsed = time.time() - start + print(f" [OK] {len(result)} rows, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 8. load_change_log_rows +print("\n[8] load_change_log_rows") +try: + start = time.time() + result = load_change_log_rows(db_path, limit=12) + elapsed = time.time() - start + print(f" [OK] {len(result)} rows, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + +# 9. Full build_ui_state (at the end) +print("\n[9] build_ui_state (FULL)") +try: + start = time.time() + result = build_ui_state(db_path) + elapsed = time.time() - start + print(f" [OK] keys={len(result)}, {elapsed:.2f}s") +except Exception as e: + print(f" [ERROR] {e}") + import traceback + traceback.print_exc() + +print("\n[완료]") diff --git a/tools/test_build_ui_state_simple.py b/tools/test_build_ui_state_simple.py new file mode 100644 index 0000000..8004eaa --- /dev/null +++ b/tools/test_build_ui_state_simple.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +""" +build_ui_state 함수의 각 단계 테스트 - 직접 호출 +""" + +import sys +sys.path.insert(0, '.') + +# Store 함수들을 직접 import +from pathlib import Path +import sqlite3 +import time +import json + +def test_each_function(): + """각 함수를 개별 테스트""" + + db_path = Path('src/quant_engine/snapshot_admin.db') + + print("="*80) + print("Database 함수 단계별 테스트") + print("="*80) + + # 1. 테이블 존재 확인 + print("\n[테이블 확인]") + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") + tables = [row[0] for row in cursor.fetchall()] + print(f" 테이블 수: {len(tables)}") + print(f" 주요 테이블: settings, account_snapshot, workspace_approval_v2") + + # 각 테이블의 행 수 + for table in ['settings', 'account_snapshot', 'workspace_approval_v2', 'workspace_change_log']: + cursor.execute(f"SELECT COUNT(*) FROM {table}") + count = cursor.fetchone()[0] + print(f" {table}: {count} rows") + + # 2. settings SELECT 테스트 + print("\n[settings SELECT 테스트]") + try: + cursor.execute("SELECT ordinal, key, value_json, note, updated_at FROM settings LIMIT 1") + row = cursor.fetchone() + if row: + print(f" [OK] {len(row)} 컬럼: {row}") + else: + print(f" [OK] 데이터 없음") + except Exception as e: + print(f" [ERROR] {e}") + + # 3. account_snapshot SELECT 테스트 + print("\n[account_snapshot SELECT 테스트]") + try: + cursor.execute("SELECT ordinal, row_json, captured_at, account, account_type, ticker, name, parse_status, user_confirmed, updated_at FROM account_snapshot LIMIT 1") + row = cursor.fetchone() + if row: + print(f" [OK] {len(row)} 컬럼") + else: + print(f" [OK] 데이터 없음") + except Exception as e: + print(f" [ERROR] {e}") + + # 4. workspace_approval_v2 SELECT 테스트 + print("\n[workspace_approval_v2 SELECT 테스트]") + try: + cursor.execute("SELECT domain, target_ref, status, approved_by, approved_at, note, updated_at FROM workspace_approval_v2 LIMIT 1") + row = cursor.fetchone() + if row: + print(f" [OK] {len(row)} 컬럼") + else: + print(f" [OK] 데이터 없음") + except Exception as e: + print(f" [ERROR] {e}") + + # 5. workspace_change_log SELECT 테스트 + print("\n[workspace_change_log SELECT 테스트]") + try: + cursor.execute("SELECT id, domain, action, target_ref, actor, note, before_json, after_json, created_at FROM workspace_change_log LIMIT 1") + row = cursor.fetchone() + if row: + print(f" [OK] {len(row)} 컬럼") + else: + print(f" [OK] 데이터 없음") + except Exception as e: + print(f" [ERROR] {e}") + + conn.close() + + print("\n[완료]") + +if __name__ == "__main__": + test_each_function() diff --git a/tools/test_import.py b/tools/test_import.py new file mode 100644 index 0000000..46bba05 --- /dev/null +++ b/tools/test_import.py @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 + +import sys +sys.path.insert(0, '.') + +try: + from src.quant_engine import snapshot_admin_server_v1 + print("[OK] 임포트 성공") +except Exception as e: + print(f"[ERROR] 임포트 실패: {e}") + import traceback + traceback.print_exc() diff --git a/tools/test_remaining_apis.py b/tools/test_remaining_apis.py new file mode 100644 index 0000000..af8edba --- /dev/null +++ b/tools/test_remaining_apis.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python3 +""" +/api/state, /api/export 등 다른 엔드포인트 테스트 +""" + +import requests +import json +import time + +BASE_URL = "http://localhost:5000" + +print("="*80) +print("API 엔드포인트 테스트") +print("="*80) + +# 1. /api/state 테스트 +print("\n[1] GET /api/state") +try: + start = time.time() + response = requests.get(f"{BASE_URL}/api/state", timeout=15) + elapsed = time.time() - start + + print(f" 응답 시간: {elapsed:.2f}s") + print(f" 상태 코드: {response.status_code}") + + if response.status_code == 200: + result = response.json() + print(f" [OK] 키: {list(result.keys())[:5]}") + else: + print(f" [FAIL] {response.text[:100]}") + +except Exception as e: + print(f" [ERROR] {e}") + +# 2. /api/export 테스트 +print("\n[2] GET /api/export") +try: + start = time.time() + response = requests.get(f"{BASE_URL}/api/export", timeout=15) + elapsed = time.time() - start + + print(f" 응답 시간: {elapsed:.2f}s") + print(f" 상태 코드: {response.status_code}") + print(f" 응답 크기: {len(response.text)} bytes") + + if response.status_code == 200: + try: + result = response.json() + print(f" [OK] 키: {list(result.keys())[:3]}") + except: + print(f" [OK] (JSON 파싱 불가, 바이너리일 수 있음)") + else: + print(f" [FAIL] {response.text[:100]}") + +except Exception as e: + print(f" [ERROR] {e}") + +# 3. /api/tables 테스트 +print("\n[3] GET /api/tables") +try: + start = time.time() + response = requests.get(f"{BASE_URL}/api/tables", timeout=15) + elapsed = time.time() - start + + print(f" 응답 시간: {elapsed:.2f}s") + print(f" 상태 코드: {response.status_code}") + + if response.status_code == 200: + result = response.json() + print(f" [OK] {len(result)} 테이블") + for table in result[:3]: + print(f" - {table['table']}: {table['row_count']} rows") + else: + print(f" [FAIL] {response.text[:100]}") + +except Exception as e: + print(f" [ERROR] {e}") + +print("\n[완료]") diff --git a/tools/verify_data_load.py b/tools/verify_data_load.py new file mode 100644 index 0000000..f79bc50 --- /dev/null +++ b/tools/verify_data_load.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +데이터베이스 로드 상태 검증 +""" + +import sqlite3 +from pathlib import Path + +def verify_databases(): + """두 데이터베이스의 상태 확인""" + + kis_db = Path('src/quant_engine/kis_data_collection.db') + snapshot_db = Path('src/quant_engine/snapshot_admin.db') + + print("="*80) + print("데이터베이스 로드 상태 검증") + print("="*80) + + for db_name, db_path in [("kis_data_collection", kis_db), ("snapshot_admin", snapshot_db)]: + print(f"\n[{db_name}]") + + if not db_path.exists(): + print(f" 파일 없음") + continue + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + # 테이블 목록 + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name") + tables = [row[0] for row in cursor.fetchall()] + print(f" 테이블 수: {len(tables)}") + print(f" 테이블: {', '.join(tables[:5])}..." if len(tables) > 5 else f" 테이블: {', '.join(tables)}") + + # 각 테이블의 행 수 + print(f"\n 테이블별 행 수:") + total_rows = 0 + for table in sorted(tables): + try: + cursor.execute(f"SELECT COUNT(*) FROM {table}") + count = cursor.fetchone()[0] + if count > 0: + print(f" {table}: {count:,}") + total_rows += count + except: + pass + + print(f" 총 행 수: {total_rows:,}") + + conn.close() + + print("\n[완료]") + +if __name__ == "__main__": + verify_databases()