#!/usr/bin/env python3
"""IMMUTABLE_DECISION_LEDGER_V1 — spec/formulas/domains/governance.yaml.

Append-only decision log. Refuses to append a duplicate decision_id and never
mutates an existing record.
governance/todo/v8_9_p2_adoption_plan.yaml P2-C.
"""
from __future__ import annotations

import argparse
import copy
import json
from datetime import datetime, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_LEDGER = ROOT / "Temp" / "immutable_decision_ledger_v1.json"
DEFAULT_DECISION = ROOT / "Temp" / "portfolio_transition_optimizer_v1.json"

REQUIRED_FIELDS = [
    "decision_id",
    "engine_version",
    "input_hash_bundle",
    "execution_mode",
    "candidate_ids",
]

_FORMULA_ID = "IMMUTABLE_DECISION_LEDGER_V1"

# Failures expected while reading/parsing a JSON file: I/O errors, invalid
# JSON or invalid UTF-8 (both ValueError subclasses), and pathological nesting.
_READ_ERRORS = (OSError, ValueError, RecursionError)


def _empty_ledger() -> dict:
    """Return a fresh ledger document with no records."""
    return {"formula_id": _FORMULA_ID, "records": []}


def _read_ledger(path: Path) -> tuple[dict, bool]:
    """Read the ledger at *path* and report whether it can be trusted.

    Returns:
        ``(ledger, usable)``. ``usable`` is ``True`` when the file is absent
        (a fresh ledger is returned) or parsed into a dict whose ``"records"``
        value is a list. It is ``False`` when the file exists but could not be
        read, parsed, or has the wrong shape; an empty ledger is returned in
        that case so callers always receive a well-formed document.
    """
    if not path.exists():
        return _empty_ledger(), True
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except _READ_ERRORS:
        return _empty_ledger(), False
    if isinstance(data, dict) and isinstance(data.get("records"), list):
        return data, True
    return _empty_ledger(), False


def _load_ledger(path: Path) -> dict:
    """Load the ledger at *path*, falling back to an empty ledger.

    Best-effort: a missing, unreadable, or malformed file yields an empty
    ledger rather than an exception. Callers that intend to write the result
    back should use ``_read_ledger`` so an unreadable file is not overwritten.
    """
    ledger, _usable = _read_ledger(path)
    return ledger


def append_decision(ledger: dict, decision: dict) -> tuple[dict, str]:
    """Return a ledger with *decision* appended, plus a status string.

    Neither *ledger* nor its ``"records"`` list is modified; on success a new
    dict with a new records list is returned.

    Args:
        ledger: Ledger document; must contain a ``"records"`` list.
        decision: Decision fields. Every name in ``REQUIRED_FIELDS`` must be
            present and not ``None``.

    Returns:
        ``(ledger, "REJECTED_MISSING_FIELDS")`` when a required field is
        absent, ``(ledger, "DUPLICATE_DECISION_ID")`` when a record with the
        same ``decision_id`` already exists (the input ledger object is
        returned unchanged in both cases), otherwise
        ``(new_ledger, "APPENDED")``.

    Raises:
        KeyError: if *ledger* has no ``"records"`` key.
    """
    missing = [f for f in REQUIRED_FIELDS if decision.get(f) is None]
    if missing:
        return ledger, "REJECTED_MISSING_FIELDS"

    decision_id = decision["decision_id"]
    records = ledger["records"]
    # Equality scan instead of a set: tolerates records that are not dicts or
    # lack "decision_id", and ids that are not hashable.
    already_logged = any(
        isinstance(r, dict) and r.get("decision_id") == decision_id
        for r in records
    )
    if already_logged:
        return ledger, "DUPLICATE_DECISION_ID"

    # Deep-copy so the stored record does not share lists/dicts with the
    # caller's decision; later mutation of the input cannot alter the record.
    record = copy.deepcopy(
        {
            "decision_id": decision_id,
            "timestamp": decision.get("timestamp")
            or datetime.now(timezone.utc).isoformat(),
            "engine_version": decision["engine_version"],
            "input_hash_bundle": decision["input_hash_bundle"],
            "execution_mode": decision["execution_mode"],
            "candidate_ids": decision["candidate_ids"],
            "selected_transition_id": decision.get("selected_transition_id"),
            "hard_blocks": decision.get("hard_blocks", []),
            "transition_utility_krw": decision.get("transition_utility_krw"),
            "operator_override": decision.get("operator_override", False),
            "order_ids": decision.get("order_ids", []),
            "fill_prices": decision.get("fill_prices", []),
            "slippage": decision.get("slippage"),
            # Outcome fields are always written as None at append time.
            "T1_return": None,
            "T5_return": None,
            "T20_return": None,
            "MAE": None,
            "MFE": None,
        }
    )
    new_ledger = {**ledger, "records": records + [record]}
    return new_ledger, "APPENDED"


