# Late-chase attribution report (formula id LATE_CHASE_ATTRIBUTION_V1).
#
# Reads the gathered trading payload and the proposal evaluation history,
# derives block-threshold statistics from the operational T+5 samples, and
# writes the resulting JSON report to disk (also echoing it to stdout).
from __future__ import annotations

import argparse
import json
import math
import sys
from pathlib import Path
from statistics import mean, quantiles
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_HISTORY = ROOT / "Temp" / "proposal_evaluation_history.json"
DEFAULT_OUT = ROOT / "Temp" / "late_chase_attribution_v1.json"

# The history carries no explicit velocity_1d for operational rows, so
# buy_timing_score from the same record is used as the entry-timing proxy.
_PROXY_FIELD = "buy_timing_score"
# Candidate block thresholds, evaluated in ascending order.
_THRESHOLD_GRID = (20, 30, 40, 50, 60, 70, 80)
# Below this many operational samples nothing is treated as calibrated.
_MIN_CALIBRATION_SAMPLES = 30
# late_chase_risk_score at or above this value puts a row on the watchlist.
_HIGH_RISK_SCORE = 70
# A threshold is accepted when its false-positive rate is at most this (%).
_MAX_FALSE_POSITIVE_PCT = 20.0
# chase_entry_rate cut-offs that select the applied policy mode.
_PILOT_ONLY_RATE = 0.25
_NO_BUY_DAYS_RATE = 0.35


def _load(path: Path) -> dict[str, Any]:
    """Return the JSON object stored at *path*, or ``{}`` when unavailable.

    A missing/unreadable file, undecodable or invalid JSON, and a top-level
    value that is not an object all yield an empty dict.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {}
    return data if isinstance(data, dict) else {}


def _parse_rows(value: Any) -> list[dict[str, Any]]:
    """Return the dict items of *value*, which may be a list or a JSON string.

    Strings are decoded as JSON and the decoded value is parsed again; any
    other type, or an undecodable string, yields an empty list.
    """
    if isinstance(value, list):
        return [item for item in value if isinstance(item, dict)]
    if isinstance(value, str):
        try:
            decoded = json.loads(value)
        except (ValueError, RecursionError):
            return []
        return _parse_rows(decoded)
    return []


def _to_float(value: Any) -> float | None:
    """Convert *value* to a finite float, or return ``None``.

    ``None``, the empty string, unconvertible values and non-finite results
    (NaN / infinity) all map to ``None`` so they never reach the statistics
    or the JSON output.
    """
    if value is None or value == "":
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    return number if math.isfinite(number) else None


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _resolve(raw: str) -> Path:
    """Return *raw* as a path, anchoring relative paths at ``ROOT``."""
    path = Path(raw)
    return path if path.is_absolute() else ROOT / path


def _freshness_state(row: dict[str, Any]) -> str:
    """Return the row's ``freshness_state`` normalised to upper case."""
    return str(row.get("freshness_state") or "").upper()


def _select_operational(records: list[Any]) -> list[tuple[dict[str, Any], float]]:
    """Pair each operational history record with its parsed proxy score.

    A record qualifies when it is a dict, is not a ``REPLAY_BACKFILL`` row,
    has ``t5_evaluation_status == "EVALUATED_T5"`` and carries a usable
    ``buy_timing_score``.
    """
    scored: list[tuple[dict[str, Any], float]] = []
    for record in records:
        if not isinstance(record, dict):
            continue
        if str(record.get("validation_status") or "").upper() == "REPLAY_BACKFILL":
            continue
        if str(record.get("t5_evaluation_status") or "") != "EVALUATED_T5":
            continue
        score = _to_float(record.get(_PROXY_FIELD))
        if score is not None:
            scored.append((record, score))
    return scored


