"""Split overall risk across the data, derivation, decision and execution layers.

The script reads three JSON artifacts from ``<ROOT>/Temp``:

* ``data_integrity_score_v1.json``      -- its ``score`` drives the data risk
* ``derivation_validity_score_v1.json`` -- its ``score`` drives the derivation risk
* ``engine_harness_gate_result.json``   -- its ``failed_checks`` drive the
  decision and execution risks

It then writes a percentage breakdown (formula ``ROOT_CAUSE_ATTRIBUTION_V1``)
to the ``--out`` path and echoes the same JSON on stdout.
"""

from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any

ROOT = Path(__file__).resolve().parents[1]
DEFAULT_OUT = ROOT / "Temp" / "root_cause_attribution_v1.json"

# Fixed risk weights for the two layers that are judged from failed checks.
_FLAGGED_RISK = 20.0
_BASELINE_RISK = 5.0


def _load(path: Path) -> dict[str, Any]:
    """Return the JSON object stored at *path*, or ``{}`` when it is unusable.

    A missing file, an unreadable path, undecodable bytes, malformed JSON and
    a top-level value that is not an object are all treated the same way: the
    caller gets an empty dict, which downstream code interprets as "no score /
    no failed checks".
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable path. ValueError: JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses.
        return {}
    return data if isinstance(data, dict) else {}


def _score_risk(payload: dict[str, Any]) -> float:
    """Convert ``payload["score"]`` into a risk value of ``100 - score``.

    The result is clamped at 0 from below only, so a negative score yields a
    risk above 100 (presumably scores are on a 0-100 scale -- confirm against
    the producers of these files). A missing, falsy, non-numeric or non-finite
    score counts as 0, i.e. a risk of 100.
    """
    try:
        score = float(payload.get("score") or 0.0)
    except (TypeError, ValueError, OverflowError):
        score = 0.0
    if not math.isfinite(score):
        # Without this, NaN would slip through ``max`` as zero risk and
        # -Infinity would turn every percentage into NaN.
        score = 0.0
    return max(0.0, 100.0 - score)


def _mentions_any(checks: list[Any], needles: tuple[str, ...]) -> bool:
    """Return True when any entry of *checks* contains one of *needles*.

    Entries are compared through ``str()`` so that non-string items (numbers,
    null, nested objects) cannot raise during the substring test.
    """
    return any(needle in str(check) for check in checks for needle in needles)


def _build_attribution(
    di: dict[str, Any], dv: dict[str, Any], eg: dict[str, Any]
) -> dict[str, Any]:
    """Compute the attribution report from the three loaded inputs.

    Args:
        di: Contents of the data-integrity score file.
        dv: Contents of the derivation-validity score file.
        eg: Contents of the engine-harness gate result file.

    Returns:
        A JSON-serialisable dict with ``formula_id``, ``breakdown_pct``
        (per-layer percentages rounded to two decimals),
        ``top_priority_layer`` and the ``failed_checks`` list as read.
    """
    raw_failed = eg.get("failed_checks")
    failed = raw_failed if isinstance(raw_failed, list) else []

    # NOTE: "BUY_BLOCK" itself contains "LOCK", so the second needle already
    # covers the first; both are kept to document the intent.
    decision_flagged = _mentions_any(failed, ("BUY_BLOCK", "LOCK"))
    execution_flagged = _mentions_any(failed, ("report_quality",))

    risks = {
        "data": _score_risk(di),
        "derivation": _score_risk(dv),
        "decision": _FLAGGED_RISK if decision_flagged else _BASELINE_RISK,
        "execution": _FLAGGED_RISK if execution_flagged else _BASELINE_RISK,
    }
    # Decision and execution each contribute at least _BASELINE_RISK, so the
    # total is always positive and the division below is safe.
    total = sum(risks.values())
    breakdown = {
        layer: round(risk / total * 100.0, 2) for layer, risk in risks.items()
    }
    # On a tie, ``max`` keeps the first layer in insertion order.
    top = max(breakdown, key=breakdown.get)

    return {
        "formula_id": "ROOT_CAUSE_ATTRIBUTION_V1",
        "breakdown_pct": breakdown,
        "top_priority_layer": top,
        "failed_checks": failed,
    }


def main() -> int:
    """Run the attribution, write the report to ``--out`` and print it.

    A relative ``--out`` path is resolved against ``ROOT``.

    Returns:
        Always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    out_path = Path(args.out)
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    temp_dir = ROOT / "Temp"
    result = _build_attribution(
        _load(temp_dir / "data_integrity_score_v1.json"),
        _load(temp_dir / "derivation_validity_score_v1.json"),
        _load(temp_dir / "engine_harness_gate_result.json"),
    )

    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())