from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "Temp" / "data_quality_reconciliation_v1.json" def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: obj = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return obj if isinstance(obj, dict) else {} def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--min-schema-score", type=float, default=100.0) ap.add_argument("--min-investment-quality-score", type=float, default=90.0) ap.add_argument("--allow-conflict", action="store_true") args = ap.parse_args() json_path = Path(args.json) if not json_path.is_absolute(): json_path = ROOT / json_path payload = _load_json(json_path) errors: list[str] = [] formula_id = str(payload.get("formula_id") or "") schema_score = float(payload.get("schema_presence_score") or 0.0) invest_score = float(payload.get("investment_quality_score") or 0.0) conflict = bool(payload.get("quality_conflict_flag")) if formula_id != "DATA_QUALITY_RECONCILIATION_V1": errors.append(f"formula_id={formula_id}") if schema_score < args.min_schema_score: errors.append(f"schema_presence_score={schema_score:.2f} < {args.min_schema_score:.2f}") if invest_score < args.min_investment_quality_score: errors.append( f"investment_quality_score={invest_score:.2f} < {args.min_investment_quality_score:.2f}" ) if conflict and not args.allow_conflict: errors.append("quality_conflict_flag=true") if errors: print("DATA_QUALITY_RECONCILIATION_V1_FAIL") for err in errors: print(f" {err}") return 1 print("DATA_QUALITY_RECONCILIATION_V1_OK") print(f" schema_presence_score: {schema_score:.2f}") print(f" investment_quality_score: {invest_score:.2f}") print(f" quality_conflict_flag: {conflict}") return 0 if __name__ == "__main__": raise SystemExit(main())