Files
QuantEngineByItz/tools/auto_collect_t20_ledger_v1.py
kjh2064 f5c29f7ddf 데이터베이스 구조 재설계: 단일 DB -> 2개 DB 분리
- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블)
- snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블)

도구 경로 업데이트:
- auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용
- measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용
- validate_data_collection_v1.py: snapshot_admin.db 사용
- monitor_wbs_progress_v1.py: snapshot_admin.db 사용
- backup_recovery_manager_v1.py: 2개 DB 모두 백업

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 00:19:49 +09:00

335 lines
11 KiB
Python

#!/usr/bin/env python3
"""
WBS-8.1: T+20 레저 30건 자동 수집 체계
목표: 거래 후 T+20일 경과시 자동으로 성과 데이터 수집 및 정리
"""
import json
import sqlite3
import sys
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
class T20LedgerCollector:
    """Collect T+20 performance snapshots and track ledger progress (WBS-8.1).

    A trade becomes eligible once ``T20_DAYS`` days have elapsed since its
    ``entry_date`` in the ``data_feed`` table. Eligible trades that have no
    matching row in the ``performance`` table are priced from ``data_feed``,
    written to ``performance`` and counted against ``TARGET_COUNT``.
    """

    # Days after entry at which a trade reaches its T+20 milestone.
    T20_DAYS = 20
    # Maximum number of pending trades fetched per collection run.
    BATCH_LIMIT = 10
    # Number of recorded T+20 trades the ledger is aiming for.
    TARGET_COUNT = 30

    def __init__(self, db_path: Optional[str] = None,
                 performance_db_path: Optional[str] = None):
        """Set up the collector and an empty result container.

        Args:
            db_path: SQLite file holding ``data_feed``. Falls back to
                ``src/quant_engine/kis_data_collection.db`` when omitted.
            performance_db_path: Optional separate SQLite file holding the
                ``performance`` table. When given it is attached to every
                connection; when omitted ``performance`` is looked up in
                ``db_path`` itself.
        """
        self.db_path = db_path or "src/quant_engine/kis_data_collection.db"
        self.performance_db_path = performance_db_path
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "collections": [],
            "summary": {}
        }

    def _connect(self) -> sqlite3.Connection:
        """Open a connection to ``db_path``, attaching the performance DB if set.

        Raises:
            sqlite3.Error: if either database cannot be opened.
        """
        conn = sqlite3.connect(self.db_path)
        # getattr keeps instances created without __init__ usable.
        perf_path = getattr(self, "performance_db_path", None)
        if perf_path:
            try:
                conn.execute("ATTACH DATABASE ? AS perf", (perf_path,))
            except sqlite3.Error:
                conn.close()
                raise
        return conn

    def _performance_table(self) -> str:
        """Return the (schema-qualified) name of the performance table."""
        if getattr(self, "performance_db_path", None):
            return "perf.performance"
        return "performance"

    def _t20_date(self, entry_date) -> str:
        """Return ``entry_date`` plus ``T20_DAYS`` days as ``YYYY-MM-DD``.

        Only the first ten characters are parsed so that values carrying a
        time component are accepted as well.

        Raises:
            ValueError: if the leading ten characters are not ``YYYY-MM-DD``.
        """
        entry_day = datetime.strptime(str(entry_date)[:10], "%Y-%m-%d")
        return (entry_day + timedelta(days=self.T20_DAYS)).strftime("%Y-%m-%d")

    def _query_pending_trades(self) -> List[Dict]:
        """Return trades past T+20 that have no performance row yet.

        At most ``BATCH_LIMIT`` rows are returned, oldest entry first. On a
        database error the error is printed and an empty list is returned.
        """
        # NOTE(review): julianday('now') is evaluated in UTC; confirm that
        # entry_date values are meant to be compared against UTC.
        # NOT EXISTS (rather than NOT IN) keeps the filter correct even when
        # the performance table contains a NULL ticker.
        query = f"""
            SELECT
                ticker,
                name,
                entry_date,
                entry_price,
                quantity,
                stop_price,
                target_price,
                entry_stage,
                account,
                CAST((julianday('now') - julianday(entry_date)) AS INTEGER) AS days_elapsed
            FROM data_feed
            WHERE ticker IS NOT NULL
              AND entry_date IS NOT NULL
              AND entry_price IS NOT NULL
              AND quantity IS NOT NULL
              AND CAST((julianday('now') - julianday(entry_date)) AS INTEGER) >= ?
              AND NOT EXISTS (
                  SELECT 1 FROM {self._performance_table()} AS p
                  WHERE p.ticker = data_feed.ticker
                    AND p.entry_date = data_feed.entry_date
              )
            ORDER BY entry_date ASC
            LIMIT ?
        """
        try:
            with closing(self._connect()) as conn:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(
                    query, (self.T20_DAYS, self.BATCH_LIMIT)
                ).fetchall()
        except Exception as e:
            # Best-effort: a failed lookup is reported as "nothing pending".
            print(f"Error querying trades: {e}")
            return []
        return [dict(row) for row in rows]

    def _get_current_price(self, ticker: str) -> Optional[float]:
        """Return the most recently updated ``close_price`` for ``ticker``.

        Returns ``None`` when no row exists or the database cannot be read.
        """
        query = """
            SELECT close_price FROM data_feed
            WHERE ticker = ?
            ORDER BY updated_at DESC
            LIMIT 1
        """
        try:
            with closing(self._connect()) as conn:
                row = conn.execute(query, (ticker,)).fetchone()
        except sqlite3.Error as e:
            print(f"Error querying current price for {ticker}: {e}")
            return None
        return row[0] if row else None

    def collect_t20_performance(self) -> Dict:
        """Build T+20 performance records for all pending trades.

        Trades without a current price, with a zero or non-numeric entry
        price, or with an unparseable entry date are skipped.

        Returns:
            A dict with ``status`` ``"NO_PENDING_TRADES"`` when nothing is
            pending, otherwise ``"SUCCESS"`` plus the collected ``trades``.
        """
        pending_trades = self._query_pending_trades()
        if not pending_trades:
            return {
                "status": "NO_PENDING_TRADES",
                "message": "T+20 경과 미기록 거래 없음",
                "collected_count": 0
            }

        collected = []
        for trade in pending_trades:
            ticker = trade["ticker"]
            current_price = self._get_current_price(ticker)
            if current_price is None:
                continue

            entry_price = trade["entry_price"]
            try:
                pnl_pct = ((current_price - entry_price) / entry_price) * 100
                t20_date = self._t20_date(trade["entry_date"])
            except (TypeError, ValueError, ZeroDivisionError) as e:
                # One malformed row must not abort the whole batch.
                print(f"Skipping {ticker}: {e}")
                continue

            collected.append({
                "ticker": ticker,
                "name": trade["name"],
                "entry_date": trade["entry_date"],
                "t20_date": t20_date,
                "days_elapsed": trade["days_elapsed"],
                "entry_price": entry_price,
                "current_price": current_price,
                "pnl_pct": round(pnl_pct, 2),
                "quantity": trade["quantity"],
                "stop_price": trade["stop_price"],
                "target_price": trade["target_price"],
                "entry_stage": trade["entry_stage"],
                "account": trade["account"],
                "status": "T20_MILESTONE_REACHED"
            })

        return {
            "status": "SUCCESS",
            "collected_count": len(collected),
            "trades": collected,
            "next_action": "Record in performance sheet"
        }

    def record_t20_in_performance(self, collections: List[Dict]) -> Dict:
        """Insert collected T+20 records into the performance table.

        Records whose ``(ticker, entry_date)`` pair already exists are
        skipped. All inserts run in one transaction: either every new record
        is committed or none is.

        Args:
            collections: records as produced by ``collect_t20_performance``.

        Returns:
            A dict with ``status`` ``"SUCCESS"`` and ``recorded_count``, or
            ``status`` ``"ERROR"`` with the error text.
        """
        table = self._performance_table()
        exists_sql = (
            f"SELECT COUNT(*) FROM {table} WHERE ticker = ? AND entry_date = ?"
        )
        insert_sql = f"""
            INSERT INTO {table} (
                ticker, name, entry_date, entry_price,
                quantity, stop_price, target_price,
                entry_stage, account, current_price,
                pnl_pct, status, t20_milestone
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        recorded = 0
        try:
            with closing(self._connect()) as conn:
                # "with conn" commits on success and rolls back on error.
                with conn:
                    for trade in collections:
                        already = conn.execute(
                            exists_sql, (trade["ticker"], trade["entry_date"])
                        ).fetchone()[0]
                        if already > 0:
                            continue
                        conn.execute(insert_sql, (
                            trade["ticker"],
                            trade["name"],
                            trade["entry_date"],
                            trade["entry_price"],
                            trade["quantity"],
                            trade["stop_price"],
                            trade["target_price"],
                            trade["entry_stage"],
                            trade["account"],
                            trade["current_price"],
                            trade["pnl_pct"],
                            "T20_RECORDED",
                            trade["t20_date"]
                        ))
                        recorded += 1
        except Exception as e:
            # Boundary handler: callers receive a status dict, not a raise.
            return {
                "status": "ERROR",
                "error": str(e),
                "recorded_count": 0
            }
        return {
            "status": "SUCCESS",
            "recorded_count": recorded,
            "message": f"{recorded}개 거래 T+20 성과 기록 완료"
        }

    def check_t20_ledger_status(self) -> Dict:
        """Report how far the ledger is from ``TARGET_COUNT`` recorded trades.

        Returns:
            A dict with ``status`` ``"MONITORING"`` and the counts, or
            ``status`` ``"ERROR"`` with the error text.
        """
        target = self.TARGET_COUNT
        eligible_sql = """
            SELECT COUNT(*) FROM data_feed
            WHERE ticker IS NOT NULL
              AND entry_date IS NOT NULL
              AND CAST((julianday('now') - julianday(entry_date)) AS INTEGER) >= ?
        """
        recorded_sql = (
            f"SELECT COUNT(*) FROM {self._performance_table()} "
            "WHERE t20_milestone IS NOT NULL"
        )
        try:
            with closing(self._connect()) as conn:
                t20_eligible = conn.execute(
                    eligible_sql, (self.T20_DAYS,)
                ).fetchone()[0]
                t20_recorded = conn.execute(recorded_sql).fetchone()[0]
        except Exception as e:
            return {
                "status": "ERROR",
                "error": str(e)
            }

        progress = (t20_recorded / target) * 100 if target > 0 else 0
        return {
            "status": "MONITORING",
            "target": target,
            "recorded": t20_recorded,
            "eligible": t20_eligible,
            "progress_pct": round(progress, 1),
            "days_to_target": self._estimate_days_to_target(t20_recorded, target)
        }

    def _estimate_days_to_target(self, current_count: int, target: int = 30) -> int:
        """Estimate the days left until ``target`` records are reached.

        Assumes a fixed collection rate of 0.5 records per day. Returns 0
        once the target is met and at least 1 otherwise.
        """
        if current_count >= target:
            return 0
        remaining = target - current_count
        avg_daily_rate = 0.5
        estimated_days = int(remaining / avg_daily_rate)
        return max(1, estimated_days)

    def generate_report(self) -> Dict:
        """Run collection, recording and status check; return ``self.results``."""
        collection_result = self.collect_t20_performance()

        # Only write to the database when something was actually collected.
        if collection_result.get("trades"):
            record_result = self.record_t20_in_performance(
                collection_result["trades"]
            )
            collection_result["recorded"] = record_result

        status = self.check_t20_ledger_status()

        self.results["collections"] = collection_result
        self.results["status"] = status
        self.results["summary"] = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "wbs_8_1_target": self.TARGET_COUNT,
            "wbs_8_1_progress": status.get("recorded", 0),
            "wbs_8_1_progress_pct": status.get("progress_pct", 0),
            "next_milestone": "2026-07-15 (30건 목표)",
            "data_quality": "MONITORING"
        }
        return self.results

    def print_report(self):
        """Print the current contents of ``self.results`` to stdout."""
        print("\n" + "=" * 80)
        print("[WBS-8.1] T+20 거래 레저 자동 수집 실행")
        print("=" * 80)
        print(f"시간: {self.results['timestamp']}\n")

        # "collections" starts out as an empty list before generate_report
        # runs; treat anything falsy as an empty mapping.
        collection = self.results.get("collections") or {}
        if collection.get("status") == "SUCCESS":
            print(f"[수집] {collection['collected_count']}개 거래 T+20 도달")
            for trade in collection.get("trades", [])[:5]:
                print(f" - {trade['ticker']}: {trade['pnl_pct']:+.2f}% (T+{trade['days_elapsed']})")
        else:
            print(f"[수집] {collection.get('message', 'N/A')}")

        if "recorded" in collection:
            recorded = collection["recorded"]
            print(f"\n[기록] {recorded.get('recorded_count', 0)}개 성과 기록")

        status = self.results.get("status", {})
        print("\n[진행률]")
        print(f" 목표: {status.get('target', 30)}건")
        print(f" 달성: {status.get('recorded', 0)}건 ({status.get('progress_pct', 0):.1f}%)")
        print(f" 남은 기간: 약 {status.get('days_to_target', 'N/A')}일")
        print("\n" + "=" * 80 + "\n")

    def save_report(self, output_file: Optional[str] = None):
        """Write ``self.results`` as UTF-8 JSON.

        Args:
            output_file: destination path. Defaults to a timestamped file
                under ``Temp/``. Missing parent directories are created.
        """
        if not output_file:
            output_file = f"Temp/t20_ledger_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        print(f"리포트 저장: {output_file}")
if __name__ == "__main__":
    # Script entry point: run one collection pass, then print the report to
    # stdout and save it as JSON under the default output path.
    collector = T20LedgerCollector()
    collector.generate_report()
    collector.print_report()
    collector.save_report()