Files
QuantEngineByItz/tools/initialize_database_v2.py
T
kjh2064 f5c29f7ddf 데이터베이스 구조 재설계: 단일 DB -> 2개 DB 분리
- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블)
- snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블)

도구 경로 업데이트:
- auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용
- measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용
- validate_data_collection_v1.py: snapshot_admin.db 사용
- monitor_wbs_progress_v1.py: snapshot_admin.db 사용
- backup_recovery_manager_v1.py: 2개 DB 모두 백업

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 00:19:49 +09:00

155 lines
4.7 KiB
Python

#!/usr/bin/env python3
"""
Database initialization tool (two separate databases).
- test_kis_data_collection.db: data_feed table (KIS API data)
- snapshot_admin_livecheck.db: performance, positions tables (live check)
"""
import sqlite3
from pathlib import Path
from datetime import datetime
# Locations of the two SQLite files. Both are relative paths, so they are
# resolved against the current working directory of the process.
# DB1 holds the data_feed table.
DB1_PATH = "src/quant_engine/test_kis_data_collection.db"
# DB2 holds the performance and positions tables.
DB2_PATH = "src/quant_engine/snapshot_admin_livecheck.db"
# DDL for the data_feed table. The statement text is kept verbatim because
# SQLite stores it as-is in sqlite_master.
_DATA_FEED_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS data_feed (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
close_price REAL,
entry_price REAL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
entry_stage TEXT,
account TEXT,
entry_date TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
velocity_1d REAL,
velocity_5d REAL,
ma20 REAL,
atr20 REAL,
rsi_14 REAL,
volume INTEGER,
avg_trade_value_5d REAL,
sector TEXT,
beta REAL,
UNIQUE(ticker, entry_date)
)
"""

# Secondary indexes created on data_feed.
_DATA_FEED_INDEX_SQL = (
    "CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)",
    "CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)",
)


def create_db1_schema(db_path=None):
    """Create the DB1 (KIS data-collection) schema.

    Opens the SQLite database, creates the ``data_feed`` table and its
    ``ticker`` / ``entry_date`` indexes when they do not exist yet, commits,
    and prints a short confirmation. Every statement uses ``IF NOT EXISTS``,
    so calling this on an already-initialized file is a no-op.

    Args:
        db_path: Database file to initialize. When omitted or ``None`` the
            module-level ``DB1_PATH`` is used.

    Raises:
        sqlite3.Error: Propagated unchanged if the database cannot be opened
            or a statement fails.
    """
    target = DB1_PATH if db_path is None else db_path
    conn = sqlite3.connect(target)
    try:
        cursor = conn.cursor()
        # data_feed table
        cursor.execute(_DATA_FEED_TABLE_SQL)
        # indexes
        for statement in _DATA_FEED_INDEX_SQL:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Close on the failure path too, so an error in any statement does
        # not leave the connection (and its file handle) open.
        conn.close()
    print(f"[OK] DB1 생성: {target}")
    print(" 테이블: data_feed (KIS API 데이터 수집용)")
# DDL for the performance table. The statement text is kept verbatim because
# SQLite stores it as-is in sqlite_master.
_PERFORMANCE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
entry_date TEXT NOT NULL,
entry_price REAL NOT NULL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
exit_date TEXT,
current_price REAL,
pnl_pct REAL,
status TEXT,
t20_milestone TEXT,
entry_stage TEXT,
account TEXT,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(ticker, entry_date)
)
"""

# DDL for the positions table (text kept verbatim, see above).
_POSITIONS_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL UNIQUE,
name TEXT,
quantity INTEGER,
entry_price REAL,
current_price REAL,
average_cost REAL,
sector TEXT,
weight_pct REAL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""

# Secondary indexes. NOTE(review): positions.ticker is already UNIQUE, so
# idx_positions_ticker duplicates the index SQLite creates for that
# constraint; it is kept so the resulting schema is unchanged.
_DB2_INDEX_SQL = (
    "CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)",
    "CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)",
    "CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)",
)


def create_db2_schema(db_path=None):
    """Create the DB2 (snapshot_admin live-check) schema.

    Opens the SQLite database, creates the ``performance`` and ``positions``
    tables plus their indexes when they do not exist yet, commits, and prints
    a short confirmation. Every statement uses ``IF NOT EXISTS``, so calling
    this on an already-initialized file is a no-op.

    Args:
        db_path: Database file to initialize. When omitted or ``None`` the
            module-level ``DB2_PATH`` is used.

    Raises:
        sqlite3.Error: Propagated unchanged if the database cannot be opened
            or a statement fails.
    """
    target = DB2_PATH if db_path is None else db_path
    conn = sqlite3.connect(target)
    try:
        cursor = conn.cursor()
        # performance table
        cursor.execute(_PERFORMANCE_TABLE_SQL)
        # positions table
        cursor.execute(_POSITIONS_TABLE_SQL)
        # indexes
        for statement in _DB2_INDEX_SQL:
            cursor.execute(statement)
        conn.commit()
    finally:
        # Close on the failure path too, so an error in any statement does
        # not leave the connection (and its file handle) open.
        conn.close()
    print(f"[OK] DB2 생성: {target}")
    print(" 테이블: performance, positions (snapshot_admin 라이브용)")
def verify_databases(db_paths=None):
    """Print a summary of each database: file size, tables, column counts.

    For every path the function lists the tables recorded in
    ``sqlite_master``, prints the file size in KB, and prints the number of
    columns reported by ``PRAGMA table_info`` for each table.

    Args:
        db_paths: Iterable of database file paths to inspect. When omitted or
            ``None`` the module-level ``DB1_PATH`` and ``DB2_PATH`` are used.

    Raises:
        sqlite3.Error: Propagated unchanged if a database cannot be opened or
            queried.
        OSError: Propagated unchanged if the file size cannot be read.
    """
    if db_paths is None:
        db_paths = [DB1_PATH, DB2_PATH]
    print("\n" + "="*80)
    print("데이터베이스 검증")
    print("="*80)
    for db_path in db_paths:
        # NOTE(review): sqlite3.connect creates the file when it is missing,
        # so an absent database is reported as an empty one rather than as an
        # error -- confirm this is the intended behavior for a verifier.
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            file_size = Path(db_path).stat().st_size / 1024
            print(f"\n[{Path(db_path).name}]")
            print(f" 크기: {file_size:.2f} KB")
            print(f" 테이블: {', '.join(tables)}")
            for table in tables:
                # PRAGMA arguments cannot be bound as parameters, so quote the
                # name as an identifier (doubling embedded quotes) to handle
                # tables whose names contain spaces or other special characters.
                quoted = '"' + table.replace('"', '""') + '"'
                cursor.execute(f"PRAGMA table_info({quoted})")
                col_count = len(cursor.fetchall())
                print(f" - {table}: {col_count}개 컬럼")
        finally:
            # Release the connection even if a query or stat() call fails.
            conn.close()
if __name__ == "__main__":
    # Move any existing database aside under a timestamped backup name so the
    # schema is created in a fresh file.
    for path_str in (DB1_PATH, DB2_PATH):
        existing = Path(path_str)
        if not existing.exists():
            continue
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = f"{path_str}.backup.{stamp}"
        existing.rename(backup_path)
        print(f"[OK] 백업: {backup_path}")

    print("\n새 데이터베이스 생성 중...\n")

    # Build both schemas, then print a summary of what was created.
    create_db1_schema()
    create_db2_schema()
    verify_databases()

    print("\n[완료] 2개 DB 초기화 완료!")