9e6e2ded2f
- operational_report.json/md와 final_decision_packet_v4 생성 경로를 .NET으로 전환했습니다. - CI, 운영 게이트, 릴리스 DAG, 대시보드의 운영 진입점을 새 경로로 정렬했습니다. - legacy Python 렌더러는 비운영으로 명시했습니다.
181 lines
7.8 KiB
Python
181 lines
7.8 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from operational_report_contract import REPORT_SECTION_ORDER
|
|
|
|
|
|
# Base directory: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Report validated when no --json argument is supplied.
DEFAULT_PATH = ROOT.joinpath("Temp", "operational_report.json")

# JSON schema whose `const` values the report is compared against.
SCHEMA_PATH = ROOT.joinpath("schemas", "operational_report.schema.json")
|
|
|
|
# Keys that must be present at the top level of the report payload.
REQUIRED_TOP_LEVEL_KEYS = {
    "schema_version",
    "source_json",
    "section_count",
    "sections",
    "summary",
}

# Section names that must each appear in the report's `sections` array.
# main() additionally requires routing_serving_trace to come before
# QEH_AUDIT_BLOCK.
REQUIRED_SECTIONS = [
    "today_decision_summary_card",
    "routing_serving_trace",
    "QEH_AUDIT_BLOCK",
    "investment_quality_headline",
    "final_judgment_table",
    "operational_truth_score",
    "execution_readiness_matrix",
    "pass_100_criteria",
    "final_execution_decision",
    "concise_hts_input_sheet",
    "reference_price_ledger",
    "watch_breakout_gate",
    "anti_whipsaw_reentry_gate",
]
|
|
|
|
|
|
def safe_print(message: str) -> None:
    """Print *message*, degrading to escape sequences if stdout cannot encode it.

    The message is printed unchanged first. If that raises
    ``UnicodeEncodeError``, every character the output stream cannot
    represent is replaced by a backslash escape and the result is printed
    instead, so the caller never sees an encoding failure.

    Args:
        message: Text to write to standard output.
    """
    import sys  # local import: only needed on the fallback path

    try:
        print(message)
    except UnicodeEncodeError:
        # Re-encode for the stream that actually rejected the text. The
        # previous hard-coded cp949 round-trip kept characters that a
        # non-cp949 console (e.g. ASCII) still could not print, so the second
        # print raised again. cp949 stays as the default when the stream does
        # not report an encoding.
        encoding = getattr(sys.stdout, "encoding", None) or "cp949"
        fallback = message.encode(encoding, errors="backslashreplace").decode(encoding, errors="ignore")
        print(fallback)
|
|
|
|
|
|
def load_json(path: Path) -> dict[str, Any]:
    """Read a JSON document from *path* and return it when it is an object.

    The file is decoded as ``utf-8-sig`` so that a leading UTF-8 byte-order
    mark is skipped; files without a BOM decode exactly as plain UTF-8.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed object, or an empty dict when the top-level JSON value is
        not an object (array, string, number, ...).

    Raises:
        OSError: The file cannot be read.
        ValueError: The bytes are not valid UTF-8 or not valid JSON
            (``UnicodeDecodeError`` / ``json.JSONDecodeError``).
    """
    text = path.read_text(encoding="utf-8-sig")
    payload = json.loads(text)
    if isinstance(payload, dict):
        return payload
    return {}
