from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_SCORE = ROOT / "Temp" / "data_integrity_score_v1.json" DEFAULT_OUT = ROOT / "Temp" / "data_integrity_100_lock_v2.json" def _load(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: v = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return v if isinstance(v, dict) else {} def _f(v: Any) -> float: try: return float(v) except Exception: return 0.0 def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--score", default=str(DEFAULT_SCORE)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() sp = Path(args.score) op = Path(args.out) if not sp.is_absolute(): sp = ROOT / sp if not op.is_absolute(): op = ROOT / op src = _load(sp) m = src.get("metrics") if isinstance(src.get("metrics"), dict) else {} score = _f(src.get("score")) placeholder_safety = _f(m.get("placeholder_safety_pct")) required_complete = _f(m.get("required_field_completeness_pct")) sla_breached = bool(m.get("sla_breached")) json_validation_status = str(m.get("json_validation_status") or "") reasons: list[str] = [] if score < 100.0: reasons.append("DATA_INTEGRITY_SCORE_NOT_100") if placeholder_safety < 100.0: reasons.append("PLACEHOLDER_SAFETY_NOT_100") if required_complete < 100.0: reasons.append("REQUIRED_COMPLETENESS_NOT_100") if sla_breached: reasons.append("CAPTURE_SLA_BREACHED") if json_validation_status in {"PENDING_EXPORT", "EXPORT_BLOCKED_CRITICAL"}: reasons.append("JSON_VALIDATION_NOT_EXPORT_READY") if reasons: gate = "HTS_ENTRY_BLOCK" mode = "REVIEW_ONLY" else: gate = "PASS_100" mode = "ALLOW_EXECUTION" out = { "formula_id": "DATA_INTEGRITY_100_LOCK_V2", "gate": gate, "execution_mode": mode, "reasons": reasons, "metrics": { "data_integrity_score": score, "placeholder_safety_pct": placeholder_safety, "required_field_completeness_pct": required_complete, "sla_breached": sla_breached, "json_validation_status": json_validation_status }, "target": { "data_integrity_score": 100.0, "placeholder_safety_pct": 100.0, "required_field_completeness_pct": 100.0 } } op.parent.mkdir(parents=True, exist_ok=True) op.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(out, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())