"""validate_prediction_accuracy_harness_v2.py — PREDICTION_ACCURACY_HARNESS_VALIDATE_V2

Validates the basic structure of Temp/prediction_accuracy_harness_v2.json and
the allowed data-gate (calibration) states.

INSUFFICIENT_SAMPLES is accepted because operational T+5/T+20 samples may
currently be too few.
"""

from __future__ import annotations

import argparse
import json
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_INPUT = ROOT / "Temp" / "prediction_accuracy_harness_v2.json"
DEFAULT_OUT = ROOT / "Temp" / "validate_prediction_accuracy_harness_v2.json"
FORMULA_ID = "PREDICTION_ACCURACY_HARNESS_VALIDATE_V2"
ALLOWED_CALIBRATION = {
    "CALIBRATED",
    "MONITOR",
    "PAE_CALIBRATION_REQUIRED",
    "BUY_PROPOSAL_FROZEN_RECOMMEND",
    "INSUFFICIENT_SAMPLES",
}


def _load(path: Path) -> Any:
    """Return the parsed JSON document at *path*, or ``{}`` if it is unusable.

    A missing file, an unreadable file and malformed JSON all yield an empty
    dict; the caller's checks then fail on the missing fields.
    """
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberately broad: any unreadable input must surface as a FAIL
        # report from the validator, not as a traceback.
        return {}


def _is_dict(value: Any) -> bool:
    """Return True when *value* is a ``dict`` (i.e. a JSON object)."""
    return isinstance(value, dict)


def _ensure_fields(
    payload: dict[str, Any],
    path: str,
    fields: list[str],
    errors: list[str],
) -> None:
    """Check that the object at dotted *path* inside *payload* has *fields*.

    Args:
        payload: Object to start the lookup from.
        path: Dot-separated key path relative to *payload*; an empty string
            means *payload* itself.
        fields: Keys that must be present in the object found at *path*.
        errors: List that receives one message per problem (mutated in place).
    """
    block: Any = payload
    if path:
        for part in path.split("."):
            # A non-dict anywhere along the path collapses the lookup to None.
            block = block.get(part) if isinstance(block, dict) else None
    if not isinstance(block, dict):
        errors.append(f"{path or 'root'} must be object")
        return
    for field in fields:
        if field not in block:
            errors.append(f"missing field: {path + '.' if path else ''}{field}")


def _resolve(raw: str) -> Path:
    """Return *raw* as a path, anchoring relative paths at ``ROOT``."""
    candidate = Path(raw)
    return candidate if candidate.is_absolute() else ROOT / candidate


def _validate_payload(payload: Any) -> list[str]:
    """Return the list of validation errors for a harness *payload*.

    An empty list means the payload passed every check.
    """
    errors: list[str] = []
    if not _is_dict(payload):
        errors.append("payload must be object")
        return errors

    if payload.get("formula_id") != "PREDICTION_ACCURACY_HARNESS_V2":
        errors.append("formula_id mismatch")

    calibration_state = str(payload.get("calibration_state") or "")
    if calibration_state not in ALLOWED_CALIBRATION:
        errors.append(f"calibration_state={calibration_state}")

    for key in [
        "as_of_date",
        "data_origin_audit",
        "windows",
        "evaluation_methodology",
    ]:
        if key not in payload:
            errors.append(f"missing field: {key}")

    audit = payload.get("data_origin_audit")
    if isinstance(audit, dict):
        for key in [
            "operational_sample_count",
            "replay_sample_count",
            "untagged_row_count",
            "unrealized_outcome_row_count",
            "replay_in_live_stats",
            "operational_only_accuracy",
        ]:
            if key not in audit:
                errors.append(f"missing field: data_origin_audit.{key}")
    elif "data_origin_audit" in payload:
        # Present but not an object: without this branch the nested checks
        # would be skipped silently. The absent case is already reported as
        # a missing field above, so it is not repeated here.
        errors.append("data_origin_audit must be object")

    for key in [
        "t1_op_rate",
        "t1_sample",
        "t5_op_rate",
        "t5_sample",
        "t20_op_rate",
        "t20_sample",
        "t20_replay_rate",
        "t20_replay_sample",
        "t20_replay_avg_return_pct",
        "t20_replay_stdev_return_pct",
        "window_90d_rate",
    ]:
        if key not in payload:
            errors.append(f"missing field: {key}")

    windows = payload.get("windows")
    if isinstance(windows, dict):
        _ensure_fields(windows, "t1", ["all", "30d", "7d"], errors)
        _ensure_fields(windows, "t5", ["all", "active_passive", "30d", "90d"], errors)
        _ensure_fields(
            windows,
            "t20",
            ["operational", "operational_30d", "replay", "replay_return_dist"],
            errors,
        )
    else:
        errors.append("windows must be object")

    # Fewer than 30 T+5 samples is only acceptable under INSUFFICIENT_SAMPLES.
    t5_sample = payload.get("t5_sample")
    if (
        isinstance(t5_sample, int)
        and t5_sample < 30
        and calibration_state != "INSUFFICIENT_SAMPLES"
    ):
        errors.append("t5_sample < 30 requires INSUFFICIENT_SAMPLES")

    return errors


def main(argv: list[str] | None = None) -> int:
    """Validate the harness JSON, write a gate report, and return an exit code.

    Args:
        argv: Command-line arguments (``--input``, ``--out``). ``None`` makes
            argparse read ``sys.argv``.

    Returns:
        0 when no errors were found (gate PASS), otherwise 1 (gate FAIL).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--input", default=str(DEFAULT_INPUT))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    input_path = _resolve(args.input)
    out_path = _resolve(args.out)

    errors = _validate_payload(_load(input_path))

    result = {
        "formula_id": FORMULA_ID,
        "gate": "PASS" if not errors else "FAIL",
        "checked_file": str(Path(args.input).as_posix()),
        "errors": errors,
    }
    report = json.dumps(result, ensure_ascii=False, indent=2)

    # The output directory may not exist yet (e.g. a fresh checkout).
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(report + "\n", encoding="utf-8")
    print(report)
    return 0 if not errors else 1


if __name__ == "__main__":
    raise SystemExit(main())