""" build_cross_section_consistency_v1.py 목적: operational_report.json의 모든 섹션 markdown에서 canonical_metrics_registry에 정의된 논리 지표가 여러 섹션에서 상이한 값으로 렌더링되는지 검사. 정책: AGENTS.md R1 enforcement_mode_until 방식 - now < enforcement_mode_until → conflict 있어도 gate=WARN (보고서 발행 허용) - now >= enforcement_mode_until → gate=FAIL (hard-block) 출력 Temp/cross_section_consistency_v1.json: { "formula_id": "CROSS_SECTION_CONSISTENCY_V1", "score": 0~100, "conflict_count": N, "conflicts": [{metric, section, rendered, canonical}], "forbidden_uniform_labels": N, "incomplete_tables": N, "enforcement_mode_until": "2026-06-15", "gate": "PASS" | "WARN" | "FAIL" } """ import json import pathlib import re import sys from datetime import date, datetime, timezone ROOT = pathlib.Path(__file__).parent.parent REPORT_PATH = ROOT / "Temp" / "operational_report.json" CANON_PATH = ROOT / "Temp" / "canonical_metrics_v1.json" REGISTRY_PATH = ROOT / "spec" / "25_canonical_metrics_registry.yaml" OUT_PATH = ROOT / "Temp" / "cross_section_consistency_v1.json" ENFORCEMENT_MODE_UNTIL = date(2026, 6, 15) # AGENTS.md R1 금지 일률값 # 주의: "정상"은 STATUS_LABELS["NORMAL"] 정상 번역값이므로 제외. # R1 금지 대상은 stub/은폐용 일률 placeholder이며 실제 상태코드는 제외. FORBIDDEN_LABELS = { "데이터 누락", "DATA_MISSING", # LOSING은 market_share_proxy_v1에서 실제 알고리즘 산출 상태코드이므로 제외. # R1 금지 대상: 실질 데이터가 있어야 할 위치에 "정보 없음"으로 은폐하는 stub만 해당. } # 화이트리스트 컬럼 (이 컬럼에 있으면 금지 값 허용) WHITELIST_COLS = {"비고", "해제조건", "remarks", "해석", "근거"} # 섹션별 예외 허용 (해당 섹션에서는 forbidden_labels 검사 제외) # pa1_report_table: 471990처럼 universe에 없는 종목은 PA1 미수집이 정당 WHITELIST_SECTIONS = {"pa1_report_table"} # 섹션별 검사 대상 지표 + 검색 패턴 # key = metric_id, value = 섹션·패턴 맵핑 METRIC_PATTERNS = { "cluster_pct": { "sections": ["cluster_sync_audit", "portfolio_structure_risks", "mandatory_reduction_plan"], "pattern": r"cluster_pct\s*[=:]\s*([\d.]+)%|반도체 클러스터[^\|]*\|\s*([\d.]+)\s*\|", }, "cash_min_required_krw": { "sections": ["exec_safety_declaration", "cash_recovery_plan_crdl", "QEH_AUDIT_BLOCK"], "pattern": r"최소\s+([\d,]+)원|최소 필요 현금:\s*\*\*([\d,]+)원|현금 부족분[^\|]*\|\s*([\d,]+)\s*\|", }, "cash_reference_total_krw": { "sections": ["cash_recovery_plan_crdl"], "pattern": r"참고용 전체 후보 누적 \(([\d,]+)원\)", }, } def _load_json(p: pathlib.Path) -> dict: if p.exists(): try: return json.loads(p.read_text(encoding="utf-8")) except Exception: pass return {} def _parse_krw(s: str) -> float | None: """'39,797,073' → 39797073.0""" if s is None: return None cleaned = s.replace(",", "").replace("원", "").strip() try: return float(cleaned) except ValueError: return None def _extract_value(text: str, pattern: str) -> str | None: """markdown에서 정규식으로 값 추출 (첫 번째 그룹).""" m = re.search(pattern, text) if not m: return None for g in m.groups(): if g is not None: return g.strip() return None def check_conflicts(report_sections: dict[str, str], canon: dict) -> list[dict]: conflicts = [] metrics_canon = canon.get("metrics", {}) for metric_id, info in METRIC_PATTERNS.items(): canonical_val = metrics_canon.get(metric_id) if canonical_val is None: continue for section_name in info["sections"]: md = report_sections.get(section_name, "") if not md: continue rendered_raw = _extract_value(md, info["pattern"]) if rendered_raw is None: continue # 값 파싱 — 숫자형 비교 rendered_num = _parse_krw(rendered_raw) if rendered_num is None: try: rendered_num = float(rendered_raw) except ValueError: continue tol = 0.1 if metric_id == "cluster_pct" else 0 if abs(rendered_num - float(canonical_val)) > tol: conflicts.append({ "metric": metric_id, "section": section_name, "rendered": rendered_raw, "canonical": canonical_val, "diff": round(rendered_num - float(canonical_val), 4), }) return conflicts def check_forbidden_labels(report_sections: dict[str, str]) -> int: """GFM 표 셀에서 금지 일률값 개수 반환.""" count = 0 for section_name, md in report_sections.items(): if section_name in WHITELIST_SECTIONS: continue for line in md.splitlines(): if "|" not in line: continue cells = [c.strip() for c in line.split("|")] for cell in cells: if any(wl in cell for wl in WHITELIST_COLS): break if cell in FORBIDDEN_LABELS: count += 1 return count def check_incomplete_tables(report_sections: dict[str, str]) -> int: """ 핵심 산출 표(stub_token이 실질 데이터여야 할 컬럼에 있을 때) INCOMPLETE_TABLE 판정. AGENTS.md R1 기준: blank ≥ 5% = WARN, ≥ 20% = FAIL. 단, 이 게이트는 교차섹션 일관성 검사 범위이므로 '지정가·수량·손익률·클러스터%' 핵심 컬럼에 stub이 있는 경우만 집계. """ CRITICAL_STUBS = {"미산출", "DATA_MISSING", "데이터 누락"} incomplete = 0 for md in report_sections.values(): table_lines = [l for l in md.splitlines() if l.strip().startswith("|") and "---" not in l] if len(table_lines) < 3: continue data_lines = table_lines[1:] critical_stubs = 0 total_data_cells = 0 for line in data_lines: cells = [c.strip() for c in line.split("|") if c.strip()] total_data_cells += len(cells) critical_stubs += sum(1 for c in cells if c in CRITICAL_STUBS) if total_data_cells > 0 and critical_stubs / total_data_cells >= 0.05: incomplete += 1 return incomplete def main() -> int: report_data = _load_json(REPORT_PATH) canon_data = _load_json(CANON_PATH) sections_raw = report_data.get("sections") or [] report_sections: dict[str, str] = { str(s.get("name") or ""): str(s.get("markdown") or "") for s in sections_raw if isinstance(s, dict) } conflicts = check_conflicts(report_sections, canon_data) forbidden_count = check_forbidden_labels(report_sections) incomplete_count = check_incomplete_tables(report_sections) conflict_count = len(conflicts) today = date.today() in_enforcement = today >= ENFORCEMENT_MODE_UNTIL if conflict_count == 0 and forbidden_count == 0 and incomplete_count == 0: gate = "PASS" elif in_enforcement: gate = "FAIL" else: gate = "WARN" # 점수: conflict 1건당 -10, forbidden 1개당 -2, incomplete 1개당 -5 score = max(0, 100 - conflict_count * 10 - forbidden_count * 2 - incomplete_count * 5) out = { "formula_id": "CROSS_SECTION_CONSISTENCY_V1", "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "score": score, "conflict_count": conflict_count, "conflicts": conflicts, "forbidden_uniform_labels": forbidden_count, "incomplete_tables": incomplete_count, "enforcement_mode_until": str(ENFORCEMENT_MODE_UNTIL), "enforcement_active": in_enforcement, "gate": gate, } OUT_PATH.parent.mkdir(parents=True, exist_ok=True) OUT_PATH.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8") print(f"CROSS_SECTION_CONSISTENCY_V1: gate={gate} score={score} conflicts={conflict_count} " f"forbidden={forbidden_count} incomplete={incomplete_count}") if conflicts: print(" CONFLICTS:") for c in conflicts: print(f" [{c['metric']}] section={c['section']} rendered={c['rendered']} canonical={c['canonical']} diff={c['diff']}") return 0 if gate in ("PASS", "WARN") else 1 if __name__ == "__main__": sys.exit(main())