from __future__ import annotations import argparse import json import re from pathlib import Path from typing import Any import yaml ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "decision_evidence_score_v1.json" DEFAULT_POLICY = ROOT / "spec" / "strategy_execution_lock_policy.yaml" def _load(path: Path) -> dict[str, Any]: data = json.loads(path.read_text(encoding="utf-8")) return data if isinstance(data, dict) else {} def _rows(v: Any) -> list[dict[str, Any]]: if isinstance(v, list): return [x for x in v if isinstance(x, dict)] if isinstance(v, str): try: return _rows(json.loads(v)) except Exception: return [] return [] def _load_policy(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: payload = yaml.safe_load(path.read_text(encoding="utf-8")) except Exception: return {} root = payload.get("strategy_execution_lock_policy") if isinstance(payload, dict) else {} obj = root.get("decision_evidence_score_v1") if isinstance(root, dict) else {} return obj if isinstance(obj, dict) else {} def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) ap.add_argument("--policy", default=str(DEFAULT_POLICY)) args = ap.parse_args() json_path = Path(args.json) out_path = Path(args.out) policy_path = Path(args.policy) if not json_path.is_absolute(): json_path = ROOT / json_path if not out_path.is_absolute(): out_path = ROOT / out_path if not policy_path.is_absolute(): policy_path = ROOT / policy_path # GAS 규칙 코드 → 공식 ID 역산 맵 # GAS가 버전 없는 규칙 코드를 사용할 때 정규식이 매칭하지 못하는 경우를 보완 _RULE_CODE_TO_FORMULA: dict[str, str] = { "SELL_RULE:": "SELL_WATERFALL_ENGINE_V1", # 매도 규칙 엔진 "DE1_": "LLM_SERVING_CONSTRAINT_V1", # Direction E1 #1 수동 검토 "WHIPSAW_V1": "ANTI_WHIPSAW_GATE_V1", # 반등 의심 게이트 (이미 regex 매칭) } payload = _load(json_path) policy = _load_policy(policy_path) data = payload.get("data") if isinstance(payload.get("data"), dict) else {} h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {} bp = _rows(h.get("order_blueprint_json")) required_keys = tuple(policy.get("required_keys")) if isinstance(policy.get("required_keys"), list) else ("ticker", "order_type", "validation_status", "rationale_code") actionable = {str(x).upper() for x in (policy.get("actionable_order_types") if isinstance(policy.get("actionable_order_types"), list) else ["BUY", "SELL", "STOP_LOSS", "ADD_ON", "STAGED_BUY"])} rationale_pat = str(policy.get("rationale_formula_regex") or r"([A-Z][A-Z0-9_]*_V[0-9]+|NO_EXECUTION:[A-Z_]+)") rationale_re = re.compile(rationale_pat) complete = 0 conflicts = 0 rationale_ok = 0 rationale_total = 0 decisions_out: list[dict[str, Any]] = [] for r in bp: if all(str(r.get(k) or "").strip() for k in required_keys): complete += 1 ot = str(r.get("order_type") or "").upper() vs = str(r.get("validation_status") or "").upper() if ot in ("BUY", "ADD_ON", "STAGED_BUY") and vs == "PASS" and str(r.get("blocked_by_gate") or "").strip(): conflicts += 1 if ot in actionable and vs in ("PASS", "BLOCKED", "REVIEW_ONLY"): rationale_total += 1 rc = str(r.get("rationale_code") or "") inferred_formula = "" matched = bool(rationale_re.search(rc)) if not matched: # 정규식 미매칭 시 규칙 코드 역산 맵으로 공식 ID 보완 for prefix, fid in _RULE_CODE_TO_FORMULA.items(): if prefix in rc: inferred_formula = fid matched = bool(rationale_re.search(rc + "|" + fid)) break if matched: rationale_ok += 1 decisions_out.append({ "ticker": str(r.get("ticker") or ""), "order_type": ot, "validation_status": vs, "rationale_code": rc, "inferred_formula": inferred_formula, "rationale_ok": matched, }) total = max(1, len(bp)) completeness_pct = complete / total * 100.0 conflict_rate = conflicts / total * 100.0 rationale_quality_pct = 100.0 if rationale_total == 0 else (rationale_ok / rationale_total) * 100.0 w = policy.get("weights") if isinstance(policy.get("weights"), dict) else {} wc = float(w.get("completeness_pct") or 0.55) wf = float(w.get("conflict_safety_pct") or 0.20) wr = float(w.get("rationale_quality_pct") or 0.25) score = round(max(0.0, min(100.0, wc * completeness_pct + wf * (100.0 - conflict_rate) + wr * rationale_quality_pct)), 2) pass_th = float(policy.get("pass_threshold") or 92.0) caution_th = float(policy.get("caution_threshold") or 80.0) no_action_state = str(policy.get("no_actionable_orders_state") or "NO_ACTIONABLE_ORDERS") if rationale_total == 0: gate = no_action_state else: gate = "PASS" if score >= pass_th else ("NO_NEW_BUY" if score >= caution_th else "BLOCK") result = { "formula_id": "DECISION_EVIDENCE_SCORE_V1", "score": score, "gate": gate, "metrics": { "completeness_pct": round(completeness_pct, 2), "conflict_rate_pct": round(conflict_rate, 2), "rationale_quality_pct": round(rationale_quality_pct, 2), "rationale_total": rationale_total, "rationale_ok": rationale_ok, "rows": len(bp), }, "decisions": decisions_out, "policy_used": { "policy_path": str(policy_path), "pass_threshold": pass_th, "caution_threshold": caution_th, "actionable_order_types": sorted(actionable), "rationale_formula_regex": rationale_pat, "no_actionable_orders_state": no_action_state, }, } out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())