Files
QuantEngineByItz/tools/build_execution_plan_compiler_v1.py
kjh2064 aedabdd37b feat(quant-engine): v8.9 제안서 P0-P3 로드맵 채택 — 15개 의사결정 엔진 신규 구현
suggest/quant_investment_engine_v8_9_portfolio_optimizer_canonical_refactored.yaml의
implementation_todo_v8_9(P0~P4) 전체를 spec/tool/golden case 레벨로 구현.

- P0: PORTFOLIO_TRANSITION_UTILITY_V1, SELL_LOT_PARETO_SELECTOR_V1, FORECAST_SIMULATION_ENGINE_V1
- P1: SECTOR_EXPOSURE_GRAPH_V1/LEADER_LIFECYCLE_GATE_V1, EXECUTION_CAPACITY_LADDER_V1, MODEL_GOVERNANCE_KILL_SWITCH_V1
- P2: SCENARIO_SHOCK_MATRIX_V1, TRANSITION_SET_ENUMERATOR_V1, IMMUTABLE_DECISION_LEDGER_V1, EXECUTION_PLAN_COMPILER_V1
- P3: STATE_VECTOR_CONSTRUCTOR_V1, WALK_FORWARD_BOOTSTRAP_V1, TRANSITION_SET_ENUMERATOR_V1(MRC/CVaR 확장),
      REBALANCE_CADENCE_GATE_V1, WEEKLY_LEGACY_TRANSFER_PLAN_V1

기존 regime/cluster 연동 정책 수치(현금방어선, 반도체 cap)는 그대로 유지하고 신규 cap 필드만 추가.
spec/09_decision_flow.yaml과 runtime/active_artifact_manifest.yaml에 전 엔진 배선 완료.
governance/todo/v8_9_p{0,1,2,3}_adoption_plan.yaml에 각 단계 작업 추적 기록.

검증: validate_specs/validate_golden_coverage_100(100%)/validate_calibration_registry_v1/
validate_schema_model_generation_v1/validate_agents_shrink_v1 전부 PASS. golden test 53/53 PASS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-18 00:06:52 +09:00

109 lines
4.0 KiB
Python

