Files
QuantEngineByItz/tools/build_formula_runtime_registry_v1.py
T
kjh2064 54e61e71e6 fix: DATA_GATED exclusion for harness/registry, FULL_ADVANCED multiplier bug
- harness_coverage_auditor: _load_data_gated_formula_ids() now correctly
  parses {formulas:[...]} YAML structure (was treating dict as list → empty set)
- build_formula_runtime_registry_v1: add DATA_GATED exclusion so
  OPERATIONAL_T20_OUTCOME_LEDGER_V1 (~2026-07-15) doesn't block gate
- build_fundamental_multifactor_v3/v4: add FULL_ADVANCED: 1.0 to
  _QUALITY_MULTIPLIER (all non-ETF stocks were scoring 0.0/grade=F)
- spec/51_formula_lifecycle_registry.yaml: OPERATIONAL_T20_OUTCOME_LEDGER_V1
  lifecycle_state ACTIVE → DATA_GATED

DAG: gate=PASS step_count=55 | formula_runtime_registry: 100% | DQR: 99.97

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 17:59:57 +09:00

178 lines
5.9 KiB
Python

from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import yaml
# Repository root: two levels up from this file (the script lives one
# directory below the root).
ROOT = Path(__file__).resolve().parents[1]

# Spec files whose ``formula_registry.formulas`` mappings are scanned for ids.
FORMULA_SPECS = [
    ROOT.joinpath("spec", spec_name)
    for spec_name in ("13_formula_registry.yaml", "13b_harness_formulas.yaml")
]

