캘리브레이션 거버넌스 도구 + WBS-7.1/7.2 실증 격차 가시화

캘리브레이션 백로그 → 우선순위 → 검토리포트 → 승인목록 → 결정초안으로
이어지는 임계값 보정 거버넌스 파이프라인을 추가하고, 2026-06-21
비판적 리뷰에서 발견한 두 가지 stale-수치 문제를 도구 차원에서 해소한다.

- registry_health(): 190여 개 임계값의 source별(SPEC_DERIVED/EXPERT_PRIOR/
  PROVISIONAL/CALIBRATED) 분포를 매 실행마다 자동 집계 — 수동 grep 불필요
- live_t5_status(): T+5 적중률을 하드코딩(35.86 리터럴) 대신
  Temp/prediction_accuracy_harness_v2.json에서 항상 최신값으로 읽음
- spec/calibration_registry.yaml: SEMI_CLUSTER_CAP_RISK_OFF 중복 id로
  인한 조용한 무시 버그 수정(SEMI_CLUSTER_CAP_RISK_OFF_MWA로 분리)
- spec/27_bch_calibration_runbook.yaml: current_status_2026_06_21 블록
  신설(단일 진실원천), 기존 05-30 스냅샷은 "역사적, 현재로 인용 금지"로 명시
This commit is contained in:
2026-06-21 20:07:32 +09:00
parent f99f9821d2
commit ee4d1fdab8
8 changed files with 855 additions and 42 deletions
+136
View File
@@ -0,0 +1,136 @@
#!/usr/bin/env python3
"""
build_calibration_approval_list_v1.py
───────────────────────────────────────────────────────────────────────────────
calibration_review_report_v1.json을 읽어 PROVISIONAL 승격 승인 리스트를 만든다.
목적:
- source=PROVISIONAL 인 임계값을 별도 승인 대상 리스트로 분리
- reviewer가 바로 볼 수 있는 Markdown/JSON 산출물 생성
- PROVISIONAL 승격과 provisional review를 분리해 운영 책임을 명확화
출력:
Temp/calibration_approval_list_v1.json
Temp/calibration_approval_list_v1.md
사용법:
python tools/build_calibration_approval_list_v1.py
"""
from __future__ import annotations
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parent.parent
REVIEW = ROOT / "Temp" / "calibration_review_report_v1.json"
OUT_JSON = ROOT / "Temp" / "calibration_approval_list_v1.json"
OUT_MD = ROOT / "Temp" / "calibration_approval_list_v1.md"
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
def _load_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return data if isinstance(data, dict) else {}
def _table(rows: list[dict[str, Any]], keys: list[str], max_rows: int = 25) -> str:
if not rows:
return "_데이터 없음_"
header = "| " + " | ".join(keys) + " |"
sep = "| " + " | ".join(["---"] * len(keys)) + " |"
body = []
for row in rows[:max_rows]:
body.append("| " + " | ".join(str(row.get(k, "")).replace("|", "") for k in keys) + " |")
suffix = f"\n\n_...총 {len(rows)}행 중 {max_rows}행 표시_" if len(rows) > max_rows else ""
return "\n".join([header, sep, *body]) + suffix
def main() -> int:
review = _load_json(REVIEW)
rows = review.get("review_rows") if isinstance(review.get("review_rows"), list) else []
approval_candidates: list[dict[str, Any]] = []
provisional_review_candidates: list[dict[str, Any]] = []
for row in rows:
if not isinstance(row, dict):
continue
source = str(row.get("source") or "")
readiness = str(row.get("readiness") or "")
sample_n = int(row.get("sample_n") or 0)
base = {
"id": row.get("id", ""),
"source": source,
"sample_n": sample_n,
"value": row.get("value"),
"unit": row.get("unit", ""),
"owner_formula": row.get("owner_formula", ""),
"readiness": readiness,
"reason": row.get("reason", ""),
}
if source == "PROVISIONAL":
approval_candidates.append(base)
elif readiness == "PROVISIONAL_CANDIDATE":
provisional_review_candidates.append(base)
approval_candidates.sort(key=lambda item: (-int(item.get("sample_n") or 0), str(item.get("id") or "")))
provisional_review_candidates.sort(key=lambda item: (-int(item.get("sample_n") or 0), str(item.get("id") or "")))
report = {
"formula_id": "CALIBRATION_APPROVAL_LIST_V1",
"generated_at": datetime.now(timezone.utc).isoformat(),
"review_report_path": str(REVIEW),
"approval_candidate_count": len(approval_candidates),
"provisional_review_candidate_count": len(provisional_review_candidates),
"approval_candidates": approval_candidates,
"provisional_review_candidates": provisional_review_candidates,
}
OUT_JSON.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
md_lines = [
"# Calibration Approval List",
"",
"## Summary",
"",
f"- approval candidates: {len(approval_candidates)}",
f"- provisional review candidates: {len(provisional_review_candidates)}",
"",
"## Approval Candidates",
"",
_table(approval_candidates, ["id", "source", "sample_n", "value", "unit", "owner_formula", "readiness", "reason"]),
"",
"## Provisional Review Candidates",
"",
_table(provisional_review_candidates, ["id", "source", "sample_n", "value", "unit", "owner_formula", "readiness", "reason"]),
"",
"## Evidence",
"",
f"- review report: {REVIEW}",
]
OUT_MD.write_text("\n".join(md_lines), encoding="utf-8")
print(json.dumps({
"formula_id": report["formula_id"],
"gate": "PASS" if approval_candidates else "WARN",
"approval_candidate_count": len(approval_candidates),
"provisional_review_candidate_count": len(provisional_review_candidates),
"json_path": str(OUT_JSON),
"md_path": str(OUT_MD),
}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
"""
build_calibration_decision_draft_v1.py
───────────────────────────────────────────────────────────────────────────────
calibration_review_report_v1.json / calibration_approval_list_v1.json을 바탕으로
운영 승인 초안(APPROVE / HOLD / REJECT)을 만든다.
목적:
- 사람 검토 전 단계에서 결정 초안을 자동 생성
- source=PROVISIONAL은 원칙적으로 APPROVE
- PROVISIONAL_CANDIDATE는 HOLD
- 나머지는 REJECT 또는 HOLD로 사유를 명시
출력:
Temp/calibration_decision_draft_v1.json
Temp/calibration_decision_draft_v1.md
사용법:
python tools/build_calibration_decision_draft_v1.py
"""
from __future__ import annotations
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parent.parent
REVIEW = ROOT / "Temp" / "calibration_review_report_v1.json"
APPROVAL = ROOT / "Temp" / "calibration_approval_list_v1.json"
OUT_JSON = ROOT / "Temp" / "calibration_decision_draft_v1.json"
OUT_MD = ROOT / "Temp" / "calibration_decision_draft_v1.md"
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
def _load_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return data if isinstance(data, dict) else {}
def _table(rows: list[dict[str, Any]], keys: list[str], max_rows: int = 25) -> str:
if not rows:
return "_데이터 없음_"
header = "| " + " | ".join(keys) + " |"
sep = "| " + " | ".join(["---"] * len(keys)) + " |"
body = []
for row in rows[:max_rows]:
body.append("| " + " | ".join(str(row.get(k, "")).replace("|", "") for k in keys) + " |")
suffix = f"\n\n_...총 {len(rows)}행 중 {max_rows}행 표시_" if len(rows) > max_rows else ""
return "\n".join([header, sep, *body]) + suffix
def _decide(row: dict[str, Any]) -> tuple[str, str]:
source = str(row.get("source") or "")
readiness = str(row.get("readiness") or "")
sample_n = int(row.get("sample_n") or 0)
if source == "PROVISIONAL" and sample_n >= 30:
return "APPROVE", "source=PROVISIONAL and sample_n>=30"
if source == "PROVISIONAL":
return "APPROVE", "source=PROVISIONAL"
if readiness == "PROVISIONAL_CANDIDATE":
return "HOLD", "Needs provisional review"
if sample_n >= 10:
return "HOLD", "Sample present but not provisional"
return "REJECT", "Insufficient evidence"
def main() -> int:
review = _load_json(REVIEW)
approval = _load_json(APPROVAL)
review_rows = review.get("review_rows") if isinstance(review.get("review_rows"), list) else []
decisions: list[dict[str, Any]] = []
summary = {"APPROVE": 0, "HOLD": 0, "REJECT": 0}
for row in review_rows:
if not isinstance(row, dict):
continue
decision, reason = _decide(row)
item = {
"id": row.get("id", ""),
"source": row.get("source", ""),
"sample_n": int(row.get("sample_n") or 0),
"value": row.get("value"),
"unit": row.get("unit", ""),
"owner_formula": row.get("owner_formula", ""),
"readiness": row.get("readiness", ""),
"decision": decision,
"reason": reason,
}
decisions.append(item)
summary[decision] += 1
decisions.sort(key=lambda item: ({"APPROVE": 0, "HOLD": 1, "REJECT": 2}.get(str(item.get("decision") or ""), 3), -int(item.get("sample_n") or 0), str(item.get("id") or "")))
report = {
"formula_id": "CALIBRATION_DECISION_DRAFT_V1",
"generated_at": datetime.now(timezone.utc).isoformat(),
"review_report_path": str(REVIEW),
"approval_list_path": str(APPROVAL),
"summary": summary,
"decision_count": len(decisions),
"decisions": decisions,
"approval_candidate_count": int(approval.get("approval_candidate_count") or 0),
}
OUT_JSON.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
md_lines = [
"# Calibration Decision Draft",
"",
"## Summary",
"",
f"- APPROVE: {summary['APPROVE']}",
f"- HOLD: {summary['HOLD']}",
f"- REJECT: {summary['REJECT']}",
f"- decision_count: {len(decisions)}",
"",
"## Decision Table",
"",
_table(decisions, ["id", "source", "sample_n", "decision", "reason", "owner_formula", "readiness"]),
"",
"## Evidence",
"",
f"- review report: {REVIEW}",
f"- approval list: {APPROVAL}",
]
OUT_MD.write_text("\n".join(md_lines), encoding="utf-8")
print(json.dumps({
"formula_id": report["formula_id"],
"gate": "PASS" if summary["APPROVE"] else "WARN",
"approve_count": summary["APPROVE"],
"hold_count": summary["HOLD"],
"reject_count": summary["REJECT"],
"json_path": str(OUT_JSON),
"md_path": str(OUT_MD),
}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+110 -39
View File
@@ -29,6 +29,41 @@ ROOT = Path(__file__).resolve().parent.parent
AFL = ROOT / "Temp" / "alpha_feedback_loop_v2.json"
REG = ROOT / "spec" / "calibration_registry.yaml"
OUTPUT = ROOT / "Temp" / "calibration_priority_v1.json"
PREDICTION_ACCURACY = ROOT / "Temp" / "prediction_accuracy_harness_v2.json"
def registry_source_breakdown(reg_index: dict[str, dict]) -> dict:
"""WBS-7.1(2026-06-21) — calibration_registry.yaml 전체의 source별 분포를 매 실행마다
집계해 'CALIBRATED 비율이 실제로 몇 %인가'를 사람이 grep으로 직접 세지 않아도
항상 최신 상태로 노출한다(2026-06-21 비판적 리뷰 0c절에서 0/190 발견 당시 수동 집계 필요했던 문제 해소)."""
counts: dict[str, int] = {"SPEC_DERIVED": 0, "EXPERT_PRIOR": 0, "PROVISIONAL": 0, "CALIBRATED": 0}
for entry in reg_index.values():
source = str(entry.get("source", "")).upper()
if source in counts:
counts[source] += 1
total = sum(counts.values())
return {
"total_thresholds": total,
"counts": counts,
"calibrated_pct": round(100.0 * counts["CALIBRATED"] / total, 2) if total else 0.0,
"unvalidated_pct": round(100.0 * (counts["SPEC_DERIVED"] + counts["EXPERT_PRIOR"]) / total, 2) if total else 0.0,
}
def live_t5_status() -> dict:
"""WBS-7.2/7.1(2026-06-21) — T+5 수치를 하드코딩하지 않고 항상 최신 산출물에서 읽는다.
Temp/prediction_accuracy_harness_v2.json이 없거나 sample=0이면 정직하게 DATA_GATED로 보고한다."""
if not PREDICTION_ACCURACY.exists():
return {"status": "ARTIFACT_MISSING", "t5_sample": 0, "t5_match_rate_pct": None}
data = load_json(PREDICTION_ACCURACY)
t5_sample = int(data.get("t5_sample") or 0)
t5_rate = data.get("t5_op_rate")
return {
"status": "DATA_GATED" if t5_sample == 0 else "OK",
"as_of_date": data.get("as_of_date"),
"t5_sample": t5_sample,
"t5_match_rate_pct": t5_rate,
}
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
@@ -90,6 +125,42 @@ def load_registry(p: Path) -> dict[str, dict]:
return {t["id"]: t for t in data.get("thresholds", []) if "id" in t}
def _priority_from_registry_entry(entry: dict, source_tag: str, urgency_bias: int) -> dict:
sample_n = int(entry.get("sample_n", 0) or 0)
source = str(entry.get("source", "EXPERT_PRIOR"))
threshold_class = str(entry.get("threshold_class", "standard"))
urgency = urgency_bias
if source == "EXPERT_PRIOR":
urgency += 10
if source == "PROVISIONAL":
urgency += 20
if threshold_class == "live_critical":
urgency += 15
if sample_n == 0:
urgency += 5
if sample_n > 0:
urgency += max(0, 30 - sample_n)
return {
"calibration_id": entry.get("id", ""),
"current_value": entry.get("value"),
"owner_formula": entry.get("owner_formula", ""),
"source": source,
"sample_n": sample_n,
"linked_factor": source_tag,
"alpha_action": "registry_review",
"urgency_score": urgency,
"calibration_path": (
(
"표본 30건 이상 확보 후 PROVISIONAL 승격 → "
if sample_n >= 30
else f"표본 {30 - sample_n}건 추가 수집 후 PROVISIONAL 승격 → "
)
+ "실측 T+5 승률 기반 최적값 backtest → CALIBRATED 확정"
),
"rationale": f"source={source}, class={threshold_class}, sample_n={sample_n}",
}
def main() -> int:
afl_data = load_json(AFL)
reg_index = load_registry(REG)
@@ -112,48 +183,32 @@ def main() -> int:
priority_list: list[dict] = []
for adj in adjustments:
factor = adj.get("factor", "")
action = adj.get("action", "")
rationale = adj.get("rationale", "")
reg_ids = FACTOR_TO_REGISTRY.get(factor, [])
factor = str(adj.get("factor", ""))
action = str(adj.get("action", ""))
rationale = str(adj.get("rationale", ""))
reg_ids = FACTOR_TO_REGISTRY.get(factor, [])
for rid in reg_ids:
reg_entry = reg_index.get(rid)
if not reg_entry:
continue
source = reg_entry.get("source", "EXPERT_PRIOR")
sample_n = int(reg_entry.get("sample_n", 0) or 0)
value = reg_entry.get("value")
formula = reg_entry.get("owner_formula", "")
item = _priority_from_registry_entry(reg_entry, factor, miss5_count if factor == "passive_signal_quality" else 0)
item["alpha_action"] = action or "feedback_review"
if rationale:
item["rationale"] = rationale[:200]
priority_list.append(item)
# 보정 우선도 점수: miss5_count 기여 + 미보정 가중
urgency = 0
if factor == "passive_signal_quality":
urgency += miss5_count # miss가 많을수록 높은 urgency
if source == "EXPERT_PRIOR":
urgency += 10
if sample_n == 0:
urgency += 5
priority_list.append({
"calibration_id": rid,
"current_value": value,
"owner_formula": formula,
"source": source,
"sample_n": sample_n,
"linked_factor": factor,
"alpha_action": action,
"urgency_score": urgency,
"calibration_path": (
(
"표본 30건 이상 확보 후 PROVISIONAL 승격 → "
if sample_n >= 30
else f"표본 {30 - sample_n}건 추가 수집 후 PROVISIONAL 승격 → "
)
+ "실측 T+5 승률 기반 최적값 backtest → CALIBRATED 확정"
),
"rationale": rationale[:200] if rationale else "",
})
if not priority_list:
# alpha_feedback_loop가 비어 있어도 registry 자체의 보정 debt를 추적할 수 있게 한다.
for reg_id, reg_entry in reg_index.items():
source = str(reg_entry.get("source", "EXPERT_PRIOR"))
if source not in {"EXPERT_PRIOR", "PROVISIONAL"}:
continue
tag = f"registry:{source.lower()}"
item = _priority_from_registry_entry(reg_entry, tag, 0)
if source == "PROVISIONAL":
item["urgency_score"] += 5
priority_list.append(item)
# 중복 제거 (같은 rid, 높은 urgency 유지)
seen: dict[str, dict] = {}
@@ -177,7 +232,19 @@ def main() -> int:
print(f" Step 2 (30건 후): ALEG_V2_GATE1_BLOCK_PCT 3.0% → 실측 최적값으로 PROVISIONAL 승격")
print(f" Step 3 (50건 후): DSD_V1 가중치 logistic regression 최적화")
print(f" Step 4 (100건 후): K2_SPLIT_RATIO backtest 비교 → CALIBRATED 확정")
print(f" miss5_count={miss5_count}건 → passive_signal_quality 개선이 T+5 35.86%→50%+ 핵심")
registry_health = registry_source_breakdown(reg_index)
t5_status = live_t5_status()
print(f"\n [캘리브레이션 레지스트리 건강도] (WBS-7.1)")
print(f" total={registry_health['total_thresholds']} {registry_health['counts']}")
print(f" CALIBRATED={registry_health['calibrated_pct']}% 미검증(SPEC_DERIVED+EXPERT_PRIOR)={registry_health['unvalidated_pct']}%")
if t5_status["status"] == "DATA_GATED":
print(f" miss5_count={miss5_count}건 → T+5 현재 DATA_GATED(sample=0) — passive_signal_quality 개선 영향은 표본 누적 후 측정 가능")
elif t5_status["status"] == "ARTIFACT_MISSING":
print(f" miss5_count={miss5_count}건 → T+5 산출물 없음(Temp/prediction_accuracy_harness_v2.json) — 먼저 생성 필요")
else:
print(f" miss5_count={miss5_count}건 → T+5={t5_status['t5_match_rate_pct']}% (as_of={t5_status.get('as_of_date')}) → passive_signal_quality 개선 핵심")
result = {
"status": "CALIBRATION_PRIORITY_OK",
@@ -191,10 +258,14 @@ def main() -> int:
"step3": "50건 후: DSD_V1 가중치 logistic regression 최적화",
"step4": "100건 후: K2_SPLIT_RATIO 30/70~60/40 backtest → CALIBRATED",
},
"priority_basis": "alpha_feedback_loop_v2" if adjustments else "registry_warning_fallback",
"registry_health": registry_health,
"target_improvement": {
"current_t5_pct": 35.86,
"t5_status": t5_status["status"],
"current_t5_pct": t5_status["t5_match_rate_pct"],
"t5_as_of_date": t5_status.get("as_of_date"),
"target_t5_pct": 55.0,
"key_lever": "passive_signal_quality (miss5_count=51건 개선)",
"key_lever": f"passive_signal_quality (miss5_count={miss5_count}건 개선)",
},
}
+205
View File
@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""
build_calibration_review_report_v1.py
───────────────────────────────────────────────────────────────────────────────
calibration_registry.yaml + calibration_priority_v1.json + calibration_change_ledger_v4.json
을 묶어 운영용 보정 리뷰 리포트를 만든다.
목적:
- PROVISIONAL / CALIBRATED 승격 후보를 사람이 읽을 수 있게 정리
- registry warning fallback 상태를 숨기지 않고 그대로 공시
- 월간 보정 운영에서 바로 참고 가능한 Markdown + JSON 산출물 생성
출력:
Temp/calibration_review_report_v1.json
Temp/calibration_review_report_v1.md
사용법:
python tools/build_calibration_review_report_v1.py
"""
from __future__ import annotations
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import yaml
ROOT = Path(__file__).resolve().parent.parent
REGISTRY = ROOT / "spec" / "calibration_registry.yaml"
PRIORITY = ROOT / "Temp" / "calibration_priority_v1.json"
LEDGER = ROOT / "Temp" / "calibration_change_ledger_v4.json"
OUT_JSON = ROOT / "Temp" / "calibration_review_report_v1.json"
OUT_MD = ROOT / "Temp" / "calibration_review_report_v1.md"
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
def _load_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return data if isinstance(data, dict) else {}
def _load_registry(path: Path) -> list[dict[str, Any]]:
if not path.exists():
return []
data = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
thresholds = data.get("thresholds", [])
return [t for t in thresholds if isinstance(t, dict)]
def _readiness(entry: dict[str, Any]) -> tuple[str, str]:
source = str(entry.get("source") or "EXPERT_PRIOR")
sample_n = int(entry.get("sample_n") or 0)
if source == "CALIBRATED":
return "CALIBRATED", "Already calibrated"
if source == "PROVISIONAL" and sample_n >= 30:
return "CALIBRATION_READY", "Ready for calibrated review"
if source == "PROVISIONAL":
return "PROVISIONAL_ACTIVE", "Provisional with live samples"
if sample_n >= 10:
return "PROVISIONAL_CANDIDATE", "Candidate for provisional review"
return "WATCH", "Keep under watch"
def _table(rows: list[dict[str, Any]], keys: list[str], max_rows: int = 25) -> str:
if not rows:
return "_데이터 없음_"
header = "| " + " | ".join(keys) + " |"
sep = "| " + " | ".join(["---"] * len(keys)) + " |"
body = []
for row in rows[:max_rows]:
body.append("| " + " | ".join(str(row.get(k, "")).replace("|", "") for k in keys) + " |")
suffix = f"\n\n_...총 {len(rows)}행 중 {max_rows}행 표시_" if len(rows) > max_rows else ""
return "\n".join([header, sep, *body]) + suffix
def main() -> int:
registry = _load_registry(REGISTRY)
priority = _load_json(PRIORITY)
ledger = _load_json(LEDGER)
source_counts: dict[str, int] = {}
readiness_counts: dict[str, int] = {}
reviewed_rows: list[dict[str, Any]] = []
for entry in registry:
source = str(entry.get("source") or "EXPERT_PRIOR")
source_counts[source] = source_counts.get(source, 0) + 1
readiness, reason = _readiness(entry)
readiness_counts[readiness] = readiness_counts.get(readiness, 0) + 1
if readiness in {"PROVISIONAL_CANDIDATE", "CALIBRATION_READY", "PROVISIONAL_ACTIVE"}:
reviewed_rows.append(
{
"id": entry.get("id", ""),
"source": source,
"sample_n": int(entry.get("sample_n") or 0),
"value": entry.get("value"),
"unit": entry.get("unit", ""),
"owner_formula": entry.get("owner_formula", ""),
"readiness": readiness,
"reason": reason,
"notes": str(entry.get("notes") or "")[:120],
}
)
priority_list = priority.get("priority_list") if isinstance(priority.get("priority_list"), list) else []
priority_rows = []
for item in priority_list[:20]:
if not isinstance(item, dict):
continue
priority_rows.append(
{
"calibration_id": item.get("calibration_id", ""),
"source": item.get("source", ""),
"sample_n": item.get("sample_n", 0),
"urgency_score": item.get("urgency_score", 0),
"linked_factor": item.get("linked_factor", ""),
"owner_formula": item.get("owner_formula", ""),
}
)
report = {
"formula_id": "CALIBRATION_REVIEW_REPORT_V1",
"generated_at": datetime.now(timezone.utc).isoformat(),
"registry_path": str(REGISTRY),
"priority_path": str(PRIORITY),
"ledger_path": str(LEDGER),
"summary": {
"total_thresholds": len(registry),
"source_counts": source_counts,
"readiness_counts": readiness_counts,
"priority_count": int(priority.get("priority_count") or len(priority_rows)),
"ledger_change_count": len(ledger.get("changes", [])) if isinstance(ledger.get("changes"), list) else 0,
"ledger_without_change_count": int(ledger.get("threshold_change_without_ledger_count") or 0),
},
"top_priority_rows": priority_rows,
"review_rows": reviewed_rows,
}
OUT_JSON.write_text(json.dumps(report, ensure_ascii=False, indent=2), encoding="utf-8")
md_lines = [
"# Calibration Review Report",
"",
"## Summary",
"",
f"- total thresholds: {report['summary']['total_thresholds']}",
f"- priority count: {report['summary']['priority_count']}",
f"- ledger change count: {report['summary']['ledger_change_count']}",
f"- ledger without change count: {report['summary']['ledger_without_change_count']}",
"",
"### Source Counts",
"",
_table(
[{"source": k, "count": v} for k, v in sorted(source_counts.items())],
["source", "count"],
max_rows=50,
),
"",
"### Readiness Counts",
"",
_table(
[{"readiness": k, "count": v} for k, v in sorted(readiness_counts.items())],
["readiness", "count"],
max_rows=50,
),
"",
"## Top Priority Rows",
"",
_table(priority_rows, ["calibration_id", "source", "sample_n", "urgency_score", "linked_factor", "owner_formula"]),
"",
"## Review Candidates",
"",
_table(reviewed_rows, ["id", "source", "sample_n", "value", "unit", "owner_formula", "readiness", "reason"]),
"",
"## Evidence",
"",
f"- registry: {REGISTRY}",
f"- priority: {PRIORITY}",
f"- ledger: {LEDGER}",
]
OUT_MD.write_text("\n".join(md_lines), encoding="utf-8")
print(json.dumps({
"formula_id": report["formula_id"],
"gate": "PASS" if reviewed_rows or priority_rows else "WARN",
"review_rows": len(reviewed_rows),
"priority_rows": len(priority_rows),
"json_path": str(OUT_JSON),
"md_path": str(OUT_MD),
}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())