#!/usr/bin/env python3
"""EXECUTION_PLAN_COMPILER_V1 — spec/formulas/domains/execution.yaml.
Compiles order_capacity_krw into 30/30/40 LIMIT_SPLIT slices and revalidates
cash_floor/capacity/spread before each slice, cancelling the remainder when any
cancel_remaining_if condition fires. governance/todo/v8_9_p2_adoption_plan.yaml P2-D.
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default input document for --capacity and default output file for --out.
DEFAULT_CAPACITY = ROOT.joinpath("Temp", "execution_capacity_ladder_v1.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "execution_plan_compiler_v1.json")
# Fraction of order_capacity_krw assigned to each slice, in order (30/30/40).
SLICE_PCTS = [0.3, 0.3, 0.4]
def _load(path: Path) -> dict:
    """Read a JSON object from *path*, returning ``{}`` when it is unusable.

    This is a best-effort loader: a missing or unreadable file, undecodable
    text, malformed JSON, or a top-level value that is not a JSON object all
    produce an empty dict instead of an exception.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The parsed top-level object, or an empty dict.
    """
    try:
        # Read directly instead of checking exists() first; the separate
        # check left a window for the file to disappear before the read.
        text = path.read_text(encoding="utf-8")
    except (OSError, ValueError):
        # OSError: missing file, directory, permissions.
        # ValueError: UnicodeDecodeError or an invalid path string.
        return {}
    try:
        data = json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; RecursionError guards against
        # pathologically nested documents.
        return {}
    return data if isinstance(data, dict) else {}
def should_cancel_remaining(revalidation_snapshot: dict, baseline_snapshot: dict, required_cash_pct: float) -> str | None:
if revalidation_snapshot.get("spread_bps") is not None and baseline_snapshot.get("spread_bps") is not None:
if revalidation_snapshot["spread_bps"] > baseline_snapshot["spread_bps"] * 1.5:
return "spread_widens_beyond_limit"
if revalidation_snapshot.get("cash_floor_pct") is not None:
if revalidation_snapshot["cash_floor_pct"] < required_cash_pct:
return "cash_floor_after_fill_breached"
if revalidation_snapshot.get("order_capacity_krw") is not None and baseline_snapshot.get("order_capacity_krw") is not None:
if revalidation_snapshot["order_capacity_krw"] < baseline_snapshot["order_capacity_krw"] * 0.5:
return "orderbook_capacity_collapses"
return None
def compile_slices(
    order_capacity_krw: float,
    baseline_snapshot: dict,
    revalidation_snapshots: list[dict],
    required_cash_pct: float = 0.0,
) -> list[dict]:
    """Build one plan entry per SLICE_PCTS fraction, revalidating each first.

    Before a slice is compiled, its revalidation snapshot is checked with
    should_cancel_remaining(). The first slice that fails is recorded as
    CANCELLED together with its reason code, and every later slice is
    recorded as CANCELLED without a reason code.

    Args:
        order_capacity_krw: Total amount to be divided across the slices.
        baseline_snapshot: Reference snapshot for the checks; also used as
            the revalidation snapshot for any slice that has no entry in
            ``revalidation_snapshots``.
        revalidation_snapshots: Per-slice snapshots, matched by position.
        required_cash_pct: Cash-floor threshold passed to the cancel check.

    Returns:
        A list of slice dicts in slice order, each with ``slice_index``
        (1-based), ``slice_amount_krw`` and ``status``.
    """
    plan: list[dict] = []
    halted = False
    supplied = len(revalidation_snapshots)
    for position, fraction in enumerate(SLICE_PCTS):
        slice_no = position + 1
        if not halted:
            if position < supplied:
                current = revalidation_snapshots[position]
            else:
                current = baseline_snapshot
            reason = should_cancel_remaining(current, baseline_snapshot, required_cash_pct)
            if not reason:
                plan.append(
                    {
                        "slice_index": slice_no,
                        "slice_amount_krw": order_capacity_krw * fraction,
                        "status": "COMPILED",
                    }
                )
                continue
            # This slice triggered the cancellation, so it carries the reason.
            halted = True
            plan.append(
                {
                    "slice_index": slice_no,
                    "slice_amount_krw": None,
                    "status": "CANCELLED",
                    "reason_code": reason,
                }
            )
        else:
            plan.append({"slice_index": slice_no, "slice_amount_krw": None, "status": "CANCELLED"})
    return plan
def main() -> int:
    """CLI entry point: compile an execution plan for every capacity row.

    Reads the JSON document named by ``--capacity``, walks its ``rows`` list
    and produces one result per row. A row is reported as
    ``EXECUTION_PLAN_BLOCKED`` with no slices when it is not a JSON object or
    when its ``order_capacity_krw`` is missing or not a number; otherwise its
    slices are compiled and the row is reported as ``PASS``. The combined
    result is written to ``--out`` as JSON and echoed to stdout.

    Returns:
        Always ``0``; the outcome is carried by the ``gate`` fields.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--capacity", default=str(DEFAULT_CAPACITY))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    capacity_path = Path(args.capacity)
    doc = _load(capacity_path)
    rows = doc.get("rows")
    if not isinstance(rows, list):
        rows = []

    results = []
    for row in rows:
        # A malformed row must block only itself, not crash the whole run.
        order_capacity_krw = row.get("order_capacity_krw") if isinstance(row, dict) else None
        if not isinstance(order_capacity_krw, (int, float)):
            results.append({"gate": "EXECUTION_PLAN_BLOCKED", "compiled_slices": []})
            continue
        baseline_snapshot = {
            "spread_bps": row.get("spread_bps"),
            "order_capacity_krw": order_capacity_krw,
            "cash_floor_pct": row.get("cash_floor_pct"),
        }
        # The baseline itself is supplied as the revalidation snapshot for
        # each of the three slices.
        compiled = compile_slices(
            order_capacity_krw,
            baseline_snapshot,
            revalidation_snapshots=[baseline_snapshot] * 3,
        )
        results.append({"gate": "PASS", "compiled_slices": compiled})

    result = {
        "formula_id": "EXECUTION_PLAN_COMPILER_V1",
        "gate": "PASS" if results else "DATA_MISSING",
        "results": results,
        "source_paths": [str(capacity_path)],
    }
    # Serialise once and reuse the text for both the file and stdout.
    payload = json.dumps(result, ensure_ascii=False, indent=2)
    out = Path(args.out)
    # The output directory may not exist yet on a fresh checkout.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(payload, encoding="utf-8")
    print(payload)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)