"""validate_sector_flow_history_progress_v1.py — SECTOR_FLOW_HISTORY_PROGRESS_VALIDATE_V1 sector_flow_history 진척도 산출물의 기본 계약을 검증한다. """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_INPUT = ROOT / "Temp" / "sector_flow_history_progress_v1.json" DEFAULT_OUT = ROOT / "Temp" / "validate_sector_flow_history_progress_v1.json" FORMULA_ID = "SECTOR_FLOW_HISTORY_PROGRESS_VALIDATE_V1" def _load(path: Path) -> Any: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--input", default=str(DEFAULT_INPUT)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() input_path = Path(args.input) input_path = input_path if input_path.is_absolute() else ROOT / input_path out_path = Path(args.out) out_path = out_path if out_path.is_absolute() else ROOT / out_path payload = _load(input_path) errors: list[str] = [] if not isinstance(payload, dict) or not payload: errors.append("payload must be object") else: if payload.get("formula_id") != "SECTOR_FLOW_HISTORY_PROGRESS_V1": errors.append("formula_id mismatch") if payload.get("status") not in {"DONE", "DATA_GATED"}: errors.append(f"status={payload.get('status')}") for key in [ "current_dates", "target_dates", "coverage_pct", "row_count", "rows_per_date", "flow_credit_coverage_pct", ]: if key not in payload: errors.append(f"missing field: {key}") current_dates = payload.get("current_dates") target_dates = payload.get("target_dates") if isinstance(current_dates, int) and isinstance(target_dates, int): if current_dates < target_dates and payload.get("status") != "DATA_GATED": errors.append("current_dates < target_dates requires DATA_GATED") if current_dates >= target_dates and payload.get("status") != "DONE": errors.append("current_dates >= target_dates requires DONE") result = { "formula_id": FORMULA_ID, "gate": "PASS" if not errors else "FAIL", "checked_file": str(Path(args.input).as_posix()), "errors": errors, } out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 if not errors else 1 if __name__ == "__main__": raise SystemExit(main())