aedabdd37b
suggest/quant_investment_engine_v8_9_portfolio_optimizer_canonical_refactored.yaml의
implementation_todo_v8_9(P0~P4) 전체를 spec/tool/golden case 레벨로 구현.
- P0: PORTFOLIO_TRANSITION_UTILITY_V1, SELL_LOT_PARETO_SELECTOR_V1, FORECAST_SIMULATION_ENGINE_V1
- P1: SECTOR_EXPOSURE_GRAPH_V1/LEADER_LIFECYCLE_GATE_V1, EXECUTION_CAPACITY_LADDER_V1, MODEL_GOVERNANCE_KILL_SWITCH_V1
- P2: SCENARIO_SHOCK_MATRIX_V1, TRANSITION_SET_ENUMERATOR_V1, IMMUTABLE_DECISION_LEDGER_V1, EXECUTION_PLAN_COMPILER_V1
- P3: STATE_VECTOR_CONSTRUCTOR_V1, WALK_FORWARD_BOOTSTRAP_V1, TRANSITION_SET_ENUMERATOR_V1(MRC/CVaR 확장),
REBALANCE_CADENCE_GATE_V1, WEEKLY_LEGACY_TRANSFER_PLAN_V1
기존 regime/cluster 연동 정책 수치(현금방어선, 반도체 cap)는 그대로 유지하고 신규 cap 필드만 추가.
spec/09_decision_flow.yaml과 runtime/active_artifact_manifest.yaml에 전 엔진 배선 완료.
governance/todo/v8_9_p{0,1,2,3}_adoption_plan.yaml에 각 단계 작업 추적 기록.
검증: validate_specs/validate_golden_coverage_100(100%)/validate_calibration_registry_v1/
validate_schema_model_generation_v1/validate_agents_shrink_v1 전부 PASS. golden test 53/53 PASS.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
109 lines
4.0 KiB
Python
109 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
"""EXECUTION_PLAN_COMPILER_V1 — spec/formulas/domains/execution.yaml.
|
|
|
|
Compiles order_capacity_krw into 30/30/40 LIMIT_SPLIT slices and revalidates
|
|
cash_floor/capacity/spread before each slice, cancelling the remainder when any
|
|
cancel_remaining_if condition fires. governance/todo/v8_9_p2_adoption_plan.yaml P2-D.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
|
|
# Directory two levels up from this file (the parent of the folder holding the script).
ROOT = Path(__file__).resolve().parents[1]

# Defaults for the --capacity (input) and --out (output) CLI arguments.
DEFAULT_CAPACITY = ROOT.joinpath("Temp", "execution_capacity_ladder_v1.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "execution_plan_compiler_v1.json")

# Fraction of order_capacity_krw assigned to each slice, in execution order.
SLICE_PCTS = [0.30, 0.30, 0.40]
|
|
|
|
|
|
def _load(path: Path) -> dict:
    """Best-effort load of a JSON object from *path*.

    Returns the parsed document when the file exists, is readable, decodes as
    UTF-8, parses as JSON, and its top-level value is an object. In every
    other case an empty dict is returned instead of raising, so callers can
    treat "missing" and "unusable" input the same way.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file or a directory.
        # ValueError: invalid JSON (JSONDecodeError) or invalid UTF-8
        # (UnicodeDecodeError). RecursionError: pathologically nested JSON.
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def should_cancel_remaining(
    revalidation_snapshot: dict,
    baseline_snapshot: dict,
    required_cash_pct: float,
    *,
    spread_widen_limit: float = 1.5,
    capacity_collapse_ratio: float = 0.5,
) -> str | None:
    """Return the reason code for cancelling remaining slices, or None.

    Checks are evaluated in a fixed order and the first one that fires wins:

    1. ``spread_widens_beyond_limit`` - the revalidated ``spread_bps`` is
       strictly greater than the baseline ``spread_bps`` times
       *spread_widen_limit*.
    2. ``cash_floor_after_fill_breached`` - the revalidated
       ``cash_floor_pct`` is strictly below *required_cash_pct*.
    3. ``orderbook_capacity_collapses`` - the revalidated
       ``order_capacity_krw`` is strictly below the baseline
       ``order_capacity_krw`` times *capacity_collapse_ratio*.

    A check is skipped when any value it needs is missing or None.

    Args:
        revalidation_snapshot: Values observed just before the slice.
        baseline_snapshot: Reference values the snapshot is compared against.
        required_cash_pct: Minimum acceptable ``cash_floor_pct``.
        spread_widen_limit: Multiplier on the baseline spread (default 1.5).
        capacity_collapse_ratio: Multiplier on the baseline capacity
            (default 0.5).

    Returns:
        The reason-code string of the first failing check, or None when no
        check fires.
    """
    live_spread = revalidation_snapshot.get("spread_bps")
    base_spread = baseline_snapshot.get("spread_bps")
    if live_spread is not None and base_spread is not None:
        if live_spread > base_spread * spread_widen_limit:
            return "spread_widens_beyond_limit"

    live_cash_floor = revalidation_snapshot.get("cash_floor_pct")
    if live_cash_floor is not None and live_cash_floor < required_cash_pct:
        return "cash_floor_after_fill_breached"

    live_capacity = revalidation_snapshot.get("order_capacity_krw")
    base_capacity = baseline_snapshot.get("order_capacity_krw")
    if live_capacity is not None and base_capacity is not None:
        if live_capacity < base_capacity * capacity_collapse_ratio:
            return "orderbook_capacity_collapses"

    return None
