"""Apply strategy execution locks to the order blueprint of a trading-data JSON.

The script reads the harness context from the input JSON, merges in the score
reports found under ``Temp/``, blocks or scales order rows according to those
scores and the lock policy, refreshes the blueprint checksums and writes the
payload back to the same file.
"""

from __future__ import annotations

import argparse
import json
import math
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

try:
    import yaml
except ImportError:  # PyYAML is only required when a lock policy file exists.
    yaml = None  # type: ignore[assignment]

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
LATE_PATH = ROOT / "Temp" / "late_chase_attribution_v1.json"
REB_PATH = ROOT / "Temp" / "rebound_sell_efficiency_v1.json"
DI_PATH = ROOT / "Temp" / "data_integrity_score_v1.json"
DV_PATH = ROOT / "Temp" / "derivation_validity_score_v1.json"
DE_PATH = ROOT / "Temp" / "decision_evidence_score_v1.json"
OQ_PATH = ROOT / "Temp" / "outcome_quality_score_v1.json"
OEA_PATH = ROOT / "Temp" / "operational_evidence_audit_v1.json"
SHM_PATH = ROOT / "Temp" / "short_horizon_outcome_monitor_v1.json"
EHC_PATH = ROOT / "Temp" / "evaluation_history_coverage_v1.json"
POLICY_PATH = ROOT / "spec" / "strategy_execution_lock_policy.yaml"

# Rebound-efficiency sell scaling is not policy-driven; these are fixed values.
REBOUND_SELL_SCALE_THRESHOLD = 60.0
REBOUND_SELL_SCALE_RATIO = 0.8

_BUY_ORDER_TYPES = ("BUY", "ADD_ON", "STAGED_BUY")
_SELL_ORDER_TYPES = ("SELL", "STOP_LOSS")
_ALL_QTY_KEYS = (
    "quantity",
    "order_qty",
    "buy_qty",
    "sell_qty",
    "proposed_immediate_qty",
    "proposed_staged_qty",
)
_BUY_QTY_KEYS = ("quantity", "order_qty", "buy_qty", "proposed_staged_qty")
_SELL_QTY_KEYS = ("quantity", "order_qty", "sell_qty", "proposed_immediate_qty")


def _load_json(path: Path) -> dict[str, Any]:
    """Return the JSON object stored at *path*, or ``{}`` when unavailable.

    A missing file, an unreadable/invalid file and a non-object top-level
    value all yield an empty dict (deliberate best-effort loading).
    """
    if not path.exists():
        return {}
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        return {}
    return data if isinstance(data, dict) else {}


def _parse_rows(v: Any) -> list[dict[str, Any]]:
    """Return the dict entries of *v*, decoding JSON strings along the way.

    Lists are filtered down to their dict items; strings are JSON-decoded and
    parsed again; anything else (or undecodable text) yields ``[]``.
    """
    if isinstance(v, list):
        return [x for x in v if isinstance(x, dict)]
    if isinstance(v, str):
        try:
            return _parse_rows(json.loads(v))
        except Exception:
            return []
    return []


def _to_json_string_if_needed(original: Any, value: Any) -> Any:
    """Serialize *value* to JSON text when *original* was stored as a string."""
    if isinstance(original, str):
        return json.dumps(value, ensure_ascii=False)
    return value


def _as_obj(value: Any) -> dict[str, Any]:
    """Return *value* as a dict, decoding a JSON string if needed, else ``{}``."""
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        try:
            parsed = json.loads(value)
        except Exception:
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}


def _to_float(value: Any, default: float) -> float:
    """Coerce *value* to a finite float, returning *default* otherwise.

    Falsy values (``None``, ``0``, ``""``) map to *default*, as do values that
    cannot be converted and non-finite results. NaN matters here: ``json``
    accepts the ``NaN`` literal and every ``<`` comparison against NaN is
    False, which would otherwise let a score gate pass silently.
    """
    if not value:
        return default
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return default
    return number if math.isfinite(number) else default


