Files
QuantEngineByItz/tools/validate_operational_report_json.py
kjh2064 9e6e2ded2f feat: .NET 운영 리포트 렌더러와 CI 경로 전환
- operational_report.json/md와 final_decision_packet_v4 생성 경로를 .NET으로 전환했습니다.
- CI, 운영 게이트, 릴리스 DAG, 대시보드의 운영 진입점을 새 경로로 정렬했습니다.
- legacy Python 렌더러는 비운영으로 명시했습니다.
2026-06-26 14:18:03 +09:00

181 lines
7.8 KiB
Python

from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
from operational_report_contract import REPORT_SECTION_ORDER
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_PATH = ROOT / "Temp" / "operational_report.json"
SCHEMA_PATH = ROOT / "schemas" / "operational_report.schema.json"
REQUIRED_TOP_LEVEL_KEYS = {"schema_version", "source_json", "section_count", "sections", "summary"}
REQUIRED_SECTIONS = [
"today_decision_summary_card",
"routing_serving_trace",
"QEH_AUDIT_BLOCK",
"investment_quality_headline",
"final_judgment_table",
"operational_truth_score",
"execution_readiness_matrix",
"pass_100_criteria",
"final_execution_decision",
"concise_hts_input_sheet",
"reference_price_ledger",
"watch_breakout_gate",
"anti_whipsaw_reentry_gate",
]
def safe_print(message: str) -> None:
try:
print(message)
except UnicodeEncodeError:
fallback = message.encode("cp949", errors="backslashreplace").decode("cp949", errors="ignore")
print(fallback)
def load_json(path: Path) -> dict[str, Any]:
payload = json.loads(path.read_text(encoding="utf-8"))
return payload if isinstance(payload, dict) else {}
def main() -> int:
parser = argparse.ArgumentParser(description="Validate operational report JSON structure.")
parser.add_argument("--json", default=str(DEFAULT_PATH))
args = parser.parse_args()
path = Path(args.json)
if not path.is_absolute():
path = ROOT / path
if not path.exists():
print("OPERATIONAL_REPORT_JSON_FAIL: missing file")
return 1
payload = load_json(path)
errors: list[str] = []
if not SCHEMA_PATH.exists():
print("OPERATIONAL_REPORT_JSON_FAIL: missing schema file")
return 1
schema = load_json(SCHEMA_PATH)
if not isinstance(schema, dict):
print("OPERATIONAL_REPORT_JSON_FAIL: invalid schema file")
return 1
if payload.get("schema_version") != schema.get("properties", {}).get("schema_version", {}).get("const"):
errors.append("schema_version const mismatch")
if payload.get("source_json") != schema.get("properties", {}).get("source_json", {}).get("const"):
errors.append("source_json const mismatch")
sections = payload.get("sections")
if not isinstance(sections, list):
errors.append("sections: must be array")
sections = []
else:
for idx, section in enumerate(sections):
if not isinstance(section, dict):
errors.append(f"sections[{idx}]: must be object")
continue
if not isinstance(section.get("name"), str) or not section.get("name").strip():
errors.append(f"sections[{idx}]: missing name")
if not isinstance(section.get("title"), str) or not section.get("title").strip():
errors.append(f"sections[{idx}]: missing title")
if not isinstance(section.get("markdown"), str) or not section.get("markdown").startswith(f"## {section.get('title')}"):
errors.append(f"sections[{idx}]: markdown/title mismatch")
missing_top = REQUIRED_TOP_LEVEL_KEYS - set(payload)
if missing_top:
errors.append(f"missing_top_level_keys={sorted(missing_top)}")
sections = payload.get("sections")
if not isinstance(sections, list) or not sections:
errors.append("sections: must be a non-empty list")
sections = []
if payload.get("section_count") != len(sections):
errors.append(f"section_count mismatch: stored={payload.get('section_count')} actual={len(sections)}")
names: list[str] = []
for idx, section in enumerate(sections):
if not isinstance(section, dict):
errors.append(f"sections[{idx}]: must be object")
continue
name = str(section.get("name") or "").strip()
title = str(section.get("title") or "").strip()
markdown = str(section.get("markdown") or "").strip()
if not name:
errors.append(f"sections[{idx}]: missing name")
if not title:
errors.append(f"sections[{idx}]: missing title")
if not markdown.startswith(f"## {title}"):
errors.append(f"sections[{idx}]: markdown/title mismatch")
names.append(name)
if len(names) != len(set(names)):
errors.append("sections: duplicate section names detected")
if names:
missing_canonical = [name for name in REPORT_SECTION_ORDER if name not in names]
if missing_canonical:
errors.append(f"sections: missing canonical sections={missing_canonical[:5]}")
for required in REQUIRED_SECTIONS:
if required not in names:
errors.append(f"missing_section={required}")
routing_idx = names.index("routing_serving_trace") if "routing_serving_trace" in names else -1
qeh_idx = names.index("QEH_AUDIT_BLOCK") if "QEH_AUDIT_BLOCK" in names else -1
if routing_idx < 0 or qeh_idx < 0 or routing_idx > qeh_idx:
errors.append("section_order_invalid:routing_serving_trace_before_QEH_AUDIT_BLOCK")
summary = payload.get("summary")
if not isinstance(summary, dict):
errors.append("summary: must be object")
else:
if not isinstance(summary.get("found_settlement"), bool):
errors.append("summary.found_settlement must be bool")
if not isinstance(summary.get("found_heat"), bool):
errors.append("summary.found_heat must be bool")
if not isinstance(summary.get("found_routing"), bool):
errors.append("summary.found_routing must be bool")
if not isinstance(summary.get("found_qeh"), bool):
errors.append("summary.found_qeh must be bool")
if not isinstance(summary.get("found_concise_hts_input_sheet"), bool):
errors.append("summary.found_concise_hts_input_sheet must be bool")
if not isinstance(summary.get("found_reference_price_ledger"), bool):
errors.append("summary.found_reference_price_ledger must be bool")
if not isinstance(summary.get("canonical_order_ok"), bool):
errors.append("summary.canonical_order_ok must be bool")
if summary.get("json_validation_status") is not None and not isinstance(summary.get("json_validation_status"), str):
errors.append("summary.json_validation_status must be string or null")
if summary.get("found_outcome_eval_window") is not None and not isinstance(summary.get("found_outcome_eval_window"), bool):
errors.append("summary.found_outcome_eval_window must be bool or null")
if summary.get("outcome_eval_gate") is not None and not isinstance(summary.get("outcome_eval_gate"), str):
errors.append("summary.outcome_eval_gate must be string or null")
if summary.get("outcome_root_cause_flags") is not None and not isinstance(summary.get("outcome_root_cause_flags"), list):
errors.append("summary.outcome_root_cause_flags must be list or null")
if summary.get("found_algorithm_guidance_proof") is not None and not isinstance(summary.get("found_algorithm_guidance_proof"), bool):
errors.append("summary.found_algorithm_guidance_proof must be bool or null")
if summary.get("algorithm_guidance_proof_score") is not None and not isinstance(summary.get("algorithm_guidance_proof_score"), (int, float)):
errors.append("summary.algorithm_guidance_proof_score must be number or null")
if summary.get("algorithm_guidance_proof_gate") is not None and not isinstance(summary.get("algorithm_guidance_proof_gate"), str):
errors.append("summary.algorithm_guidance_proof_gate must be string or null")
if errors:
for error in errors:
safe_print(error)
safe_print("OPERATIONAL_REPORT_JSON_FAIL")
return 1
safe_print("OPERATIONAL_REPORT_JSON_OK")
return 0
if __name__ == "__main__":
raise SystemExit(main())