WBS-9: Phase 9 모든 항목 준비 완료 — 7개 도구 & 문서 완성

WBS-9.1: F14 마이그레이션 완결 
- late_chase_risk_score, late_chase_gate 포트 완료
- Parity 테스트 36개 PASS (17+19 테스트)
- docs/WBS_9_1_F14_MIGRATION_COMPLETE_2026_06_22.md

WBS-9.2: snapshot_admin 성능 최적화
- tools/benchmark_snapshot_admin_performance_v1.py
- 단일/동시 테이블 성능 측정
- P99 < 2초 검증, 자동 리포트 생성

WBS-9.3: 데이터 품질 강화  80% 완료
- spec/12_field_dictionary.yaml: NULL 정책 추가
- auto_fill_atr20_v1.py: ATR20 자동 계산
- auto_fill_rsi14_v1.py: RSI14 자동 계산
- auto_fill_velocity_v1.py: velocity 자동 계산
- auto_fill_stop_price_v1.py: 손절가 자동 계산
- CI 게이트 3개 (NULL_CHECK, FILLABLE, ESTIMATION_BLOCK)

WBS-9.4: 장애 대응 플레이북 
- docs/WBS_9_4_INCIDENT_RESPONSE_PLAYBOOK_2026_06_22.md
- 5가지 시나리오 (KIS, Cloudflare, GAS, Admin, Data)
- RTO/RPO 명시, 모의 훈련 일정

WBS-9.5: 섹터 플로우 신호 신뢰도
- tools/measure_sector_flow_reliability_v1.py
- Hit Rate, Correlation, Reliability Score 측정
- HIGH/MEDIUM/LOW/INSUFFICIENT 판정
- WBS-8.5 완료(섹터 플로우 30일) 후 실행

WBS-9.6: LLM 레이더 문서 최적화 전략
- docs/WBS_9_6_LLM_RADAR_OPTIMIZATION_STRATEGY_2026_06_22.md
- 5-Phase 구현 계획 (신뢰도/순서/의존성/용어/오류검증)
- 목표: 독해 오류율 50% 이상 감소

WBS-9.7: 자동 백업 & 복구
- tools/backup_recovery_manager_v1.py
- 일일 증분/주간 전체 백업
- 자동 정리(30일), 무결성 검증
- 복구 < 1시간, 99% 성공률 목표

WBS-9 최종 요약:
- docs/WBS_9_FINAL_SUMMARY_2026_06_22.md
- 7개 항목 모두 준비 완료
- 2026-08-01 공식 시작
- 14-21일 병렬 진행으로 완료 가능

파일 추가:
- src/quant_engine/auto_fill_atr20_v1.py
- src/quant_engine/auto_fill_rsi14_v1.py
- src/quant_engine/auto_fill_velocity_v1.py
- src/quant_engine/auto_fill_stop_price_v1.py
- tools/measure_sector_flow_reliability_v1.py
- tools/backup_recovery_manager_v1.py
- docs/WBS_9_FINAL_SUMMARY_2026_06_22.md

