#!/usr/bin/env python3 """ validate_number_provenance_v1.py ─────────────────────────────────────────────────────────────────────────────── LLM 자유도 측정기 (LFM-V1 P3 단계) operational_report.json 의 핵심 숫자들이 harness_context(GAS 산출값)로 역추적 가능한지 검사한다. 측정 지표: llm_freedom_pct = ungrounded_number_count / total_tracked_numbers * 100 목표: llm_freedom_pct = 0.0 (측정값) 검사 방식: (A) harness_grounded: 해당 숫자 필드가 GatherTradingData.json의 _harness_context 또는 order_blueprint_json 에서 그대로 복사되어야 함. (B) formula_grounded: 해당 숫자가 spec/13_formula_registry.yaml 의 registered 공식 출력임을 formula_id 참조로 증빙해야 함. (C) ungrounded: harness 근거도 formula 근거도 없는 경우 → LLM 자유계산 의심. 또한 prompts/analysis_prompt.md 의 자유도 위험 구간을 스캔해 "harness 결측 시 LLM 직접계산 허용" 문구를 적발한다. 출력: Temp/llm_freedom_v1.json 사용법: python tools/validate_number_provenance_v1.py python tools/validate_number_provenance_v1.py --strict """ from __future__ import annotations import json import argparse import re import sys from pathlib import Path import yaml ROOT = Path(__file__).resolve().parent.parent REPORT_JSON = ROOT / "Temp" / "operational_report.json" GATHER_JSON = ROOT / "GatherTradingData.json" PROMPT_MD = ROOT / "prompts" / "analysis_prompt.md" SPEC_13 = ROOT / "spec" / "13_formula_registry.yaml" OUTPUT = ROOT / "Temp" / "llm_freedom_v1.json" if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"): sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1) def load_json(p: Path) -> dict | list: if not p.exists(): return {} return json.loads(p.read_text(encoding="utf-8")) def _flatten(obj, prefix="") -> dict: """JSON 객체를 평탄화 → {path: value} dict.""" items: dict = {} if isinstance(obj, dict): for k, v in obj.items(): key = f"{prefix}.{k}" if prefix else k if isinstance(v, (dict, list)): items.update(_flatten(v, key)) else: items[key] = v elif isinstance(obj, list): for i, v in enumerate(obj): key = f"{prefix}[{i}]" if isinstance(v, (dict, list)): items.update(_flatten(v, key)) else: items[key] = v return items # 핵심 가격·수량·현금 필드 (보고서에서 LLM이 "계산"할 위험이 있는 숫자) TRACKED_REPORT_FIELDS = [ # 현금 관련 "cash_recovery_plan_crdl", "QEH_AUDIT_BLOCK", "portfolio_structure_risks", # 판단 테이블 "final_judgment_table", "exec_safety_declaration", # 라우팅 "routing_serving_trace", ] REQUIRED_PROVENANCE_FIELDS = [ "source_path", "json_pointer", "formula_id", "input_hash", "freshness_status", ] # Deterministic code-derived sections may contain transformed numbers # that are not copied 1:1 from harness context. DERIVED_SAFE_SECTIONS = { "cash_recovery_plan_crdl", } # harness_context에서 직접 복사되어야 하는 필드 명칭 패턴 HARNESS_GROUNDED_PATTERNS = [ r"settlement_cash_d2_krw", r"total_heat_pct", r"cash_shortfall_min_krw", r"cash_floor_status", r"buy_power_krw", r"goal_achievement_pct", r"portfolio_health_score", r"behavioral_coverage_pct", # BCH-V1 신규 ] # 보고서 숫자 중 LLM 계산 의심 패턴 (특정 숫자가 harness에 없는데 보고서에 나타나면) LLM_SUSPICIOUS_PATTERNS = [ r"약\s*[\d,]+\s*원", # "약 X원" — LLM 추정 서술 r"예상\s*[\d,]+\s*원", # "예상 X원" r"합산.*?[\d,]+\s*원", # "합산 X원" r"조합으로.*?[\d,]+\s*원", # "조합으로 약 X원" ] # analysis_prompt.md 내 자유도 위험 구간 패턴 PROMPT_FREEDOM_PATTERNS = [ # "직접 계산한다" — 허용 행위 (금지 선언 제외) (r"직접\s*계산한다(?!.*금지)(?!.*위반)", "LLM 직접계산 허용 문구 — HS011 잠재 위반"), # "spec 규칙에 따라 직접 계산" — 금지 선언 제외 (r"spec\s*규칙에\s*따라\s*직접\s*계산(?!.*금지)(?!.*위반)", "spec 따라 직접계산 — harness 결측 시 자유도 발생"), # "legacy fallback" — LLM 계산 허용 명시 (r"legacy\s*fallback.*직접\s*계산", "legacy fallback LLM 직접계산 허용"), # harness 결측 → 직접계산 허용 (금지 선언 제외) (r"harness.*missing.*직접\s*계산(?!.*금지)", "harness 결측 → 직접계산 fallback"), ] def scan_prompt_freedom(prompt_text: str) -> list[dict]: """analysis_prompt.md에서 LLM 자유도 위험 구간 스캔.""" findings = [] lines = prompt_text.split("\n") for i, line in enumerate(lines, 1): for pattern, desc in PROMPT_FREEDOM_PATTERNS: if re.search(pattern, line): findings.append({ "line": i, "text": line.strip()[:120], "risk": desc, "recommendation": "DATA_MISSING 강제 처리로 대체 — 직접계산 제거", }) return findings def _is_ignorable_number(section: str, value: int) -> bool: # Ignore date-like values and ticker-like identifiers for hallucination checks. if 19000101 <= value <= 20991231: return True if section in {"final_judgment_table", "watch_breakout_gate"} and 100000 <= value <= 999999: return True return False def check_harness_grounding(gather_data: dict, report_data: dict) -> list[dict]: """보고서 핵심 숫자가 harness_context에서 추적되는지 확인.""" ungrounded = [] grounded = [] hctx = gather_data.get("data", {}).get("_harness_context", {}) or {} apex = gather_data.get("hApex", {}) or {} merged = {} if isinstance(hctx, dict): merged.update(hctx) if isinstance(apex, dict): merged.update(apex) source_values = set() for v in _flatten(merged).values(): if v is not None: source_values.add(str(v).replace(",", "")) # Include deterministic Temp artifacts to avoid false positives for code-derived metrics. temp_dir = ROOT / "Temp" if temp_dir.exists(): for p in temp_dir.glob("*.json"): try: payload = load_json(p) for v in _flatten(payload).values(): if v is not None: source_values.add(str(v).replace(",", "")) except Exception: continue # 보고서 섹션별 핵심 숫자 추출 sections = report_data.get("sections", []) for sec in sections: if sec.get("name") not in TRACKED_REPORT_FIELDS: continue if sec.get("name") in DERIVED_SAFE_SECTIONS: continue md = sec.get("markdown", "") provenance = sec.get("numeric_provenance") if isinstance(sec.get("numeric_provenance"), dict) else {} has_required_provenance = all(str(provenance.get(key) or "").strip() for key in REQUIRED_PROVENANCE_FIELDS) # 한국 숫자 패턴 추출 (예: 36,092,555) nums = re.findall(r"[\d]{1,3}(?:,[\d]{3})+", md) for num in nums: clean = num.replace(",", "") if not clean.isdigit(): continue val = int(clean) if val < 100: # 날짜·비율 등 소수 무시 continue if _is_ignorable_number(str(sec.get("name") or ""), val): continue if has_required_provenance: grounded.append({ "section": sec["name"], "value": val, "source": "numeric_provenance", "formula_id": provenance.get("formula_id"), "source_path": provenance.get("source_path"), "json_pointer": provenance.get("json_pointer"), "input_hash": provenance.get("input_hash"), "freshness_status": provenance.get("freshness_status"), }) continue # harness_context에 같은 값이 있는지 확인 if str(val) in source_values or num.replace(",", "") in source_values: grounded.append({"section": sec["name"], "value": val, "source": "harness_context"}) else: # LLM 자유계산 의심 — harness에 없는 구체적 금액 ungrounded.append({ "section": sec["name"], "value": val, "risk": "UNGROUNDED_NUMBER", "note": f"harness_context에서 {val:,}원 미발견 — LLM 계산 또는 정상 보고서 숫자", }) return grounded, ungrounded def check_formula_grounding(report_data: dict) -> list[dict]: """보고서 판단 근거에 formula_id 참조가 있는지 확인.""" formula_violations = [] sections = report_data.get("sections", []) for sec in sections: md = sec.get("markdown", "") # LLM 추정 어휘 검사 for pattern in LLM_SUSPICIOUS_PATTERNS: matches = re.findall(pattern, md) if matches: formula_violations.append({ "section": sec.get("name"), "pattern": pattern, "matches": matches[:3], "risk": "POSSIBLE_LLM_COMPUTED_NUMBER", "note": "HS011 위반 가능 — harness 산출값이 아닌 LLM 추정 서술", }) return formula_violations def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--strict", action="store_true") ap.add_argument("--max-ungrounded", type=int, default=0) args = ap.parse_args() gather_data = load_json(GATHER_JSON) report_data = load_json(REPORT_JSON) prompt_text = PROMPT_MD.read_text(encoding="utf-8") if PROMPT_MD.exists() else "" sep = "=" * 70 print(sep) print(" LLM 자유도 측정기 (LFM-V1 P3)") print(sep) # (1) 프롬프트 자유도 위험 구간 스캔 prompt_risks = scan_prompt_freedom(prompt_text) # (2) harness 근거 확인 grounded, ungrounded = check_harness_grounding(gather_data, report_data) # (3) formula 근거 확인 formula_violations = check_formula_grounding(report_data) # llm_freedom_pct 계산 total_tracked = len(grounded) + len(ungrounded) # ungrounded 숫자 중 일부는 정상 보고서 숫자일 수 있으므로 # LLM 자유도는 prompt_risks + formula_violations 로 측정 freedom_signals = len(prompt_risks) + len(formula_violations) total_signals = max(freedom_signals + 10, 10) # 분모 최소 10 보정 llm_freedom_pct = round(freedom_signals / total_signals * 100, 2) print(f"\n [프롬프트 자유도 위험 구간]: {len(prompt_risks)}건") for r in prompt_risks[:5]: print(f" line {r['line']}: {r['text'][:80]}") print(f" → {r['risk']}") print(f"\n [harness 근거 미확인 숫자]: {len(ungrounded)}건") for u in ungrounded[:5]: print(f" [{u['section']}] {u['value']:,}원 — {u['note'][:80]}") print(f"\n [LLM 추정 어휘 감지]: {len(formula_violations)}건") for v in formula_violations[:3]: print(f" [{v['section']}] {v['risk']}: {v['matches'][:2]}") # 핵심 수치 요약 print(f"\n llm_freedom_pct (추정) = {llm_freedom_pct}%") print(f" (freedom_signals={freedom_signals} / total_signals={total_signals})") print("\n [개선 권고 — 자유도 폐쇄 조치]:") if prompt_risks: print(f" 1. prompts/analysis_prompt.md 의 자유도 위험 구간({len(prompt_risks)}건):") print(f" '직접 계산' 문구를 'DATA_MISSING — 하네스 업데이트 필요' 로 교체") print(f" → harness 결측 시 LLM 계산 금지 (HS011)") if formula_violations: print(f" 2. '약 N원' 등 LLM 추정 서술({len(formula_violations)}건)을 harness 필드 직접 인용으로 교체") print(f" 3. 목표: llm_freedom_pct = 0.0 (측정값)") ungrounded_count = len(ungrounded) is_ok = freedom_signals == 0 and ungrounded_count <= args.max_ungrounded result = { "status": "LFM_V1_OK" if is_ok else "LFM_V1_FAIL", "llm_freedom_pct": llm_freedom_pct, "freedom_signals_count": freedom_signals, "prompt_risks": prompt_risks, "ungrounded_numbers": ungrounded[:20], "grounded_numbers_count": len(grounded), "formula_violations": formula_violations[:10], "target": { "llm_freedom_pct_target": 0.0, "ungrounded_number_count_target": 0, "resolution": [ "prompts/analysis_prompt.md 의 '_harness_context 부재 시 직접 계산' 문구 제거", "하네스 미산출 영역 → DATA_MISSING 강제 표기 (HS011)", "LLM 내러티브 완화 어휘 → INVALID_SOFTENING 차단 (LLM_NARRATIVE_TEMPLATE_LOCK_V1 확장)", ], }, } OUTPUT.parent.mkdir(parents=True, exist_ok=True) OUTPUT.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(f"\n → 결과 저장: {OUTPUT}") print(f" {result['status']}\n") if args.strict and not is_ok: return 1 return 0 if __name__ == "__main__": raise SystemExit(main())