Files
QuantEngineByItz/tools/validate_operational_report_json.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

168 lines
7.0 KiB
Python

from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
from jsonschema import Draft202012Validator
from operational_report_contract import REPORT_SECTION_ORDER
# Directory one level above the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]

# Report validated when --json is not supplied, and the schema it is checked against.
DEFAULT_PATH = ROOT.joinpath("Temp", "operational_report.json")
SCHEMA_PATH = ROOT.joinpath("schemas", "operational_report.schema.json")

# Keys that must be present at the top level of the report payload.
REQUIRED_TOP_LEVEL_KEYS = {
    "schema_version",
    "source_json",
    "section_count",
    "sections",
    "summary",
}

# Section names that must each appear in the report's "sections" list.
REQUIRED_SECTIONS = [
    "today_decision_summary_card",
    "routing_serving_trace",
    "QEH_AUDIT_BLOCK",
    "investment_quality_headline",
    "final_judgment_table",
    "operational_truth_score",
    "execution_readiness_matrix",
    "pass_100_criteria",
    "final_execution_decision",
    "concise_hts_input_sheet",
    "reference_price_ledger",
    "watch_breakout_gate",
    "anti_whipsaw_reentry_gate",
]
def safe_print(message: str) -> None:
    """Print *message*, escaping characters that stdout cannot encode.

    The message is printed as-is first. If that raises ``UnicodeEncodeError``,
    every character the stream's encoding cannot represent is replaced by a
    backslash escape (``\\xNN`` / ``\\uNNNN``) and the result is printed
    instead, so a validation message never aborts the run.

    Args:
        message: Text to write to standard output.
    """
    import sys  # local import: keeps this function self-contained

    try:
        print(message)
    except UnicodeEncodeError:
        # Escape against the encoding stdout really uses. A fixed "cp949"
        # round-trip leaves Korean text untouched, so on an ascii/cp1252
        # console the second print would raise the same error again.
        encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
        fallback = message.encode(encoding, errors="backslashreplace").decode(encoding, errors="ignore")
        print(fallback)
def load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text, parse it as JSON, and return the top-level object.

    A document whose top-level value is not a JSON object (for example a list
    or a string) yields an empty dict instead.
    """
    raw_text = path.read_text(encoding="utf-8")
    decoded = json.loads(raw_text)
    if not isinstance(decoded, dict):
        return {}
    return decoded
# Summary flags that must be present and must be booleans.
_REQUIRED_BOOL_SUMMARY_FIELDS = (
    "found_settlement",
    "found_heat",
    "found_routing",
    "found_qeh",
    "found_concise_hts_input_sheet",
    "found_reference_price_ledger",
    "canonical_order_ok",
)

# Summary fields that may be absent/null: (key, accepted type(s), label used in the error text).
_OPTIONAL_SUMMARY_FIELDS = (
    ("json_validation_status", str, "string"),
    ("found_outcome_eval_window", bool, "bool"),
    ("outcome_eval_gate", str, "string"),
    ("outcome_root_cause_flags", list, "list"),
    ("found_algorithm_guidance_proof", bool, "bool"),
    ("algorithm_guidance_proof_score", (int, float), "number"),
    ("algorithm_guidance_proof_gate", str, "string"),
)


def _section_errors(sections: list[Any]) -> tuple[list[str], list[str]]:
    """Check every entry of ``sections``; return ``(errors, names)``.

    ``names`` holds the stripped ``name`` of each object-typed entry, in order
    (an empty string when the name is missing).
    """
    errors: list[str] = []
    names: list[str] = []
    for idx, section in enumerate(sections):
        if not isinstance(section, dict):
            errors.append(f"sections[{idx}]: must be object")
            continue
        name = str(section.get("name") or "").strip()
        title = str(section.get("title") or "").strip()
        markdown = str(section.get("markdown") or "").strip()
        if not name:
            errors.append(f"sections[{idx}]: missing name")
        if not title:
            errors.append(f"sections[{idx}]: missing title")
        # The rendered markdown must open with the section's own H2 heading.
        if not markdown.startswith(f"## {title}"):
            errors.append(f"sections[{idx}]: markdown/title mismatch")
        names.append(name)
    return errors, names


def _summary_errors(summary: Any) -> list[str]:
    """Return the type errors found in the report's ``summary`` object."""
    if not isinstance(summary, dict):
        return ["summary: must be object"]
    errors = [
        f"summary.{key} must be bool"
        for key in _REQUIRED_BOOL_SUMMARY_FIELDS
        if not isinstance(summary.get(key), bool)
    ]
    for key, expected, label in _OPTIONAL_SUMMARY_FIELDS:
        value = summary.get(key)
        if value is None:
            continue
        # bool is a subclass of int, so reject True/False explicitly wherever
        # the field is not itself declared as a bool (e.g. the numeric score).
        type_ok = isinstance(value, expected) and (expected is bool or not isinstance(value, bool))
        if not type_ok:
            errors.append(f"summary.{key} must be {label} or null")
    return errors


