#!/usr/bin/env python3
"""
Database initialisation tool (two separate SQLite databases).

- test_kis_data_collection.db: data_feed table (KIS API data)
- snapshot_admin_livecheck.db: performance and positions tables (live check)
"""

import sqlite3
from contextlib import closing
from datetime import datetime
from pathlib import Path
from typing import Iterable, Optional

DB1_PATH = "src/quant_engine/test_kis_data_collection.db"
DB2_PATH = "src/quant_engine/snapshot_admin_livecheck.db"

# DDL for DB1: the data_feed table followed by its lookup indexes.
_DB1_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS data_feed (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ticker TEXT NOT NULL,
        name TEXT,
        close_price REAL,
        entry_price REAL,
        quantity INTEGER,
        stop_price REAL,
        target_price REAL,
        entry_stage TEXT,
        account TEXT,
        entry_date TEXT,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        velocity_1d REAL,
        velocity_5d REAL,
        ma20 REAL,
        atr20 REAL,
        rsi_14 REAL,
        volume INTEGER,
        avg_trade_value_5d REAL,
        sector TEXT,
        beta REAL,
        UNIQUE(ticker, entry_date)
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)",
    "CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)",
)

# DDL for DB2: the performance and positions tables followed by their indexes.
_DB2_STATEMENTS = (
    """
    CREATE TABLE IF NOT EXISTS performance (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ticker TEXT NOT NULL,
        name TEXT,
        entry_date TEXT NOT NULL,
        entry_price REAL NOT NULL,
        quantity INTEGER,
        stop_price REAL,
        target_price REAL,
        exit_date TEXT,
        current_price REAL,
        pnl_pct REAL,
        status TEXT,
        t20_milestone TEXT,
        entry_stage TEXT,
        account TEXT,
        recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE(ticker, entry_date)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS positions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        ticker TEXT NOT NULL UNIQUE,
        name TEXT,
        quantity INTEGER,
        entry_price REAL,
        current_price REAL,
        average_cost REAL,
        sector TEXT,
        weight_pct REAL,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    """,
    "CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)",
    "CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)",
    "CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)",
)


def _apply_schema(db_path: str, statements: Iterable[str]) -> None:
    """Execute each DDL statement against *db_path*, commit, and close."""
    # sqlite3's own context manager only scopes a transaction; closing()
    # guarantees the connection is released even when a statement raises.
    with closing(sqlite3.connect(db_path)) as conn:
        for statement in statements:
            conn.execute(statement)
        conn.commit()


def create_db1_schema(db_path: Optional[str] = None) -> None:
    """Create the DB1 schema (KIS data collection).

    Creates the ``data_feed`` table and its two indexes if they do not exist
    yet, then prints a short confirmation.

    Args:
        db_path: Database file to initialise. Defaults to ``DB1_PATH``,
            resolved at call time.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    target = DB1_PATH if db_path is None else db_path
    _apply_schema(target, _DB1_STATEMENTS)
    print(f"[OK] DB1 생성: {target}")
    print(" 테이블: data_feed (KIS API 데이터 수집용)")


def create_db2_schema(db_path: Optional[str] = None) -> None:
    """Create the DB2 schema (snapshot_admin live check).

    Creates the ``performance`` and ``positions`` tables and their indexes if
    they do not exist yet, then prints a short confirmation.

    Args:
        db_path: Database file to initialise. Defaults to ``DB2_PATH``,
            resolved at call time.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    target = DB2_PATH if db_path is None else db_path
    _apply_schema(target, _DB2_STATEMENTS)
    print(f"[OK] DB2 생성: {target}")
    print(" 테이블: performance, positions (snapshot_admin 라이브용)")


def verify_databases(db_paths: Optional[Iterable[str]] = None) -> None:
    """Print file size, table names and per-table column counts for each DB.

    Args:
        db_paths: Database files to inspect. Defaults to
            ``[DB1_PATH, DB2_PATH]``, resolved at call time.

    Raises:
        sqlite3.Error: If an existing database file cannot be read.
    """
    paths = [DB1_PATH, DB2_PATH] if db_paths is None else list(db_paths)

    print("\n" + "=" * 80)
    print("데이터베이스 검증")
    print("=" * 80)

    for db_path in paths:
        db_file = Path(db_path)

        # sqlite3.connect() would silently create an empty file for a missing
        # path, so verification must never open a path that does not exist.
        if not db_file.exists():
            print(f"\n[{db_file.name}]")
            print(f" [경고] DB 파일 없음: {db_path}")
            continue

        with closing(sqlite3.connect(db_path)) as conn:
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
            tables = [row[0] for row in rows]

            file_size = db_file.stat().st_size / 1024
            print(f"\n[{db_file.name}]")
            print(f" 크기: {file_size:.2f} KB")
            print(f" 테이블: {', '.join(tables)}")

            for table in tables:
                # PRAGMA arguments cannot be bound as parameters; quote the
                # identifier so unusual table names cannot break the statement.
                quoted = table.replace('"', '""')
                columns = conn.execute(
                    f'PRAGMA table_info("{quoted}")'
                ).fetchall()
                col_count = len(columns)
                print(f" - {table}: {col_count}개 컬럼")


def _backup_existing(db_paths: Iterable[str]) -> None:
    """Rename each existing database file to a timestamped backup name."""
    for db_path in db_paths:
        db_file = Path(db_path)
        if db_file.exists():
            backup_path = f"{db_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            db_file.rename(backup_path)
            print(f"[OK] 백업: {backup_path}")


def main() -> None:
    """Back up any existing databases, recreate both schemas, and verify."""
    # Back up existing databases.
    _backup_existing([DB1_PATH, DB2_PATH])

    print("\n새 데이터베이스 생성 중...\n")

    # Create the new databases.
    create_db1_schema()
    create_db2_schema()

    # Verify.
    verify_databases()

    print("\n[완료] 2개 DB 초기화 완료!")


if __name__ == "__main__":
    main()