|
|
|
|
|
|
def compile_slices(
    order_capacity_krw: float,
    baseline_snapshot: dict,
    revalidation_snapshots: list[dict],
    required_cash_pct: float = 0.0,
) -> list[dict]:
    """Split *order_capacity_krw* into one slice per entry of SLICE_PCTS.

    Before each slice the matching entry of *revalidation_snapshots* (or
    *baseline_snapshot* when the list is too short) is checked with
    should_cancel_remaining. The first slice whose check fires is marked
    CANCELLED with a ``reason_code``; every later slice is marked CANCELLED
    without one. Slices that pass are marked COMPILED and carry
    ``order_capacity_krw * pct`` as ``slice_amount_krw``.

    Returns:
        A list of slice dicts with a 1-based ``slice_index``, in slice order.
    """
    plan = []
    halted = False
    snapshot_count = len(revalidation_snapshots)

    for position, fraction in enumerate(SLICE_PCTS):
        slice_no = position + 1

        if not halted:
            # Fall back to the baseline when no snapshot was supplied for this slice.
            if position < snapshot_count:
                current = revalidation_snapshots[position]
            else:
                current = baseline_snapshot

            reason = should_cancel_remaining(current, baseline_snapshot, required_cash_pct)
            if not reason:
                plan.append({"slice_index": slice_no, "slice_amount_krw": order_capacity_krw * fraction, "status": "COMPILED"})
                continue

            # This slice triggered the cancellation, so it records the reason.
            halted = True
            plan.append({"slice_index": slice_no, "slice_amount_krw": None, "status": "CANCELLED", "reason_code": reason})
            continue

        # A previous slice already cancelled the remainder.
        plan.append({"slice_index": slice_no, "slice_amount_krw": None, "status": "CANCELLED"})

    return plan
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: compile slice plans for every row of the capacity file.

    Reads the JSON document given by ``--capacity``, compiles a slice plan
    for each entry of its ``rows`` list, writes the combined result as JSON
    to ``--out`` and echoes the same JSON to stdout.

    A row is reported as ``EXECUTION_PLAN_BLOCKED`` when it is not a JSON
    object or has no ``order_capacity_krw``. The top-level gate is
    ``DATA_MISSING`` when there are no rows at all, otherwise ``PASS``.

    Returns:
        Always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--capacity", default=str(DEFAULT_CAPACITY))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    capacity_path = Path(args.capacity)
    doc = _load(capacity_path)
    rows = doc.get("rows")
    if not isinstance(rows, list):
        rows = []

    results = []
    for row in rows:
        # A malformed (non-object) row is blocked instead of crashing the run.
        order_capacity_krw = row.get("order_capacity_krw") if isinstance(row, dict) else None
        if order_capacity_krw is None:
            results.append({"gate": "EXECUTION_PLAN_BLOCKED", "compiled_slices": []})
            continue

        baseline_snapshot = {
            "spread_bps": row.get("spread_bps"),
            "order_capacity_krw": order_capacity_krw,
            "cash_floor_pct": row.get("cash_floor_pct"),
        }
        # NOTE: the baseline itself is reused as the revalidation snapshot for
        # every slice, so no fresh market data is compared here. One snapshot
        # per configured slice keeps this in step with SLICE_PCTS.
        compiled = compile_slices(
            order_capacity_krw,
            baseline_snapshot,
            revalidation_snapshots=[baseline_snapshot] * len(SLICE_PCTS),
        )
        results.append({"gate": "PASS", "compiled_slices": compiled})

    result = {
        "formula_id": "EXECUTION_PLAN_COMPILER_V1",
        "gate": "PASS" if results else "DATA_MISSING",
        "results": results,
        "source_paths": [str(capacity_path)],
    }
    payload = json.dumps(result, ensure_ascii=False, indent=2)

    out = Path(args.out)
    # Create the output directory on demand so a fresh checkout does not fail.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(payload, encoding="utf-8")
    print(payload)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|