feat(quant-engine): v8.9 제안서 P0-P3 로드맵 채택 — 15개 의사결정 엔진 신규 구현

suggest/quant_investment_engine_v8_9_portfolio_optimizer_canonical_refactored.yaml의
implementation_todo_v8_9(P0~P4) 전체를 spec/tool/golden case 레벨로 구현.

- P0: PORTFOLIO_TRANSITION_UTILITY_V1, SELL_LOT_PARETO_SELECTOR_V1, FORECAST_SIMULATION_ENGINE_V1
- P1: SECTOR_EXPOSURE_GRAPH_V1/LEADER_LIFECYCLE_GATE_V1, EXECUTION_CAPACITY_LADDER_V1, MODEL_GOVERNANCE_KILL_SWITCH_V1
- P2: SCENARIO_SHOCK_MATRIX_V1, TRANSITION_SET_ENUMERATOR_V1, IMMUTABLE_DECISION_LEDGER_V1, EXECUTION_PLAN_COMPILER_V1
- P3: STATE_VECTOR_CONSTRUCTOR_V1, WALK_FORWARD_BOOTSTRAP_V1, TRANSITION_SET_ENUMERATOR_V1(MRC/CVaR 확장),
      REBALANCE_CADENCE_GATE_V1, WEEKLY_LEGACY_TRANSFER_PLAN_V1

기존 regime/cluster 연동 정책 수치(현금방어선, 반도체 cap)는 그대로 유지하고 신규 cap 필드만 추가.
spec/09_decision_flow.yaml과 runtime/active_artifact_manifest.yaml에 전 엔진 배선 완료.
governance/todo/v8_9_p{0,1,2,3}_adoption_plan.yaml에 각 단계 작업 추적 기록.