def _latest_snapshot_captured_at_iso(rows: list[dict[str, Any]]) -> str | None:
    """Return the raw timestamp string of the most recent snapshot row.

    Both ``captured_at`` and ``last_updated`` are considered on every row.
    Values that do not parse as ISO-8601 are skipped; naive timestamps are
    interpreted in the local timezone so they can be compared with aware ones.
    Returns ``None`` when no row carries a parseable timestamp.
    """
    latest_dt: datetime | None = None
    latest_iso: str | None = None
    for row in rows:
        if not isinstance(row, dict):
            continue
        for key in ("captured_at", "last_updated"):
            raw = str(row.get(key) or "").strip()
            if not raw:
                continue
            try:
                # A trailing "Z" is rewritten as an explicit UTC offset.
                dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
            except Exception:
                continue
            if dt.tzinfo is None:
                dt = dt.astimezone()
            if latest_dt is None or dt > latest_dt:
                latest_dt = dt
                latest_iso = raw
    return latest_iso


def _compute_blueprint_checksum(rows: list[dict[str, Any]]) -> int:
    """Return the 32-bit character-sum checksum of the blueprint rows.

    Each row contributes ``ticker|order_type|quantity|limit_price_krw|
    validation_status;`` where a ``None`` quantity or limit price is rendered
    as an empty field.
    """

    def _cell(row: dict[str, Any], key: str) -> Any:
        value = row.get(key)
        return "" if value is None else value

    text = "".join(
        f"{row.get('ticker', '')}|"
        f"{row.get('order_type', '')}|"
        f"{_cell(row, 'quantity')}|"
        f"{_cell(row, 'limit_price_krw')}|"
        f"{row.get('validation_status', '')};"
        for row in rows
    )
    return sum(map(ord, text)) & 0xFFFFFFFF


def _load_lock_policy(path: Path) -> dict[str, Any]:
    """Return the ``strategy_execution_locks_v1`` policy mapping, or ``{}``.

    A missing file, a parse failure or an unexpected document shape all yield
    an empty mapping, which makes the callers fall back to built-in defaults.

    Raises:
        ModuleNotFoundError: if the policy file exists but PyYAML is not
            installed (an existing policy must never be ignored silently).
    """
    if not path.exists():
        return {}
    if yaml is None:
        raise ModuleNotFoundError(
            f"PyYAML is required to read the lock policy at {path}"
        )
    try:
        payload = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        return {}
    root = payload.get("strategy_execution_lock_policy") if isinstance(payload, dict) else {}
    obj = root.get("strategy_execution_locks_v1") if isinstance(root, dict) else {}
    return obj if isinstance(obj, dict) else {}


@dataclass(frozen=True)
class _LockInputs:
    """Scores, gates and thresholds that drive the execution locks."""

    late_status: str
    reb_score: float
    di_score: float
    di_gate: str
    dv_score: float
    dv_gate: str
    de_score: float
    de_gate: str
    oq_score: float
    oq_gate: str
    oq_sufficient_eval: bool
    # Thresholds default to the values used when the policy omits them.
    di_block_threshold: float = 90.0
    dv_block_threshold: float = 90.0
    de_block_threshold: float = 85.0
    oq_buy_block_threshold: float = 50.0
    oq_sell_scale_threshold: float = 60.0
    oq_sell_scale_ratio: float = 0.70

    @classmethod
    def from_reports(
        cls,
        *,
        late: dict[str, Any],
        reb: dict[str, Any],
        di: dict[str, Any],
        dv: dict[str, Any],
        de: dict[str, Any],
        oq: dict[str, Any],
        policy: dict[str, Any],
    ) -> _LockInputs:
        """Build the inputs from the loaded report dicts and the lock policy.

        Missing, malformed or non-finite scores become ``0.0``; missing,
        zero, malformed or non-finite thresholds become the built-in default.
        """
        oq_metrics = _as_obj(oq.get("metrics"))
        reb_metrics = _as_obj(reb.get("metrics"))
        return cls(
            late_status=str(late.get("status") or ""),
            reb_score=_to_float(reb_metrics.get("rebound_efficiency_score"), 0.0),
            di_score=_to_float(di.get("score"), 0.0),
            di_gate=str(di.get("gate") or ""),
            dv_score=_to_float(dv.get("score"), 0.0),
            dv_gate=str(dv.get("gate") or ""),
            de_score=_to_float(de.get("score"), 0.0),
            de_gate=str(de.get("gate") or ""),
            oq_score=_to_float(oq.get("score"), 0.0),
            oq_gate=str(oq.get("gate") or ""),
            oq_sufficient_eval=bool(oq_metrics.get("has_sufficient_eval")),
            di_block_threshold=_to_float(policy.get("data_integrity_block_threshold"), 90.0),
            dv_block_threshold=_to_float(policy.get("derivation_validity_block_threshold"), 90.0),
            de_block_threshold=_to_float(policy.get("decision_evidence_block_threshold"), 85.0),
            oq_buy_block_threshold=_to_float(policy.get("outcome_buy_block_threshold"), 50.0),
            oq_sell_scale_threshold=_to_float(policy.get("outcome_sell_scale_threshold"), 60.0),
            oq_sell_scale_ratio=_to_float(policy.get("outcome_sell_scale_ratio"), 0.70),
        )


