f5c29f7ddf
- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블) - snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블) 도구 경로 업데이트: - auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용 - measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용 - validate_data_collection_v1.py: snapshot_admin.db 사용 - monitor_wbs_progress_v1.py: snapshot_admin.db 사용 - backup_recovery_manager_v1.py: 2개 DB 모두 백업 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
335 lines
11 KiB
Python
335 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
WBS-8.1: T+20 레저 30건 자동 수집 체계
|
|
|
|
목표: 거래 후 T+20일 경과시 자동으로 성과 데이터 수집 및 정리
|
|
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Optional
|
|
import sys
|
|
|
|
|
|
class T20LedgerCollector:
    """Collect and record T+20 trade performance automatically (WBS-8.1).

    Trades are read from the ``data_feed`` table. Once a trade is at least
    ``T20_DAYS`` calendar days past its ``entry_date`` and has no matching
    row in ``performance``, its profit/loss against the latest stored
    ``close_price`` is computed and written to ``performance``.

    NOTE(review): the change description accompanying this file places
    ``performance`` in ``snapshot_admin.db`` while the default ``db_path``
    points at ``kis_data_collection.db``. If the two tables really live in
    separate files, pass ``performance_db_path`` so the second file is
    attached; otherwise every ``performance`` query fails with
    "no such table". Confirm the intended layout.
    """

    # Number of T+20 records WBS-8.1 aims to accumulate.
    TARGET_COUNT = 30
    # Calendar days after entry at which a trade reaches the T+20 milestone.
    T20_DAYS = 20
    # Assumed average number of new T+20 records per day (rough estimate).
    AVG_DAILY_RATE = 0.5

    def __init__(self, db_path: Optional[str] = None,
                 performance_db_path: Optional[str] = None):
        """Set up the collector.

        Args:
            db_path: SQLite file holding ``data_feed``. Defaults to
                ``src/quant_engine/kis_data_collection.db``.
            performance_db_path: Optional separate SQLite file holding
                ``performance``. When given (and different from ``db_path``)
                it is attached to every connection that touches
                ``performance``. When omitted, ``performance`` is expected
                in ``db_path`` itself.
        """
        self.db_path = db_path or "src/quant_engine/kis_data_collection.db"
        self.performance_db_path = performance_db_path
        self.results = {
            "timestamp": datetime.now().isoformat(),
            # A dict (not a list): print_report() calls .get() on it, which
            # must work even before generate_report() has run.
            "collections": {},
            "summary": {},
        }

    def _connect(self) -> sqlite3.Connection:
        """Open ``db_path`` and attach the performance database if configured.

        Table names are left unqualified in the queries, so if both files
        contain a ``performance`` table the one in ``db_path`` is used.

        Raises:
            sqlite3.Error: if the database cannot be opened or attached.
        """
        conn = sqlite3.connect(self.db_path)
        perf_path = self.performance_db_path
        if perf_path and str(perf_path) != str(self.db_path):
            try:
                conn.execute("ATTACH DATABASE ? AS perfdb", (str(perf_path),))
            except sqlite3.Error:
                conn.close()
                raise
        return conn

    def _query_pending_trades(self) -> List[Dict]:
        """Return up to 10 trades past T+20 that have no performance record.

        Returns:
            A list of row dicts ordered by ``entry_date`` (oldest first), or
            an empty list when nothing is pending or the query fails (the
            error is printed).
        """
        # NOT EXISTS instead of NOT IN: a NULL ticker in `performance` would
        # make NOT IN evaluate to NULL and silently hide every pending trade.
        # The elapsed-days expression is repeated in WHERE rather than using
        # the SELECT alias, so a real column of that name cannot shadow it.
        # NOTE(review): julianday('now') is UTC; entry_date is presumably a
        # local date, so the cutoff can shift by part of a day - confirm.
        query = """
            SELECT
                ticker,
                name,
                entry_date,
                entry_price,
                quantity,
                stop_price,
                target_price,
                entry_stage,
                account,
                CAST((julianday('now') - julianday(entry_date)) AS INTEGER) AS days_elapsed
            FROM data_feed
            WHERE ticker IS NOT NULL
              AND entry_date IS NOT NULL
              AND entry_price IS NOT NULL
              AND quantity IS NOT NULL
              AND CAST((julianday('now') - julianday(entry_date)) AS INTEGER) >= ?
              AND NOT EXISTS (
                  SELECT 1 FROM performance
                  WHERE performance.ticker = data_feed.ticker
                    AND performance.entry_date = data_feed.entry_date
              )
            ORDER BY entry_date ASC
            LIMIT 10
        """
        try:
            conn = self._connect()
            try:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(query, (self.T20_DAYS,)).fetchall()
                return [dict(row) for row in rows]
            finally:
                # Always release the connection, even when the query raises.
                conn.close()
        except sqlite3.Error as e:
            print(f"Error querying trades: {e}")
            return []

    def _get_current_price(self, ticker: str) -> Optional[float]:
        """Return the most recently updated ``close_price`` for ``ticker``.

        Returns:
            The price from the ``data_feed`` row with the latest
            ``updated_at``, or ``None`` when there is no row, the stored
            price is NULL, or the database cannot be read.
        """
        query = """
            SELECT close_price FROM data_feed
            WHERE ticker = ?
            ORDER BY updated_at DESC
            LIMIT 1
        """
        try:
            # Only data_feed is read here, so the performance DB is not attached.
            conn = sqlite3.connect(self.db_path)
            try:
                row = conn.execute(query, (ticker,)).fetchone()
            finally:
                conn.close()
        except sqlite3.Error:
            return None
        return row[0] if row else None

    @staticmethod
    def _pnl_pct(entry_price, current_price) -> Optional[float]:
        """Return the percentage return, or ``None`` if it cannot be computed.

        ``None`` is returned for non-numeric prices and for a zero entry
        price (which would otherwise divide by zero).
        """
        try:
            entry = float(entry_price)
            current = float(current_price)
        except (TypeError, ValueError):
            return None
        if entry == 0:
            return None
        return ((current - entry) / entry) * 100

    def _t20_date(self, entry_date) -> Optional[str]:
        """Return ``entry_date`` + ``T20_DAYS`` as ``YYYY-MM-DD``.

        Only the leading date part is parsed, so timestamps such as
        ``2026-01-02 09:30:00`` are accepted. Returns ``None`` when the value
        does not start with a ``YYYY-MM-DD`` date.
        """
        try:
            entry = datetime.strptime(str(entry_date)[:10], "%Y-%m-%d")
        except ValueError:
            return None
        return (entry + timedelta(days=self.T20_DAYS)).strftime("%Y-%m-%d")

    def collect_t20_performance(self) -> Dict:
        """Gather T+20 performance figures for all pending trades.

        Trades without a current price, with an unusable entry price, or
        with an unparseable entry date are skipped instead of aborting the
        whole run.

        Returns:
            ``{"status": "NO_PENDING_TRADES", ...}`` when nothing is pending,
            otherwise ``{"status": "SUCCESS", "collected_count": n,
            "trades": [...], "next_action": ...}``.
        """
        pending_trades = self._query_pending_trades()

        if not pending_trades:
            return {
                "status": "NO_PENDING_TRADES",
                "message": "T+20 경과 미기록 거래 없음",
                "collected_count": 0
            }

        collected = []
        for trade in pending_trades:
            ticker = trade["ticker"]
            current_price = self._get_current_price(ticker)
            if current_price is None:
                continue

            entry_price = trade["entry_price"]
            pnl_pct = self._pnl_pct(entry_price, current_price)
            if pnl_pct is None:
                print(f"Skipping {ticker}: cannot compute return from entry_price={entry_price!r}")
                continue

            t20_date = self._t20_date(trade["entry_date"])
            if t20_date is None:
                print(f"Skipping {ticker}: unparseable entry_date={trade['entry_date']!r}")
                continue

            collected.append({
                "ticker": ticker,
                "name": trade["name"],
                "entry_date": trade["entry_date"],
                "t20_date": t20_date,
                "days_elapsed": trade["days_elapsed"],
                "entry_price": entry_price,
                "current_price": current_price,
                "pnl_pct": round(pnl_pct, 2),
                "quantity": trade["quantity"],
                "stop_price": trade["stop_price"],
                "target_price": trade["target_price"],
                "entry_stage": trade["entry_stage"],
                "account": trade["account"],
                "status": "T20_MILESTONE_REACHED"
            })

        return {
            "status": "SUCCESS",
            "collected_count": len(collected),
            "trades": collected,
            "next_action": "Record in performance sheet"
        }

    def record_t20_in_performance(self, collections: List[Dict]) -> Dict:
        """Insert collected T+20 results into the ``performance`` table.

        Trades whose (ticker, entry_date) pair already exists are skipped.
        All inserts are committed together; on any error nothing is
        committed.

        Args:
            collections: trade dicts as produced by
                ``collect_t20_performance()["trades"]``.

        Returns:
            ``{"status": "SUCCESS", "recorded_count": n, "message": ...}`` or
            ``{"status": "ERROR", "error": ..., "recorded_count": 0}``.
        """
        try:
            conn = self._connect()
            try:
                cursor = conn.cursor()
                recorded = 0
                for trade in collections:
                    # Skip trades that already have a performance row.
                    cursor.execute(
                        "SELECT COUNT(*) FROM performance WHERE ticker = ? AND entry_date = ?",
                        (trade["ticker"], trade["entry_date"])
                    )
                    if cursor.fetchone()[0] > 0:
                        continue

                    cursor.execute("""
                        INSERT INTO performance (
                            ticker, name, entry_date, entry_price,
                            quantity, stop_price, target_price,
                            entry_stage, account, current_price,
                            pnl_pct, status, t20_milestone
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        trade["ticker"],
                        trade["name"],
                        trade["entry_date"],
                        trade["entry_price"],
                        trade["quantity"],
                        trade["stop_price"],
                        trade["target_price"],
                        trade["entry_stage"],
                        trade["account"],
                        trade["current_price"],
                        trade["pnl_pct"],
                        "T20_RECORDED",
                        trade["t20_date"]
                    ))
                    recorded += 1

                conn.commit()
            finally:
                # Closing without commit discards any partial inserts.
                conn.close()
        except (sqlite3.Error, KeyError, TypeError) as e:
            return {
                "status": "ERROR",
                "error": str(e),
                "recorded_count": 0
            }

        return {
            "status": "SUCCESS",
            "recorded_count": recorded,
            "message": f"{recorded}개 거래 T+20 성과 기록 완료"
        }

    def check_t20_ledger_status(self) -> Dict:
        """Report progress toward the ``TARGET_COUNT`` T+20 records.

        Returns:
            On success a dict with ``status`` ("MONITORING"), ``target``,
            ``recorded`` (performance rows with a T+20 milestone),
            ``eligible`` (data_feed rows at or past T+20), ``completed``
            (performance rows with an exit date), ``progress_pct`` and
            ``days_to_target``. On a database error
            ``{"status": "ERROR", "error": ...}``.
        """
        try:
            conn = self._connect()
            try:
                cursor = conn.cursor()

                # Trades that have been fully closed out.
                cursor.execute(
                    "SELECT COUNT(*) FROM performance WHERE exit_date IS NOT NULL"
                )
                total_completed = cursor.fetchone()[0]

                # Trades that are at least T+20 old.
                cursor.execute("""
                    SELECT COUNT(*) FROM data_feed
                    WHERE ticker IS NOT NULL
                      AND entry_date IS NOT NULL
                      AND CAST((julianday('now') - julianday(entry_date)) AS INTEGER) >= ?
                """, (self.T20_DAYS,))
                t20_eligible = cursor.fetchone()[0]

                # Trades whose T+20 result has been recorded.
                cursor.execute(
                    "SELECT COUNT(*) FROM performance WHERE t20_milestone IS NOT NULL"
                )
                t20_recorded = cursor.fetchone()[0]
            finally:
                conn.close()
        except sqlite3.Error as e:
            return {
                "status": "ERROR",
                "error": str(e)
            }

        target = self.TARGET_COUNT
        progress = (t20_recorded / target) * 100 if target > 0 else 0

        return {
            "status": "MONITORING",
            "target": target,
            "recorded": t20_recorded,
            "eligible": t20_eligible,
            "completed": total_completed,
            "progress_pct": round(progress, 1),
            "days_to_target": self._estimate_days_to_target(t20_recorded, target)
        }

    def _estimate_days_to_target(self, current_count: int, target: int = TARGET_COUNT) -> int:
        """Estimate how many days remain until ``target`` records exist.

        Uses the fixed ``AVG_DAILY_RATE`` assumption. Returns 0 once the
        target is reached, otherwise at least 1.
        """
        if current_count >= target:
            return 0

        remaining = target - current_count
        estimated_days = int(remaining / self.AVG_DAILY_RATE)
        return max(1, estimated_days)

    def generate_report(self) -> Dict:
        """Run collection, recording and status check; return ``self.results``.

        The collection result (with the recording outcome under
        ``"recorded"`` when any trade was collected) is stored under
        ``"collections"``, the ledger status under ``"status"`` and a short
        overview under ``"summary"``.
        """
        collection_result = self.collect_t20_performance()

        # Only write to `performance` when something was actually collected.
        if collection_result.get("trades"):
            record_result = self.record_t20_in_performance(
                collection_result["trades"]
            )
            collection_result["recorded"] = record_result

        status = self.check_t20_ledger_status()

        self.results["collections"] = collection_result
        self.results["status"] = status
        self.results["summary"] = {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "wbs_8_1_target": self.TARGET_COUNT,
            "wbs_8_1_progress": status.get("recorded", 0),
            "wbs_8_1_progress_pct": status.get("progress_pct", 0),
            "next_milestone": "2026-07-15 (30건 목표)",
            "data_quality": "MONITORING"
        }

        return self.results

    def print_report(self):
        """Print a human-readable summary of ``self.results`` to stdout.

        Safe to call before ``generate_report()``; missing sections are
        shown with placeholder values.
        """
        print("\n" + "=" * 80)
        print("[WBS-8.1] T+20 거래 레저 자동 수집 실행")
        print("=" * 80)
        print(f"시간: {self.results['timestamp']}\n")

        # Collection section.
        collection = self.results.get("collections") or {}
        if collection.get("status") == "SUCCESS":
            print(f"[수집] {collection['collected_count']}개 거래 T+20 도달")
            # Show at most the first five trades to keep the output short.
            for trade in collection.get("trades", [])[:5]:
                print(f" - {trade['ticker']}: {trade['pnl_pct']:+.2f}% (T+{trade['days_elapsed']})")
        else:
            print(f"[수집] {collection.get('message', 'N/A')}")

        # Recording section (present only when something was collected).
        if "recorded" in collection:
            recorded = collection["recorded"]
            print(f"\n[기록] {recorded.get('recorded_count', 0)}개 성과 기록")

        # Progress section.
        status = self.results.get("status") or {}
        print("\n[진행률]")
        print(f" 목표: {status.get('target', 30)}건")
        print(f" 달성: {status.get('recorded', 0)}건 ({status.get('progress_pct', 0):.1f}%)")
        print(f" 남은 기간: 약 {status.get('days_to_target', 'N/A')}일")

        print("\n" + "=" * 80 + "\n")

    def save_report(self, output_file: Optional[str] = None):
        """Write ``self.results`` as UTF-8 JSON.

        Args:
            output_file: destination path. Defaults to a timestamped file
                under ``Temp/``. Missing parent directories are created.
        """
        if not output_file:
            output_file = f"Temp/t20_ledger_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

        print(f"리포트 저장: {output_file}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the report, echo it to stdout,
    # then persist it as a timestamped JSON file.
    ledger_collector = T20LedgerCollector()
    ledger_collector.generate_report()
    ledger_collector.print_report()
    ledger_collector.save_report()
|