from __future__ import annotations

# Blank-cell audit: scans the markdown of every report section for GFM table
# rows, counts empty cells and placeholder ("stub") tokens, and writes a JSON
# summary both to an output file and to stdout.

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any


def _ensure_utf8_stdio() -> None:
    """Switch ``sys.stdout`` / ``sys.stderr`` to UTF-8 when they use another encoding.

    The existing stream is re-encoded in place via ``reconfigure`` so that only
    one text wrapper ever owns the underlying file descriptor.  Streams that
    are missing, already UTF-8, or report no encoding are left untouched.
    """
    for name in ("stdout", "stderr"):
        stream = getattr(sys, name, None)
        encoding = getattr(stream, "encoding", None)
        if not encoding or encoding.lower() in ("utf-8", "utf8"):
            continue
        reconfigure = getattr(stream, "reconfigure", None)
        if reconfigure is not None:
            # Line buffering keeps stdout/stderr interleaving predictable.
            reconfigure(encoding="utf-8", line_buffering=True)
        else:
            # Fallback for stream replacements without ``reconfigure``:
            # wrap the same descriptor, but never close it from the wrapper.
            wrapper = open(
                stream.fileno(),
                mode="w",
                encoding="utf-8",
                buffering=1,
                closefd=False,
            )
            setattr(sys, name, wrapper)


# Repository root: one directory above the directory containing this file.
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_REPORT = ROOT / "Temp" / "operational_report.json"
DEFAULT_OUT = ROOT / "Temp" / "blank_cell_audit_v1.json"

# Cell values that count as placeholders rather than real data.
STUB_TOKENS = {
    "데이터 누락",  # uniform "data missing" label (Korean)
    "DATA_MISSING",  # English "data missing" label
    "중립",  # uniform "neutral" label from GAS (smart money / fundamental)
    "NEUTRAL",  # English uniform "neutral" label
    # Note: LOSING, GAINING and STABLE are real signal values, not stubs.
    # WATCH_PENDING_SAMPLE and NO_PEER_DATA are allowed values.
}

# Matches a GFM delimiter row such as ``| --- | :---: |`` (two or more columns).
_SEPARATOR_ROW_RE = re.compile(r"^\s*\|?\s*[-:]+\s*(\|\s*[-:]+\s*)+\|?\s*$")


def _load(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it when the top level is an object.

    Returns an empty dict when the file cannot be read or parsed, or when the
    top-level JSON value is not an object.  Read/parse failures are reported
    on stderr so that an empty audit is not mistaken for a clean one.
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(
            f"[blank_cell_audit] cannot load report {path}: {exc}",
            file=sys.stderr,
        )
        return {}
    return data if isinstance(data, dict) else {}


def _sections(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the dict entries of ``payload["sections"]``, or ``[]`` if it is not a list."""
    candidates = payload.get("sections")
    if not isinstance(candidates, list):
        return []
    return [entry for entry in candidates if isinstance(entry, dict)]


def _count_table_issues(md: str) -> tuple[int, int]:
    """Count blank cells and stub tokens in the GFM table rows of *md*.

    - Splitting ``| a | b |`` on ``|`` yields an empty string at each end;
      after stripping, one leading and one trailing empty element are dropped
      as pipe-delimiter artifacts.
    - Delimiter rows (``--- | ---``) are ignored.

    Every line containing ``|`` is treated as a table row.

    Returns:
        ``(blank_cells, stub_tokens)`` summed over all such lines.
    """
    blanks = 0
    stubs = 0
    for line in md.splitlines():
        if "|" not in line:
            continue
        if _SEPARATOR_ROW_RE.match(line):
            continue
        cells = [cell.strip() for cell in line.split("|")]
        # Drop the artifacts produced by the outer pipes.
        if cells and cells[0] == "":
            cells = cells[1:]
        if cells and cells[-1] == "":
            cells = cells[:-1]
        for cell in cells:
            if cell == "":
                blanks += 1
            if cell in STUB_TOKENS:
                stubs += 1
    return blanks, stubs


def main() -> int:
    """Run the audit from the command line and return the process exit code.

    ``--report`` and ``--out`` default to ``DEFAULT_REPORT`` and
    ``DEFAULT_OUT``; relative paths are resolved against ``ROOT``.  The result
    is written to the output file and echoed to stdout.  The exit code is
    always 0: findings are reported through the ``gate`` field only.
    """
    _ensure_utf8_stdio()

    parser = argparse.ArgumentParser()
    parser.add_argument("--report", default=str(DEFAULT_REPORT))
    parser.add_argument("--out", default=str(DEFAULT_OUT))
    args = parser.parse_args()

    report_path = Path(args.report)
    out_path = Path(args.out)
    if not report_path.is_absolute():
        report_path = ROOT / report_path
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    payload = _load(report_path)

    rows: list[dict[str, Any]] = []
    total_blank = 0
    total_stub = 0
    for section in _sections(payload):
        markdown = str(section.get("markdown") or "")
        blank_count, stub_count = _count_table_issues(markdown)
        total_blank += blank_count
        total_stub += stub_count
        has_issue = blank_count > 0 or stub_count > 0
        rows.append(
            {
                "section": section.get("title") or section.get("id") or "unknown",
                "blank_cells": blank_count,
                "stub_tokens": stub_count,
                "status": "INCOMPLETE_TABLE" if has_issue else "OK",
            }
        )

    # NOTE(review): this is 100 minus the average number of blank cells per
    # section (clamped at 0), not a true fill percentage, and stub tokens are
    # not included.  Kept as-is so the published metric does not change.
    total_tables = max(1, len(rows))  # avoid division by zero with no sections
    fill_pct = round(max(0.0, 100.0 - (total_blank / total_tables)), 2)

    incomplete_tables = [row["section"] for row in rows if row["status"] != "OK"]
    out = {
        "formula_id": "BLANK_CELL_AUDIT_V1",
        "enforcement_mode": "WARN_ONLY",
        "blank_fill_pct": fill_pct,
        "incomplete_tables": incomplete_tables,
        "summary": {
            "sections": len(rows),
            "blank_cells": total_blank,
            "stub_tokens": total_stub,
            "incomplete_tables": len(incomplete_tables),
        },
        "tables": rows,
        "gate": "WARN" if total_blank > 0 or total_stub > 0 else "PASS",
    }

    rendered = json.dumps(out, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())