검증: validate_specs/validate_golden_coverage_100(100%)/validate_calibration_registry_v1/
validate_schema_model_generation_v1/validate_agents_shrink_v1 전부 PASS. golden test 53/53 PASS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-18 00:06:52 +09:00
parent aed1eae421
commit aedabdd37b
82 changed files with 7515 additions and 5 deletions
@@ -0,0 +1,84 @@
#!/usr/bin/env python3
"""EXECUTION_CAPACITY_LADDER_V1 — spec/formulas/domains/execution.yaml.
Caps a planned order amount to the asset's actual fillable capacity and blocks
the execution plan outright when the broker_microstructure_packet is incomplete.
governance/todo/v8_9_p1_adoption_plan.yaml P1-B.2.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_ORDERS = ROOT / "Temp" / "execution_capacity_orders_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "execution_capacity_ladder_v1.json"
SPREAD_WIDEN_MULTIPLIER = 1.5
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def evaluate_order_capacity(order: dict) -> dict:
if order.get("halt_status") is True:
return {**order, "gate": "EXECUTION_PLAN_BLOCKED", "reason_code": "trading_halt", "order_capacity_krw": None}
required = ["avg_trade_value_20d_krw", "intraday_trade_value_krw", "orderbook_top3_depth_krw", "spread_bps"]
if any(order.get(f) is None for f in required):
return {
**order,
"gate": "EXECUTION_PLAN_BLOCKED",
"reason_code": "broker_packet_missing",
"order_capacity_krw": None,
}
planned = float(order.get("planned_order_amount_krw") or 0.0)
capacity = min(
planned,
float(order["avg_trade_value_20d_krw"]) * 0.003,
float(order["intraday_trade_value_krw"]) * 0.01,
float(order["orderbook_top3_depth_krw"]) * 0.30,
)
gate = "ORDER_SIZE_CAPPED" if capacity < planned else "PASS"
return {**order, "gate": gate, "order_capacity_krw": capacity}
def should_cancel_remaining_slices(current_spread_bps: float, baseline_spread_bps: float) -> bool:
return current_spread_bps > baseline_spread_bps * SPREAD_WIDEN_MULTIPLIER
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--orders", default=str(DEFAULT_ORDERS))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
doc = _load(Path(args.orders))
orders = doc.get("orders") if isinstance(doc.get("orders"), list) else []
rows = [evaluate_order_capacity(order) for order in orders if isinstance(order, dict)]
result = {
"formula_id": "EXECUTION_CAPACITY_LADDER_V1",
"gate": "PASS" if rows else "DATA_MISSING",
"rows": rows,
"split_order_template": {"slice_1_pct": 30, "slice_2_pct": 30, "slice_3_pct": 40},
"source_paths": [str(Path(args.orders))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+108
View File
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
"""EXECUTION_PLAN_COMPILER_V1 — spec/formulas/domains/execution.yaml.
Compiles order_capacity_krw into 30/30/40 LIMIT_SPLIT slices and revalidates
cash_floor/capacity/spread before each slice, cancelling the remainder when any
cancel_remaining_if condition fires. governance/todo/v8_9_p2_adoption_plan.yaml P2-D.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CAPACITY = ROOT / "Temp" / "execution_capacity_ladder_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "execution_plan_compiler_v1.json"
SLICE_PCTS = [0.30, 0.30, 0.40]
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def should_cancel_remaining(revalidation_snapshot: dict, baseline_snapshot: dict, required_cash_pct: float) -> str | None:
if revalidation_snapshot.get("spread_bps") is not None and baseline_snapshot.get("spread_bps") is not None:
if revalidation_snapshot["spread_bps"] > baseline_snapshot["spread_bps"] * 1.5:
return "spread_widens_beyond_limit"
if revalidation_snapshot.get("cash_floor_pct") is not None:
if revalidation_snapshot["cash_floor_pct"] < required_cash_pct:
return "cash_floor_after_fill_breached"
if revalidation_snapshot.get("order_capacity_krw") is not None and baseline_snapshot.get("order_capacity_krw") is not None:
if revalidation_snapshot["order_capacity_krw"] < baseline_snapshot["order_capacity_krw"] * 0.5:
return "orderbook_capacity_collapses"
return None
def compile_slices(
order_capacity_krw: float,
baseline_snapshot: dict,
revalidation_snapshots: list[dict],
required_cash_pct: float = 0.0,
) -> list[dict]:
slices = []
cancelled = False
for idx, pct in enumerate(SLICE_PCTS, start=1):
if cancelled:
slices.append({"slice_index": idx, "slice_amount_krw": None, "status": "CANCELLED"})
continue
snapshot = revalidation_snapshots[idx - 1] if idx - 1 < len(revalidation_snapshots) else baseline_snapshot
cancel_reason = should_cancel_remaining(snapshot, baseline_snapshot, required_cash_pct)
if cancel_reason:
slices.append({"slice_index": idx, "slice_amount_krw": None, "status": "CANCELLED", "reason_code": cancel_reason})
cancelled = True
continue
slices.append({"slice_index": idx, "slice_amount_krw": order_capacity_krw * pct, "status": "COMPILED"})
return slices
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--capacity", default=str(DEFAULT_CAPACITY))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
doc = _load(Path(args.capacity))
rows = doc.get("rows") if isinstance(doc.get("rows"), list) else []
results = []
for row in rows:
order_capacity_krw = row.get("order_capacity_krw")
if order_capacity_krw is None:
results.append({"gate": "EXECUTION_PLAN_BLOCKED", "compiled_slices": []})
continue
baseline_snapshot = {
"spread_bps": row.get("spread_bps"),
"order_capacity_krw": order_capacity_krw,
"cash_floor_pct": row.get("cash_floor_pct"),
}
compiled = compile_slices(order_capacity_krw, baseline_snapshot, revalidation_snapshots=[baseline_snapshot] * 3)
results.append({"gate": "PASS", "compiled_slices": compiled})
result = {
"formula_id": "EXECUTION_PLAN_COMPILER_V1",
"gate": "PASS" if results else "DATA_MISSING",
"results": results,
"source_paths": [str(Path(args.capacity))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""FORECAST_SIMULATION_ENGINE_V1 — spec/formulas/domains/simulation.yaml.
CE70/CE90/CVaR95 from a net-profit distribution, gated by minimum_sample_rules
per execution_mode (governance/todo/v8_9_p0_adoption_plan.yaml P0-3.2).
Hard rule (AGENTS.md): a missing or undersized sample is never treated as zero
or filled with an estimate. spec/29_backtest_harness_contract.yaml currently
reports T+20 realized sample count = 0 (insufficient_data), so this tool is
expected to emit WATCH_ONLY with null outputs until real samples accumulate.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_BACKTEST_CONTRACT = ROOT / "spec" / "29_backtest_harness_contract.yaml"
DEFAULT_DISTRIBUTION = ROOT / "Temp" / "net_profit_distribution_v1.json"
DEFAULT_DECISION_PACKET = ROOT / "Temp" / "final_decision_packet_active.json"
DEFAULT_OUT = ROOT / "Temp" / "forecast_simulation_engine_v1.json"
MINIMUM_SAMPLE_RULES = {
"AUDIT_ONLY": {"sample_count_total_min": 0, "sample_count_same_regime_min": 0},
"SHADOW": {"sample_count_total_min": 30, "sample_count_same_regime_min": 10},
"PILOT": {"sample_count_total_min": 80, "sample_count_same_regime_min": 20},
"LIVE_LIMITED": {"sample_count_total_min": 150, "sample_count_same_regime_min": 30},
"LIVE_FULL": {"sample_count_total_min": 300, "sample_count_same_regime_min": 50},
}
def _load_json(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _load_yaml(path: Path) -> dict:
if not path.exists():
return {}
try:
import yaml # type: ignore
data = yaml.safe_load(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _sample_counts_from_backtest_contract(contract: dict) -> tuple[int, int]:
metrics = contract.get("current_metrics") or {}
direction_accuracy = metrics.get("direction_accuracy") or {}
t20 = direction_accuracy.get("t20_op_rate") or {}
n_sample = t20.get("n_sample")
sample_count_total = n_sample if isinstance(n_sample, int) else 0
return sample_count_total, sample_count_total
def _quantile(sorted_values: list[float], q: float) -> float:
if not sorted_values:
raise ValueError("empty distribution")
if len(sorted_values) == 1:
return sorted_values[0]
pos = q * (len(sorted_values) - 1)
lower = int(pos)
upper = min(lower + 1, len(sorted_values) - 1)
frac = pos - lower
return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * frac
def _cvar95(sorted_values: list[float]) -> float:
threshold_idx = max(1, int(len(sorted_values) * 0.05))
tail = sorted_values[:threshold_idx]
return sum(tail) / len(tail)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--backtest-contract", default=str(DEFAULT_BACKTEST_CONTRACT))
ap.add_argument("--distribution", default=str(DEFAULT_DISTRIBUTION))
ap.add_argument("--decision-packet", default=str(DEFAULT_DECISION_PACKET))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
backtest_contract = _load_yaml(Path(args.backtest_contract))
distribution_doc = _load_json(Path(args.distribution))
decision_packet = _load_json(Path(args.decision_packet))
execution_mode = (
decision_packet.get("execution_mode")
or decision_packet.get("global_execution_gate")
or "AUDIT_ONLY"
)
rule = MINIMUM_SAMPLE_RULES.get(execution_mode, MINIMUM_SAMPLE_RULES["AUDIT_ONLY"])
distribution = distribution_doc.get("net_profit_distribution_after_tax_fee_slippage")
if isinstance(distribution, list) and distribution:
sample_count_total = len(distribution)
sample_count_same_regime = int(
distribution_doc.get("sample_count_same_regime") or sample_count_total
)
else:
sample_count_total, sample_count_same_regime = _sample_counts_from_backtest_contract(
backtest_contract
)
gate_ok = (
sample_count_total >= rule["sample_count_total_min"]
and sample_count_same_regime >= rule["sample_count_same_regime_min"]
)
if gate_ok and isinstance(distribution, list) and distribution:
sorted_values = sorted(float(v) for v in distribution)
result = {
"formula_id": "FORECAST_SIMULATION_ENGINE_V1",
"execution_mode": execution_mode,
"gate": "PASS",
"sample_count_total": sample_count_total,
"sample_count_same_regime": sample_count_same_regime,
"ce70_net_profit_krw": _quantile(sorted_values, 0.30),
"ce90_net_profit_krw": _quantile(sorted_values, 0.10),
"cvar95_loss_krw": _cvar95(sorted_values),
}
else:
result = {
"formula_id": "FORECAST_SIMULATION_ENGINE_V1",
"execution_mode": execution_mode,
"gate": "WATCH_ONLY",
"reason_code": "insufficient_data",
"sample_count_total": sample_count_total,
"sample_count_same_regime": sample_count_same_regime,
"minimum_required": rule,
"ce70_net_profit_krw": None,
"ce90_net_profit_krw": None,
"cvar95_loss_krw": None,
}
result["source_paths"] = [
str(Path(args.backtest_contract)),
str(Path(args.distribution)),
str(Path(args.decision_packet)),
]
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+109
View File
@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""IMMUTABLE_DECISION_LEDGER_V1 — spec/formulas/domains/governance.yaml.
Append-only decision log. Refuses to append a duplicate decision_id and never
mutates an existing record. governance/todo/v8_9_p2_adoption_plan.yaml P2-C.
"""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_LEDGER = ROOT / "Temp" / "immutable_decision_ledger_v1.json"
DEFAULT_DECISION = ROOT / "Temp" / "portfolio_transition_optimizer_v1.json"
REQUIRED_FIELDS = [
"decision_id",
"engine_version",
"input_hash_bundle",
"execution_mode",
"candidate_ids",
]
def _load_ledger(path: Path) -> dict:
if not path.exists():
return {"formula_id": "IMMUTABLE_DECISION_LEDGER_V1", "records": []}
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, dict) and isinstance(data.get("records"), list):
return data
return {"formula_id": "IMMUTABLE_DECISION_LEDGER_V1", "records": []}
except Exception:
return {"formula_id": "IMMUTABLE_DECISION_LEDGER_V1", "records": []}
def append_decision(ledger: dict, decision: dict) -> tuple[dict, str]:
missing = [f for f in REQUIRED_FIELDS if decision.get(f) is None]
if missing:
return ledger, "REJECTED_MISSING_FIELDS"
decision_id = decision["decision_id"]
existing_ids = {r["decision_id"] for r in ledger["records"]}
if decision_id in existing_ids:
return ledger, "DUPLICATE_DECISION_ID"
record = {
"decision_id": decision_id,
"timestamp": decision.get("timestamp") or datetime.now(timezone.utc).isoformat(),
"engine_version": decision["engine_version"],
"input_hash_bundle": decision["input_hash_bundle"],
"execution_mode": decision["execution_mode"],
"candidate_ids": decision["candidate_ids"],
"selected_transition_id": decision.get("selected_transition_id"),
"hard_blocks": decision.get("hard_blocks", []),
"transition_utility_krw": decision.get("transition_utility_krw"),
"operator_override": decision.get("operator_override", False),
"order_ids": decision.get("order_ids", []),
"fill_prices": decision.get("fill_prices", []),
"slippage": decision.get("slippage"),
"T1_return": None,
"T5_return": None,
"T20_return": None,
"MAE": None,
"MFE": None,
}
new_ledger = {**ledger, "records": ledger["records"] + [record]}
return new_ledger, "APPENDED"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--ledger", default=str(DEFAULT_LEDGER))
ap.add_argument("--decision", default=str(DEFAULT_DECISION))
args = ap.parse_args()
ledger = _load_ledger(Path(args.ledger))
decision_doc = {}
decision_path = Path(args.decision)
if decision_path.exists():
try:
decision_doc = json.loads(decision_path.read_text(encoding="utf-8"))
except Exception:
decision_doc = {}
decision = {
"decision_id": decision_doc.get("packet_id") or decision_doc.get("formula_id"),
"engine_version": "PORTFOLIO_TRANSITION_UTILITY_V1",
"input_hash_bundle": decision_doc.get("input_hash") or "unknown",
"execution_mode": decision_doc.get("execution_mode") or decision_doc.get("final_action"),
"candidate_ids": [c.get("candidate_id") for c in decision_doc.get("candidate_actions", [])],
"selected_transition_id": (decision_doc.get("selected_transition") or {}).get("candidate_id"),
"hard_blocks": decision_doc.get("reason_codes", []),
"transition_utility_krw": (decision_doc.get("selected_transition") or {}).get("transition_utility_krw"),
}
new_ledger, status = append_decision(ledger, decision)
new_ledger["status"] = status
out = Path(args.ledger)
if status == "APPENDED":
out.write_text(json.dumps(new_ledger, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps({"formula_id": "IMMUTABLE_DECISION_LEDGER_V1", "ledger_append_status": status}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""MODEL_GOVERNANCE_KILL_SWITCH_V1 — spec/formulas/domains/governance.yaml.
Evaluates the 5 v8.9 kill-switch conditions and demotes execution_mode by exactly
one rung on the promotion ladder when any condition fires. No automatic promotion —
promotion requires an operator_override record (v8.9 V89_039).
governance/todo/v8_9_p1_adoption_plan.yaml P1-C.2.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_METRICS = ROOT / "Temp" / "model_governance_metrics_v1.json"
DEFAULT_DECISION_PACKET = ROOT / "Temp" / "final_decision_packet_active.json"
DEFAULT_OUT = ROOT / "Temp" / "model_governance_kill_switch_v1.json"
PROMOTION_LADDER = ["AUDIT_ONLY", "SHADOW", "PILOT", "LIVE_LIMITED", "LIVE_FULL"]
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def evaluate_kill_switches(metrics: dict) -> list[str]:
triggered = []
quarantine = metrics.get("data_quarantine_rate_pct")
if quarantine is not None and quarantine > 5.0:
triggered.append("data_quarantine_rate_above_5pct")
shortfall = metrics.get("implementation_shortfall_ratio")
if shortfall is not None and shortfall > 2.0:
triggered.append("implementation_shortfall_above_2x_expected")
t5_hit_rate = metrics.get("t5_hit_rate_pct")
t5_sample_count = metrics.get("t5_sample_count") or 0
if t5_hit_rate is not None and t5_sample_count >= 30 and t5_hit_rate < 50.0:
triggered.append("t5_hit_rate_below_50pct_for_30_trades")
calibration_error = metrics.get("calibration_error")
calibration_limit = metrics.get("calibration_error_limit")
if calibration_error is not None and calibration_limit is not None and calibration_error > calibration_limit:
triggered.append("calibration_error_above_limit")
mdd = metrics.get("account_mdd_pct")
mdd_budget = metrics.get("account_mdd_budget_pct")
if mdd is not None and mdd_budget is not None and mdd > mdd_budget:
triggered.append("unexpected_drawdown_breach")
return triggered
def demote_one_rung(current_mode: str) -> str:
if current_mode not in PROMOTION_LADDER:
return "AUDIT_ONLY"
idx = PROMOTION_LADDER.index(current_mode)
return PROMOTION_LADDER[max(idx - 1, 0)]
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--metrics", default=str(DEFAULT_METRICS))
ap.add_argument("--decision-packet", default=str(DEFAULT_DECISION_PACKET))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
metrics = _load(Path(args.metrics))
decision_packet = _load(Path(args.decision_packet))
current_mode = decision_packet.get("execution_mode") or decision_packet.get("global_execution_gate") or "AUDIT_ONLY"
if not metrics:
result = {
"formula_id": "MODEL_GOVERNANCE_KILL_SWITCH_V1",
"gate": "DATA_MISSING",
"execution_mode": current_mode,
"execution_mode_changed": False,
"kill_switch_triggered": False,
"kill_switch_reason_codes": [],
"source_paths": [str(Path(args.metrics)), str(Path(args.decision_packet))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
reason_codes = evaluate_kill_switches(metrics)
if reason_codes:
new_mode = demote_one_rung(current_mode)
else:
new_mode = current_mode
result = {
"formula_id": "MODEL_GOVERNANCE_KILL_SWITCH_V1",
"gate": "KILL_SWITCH_TRIGGERED" if reason_codes else "PASS",
"execution_mode": new_mode,
"execution_mode_changed": new_mode != current_mode,
"kill_switch_triggered": bool(reason_codes),
"kill_switch_reason_codes": reason_codes,
"source_paths": [str(Path(args.metrics)), str(Path(args.decision_packet))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,209 @@
#!/usr/bin/env python3
"""PORTFOLIO_TRANSITION_UTILITY_V1 — spec/formulas/domains/portfolio.yaml.
Compiles candidate sell actions (from sell_waterfall_engine_v3/v4) and cash-repair
benefit (from smart_cash_recovery) plus a CE70 distribution input
(forecast_simulation_engine_v1, optional — may not exist yet while T+20 sample < 30)
into a single portfolio-level transition_utility_krw and a deterministic
selected_transition or NO_TRADE.
Hard rule (AGENTS.md): missing required numeric input is never treated as zero.
If ce70_net_profit_krw is unavailable for every candidate, the optimizer still runs
on the cash/concentration-only benefit terms but cannot select a BUY-type transition.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DECISION_PACKET = ROOT / "Temp" / "final_decision_packet_active.json"
DEFAULT_SELL_WATERFALL = ROOT / "Temp" / "sell_waterfall_engine_v3.json"
DEFAULT_CASH_RECOVERY = ROOT / "Temp" / "smart_cash_recovery_v9.json"
DEFAULT_SIMULATION = ROOT / "Temp" / "forecast_simulation_engine_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "portfolio_transition_optimizer_v1.json"
HARD_VETO_ORDER = [
"DATA_INVALID",
"EXECUTION_MODE_BLOCK",
"CASH_FLOOR_BLOCK",
"HARD_CONCENTRATION_BLOCK",
"NEGATIVE_TRANSITION_UTILITY",
]
LIVE_ORDER_MODES = {"LIVE_LIMITED", "LIVE_FULL"}
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _candidate_actions_from_sell_waterfall(sell_waterfall: dict) -> list[dict]:
rows = sell_waterfall.get("rows") if isinstance(sell_waterfall.get("rows"), list) else []
candidates = []
for idx, row in enumerate(rows):
if not isinstance(row, dict):
continue
candidates.append(
{
"candidate_id": row.get("candidate_id") or f"SELL_{idx}",
"asset_id": row.get("종목명") or row.get("asset_id") or "UNKNOWN",
"action_type": row.get("매도유형") or row.get("action_type") or "SELL_CASH_REPAIR",
"planned_amount_krw": row.get("예상순현금") or row.get("planned_amount_krw"),
"source_signal_ids": ["SELL_WATERFALL_ENGINE_V3"],
"numeric_provenance_status": "PASS" if row.get("예상순현금") is not None else "DATA_MISSING",
}
)
return candidates
def _hard_constraint_pass(candidate: dict, decision_packet: dict) -> tuple[bool, str | None]:
if candidate.get("numeric_provenance_status") != "PASS":
return False, "DATA_INVALID"
execution_mode = decision_packet.get("execution_mode") or decision_packet.get("global_execution_gate")
if execution_mode in (None, "DATA_MISSING"):
return False, "EXECUTION_MODE_BLOCK"
return True, None
def _transition_utility_krw(
candidate: dict,
ce70_net_profit_krw: float | None,
tax_fee_slippage_krw: float,
cash_repair_benefit_krw: float,
concentration_reduction_benefit_krw: float,
turnover_penalty_krw: float,
) -> float | None:
is_sell = str(candidate.get("action_type", "")).startswith("SELL")
if is_sell:
planned = candidate.get("planned_amount_krw") or 0.0
return (
float(planned)
+ cash_repair_benefit_krw
+ concentration_reduction_benefit_krw
- tax_fee_slippage_krw
- turnover_penalty_krw
)
if ce70_net_profit_krw is None:
return None
return (
ce70_net_profit_krw
- tax_fee_slippage_krw
+ cash_repair_benefit_krw
+ concentration_reduction_benefit_krw
- turnover_penalty_krw
)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--decision-packet", default=str(DEFAULT_DECISION_PACKET))
ap.add_argument("--sell-waterfall", default=str(DEFAULT_SELL_WATERFALL))
ap.add_argument("--cash-recovery", default=str(DEFAULT_CASH_RECOVERY))
ap.add_argument("--simulation", default=str(DEFAULT_SIMULATION))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
decision_packet = _load(Path(args.decision_packet))
sell_waterfall = _load(Path(args.sell_waterfall))
cash_recovery = _load(Path(args.cash_recovery))
simulation = _load(Path(args.simulation))
source_paths = [
str(Path(args.decision_packet)),
str(Path(args.sell_waterfall)),
str(Path(args.cash_recovery)),
str(Path(args.simulation)),
]
if not decision_packet:
result = {
"formula_id": "PORTFOLIO_TRANSITION_UTILITY_V1",
"gate": "NO_TRADE_AND_QUARANTINE",
"final_action": "NO_TRADE",
"reason_codes": ["missing_optimizer_inputs"],
"selected_transition": None,
"candidate_actions": [],
"source_paths": source_paths,
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
cash_repair_benefit_krw = float(cash_recovery.get("value_damage_pct_avg") or cash_recovery.get("cash_repair_benefit_krw") or 0.0)
tax_fee_slippage_krw = float(sell_waterfall.get("tax_fee_slippage_krw") or 0.0)
concentration_reduction_benefit_krw = 0.0
turnover_penalty_krw = 0.0
ce70_net_profit_krw = simulation.get("ce70_net_profit_krw")
if isinstance(ce70_net_profit_krw, str):
ce70_net_profit_krw = None
candidates = _candidate_actions_from_sell_waterfall(sell_waterfall)
evaluated = []
best = None
for candidate in candidates:
ok, veto_reason = _hard_constraint_pass(candidate, decision_packet)
utility = (
_transition_utility_krw(
candidate,
ce70_net_profit_krw,
tax_fee_slippage_krw,
cash_repair_benefit_krw,
concentration_reduction_benefit_krw,
turnover_penalty_krw,
)
if ok
else None
)
if ok and utility is not None and utility <= 0:
ok = False
veto_reason = "NEGATIVE_TRANSITION_UTILITY"
row = {
**candidate,
"hard_constraint_pass": ok,
"veto_reason": veto_reason,
"transition_utility_krw": utility,
}
evaluated.append(row)
if ok and (best is None or (utility or 0) > (best["transition_utility_krw"] or 0)):
best = row
if best is None:
final_action = "NO_TRADE"
reason_codes = ["NO_TRADE_BAND"] if candidates else ["missing_optimizer_inputs"]
selected_transition = None
else:
execution_mode = decision_packet.get("execution_mode") or decision_packet.get("global_execution_gate")
if execution_mode in LIVE_ORDER_MODES:
final_action = "LIVE_ORDER_REVIEW"
else:
final_action = "SHADOW_LEDGER_ONLY"
reason_codes = ["TRANSITION_UTILITY_POSITIVE"]
selected_transition = best
result = {
"formula_id": "PORTFOLIO_TRANSITION_UTILITY_V1",
"default_action": "NO_TRADE",
"final_action": final_action,
"hard_veto_order": HARD_VETO_ORDER,
"reason_codes": reason_codes,
"candidate_actions": evaluated,
"selected_transition": selected_transition,
"source_paths": source_paths,
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+90
View File
@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""REBALANCE_CADENCE_GATE_V1 — spec/formulas/domains/portfolio.yaml.
Mandatory weekly (Sat/Sun) and monthly (1/11/21) cadence checks always emit a
review, but actual rebalancing execution is allowed only when transition utility
after tax cost is positive or a hard risk block is active.
governance/todo/v8_9_p3_adoption_plan.yaml P3-D.
"""
from __future__ import annotations
import argparse
import json
from datetime import date
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_OUT = ROOT / "Temp" / "rebalance_cadence_gate_v1.json"
WEEKLY_WEEKDAYS = {5, 6} # Saturday=5, Sunday=6 (date.weekday())
MONTHLY_MID_CHECK_DAYS = {1, 11, 21}
def cadence_check_required(check_date: date, event_driven_trigger: bool = False) -> tuple[bool, str | None]:
if check_date.weekday() in WEEKLY_WEEKDAYS:
return True, "weekly_rebalance_required"
if check_date.day in MONTHLY_MID_CHECK_DAYS:
return True, "mid_check_required"
if event_driven_trigger:
return True, "event_driven_trigger"
return False, None
def evaluate_rebalance_gate(
check_date: date,
transition_utility_after_tax_cost_krw: float | None,
hard_risk_block_active: bool | None,
event_driven_trigger: bool = False,
) -> dict:
required, reason = cadence_check_required(check_date, event_driven_trigger)
if not required:
return {
"cadence_check_required": False,
"review_emitted": False,
"rebalance_execution_allowed": False,
"cadence_trigger_reason": None,
}
if transition_utility_after_tax_cost_krw is None and hard_risk_block_active is None:
return {
"cadence_check_required": True,
"review_emitted": True,
"rebalance_execution_allowed": False,
"cadence_trigger_reason": reason,
"gate": "DATA_MISSING",
}
allowed = bool(hard_risk_block_active) or (
transition_utility_after_tax_cost_krw is not None and transition_utility_after_tax_cost_krw > 0
)
return {
"cadence_check_required": True,
"review_emitted": True,
"rebalance_execution_allowed": allowed,
"cadence_trigger_reason": reason,
}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--date", default=None, help="YYYY-MM-DD, default today")
ap.add_argument("--transition-utility", type=float, default=None)
ap.add_argument("--hard-risk-block", action="store_true")
ap.add_argument("--event-driven-trigger", action="store_true")
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
check_date = date.fromisoformat(args.date) if args.date else date.today()
result = {
"formula_id": "REBALANCE_CADENCE_GATE_V1",
**evaluate_rebalance_gate(check_date, args.transition_utility, args.hard_risk_block, args.event_driven_trigger),
"check_date": check_date.isoformat(),
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+112
View File
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""SCENARIO_SHOCK_MATRIX_V1 — spec/formulas/domains/simulation.yaml.
Applies 5 deterministic stress multipliers to a base net-profit distribution to
produce per-scenario CE70/CVaR95. governance/todo/v8_9_p2_adoption_plan.yaml P2-A.
Hard rule (AGENTS.md): no base distribution -> all scenarios DATA_MISSING. Never
fabricate a shocked distribution from nothing.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_SIMULATION = ROOT / "Temp" / "forecast_simulation_engine_v1.json"
DEFAULT_DISTRIBUTION = ROOT / "Temp" / "net_profit_distribution_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "scenario_shock_matrix_v1.json"
SCENARIO_DEFINITIONS = {
"base_case": {"shock_multiplier": 1.0},
"adverse_case": {"shock_multiplier": 1.5},
"liquidity_drought_case": {"shock_multiplier": 1.3, "capacity_derate_pct": 40},
"crisis_case": {"shock_multiplier": 2.0, "correlation_to_one": True},
"fx_shock_case": {"shock_multiplier": 1.2, "applies_only_to": "foreign_assets"},
"tax_cost_case": {"shock_multiplier": 1.0, "additional_cost_pct": 5},
}
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _quantile(sorted_values: list[float], q: float) -> float:
if len(sorted_values) == 1:
return sorted_values[0]
pos = q * (len(sorted_values) - 1)
lower = int(pos)
upper = min(lower + 1, len(sorted_values) - 1)
frac = pos - lower
return sorted_values[lower] + (sorted_values[upper] - sorted_values[lower]) * frac
def _cvar95(sorted_values: list[float]) -> float:
threshold_idx = max(1, int(len(sorted_values) * 0.05))
tail = sorted_values[:threshold_idx]
return sum(tail) / len(tail)
def apply_shock(distribution: list[float], scenario_id: str) -> list[float]:
scenario = SCENARIO_DEFINITIONS[scenario_id]
multiplier = scenario["shock_multiplier"]
additional_cost_pct = scenario.get("additional_cost_pct", 0)
shocked = []
for v in distribution:
value = v * multiplier if v < 0 else v / multiplier
if additional_cost_pct:
value -= abs(value) * (additional_cost_pct / 100.0)
shocked.append(value)
return shocked
def evaluate_scenario(distribution: list[float] | None, scenario_id: str) -> dict:
if not distribution:
return {
"scenario_id": scenario_id,
"scenario_ce70_krw": None,
"scenario_cvar95_krw": None,
"gate": "DATA_MISSING",
}
shocked = sorted(apply_shock(distribution, scenario_id))
return {
"scenario_id": scenario_id,
"scenario_ce70_krw": _quantile(shocked, 0.30),
"scenario_cvar95_krw": _cvar95(shocked),
"gate": "PASS",
}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--distribution", default=str(DEFAULT_DISTRIBUTION))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
doc = _load(Path(args.distribution))
distribution = doc.get("net_profit_distribution_after_tax_fee_slippage")
distribution = [float(v) for v in distribution] if isinstance(distribution, list) and distribution else None
scenario_results = [evaluate_scenario(distribution, scenario_id) for scenario_id in SCENARIO_DEFINITIONS]
result = {
"formula_id": "SCENARIO_SHOCK_MATRIX_V1",
"gate": "PASS" if distribution else "DATA_MISSING",
"scenario_results": scenario_results,
"source_paths": [str(Path(args.distribution))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+149
View File
@@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""SECTOR_EXPOSURE_GRAPH_V1 + LEADER_LIFECYCLE_GATE_V1 — spec/formulas/domains/sector.yaml.
ETF lookthrough exposure + factor beta residualization + leader role promotion/demotion.
governance/todo/v8_9_p1_adoption_plan.yaml P1-A.3.
Hard rule (AGENTS.md): missing ETF constituents or peer betas are never assumed zero.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_POSITIONS = ROOT / "Temp" / "sector_exposure_positions_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "sector_exposure_graph_v1.json"
PROMOTION_PATH = ["LAGGARD", "CYCLICAL_BETA", "ENABLER", "CORE_LEADER", "CAPTAIN"]
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def sector_exposure(position: dict) -> dict:
direct_weight_pct = float(position.get("direct_weight_pct") or 0.0)
etf_constituents = position.get("etf_constituents_json")
etf_weight_pct = position.get("etf_weight_pct")
sector_id = position.get("sector_id")
if etf_constituents is None or etf_weight_pct is None:
return {
"sector_id": sector_id,
"direct_weight_pct": direct_weight_pct,
"lookthrough_etf_weight_pct": None,
"sector_family_total_pct": None,
"gate": "ETF_BUY_BLOCKED",
"reason_code": "constituents_missing",
}
lookthrough = sum(
float(c.get("weight_pct", 0.0)) * float(etf_weight_pct) / 100.0
for c in etf_constituents
if isinstance(c, dict) and c.get("sector_id") == sector_id
)
sector_family_total_pct = direct_weight_pct + lookthrough
return {
"sector_id": sector_id,
"direct_weight_pct": direct_weight_pct,
"lookthrough_etf_weight_pct": lookthrough,
"sector_family_total_pct": sector_family_total_pct,
"gate": "PASS",
}
def residualize_factor_beta(factor_beta_raw: float, peer_sector_betas: list | None) -> tuple[float | None, str]:
if peer_sector_betas is None:
return factor_beta_raw, "PARTIAL_raw_beta_peer_data_missing"
shared_variance = sum(float(p.get("shared_variance", 0.0)) for p in peer_sector_betas if isinstance(p, dict))
return factor_beta_raw - shared_variance, "PASS"
def evaluate_leader_role(position: dict) -> dict:
required_fields = [
"relative_strength_leads_sector",
"volume_quality_confirmed",
"above_ma60_or_reclaim_confirmed",
"earnings_revision_status",
"institutional_flow_status",
]
current_role = position.get("current_role") or "LAGGARD"
if any(position.get(f) is None for f in required_fields):
return {"leader_role": current_role, "role_transition_reason": "DATA_MISSING", "role_changed": False}
demotion = (
(position["above_ma60_or_reclaim_confirmed"] is False and position["institutional_flow_status"] == "distribution")
or position["earnings_revision_status"] == "negative"
or (
position["institutional_flow_status"] == "distribution"
and current_role in ("CAPTAIN", "CORE_LEADER")
)
)
if demotion:
return {
"leader_role": "DISTRIBUTION_RISK",
"role_transition_reason": "demotion_trigger",
"role_changed": current_role != "DISTRIBUTION_RISK",
}
promotion_ok = (
position["relative_strength_leads_sector"] is True
and position["volume_quality_confirmed"] is True
and position["above_ma60_or_reclaim_confirmed"] is True
and position["earnings_revision_status"] != "negative"
and position["institutional_flow_status"] != "distribution"
)
if promotion_ok and current_role in PROMOTION_PATH:
idx = PROMOTION_PATH.index(current_role)
next_role = PROMOTION_PATH[min(idx + 1, len(PROMOTION_PATH) - 1)]
return {
"leader_role": next_role,
"role_transition_reason": "promotion_requires_all_satisfied",
"role_changed": next_role != current_role,
}
return {"leader_role": current_role, "role_transition_reason": "no_change", "role_changed": False}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--positions", default=str(DEFAULT_POSITIONS))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
doc = _load(Path(args.positions))
positions = doc.get("positions") if isinstance(doc.get("positions"), list) else []
rows = []
for position in positions:
if not isinstance(position, dict):
continue
exposure = sector_exposure(position)
leader = evaluate_leader_role(position)
beta_residualized, beta_status = residualize_factor_beta(
float(position.get("factor_beta_raw") or 0.0), position.get("peer_sector_betas")
)
rows.append({**exposure, **leader, "factor_beta_residualized": beta_residualized, "beta_status": beta_status})
result = {
"formula_id": "SECTOR_EXPOSURE_GRAPH_V1",
"gate": "PASS" if rows else "DATA_MISSING",
"rows": rows,
"source_paths": [str(Path(args.positions))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+154
View File
@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""SELL_LOT_PARETO_SELECTOR_V1 — spec/formulas/domains/cash.yaml.
Extends tools/build_sell_waterfall_engine_v3.py output with lot-level scoring
(tax_loss_benefit, missed_upside_penalty, reentry_cost) and a Pareto dominance
ranking within each hard_precedence stage, per
governance/todo/v8_9_p0_adoption_plan.yaml P0-2.2.
Backward compatible: every row from v3 is preserved unchanged; only new fields
are appended.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_V3 = ROOT / "Temp" / "sell_waterfall_engine_v3.json"
DEFAULT_OUT = ROOT / "Temp" / "sell_waterfall_engine_v4.json"
MAXIMIZE_FIELDS = [
"avoided_tail_loss_krw",
"cash_repair_benefit_krw",
"concentration_reduction_benefit_krw",
"tax_loss_benefit_krw",
]
MINIMIZE_FIELDS = [
"tax_fee_slippage_krw",
"reentry_cost_krw",
"missed_upside_penalty_krw",
]
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _numeric_or_missing(row: dict, field: str) -> tuple[float, bool]:
value = row.get(field)
if value is None:
return 0.0, True
try:
return float(value), False
except (TypeError, ValueError):
return 0.0, True
def _lot_sell_score(row: dict) -> tuple[float, list[str]]:
missing_fields = []
values = {}
for field in MAXIMIZE_FIELDS + MINIMIZE_FIELDS:
value, missing = _numeric_or_missing(row, field)
values[field] = value
if missing:
missing_fields.append(field)
score = (
values["avoided_tail_loss_krw"]
+ values["cash_repair_benefit_krw"]
+ values["concentration_reduction_benefit_krw"]
+ values["tax_loss_benefit_krw"]
- values["tax_fee_slippage_krw"]
- values["reentry_cost_krw"]
- values["missed_upside_penalty_krw"]
)
return score, missing_fields
def _dominates(a: dict, b: dict) -> bool:
at_least_as_good = all(a.get(f, 0.0) >= b.get(f, 0.0) for f in MAXIMIZE_FIELDS) and all(
a.get(f, 0.0) <= b.get(f, 0.0) for f in MINIMIZE_FIELDS
)
strictly_better = any(a.get(f, 0.0) > b.get(f, 0.0) for f in MAXIMIZE_FIELDS) or any(
a.get(f, 0.0) < b.get(f, 0.0) for f in MINIMIZE_FIELDS
)
return at_least_as_good and strictly_better
def _rank_pareto_group(rows: list[dict]) -> list[dict]:
annotated = []
for row in rows:
dominated_by = [
other["candidate_id"]
for other in rows
if other is not row and _dominates(other, row)
]
annotated.append({**row, "pareto_dominated": bool(dominated_by), "dominated_by": dominated_by})
annotated.sort(
key=lambda r: (
r["pareto_dominated"],
-r["lot_sell_score_krw"],
r.get("tax_fee_slippage_krw", 0.0),
r.get("reentry_cost_krw", 0.0),
)
)
for idx, row in enumerate(annotated, start=1):
row["pareto_rank"] = idx
return annotated
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--base", default=str(DEFAULT_V3))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
base = _load(Path(args.base))
rows = base.get("rows") if isinstance(base.get("rows"), list) else []
scored_rows = []
for idx, row in enumerate(rows):
if not isinstance(row, dict):
continue
candidate_id = row.get("candidate_id") or row.get("종목명") or f"LOT_{idx}"
score, missing_fields = _lot_sell_score(row)
scored_rows.append(
{
**row,
"candidate_id": candidate_id,
**{f: _numeric_or_missing(row, f)[0] for f in MAXIMIZE_FIELDS + MINIMIZE_FIELDS},
"lot_sell_score_krw": score,
"lot_sell_score_missing_fields": missing_fields,
"hard_precedence_stage": row.get("hard_precedence_stage") or row.get("우선순위단계"),
}
)
groups: dict[object, list[dict]] = {}
for row in scored_rows:
groups.setdefault(row.get("hard_precedence_stage"), []).append(row)
ranked_rows: list[dict] = []
for _stage, group_rows in groups.items():
ranked_rows.extend(_rank_pareto_group(group_rows))
result = {
"formula_id": "SELL_LOT_PARETO_SELECTOR_V1",
"gate": base.get("gate") or "PASS",
"rows": ranked_rows,
"source_paths": [str(Path(args.base))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""STATE_VECTOR_CONSTRUCTOR_V1 — spec/formulas/domains/portfolio.yaml.
Merges holdings/cash/tax_lots/sector_graph/factor_exposures/macro_regime_probabilities
into one state_vector. Missing components stay null -- never backfilled from another
component. governance/todo/v8_9_p3_adoption_plan.yaml P3-A.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CASH_LADDER = ROOT / "Temp" / "cash_ratios_v1.json"
DEFAULT_POSITIONS = ROOT / "Temp" / "account_snapshot_v1.json"
DEFAULT_SECTOR_GRAPH = ROOT / "Temp" / "sector_exposure_graph_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "state_vector_constructor_v1.json"
COMPONENT_KEYS = [
"cash_ladder",
"positions",
"sector_exposure_graph",
"factor_exposures",
"tax_lots",
"risk_bucket_weights",
"macro_regime_probabilities",
"goal_progress_pct",
]
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def construct_state_vector(components: dict) -> dict:
state_vector = {}
missing_components = []
for key in COMPONENT_KEYS:
value = components.get(key)
state_vector[key] = value
if value is None:
missing_components.append(key)
completeness_pct = 100.0 * (len(COMPONENT_KEYS) - len(missing_components)) / len(COMPONENT_KEYS)
return {
"state_vector": state_vector,
"state_vector_completeness_pct": completeness_pct,
"missing_components": missing_components,
}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--cash-ladder", default=str(DEFAULT_CASH_LADDER))
ap.add_argument("--positions", default=str(DEFAULT_POSITIONS))
ap.add_argument("--sector-graph", default=str(DEFAULT_SECTOR_GRAPH))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
cash_ladder_doc = _load(Path(args.cash_ladder))
positions_doc = _load(Path(args.positions))
sector_graph_doc = _load(Path(args.sector_graph))
components = {
"cash_ladder": cash_ladder_doc or None,
"positions": positions_doc.get("positions") if positions_doc else None,
"sector_exposure_graph": sector_graph_doc.get("rows") if sector_graph_doc.get("rows") else None,
"factor_exposures": None,
"tax_lots": positions_doc.get("tax_lots") if positions_doc else None,
"risk_bucket_weights": None,
"macro_regime_probabilities": None,
"goal_progress_pct": positions_doc.get("goal_progress_pct") if positions_doc else None,
}
result = {"formula_id": "STATE_VECTOR_CONSTRUCTOR_V1", **construct_state_vector(components)}
result["source_paths"] = [str(Path(args.cash_ladder)), str(Path(args.positions)), str(Path(args.sector_graph))]
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+153
View File
@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""TRANSITION_SET_ENUMERATOR_V1 — spec/formulas/domains/portfolio.yaml.
Evaluates combinations of already-vetoed candidates as a portfolio-level set,
so individually-passing candidates that jointly breach cash floor or
concentration caps are rejected. governance/todo/v8_9_p2_adoption_plan.yaml P2-B.
"""
from __future__ import annotations
import argparse
import itertools
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_TRANSITION_OPTIMIZER = ROOT / "Temp" / "portfolio_transition_optimizer_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "transition_set_enumerator_v1.json"
COMPLEXITY_PENALTY_RATE_KRW = 5000.0
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def post_trade_mrc(transition_set: tuple[dict, ...], portfolio_total_risk_budget: float) -> float | None:
if not portfolio_total_risk_budget:
return None
total = sum(c.get("marginal_risk_contribution", 0.0) for c in transition_set)
return total / portfolio_total_risk_budget
def post_trade_cvar95_krw(transition_set: tuple[dict, ...]) -> float | None:
values = [c.get("cvar95_loss_krw") for c in transition_set]
if any(v is None for v in values):
return None
return sum(values)
def set_hard_constraint_pass(
transition_set: tuple[dict, ...],
portfolio_total_risk_budget: float = 0.0,
mrc_cap: float = 1.0,
cvar95_budget_krw: float | None = None,
) -> bool:
post_trade_cash_floor_pct = sum(c.get("post_trade_cash_floor_delta_pct", 0.0) for c in transition_set)
post_trade_concentration_pct = sum(c.get("post_trade_concentration_delta_pct", 0.0) for c in transition_set)
if post_trade_cash_floor_pct < 0 or post_trade_concentration_pct > 0:
return False
mrc = post_trade_mrc(transition_set, portfolio_total_risk_budget)
if mrc is not None and mrc > mrc_cap:
return False
cvar95 = post_trade_cvar95_krw(transition_set)
if cvar95 is not None and cvar95_budget_krw is not None and cvar95 < -abs(cvar95_budget_krw):
return False
return True
def set_transition_utility_krw(transition_set: tuple[dict, ...]) -> float:
total = sum(c.get("transition_utility_krw") or 0.0 for c in transition_set)
combination_penalty = COMPLEXITY_PENALTY_RATE_KRW * (len(transition_set) - 1)
return total - combination_penalty
def enumerate_transition_sets(
candidates: list[dict],
max_set_size: int = 3,
portfolio_total_risk_budget: float = 0.0,
mrc_cap: float = 1.0,
cvar95_budget_krw: float | None = None,
) -> list[dict]:
passing = [c for c in candidates if c.get("hard_constraint_pass") is True]
evaluated_sets = []
for size in range(1, min(max_set_size, len(passing)) + 1):
for combo in itertools.combinations(passing, size):
evaluated_sets.append(
{
"candidate_ids": [c.get("candidate_id") for c in combo],
"set_hard_constraint_pass": set_hard_constraint_pass(
combo, portfolio_total_risk_budget, mrc_cap, cvar95_budget_krw
),
"set_transition_utility_krw": set_transition_utility_krw(combo),
"post_trade_mrc": post_trade_mrc(combo, portfolio_total_risk_budget),
"post_trade_cvar95_krw": post_trade_cvar95_krw(combo),
}
)
return evaluated_sets
def select_best_set(evaluated_sets: list[dict]) -> dict | None:
passing_sets = [s for s in evaluated_sets if s["set_hard_constraint_pass"]]
if not passing_sets:
return None
return max(
passing_sets,
key=lambda s: (s["set_transition_utility_krw"], -len(s["candidate_ids"])),
)
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--transition-optimizer", default=str(DEFAULT_TRANSITION_OPTIMIZER))
ap.add_argument("--max-set-size", type=int, default=3)
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
doc = _load(Path(args.transition_optimizer))
candidates = doc.get("candidate_actions") if isinstance(doc.get("candidate_actions"), list) else []
if not candidates:
result = {
"formula_id": "TRANSITION_SET_ENUMERATOR_V1",
"gate": "NO_TRADE",
"selected_transition_set": [],
"rejected_sets_count": 0,
"source_paths": [str(Path(args.transition_optimizer))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
evaluated_sets = enumerate_transition_sets(candidates, args.max_set_size)
best = select_best_set(evaluated_sets)
rejected_count = len(evaluated_sets) - (1 if best else 0)
result = {
"formula_id": "TRANSITION_SET_ENUMERATOR_V1",
"gate": "PASS" if best else "NO_TRADE",
"selected_transition_set": best["candidate_ids"] if best else [],
"set_transition_utility_krw": best["set_transition_utility_krw"] if best else None,
"post_trade_mrc": best["post_trade_mrc"] if best else None,
"post_trade_cvar95_krw": best["post_trade_cvar95_krw"] if best else None,
"rejected_sets_count": rejected_count,
"source_paths": [str(Path(args.transition_optimizer))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+124
View File
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""WALK_FORWARD_BOOTSTRAP_V1 — spec/formulas/domains/simulation.yaml.
Generates net_profit_distribution_after_tax_fee_slippage from historical_returns via
walk-forward (non-overlapping in/out-of-sample split, block resample on out-of-sample
only) or regime-matched (filter + resample-with-replacement) bootstrapping.
governance/todo/v8_9_p3_adoption_plan.yaml P3-B.
Hard rule: no historical_returns or fewer than 2 samples -> DATA_MISSING. Never
interpolate or fabricate a distribution.
"""
from __future__ import annotations
import argparse
import json
import random
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_HISTORICAL_RETURNS = ROOT / "Temp" / "historical_returns_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "walk_forward_bootstrap_v1.json"
BLOCK_SIZE = 5
def _load(path: Path) -> dict:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def walk_forward_resample(historical_returns: list[dict], resample_count: int, rng: random.Random) -> list[float]:
sorted_returns = sorted(historical_returns, key=lambda r: r["date"])
split_idx = int(len(sorted_returns) * 0.7)
out_of_sample = sorted_returns[split_idx:]
if len(out_of_sample) < 2:
return []
values = [r["net_return_after_cost_pct"] for r in out_of_sample]
distribution = []
for _ in range(resample_count):
start = rng.randrange(0, max(1, len(values) - BLOCK_SIZE + 1))
block = values[start:start + BLOCK_SIZE]
distribution.append(sum(block) / len(block))
return distribution
def regime_matched_resample(
historical_returns: list[dict], current_regime_state: str, resample_count: int, rng: random.Random
) -> list[float]:
filtered = [r["net_return_after_cost_pct"] for r in historical_returns if r.get("regime_state") == current_regime_state]
if len(filtered) < 2:
return []
return [rng.choice(filtered) for _ in range(resample_count)]
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--historical-returns", default=str(DEFAULT_HISTORICAL_RETURNS))
ap.add_argument("--current-regime-state", default=None)
ap.add_argument("--bootstrap-method", default="walk_forward", choices=["walk_forward", "regime_matched"])
ap.add_argument("--resample-count", type=int, default=1000)
ap.add_argument("--out", default=str(DEFAULT_OUT))
ap.add_argument("--seed", type=int, default=None)
args = ap.parse_args()
doc = _load(Path(args.historical_returns))
historical_returns = doc.get("historical_returns") if isinstance(doc.get("historical_returns"), list) else None
if not historical_returns or len(historical_returns) < 2:
result = {
"formula_id": "WALK_FORWARD_BOOTSTRAP_V1",
"gate": "DATA_MISSING",
"net_profit_distribution_after_tax_fee_slippage": None,
"sample_count_total": len(historical_returns) if historical_returns else 0,
"sample_count_same_regime": 0,
"source_paths": [str(Path(args.historical_returns))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
rng = random.Random(args.seed)
if args.bootstrap_method == "walk_forward":
distribution = walk_forward_resample(historical_returns, args.resample_count, rng)
else:
distribution = regime_matched_resample(historical_returns, args.current_regime_state, args.resample_count, rng)
sample_count_same_regime = len(
[r for r in historical_returns if r.get("regime_state") == args.current_regime_state]
)
if not distribution:
result = {
"formula_id": "WALK_FORWARD_BOOTSTRAP_V1",
"gate": "DATA_MISSING",
"net_profit_distribution_after_tax_fee_slippage": None,
"sample_count_total": len(historical_returns),
"sample_count_same_regime": sample_count_same_regime,
"source_paths": [str(Path(args.historical_returns))],
}
else:
result = {
"formula_id": "WALK_FORWARD_BOOTSTRAP_V1",
"gate": "PASS",
"net_profit_distribution_after_tax_fee_slippage": distribution,
"sample_count_total": len(historical_returns),
"sample_count_same_regime": sample_count_same_regime,
"source_paths": [str(Path(args.historical_returns))],
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
"""WEEKLY_LEGACY_TRANSFER_PLAN_V1 — spec/formulas/domains/cash.yaml.
A weekly legacy-stock-to-CMA transfer plan is a planning input, not deployable
cash, until the deposit is actually confirmed. governance/todo/v8_9_p3_adoption_plan.yaml P3-E.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_OUT = ROOT / "Temp" / "weekly_legacy_transfer_plan_v1.json"
def evaluate_transfer_plan(
weekly_legacy_to_cma_transfer_plan_krw: float,
transfer_confirmed: bool | None,
transfer_confirmed_amount_krw: float | None,
) -> dict:
confirmed = bool(transfer_confirmed)
if not confirmed:
return {
"deployable_cash_contribution_krw": 0.0,
"plan_status": "PLANNED_NOT_DEPLOYABLE",
"planned_amount_krw": weekly_legacy_to_cma_transfer_plan_krw,
}
amount = transfer_confirmed_amount_krw if transfer_confirmed_amount_krw is not None else 0.0
return {
"deployable_cash_contribution_krw": amount,
"plan_status": "CONFIRMED_DEPLOYABLE",
"planned_amount_krw": weekly_legacy_to_cma_transfer_plan_krw,
}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--planned-amount", type=float, default=4000000.0)
ap.add_argument("--transfer-confirmed", action="store_true")
ap.add_argument("--confirmed-amount", type=float, default=None)
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
result = {
"formula_id": "WEEKLY_LEGACY_TRANSFER_PLAN_V1",
**evaluate_transfer_plan(args.planned_amount, args.transfer_confirmed, args.confirmed_amount),
}
out = Path(args.out)
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())