from __future__ import annotations import argparse import json from pathlib import Path from typing import Any from operational_report_contract import REPORT_SECTION_ORDER ROOT = Path(__file__).resolve().parents[1] DEFAULT_PATH = ROOT / "Temp" / "operational_report.json" SCHEMA_PATH = ROOT / "schemas" / "operational_report.schema.json" REQUIRED_TOP_LEVEL_KEYS = {"schema_version", "source_json", "section_count", "sections", "summary"} REQUIRED_SECTIONS = [ "today_decision_summary_card", "routing_serving_trace", "QEH_AUDIT_BLOCK", "investment_quality_headline", "final_judgment_table", "operational_truth_score", "execution_readiness_matrix", "pass_100_criteria", "final_execution_decision", "concise_hts_input_sheet", "reference_price_ledger", "watch_breakout_gate", "anti_whipsaw_reentry_gate", ] def safe_print(message: str) -> None: try: print(message) except UnicodeEncodeError: fallback = message.encode("cp949", errors="backslashreplace").decode("cp949", errors="ignore") print(fallback) def load_json(path: Path) -> dict[str, Any]: payload = json.loads(path.read_text(encoding="utf-8")) return payload if isinstance(payload, dict) else {} def main() -> int: parser = argparse.ArgumentParser(description="Validate operational report JSON structure.") parser.add_argument("--json", default=str(DEFAULT_PATH)) args = parser.parse_args() path = Path(args.json) if not path.is_absolute(): path = ROOT / path if not path.exists(): print("OPERATIONAL_REPORT_JSON_FAIL: missing file") return 1 payload = load_json(path) errors: list[str] = [] if not SCHEMA_PATH.exists(): print("OPERATIONAL_REPORT_JSON_FAIL: missing schema file") return 1 schema = load_json(SCHEMA_PATH) if not isinstance(schema, dict): print("OPERATIONAL_REPORT_JSON_FAIL: invalid schema file") return 1 if payload.get("schema_version") != schema.get("properties", {}).get("schema_version", {}).get("const"): errors.append("schema_version const mismatch") if payload.get("source_json") != schema.get("properties", {}).get("source_json", {}).get("const"): errors.append("source_json const mismatch") sections = payload.get("sections") if not isinstance(sections, list): errors.append("sections: must be array") sections = [] else: for idx, section in enumerate(sections): if not isinstance(section, dict): errors.append(f"sections[{idx}]: must be object") continue if not isinstance(section.get("name"), str) or not section.get("name").strip(): errors.append(f"sections[{idx}]: missing name") if not isinstance(section.get("title"), str) or not section.get("title").strip(): errors.append(f"sections[{idx}]: missing title") if not isinstance(section.get("markdown"), str) or not section.get("markdown").startswith(f"## {section.get('title')}"): errors.append(f"sections[{idx}]: markdown/title mismatch") missing_top = REQUIRED_TOP_LEVEL_KEYS - set(payload) if missing_top: errors.append(f"missing_top_level_keys={sorted(missing_top)}") sections = payload.get("sections") if not isinstance(sections, list) or not sections: errors.append("sections: must be a non-empty list") sections = [] if payload.get("section_count") != len(sections): errors.append(f"section_count mismatch: stored={payload.get('section_count')} actual={len(sections)}") names: list[str] = [] for idx, section in enumerate(sections): if not isinstance(section, dict): errors.append(f"sections[{idx}]: must be object") continue name = str(section.get("name") or "").strip() title = str(section.get("title") or "").strip() markdown = str(section.get("markdown") or "").strip() if not name: errors.append(f"sections[{idx}]: missing name") if not title: errors.append(f"sections[{idx}]: missing title") if not markdown.startswith(f"## {title}"): errors.append(f"sections[{idx}]: markdown/title mismatch") names.append(name) if len(names) != len(set(names)): errors.append("sections: duplicate section names detected") if names: missing_canonical = [name for name in REPORT_SECTION_ORDER if name not in names] if missing_canonical: errors.append(f"sections: missing canonical sections={missing_canonical[:5]}") for required in REQUIRED_SECTIONS: if required not in names: errors.append(f"missing_section={required}") routing_idx = names.index("routing_serving_trace") if "routing_serving_trace" in names else -1 qeh_idx = names.index("QEH_AUDIT_BLOCK") if "QEH_AUDIT_BLOCK" in names else -1 if routing_idx < 0 or qeh_idx < 0 or routing_idx > qeh_idx: errors.append("section_order_invalid:routing_serving_trace_before_QEH_AUDIT_BLOCK") summary = payload.get("summary") if not isinstance(summary, dict): errors.append("summary: must be object") else: if not isinstance(summary.get("found_settlement"), bool): errors.append("summary.found_settlement must be bool") if not isinstance(summary.get("found_heat"), bool): errors.append("summary.found_heat must be bool") if not isinstance(summary.get("found_routing"), bool): errors.append("summary.found_routing must be bool") if not isinstance(summary.get("found_qeh"), bool): errors.append("summary.found_qeh must be bool") if not isinstance(summary.get("found_concise_hts_input_sheet"), bool): errors.append("summary.found_concise_hts_input_sheet must be bool") if not isinstance(summary.get("found_reference_price_ledger"), bool): errors.append("summary.found_reference_price_ledger must be bool") if not isinstance(summary.get("canonical_order_ok"), bool): errors.append("summary.canonical_order_ok must be bool") if summary.get("json_validation_status") is not None and not isinstance(summary.get("json_validation_status"), str): errors.append("summary.json_validation_status must be string or null") if summary.get("found_outcome_eval_window") is not None and not isinstance(summary.get("found_outcome_eval_window"), bool): errors.append("summary.found_outcome_eval_window must be bool or null") if summary.get("outcome_eval_gate") is not None and not isinstance(summary.get("outcome_eval_gate"), str): errors.append("summary.outcome_eval_gate must be string or null") if summary.get("outcome_root_cause_flags") is not None and not isinstance(summary.get("outcome_root_cause_flags"), list): errors.append("summary.outcome_root_cause_flags must be list or null") if summary.get("found_algorithm_guidance_proof") is not None and not isinstance(summary.get("found_algorithm_guidance_proof"), bool): errors.append("summary.found_algorithm_guidance_proof must be bool or null") if summary.get("algorithm_guidance_proof_score") is not None and not isinstance(summary.get("algorithm_guidance_proof_score"), (int, float)): errors.append("summary.algorithm_guidance_proof_score must be number or null") if summary.get("algorithm_guidance_proof_gate") is not None and not isinstance(summary.get("algorithm_guidance_proof_gate"), str): errors.append("summary.algorithm_guidance_proof_gate must be string or null") if errors: for error in errors: safe_print(error) safe_print("OPERATIONAL_REPORT_JSON_FAIL") return 1 safe_print("OPERATIONAL_REPORT_JSON_OK") return 0 if __name__ == "__main__": raise SystemExit(main())