#!/usr/bin/env python3
"""TRANSITION_SET_ENUMERATOR_V1 — spec/formulas/domains/portfolio.yaml.

Evaluates combinations of already-vetoed candidates as a portfolio-level set, so
individually-passing candidates that jointly breach cash floor or concentration
caps are rejected. governance/todo/v8_9_p2_adoption_plan.yaml P2-B.
"""
from __future__ import annotations

import argparse
import itertools
import json
from pathlib import Path

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_TRANSITION_OPTIMIZER = ROOT / "Temp" / "portfolio_transition_optimizer_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "transition_set_enumerator_v1.json"

# Utility deducted for every candidate in a set beyond the first one.
COMPLEXITY_PENALTY_RATE_KRW = 5000.0


def _load(path: Path) -> dict:
    """Read *path* as a JSON object; return ``{}`` on any failure.

    Best-effort by design: a missing or unreadable file, undecodable bytes,
    malformed JSON, or a top-level value that is not an object all yield ``{}``.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable path. ValueError: JSON or UTF-8 decode error.
        return {}
    return data if isinstance(data, dict) else {}


def _field_total(transition_set: tuple[dict, ...], key: str) -> float:
    """Sum ``key`` over the set, counting a missing or null value as 0.0."""
    return sum(c.get(key) or 0.0 for c in transition_set)


def post_trade_mrc(
    transition_set: tuple[dict, ...], portfolio_total_risk_budget: float
) -> float | None:
    """Return summed ``marginal_risk_contribution`` divided by the risk budget.

    Returns ``None`` when the budget is zero/falsy, i.e. the ratio is undefined.
    Missing or null contributions count as 0.0.
    """
    if not portfolio_total_risk_budget:
        return None
    total = _field_total(transition_set, "marginal_risk_contribution")
    return total / portfolio_total_risk_budget


def post_trade_cvar95_krw(transition_set: tuple[dict, ...]) -> float | None:
    """Return the sum of ``cvar95_loss_krw`` over the set.

    Returns ``None`` if any candidate lacks the value (missing or null), so a
    partially known set never reports a partial total.
    """
    values = [c.get("cvar95_loss_krw") for c in transition_set]
    if any(v is None for v in values):
        return None
    return sum(values)


def set_hard_constraint_pass(
    transition_set: tuple[dict, ...],
    portfolio_total_risk_budget: float = 0.0,
    mrc_cap: float = 1.0,
    cvar95_budget_krw: float | None = None,
) -> bool:
    """Return True when the set as a whole satisfies every hard constraint.

    The set fails when:
      * the summed ``post_trade_cash_floor_delta_pct`` is negative, or
      * the summed ``post_trade_concentration_delta_pct`` is positive, or
      * the MRC ratio is defined and exceeds ``mrc_cap``, or
      * both the CVaR total and ``cvar95_budget_krw`` are known and the total
        is below ``-abs(cvar95_budget_krw)``.

    Missing or null delta fields count as 0.0.
    """
    cash_floor_delta = _field_total(transition_set, "post_trade_cash_floor_delta_pct")
    concentration_delta = _field_total(
        transition_set, "post_trade_concentration_delta_pct"
    )
    if cash_floor_delta < 0 or concentration_delta > 0:
        return False

    mrc = post_trade_mrc(transition_set, portfolio_total_risk_budget)
    if mrc is not None and mrc > mrc_cap:
        return False

    cvar95 = post_trade_cvar95_krw(transition_set)
    # NOTE(review): the comparison treats losses as negative numbers — confirm
    # the sign convention of cvar95_loss_krw. An unknown CVaR (None) skips the
    # check rather than failing the set.
    if (
        cvar95 is not None
        and cvar95_budget_krw is not None
        and cvar95 < -abs(cvar95_budget_krw)
    ):
        return False
    return True


def set_transition_utility_krw(transition_set: tuple[dict, ...]) -> float:
    """Return summed ``transition_utility_krw`` minus the complexity penalty.

    The penalty is ``COMPLEXITY_PENALTY_RATE_KRW`` per candidate beyond the
    first. It is clamped at zero so an empty set is not awarded a bonus.
    """
    total = _field_total(transition_set, "transition_utility_krw")
    extra_candidates = max(len(transition_set) - 1, 0)
    return total - COMPLEXITY_PENALTY_RATE_KRW * extra_candidates


def enumerate_transition_sets(
    candidates: list[dict],
    max_set_size: int = 3,
    portfolio_total_risk_budget: float = 0.0,
    mrc_cap: float = 1.0,
    cvar95_budget_krw: float | None = None,
) -> list[dict]:
    """Evaluate every combination of individually-passing candidates.

    Only dict entries whose ``hard_constraint_pass`` is exactly ``True`` take
    part; anything else (including non-dict entries) is ignored. Combinations
    of size 1 up to ``max_set_size`` are produced in ``itertools.combinations``
    order, smallest sizes first.

    Returns one record per combination with the candidate ids, the set-level
    constraint verdict, the set utility, and the MRC / CVaR95 figures.
    """
    passing = [
        c
        for c in candidates
        if isinstance(c, dict) and c.get("hard_constraint_pass") is True
    ]
    largest = min(max_set_size, len(passing))
    evaluated_sets = []
    for size in range(1, largest + 1):
        for combo in itertools.combinations(passing, size):
            evaluated_sets.append(
                {
                    "candidate_ids": [c.get("candidate_id") for c in combo],
                    "set_hard_constraint_pass": set_hard_constraint_pass(
                        combo, portfolio_total_risk_budget, mrc_cap, cvar95_budget_krw
                    ),
                    "set_transition_utility_krw": set_transition_utility_krw(combo),
                    "post_trade_mrc": post_trade_mrc(combo, portfolio_total_risk_budget),
                    "post_trade_cvar95_krw": post_trade_cvar95_krw(combo),
                }
            )
    return evaluated_sets


def select_best_set(evaluated_sets: list[dict]) -> dict | None:
    """Return the constraint-passing set with the highest utility, or None.

    Ties on utility go to the set with fewer candidates.
    """
    passing_sets = [s for s in evaluated_sets if s["set_hard_constraint_pass"]]
    if not passing_sets:
        return None
    return max(
        passing_sets,
        key=lambda s: (s["set_transition_utility_krw"], -len(s["candidate_ids"])),
    )


def _write_result(result: dict, out: Path) -> None:
    """Write *result* as JSON to *out* (creating its directory) and echo it."""
    payload = json.dumps(result, ensure_ascii=False, indent=2)
    # The default output lives under Temp/, which may not exist yet.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(payload, encoding="utf-8")
    print(payload)


def main(argv: list[str] | None = None) -> int:
    """CLI entry point: load candidates, pick the best set, write the result.

    ``argv`` defaults to ``sys.argv[1:]``. The risk flags default to values
    that leave the MRC and CVaR checks inactive, matching a call without them.
    Always returns 0; an absent or empty candidate list yields a ``NO_TRADE``
    result.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--transition-optimizer", default=str(DEFAULT_TRANSITION_OPTIMIZER))
    ap.add_argument("--max-set-size", type=int, default=3)
    ap.add_argument("--portfolio-total-risk-budget", type=float, default=0.0)
    ap.add_argument("--mrc-cap", type=float, default=1.0)
    ap.add_argument("--cvar95-budget-krw", type=float, default=None)
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    source_path = Path(args.transition_optimizer)
    out = Path(args.out)
    doc = _load(source_path)
    raw_candidates = doc.get("candidate_actions")
    candidates = raw_candidates if isinstance(raw_candidates, list) else []

    if not candidates:
        result = {
            "formula_id": "TRANSITION_SET_ENUMERATOR_V1",
            "gate": "NO_TRADE",
            "selected_transition_set": [],
            "rejected_sets_count": 0,
            "source_paths": [str(source_path)],
        }
        _write_result(result, out)
        return 0

    evaluated_sets = enumerate_transition_sets(
        candidates,
        args.max_set_size,
        args.portfolio_total_risk_budget,
        args.mrc_cap,
        args.cvar95_budget_krw,
    )
    best = select_best_set(evaluated_sets)
    # Counts every evaluated set other than the selected one, including sets
    # that passed the constraints but were not chosen.
    rejected_count = len(evaluated_sets) - (1 if best else 0)
    result = {
        "formula_id": "TRANSITION_SET_ENUMERATOR_V1",
        "gate": "PASS" if best else "NO_TRADE",
        "selected_transition_set": best["candidate_ids"] if best else [],
        "set_transition_utility_krw": best["set_transition_utility_krw"] if best else None,
        "post_trade_mrc": best["post_trade_mrc"] if best else None,
        "post_trade_cvar95_krw": best["post_trade_cvar95_krw"] if best else None,
        "rejected_sets_count": rejected_count,
        "source_paths": [str(source_path)],
    }
    _write_result(result, out)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())