def _load_decision_doc(path: Path) -> dict:
    """Read the decision document at *path*; return ``{}`` if unusable.

    A missing file, a read/parse failure, or a top-level JSON value that is
    not an object all yield an empty dict.
    """
    if not path.exists():
        return {}
    try:
        doc = json.loads(path.read_text(encoding="utf-8"))
    except _READ_ERRORS:
        return {}
    return doc if isinstance(doc, dict) else {}


def _decision_from_doc(doc: dict) -> dict:
    """Map a decision document onto the fields ``append_decision`` expects."""
    selected = doc.get("selected_transition")
    if not isinstance(selected, dict):
        selected = {}
    candidates = doc.get("candidate_actions")
    if not isinstance(candidates, list):
        candidates = []
    return {
        "decision_id": doc.get("packet_id") or doc.get("formula_id"),
        "engine_version": "PORTFOLIO_TRANSITION_UTILITY_V1",
        "input_hash_bundle": doc.get("input_hash") or "unknown",
        "execution_mode": doc.get("execution_mode") or doc.get("final_action"),
        # Entries that are not objects carry no candidate_id and are skipped.
        "candidate_ids": [
            c.get("candidate_id") for c in candidates if isinstance(c, dict)
        ],
        "selected_transition_id": selected.get("candidate_id"),
        "hard_blocks": doc.get("reason_codes", []),
        "transition_utility_krw": selected.get("transition_utility_krw"),
    }


def _write_ledger(path: Path, ledger: dict) -> None:
    """Write *ledger* to *path* via a sibling temp file and a rename.

    The document is serialised before anything touches the filesystem, then
    written to ``<name>.tmp`` and renamed over the target, so a failure while
    serialising or writing leaves the previous ledger file intact.
    """
    payload = json.dumps(ledger, ensure_ascii=False, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(path)


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: append the current decision document to the ledger.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Always ``0``. The outcome is reported on stdout as JSON under
        ``ledger_append_status``: ``APPENDED``, ``DUPLICATE_DECISION_ID``,
        ``REJECTED_MISSING_FIELDS``, or ``LEDGER_UNREADABLE``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--ledger", default=str(DEFAULT_LEDGER))
    ap.add_argument("--decision", default=str(DEFAULT_DECISION))
    args = ap.parse_args(argv)

    ledger_path = Path(args.ledger)
    ledger, usable = _read_ledger(ledger_path)
    if not usable:
        # The file exists but could not be parsed as a ledger. Writing now
        # would replace whatever history it holds, so refuse and report.
        status = "LEDGER_UNREADABLE"
    else:
        decision = _decision_from_doc(_load_decision_doc(Path(args.decision)))
        new_ledger, status = append_decision(ledger, decision)
        if status == "APPENDED":
            new_ledger["status"] = status
            _write_ledger(ledger_path, new_ledger)

    print(
        json.dumps(
            {"formula_id": _FORMULA_ID, "ledger_append_status": status},
            ensure_ascii=False,
            indent=2,
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())