#!/usr/bin/env python3
"""EXECUTION_PLAN_COMPILER_V1 — spec/formulas/domains/execution.yaml.

Compiles order_capacity_krw into 30/30/40 LIMIT_SPLIT slices and revalidates
cash_floor/capacity/spread before each slice, cancelling the remainder when any
cancel_remaining_if condition fires.

governance/todo/v8_9_p2_adoption_plan.yaml P2-D.
"""
from __future__ import annotations

import argparse
import json
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CAPACITY = ROOT / "Temp" / "execution_capacity_ladder_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "execution_plan_compiler_v1.json"

# Fraction of the order capacity assigned to each successive slice.
SLICE_PCTS = [0.30, 0.30, 0.40]

# A revalidated spread above baseline * this factor cancels the remainder.
SPREAD_WIDEN_FACTOR = 1.5
# A revalidated capacity below baseline * this factor cancels the remainder.
CAPACITY_COLLAPSE_FACTOR = 0.5


def _load(path: Path) -> dict:
    """Read *path* as UTF-8 JSON and return it if it is a JSON object.

    Best-effort loader: a missing or unreadable file, undecodable bytes,
    malformed JSON, or a top-level value that is not an object all yield
    an empty dict instead of raising.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file. ValueError: bad encoding or
        # malformed JSON (JSONDecodeError and UnicodeDecodeError are both
        # subclasses). RecursionError: pathologically nested input.
        return {}
    return data if isinstance(data, dict) else {}


def should_cancel_remaining(
    revalidation_snapshot: dict,
    baseline_snapshot: dict,
    required_cash_pct: float,
) -> str | None:
    """Return the reason code that cancels the remaining slices, or None.

    The checks run in a fixed order and the first one that fires wins:

    1. ``spread_widens_beyond_limit`` - revalidated ``spread_bps`` exceeds
       the baseline ``spread_bps`` times ``SPREAD_WIDEN_FACTOR``.
    2. ``cash_floor_after_fill_breached`` - revalidated ``cash_floor_pct``
       is below ``required_cash_pct``.
    3. ``orderbook_capacity_collapses`` - revalidated ``order_capacity_krw``
       is below the baseline value times ``CAPACITY_COLLAPSE_FACTOR``.

    A check is skipped when any value it needs is missing or None.
    """
    current_spread = revalidation_snapshot.get("spread_bps")
    baseline_spread = baseline_snapshot.get("spread_bps")
    if current_spread is not None and baseline_spread is not None:
        if current_spread > baseline_spread * SPREAD_WIDEN_FACTOR:
            return "spread_widens_beyond_limit"

    current_cash = revalidation_snapshot.get("cash_floor_pct")
    if current_cash is not None and current_cash < required_cash_pct:
        return "cash_floor_after_fill_breached"

    current_capacity = revalidation_snapshot.get("order_capacity_krw")
    baseline_capacity = baseline_snapshot.get("order_capacity_krw")
    if current_capacity is not None and baseline_capacity is not None:
        if current_capacity < baseline_capacity * CAPACITY_COLLAPSE_FACTOR:
            return "orderbook_capacity_collapses"

    return None


def compile_slices(
    order_capacity_krw: float,
    baseline_snapshot: dict,
    revalidation_snapshots: list[dict],
    required_cash_pct: float = 0.0,
) -> list[dict]:
    """Split *order_capacity_krw* into one slice per entry of SLICE_PCTS.

    Before each slice the matching entry of *revalidation_snapshots* is
    checked with ``should_cancel_remaining``; when fewer snapshots than
    slices are supplied, the baseline snapshot stands in for the missing
    ones.

    Returns one dict per slice, in order:

    * ``COMPILED`` slices carry ``slice_amount_krw = order_capacity_krw * pct``.
    * The slice whose revalidation fails is ``CANCELLED`` with a
      ``reason_code``.
    * Every later slice is ``CANCELLED`` without a ``reason_code`` and is
      not revalidated.
    """
    slices: list[dict] = []
    cancelled = False
    for idx, pct in enumerate(SLICE_PCTS, start=1):
        if cancelled:
            slices.append(
                {"slice_index": idx, "slice_amount_krw": None, "status": "CANCELLED"}
            )
            continue

        position = idx - 1
        if position < len(revalidation_snapshots):
            snapshot = revalidation_snapshots[position]
        else:
            snapshot = baseline_snapshot

        cancel_reason = should_cancel_remaining(
            snapshot, baseline_snapshot, required_cash_pct
        )
        if cancel_reason:
            slices.append(
                {
                    "slice_index": idx,
                    "slice_amount_krw": None,
                    "status": "CANCELLED",
                    "reason_code": cancel_reason,
                }
            )
            cancelled = True
            continue

        slices.append(
            {
                "slice_index": idx,
                "slice_amount_krw": order_capacity_krw * pct,
                "status": "COMPILED",
            }
        )
    return slices


def _row_capacity(row: object) -> int | float | None:
    """Return the row's numeric order_capacity_krw, or None if unusable.

    A row is unusable when it is not a dict, or when its capacity is
    missing, None, a bool, or any other non-numeric value.
    """
    if not isinstance(row, dict):
        return None
    capacity = row.get("order_capacity_krw")
    # bool is a subclass of int; True/False is never a meaningful capacity.
    if isinstance(capacity, bool) or not isinstance(capacity, (int, float)):
        return None
    return capacity


def main(argv: list[str] | None = None) -> int:
    """Compile an execution plan for every row of the capacity ladder file.

    Args:
        argv: Command-line arguments without the program name. None (the
            default) makes argparse read ``sys.argv[1:]``.

    Returns:
        Always 0. The outcome is reported through the ``gate`` fields of
        the JSON document written to ``--out`` and echoed to stdout.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--capacity", default=str(DEFAULT_CAPACITY))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    doc = _load(Path(args.capacity))
    rows = doc.get("rows")
    if not isinstance(rows, list):
        rows = []

    results = []
    for row in rows:
        order_capacity_krw = _row_capacity(row)
        if order_capacity_krw is None:
            # Malformed rows are blocked instead of crashing the whole run.
            results.append({"gate": "EXECUTION_PLAN_BLOCKED", "compiled_slices": []})
            continue
        baseline_snapshot = {
            "spread_bps": row.get("spread_bps"),
            "order_capacity_krw": order_capacity_krw,
            "cash_floor_pct": row.get("cash_floor_pct"),
        }
        # The baseline is reused as the revalidation snapshot for every slice.
        compiled = compile_slices(
            order_capacity_krw,
            baseline_snapshot,
            revalidation_snapshots=[baseline_snapshot] * len(SLICE_PCTS),
        )
        results.append({"gate": "PASS", "compiled_slices": compiled})

    result = {
        "formula_id": "EXECUTION_PLAN_COMPILER_V1",
        # The top-level gate only reflects whether any row was present;
        # per-row outcomes live in results[*]["gate"].
        "gate": "PASS" if results else "DATA_MISSING",
        "results": results,
        "source_paths": [str(Path(args.capacity))],
    }
    rendered = json.dumps(result, ensure_ascii=False, indent=2)

    out = Path(args.out)
    # The output directory may not exist yet on a fresh checkout.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())