def main() -> int:
    """Validate an operational report JSON file and print the verdict.

    The report path comes from ``--json`` (default ``DEFAULT_PATH``); a relative
    path is resolved against ``ROOT``. The payload is checked against the JSON
    schema at ``SCHEMA_PATH`` and then against the structural rules coded here:
    required top-level keys, section shape, duplicate and required section
    names, the routing-before-QEH ordering, and the ``summary`` field types.

    Returns:
        0 after printing ``OPERATIONAL_REPORT_JSON_OK`` when no error is found;
        1 after printing the collected errors and ``OPERATIONAL_REPORT_JSON_FAIL``,
        or after a single ``OPERATIONAL_REPORT_JSON_FAIL: ...`` line when the
        report or the schema is missing or unreadable.
    """
    parser = argparse.ArgumentParser(description="Validate operational report JSON structure.")
    parser.add_argument("--json", default=str(DEFAULT_PATH))
    args = parser.parse_args()

    path = Path(args.json)
    if not path.is_absolute():
        path = ROOT / path
    if not path.exists():
        print("OPERATIONAL_REPORT_JSON_FAIL: missing file")
        return 1
    try:
        payload = load_json(path)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        safe_print(f"OPERATIONAL_REPORT_JSON_FAIL: unreadable json file: {exc}")
        return 1

    if not SCHEMA_PATH.exists():
        print("OPERATIONAL_REPORT_JSON_FAIL: missing schema file")
        return 1
    # Parse the schema directly instead of through load_json(): that helper turns
    # any non-object document into {}, which would make the check below
    # unreachable and let an invalid schema file accept every report.
    try:
        schema = json.loads(SCHEMA_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        schema = None
    if not isinstance(schema, dict):
        print("OPERATIONAL_REPORT_JSON_FAIL: invalid schema file")
        return 1

    errors: list[str] = []

    validator = Draft202012Validator(schema)
    for error in validator.iter_errors(payload):
        pointer = "/".join(str(part) for part in error.absolute_path)
        location = f" at {pointer}" if pointer else ""
        errors.append(f"schema_error{location}: {error.message}")

    missing_top = REQUIRED_TOP_LEVEL_KEYS - set(payload)
    if missing_top:
        errors.append(f"missing_top_level_keys={sorted(missing_top)}")

    sections = payload.get("sections")
    if not isinstance(sections, list) or not sections:
        errors.append("sections: must be a non-empty list")
        sections = []
    if payload.get("section_count") != len(sections):
        errors.append(f"section_count mismatch: stored={payload.get('section_count')} actual={len(sections)}")

    section_errors, names = _section_errors(sections)
    errors.extend(section_errors)
    if len(names) != len(set(names)):
        errors.append("sections: duplicate section names detected")
    if names:
        missing_canonical = [name for name in REPORT_SECTION_ORDER if name not in names]
        if missing_canonical:
            # Only the first five are reported to keep the line short.
            errors.append(f"sections: missing canonical sections={missing_canonical[:5]}")
    for required in REQUIRED_SECTIONS:
        if required not in names:
            errors.append(f"missing_section={required}")

    # routing_serving_trace must be present and must not come after QEH_AUDIT_BLOCK.
    routing_idx = names.index("routing_serving_trace") if "routing_serving_trace" in names else -1
    qeh_idx = names.index("QEH_AUDIT_BLOCK") if "QEH_AUDIT_BLOCK" in names else -1
    if routing_idx < 0 or qeh_idx < 0 or routing_idx > qeh_idx:
        errors.append("section_order_invalid:routing_serving_trace_before_QEH_AUDIT_BLOCK")

    errors.extend(_summary_errors(payload.get("summary")))

    if errors:
        for error in errors:
            safe_print(error)
        safe_print("OPERATIONAL_REPORT_JSON_FAIL")
        return 1
    safe_print("OPERATIONAL_REPORT_JSON_OK")
    return 0
# Script entry point: run the validator and exit with the status code main() returns.
if __name__ == "__main__":
    raise SystemExit(main())