#!/usr/bin/env python3
"""
validate_calibration_registry_v1.py
───────────────────────────────────────────────────────────────────────────────
Threshold calibration registry validator (CALIB-V1, stage P2).

Checks that the thresholds registered in spec/calibration_registry.yaml are
labelled honestly, and that numeric comparison constants found in the
configured hot zones are registered.

Verdicts:
  CALIBRATION_REGISTRY_OK    <- every check passed
  CALIBRATION_REGISTRY_WARN  <- uncalibrated entries (PROVISIONAL/EXPERT_PRIOR)
                                exist, but the run still passes
  CALIBRATION_REGISTRY_FAIL  <- an overclaimed entry, an unregistered hot-zone
                                threshold, or a live_critical EXPERT_PRIOR
                                entry was found (also printed when the registry
                                file is missing or unreadable)

Checks:
  (1) OVERCLAIMED_CALIBRATION: source=CALIBRATED while sample_n < 30
  (2) WARN ledger: source=PROVISIONAL or EXPERT_PRIOR is recorded as a warning
  (3) Summary statistics: CALIBRATED / PROVISIONAL / EXPERT_PRIOR counts

Output: Temp/calibration_registry_v1.json

Usage:
  python tools/validate_calibration_registry_v1.py [--strict]

Exit code: 1 when the registry is missing or unreadable; 1 on a FAIL verdict
only when --strict is given; 0 otherwise.
"""
from __future__ import annotations

import json
import re
import sys
from pathlib import Path

try:
    import yaml
except ImportError:  # PyYAML is third-party; main() reports its absence as a FAIL
    yaml = None  # type: ignore[assignment]

ROOT = Path(__file__).resolve().parent.parent
REGISTRY = ROOT / "spec" / "calibration_registry.yaml"
OUTPUT = ROOT / "Temp" / "calibration_registry_v1.json"

# Hot zones to scan: (file relative to ROOT, first line, last line, zone label).
# NOTE(review): the two zones covering gas_data_feed.gs 6658-6710 and 6707-6775
# overlap, so a hit on lines 6707-6710 is reported once per zone.
HOT_ZONES = [
    ("gas_apex_alpha_watch.gs", 320, 415, "ANTI_LATE_ENTRY_GATE_V2"),
    ("gas_data_feed.gs", 2096, 2125, "DYNAMIC_HEAT_GATE + CASH_FLOOR_BY_MRS"),
    ("gas_data_feed.gs", 3431, 3460, "REGIME_SCALE + CASH_UPLIFT"),
    ("gas_data_feed.gs", 4457, 4492, "POSITION_COUNT + DRAWDOWN_GUARD"),
    ("gas_data_feed.gs", 4691, 4705, "CASH_FLOOR"),
    ("gas_data_feed.gs", 3840, 3935, "SEMICONDUCTOR_CLUSTER_GATE + LEADER_CAP"),  # updated range
    ("gas_data_feed.gs", 3754, 3835, "LEADER_POSITION_WEIGHT_CAP_V1"),
    ("gas_data_feed.gs", 8780, 8790, "DISTRIBUTION_SELL_DETECTOR_V1 thresholds"),
    ("gas_data_feed.gs", 6658, 6710, "BREAKOUT_QUALITY_GATE_V2 thresholds"),
    ("gas_data_feed.gs", 6707, 6775, "ANTI_WHIPSAW_GATE_V1 thresholds"),
    ("tools/build_smart_cash_recovery_v4.py", 140, 165, "SCR_V4 value_damage thresholds"),
    ("tools/build_rebound_sell_efficiency_v1.py", 60, 105, "REBOUND_SELL_EFFICIENCY coefficients"),
]

# A candidate threshold is a number that follows a comparison operator
# (>=, <=, ===, !==, >, <, ==, !=).
_THRESHOLD_RE = re.compile(
    r"(?:>=|<=|===|!==|>(?!=)|<(?!=)|[=!]=)\s*(\d+(?:\.\d+)?)\b"
)

# Values below this are ignored (0 and 1 are mostly indices / booleans).
_MIN_VALUE = 1.5

# A hot-zone line counts as registered when a registered line number for the
# same file lies within this many lines of it (absorbs small edit drift).
_LINE_TOLERANCE = 2

# Minimum sample_n for an entry to be accepted as source=CALIBRATED.
_MIN_CALIBRATED_SAMPLE_N = 30

# Quoted string literals; their content is blanked so that numbers inside
# messages such as push('...>=3%') are not mistaken for thresholds.
_STR_LITERAL_RE = re.compile(r"'[^']*'|\"[^\"]*\"")

