Files
QuantEngineByItz/tools/validate_pipeline_runtime_contract.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

103 lines
3.3 KiB
Python

from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import yaml
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CONTRACT = ROOT / "spec" / "22_pipeline_runtime_contract.yaml"
DEFAULT_PROFILE = ROOT / "Temp" / "pipeline_runtime_profile_v1.json"
DEFAULT_GATE = ROOT / "Temp" / "engine_harness_gate_result.json"
def _load_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
obj = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def _load_yaml(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
obj = yaml.safe_load(path.read_text(encoding="utf-8"))
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--contract", default=str(DEFAULT_CONTRACT))
ap.add_argument("--profile", default=str(DEFAULT_PROFILE))
ap.add_argument("--gate", default=str(DEFAULT_GATE))
args = ap.parse_args()
cp = Path(args.contract)
pp = Path(args.profile)
gp = Path(args.gate)
if not cp.is_absolute():
cp = ROOT / cp
if not pp.is_absolute():
pp = ROOT / pp
if not gp.is_absolute():
gp = ROOT / gp
contract = _load_yaml(cp).get("pipeline_runtime_contract", {})
profile = _load_json(pp)
gate = _load_json(gp)
mode = str(profile.get("mode") or "")
modes = contract.get("modes") if isinstance(contract.get("modes"), dict) else {}
mode_cfg = modes.get(mode) if isinstance(modes.get(mode), dict) else {}
max_target = float(mode_cfg.get("max_elapsed_sec_target") or 0.0)
elapsed = float(profile.get("elapsed_sec_total") or 0.0)
dup_removed = int(profile.get("duplicate_steps_removed_count") or 0)
steps = profile.get("steps") if isinstance(profile.get("steps"), list) else []
failed: list[str] = []
warnings: list[str] = []
if not mode_cfg:
failed.append("MODE_NOT_IN_CONTRACT")
if max_target > 0 and elapsed > max_target:
warnings.append(f"ELAPSED_OVER_TARGET:{elapsed}>{max_target}")
if str(gate.get("status") or "") != "OK":
failed.append("ENGINE_GATE_NOT_OK")
# failed_checks 중 warn_only가 아닌 hard-fail만 체크
all_checks = gate.get("checks") if isinstance(gate.get("checks"), list) else []
hard_fails = [
c.get("name") for c in all_checks
if isinstance(c, dict) and c.get("exit_code", 0) != 0 and not c.get("warn_only", False)
]
if hard_fails:
failed.append("ENGINE_GATE_HARD_FAIL_EXISTS")
if mode == "release" and dup_removed < 1:
warnings.append("DUPLICATE_STEPS_NOT_REMOVED")
if len(steps) == 0 and mode != "package-only":
failed.append("PROFILE_STEPS_EMPTY")
status = "FAIL" if failed else "OK"
result = {
"formula_id": "PIPELINE_RUNTIME_CONTRACT_VALIDATOR_V1",
"status": status,
"mode": mode,
"elapsed_sec_total": elapsed,
"max_elapsed_sec_target": max_target,
"failed": failed,
"warnings": warnings,
}
print(json.dumps(result, ensure_ascii=False, indent=2))
return 1 if failed else 0
if __name__ == "__main__":
raise SystemExit(main())