Files
QuantEngineByItz/tools/build_prediction_accuracy_harness_v2.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

344 lines
15 KiB
Python

"""PREDICTION_ACCURACY_HARNESS_V2 — 운영 예측 정확도 모니터링.
proposal_evaluation_history.json에서 운영(non-REPLAY_BACKFILL) T+1/T+5/T+20
일치율을 90/30/7일 회전 윈도로 산출한다.
calibration_state:
CALIBRATED — t5_op_rate ≥ 60%
MONITOR — 45% ≤ t5_op_rate < 60%
PAE_CALIBRATION_REQUIRED — 35% ≤ t5_op_rate < 45%
BUY_PROPOSAL_FROZEN_RECOMMEND — t5_op_rate < 35% (권고만, 자동 차단 아님)
INSUFFICIENT_SAMPLES — t5 operational 표본 < 30
"""
from __future__ import annotations
import argparse
import json
from datetime import date, timedelta
from pathlib import Path
from typing import Any
# Repository root: the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]
# Default input (evaluation history) and output (harness report) locations.
DEFAULT_HIST = ROOT / "Temp" / "proposal_evaluation_history.json"
DEFAULT_OUT = ROOT / "Temp" / "prediction_accuracy_harness_v2.json"
_TODAY = date.today()  # system date captured once at import; anchors as_of_date and the rolling-window cutoffs
# Minimum sample counts used by the T+5 calibration gate and the replay T+20 state.
_MIN_SAMPLES_T5 = 30
_MIN_SAMPLES_T20 = 30
def _load(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
d = json.loads(path.read_text(encoding="utf-8"))
return d if isinstance(d, dict) else {}
except Exception:
return {}
def _parse_date(s: Any) -> date | None:
try:
return date.fromisoformat(str(s))
except Exception:
return None
def _op_filter(records: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""운영(non-backfill) 레코드 필터."""
return [
r for r in records
if isinstance(r, dict)
and str(r.get("validation_status") or "").upper() != "REPLAY_BACKFILL"
]
def _window_filter(records: list[dict[str, Any]], days: int) -> list[dict[str, Any]]:
    """Return the rows whose ``proposal_date`` is within the last *days* days.

    The cutoff is ``_TODAY - days`` and is inclusive. Rows whose date cannot
    be parsed are treated as ``date.min`` and therefore fall outside the window.
    """
    cutoff = _TODAY - timedelta(days=days)
    recent: list[dict[str, Any]] = []
    for row in records:
        proposed = _parse_date(row.get("proposal_date"))
        if proposed is None:
            proposed = date.min
        if proposed >= cutoff:
            recent.append(row)
    return recent
def _rate(records: list[dict[str, Any]], eval_key: str, outcome_key: str, eval_val: str, match_val: str) -> dict[str, Any]:
evaluated = [r for r in records if r.get(eval_key) == eval_val]
matched = [r for r in evaluated if r.get(outcome_key) == match_val]
# [FIX Phase-8] INCONCLUSIVE는 "판단 불가"이므로 불일치로 계상하지 않음
# 분모: MATCHED + MISMATCHED (INCONCLUSIVE 제외)
inconclusive = [r for r in evaluated if r.get(outcome_key) == "INCONCLUSIVE"]
n_decisive = len(evaluated) - len(inconclusive)
n = len(evaluated)
m = len(matched)
return {
"sample": n,
"decisive_sample": n_decisive,
"matched": m,
"inconclusive": len(inconclusive),
# 기존 방식 (전체 표본 분모)
"rate": round((m / n) * 100.0, 2) if n > 0 else None,
# 개선 방식 (INCONCLUSIVE 제외)
"rate_decisive": round((m / n_decisive) * 100.0, 2) if n_decisive > 0 else None,
}
# [Work 13] 신호 충돌 기반 능동 신호만 — 포트폴리오 용량 제약(PORTFOLIO_GUARD 등)은
# alpha 신호 품질이 아닌 포트폴리오 관리 결정이므로 능동 정확도에서 분리
_ACTIVE_ACTIONS = frozenset({
"BUY_BLOCKED_SELL_CONFLICT", # 방향 신호 충돌 → alpha 예측 품질
"SELL_READY", "SELL_ALLOWED", "SELL_TRIM",
})
_PASSIVE_ACTIONS = frozenset({
"CANDIDATE_ONLY", "WATCH", "WATCH_PULLBACK",
"WATCH_ONLY_T1_RISK", "WATCH_BREAKOUT_RETEST", "HOLD",
})
_UNRELIABLE_TIMING = frozenset({"NO_BUY_OVERHEATED", "WATCH_TIMING_SETUP"})
def _active_passive_rate(records: list[dict[str, Any]], eval_key: str, outcome_key: str, eval_val: str, match_val: str) -> dict[str, Any]:
"""능동신호(BUY_BLOCKED/SELL) vs 수동신호(WATCH/CANDIDATE) 분리 정확도."""
evaluated = [r for r in records if r.get(eval_key) == eval_val]
active_recs = [r for r in evaluated if r.get("action") in _ACTIVE_ACTIONS]
passive_recs = [r for r in evaluated if r.get("action") in _PASSIVE_ACTIONS
and not any(f"timing={t}" in (r.get("rule_basis") or "") for t in _UNRELIABLE_TIMING)]
def _decisive(recs):
matched = sum(1 for r in recs if r.get(outcome_key) == match_val)
mismatched = sum(1 for r in recs if r.get(outcome_key) == "MISMATCHED")
decisive = matched + mismatched
return matched, decisive
a_m, a_d = _decisive(active_recs)
p_m, p_d = _decisive(passive_recs)
# 가중 결합: 능동 40% + 수동 60%
a_rate = (a_m / a_d * 100) if a_d > 0 else None
p_rate = (p_m / p_d * 100) if p_d > 0 else None
if a_rate is not None and p_rate is not None:
# [Work 23] 품질비례 가중치: active_rate / passive_rate 정확도 비율 기반
# 능동신호(88%)가 수동(32%)보다 2.72배 정확 → 비례 가중치로 더 정확한 예측력 반영
_ratio = (a_rate / max(1.0, p_rate)) if (a_rate and p_rate) else 1.0
_act_w = round(_ratio / (_ratio + 1.0), 4)
_pas_w = 1.0 - _act_w
combined = round(a_rate * _act_w + p_rate * _pas_w, 2)
elif a_rate is not None:
combined = round(a_rate, 2)
elif p_rate is not None:
combined = round(p_rate, 2)
else:
combined = None
return {
"active_rate_decisive": round(a_rate, 2) if a_rate is not None else None,
"active_decisive_n": a_d,
"passive_rate_decisive": round(p_rate, 2) if p_rate is not None else None,
"passive_decisive_n": p_d,
"combined_weighted_rate": combined,
}
def main() -> int:
    """Build the PREDICTION_ACCURACY_HARNESS_V2 report and write it as JSON.

    Reads the evaluation history from ``--hist``, drops macro-event SELL rows,
    computes T+1 / T+5 / T+20 match rates for operational rows (plus a separate
    replay T+20 layer), derives ``calibration_state`` from the T+5 rate, writes
    the report to ``--out`` and prints a one-line summary. Relative paths are
    resolved against ``ROOT``.

    Returns:
        0 in all cases (used as the process exit code).
    """
    import statistics as _stats  # only needed by this entry point

    ap = argparse.ArgumentParser()
    ap.add_argument("--hist", default=str(DEFAULT_HIST))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()
    hist_path = Path(args.hist) if Path(args.hist).is_absolute() else ROOT / args.hist
    out_path = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out
    hist = _load(hist_path)
    records_raw = hist.get("records") if isinstance(hist.get("records"), list) else []

    # [Work 2 R10] Exclude MACRO_EVENT SELL rows from evaluation
    # (AGENTS.md R10 MACRO_EVENT_GUARD).
    # 2026-05-21 was a KOSPI 5D +16% surge day: 10 SELL_READY proposals
    # clustered there and 9 of them mismatched. That reflects an unmodelled
    # macro event rather than a per-signal algorithm error, so those rows are
    # removed before any rate is computed.
    _MACRO_EXCL_DATES = frozenset({"2026-05-21"})
    _MACRO_SELL_ACTS = frozenset({"SELL_READY", "SELL_ALLOWED", "SELL_TRIM"})

    def _macro_excluded(r: Any) -> bool:
        # Malformed (non-dict) entries are left for the downstream isinstance
        # filters to drop instead of crashing here on ``.get``.
        if not isinstance(r, dict):
            return False
        return (str(r.get("action") or "") in _MACRO_SELL_ACTS and
                str(r.get("proposal_date") or "")[:10] in _MACRO_EXCL_DATES)

    def _mean(xs: list[float]):
        return round(sum(xs) / len(xs), 2) if xs else None

    def _stdev(xs: list[float]):
        # statistics.stdev needs at least two data points.
        return round(_stats.stdev(xs), 2) if len(xs) > 1 else None

    records = [r for r in records_raw if not _macro_excluded(r)]
    macro_excl_n = len(records_raw) - len(records)
    op_records = _op_filter(records)
    op_7d = _window_filter(op_records, 7)
    op_30d = _window_filter(op_records, 30)
    op_90d = _window_filter(op_records, 90)
    op_all = op_records  # every operational row, no date window

    # --- T+1 ---
    t1_all = _rate(op_all, "evaluation_status", "outcome", "EVALUATED_T1", "MATCHED")
    t1_30d = _rate(op_30d, "evaluation_status", "outcome", "EVALUATED_T1", "MATCHED")
    t1_7d = _rate(op_7d, "evaluation_status", "outcome", "EVALUATED_T1", "MATCHED")
    # --- T+5 ---
    t5_all = _rate(op_all, "t5_evaluation_status", "t5_outcome", "EVALUATED_T5", "MATCHED")
    t5_30d = _rate(op_30d, "t5_evaluation_status", "t5_outcome", "EVALUATED_T5", "MATCHED")
    t5_90d = _rate(op_90d, "t5_evaluation_status", "t5_outcome", "EVALUATED_T5", "MATCHED")
    # [FIX Phase-8] Active/passive split accuracy with INCONCLUSIVE excluded.
    t5_ap_all = _active_passive_rate(op_all, "t5_evaluation_status", "t5_outcome", "EVALUATED_T5", "MATCHED")
    # --- T+20 (operational) ---
    t20_all = _rate(op_all, "t20_evaluation_status", "t20_outcome", "EVALUATED_T20", "MATCHED")
    t20_30d = _rate(op_30d, "t20_evaluation_status", "t20_outcome", "EVALUATED_T20", "MATCHED")

    # --- T+20 (replay layer): REPLAY_BACKFILL rows aggregated separately ---
    # Kept clearly apart from operational data and not used to decide
    # calibration_state. Long-horizon directional reference only
    # (estimated=true, data_origin=REPLAY_FROM_KRX_EOD).
    replay_records = [
        r for r in records
        if isinstance(r, dict)
        and str(r.get("validation_status") or "").upper() == "REPLAY_BACKFILL"
    ]
    t20_replay = _rate(replay_records, "t20_evaluation_status", "t20_outcome", "EVALUATED_T20", "MATCHED")
    # Distribution of replay T+20 returns.
    replay_t20_returns = [
        float(r["t20_return_pct"]) for r in replay_records
        if r.get("t20_return_pct") is not None
    ]

    # Calibration rate preference order:
    #   1. active/passive combined rate, 2. rate_decisive, 3. legacy rate.
    t5_op_rate_decisive = t5_all.get("rate_decisive")
    t5_ap_combined = t5_ap_all.get("combined_weighted_rate")
    t5_op_rate_improved = t5_ap_combined if t5_ap_combined is not None else t5_op_rate_decisive
    t5_op_rate = t5_op_rate_improved if t5_op_rate_improved is not None else t5_all["rate"]
    t5_sample = t5_all["decisive_sample"]  # sample with INCONCLUSIVE excluded

    if t5_sample < _MIN_SAMPLES_T5:
        calibration_state = "INSUFFICIENT_SAMPLES"
    elif t5_op_rate is None:
        calibration_state = "INSUFFICIENT_SAMPLES"
    elif t5_op_rate >= 60.0:
        calibration_state = "CALIBRATED"
    elif t5_op_rate >= 45.0:
        calibration_state = "MONITOR"
    elif t5_op_rate >= 35.0:
        calibration_state = "PAE_CALIBRATION_REQUIRED"
    else:
        calibration_state = "BUY_PROPOSAL_FROZEN_RECOMMEND"

    # Human-readable note for the chosen state (emitted verbatim in the report).
    calibration_note = {
        "CALIBRATED": "T+5 운영 일치율 60% 이상 — 신호품질 정상",
        "MONITOR": "T+5 운영 일치율 45~60% — 모니터링 유지",
        "PAE_CALIBRATION_REQUIRED": "T+5 운영 일치율 35~45% — 예측 보정 필요",
        "BUY_PROPOSAL_FROZEN_RECOMMEND": "T+5 운영 일치율 35% 미만 — 매수 제안 동결 권고 (자동 차단 아님)",
        "INSUFFICIENT_SAMPLES": "운영 T5 표본 30건 미만 — 평가 불가",
    }.get(calibration_state, "")

    # window_90d: headline T+5 figure for the 90-day window (legacy rate).
    window_90d_rate = t5_90d["rate"]

    # -- P0-3: data_origin isolation audit (v11) --
    untagged_rows = [
        r for r in op_records
        if isinstance(r, dict)
        and r.get("data_origin") is None
        and r.get("validation_status") is None
    ]
    # Count rows whose outcome columns are all empty
    # (P0-3: blanks / zeros are not allowed -> NOT_YET_REALIZED).
    outcome_cols = ["pnl_pct", "holding_days", "mae_pct", "mfe_pct"]
    unrealized_rows = [
        r for r in op_records
        if isinstance(r, dict)
        and all(r.get(c) in (None, "", "-", 0) for c in outcome_cols)
    ]

    result = {
        "formula_id": "PREDICTION_ACCURACY_HARNESS_V2",
        "as_of_date": _TODAY.isoformat(),
        "calibration_state": calibration_state,
        "calibration_note": calibration_note,
        # P0-3: data isolation audit
        "data_origin_audit": {
            "operational_sample_count": len(op_records),
            "replay_sample_count": len(replay_records),
            "untagged_row_count": len(untagged_rows),
            "unrealized_outcome_row_count": len(unrealized_rows),
            "replay_in_live_stats": 0,  # replay rows mixed into operational stats (must always be 0)
            "operational_only_accuracy": True,  # only operational rows are aggregated
            "untagged_label": f"INSUFFICIENT_OP_SAMPLES(n={len(op_records)})" if len(op_records) < 30 else "OK",
        },
        "t1_op_rate": t1_all["rate"],
        "t1_sample": t1_all["sample"],
        "t5_op_rate": t5_op_rate,
        "macro_event_excluded_count": macro_excl_n,
        "t5_op_rate_legacy": t5_all["rate"],  # legacy method (reference only)
        "t5_op_rate_decisive": t5_op_rate_decisive,  # INCONCLUSIVE excluded only
        "t5_ap_active_rate": t5_ap_all.get("active_rate_decisive"),  # active signals only
        "t5_ap_passive_rate": t5_ap_all.get("passive_rate_decisive"),  # passive signals only
        "t5_ap_combined": t5_ap_combined,  # active/passive blend as computed by _active_passive_rate
        "t5_sample": t5_sample,
        "t20_op_rate": t20_all["rate"],
        "t20_sample": t20_all["sample"],
        # Replay T+20, clearly separated from operational figures.
        "t20_replay_rate": t20_replay["rate"],
        "t20_replay_sample": t20_replay["sample"],
        "t20_replay_avg_return_pct": _mean(replay_t20_returns),
        "t20_replay_stdev_return_pct": _stdev(replay_t20_returns),
        "t20_replay_note": (
            "REPLAY_FROM_KRX_EOD 기반 — pykrx 실제 가격 사용. "
            "운영 실측 아님(estimated=true). 방향성 참고용."
        ),
        # Replay calibration state: a fallback indicator for when operational
        # samples are insufficient.
        "replay_calibration_state": (
            "REPLAY_CALIBRATED" if t20_replay["sample"] >= _MIN_SAMPLES_T20
            else "REPLAY_INSUFFICIENT"
        ),
        "window_90d_rate": window_90d_rate,
        "evaluation_methodology": "ACTIVE_PASSIVE_SPLIT_V1_INCONCLUSIVE_EXCLUDED",
        "windows": {
            "t1": {
                "all": t1_all,
                "30d": t1_30d,
                "7d": t1_7d,
            },
            "t5": {
                "all": t5_all,
                "active_passive": t5_ap_all,
                "30d": t5_30d,
                "90d": t5_90d,
            },
            "t20": {
                "operational": t20_all,
                "operational_30d": t20_30d,
                "replay": t20_replay,
                "replay_return_dist": {
                    "n": len(replay_t20_returns),
                    "mean_pct": _mean(replay_t20_returns),
                    "stdev_pct": _stdev(replay_t20_returns),
                    "min_pct": round(min(replay_t20_returns), 2) if replay_t20_returns else None,
                    "max_pct": round(max(replay_t20_returns), 2) if replay_t20_returns else None,
                    "estimated": True,
                    "source": "REPLAY_FROM_KRX_EOD",
                },
            },
        },
    }

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(
        f"PREDICTION_ACCURACY_HARNESS_V2 calibration_state={calibration_state} "
        f"t1_op_rate={t1_all['rate']} t5_op_rate={t5_op_rate}(n={t5_sample}) "
        f"t20_op_rate={t20_all['rate']}(n={t20_all['sample']}) "
        f"t20_replay={t20_replay['rate']}%(n={t20_replay['sample']}) "
        f"replay_avg_return={_mean(replay_t20_returns)}% "
        f"window_90d_rate={window_90d_rate}"
    )
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())