def _append_rationale(row: dict[str, Any], tag: str) -> None:
    """Append *tag* to the row's current ``rationale_code`` (``|``-separated).

    The current value is re-read from the row on every call so that tags
    written by an earlier lock in the same pass are never overwritten.
    """
    current = str(row.get("rationale_code") or "")
    row["rationale_code"] = f"{current}|{tag}" if current else tag


def _block_row(row: dict[str, Any], gate: str, tag: str, qty_keys: tuple[str, ...]) -> None:
    """Mark *row* as BLOCKED by *gate* and zero its numeric *qty_keys* fields."""
    row["validation_status"] = "BLOCKED"
    row["blocked_by_gate"] = gate
    _append_rationale(row, tag)
    for key in qty_keys:
        if isinstance(row.get(key), (int, float)):
            row[key] = 0


def _scale_sell_row(row: dict[str, Any], ratio: float, lock: str, tag: str) -> bool:
    """Scale the positive sell quantities of *row* by *ratio* (minimum 1).

    Returns True when at least one quantity was scaled, in which case the row
    is also stamped with *lock* and *tag*.
    """
    scaled = False
    for key in _SELL_QTY_KEYS:
        qty = row.get(key)
        if isinstance(qty, (int, float)) and qty > 0:
            row[key] = int(max(1, round(qty * ratio)))
            scaled = True
    if scaled:
        row["lock_applied"] = lock
        _append_rationale(row, tag)
    return scaled


def _tripped_hard_gate(inputs: _LockInputs) -> tuple[str, str] | None:
    """Return ``(gate, tag)`` of the first tripped hard lock, or ``None``.

    Priority order: data integrity, derivation validity, decision evidence.
    """
    if inputs.di_score < inputs.di_block_threshold or inputs.di_gate == "EXPORT_BLOCKED_CRITICAL":
        return ("DATA_INTEGRITY_SCORE_V1", "DI1_EXPORT_BLOCKED_CRITICAL")
    if inputs.dv_score < inputs.dv_block_threshold or inputs.dv_gate == "NO_PRICE_QTY_EXPORT":
        return ("DERIVATION_VALIDITY_SCORE_V1", "DV1_NO_PRICE_QTY_EXPORT")
    if inputs.de_score < inputs.de_block_threshold or inputs.de_gate in ("NEEDS_MANUAL_REVIEW", "BLOCK"):
        return ("DECISION_EVIDENCE_SCORE_V1", "DE1_MANUAL_REVIEW")
    return None


