f5c29f7ddf
- kis_data_collection.db: KIS API 데이터 수집용 (data_feed 테이블) - snapshot_admin.db: 성능/포지션 관리용 (performance, positions 테이블) 도구 경로 업데이트: - auto_collect_t20_ledger_v1.py: kis_data_collection.db 사용 - measure_sector_flow_reliability_v1.py: kis_data_collection.db 사용 - validate_data_collection_v1.py: snapshot_admin.db 사용 - monitor_wbs_progress_v1.py: snapshot_admin.db 사용 - backup_recovery_manager_v1.py: 2개 DB 모두 백업 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
287 lines
10 KiB
Python
287 lines
10 KiB
Python
#!/usr/bin/env python3
"""
WBS-9.5: Sector flow signal reliability measurement

Goal: compute, per sector, the correlation between flow_credit and realized returns
"""

import json
import sqlite3
import statistics
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple


class SectorFlowReliabilityMeasure:
    """Measure how well the sector flow signal agrees with realized trade PnL.

    Closed trades are read from the ``performance`` table of a SQLite
    database. For each sector the tool computes the hit rate of the rule
    "flow_credit > 0 predicts a profitable trade", the Pearson correlation
    between ``flow_credit`` and ``pnl_pct``, and a combined 0-100 reliability
    score. Results accumulate in ``self.results``.
    """

    # Sectors measured by measure_all_sectors() when no explicit list is given.
    DEFAULT_SECTORS = (
        "금융", "IT", "전기전자", "화학", "철강금속",
        "기계", "의약품", "반도체", "통신", "에너지",
    )

    # Fewer usable trades than this yields an "INSUFFICIENT" result.
    MIN_SAMPLES = 5

    # Weights of the two components of the reliability score (sum to 1.0).
    HIT_RATE_WEIGHT = 0.7
    CORRELATION_WEIGHT = 0.3

    def __init__(self, db_path: Optional[str] = None):
        """Create a measurer.

        Args:
            db_path: Path to the SQLite database. Falls back to the default
                collection database when omitted or empty.
        """
        # NOTE(review): _query_sector_trades reads the `performance` table;
        # confirm that this default database actually contains that table.
        self.db_path = db_path or "src/quant_engine/kis_data_collection.db"
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "sectors": {},
            "summary": {},
        }

    def _query_sector_trades(self, sector: str, days: int = 30) -> List[Dict]:
        """Fetch closed trades of one sector entered within the last ``days`` days.

        Args:
            sector: Sector name matched exactly against the ``sector`` column.
            days: Lookback window, applied to ``entry_date``.

        Returns:
            One dict per trade, newest entry first. An empty list is returned
            (and the error printed) when the database cannot be queried.
        """
        query = """
            SELECT
                ticker,
                entry_date,
                exit_date,
                entry_price,
                exit_price,
                pnl_pct,
                flow_credit,
                sector
            FROM performance
            WHERE sector = ?
              AND entry_date >= datetime('now', '-' || ? || ' days')
              AND exit_date IS NOT NULL
            ORDER BY entry_date DESC
        """
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(query, (sector, days)).fetchall()
                return [dict(row) for row in rows]
            finally:
                # Close the connection even when the query itself fails.
                conn.close()
        except sqlite3.Error as e:
            # Best effort: a missing table or unreadable file means "no data".
            print(f"Error querying trades for {sector}: {e}")
            return []

    def _calculate_hit_rate(self, signal_correct: List[bool]) -> float:
        """Return the percentage (0-100) of True entries; 0.0 for an empty list."""
        if not signal_correct:
            return 0.0
        return (sum(signal_correct) / len(signal_correct)) * 100

    def _calculate_correlation(
        self,
        flow_credits: List[float],
        pnl_pcts: List[float],
    ) -> Optional[float]:
        """Return the Pearson correlation of flow_credit and pnl, rounded to 3 places.

        Returns:
            ``None`` when fewer than two samples are given or the lengths
            differ; ``0.0`` when either series has zero variance; otherwise
            a value clamped to [-1.0, 1.0].
        """
        if len(flow_credits) < 2 or len(pnl_pcts) < 2:
            return None
        if len(flow_credits) != len(pnl_pcts):
            return None

        mean_flow = statistics.mean(flow_credits)
        mean_pnl = statistics.mean(pnl_pcts)

        # Work with raw sums of squares/products so the numerator and the
        # denominator share the same normalisation. Mixing a population
        # covariance (/n) with sample standard deviations (/(n-1)) would
        # shrink every coefficient by (n-1)/n.
        sum_cross = sum(
            (flow - mean_flow) * (pnl - mean_pnl)
            for flow, pnl in zip(flow_credits, pnl_pcts)
        )
        sum_sq_flow = sum((flow - mean_flow) ** 2 for flow in flow_credits)
        sum_sq_pnl = sum((pnl - mean_pnl) ** 2 for pnl in pnl_pcts)

        if sum_sq_flow == 0 or sum_sq_pnl == 0:
            # A constant series has no defined correlation; report "none".
            return 0.0

        correlation = sum_cross / (sum_sq_flow * sum_sq_pnl) ** 0.5
        # Clamp to absorb floating-point overshoot.
        return round(min(1.0, max(-1.0, correlation)), 3)

    def measure_sector(self, sector: str, days: int = 30) -> Dict:
        """Measure the flow-signal reliability of a single sector.

        Args:
            sector: Sector name (e.g. "금융", "IT").
            days: Lookback period in days (default 30).

        Returns:
            With fewer than ``MIN_SAMPLES`` usable trades::

                {"sector", "sample_count", "status": "INSUFFICIENT", "note"}

            Otherwise::

                {
                    "sector": str,
                    "sample_count": int,
                    "flow_signal_hit_rate": float (0-100),
                    "correlation": float (-1~1) or None,
                    "mean_pnl_correct": float,
                    "mean_pnl_incorrect": float,
                    "reliability_score": float (0-100),
                    "status": "HIGH" | "MEDIUM" | "LOW",
                    "lookback_days": int,
                }
        """
        trades = self._query_sector_trades(sector, days)

        # Rows with a NULL flow_credit or pnl_pct cannot be scored; drop them
        # instead of failing on a None comparison.
        samples = [
            (trade["flow_credit"], trade["pnl_pct"])
            for trade in trades
            if trade.get("flow_credit") is not None
            and trade.get("pnl_pct") is not None
        ]

        if len(samples) < self.MIN_SAMPLES:
            return {
                "sector": sector,
                "sample_count": len(samples),
                "status": "INSUFFICIENT",
                "note": f"Samples < {self.MIN_SAMPLES} ({len(samples)} found)",
            }

        flow_credits = [flow for flow, _ in samples]
        pnl_pcts = [pnl for _, pnl in samples]

        # The signal predicts a profit when flow > 0; it is "correct" when
        # that prediction matches the sign of the realized PnL.
        signal_correct = [(pnl > 0) == (flow > 0) for flow, pnl in samples]

        correlation = self._calculate_correlation(flow_credits, pnl_pcts)
        hit_rate = self._calculate_hit_rate(signal_correct)

        # Average PnL of trades where the signal was right vs. wrong.
        correct_pnls = [pnl for pnl, ok in zip(pnl_pcts, signal_correct) if ok]
        incorrect_pnls = [pnl for pnl, ok in zip(pnl_pcts, signal_correct) if not ok]
        mean_pnl_correct = statistics.mean(correct_pnls) if correct_pnls else 0
        mean_pnl_incorrect = statistics.mean(incorrect_pnls) if incorrect_pnls else 0

        # Reliability score (0-100): 70% hit rate plus 30% correlation, with
        # the correlation mapped from [-1, 1] onto [0, 100].
        correlation_component = (
            (correlation + 1) * 50 * self.CORRELATION_WEIGHT
            if correlation is not None
            else 0
        )
        reliability_score = hit_rate * self.HIT_RATE_WEIGHT + correlation_component

        if reliability_score >= 70:
            status = "HIGH"
        elif reliability_score >= 50:
            status = "MEDIUM"
        else:
            status = "LOW"

        return {
            "sector": sector,
            "sample_count": len(samples),
            "flow_signal_hit_rate": round(hit_rate, 1),
            "correlation": correlation,
            "mean_pnl_correct": round(mean_pnl_correct, 2),
            "mean_pnl_incorrect": round(mean_pnl_incorrect, 2),
            "reliability_score": round(reliability_score, 1),
            "status": status,
            "lookback_days": days,
        }

    def measure_all_sectors(
        self,
        days: int = 30,
        sectors: Optional[List[str]] = None,
    ) -> Dict:
        """Measure every sector, store the results and build the summary.

        Args:
            days: Lookback period in days.
            sectors: Sector names to measure; ``DEFAULT_SECTORS`` when omitted.

        Returns:
            ``self.results`` with "sectors" and "summary" filled in.
        """
        for sector in (self.DEFAULT_SECTORS if sectors is None else sectors):
            self.results["sectors"][sector] = self.measure_sector(sector, days)

        self._generate_summary()
        return self.results

    def _generate_summary(self):
        """Aggregate the per-sector results into ``self.results["summary"]``."""
        sector_results = self.results["sectors"]

        def names_with(status: str) -> List[str]:
            # Sector names whose result carries the given status.
            return [
                name
                for name, result in sector_results.items()
                if result.get("status") == status
            ]

        high_reliability = names_with("HIGH")
        medium_reliability = names_with("MEDIUM")
        low_reliability = names_with("LOW")
        insufficient = names_with("INSUFFICIENT")

        # Only sectors with enough samples carry a hit rate.
        hit_rates = [
            result["flow_signal_hit_rate"]
            for result in sector_results.values()
            if "flow_signal_hit_rate" in result
        ]
        avg_hit_rate = statistics.mean(hit_rates) if hit_rates else 0

        self.results["summary"] = {
            "total_sectors": len(sector_results),
            "high_reliability": len(high_reliability),
            "medium_reliability": len(medium_reliability),
            "low_reliability": len(low_reliability),
            "insufficient_data": len(insufficient),
            "avg_hit_rate": round(avg_hit_rate, 1),
            "high_reliability_sectors": high_reliability,
            "low_reliability_sectors": low_reliability,
            "recommendation": (
                "✓ 신호 신뢰도 충분 (≥60% hit rate)"
                if avg_hit_rate >= 60 else
                "⚠ 신호 신뢰도 미흡 (< 60% hit rate)"
            ),
        }

    def print_report(self):
        """Print the per-sector results and the summary to stdout."""
        if not self.results["summary"]:
            # Allow printing before measure_all_sectors() built a summary.
            self._generate_summary()

        print("\n" + "=" * 80)
        print("SECTOR FLOW RELIABILITY MEASUREMENT REPORT")
        print("=" * 80)
        print(f"Timestamp: {self.results['timestamp']}\n")

        print("SECTOR-BY-SECTOR RESULTS:")
        print("-" * 80)
        for sector, result in sorted(self.results["sectors"].items()):
            if result.get("status") in ["HIGH", "MEDIUM", "LOW"]:
                status_marker = "✓" if result["status"] == "HIGH" else "⚠"
                correlation = result["correlation"]
                # A missing correlation cannot be formatted as a float.
                correlation_text = (
                    f"{correlation:6.3f}" if correlation is not None else "   N/A"
                )
                print(
                    f"{status_marker} {sector:10} | "
                    f"Samples: {result['sample_count']:2} | "
                    f"Hit Rate: {result['flow_signal_hit_rate']:5.1f}% | "
                    f"Correlation: {correlation_text} | "
                    f"Score: {result['reliability_score']:5.1f}"
                )
            else:
                print(f"- {sector:10} | {result.get('note', 'INSUFFICIENT DATA')}")

        print("\nSUMMARY:")
        print("-" * 80)
        s = self.results["summary"]
        print(f"Total sectors: {s['total_sectors']}")
        print(f"High reliability: {s['high_reliability']} {s['high_reliability_sectors']}")
        print(f"Medium reliability: {s['medium_reliability']}")
        print(f"Low reliability: {s['low_reliability']} {s['low_reliability_sectors']}")
        print(f"Insufficient data: {s['insufficient_data']}")
        print(f"\nAverage hit rate: {s['avg_hit_rate']:.1f}%")
        print(f"Recommendation: {s['recommendation']}")
        print("=" * 80 + "\n")

    def save_report(self, output_file: Optional[str] = None):
        """Write ``self.results`` as UTF-8 JSON.

        Args:
            output_file: Destination path. Defaults to a timestamped file
                under ``Temp/``. Missing parent directories are created.
        """
        if not output_file:
            output_file = f"Temp/sector_flow_reliability_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)

        print(f"Report saved: {output_file}")


if __name__ == "__main__":
    # Script entry point: measure reliability over a 30-day lookback, then
    # print the report and persist it as JSON.
    measurer = SectorFlowReliabilityMeasure()
    measurer.measure_all_sectors(days=30)
    measurer.print_report()
    measurer.save_report()