from __future__ import annotations import argparse import json from datetime import date, datetime, timedelta from pathlib import Path from typing import Any from pykrx import stock ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_HISTORY = ROOT / "Temp" / "proposal_evaluation_history.json" def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: obj = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return obj if isinstance(obj, dict) else {} def _parse_rows(value: Any) -> list[dict[str, Any]]: if isinstance(value, list): return [r for r in value if isinstance(r, dict)] if isinstance(value, str): try: parsed = json.loads(value) if isinstance(parsed, list): return [r for r in parsed if isinstance(r, dict)] except Exception: return [] return [] def _text(v: Any) -> str: return str(v or "").strip() def _to_num(v: Any) -> float | None: try: if v is None or v == "": return None return float(v) except Exception: return None def _expected_direction(action: str, order_type: str) -> str: raw = f"{action} {order_type}".upper() if "BUY" in raw or "ADD" in raw: return "UP" if "SELL" in raw or "TRIM" in raw or "EXIT" in raw or "STOP" in raw: return "DOWN_OR_RISK_REDUCED" if "WATCH" in raw: return "NEUTRAL_TO_UP" return "NEUTRAL" def _classify(ret: float, expected: str, action: str, horizon: str) -> str: if horizon == "t1": up_pass, up_fail = 0.5, -1.0 down_pass, down_fail = 0.5, 1.5 nu_lo, nu_hi, nu_fail_lo, nu_fail_hi, neut = -1.5, 3.0, -2.5, 5.0, 1.5 elif horizon == "t5": up_pass, up_fail = 2.0, -3.0 down_pass, down_fail = 1.0, 4.0 nu_lo, nu_hi, nu_fail_lo, nu_fail_hi, neut = -3.0, 7.0, -6.0, 12.0, 3.0 else: up_pass, up_fail = 5.0, -8.0 down_pass, down_fail = 2.0, 10.0 nu_lo, nu_hi, nu_fail_lo, nu_fail_hi, neut = -5.0, 15.0, -10.0, 25.0, 6.0 if expected == "UP": if ret >= up_pass: return "MATCHED" if ret <= up_fail: return "MISMATCHED" return "INCONCLUSIVE" if expected == "DOWN_OR_RISK_REDUCED": if ret <= down_pass: return "MATCHED" if ret >= down_fail: return "MISMATCHED" return "INCONCLUSIVE" if expected == "NEUTRAL_TO_UP": if nu_lo <= ret <= nu_hi: return "MATCHED" if ret <= nu_fail_lo or ret >= nu_fail_hi: return "MISMATCHED" return "INCONCLUSIVE" if abs(ret) <= neut: return "MATCHED" if abs(ret) >= neut * 2: return "MISMATCHED" return "INCONCLUSIVE" def _summarize(records: list[dict[str, Any]]) -> dict[str, Any]: def hsum(status_key: str, outcome_key: str, ret_key: str) -> dict[str, Any]: ev = [r for r in records if str(r.get(status_key) or "").startswith("EVALUATED_")] m = [r for r in ev if r.get(outcome_key) == "MATCHED"] mm = [r for r in ev if r.get(outcome_key) == "MISMATCHED"] rets = [r.get(ret_key) for r in ev if isinstance(r.get(ret_key), (int, float))] return { "evaluated_count": len(ev), "matched_count": len(m), "mismatched_count": len(mm), "match_rate_pct": round((len(m) / len(ev)) * 100, 2) if ev else None, "avg_return_pct": round(sum(rets) / len(rets), 2) if rets else None, } t1 = [r for r in records if r.get("evaluation_status") == "EVALUATED_T1"] t1m = [r for r in t1 if r.get("outcome") == "MATCHED"] t1mm = [r for r in t1 if r.get("outcome") == "MISMATCHED"] return { "evaluated_count": len(t1), "matched_count": len(t1m), "mismatched_count": len(t1mm), "match_rate_pct": round((len(t1m) / len(t1)) * 100, 2) if t1 else None, "t5_horizon": hsum("t5_evaluation_status", "t5_outcome", "t5_return_pct"), "t20_horizon": hsum("t20_evaluation_status", "t20_outcome", "t20_return_pct"), "last_updated": datetime.now().isoformat(timespec="seconds"), } def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--history", default=str(DEFAULT_HISTORY)) ap.add_argument("--lookback_days", type=int, default=90) ap.add_argument("--max_trade_days", type=int, default=45) args = ap.parse_args() jp = Path(args.json) hp = Path(args.history) if not jp.is_absolute(): jp = ROOT / jp if not hp.is_absolute(): hp = ROOT / hp payload = _load_json(jp) data = payload.get("data") if isinstance(payload.get("data"), dict) else {} hctx = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {} hist = _load_json(hp) records = hist.get("records") if isinstance(hist.get("records"), list) else [] existing = {_text(r.get("proposal_id")) for r in records if isinstance(r, dict)} decisions = { _text(r.get("ticker")): r for r in _parse_rows(hctx.get("decisions_json")) if _text(r.get("ticker")) } blueprint = _parse_rows(hctx.get("order_blueprint_json")) names = {} templates: list[dict[str, Any]] = [] for row in blueprint: ticker = _text(row.get("ticker")) if not ticker: continue dec = decisions.get(ticker, {}) action = _text(dec.get("final_action") or row.get("order_type") or "WATCH") order_type = _text(row.get("order_type") or "WATCH") names[ticker] = _text(row.get("name")) templates.append({"ticker": ticker, "name": names[ticker], "action": action, "order_type": order_type}) end_d = date.today() start_d = end_d - timedelta(days=max(35, args.lookback_days)) start_s = start_d.strftime("%Y%m%d") end_s = end_d.strftime("%Y%m%d") replay_rows: list[dict[str, Any]] = [] for t in templates: ticker = t["ticker"] try: df = stock.get_market_ohlcv(start_s, end_s, ticker) except Exception: continue if df is None or len(df.index) < 30: continue closes = [] for idx, row in df.iterrows(): c = _to_num(row.get("종가")) if c is None or c <= 0: continue d = idx.date().isoformat() if hasattr(idx, "date") else str(idx)[:10] closes.append((d, c)) if len(closes) < 30: continue start_i = max(0, len(closes) - args.max_trade_days - 21) end_i = len(closes) - 21 expected = _expected_direction(t["action"], t["order_type"]) for i in range(start_i, end_i): proposal_date, p_close = closes[i] d1, c1 = closes[i + 1] d5, c5 = closes[i + 5] d20, c20 = closes[i + 20] pid = f"REPLAY:{proposal_date}:{ticker}:{t['order_type']}:{t['action']}" if pid in existing: continue ret1 = round((c1 / p_close - 1.0) * 100.0, 2) ret5 = round((c5 / p_close - 1.0) * 100.0, 2) ret20 = round((c20 / p_close - 1.0) * 100.0, 2) replay_rows.append({ "proposal_id": pid, "record_type": "HISTORICAL_REPLAY_EOD", "data_origin": "REPLAY_FROM_KRX_EOD", "proposal_date": proposal_date, "ticker": ticker, "name": t["name"], "action": t["action"], "order_type": t["order_type"], "validation_status": "REPLAY_BACKFILL", "expected_direction": expected, "proposed_close": p_close, "proposed_limit_price": None, "proposed_quantity": None, "rule_basis": "REPLAY_BACKFILL_KRX_EOD", "evaluation_status": "EVALUATED_T1", "result_date": d1, "result_close": c1, "next_return_pct": ret1, "outcome": _classify(ret1, expected, t["action"], "t1"), "error_cause": "REPLAY_BACKFILL", "improvement_proposal": "REPLAY_ONLY_DO_NOT_AUTO_ADOPT", "t5_evaluation_status": "EVALUATED_T5", "t5_result_date": d5, "t5_return_pct": ret5, "t5_outcome": _classify(ret5, expected, t["action"], "t5"), "t20_evaluation_status": "EVALUATED_T20", "t20_result_date": d20, "t20_return_pct": ret20, "t20_outcome": _classify(ret20, expected, t["action"], "t20"), }) records.extend(replay_rows) records = [r for r in records if isinstance(r, dict)] records.sort(key=lambda r: (_text(r.get("proposal_date")), _text(r.get("ticker")), _text(r.get("proposal_id")))) hist["schema_version"] = "2026-05-25-proposal-evaluation-v3-replay" hist["records"] = records hist["summary"] = _summarize(records) hp.parent.mkdir(parents=True, exist_ok=True) hp.write_text(json.dumps(hist, ensure_ascii=False, indent=2), encoding="utf-8") print(f"REPLAY_BACKFILL_OK records_added={len(replay_rows)} total_records={len(records)}") return 0 if __name__ == "__main__": raise SystemExit(main())