f5c29f7ddf
- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블) - snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블) 도구 경로 업데이트: - auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용 - measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용 - validate_data_collection_v1.py: snapshot_admin.db 사용 - monitor_wbs_progress_v1.py: snapshot_admin.db 사용 - backup_recovery_manager_v1.py: 2개 DB 모두 백업 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
155 lines
4.7 KiB
Python
155 lines
4.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
데이터베이스 초기화 도구 (2개 DB 분리)
|
|
- test_kis_data_collection.db: data_feed 테이블 (KIS API 데이터)
|
|
- snapshot_admin_livecheck.db: performance, positions 테이블 (라이브 체크)
|
|
"""
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# SQLite file that create_db1_schema() initialises with the data_feed table.
# The path is relative, so it resolves against the current working directory.
DB1_PATH = "src/quant_engine/test_kis_data_collection.db"
|
|
# SQLite file that create_db2_schema() initialises with the performance and
# positions tables. Relative path: resolves against the current working directory.
DB2_PATH = "src/quant_engine/snapshot_admin_livecheck.db"
|
|
|
|
def create_db1_schema(db_path=None):
    """Create the KIS data-collection schema (``data_feed`` table and indexes).

    The statements use ``IF NOT EXISTS``, so calling this on an already
    initialised database is a no-op.

    Args:
        db_path: SQLite file to initialise. When omitted, the module-level
            ``DB1_PATH`` is used, so existing no-argument callers are
            unaffected.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    path = DB1_PATH if db_path is None else db_path
    conn = sqlite3.connect(path)
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection -- hence the surrounding try/finally.
        with conn:
            # data_feed table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS data_feed (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL,
                    name TEXT,
                    close_price REAL,
                    entry_price REAL,
                    quantity INTEGER,
                    stop_price REAL,
                    target_price REAL,
                    entry_stage TEXT,
                    account TEXT,
                    entry_date TEXT,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    velocity_1d REAL,
                    velocity_5d REAL,
                    ma20 REAL,
                    atr20 REAL,
                    rsi_14 REAL,
                    volume INTEGER,
                    avg_trade_value_5d REAL,
                    sector TEXT,
                    beta REAL,
                    UNIQUE(ticker, entry_date)
                )
            """)

            # Indexes
            # NOTE(review): UNIQUE(ticker, entry_date) already gives SQLite an
            # implicit index led by ticker, so idx_data_feed_ticker looks
            # redundant; kept so the schema matches existing databases.
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)"
            )
    finally:
        conn.close()

    print(f"[OK] DB1 생성: {path}")
    print(" 테이블: data_feed (KIS API 데이터 수집용)")
|
|
|
|
def create_db2_schema(db_path=None):
    """Create the snapshot_admin live-check schema.

    Creates the ``performance`` and ``positions`` tables plus their indexes.
    The statements use ``IF NOT EXISTS``, so calling this on an already
    initialised database is a no-op.

    Args:
        db_path: SQLite file to initialise. When omitted, the module-level
            ``DB2_PATH`` is used, so existing no-argument callers are
            unaffected.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    path = DB2_PATH if db_path is None else db_path
    conn = sqlite3.connect(path)
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection -- hence the surrounding try/finally.
        with conn:
            # performance table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS performance (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL,
                    name TEXT,
                    entry_date TEXT NOT NULL,
                    entry_price REAL NOT NULL,
                    quantity INTEGER,
                    stop_price REAL,
                    target_price REAL,
                    exit_date TEXT,
                    current_price REAL,
                    pnl_pct REAL,
                    status TEXT,
                    t20_milestone TEXT,
                    entry_stage TEXT,
                    account TEXT,
                    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    UNIQUE(ticker, entry_date)
                )
            """)

            # positions table
            conn.execute("""
                CREATE TABLE IF NOT EXISTS positions (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ticker TEXT NOT NULL UNIQUE,
                    name TEXT,
                    quantity INTEGER,
                    entry_price REAL,
                    current_price REAL,
                    average_cost REAL,
                    sector TEXT,
                    weight_pct REAL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Indexes
            # NOTE(review): positions.ticker is declared UNIQUE, which already
            # gives SQLite an implicit index, so idx_positions_ticker looks
            # redundant; kept so the schema matches existing databases.
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)"
            )
    finally:
        conn.close()

    print(f"[OK] DB2 생성: {path}")
    print(" 테이블: performance, positions (snapshot_admin 라이브용)")
|
|
|
|
def verify_databases(db_paths=None):
    """Print a summary of each database: file size, tables and column counts.

    Args:
        db_paths: Iterable of SQLite file paths to inspect. When omitted,
            ``DB1_PATH`` and ``DB2_PATH`` are inspected, so existing
            no-argument callers are unaffected.

    Raises:
        sqlite3.Error: If a database cannot be opened or queried.
        OSError: If a database file cannot be stat'ed.
    """
    paths = [DB1_PATH, DB2_PATH] if db_paths is None else list(db_paths)

    print("\n" + "="*80)
    print("데이터베이스 검증")
    print("="*80)

    for db_path in paths:
        # NOTE: sqlite3.connect() creates the file when it does not exist, so
        # a missing database is reported as an empty one rather than an error.
        conn = sqlite3.connect(db_path)
        try:
            tables = [
                row[0]
                for row in conn.execute(
                    "SELECT name FROM sqlite_master WHERE type='table'"
                )
            ]

            file_size = Path(db_path).stat().st_size / 1024
            print(f"\n[{Path(db_path).name}]")
            print(f" 크기: {file_size:.2f} KB")
            print(f" 테이블: {', '.join(tables)}")

            for table in tables:
                # PRAGMA arguments cannot be bound as parameters, so quote the
                # identifier (doubling embedded quotes); an unquoted name fails
                # for tables containing spaces or reserved words.
                quoted = '"' + table.replace('"', '""') + '"'
                col_count = len(
                    conn.execute(f"PRAGMA table_info({quoted})").fetchall()
                )
                print(f" - {table}: {col_count}개 컬럼")
        finally:
            # Release the handle even when a query or stat() call fails.
            conn.close()
|
|
|
|
if __name__ == "__main__":
    # Move any existing database aside under a timestamped name so the
    # schema below is always created in a fresh file.
    for db_path in (DB1_PATH, DB2_PATH):
        db_file = Path(db_path)
        if not db_file.exists():
            continue
        stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = f"{db_path}.backup.{stamp}"
        db_file.rename(backup_path)
        print(f"[OK] 백업: {backup_path}")

    print("\n새 데이터베이스 생성 중...\n")

    # Build both databases from scratch.
    create_db1_schema()
    create_db2_schema()

    # Report what was created.
    verify_databases()

    print("\n[완료] 2개 DB 초기화 완료!")
|