def _apply_execution_locks(rows: list[dict[str, Any]], inputs: _LockInputs) -> dict[str, int]:
    """Apply all score-driven locks to *rows* in place and return the counters.

    Per row, in order: a tripped hard gate blocks the row outright; a degraded
    late-chase status blocks buy-type orders; a low rebound score scales
    passing sell-type orders; a low outcome-quality score blocks buy-type
    orders and scales passing sell-type orders. The outcome-quality locks are
    suspended while its gate reports ``INSUFFICIENT_EVAL``.

    Returns:
        Dict with ``hard_block_count``, ``buy_block_count`` and
        ``sell_scale_count`` (a row scaled by two locks counts twice).
    """
    hard_blocks = buy_blocks = sell_scales = 0
    # The hard gates depend only on the scores, so evaluate them once.
    hard_gate = _tripped_hard_gate(inputs)
    outcome_active = inputs.oq_gate != "INSUFFICIENT_EVAL"

    for row in rows:
        if hard_gate is not None:
            _block_row(row, hard_gate[0], hard_gate[1], _ALL_QTY_KEYS)
            hard_blocks += 1
            continue

        order_type = str(row.get("order_type") or "").upper()
        is_buy = order_type in _BUY_ORDER_TYPES
        is_sell = order_type in _SELL_ORDER_TYPES

        if is_buy and inputs.late_status == "DEGRADE_BUY_PERMISSION":
            _block_row(row, "LATE_CHASE_ATTRIBUTION_V1", "LCA1_BUY_BLOCK", _BUY_QTY_KEYS)
            buy_blocks += 1
            continue

        passing = str(row.get("validation_status") or "") == "PASS"

        if is_sell and passing and inputs.reb_score < REBOUND_SELL_SCALE_THRESHOLD:
            if _scale_sell_row(
                row, REBOUND_SELL_SCALE_RATIO, "REBOUND_SELL_EFFICIENCY_V1", "RSE1_SELL_SCALE_80"
            ):
                sell_scales += 1

        if not outcome_active:
            continue

        if is_buy and inputs.oq_score < inputs.oq_buy_block_threshold:
            _block_row(row, "OUTCOME_QUALITY_SCORE_V1", "OQ1_BUY_BLOCK_LOW_OUTCOME", _BUY_QTY_KEYS)
            buy_blocks += 1
            continue

        if is_sell and passing and inputs.oq_score < inputs.oq_sell_scale_threshold:
            # NOTE: the tag keeps its historical "_70" suffix even when the
            # policy configures a different ratio; consumers may match on it.
            if _scale_sell_row(
                row, inputs.oq_sell_scale_ratio, "OUTCOME_QUALITY_SCORE_V1", "OQ1_SELL_SCALE_70"
            ):
                sell_scales += 1

    return {
        "hard_block_count": hard_blocks,
        "buy_block_count": buy_blocks,
        "sell_scale_count": sell_scales,
    }


def _apply_export_gate(rows: list[dict[str, Any]], export_gate: dict[str, Any]) -> None:
    """Block every still-passing row when the export gate forbids HTS entry.

    Only an explicit ``hts_entry_allowed: false`` triggers the block;
    quantities are left untouched.
    """
    if export_gate.get("hts_entry_allowed") is not False:
        return
    for row in rows:
        if str(row.get("validation_status") or "").upper() == "PASS":
            row["validation_status"] = "BLOCKED"
            row["blocked_by_gate"] = "EXPORT_GATE_V1"
            _append_rationale(row, "EXPORT_GATE_BLOCK")