# Line-comment patterns. The marker depends on the language: "//" is floor
# division in Python, and "#" is not a comment marker in .gs/.js sources.
_ANY_COMMENT_RE = re.compile(r"//.*$|#.*$")
_COMMENT_RE_BY_SUFFIX = {
    ".py": re.compile(r"#.*$"),
    ".gs": re.compile(r"//.*$"),
    ".js": re.compile(r"//.*$"),
    ".ts": re.compile(r"//.*$"),
}


def _strip_noise(line: str, filename: str) -> str:
    """Return *line* with string-literal contents and the trailing comment removed.

    String literals are collapsed to ``''`` first, so a comment marker inside a
    string (for example a URL) cannot truncate the code that follows it. The
    comment pattern is picked from the suffix of *filename*; unknown suffixes
    fall back to stripping both ``//`` and ``#`` comments.
    """
    comment_re = _COMMENT_RE_BY_SUFFIX.get(Path(filename).suffix.lower(), _ANY_COMMENT_RE)
    without_strings = _STR_LITERAL_RE.sub("''", line)
    return comment_re.sub("", without_strings).strip()


def _extract_threshold_values(clean: str) -> list[float]:
    """Return the numbers in *clean* that follow a comparison operator and are >= _MIN_VALUE."""
    values = (float(m.group(1)) for m in _THRESHOLD_RE.finditer(clean))
    return [v for v in values if v >= _MIN_VALUE]


def _scan_hot_zones(registered_locations: set[tuple[str, int]]) -> list[dict]:
    """Scan every hot zone and report numeric thresholds missing from the registry.

    Args:
        registered_locations: ``(file name, line number)`` pairs taken from the
            registry. A scanned line is treated as registered when any pair for
            the same file is within ``_LINE_TOLERANCE`` lines of it.

    Returns:
        One dict per unregistered value with the keys ``file``, ``line``,
        ``value``, ``zone``, ``code`` (cleaned line, at most 100 chars) and
        ``violation``.
    """
    # Registered line numbers grouped per file for quick lookup.
    reg_by_file: dict[str, set[int]] = {}
    for fname, lno in registered_locations:
        reg_by_file.setdefault(fname, set()).add(lno)

    unregistered: list[dict] = []
    for filename, start, end, zone_name in HOT_ZONES:
        filepath = ROOT / filename
        if not filepath.exists():
            # NOTE(review): a missing hot-zone file is skipped without any
            # report, so a renamed file silently drops out of the scan.
            continue
        lines = filepath.read_text(encoding="utf-8").splitlines()
        reg_lines = reg_by_file.get(filename, ())
        # Clip the zone to the real file length.
        for lineno in range(start, min(end + 1, len(lines) + 1)):
            # Registration is a per-line property: decide once, not per match.
            if any(abs(lineno - r) <= _LINE_TOLERANCE for r in reg_lines):
                continue
            clean = _strip_noise(lines[lineno - 1], filename)
            if not clean:
                continue
            for val in _extract_threshold_values(clean):
                unregistered.append({
                    "file": filename,
                    "line": lineno,
                    "value": val,
                    "zone": zone_name,
                    "code": clean[:100],
                    "violation": f"UNREGISTERED_THRESHOLD: {filename}:{lineno} 값={val} — calibration_registry.yaml 미등록",
                })
    return unregistered


def _force_utf8_stdout() -> None:
    """Switch stdout to UTF-8 when it uses another encoding (best effort).

    The report contains non-ASCII text, which a legacy console code page cannot
    encode. Streams that cannot be re-encoded are left untouched.
    """
    encoding = getattr(sys.stdout, "encoding", None)
    if not encoding or encoding.lower() in ("utf-8", "utf8"):
        return
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8", line_buffering=True)
        return
    try:
        # closefd=False: the descriptor is still owned by the original stream.
        sys.stdout = open(
            sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1, closefd=False
        )
    except (AttributeError, OSError, ValueError):
        # No usable file descriptor (e.g. a replaced stream); keep it as is.
        pass


_force_utf8_stdout()


def _parse_location(loc: object) -> tuple[str, int] | None:
    """Parse a ``"<file>:<line>"`` registry location; return None when it does not fit.

    The split is on the last colon so that a file part which itself contains a
    colon is still accepted.
    """
    fname, sep, lineno = str(loc).rpartition(":")
    fname, lineno = fname.strip(), lineno.strip()
    if not sep or not fname or not lineno.isdecimal():
        return None
    return fname, int(lineno)