def _build_watchlist(high_risk: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Build the watchlist entries for the high-risk entry-freshness rows."""
    return [
        {
            "ticker": row.get("ticker"),
            "name": row.get("name"),
            "late_chase_risk_score": row.get("late_chase_risk_score"),
            "freshness_state": row.get("freshness_state"),
            "follow_through_state": row.get("follow_through_state"),
            # Same case-insensitive comparison as the blocked-row count.
            "action_hint": (
                "NO_BUY_UNTIL_PULLBACK"
                if _freshness_state(row) == "BLOCK_LATE_CHASE"
                else "WATCH_PULLBACK_ONLY"
            ),
        }
        for row in high_risk
    ]


def _build_threshold_ledger(
    scored: list[tuple[dict[str, Any], float]],
) -> list[dict[str, Any]]:
    """Summarise T+5 outcomes of the rows each grid threshold would block.

    A row is blocked by a threshold when its proxy score is strictly below
    it. Thresholds that block nothing are omitted from the ledger.
    """
    ledger: list[dict[str, Any]] = []
    for threshold in _THRESHOLD_GRID:
        blocked_rows = [record for record, score in scored if score < threshold]
        if not blocked_rows:
            continue
        matched = sum(1 for r in blocked_rows if r.get("t5_outcome") == "MATCHED")
        mismatched = sum(1 for r in blocked_rows if r.get("t5_outcome") == "MISMATCHED")
        decisive = matched + mismatched
        match_rate = round((matched / decisive) * 100.0, 2) if decisive else None
        # The match rate is what gets published as block precision, so the
        # false-positive rate is its complement over decisive rows.
        # NOTE(review): this was previously computed from `matched` too,
        # making it identical to match_rate_pct — confirm the intended
        # meaning of MATCHED against the ledger producer.
        false_positive_rate = (
            round((mismatched / decisive) * 100.0, 2) if decisive else None
        )
        t5_returns = [
            value
            for value in (_to_float(r.get("t5_return_pct")) for r in blocked_rows)
            if value is not None
        ]
        ledger.append(
            {
                "threshold": threshold,
                "proxy_field": _PROXY_FIELD,
                "blocked_count": len(blocked_rows),
                "matched_count": matched,
                "mismatched_count": mismatched,
                "decisive_count": decisive,
                "match_rate_pct": match_rate,
                "false_positive_rate_pct": false_positive_rate,
                "avg_t5_return_pct": round(mean(t5_returns), 2) if t5_returns else None,
            }
        )
    return ledger


def _decile_thresholds(
    proxy_values: list[float], chosen: dict[str, Any] | None
) -> dict[str, object]:
    """Describe the proxy-score deciles, or flag that the sample is too small.

    [LC1/NF3] With at least 30 samples the measured decile cut points of
    buy_timing_score are published as candidate BUY-block cut-offs;
    otherwise the section is marked WATCH_PENDING_SAMPLE.
    """
    if len(proxy_values) >= _MIN_CALIBRATION_SAMPLES:
        # Nine cut points separating the ten deciles.
        decile_cuts = quantiles(proxy_values, n=10)
        return {
            "source": "실측 분포 (buy_timing_score 10분위)",
            "proxy_field": _PROXY_FIELD,
            "sample_n": len(proxy_values),
            "decile_1_pct": round(decile_cuts[0], 2),
            "decile_2_pct": round(decile_cuts[1], 2),
            "decile_3_pct": round(decile_cuts[2], 2),
            "decile_5_pct": round(decile_cuts[4], 2),
            "decile_7_pct": round(decile_cuts[6], 2),
            "decile_9_pct": round(decile_cuts[8], 2),
            "recommended_block_threshold": chosen.get("threshold") if chosen else None,
            "calibration_status": "CALIBRATED_FROM_LEDGER",
            "note": "velocity_1d 실측값 미확보 → buy_timing_score 분위 사용. T+5 최저승률 분위를 BUY 차단 기준으로 권고.",
        }
    # [LC1] Fewer than 30 samples: no proxy-derived threshold is published.
    return {
        "source": "WATCH_PENDING_SAMPLE",
        "proxy_field": _PROXY_FIELD,
        "sample_n": len(proxy_values),
        "recommended_block_threshold": None,
        "calibration_status": "WATCH_PENDING_SAMPLE",
        "note": (
            f"[LC1] samples={len(proxy_values)}<30 — 실측 분위 캘리브레이션 불가. "
            "현재 임계값은 EXPERT_PRIOR(3%/10%). 30건 누적 후 자동 교체."
        ),
    }


def main(argv: list[str] | None = None) -> int:
    """Generate the late-chase attribution report and return exit code 0.

    Args:
        argv: Command-line arguments (``--json``, ``--history``, ``--out``);
            ``None`` means ``sys.argv[1:]``. Relative paths are resolved
            against ``ROOT``.

    Returns:
        ``0`` after the report has been written to ``--out`` and printed.

    Raises:
        OSError: If the output file or its parent directory cannot be written.
    """
    try:
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    except (AttributeError, ValueError, OSError):
        # Best effort: stdout may be replaced by a stream without reconfigure().
        pass

    parser = argparse.ArgumentParser()
    parser.add_argument("--json", default=str(DEFAULT_JSON))
    parser.add_argument("--history", default=str(DEFAULT_HISTORY))
    parser.add_argument("--out", default=str(DEFAULT_OUT))
    args = parser.parse_args(argv)

    payload = _load(_resolve(args.json))
    history = _load(_resolve(args.history))
    out_path = _resolve(args.out)

    data = _as_dict(payload.get("data"))
    context = data.get("_harness_context")
    # Fall back to the top-level "hApex" section only when no harness context
    # dict exists; a non-dict fallback is treated as empty.
    harness = context if isinstance(context, dict) else _as_dict(payload.get("hApex"))
    entry_rows = _parse_rows(harness.get("entry_freshness_json"))
    alpha_fb = _as_dict(harness.get("alpha_feedback_json"))

    # Operational samples are drawn from the candidate ledger when a T+5
    # outcome exists.
    records = history.get("records")
    scored = _select_operational(records if isinstance(records, list) else [])
    proxy_values = [score for _, score in scored]
    sample_n = len(scored)

    # Current watchlist remains sourced from the live entry freshness gate.
    high_risk = [
        row
        for row in entry_rows
        if (_to_float(row.get("late_chase_risk_score")) or 0.0) >= _HIGH_RISK_SCORE
    ]
    blocked = [row for row in entry_rows if _freshness_state(row) == "BLOCK_LATE_CHASE"]
    pullback_wait = [row for row in entry_rows if _freshness_state(row) == "PULLBACK_WAIT"]
    watchlist = _build_watchlist(high_risk)

    threshold_ledger = _build_threshold_ledger(scored)
    # The lowest threshold whose false-positive rate is acceptable wins.
    chosen: dict[str, Any] | None = next(
        (
            row
            for row in threshold_ledger
            if row["false_positive_rate_pct"] is not None
            and row["false_positive_rate_pct"] <= _MAX_FALSE_POSITIVE_PCT
        ),
        None,
    )

    if sample_n < _MIN_CALIBRATION_SAMPLES:
        status = "WATCH_PENDING_SAMPLE"
    elif chosen is not None:
        status = "PASS"
    else:
        status = "DEGRADE_BUY_PERMISSION"

    # The status is decided first; only afterwards fall back to the
    # best-matching threshold so the report still has figures to show.
    if chosen is None and threshold_ledger:
        chosen = max(threshold_ledger, key=lambda row: row["match_rate_pct"] or 0.0)

    velocity_decile_thresholds = _decile_thresholds(proxy_values, chosen)

    # [LC1] late_chase_block_precision: measured values only, never a
    # placeholder 100.0.
    precision_val = chosen.get("match_rate_pct") if chosen else None
    if precision_val is not None and sample_n < _MIN_CALIBRATION_SAMPLES:
        # With too few samples the precision is reported as pending.
        precision_label = "WATCH_PENDING_SAMPLE"
    elif precision_val is not None:
        precision_label = f"{precision_val}%"
    else:
        precision_label = "DATA_MISSING"

    chase_entry_rate = _to_float(alpha_fb.get("chase_entry_rate")) or 0.0
    if chase_entry_rate >= _NO_BUY_DAYS_RATE:
        applied_mode = "NO_BUY_DAYS_3"
    elif chase_entry_rate >= _PILOT_ONLY_RATE:
        applied_mode = "PILOT_ONLY"
    else:
        applied_mode = "NORMAL"

    result = {
        "formula_id": "LATE_CHASE_ATTRIBUTION_V1",
        "status": status,
        # Without operational samples, report the alpha-feedback sample count.
        "samples": sample_n or int(_to_float(alpha_fb.get("total_samples")) or 0),
        "operational_samples": sample_n,
        "gate_hit_miss_rate_published": True,
        # [LC1] Measured decile thresholds.
        "velocity_decile_thresholds": velocity_decile_thresholds,
        "metrics": {
            "late_chase_high_risk_count": len(high_risk),
            "late_chase_blocked_count": len(blocked),
            "pullback_wait_count": len(pullback_wait),
            "chase_entry_rate": chase_entry_rate,
            "distribution_entry_rate": _to_float(alpha_fb.get("distribution_entry_rate")) or 0.0,
            "late_chase_proxy_field": _PROXY_FIELD,
            "late_chase_proxy_mean": round(mean(proxy_values), 2) if proxy_values else None,
            "late_chase_proxy_min": round(min(proxy_values), 2) if proxy_values else None,
            "late_chase_proxy_max": round(max(proxy_values), 2) if proxy_values else None,
            # [LC1] Measured precision; a placeholder 100.0 is not allowed.
            "late_chase_block_precision_label": precision_label,
            "late_chase_proxy_match_rate_pct": chosen.get("match_rate_pct") if chosen else None,
            "late_chase_proxy_false_positive_rate_pct": (
                chosen.get("false_positive_rate_pct") if chosen else None
            ),
        },
        "policy": {
            "pilot_only_threshold": _PILOT_ONLY_RATE,
            "no_buy_days_threshold": _NO_BUY_DAYS_RATE,
            "applied_mode": applied_mode,
            # [LC1] States whether the current threshold is still hard-coded.
            "velocity_threshold_source": (
                "CALIBRATED_FROM_LEDGER"
                if len(proxy_values) >= _MIN_CALIBRATION_SAMPLES
                else "EXPERT_PRIOR_PENDING_CALIBRATION"
            ),
        },
        "threshold_ledger": threshold_ledger,
        "watchlist": watchlist,
        "supporting_artifacts": [
            "Temp/proposal_evaluation_history.json",
            "Temp/entry_freshness_json",
        ],
        "note": (
            "operational_samples는 proposal_evaluation_history의 비-REPLAY T+5 평가행이며, "
            "explicit velocity_1d가 없어 buy_timing_score를 entry-timing proxy로 사용. "
            "[LC1] samples<30 구간에서 precision/precision_label=WATCH_PENDING_SAMPLE."
        ),
    }

    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())