데이터베이스 구조 재설계: 단일 DB -> 2개 DB 분리

- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블)
- snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블)

도구 경로 업데이트:
- auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용
- measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용
- validate_data_collection_v1.py: snapshot_admin.db 사용
- monitor_wbs_progress_v1.py: snapshot_admin.db 사용
- backup_recovery_manager_v1.py: 2개 DB 모두 백업

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-06-23 00:19:49 +09:00
parent 1dddffca5c
commit f5c29f7ddf
11 changed files with 451 additions and 17 deletions
View File
Binary file not shown.
Binary file not shown.
+1 -1
View File
@@ -17,7 +17,7 @@ class T20LedgerCollector:
"""T+20 거래 성과 자동 수집"""
def __init__(self, db_path: str = None):
self.db_path = db_path or "src/quant_engine/data_feed.db"
self.db_path = db_path or "src/quant_engine/kis_data_collection.db"
self.results = {
"timestamp": datetime.now().isoformat(),
"collections": [],
+10 -6
View File
@@ -45,7 +45,8 @@ class BackupRecoveryManager:
try:
# 필요한 파일 목록
files_to_backup = [
self.data_dir / "data_feed.db",
self.data_dir / "kis_data_collection.db",
self.data_dir / "snapshot_admin.db",
self.data_dir / "calibration_registry.yaml",
Path("spec") / "12_field_dictionary.yaml",
Path("spec") / "13_formula_registry.yaml",
@@ -294,18 +295,21 @@ class BackupRecoveryManager:
actual_files = len(list(backup_path.glob("*"))) - 1 # metadata 제외
expected_files = metadata.get("files_backed_up", actual_files)
# DB 무결성 검증
db_file = backup_path / "data_feed.db"
db_integrity = "OK"
# DB 무결성 검증 (2개 DB)
db_integrity = {}
for db_name in ["kis_data_collection.db", "snapshot_admin.db"]:
db_file = backup_path / db_name
if db_file.exists():
try:
conn = sqlite3.connect(db_file)
cursor = conn.execute("PRAGMA integrity_check")
result = cursor.fetchone()
db_integrity = result[0] if result else "UNKNOWN"
db_integrity[db_name] = result[0] if result else "UNKNOWN"
conn.close()
except Exception:
db_integrity = "FAILED"
db_integrity[db_name] = "FAILED"
else:
db_integrity[db_name] = "NOT_FOUND"
return {
"backup_name": backup_name,
+134
View File
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""
데이터베이스 초기화 도구
data_feed, performance, positions 테이블 생성
"""
import sqlite3
from pathlib import Path
from datetime import datetime
DB_PATH = "src/quant_engine/data_feed.db"
def create_tables():
"""필수 테이블 생성"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# data_feed 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS data_feed (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
close_price REAL,
entry_price REAL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
entry_stage TEXT,
account TEXT,
entry_date TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
velocity_1d REAL,
velocity_5d REAL,
ma20 REAL,
atr20 REAL,
rsi_14 REAL,
volume INTEGER,
avg_trade_value_5d REAL,
sector TEXT,
beta REAL,
UNIQUE(ticker, entry_date)
)
""")
# performance 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
entry_date TEXT NOT NULL,
entry_price REAL NOT NULL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
exit_date TEXT,
current_price REAL,
pnl_pct REAL,
status TEXT,
t20_milestone TEXT,
entry_stage TEXT,
account TEXT,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(ticker, entry_date)
)
""")
# positions 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL UNIQUE,
name TEXT,
quantity INTEGER,
entry_price REAL,
current_price REAL,
average_cost REAL,
sector TEXT,
weight_pct REAL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 인덱스 생성
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)")
conn.commit()
conn.close()
print(f"[OK] 데이터베이스 초기화 완료: {DB_PATH}")
print(f" 생성된 테이블: data_feed, performance, positions")
def verify_database():
"""데이터베이스 검증"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 테이블 확인
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
print(f"\n[검증] 테이블 목록: {', '.join(tables)}")
print(f"[검증] 파일 크기: {Path(DB_PATH).stat().st_size / 1024:.2f} KB")
# 각 테이블 구조 확인
for table in tables:
cursor.execute(f"PRAGMA table_info({table})")
columns = cursor.fetchall()
print(f"\n{table} 컬럼:")
for col in columns:
print(f" - {col[1]} ({col[2]})")
conn.close()
if __name__ == "__main__":
# 기존 DB 백업
db_file = Path(DB_PATH)
if db_file.exists():
backup_path = f"{DB_PATH}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
db_file.rename(backup_path)
print(f"[OK] 기존 데이터베이스 백업: {backup_path}")
# 새 DB 생성
create_tables()
# 검증
verify_database()
print("\n[완료] 데이터베이스 초기화 완료!")
+154
View File
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
데이터베이스 초기화 도구 (2개 DB 분리)
- test_kis_data_collection.db: data_feed 테이블 (KIS API 데이터)
- snapshot_admin_livecheck.db: performance, positions 테이블 (라이브 체크)
"""
import sqlite3
from pathlib import Path
from datetime import datetime
DB1_PATH = "src/quant_engine/test_kis_data_collection.db"
DB2_PATH = "src/quant_engine/snapshot_admin_livecheck.db"
def create_db1_schema():
"""DB1: KIS 데이터 수집 스키마"""
conn = sqlite3.connect(DB1_PATH)
cursor = conn.cursor()
# data_feed 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS data_feed (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
close_price REAL,
entry_price REAL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
entry_stage TEXT,
account TEXT,
entry_date TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
velocity_1d REAL,
velocity_5d REAL,
ma20 REAL,
atr20 REAL,
rsi_14 REAL,
volume INTEGER,
avg_trade_value_5d REAL,
sector TEXT,
beta REAL,
UNIQUE(ticker, entry_date)
)
""")
# 인덱스
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)")
conn.commit()
conn.close()
print(f"[OK] DB1 생성: {DB1_PATH}")
print(f" 테이블: data_feed (KIS API 데이터 수집용)")
def create_db2_schema():
"""DB2: snapshot_admin 라이브 체크 스키마"""
conn = sqlite3.connect(DB2_PATH)
cursor = conn.cursor()
# performance 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
entry_date TEXT NOT NULL,
entry_price REAL NOT NULL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
exit_date TEXT,
current_price REAL,
pnl_pct REAL,
status TEXT,
t20_milestone TEXT,
entry_stage TEXT,
account TEXT,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(ticker, entry_date)
)
""")
# positions 테이블
cursor.execute("""
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL UNIQUE,
name TEXT,
quantity INTEGER,
entry_price REAL,
current_price REAL,
average_cost REAL,
sector TEXT,
weight_pct REAL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 인덱스
cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)")
conn.commit()
conn.close()
print(f"[OK] DB2 생성: {DB2_PATH}")
print(f" 테이블: performance, positions (snapshot_admin 라이브용)")
def verify_databases():
"""DB 검증"""
print("\n" + "="*80)
print("데이터베이스 검증")
print("="*80)
for db_path in [DB1_PATH, DB2_PATH]:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
file_size = Path(db_path).stat().st_size / 1024
print(f"\n[{Path(db_path).name}]")
print(f" 크기: {file_size:.2f} KB")
print(f" 테이블: {', '.join(tables)}")
for table in tables:
cursor.execute(f"PRAGMA table_info({table})")
col_count = len(cursor.fetchall())
print(f" - {table}: {col_count}개 컬럼")
conn.close()
if __name__ == "__main__":
# 기존 DB 백업
for db_path in [DB1_PATH, DB2_PATH]:
db_file = Path(db_path)
if db_file.exists():
backup_path = f"{db_path}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
db_file.rename(backup_path)
print(f"[OK] 백업: {backup_path}")
print("\n새 데이터베이스 생성 중...\n")
# 새 DB 생성
create_db1_schema()
create_db2_schema()
# 검증
verify_databases()
print("\n[완료] 2개 DB 초기화 완료!")
+142
View File
@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
데이터베이스 초기화 도구 (2개 DB)
1. kis_data_collection.db - KIS API 데이터 수집
2. snapshot_admin.db - 성능/포지션 관리
"""
import sqlite3
from pathlib import Path
from datetime import datetime
DB1_PATH = "src/quant_engine/kis_data_collection.db"
DB2_PATH = "src/quant_engine/snapshot_admin.db"
def create_kis_db():
"""kis_data_collection.db: KIS API 데이터 스키마"""
conn = sqlite3.connect(DB1_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS data_feed (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
close_price REAL,
entry_price REAL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
entry_stage TEXT,
account TEXT,
entry_date TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
velocity_1d REAL,
velocity_5d REAL,
ma20 REAL,
atr20 REAL,
rsi_14 REAL,
volume INTEGER,
avg_trade_value_5d REAL,
sector TEXT,
beta REAL,
UNIQUE(ticker, entry_date)
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_ticker ON data_feed(ticker)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_feed_entry_date ON data_feed(entry_date)")
conn.commit()
conn.close()
print(f"[OK] kis_data_collection.db: data_feed 테이블 생성")
def create_snapshot_admin_db():
"""snapshot_admin.db: 성능/포지션 스키마"""
conn = sqlite3.connect(DB2_PATH)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL,
name TEXT,
entry_date TEXT NOT NULL,
entry_price REAL NOT NULL,
quantity INTEGER,
stop_price REAL,
target_price REAL,
exit_date TEXT,
current_price REAL,
pnl_pct REAL,
status TEXT,
t20_milestone TEXT,
entry_stage TEXT,
account TEXT,
recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(ticker, entry_date)
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS positions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ticker TEXT NOT NULL UNIQUE,
name TEXT,
quantity INTEGER,
entry_price REAL,
current_price REAL,
average_cost REAL,
sector TEXT,
weight_pct REAL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_ticker ON performance(ticker)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_performance_entry_date ON performance(entry_date)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_positions_ticker ON positions(ticker)")
conn.commit()
conn.close()
print(f"[OK] snapshot_admin.db: performance, positions 테이블 생성")
def verify():
"""검증"""
print("\n" + "="*80)
print("데이터베이스 구조 확인")
print("="*80)
for db_path, db_name in [(DB1_PATH, "kis_data_collection.db"), (DB2_PATH, "snapshot_admin.db")]:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [row[0] for row in cursor.fetchall()]
file_size = Path(db_path).stat().st_size / 1024
print(f"\n[{db_name}]")
print(f" 크기: {file_size:.2f} KB")
print(f" 테이블: {', '.join(tables)}")
for table in tables:
if table != 'sqlite_sequence':
cursor.execute(f"SELECT COUNT(*) FROM {table}")
row_count = cursor.fetchone()[0]
cursor.execute(f"PRAGMA table_info({table})")
col_count = len(cursor.fetchall())
print(f" - {table}: {col_count}개 컬럼, {row_count}개 행")
conn.close()
if __name__ == "__main__":
print("2개 데이터베이스 초기화 중...\n")
create_kis_db()
create_snapshot_admin_db()
verify()
print("\n[완료] 2개 DB 초기화 완료!")
print(f" 1. kis_data_collection.db (KIS API)")
print(f" 2. snapshot_admin.db (성능/포지션)")
+1 -1
View File
@@ -16,7 +16,7 @@ class SectorFlowReliabilityMeasure:
"""섹터 플로우 신뢰도 측정 도구"""
def __init__(self, db_path: str = None):
self.db_path = db_path or "src/quant_engine/data_feed.db"
self.db_path = db_path or "src/quant_engine/kis_data_collection.db"
self.results = {
"timestamp": datetime.now().isoformat(),
"sectors": {},
+1 -1
View File
@@ -17,7 +17,7 @@ class WBSProgressMonitor:
"""WBS-8 & WBS-9 진행률 모니터링"""
def __init__(self, db_path: str = None):
self.db_path = db_path or "src/quant_engine/data_feed.db"
self.db_path = db_path or "src/quant_engine/snapshot_admin.db"
self.results = {
"timestamp": datetime.now().isoformat(),
"wbs_8": {},
+1 -1
View File
@@ -16,7 +16,7 @@ class DataCollectionValidator:
"""데이터 수집 기능 및 테이블 검증"""
def __init__(self, db_path: str = None):
self.db_path = db_path or "src/quant_engine/data_feed.db"
self.db_path = db_path or "src/quant_engine/snapshot_admin.db"
self.results = {
"timestamp": datetime.now().isoformat(),
"database_path": self.db_path,