54e61e71e6
- harness_coverage_auditor: _load_data_gated_formula_ids() now correctly
parses {formulas:[...]} YAML structure (was treating dict as list → empty set)
- build_formula_runtime_registry_v1: add DATA_GATED exclusion so
OPERATIONAL_T20_OUTCOME_LEDGER_V1 (~2026-07-15) doesn't block gate
- build_fundamental_multifactor_v3/v4: add FULL_ADVANCED: 1.0 to
_QUALITY_MULTIPLIER (all non-ETF stocks were scoring 0.0/grade=F)
- spec/51_formula_lifecycle_registry.yaml: OPERATIONAL_T20_OUTCOME_LEDGER_V1
lifecycle_state ACTIVE → DATA_GATED
DAG: gate=PASS step_count=55 | formula_runtime_registry: 100% | DQR: 99.97
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
178 lines
5.9 KiB
Python
178 lines
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
FORMULA_SPECS = [
|
|
ROOT / "spec" / "13_formula_registry.yaml",
|
|
ROOT / "spec" / "13b_harness_formulas.yaml",
|
|
]
|
|
DEFAULT_COVERAGE_AUDIT = ROOT / "Temp" / "harness_coverage_audit.json"
|
|
DEFAULT_OUT = ROOT / "Temp" / "formula_runtime_registry_v1.json"
|
|
|
|
|
|
def _load_yaml(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
obj = yaml.safe_load(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return obj if isinstance(obj, dict) else {}
|
|
|
|
|
|
def _load_data_gated_formula_ids() -> set[str]:
|
|
"""lifecycle_state=DATA_GATED 공식 — 구현 대기 중이므로 unmapped에서 제외."""
|
|
lifecycle_path = ROOT / "spec" / "51_formula_lifecycle_registry.yaml"
|
|
try:
|
|
payload = yaml.safe_load(lifecycle_path.read_text(encoding="utf-8")) or {}
|
|
if isinstance(payload, dict):
|
|
rows = payload.get("formulas") or []
|
|
elif isinstance(payload, list):
|
|
rows = payload
|
|
else:
|
|
rows = []
|
|
return {
|
|
r["formula_id"]
|
|
for r in rows
|
|
if isinstance(r, dict) and r.get("lifecycle_state") == "DATA_GATED"
|
|
}
|
|
except Exception:
|
|
return set()
|
|
|
|
|
|
def _load_json(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
obj = json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return obj if isinstance(obj, dict) else {}
|
|
|
|
|
|
def _collect_formula_ids() -> list[str]:
|
|
ids: list[str] = []
|
|
seen: set[str] = set()
|
|
for spec_path in FORMULA_SPECS:
|
|
payload = _load_yaml(spec_path)
|
|
formulas = ((payload.get("formula_registry") or {}).get("formulas")) or {}
|
|
if not isinstance(formulas, dict):
|
|
continue
|
|
for formula_id in formulas.keys():
|
|
fid = str(formula_id)
|
|
if fid and fid not in seen:
|
|
seen.add(fid)
|
|
ids.append(fid)
|
|
return ids
|
|
|
|
|
|
def _build_registry(formula_ids: list[str], audit: dict[str, Any], data_gated_ids: set[str] | None = None) -> dict[str, Any]:
|
|
coverage_map = audit.get("coverage_map")
|
|
rows_by_formula: dict[str, dict[str, Any]] = {}
|
|
if isinstance(coverage_map, list):
|
|
for row in coverage_map:
|
|
if not isinstance(row, dict):
|
|
continue
|
|
fid = str(row.get("formula_id") or "").strip()
|
|
if fid:
|
|
rows_by_formula[fid] = row
|
|
|
|
gated: set[str] = data_gated_ids or set()
|
|
rows: list[dict[str, Any]] = []
|
|
runtime_counts = {"GAS": 0, "PYTHON": 0, "BOTH": 0, "UNMAPPED": 0, "DATA_GATED": 0}
|
|
unmapped_ids: list[str] = []
|
|
python_only_ids: list[str] = []
|
|
data_gated_formula_ids: list[str] = []
|
|
|
|
for fid in formula_ids:
|
|
row = rows_by_formula.get(fid, {})
|
|
gas_covered = str(row.get("status") or "") == "COVERED"
|
|
python_files = row.get("python_files")
|
|
py_covered = isinstance(python_files, list) and len(python_files) > 0
|
|
|
|
if fid in gated and not gas_covered and not py_covered:
|
|
runtime = "DATA_GATED"
|
|
data_gated_formula_ids.append(fid)
|
|
elif gas_covered and py_covered:
|
|
runtime = "BOTH"
|
|
elif gas_covered:
|
|
runtime = "GAS"
|
|
elif py_covered:
|
|
runtime = "PYTHON"
|
|
python_only_ids.append(fid)
|
|
else:
|
|
runtime = "UNMAPPED"
|
|
unmapped_ids.append(fid)
|
|
|
|
runtime_counts[runtime] += 1
|
|
rows.append(
|
|
{
|
|
"formula_id": fid,
|
|
"runtime": runtime,
|
|
"gas_covered": gas_covered,
|
|
"python_covered": py_covered,
|
|
"gas_function_name": row.get("function_name"),
|
|
"gas_file": row.get("gs_file"),
|
|
"python_files": python_files if isinstance(python_files, list) else [],
|
|
}
|
|
)
|
|
|
|
total = len(rows)
|
|
mapped = total - runtime_counts["UNMAPPED"]
|
|
mapped_pct = round((mapped / total) * 100.0, 2) if total else 0.0
|
|
|
|
return {
|
|
"formula_id": "FORMULA_IMPLEMENTATION_REGISTRY_V1",
|
|
"formula_total": total,
|
|
"declared_runtime_count": total,
|
|
"runtime_counts": runtime_counts,
|
|
"runtime_adjusted_coverage_pct": mapped_pct,
|
|
"unmapped_formula_count": runtime_counts["UNMAPPED"],
|
|
"unmapped_formula_ids": unmapped_ids,
|
|
"python_only_formula_ids": python_only_ids,
|
|
"data_gated_formula_ids": data_gated_formula_ids,
|
|
"rows": rows,
|
|
"gate": "PASS" if runtime_counts["UNMAPPED"] == 0 else "FAIL",
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--audit", default=str(DEFAULT_COVERAGE_AUDIT))
|
|
ap.add_argument("--out", default=str(DEFAULT_OUT))
|
|
args = ap.parse_args()
|
|
|
|
audit_path = Path(args.audit)
|
|
if not audit_path.is_absolute():
|
|
audit_path = ROOT / audit_path
|
|
out_path = Path(args.out)
|
|
if not out_path.is_absolute():
|
|
out_path = ROOT / out_path
|
|
|
|
formula_ids = _collect_formula_ids()
|
|
audit = _load_json(audit_path)
|
|
data_gated_ids = _load_data_gated_formula_ids()
|
|
result = _build_registry(formula_ids, audit, data_gated_ids)
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
print("FORMULA_IMPLEMENTATION_REGISTRY_V1")
|
|
print(f" formula_total: {result['formula_total']}")
|
|
print(f" declared_runtime_count: {result['declared_runtime_count']}")
|
|
print(f" runtime_adjusted_coverage_pct: {result['runtime_adjusted_coverage_pct']:.2f}%")
|
|
print(f" unmapped_formula_count: {result['unmapped_formula_count']}")
|
|
print(f" gate: {result['gate']}")
|
|
return 0 if result["gate"] == "PASS" else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|
|
|