Files
QuantEngineByItz/tools/initialize_databases.py
kjh2064 f5c29f7ddf 데이터베이스 구조 재설계: 단일 DB -> 2개 DB 분리
- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블)
- snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블)

도구 경로 업데이트:
- auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용
- measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용
- validate_data_collection_v1.py: snapshot_admin.db 사용
- monitor_wbs_progress_v1.py: snapshot_admin.db 사용
- backup_recovery_manager_v1.py: 2개 DB 모두 백업

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 00:19:49 +09:00

143 lines
4.5 KiB
Python

#!/usr/bin/env python3
"""
Database initialization tool (two databases).

1. kis_data_collection.db - KIS API data collection
2. snapshot_admin.db - performance/position management
"""
import sqlite3
from pathlib import Path
from datetime import datetime

# SQLite file locations used by every function in this module. Both are
# relative paths, so they resolve against the current working directory of the
# process that runs this tool.
# NOTE(review): presumably the tool is meant to be launched from the
# repository root so that "src/quant_engine/" resolves correctly - confirm.
DB1_PATH = "src/quant_engine/kis_data_collection.db"
DB2_PATH = "src/quant_engine/snapshot_admin.db"
def create_kis_db(db_path=None):
    """Create the ``data_feed`` schema in the KIS data-collection database.

    The table and its two indexes are created with ``IF NOT EXISTS``, so the
    function can be run repeatedly against the same file without error.

    Args:
        db_path: Optional path of the SQLite file to initialize. When omitted
            the module-level ``DB1_PATH`` is used, so existing zero-argument
            callers behave as before.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
        OSError: If the parent directory cannot be created.
    """
    if db_path is None:
        db_path = DB1_PATH

    # sqlite3.connect() creates the file but not its directory; without this a
    # missing folder fails with "unable to open database file".
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS data_feed (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticker TEXT NOT NULL,
                name TEXT,
                close_price REAL,
                entry_price REAL,
                quantity INTEGER,
                stop_price REAL,
                target_price REAL,
                entry_stage TEXT,
                account TEXT,
                entry_date TEXT,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                velocity_1d REAL,
                velocity_5d REAL,
                ma20 REAL,
                atr20 REAL,
                rsi_14 REAL,
                volume INTEGER,
                avg_trade_value_5d REAL,
                sector TEXT,
                beta REAL,
                UNIQUE(ticker, entry_date)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)")
        conn.commit()
    finally:
        # Release the file handle even when a statement above raises.
        conn.close()

    print("[OK] kis_data_collection.db: data_feed 테이블 생성")
def create_snapshot_admin_db(db_path=None):
    """Create the ``performance`` and ``positions`` schemas in the admin database.

    Every table and index is created with ``IF NOT EXISTS``, so the function
    can be run repeatedly against the same file without error.

    Args:
        db_path: Optional path of the SQLite file to initialize. When omitted
            the module-level ``DB2_PATH`` is used, so existing zero-argument
            callers behave as before.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
        OSError: If the parent directory cannot be created.
    """
    if db_path is None:
        db_path = DB2_PATH

    # sqlite3.connect() creates the file but not its directory; without this a
    # missing folder fails with "unable to open database file".
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS performance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticker TEXT NOT NULL,
                name TEXT,
                entry_date TEXT NOT NULL,
                entry_price REAL NOT NULL,
                quantity INTEGER,
                stop_price REAL,
                target_price REAL,
                exit_date TEXT,
                current_price REAL,
                pnl_pct REAL,
                status TEXT,
                t20_milestone TEXT,
                entry_stage TEXT,
                account TEXT,
                recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(ticker, entry_date)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS positions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ticker TEXT NOT NULL UNIQUE,
                name TEXT,
                quantity INTEGER,
                entry_price REAL,
                current_price REAL,
                average_cost REAL,
                sector TEXT,
                weight_pct REAL,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)")
        # positions.ticker is declared UNIQUE, for which SQLite already keeps
        # an automatic index; this explicit one is retained so the resulting
        # schema stays the same as before.
        conn.execute("CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)")
        conn.commit()
    finally:
        # Release the file handle even when a statement above raises.
        conn.close()

    print("[OK] snapshot_admin.db: performance, positions 테이블 생성")
def verify(databases=None):
    """Print a structural summary of each database.

    For every database this prints the file size in KB and the list of table
    names, then one line per table with its column count and row count. The
    ``sqlite_sequence`` table is listed by name but not counted.

    Args:
        databases: Optional iterable of ``(path, display_name)`` pairs to
            inspect. When omitted, the two databases at ``DB1_PATH`` and
            ``DB2_PATH`` are inspected, as before.

    Raises:
        sqlite3.Error: If a database cannot be opened or queried.
        OSError: If the size of a database file cannot be read.
    """
    if databases is None:
        databases = [
            (DB1_PATH, "kis_data_collection.db"),
            (DB2_PATH, "snapshot_admin.db"),
        ]

    print("\n" + "="*80)
    print("데이터베이스 구조 확인")
    print("="*80)

    for db_path, db_name in databases:
        conn = sqlite3.connect(db_path)
        try:
            tables = [
                row[0]
                for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
            ]
            file_size = Path(db_path).stat().st_size / 1024

            print(f"\n[{db_name}]")
            print(f" 크기: {file_size:.2f} KB")
            print(f" 테이블: {', '.join(tables)}")

            for table in tables:
                if table == 'sqlite_sequence':
                    continue
                # Identifiers cannot be bound as SQL parameters, so quote the
                # name instead; this keeps names containing spaces, keywords
                # or quotes from breaking the statements below.
                quoted = '"' + table.replace('"', '""') + '"'
                row_count = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
                col_count = len(conn.execute(f"PRAGMA table_info({quoted})").fetchall())
                print(f" - {table}: {col_count}개 컬럼, {row_count}개 행")
        finally:
            # Release the file handle even when a query above raises.
            conn.close()
def main():
    """Initialize both databases, then print a summary of their structure."""
    print("2개 데이터베이스 초기화 중...\n")
    create_kis_db()
    create_snapshot_admin_db()
    verify()
    print("\n[완료] 2개 DB 초기화 완료!")
    # Plain literals: these lines have no placeholders, so no f-prefix needed.
    print(" 1. kis_data_collection.db (KIS API)")
    print(" 2. snapshot_admin.db (성능/포지션)")


if __name__ == "__main__":
    main()