|
|
|
|
|
|
def main() -> int:
    """Validate the operational report JSON and return a process exit status.

    The report path comes from ``--json`` (default ``DEFAULT_PATH``); a
    relative path is resolved against ``ROOT``. The report is checked against
    the ``const`` values in the schema file, the required top-level keys, the
    per-section structure, the required/canonical section names, the relative
    order of ``routing_serving_trace`` and ``QEH_AUDIT_BLOCK``, and the types
    of the ``summary`` fields.

    Every problem found is printed on its own line, followed by
    ``OPERATIONAL_REPORT_JSON_FAIL``; a clean report prints
    ``OPERATIONAL_REPORT_JSON_OK``.

    Returns:
        0 when the report is valid, 1 otherwise (including a missing,
        unreadable or malformed report or schema file).
    """
    parser = argparse.ArgumentParser(description="Validate operational report JSON structure.")
    parser.add_argument("--json", default=str(DEFAULT_PATH))
    args = parser.parse_args()

    path = Path(args.json)
    if not path.is_absolute():
        path = ROOT / path
    if not path.exists():
        print("OPERATIONAL_REPORT_JSON_FAIL: missing file")
        return 1

    # A truncated or non-UTF-8 report must fail the gate with the usual
    # marker instead of an unhandled traceback. JSONDecodeError and
    # UnicodeDecodeError are both ValueError subclasses.
    try:
        payload = load_json(path)
    except (OSError, ValueError) as exc:
        safe_print(f"OPERATIONAL_REPORT_JSON_FAIL: invalid json file: {exc}")
        return 1

    if not SCHEMA_PATH.exists():
        print("OPERATIONAL_REPORT_JSON_FAIL: missing schema file")
        return 1

    try:
        schema = load_json(SCHEMA_PATH)
    except (OSError, ValueError):
        schema = {}
    # load_json() collapses every non-object document to {}, so an empty
    # result is the only observable sign of an unusable schema.
    if not schema:
        print("OPERATIONAL_REPORT_JSON_FAIL: invalid schema file")
        return 1

    errors: list[str] = []

    for key in ("schema_version", "source_json"):
        if payload.get(key) != _schema_const(schema, key):
            errors.append(f"{key} const mismatch")

    # Sections are inspected in a single pass so each problem is reported once.
    names: list[str] = []
    sections = payload.get("sections")
    if not isinstance(sections, list):
        errors.append("sections: must be array")
        sections = []
    for idx, section in enumerate(sections):
        if not isinstance(section, dict):
            errors.append(f"sections[{idx}]: must be object")
            continue
        errors.extend(_section_problems(idx, section))
        # Blank names are recorded too, so several unnamed sections are
        # also reported as duplicates below.
        names.append(str(section.get("name") or "").strip())

    missing_top = REQUIRED_TOP_LEVEL_KEYS - set(payload)
    if missing_top:
        errors.append(f"missing_top_level_keys={sorted(missing_top)}")

    if not sections:
        errors.append("sections: must be a non-empty list")

    if payload.get("section_count") != len(sections):
        errors.append(f"section_count mismatch: stored={payload.get('section_count')} actual={len(sections)}")

    if len(names) != len(set(names)):
        errors.append("sections: duplicate section names detected")

    if names:
        missing_canonical = [name for name in REPORT_SECTION_ORDER if name not in names]
        if missing_canonical:
            # Only the first five are listed to keep the line short.
            errors.append(f"sections: missing canonical sections={missing_canonical[:5]}")

    for required in REQUIRED_SECTIONS:
        if required not in names:
            errors.append(f"missing_section={required}")

    routing_idx = names.index("routing_serving_trace") if "routing_serving_trace" in names else -1
    qeh_idx = names.index("QEH_AUDIT_BLOCK") if "QEH_AUDIT_BLOCK" in names else -1
    if routing_idx < 0 or qeh_idx < 0 or routing_idx > qeh_idx:
        errors.append("section_order_invalid:routing_serving_trace_before_QEH_AUDIT_BLOCK")

    summary = payload.get("summary")
    if isinstance(summary, dict):
        errors.extend(_summary_problems(summary))
    else:
        errors.append("summary: must be object")

    if errors:
        for error in errors:
            safe_print(error)
        safe_print("OPERATIONAL_REPORT_JSON_FAIL")
        return 1

    safe_print("OPERATIONAL_REPORT_JSON_OK")
    return 0


# Summary flags that must be present and must be JSON booleans.
_REQUIRED_SUMMARY_FLAGS = (
    "found_settlement",
    "found_heat",
    "found_routing",
    "found_qeh",
    "found_concise_hts_input_sheet",
    "found_reference_price_ledger",
    "canonical_order_ok",
)

# Optional summary fields: (key, accepted Python types, label used in the message).
_OPTIONAL_SUMMARY_FIELDS = (
    ("json_validation_status", (str,), "string"),
    ("found_outcome_eval_window", (bool,), "bool"),
    ("outcome_eval_gate", (str,), "string"),
    ("outcome_root_cause_flags", (list,), "list"),
    ("found_algorithm_guidance_proof", (bool,), "bool"),
    ("algorithm_guidance_proof_score", (int, float), "number"),
    ("algorithm_guidance_proof_gate", (str,), "string"),
)


def _schema_const(schema: dict[str, Any], key: str) -> Any:
    """Return ``schema["properties"][key]["const"]``, or None if any level is absent or not an object."""
    properties = schema.get("properties")
    if not isinstance(properties, dict):
        return None
    spec = properties.get(key)
    if not isinstance(spec, dict):
        return None
    return spec.get("const")


def _section_problems(idx: int, section: dict[str, Any]) -> list[str]:
    """Return the name/title/markdown problems of one section object, each at most once."""
    raw_name = section.get("name")
    raw_title = section.get("title")
    raw_markdown = section.get("markdown")
    title = str(raw_title or "").strip()

    problems: list[str] = []
    if not isinstance(raw_name, str) or not raw_name.strip():
        problems.append(f"sections[{idx}]: missing name")
    if not isinstance(raw_title, str) or not title:
        problems.append(f"sections[{idx}]: missing title")
    # The heading has to match both as written and after trimming
    # whitespace; the two former validation passes enforced one rule each.
    heading_ok = (
        isinstance(raw_markdown, str)
        and raw_markdown.startswith(f"## {raw_title}")
        and raw_markdown.strip().startswith(f"## {title}")
    )
    if not heading_ok:
        problems.append(f"sections[{idx}]: markdown/title mismatch")
    return problems


def _has_type(value: Any, accepted: tuple[type, ...]) -> bool:
    """Type test in which a bool only satisfies fields that explicitly accept bool."""
    # bool is a subclass of int, so a plain isinstance() would let
    # true/false pass as a number.
    if isinstance(value, bool):
        return bool in accepted
    return isinstance(value, accepted)


def _summary_problems(summary: dict[str, Any]) -> list[str]:
    """Return the type problems of the ``summary`` object, in field order."""
    problems = [
        f"summary.{key} must be bool"
        for key in _REQUIRED_SUMMARY_FLAGS
        if not isinstance(summary.get(key), bool)
    ]
    for key, accepted, label in _OPTIONAL_SUMMARY_FIELDS:
        value = summary.get(key)
        # Optional fields may be absent or null.
        if value is not None and not _has_type(value, accepted):
            problems.append(f"summary.{key} must be {label} or null")
    return problems
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the validator and hand its status (0 = OK, 1 = FAIL) to the
    # interpreter as the process exit code.
    exit_status = main()
    raise SystemExit(exit_status)
|