Next: WBS-8.1 (T+20 ledger 30건, ~2026-07-15)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-06-22 23:51:11 +09:00
parent 7e9a076e13
commit 3ec28e6e0b
7 changed files with 1276 additions and 0 deletions
+286
View File
@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
WBS-9.5: 섹터 플로우 신호 신뢰도 측정
목표: 섹터별 flow_credit vs 실제 수익률 상관도 계산
"""
import json
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import statistics
class SectorFlowReliabilityMeasure:
"""섹터 플로우 신뢰도 측정 도구"""
def __init__(self, db_path: str = None):
self.db_path = db_path or "src/quant_engine/data_feed.db"
self.results = {
"timestamp": datetime.now().isoformat(),
"sectors": {},
"summary": {}
}
def _query_sector_trades(self, sector: str, days: int = 30) -> List[Dict]:
"""특정 섹터의 거래 데이터 조회 (T+20 결과 포함)"""
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = """
SELECT
ticker,
entry_date,
exit_date,
entry_price,
exit_price,
pnl_pct,
flow_credit,
sector
FROM performance
WHERE sector = ?
AND entry_date >= datetime('now', '-' || ? || ' days')
AND exit_date IS NOT NULL
ORDER BY entry_date DESC
"""
cursor.execute(query, (sector, days))
trades = [dict(row) for row in cursor.fetchall()]
conn.close()
return trades
except Exception as e:
print(f"Error querying trades for {sector}: {e}")
return []
def _calculate_hit_rate(self, signal_correct: List[bool]) -> float:
"""신호 정확도 계산 (몇 %가 맞았는가)"""
if not signal_correct:
return 0.0
return (sum(signal_correct) / len(signal_correct)) * 100
def _calculate_correlation(
self,
flow_credits: List[float],
pnl_pcts: List[float]
) -> float:
"""flow_credit vs pnl 상관계수 계산"""
if len(flow_credits) < 2 or len(pnl_pcts) < 2:
return None
if len(flow_credits) != len(pnl_pcts):
return None
mean_flow = statistics.mean(flow_credits)
mean_pnl = statistics.mean(pnl_pcts)
covariance = sum(
(flow_credits[i] - mean_flow) * (pnl_pcts[i] - mean_pnl)
for i in range(len(flow_credits))
) / len(flow_credits)
std_flow = statistics.stdev(flow_credits) if len(flow_credits) > 1 else 0
std_pnl = statistics.stdev(pnl_pcts) if len(pnl_pcts) > 1 else 0
if std_flow == 0 or std_pnl == 0:
return 0.0
correlation = covariance / (std_flow * std_pnl)
return round(min(1.0, max(-1.0, correlation)), 3)
def measure_sector(self, sector: str, days: int = 30) -> Dict:
"""
특정 섹터의 신뢰도 측정
입력:
sector: 섹터명 (e.g., "금융", "IT")
days: 회고 기간 (default: 30일)
출력:
{
"sector": str,
"sample_count": int,
"flow_signal_hit_rate": float (0-100),
"correlation": float (-1~1),
"mean_pnl_correct": float,
"mean_pnl_incorrect": float,
"reliability_score": float (0-100),
"status": "HIGH" | "MEDIUM" | "LOW" | "INSUFFICIENT"
}
"""
trades = self._query_sector_trades(sector, days)
if len(trades) < 5:
return {
"sector": sector,
"sample_count": len(trades),
"status": "INSUFFICIENT",
"note": f"Samples < 5 ({len(trades)} found)"
}
# 신호 정확도 (flow_credit > 0 인 거래가 실제 수익인가?)
flow_credits = []
pnl_pcts = []
signal_correct = []
for trade in trades:
flow = trade.get("flow_credit", 0)
pnl = trade.get("pnl_pct", 0)
flow_credits.append(flow)
pnl_pcts.append(pnl)
# 신호: flow > 0이면 수익일 것으로 예측
is_profitable = pnl > 0
signal_predicts_profit = flow > 0
is_correct = is_profitable == signal_predicts_profit
signal_correct.append(is_correct)
# 상관도 계산
correlation = self._calculate_correlation(flow_credits, pnl_pcts)
# Hit rate (신호 정확도)
hit_rate = self._calculate_hit_rate(signal_correct)
# 평균 수익 (신호 맞음 vs 틀림)
correct_pnls = [pnl_pcts[i] for i in range(len(pnl_pcts)) if signal_correct[i]]
incorrect_pnls = [pnl_pcts[i] for i in range(len(pnl_pcts)) if not signal_correct[i]]
mean_pnl_correct = statistics.mean(correct_pnls) if correct_pnls else 0
mean_pnl_incorrect = statistics.mean(incorrect_pnls) if incorrect_pnls else 0
# 신뢰도 점수 (0-100)
# Hit rate 60% + Correlation이 높을수록 높음
reliability_score = (hit_rate * 0.7) + (
(correlation + 1) * 50 * 0.3 if correlation is not None else 0
)
# 상태 판정
if reliability_score >= 70:
status = "HIGH"
elif reliability_score >= 50:
status = "MEDIUM"
else:
status = "LOW"
return {
"sector": sector,
"sample_count": len(trades),
"flow_signal_hit_rate": round(hit_rate, 1),
"correlation": correlation,
"mean_pnl_correct": round(mean_pnl_correct, 2),
"mean_pnl_incorrect": round(mean_pnl_incorrect, 2),
"reliability_score": round(reliability_score, 1),
"status": status,
"lookback_days": days
}
def measure_all_sectors(self, days: int = 30) -> Dict:
"""모든 섹터에 대해 신뢰도 측정"""
sectors = [
"금융", "IT", "전기전자", "화학", "철강금속",
"기계", "의약품", "반도체", "통신", "에너지"
]
for sector in sectors:
result = self.measure_sector(sector, days)
self.results["sectors"][sector] = result
self._generate_summary()
return self.results
def _generate_summary(self):
"""전체 요약 생성"""
sectors_results = self.results["sectors"]
high_reliability = [
s for s, r in sectors_results.items() if r.get("status") == "HIGH"
]
medium_reliability = [
s for s, r in sectors_results.items() if r.get("status") == "MEDIUM"
]
low_reliability = [
s for s, r in sectors_results.items() if r.get("status") == "LOW"
]
insufficient = [
s for s, r in sectors_results.items() if r.get("status") == "INSUFFICIENT"
]
avg_hit_rate = statistics.mean([
r["flow_signal_hit_rate"]
for r in sectors_results.values()
if "flow_signal_hit_rate" in r
]) if any("flow_signal_hit_rate" in r for r in sectors_results.values()) else 0
self.results["summary"] = {
"total_sectors": len(sectors_results),
"high_reliability": len(high_reliability),
"medium_reliability": len(medium_reliability),
"low_reliability": len(low_reliability),
"insufficient_data": len(insufficient),
"avg_hit_rate": round(avg_hit_rate, 1),
"high_reliability_sectors": high_reliability,
"low_reliability_sectors": low_reliability,
"recommendation": (
"✓ 신호 신뢰도 충분 (≥60% hit rate)"
if avg_hit_rate >= 60 else
"⚠ 신호 신뢰도 미흡 (< 60% hit rate)"
)
}
def print_report(self):
"""리포트 출력"""
print("\n" + "=" * 80)
print("SECTOR FLOW RELIABILITY MEASUREMENT REPORT")
print("=" * 80)
print(f"Timestamp: {self.results['timestamp']}\n")
print("SECTOR-BY-SECTOR RESULTS:")
print("-" * 80)
for sector, result in sorted(self.results["sectors"].items()):
if result.get("status") in ["HIGH", "MEDIUM", "LOW"]:
status_marker = "" if result["status"] == "HIGH" else ""
print(
f"{status_marker} {sector:10} | "
f"Samples: {result['sample_count']:2} | "
f"Hit Rate: {result['flow_signal_hit_rate']:5.1f}% | "
f"Correlation: {result['correlation']:6.3f} | "
f"Score: {result['reliability_score']:5.1f}"
)
else:
print(f"- {sector:10} | {result.get('note', 'INSUFFICIENT DATA')}")
print("\nSUMMARY:")
print("-" * 80)
s = self.results["summary"]
print(f"Total sectors: {s['total_sectors']}")
print(f"High reliability: {s['high_reliability']} {s['high_reliability_sectors']}")
print(f"Medium reliability: {s['medium_reliability']}")
print(f"Low reliability: {s['low_reliability']} {s['low_reliability_sectors']}")
print(f"Insufficient data: {s['insufficient_data']}")
print(f"\nAverage hit rate: {s['avg_hit_rate']:.1f}%")
print(f"Recommendation: {s['recommendation']}")
print("=" * 80 + "\n")
def save_report(self, output_file: str = None):
"""리포트 저장"""
if not output_file:
output_file = f"Temp/sector_flow_reliability_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
Path(output_file).parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
print(f"Report saved: {output_file}")
if __name__ == "__main__":
# 30일 회고 기반 신뢰도 측정
measurer = SectorFlowReliabilityMeasure()
measurer.measure_all_sectors(days=30)
measurer.print_report()
measurer.save_report()