def _as_int(value: object) -> int:
    """Coerce a registry ``sample_n`` value to int; missing or unusable values become 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _classify_thresholds(thresholds: list[dict]) -> dict[str, list[dict]]:
    """Sort registry entries into buckets keyed by how their ``source`` holds up.

    Returns a dict with the keys ``overclaimed``, ``calibrated``,
    ``provisional_warn``, ``spec_derived`` and ``expert_prior_warn``. Entries
    whose source is missing or not one of CALIBRATED / PROVISIONAL /
    SPEC_DERIVED land in ``expert_prior_warn``.
    """
    buckets: dict[str, list[dict]] = {
        "overclaimed": [],
        "provisional_warn": [],
        "expert_prior_warn": [],
        "spec_derived": [],
        "calibrated": [],
    }
    for t in thresholds:
        tid = t.get("id", "?")
        source = str(t.get("source", "EXPERT_PRIOR"))
        sample = _as_int(t.get("sample_n", 0))

        if source == "CALIBRATED":
            if sample < _MIN_CALIBRATED_SAMPLE_N:
                buckets["overclaimed"].append({
                    "id": tid,
                    "source": source,
                    "sample_n": sample,
                    "formula": t.get("owner_formula"),
                    "violation": "OVERCLAIMED_CALIBRATION: source=CALIBRATED 이면서 sample_n < 30",
                })
            else:
                buckets["calibrated"].append({"id": tid, "sample_n": sample})
        elif source == "PROVISIONAL":
            buckets["provisional_warn"].append(
                {"id": tid, "sample_n": sample, "formula": t.get("owner_formula")}
            )
        elif source == "SPEC_DERIVED":
            buckets["spec_derived"].append({"id": tid})
        else:
            # EXPERT_PRIOR (the default) and any unrecognised source.
            buckets["expert_prior_warn"].append({
                "id": tid,
                "formula": t.get("owner_formula"),
                "threshold_class": str(t.get("threshold_class", "standard")).lower(),
            })
    return buckets


def _decide_status(
    overclaimed_count: int,
    unregistered_count: int,
    live_critical_ep_count: int,
    warn_count: int,
) -> str:
    """Map the violation and warning counts to the verdict string."""
    if overclaimed_count > 0 or unregistered_count > 0 or live_critical_ep_count > 0:
        return "CALIBRATION_REGISTRY_FAIL"
    if warn_count > 0:
        return "CALIBRATION_REGISTRY_WARN"
    return "CALIBRATION_REGISTRY_OK"


def main() -> int:
    """Validate the registry, write the JSON report, print a summary.

    Returns:
        The process exit code: 1 when the registry cannot be loaded, 1 on a
        FAIL verdict when ``--strict`` is in ``sys.argv``, otherwise 0.
    """
    strict = "--strict" in sys.argv

    if not REGISTRY.exists():
        print("CALIBRATION_REGISTRY_FAIL")
        print(f" - MISSING: {REGISTRY}")
        return 1
    if yaml is None:
        print("CALIBRATION_REGISTRY_FAIL")
        print(" - MISSING_DEPENDENCY: PyYAML is not installed")
        return 1

    try:
        data = yaml.safe_load(REGISTRY.read_text(encoding="utf-8"))
    except yaml.YAMLError as exc:
        print("CALIBRATION_REGISTRY_FAIL")
        print(f" - INVALID: {REGISTRY} ({exc})")
        return 1
    # safe_load returns None for an empty document and may return any YAML type.
    if not isinstance(data, dict):
        print("CALIBRATION_REGISTRY_FAIL")
        print(f" - INVALID: {REGISTRY} (top-level mapping expected)")
        return 1

    # "key: null" yields None rather than the .get() default, hence the "or".
    thresholds = data.get("thresholds") or []
    policy = data.get("calibration_policy") or {}
    if not isinstance(policy, dict):
        policy = {}

    # Build the set of registered (file name, line number) locations.
    registered_locations: set[tuple[str, int]] = set()
    for t in thresholds:
        for loc_field in ("gs_location", "py_location"):
            loc = t.get(loc_field)
            if not loc:
                continue
            parsed = _parse_location(loc)
            if parsed is not None:
                registered_locations.add(parsed)

    buckets = _classify_thresholds(thresholds)
    overclaimed = buckets["overclaimed"]
    provisional_warn = buckets["provisional_warn"]
    expert_prior_warn = buckets["expert_prior_warn"]
    spec_derived = buckets["spec_derived"]
    calibrated = buckets["calibrated"]

    # Scan the hot zones for constants that are not in the registry.
    unregistered = _scan_hot_zones(registered_locations)

    # EXPERT_PRIOR entries whose threshold_class is 'live_critical' fail the
    # run; the default class 'standard' only produces a warning.
    live_critical_ep = [
        t for t in expert_prior_warn if t.get("threshold_class") == "live_critical"
    ]

    total = len(thresholds)
    overclaimed_count = len(overclaimed)
    unregistered_count = len(unregistered)
    live_critical_ep_count = len(live_critical_ep)

    status = _decide_status(
        overclaimed_count,
        unregistered_count,
        live_critical_ep_count,
        len(expert_prior_warn) + len(provisional_warn),
    )

    result = {
        "status": status,
        "total_thresholds": total,
        "calibrated_count": len(calibrated),
        "spec_derived_count": len(spec_derived),
        "provisional_count": len(provisional_warn),
        "expert_prior_count": len(expert_prior_warn),
        "live_critical_expert_prior_count": live_critical_ep_count,
        "overclaimed_count": overclaimed_count,
        "unregistered_threshold_count": unregistered_count,
        "calibration_rate_pct": round(len(calibrated) / total * 100, 1) if total else 0,
        "overclaimed": overclaimed,
        "unregistered": unregistered,
        "provisional_warn": provisional_warn,
        "expert_prior_warn": expert_prior_warn,
        "policy_note": policy.get("current_status_2026_05_30", ""),
    }

    OUTPUT.parent.mkdir(parents=True, exist_ok=True)
    OUTPUT.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    sep = "=" * 70
    print(sep)
    print(" 임계값 보정 레지스트리 검증기 (CALIB-V1)")
    print(sep)
    print(f"\n 전체 임계값: {total}")
    print(f" CALIBRATED (실측보정): {len(calibrated):3d} ({result['calibration_rate_pct']:.1f}%)")
    print(f" SPEC_DERIVED: {len(spec_derived):3d}")
    print(f" PROVISIONAL (예비): {len(provisional_warn):3d}")
    print(f" EXPERT_PRIOR (미보정): {len(expert_prior_warn):3d} ← 월별 보정 대상 (live_critical={live_critical_ep_count})")
    print(f" OVERCLAIMED (위장): {overclaimed_count:3d} ← {'FAIL' if overclaimed_count else 'OK'}")
    print(f" UNREGISTERED (미등록): {unregistered_count:3d} ← {'FAIL' if unregistered_count else 'OK'}")

    if overclaimed:
        print("\n [OVERCLAIMED_CALIBRATION] — source=CALIBRATED 이면서 sample_n<30:")
        for v in overclaimed:
            print(f" {v['id']}: {v['violation']}")

    if unregistered:
        print(f"\n [UNREGISTERED_THRESHOLD] — 핫존에서 발견된 미등록 상수 ({unregistered_count}건):")
        # Group the findings by zone; show at most five per zone.
        by_zone: dict[str, list] = {}
        for u in unregistered:
            by_zone.setdefault(u["zone"], []).append(u)
        for zone, items in by_zone.items():
            print(f" [{zone}] {len(items)}건:")
            for u in items[:5]:
                print(f" {u['file']}:{u['line']} 값={u['value']} 코드: {u['code'][:60]}")
            if len(items) > 5:
                print(f" ... 외 {len(items)-5}건")
        print(" → spec/calibration_registry.yaml 에 등록 후 source/sample_n 태깅 필요")

    # NOTE(review): this section and its hard-coded priority list are printed
    # even when there are no EXPERT_PRIOR entries — confirm that is intended.
    print(f"\n ⚠ 미보정 임계값 {len(expert_prior_warn)}개 (EXPERT_PRIOR) — 보정 우선순위:")
    priority = [
        "ALEG_V2_GATE1_BLOCK_PCT (뒷박 3% 임계)",
        "ALEG_V2_GATE2_BLOCK_PCT (5일 8% 임계)",
        "DSD_V1_CONFIRMED_WS (설거지 5.0 임계)",
        "K2_SPLIT_RATIO (50/50 분할)",
        "K2_REBOUND_TRIGGER_ATR_MULT (0.5×ATR)",
    ]
    for p in priority:
        print(f" → {p}")

    print(f"\n → 결과 저장: {OUTPUT}")
    print(f" {status}\n")

    # Without --strict a FAIL verdict is reported but does not fail the process.
    if strict and status == "CALIBRATION_REGISTRY_FAIL":
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())