#!/usr/bin/env python3 """ WBS-8.1: T+20 레저 30건 자동 수집 체계 목표: 거래 후 T+20일 경과시 자동으로 성과 데이터 수집 및 정리 """ import sqlite3 import json from pathlib import Path from datetime import datetime, timedelta from typing import Dict, List, Optional import sys class T20LedgerCollector: """T+20 거래 성과 자동 수집""" def __init__(self, db_path: str = None): self.db_path = db_path or "src/quant_engine/kis_data_collection.db" self.results = { "timestamp": datetime.now().isoformat(), "collections": [], "summary": {} } def _query_pending_trades(self) -> List[Dict]: """T+20이 경과했지만 성과가 미기록된 거래 조회""" try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() # T+20 경과 기준: entry_date + 20 <= today query = """ SELECT ticker, name, entry_date, entry_price, quantity, stop_price, target_price, entry_stage, account, CAST((julianday('now') - julianday(entry_date)) AS INTEGER) as days_elapsed FROM data_feed WHERE ticker NOT NULL AND entry_date NOT NULL AND entry_price NOT NULL AND quantity NOT NULL AND days_elapsed >= 20 AND ticker NOT IN ( SELECT DISTINCT ticker FROM performance WHERE entry_date = data_feed.entry_date ) ORDER BY entry_date ASC LIMIT 10 """ cursor.execute(query) trades = [dict(row) for row in cursor.fetchall()] conn.close() return trades except Exception as e: print(f"Error querying trades: {e}") return [] def _get_current_price(self, ticker: str) -> Optional[float]: """현재 가격 조회 (data_feed에서 최신값)""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() query = """ SELECT close_price FROM data_feed WHERE ticker = ? ORDER BY updated_at DESC LIMIT 1 """ cursor.execute(query, (ticker,)) result = cursor.fetchone() conn.close() return result[0] if result else None except: return None def collect_t20_performance(self) -> Dict: """T+20 성과 데이터 수집""" pending_trades = self._query_pending_trades() if not pending_trades: return { "status": "NO_PENDING_TRADES", "message": "T+20 경과 미기록 거래 없음", "collected_count": 0 } collected = [] for trade in pending_trades: ticker = trade["ticker"] current_price = self._get_current_price(ticker) if current_price is None: continue # 수익률 계산 entry_price = trade["entry_price"] pnl_pct = ((current_price - entry_price) / entry_price) * 100 collection = { "ticker": ticker, "name": trade["name"], "entry_date": trade["entry_date"], "t20_date": ( datetime.strptime(trade["entry_date"], "%Y-%m-%d") + timedelta(days=20) ).strftime("%Y-%m-%d"), "days_elapsed": trade["days_elapsed"], "entry_price": entry_price, "current_price": current_price, "pnl_pct": round(pnl_pct, 2), "quantity": trade["quantity"], "stop_price": trade["stop_price"], "target_price": trade["target_price"], "entry_stage": trade["entry_stage"], "account": trade["account"], "status": "T20_MILESTONE_REACHED" } collected.append(collection) return { "status": "SUCCESS", "collected_count": len(collected), "trades": collected, "next_action": "Record in performance sheet" } def record_t20_in_performance(self, collections: List[Dict]) -> Dict: """T+20 성과를 performance 탭에 기록""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() recorded = 0 for trade in collections: # 이미 기록되어 있는지 확인 cursor.execute( "SELECT COUNT(*) FROM performance WHERE ticker = ? AND entry_date = ?", (trade["ticker"], trade["entry_date"]) ) if cursor.fetchone()[0] > 0: continue # 이미 기록됨 # 성과 기록 cursor.execute(""" INSERT INTO performance ( ticker, name, entry_date, entry_price, quantity, stop_price, target_price, entry_stage, account, current_price, pnl_pct, status, t20_milestone ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( trade["ticker"], trade["name"], trade["entry_date"], trade["entry_price"], trade["quantity"], trade["stop_price"], trade["target_price"], trade["entry_stage"], trade["account"], trade["current_price"], trade["pnl_pct"], "T20_RECORDED", trade["t20_date"] )) recorded += 1 conn.commit() conn.close() return { "status": "SUCCESS", "recorded_count": recorded, "message": f"{recorded}개 거래 T+20 성과 기록 완료" } except Exception as e: return { "status": "ERROR", "error": str(e), "recorded_count": 0 } def check_t20_ledger_status(self) -> Dict: """T+20 레저 진행 상태 확인""" try: conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 전체 완료 거래 cursor.execute( "SELECT COUNT(*) FROM performance WHERE exit_date IS NOT NULL" ) total_completed = cursor.fetchone()[0] # T+20 이상 경과한 거래 cursor.execute(""" SELECT COUNT(*) FROM data_feed WHERE ticker NOT NULL AND entry_date NOT NULL AND CAST((julianday('now') - julianday(entry_date)) AS INTEGER) >= 20 """) t20_eligible = cursor.fetchone()[0] # T+20 기록된 거래 cursor.execute( "SELECT COUNT(*) FROM performance WHERE t20_milestone IS NOT NULL" ) t20_recorded = cursor.fetchone()[0] conn.close() progress = (t20_recorded / 30) * 100 if 30 > 0 else 0 return { "status": "MONITORING", "target": 30, "recorded": t20_recorded, "eligible": t20_eligible, "progress_pct": round(progress, 1), "days_to_target": self._estimate_days_to_target(t20_recorded) } except Exception as e: return { "status": "ERROR", "error": str(e) } def _estimate_days_to_target(self, current_count: int, target: int = 30) -> int: """목표 도달까지 예상 일수""" if current_count >= target: return 0 # 평균 수집 속도 추정 (일일 ~0.5건) remaining = target - current_count avg_daily_rate = 0.5 estimated_days = int(remaining / avg_daily_rate) return max(1, estimated_days) def generate_report(self) -> Dict: """전체 리포트 생성""" # T+20 경과 거래 수집 collection_result = self.collect_t20_performance() # 성과 기록 if collection_result.get("trades"): record_result = self.record_t20_in_performance( collection_result["trades"] ) collection_result["recorded"] = record_result # 현황 확인 status = self.check_t20_ledger_status() self.results["collections"] = collection_result self.results["status"] = status self.results["summary"] = { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "wbs_8_1_target": 30, "wbs_8_1_progress": status.get("recorded", 0), "wbs_8_1_progress_pct": status.get("progress_pct", 0), "next_milestone": "2026-07-15 (30건 목표)", "data_quality": "MONITORING" } return self.results def print_report(self): """리포트 출력""" print("\n" + "=" * 80) print("[WBS-8.1] T+20 거래 레저 자동 수집 실행") print("=" * 80) print(f"시간: {self.results['timestamp']}\n") # 수집 현황 collection = self.results.get("collections", {}) if collection.get("status") == "SUCCESS": print(f"[수집] {collection['collected_count']}개 거래 T+20 도달") for trade in collection.get("trades", [])[:5]: print(f" - {trade['ticker']}: {trade['pnl_pct']:+.2f}% (T+{trade['days_elapsed']})") else: print(f"[수집] {collection.get('message', 'N/A')}") # 기록 현황 if "recorded" in collection: recorded = collection["recorded"] print(f"\n[기록] {recorded.get('recorded_count', 0)}개 성과 기록") # 진행률 status = self.results.get("status", {}) print(f"\n[진행률]") print(f" 목표: {status.get('target', 30)}건") print(f" 달성: {status.get('recorded', 0)}건 ({status.get('progress_pct', 0):.1f}%)") print(f" 남은 기간: 약 {status.get('days_to_target', 'N/A')}일") print("\n" + "=" * 80 + "\n") def save_report(self, output_file: str = None): """리포트 저장""" if not output_file: output_file = f"Temp/t20_ledger_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" Path(output_file).parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: json.dump(self.results, f, indent=2, ensure_ascii=False) print(f"리포트 저장: {output_file}") if __name__ == "__main__": collector = T20LedgerCollector() collector.generate_report() collector.print_report() collector.save_report()