# Defaults for the ``--audit`` input and the ``--out`` output of ``main()``.
DEFAULT_COVERAGE_AUDIT = ROOT.joinpath("Temp", "harness_coverage_audit.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "formula_runtime_registry_v1.json")
def _load_yaml(path: Path) -> dict[str, Any]:
    """Read *path* as YAML and return its top-level mapping.

    Best-effort loader: a missing file, any exception raised while reading or
    parsing, and a document whose top level is not a mapping all yield ``{}``.
    """
    document: Any = None
    if path.exists():
        try:
            document = yaml.safe_load(path.read_text(encoding="utf-8"))
        except Exception:
            # Deliberately broad: an unusable spec is treated as empty.
            document = None
    if isinstance(document, dict):
        return document
    return {}
def _load_data_gated_formula_ids() -> set[str]:
    """Return the ids of formulas whose ``lifecycle_state`` is ``DATA_GATED``.

    These formulas are still awaiting implementation, so the registry builder
    keeps them out of the unmapped set.

    The lifecycle registry is read from
    ``spec/51_formula_lifecycle_registry.yaml`` under ``ROOT``. Two layouts are
    accepted: a mapping with a ``formulas`` list, or a bare top-level list of
    rows.

    Returns:
        The set of gated formula ids, stringified the same way
        ``_collect_formula_ids`` stringifies registry keys. The set is empty
        when the file cannot be read or parsed (best-effort load).
    """
    lifecycle_path = ROOT / "spec" / "51_formula_lifecycle_registry.yaml"
    try:
        payload = yaml.safe_load(lifecycle_path.read_text(encoding="utf-8")) or {}
    except Exception:
        # Best-effort: a missing or unparsable lifecycle file means nothing
        # is treated as gated.
        return set()

    if isinstance(payload, dict):
        rows = payload.get("formulas") or []
    elif isinstance(payload, list):
        rows = payload
    else:
        rows = []
    if not isinstance(rows, (list, tuple)):
        # A scalar or mapping under ``formulas`` carries no usable rows.
        rows = []

    gated: set[str] = set()
    for row in rows:
        if not isinstance(row, dict) or row.get("lifecycle_state") != "DATA_GATED":
            continue
        # Skip a malformed row on its own instead of letting one missing
        # ``formula_id`` discard every other gated formula.
        raw_id = row.get("formula_id")
        if raw_id is None:
            continue
        formula_id = str(raw_id)
        if formula_id:
            gated.add(formula_id)
    return gated
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as JSON and return its top-level object.

    Best-effort loader: a missing file, any exception raised while reading or
    decoding, and a document whose top level is not an object all yield ``{}``.
    """
    decoded: Any = None
    if path.exists():
        try:
            decoded = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            # Deliberately broad: an unusable audit file is treated as empty.
            decoded = None
    return decoded if isinstance(decoded, dict) else {}
def _collect_formula_ids() -> list[str]:
    """Return every formula id declared in ``FORMULA_SPECS``, without repeats.

    Each spec is expected to hold its formulas as the keys of the mapping at
    ``formula_registry.formulas``; specs where that value is not a mapping are
    skipped. Ids are stringified, empty ids are dropped, and the first
    occurrence of an id fixes its position in the result.
    """
    # A dict doubles as an insertion-ordered set.
    ordered: dict[str, None] = {}
    for spec_path in FORMULA_SPECS:
        registry = _load_yaml(spec_path).get("formula_registry") or {}
        formulas = registry.get("formulas") or {}
        if isinstance(formulas, dict):
            for key in formulas:
                name = str(key)
                if name:
                    ordered.setdefault(name, None)
    return list(ordered)
def _build_registry(formula_ids: list[str], audit: dict[str, Any], data_gated_ids: set[str] | None = None) -> dict[str, Any]:
coverage_map = audit.get("coverage_map")
rows_by_formula: dict[str, dict[str, Any]] = {}
if isinstance(coverage_map, list):
for row in coverage_map:
if not isinstance(row, dict):
continue
fid = str(row.get("formula_id") or "").strip()
if fid:
rows_by_formula[fid] = row
gated: set[str] = data_gated_ids or set()
rows: list[dict[str, Any]] = []
runtime_counts = {"GAS": 0, "PYTHON": 0, "BOTH": 0, "UNMAPPED": 0, "DATA_GATED": 0}
unmapped_ids: list[str] = []
python_only_ids: list[str] = []
data_gated_formula_ids: list[str] = []
for fid in formula_ids:
row = rows_by_formula.get(fid, {})
gas_covered = str(row.get("status") or "") == "COVERED"
python_files = row.get("python_files")
py_covered = isinstance(python_files, list) and len(python_files) > 0
if fid in gated and not gas_covered and not py_covered:
runtime = "DATA_GATED"
data_gated_formula_ids.append(fid)
elif gas_covered and py_covered:
runtime = "BOTH"
elif gas_covered:
runtime = "GAS"
elif py_covered:
runtime = "PYTHON"
python_only_ids.append(fid)
else:
runtime = "UNMAPPED"
unmapped_ids.append(fid)
runtime_counts[runtime] += 1
rows.append(
{
"formula_id": fid,
"runtime": runtime,
"gas_covered": gas_covered,
"python_covered": py_covered,
"gas_function_name": row.get("function_name"),
"gas_file": row.get("gs_file"),
"python_files": python_files if isinstance(python_files, list) else [],
}
)
total = len(rows)
mapped = total - runtime_counts["UNMAPPED"]
mapped_pct = round((mapped / total) * 100.0, 2) if total else 0.0
return {
"formula_id": "FORMULA_IMPLEMENTATION_REGISTRY_V1",
"formula_total": total,
"declared_runtime_count": total,
"runtime_counts": runtime_counts,
"runtime_adjusted_coverage_pct": mapped_pct,
"unmapped_formula_count": runtime_counts["UNMAPPED"],
"unmapped_formula_ids": unmapped_ids,
"python_only_formula_ids": python_only_ids,
"data_gated_formula_ids": data_gated_formula_ids,
"rows": rows,
"gate": "PASS" if runtime_counts["UNMAPPED"] == 0 else "FAIL",
}
def main() -> int:
    """Build the formula runtime registry, write it as JSON and print a summary.

    Command-line options:
        --audit: coverage audit JSON to read (default ``DEFAULT_COVERAGE_AUDIT``).
        --out: registry JSON to write (default ``DEFAULT_OUT``).

    Relative paths are resolved against ``ROOT``.

    Returns:
        ``0`` when the registry gate is ``PASS``, otherwise ``1``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--audit", default=str(DEFAULT_COVERAGE_AUDIT))
    parser.add_argument("--out", default=str(DEFAULT_OUT))
    options = parser.parse_args()

    def _anchor(raw: str) -> Path:
        # Relative CLI paths are taken relative to the repository root.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else ROOT / candidate

    audit_path = _anchor(options.audit)
    out_path = _anchor(options.out)

    result = _build_registry(
        _collect_formula_ids(),
        _load_json(audit_path),
        _load_data_gated_formula_ids(),
    )

    out_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.write_text(serialized, encoding="utf-8")

    summary = (
        "FORMULA_IMPLEMENTATION_REGISTRY_V1",
        f" formula_total: {result['formula_total']}",
        f" declared_runtime_count: {result['declared_runtime_count']}",
        f" runtime_adjusted_coverage_pct: {result['runtime_adjusted_coverage_pct']:.2f}%",
        f" unmapped_formula_count: {result['unmapped_formula_count']}",
        f" gate: {result['gate']}",
    )
    for line in summary:
        print(line)

    if result["gate"] == "PASS":
        return 0
    return 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())