def main(argv: list[str] | None = None) -> int:
    """Run the lock pass on the JSON file named by ``--json``.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv``.

    Returns:
        ``0`` on success, ``1`` when the input JSON is missing or invalid.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    args = ap.parse_args(argv)

    json_path = Path(args.json)
    if not json_path.is_absolute():
        json_path = ROOT / json_path

    payload = _load_json(json_path)
    if not payload:
        print("STRATEGY_EXEC_LOCKS_FAIL: input json missing/invalid")
        return 1

    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}
    hctx = data.get("_harness_context")
    if not isinstance(hctx, dict):
        hctx = {}
    hapex = payload.get("hApex")
    if not isinstance(hapex, dict):
        hapex = {}

    # Merged working context; keys from hApex win over _harness_context.
    h: dict[str, Any] = {**hctx, **hapex}

    snapshot = data.get("account_snapshot")
    fresh_captured_at = _latest_snapshot_captured_at_iso(snapshot if isinstance(snapshot, list) else [])
    if fresh_captured_at:
        h["captured_at"] = fresh_captured_at

    late = _load_json(LATE_PATH)
    reb = _load_json(REB_PATH)
    di = _load_json(DI_PATH)
    dv = _load_json(DV_PATH)
    de = _load_json(DE_PATH)
    oq = _load_json(OQ_PATH)
    policy = _load_lock_policy(POLICY_PATH)

    # These two reports are always recorded, even when empty.
    h["late_chase_attribution_v1_json"] = late
    h["rebound_sell_efficiency_v1_json"] = reb
    # The remaining reports are recorded only when they were loaded.
    optional_reports = (
        ("data_integrity_score_v1_json", di),
        ("derivation_validity_score_v1_json", dv),
        ("decision_evidence_score_v1_json", de),
        ("outcome_quality_score_v1_json", oq),
        ("operational_evidence_audit_v1_json", _load_json(OEA_PATH)),
        ("short_horizon_outcome_monitor_v1_json", _load_json(SHM_PATH)),
        ("evaluation_history_coverage_v1_json", _load_json(EHC_PATH)),
    )
    for key, report in optional_reports:
        if report:
            h[key] = report

    ob_original = h.get("order_blueprint_json")
    rows = _parse_rows(ob_original)
    inputs = _LockInputs.from_reports(late=late, reb=reb, di=di, dv=dv, de=de, oq=oq, policy=policy)

    counts = _apply_execution_locks(rows, inputs)
    _apply_export_gate(rows, _as_obj(h.get("export_gate_json")))

    # Keep the blueprint in the representation (list vs JSON text) it came in.
    h["order_blueprint_json"] = _to_json_string_if_needed(ob_original, rows)
    checksum = _compute_blueprint_checksum(rows)
    h["blueprint_checksum"] = checksum
    h["rendered_output_checksum"] = checksum
    h["rendered_report_checksum"] = checksum
    h["strategy_execution_locks_v1_json"] = {
        "formula_id": "STRATEGY_EXECUTION_LOCKS_V1",
        "data_integrity_score": inputs.di_score,
        "derivation_validity_score": inputs.dv_score,
        "data_integrity_gate": inputs.di_gate,
        "derivation_validity_gate": inputs.dv_gate,
        "decision_evidence_score": inputs.de_score,
        "decision_evidence_gate": inputs.de_gate,
        "outcome_quality_score": inputs.oq_score,
        "outcome_quality_gate": inputs.oq_gate,
        "outcome_quality_has_sufficient_eval": inputs.oq_sufficient_eval,
        "outcome_lock_mode": (
            "SUSPENDED_DUE_TO_INSUFFICIENT_EVAL" if inputs.oq_gate == "INSUFFICIENT_EVAL" else "ACTIVE"
        ),
        "late_chase_status": inputs.late_status,
        "rebound_efficiency_score": inputs.reb_score,
        "hard_block_count": counts["hard_block_count"],
        "buy_block_count": counts["buy_block_count"],
        "sell_scale_count": counts["sell_scale_count"],
        "policy_path": str(POLICY_PATH),
        "policy": {
            "data_integrity_block_threshold": inputs.di_block_threshold,
            "derivation_validity_block_threshold": inputs.dv_block_threshold,
            "decision_evidence_block_threshold": inputs.de_block_threshold,
            "outcome_buy_block_threshold": inputs.oq_buy_block_threshold,
            "outcome_sell_scale_threshold": inputs.oq_sell_scale_threshold,
            "outcome_sell_scale_ratio": inputs.oq_sell_scale_ratio,
        },
    }

    # Write the merged context back to both locations it is read from.
    data["_harness_context"] = h
    payload["data"] = data
    payload["hApex"] = h
    json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")

    print("STRATEGY_EXEC_LOCKS_OK")
    print(
        json.dumps(
            {
                "late_chase_status": inputs.late_status,
                "rebound_efficiency_score": inputs.reb_score,
                "data_integrity_score": inputs.di_score,
                "derivation_validity_score": inputs.dv_score,
                "decision_evidence_score": inputs.de_score,
                "outcome_quality_score": inputs.oq_score,
                "hard_block_count": counts["hard_block_count"],
                "buy_block_count": counts["buy_block_count"],
                "sell_scale_count": counts["sell_scale_count"],
            },
            ensure_ascii=False,
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())