from __future__ import annotations import argparse import json import re from pathlib import Path from typing import Any import yaml ROOT = Path(__file__).resolve().parents[2] # ── 셀-레벨 커버리지: yaml expected_outputs → operational_report 셀 매핑 ────── # 각 formula의 expected_outputs 필드가 operational_report의 표 셀에 채워졌는지 측정. # _CELL_COVERAGE_STUBS: 채워진 것처럼 보이지만 실제 데이터 없는 일률 stub 값들 _CELL_COVERAGE_STUBS = frozenset({ "", "-", "n/a", "N/A", "데이터 누락", "DATA_MISSING", "중립", "NEUTRAL", "LOSING", "정상", "NORMAL", "MISSING", "WATCH_PENDING_SAMPLE", }) def load_yaml(path: Path) -> dict[str, Any]: payload = yaml.safe_load(path.read_text(encoding="utf-8")) return payload if isinstance(payload, dict) else {} def load_json_safe(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: v = json.loads(path.read_text(encoding="utf-8")) return v if isinstance(v, dict) else {} except Exception: return {} def formula_registry() -> dict[str, Any]: """formula_id → formula_dict (expected_outputs 포함).""" registry: dict[str, Any] = {} for p in (ROOT / "spec" / "13_formula_registry.yaml", ROOT / "spec" / "13b_harness_formulas.yaml"): y = load_yaml(p) fm = ((y.get("formula_registry") or {}).get("formulas")) or {} for k, v in fm.items(): if isinstance(v, dict): registry[str(k)] = v return registry def formula_ids() -> list[str]: return sorted(formula_registry().keys()) def read_texts(paths: list[Path]) -> str: chunks: list[str] = [] for p in paths: if p.exists(): chunks.append(p.read_text(encoding="utf-8", errors="ignore")) return "\n".join(chunks) def _extract_table_cells(markdown: str) -> set[str]: """GFM 표에서 셀 값 목록을 추출 (헤더 + 데이터 행).""" cells: set[str] = set() for line in markdown.split("\n"): if "|" not in line: continue parts = [p.strip() for p in line.split("|")] for p in parts: if p and p != "---" and not re.match(r"^-+$", p): cells.add(p) return cells def _is_stub(value: str) -> bool: return value.strip() in _CELL_COVERAGE_STUBS or value.strip().startswith("-") def measure_cell_coverage( formula_reg: dict[str, Any], report_json: dict[str, Any], harness_ctx: dict[str, Any], temp_outputs: dict[str, dict[str, Any]], ) -> dict[str, Any]: """yaml expected_outputs → 4경로 커버리지. 출력 필드가 채워진 것으로 인정하는 4가지 경로: 1. GAS harness_context에 output.field 키가 non-null 존재 2. Phase-1 Temp JSON 파일에 expected_output 필드가 non-stub 존재 3. operational_report 섹션 텍스트에 expected_output 이름이 column header로 존재 4. operational_report 표 셀에 non-stub 값으로 필드명=값 패턴 존재 """ # Collect all report text (markdown) all_section_text = "" for sec in report_json.get("sections") or []: all_section_text += " " + (sec.get("markdown") or "") # Flatten all Temp output values for quick lookup temp_flat: dict[str, Any] = {} for _fname, tdata in temp_outputs.items(): if isinstance(tdata, dict): # Flatten top-level scalars and row-level fields for k, v in tdata.items(): if k not in ("rows", "steps", "selected_combo"): temp_flat[k] = v # Also include fields from first row of any rows list for listkey in ("rows", "steps", "selected_combo"): lst = tdata.get(listkey) if isinstance(lst, list) and lst and isinstance(lst[0], dict): for k, v in lst[0].items(): temp_flat.setdefault(k, v) required_outputs: list[dict[str, Any]] = [] for fid, fdef in formula_reg.items(): if not isinstance(fdef, dict): continue # orphan reconcile 공식은 GAS/보고서 셀 검사 제외 (Python harness 전용) if str(fdef.get("version", "")).endswith("_ORPHAN_RECONCILE"): continue exp = fdef.get("expected_outputs") if not isinstance(exp, list): continue out_field = (fdef.get("output") or {}).get("field") if isinstance(fdef.get("output"), dict) else None # Path 1: GAS harness_context ctx_present = bool(out_field and harness_ctx.get(out_field) is not None) for o in exp: field_name = str(o).strip() if isinstance(o, str) else str(o).strip() # Path 2: Temp JSON outputs (Phase-1 Python tools) temp_val = temp_flat.get(field_name) temp_filled = temp_val is not None and str(temp_val).strip() not in _CELL_COVERAGE_STUBS # Path 3: Column header in report in_report_header = bool(field_name and field_name in all_section_text) # Path 4: Row-level cell value (non-stub) non_stub_value = False pat = re.search( rf"\b{re.escape(field_name)}\b[^|\n]*\|([^|\n]+)", all_section_text ) if pat: val_candidate = pat.group(1).strip() non_stub_value = not _is_stub(val_candidate) filled = ctx_present or temp_filled or (in_report_header and non_stub_value) required_outputs.append({ "formula_id": fid, "output_field": field_name, "ctx_present": ctx_present, "temp_filled": temp_filled, "in_report_header": in_report_header, "non_stub_value": non_stub_value, "filled": filled, }) total = len(required_outputs) filled_count = sum(1 for r in required_outputs if r["filled"]) cell_coverage_pct = round(filled_count / total * 100, 2) if total > 0 else 0.0 unfilled = [r for r in required_outputs if not r["filled"]] return { "total_required_outputs": total, "filled_outputs": filled_count, "cell_coverage_pct": cell_coverage_pct, "unfilled_outputs": unfilled, "cell_gate": "PASS" if cell_coverage_pct >= 95.0 else ("CAUTION" if cell_coverage_pct >= 75.0 else "FAIL"), } def main() -> int: parser = argparse.ArgumentParser(description="Measure YAML formula coverage in GS and PS governance layers.") parser.add_argument("--strict-100", action="store_true") parser.add_argument("--output-json", default=str(ROOT / "Temp" / "yaml_gs_ps_coverage.json")) parser.add_argument("--report-json", default=str(ROOT / "Temp" / "operational_report.json")) args = parser.parse_args() reg = formula_registry() ids = sorted(reg.keys()) _GS_CORE = [ROOT / "gas_data_feed.gs", ROOT / "gas_harness_rows.gs", ROOT / "gas_lib.gs", ROOT / "gas_data_collect.gs", ROOT / "gas_report.gs"] _GAS_ADAPTER_DIR = ROOT / "src" / "gas_adapter_parts" _gs_adapter_files = sorted(_GAS_ADAPTER_DIR.glob("*.gs")) if _GAS_ADAPTER_DIR.is_dir() else [] _GS_ALPHA_WATCH = [ROOT / "gas_apex_alpha_watch.gs", ROOT / "gas_apex_runtime_core.gs"] gs_text = read_texts(_GS_CORE + _gs_adapter_files + _GS_ALPHA_WATCH) ps_text = read_texts([ROOT / "tools" / "run_engine_harness_gate.ps1", ROOT / "tools" / "run_yolo_full_cycle.ps1"]) gate_py_text = read_texts([ROOT / "tools" / "validate_engine_harness_gate.py"]) gs_hit = [i for i in ids if i in gs_text] gs_miss = [i for i in ids if i not in gs_text] # PS는 공식 직접 계산 계층이 아니라 실행 강제 계층. ps_required_hooks = [ ("run_engine_harness_gate.ps1", ROOT / "tools" / "run_engine_harness_gate.ps1"), ("run_yolo_full_cycle.ps1", ROOT / "tools" / "run_yolo_full_cycle.ps1"), ("validate_engine_harness_gate.py", ROOT / "tools" / "validate_engine_harness_gate.py"), ] ps_hook_hit = [name for name, path in ps_required_hooks if path.exists()] ps_hook_miss = [name for name, path in ps_required_hooks if not path.exists()] total = len(ids) if ids else 1 gs_pct = round(len(gs_hit) / total * 100, 2) ps_pct = round(len(ps_hook_hit) / len(ps_required_hooks) * 100, 2) # ── 셀-레벨 커버리지 측정 ────────────────────────────────────────────────── report_json_path = Path(args.report_json) if not report_json_path.is_absolute(): report_json_path = ROOT / report_json_path report_json = load_json_safe(report_json_path) # harness context from GatherTradingData.json gtd = load_json_safe(ROOT / "GatherTradingData.json") hctx = (gtd.get("data") or {}).get("_harness_context") or {} # Phase-1/2/3 Temp outputs (Python tools) _TEMP = ROOT / "Temp" temp_outputs = { # Phase-1 "ejce_view_renderer_v1": load_json_safe(_TEMP / "ejce_view_renderer_v1.json"), "smart_cash_recovery_v3": load_json_safe(_TEMP / "smart_cash_recovery_v3.json"), "ratchet_trailing_v1": load_json_safe(_TEMP / "ratchet_trailing_general_v1.json"), "value_preservation_v1": load_json_safe(_TEMP / "value_preservation_scorer_v1.json"), "routing_execution_log_v1": load_json_safe(_TEMP / "routing_execution_log_v1.json"), "blank_cell_audit_v1": load_json_safe(_TEMP / "blank_cell_audit_v1.json"), "formula_registry_sync_v1": load_json_safe(_TEMP / "formula_registry_sync_v1.json"), # Phase-2 "fundamental_raw_v1": load_json_safe(_TEMP / "fundamental_raw_v1.json"), "fundamental_multifactor_v3": load_json_safe(_TEMP / "fundamental_multifactor_v3.json"), "horizon_classification_v1": load_json_safe(_TEMP / "horizon_classification_v1.json"), # Phase-2B "earnings_quality_signal_v1": load_json_safe(_TEMP / "earnings_quality_signal_v1.json"), "growth_rate_signal_v1": load_json_safe(_TEMP / "growth_rate_signal_v1.json"), "cashflow_quality_signal_v1": load_json_safe(_TEMP / "cashflow_quality_signal_v1.json"), "market_share_signal_v2": load_json_safe(_TEMP / "market_share_signal_v2.json"), # Phase-3 "smart_money_flow_signal_v2": load_json_safe(_TEMP / "smart_money_flow_signal_v2.json"), "liquidity_flow_signal_v1": load_json_safe(_TEMP / "liquidity_flow_signal_v1.json"), "capital_style_allocation_v1": load_json_safe(_TEMP / "capital_style_allocation_v1.json"), "portfolio_alpha_confidence_per_ticker_v1": load_json_safe(_TEMP / "portfolio_alpha_confidence_per_ticker_v1.json"), # [Advanced Harness Architecture] "dynamic_value_preservation_sell_v6": load_json_safe(_TEMP / "dynamic_value_preservation_sell_v6.json"), "predictive_alpha_engine_v2": load_json_safe(_TEMP / "predictive_alpha_engine_v2.json"), "capital_style_time_stop_v1": load_json_safe(_TEMP / "capital_style_time_stop_v1.json"), "execution_integrity_gate_v1": load_json_safe(_TEMP / "execution_integrity_gate_v1.json"), # Phase-6 Python-tool-only "final_judgment_gate_v1": load_json_safe(_TEMP / "final_judgment_gate_v1.json"), "verdict_consistency_lock_v1": load_json_safe(_TEMP / "verdict_consistency_lock_v1.json"), "data_quality_reconciliation_v1": load_json_safe(_TEMP / "data_quality_reconciliation_v1.json"), } cell_cov = measure_cell_coverage(reg, report_json, hctx, temp_outputs) # ───────────────────────────────────────────────────────────────────────── # Python-tool-only formulas: not in GAS (implemented as Python tools) _PYTHON_TOOL_FORMULAS = { # Phase-1 "BLANK_CELL_AUDIT_V1", "VALUE_PRESERVATION_SCORER_V1", "SMART_CASH_RECOVERY_V3", "RATCHET_TRAILING_GENERAL_V1", "EJCE_VIEW_RENDERER_V1", "ROUTING_EXECUTION_LOG_TABLE_V1", # Phase-2 "FUNDAMENTAL_RAW_INGEST_V1", "FUNDAMENTAL_MULTIFACTOR_V3", "HORIZON_CLASSIFICATION_V1", # Phase-2B "EARNINGS_QUALITY_SIGNAL_V1", "GROWTH_RATE_SIGNAL_V1", "CASHFLOW_QUALITY_SIGNAL_V1", # Phase-3 "SMART_MONEY_FLOW_SIGNAL_V2", "LIQUIDITY_FLOW_SIGNAL_V1", "PORTFOLIO_ALPHA_CONFIDENCE_PER_TICKER_V1", # Phase-3 Market Share V2 "MARKET_SHARE_SIGNAL_V2", # [Advanced Harness Architecture] "DYNAMIC_VALUE_PRESERVATION_SELL_V6", "PREDICTIVE_ALPHA_DIALECTIC_ENGINE_V2", "CAPITAL_STYLE_TIME_STOP_V1", "EXECUTION_INTEGRITY_GATE_V1", # Phase-4~5 Python-tool-only 공식 (GAS 구현 없음, Python tools로 구현) "TRADE_QUALITY_FROM_T5_V1", "PREDICTION_ACCURACY_HARNESS_V2", "MACRO_EVENT_TICKER_IMPACT_V1", "SELL_WATERFALL_ENGINE_V2", "LLM_NARRATIVE_TEMPLATE_LOCK_V1", "EJCE_DIVERGENCE_AUDIT_V1", "PREDICTIVE_ALPHA_REPORT_LOCK_V2", # Phase-6 Python-tool-only 공식 (판단 결정론 계층) "FINAL_JUDGMENT_GATE_V1", "VERDICT_CONSISTENCY_LOCK_V1", "INVESTMENT_QUALITY_HEADLINE_V1", # Phase-7 단일 진실원천 + 교차섹션 정합성 (Python-tool-only, GAS 구현 불필요) "CANONICAL_METRICS_V1", "CROSS_SECTION_CONSISTENCY_V1", # Work 7 + Work 3 분석 도구 "ALPHA_FEEDBACK_LOOP_V2", "ALPHA_LEAD_THRESHOLD_OPTIMIZER_V1", # Registry sync: formulas implemented outside GAS coverage path "VELOCITY_V1", "PROFIT_LOCK_STAGE_V1", "ANTI_LATE_ENTRY_GATE_V2", "DYNAMIC_HEAT_GATE_V1", "POSITION_SIZE_REGIME_SCALE_V1", "REGIME_CASH_UPLIFT_V1", "DRAWDOWN_GUARD_V1", "POSITION_COUNT_LIMIT_V1", "CASH_FLOOR_V1", "SEMICONDUCTOR_CLUSTER_GATE_V1", "SINGLE_POSITION_WEIGHT_CAP_V1", "REGIME_TRIM_GUIDANCE_V1", "HEAT_CONCENTRATION_ALERT_V1", "SECTOR_CONCENTRATION_LIMIT_V1", "PORTFOLIO_DRAWDOWN_GATE_V1", "K2_STAGED_REBOUND_SELL_V1", "STOP_BREACH_ALERT_V1", "SECTOR_ROTATION_MOMENTUM_V1", "ANTI_WHIPSAW_GATE_V1", "BREAKEVEN_RATCHET_V1", "MARKET_WEIGHT_AWARE_CLUSTER_GATE_V1", "LEADER_POSITION_WEIGHT_CAP_V1", "CAPITAL_STYLE_ALLOCATION_V1", # ENGINE_AUDIT_V1 — Python-tool-only 감사 게이트 (GAS 런타임 비개입) "IMPUTED_DATA_EXPOSURE_GATE_V1", "SCORES_HARNESS_V1", "STRATEGY_ROUTING_AUDIT_V1", "SELL_ENGINE_AUDIT_V1", "YAML_TO_CODE_COVERAGE_V1", "REALIZED_PERFORMANCE_V1", "BACKTEST_HARNESS_V1", # NF1~NF5: GAS execution_order 제외 Python-harness 전용 보조 공식 (python_harness_supplements 등록) "REGIME_CONDITIONAL_MACRO_FACTOR_V1", # NF1 — tools/build_predictive_alpha_dialectic_engine_v2.py "REBOUND_CAPTURE_THESIS_FACTOR_V1", # NF2 — tools/build_predictive_alpha_dialectic_engine_v2.py "ENTRY_TIMING_DECILE_FACTOR_V1", # NF3 — tools/build_late_chase_attribution_v1.py "SELL_SLIPPAGE_BUDGET_FACTOR_V1", # NF4 — tools/build_value_preservation_scorer_v1.py "PROFIT_GIVEBACK_RATCHET_FACTOR_V1", # NF5 — tools/build_ratchet_trailing_general_v1.py # Phase-execution Python-tool-only (tools/build_execution_method_ladder_v1.py, runtime=PYTHON) "EXECUTION_METHOD_LADDER_V1", } # V9 orphan reconcile — _ORPHAN_RECONCILE 버전 태그 공식은 GAS 요구사항 면제 ids_to_skip = {fid for fid, fdef in reg.items() if isinstance(fdef, dict) and str(fdef.get("version", "")).endswith("_ORPHAN_RECONCILE")} _PYTHON_TOOL_FORMULAS = _PYTHON_TOOL_FORMULAS | ids_to_skip block_gs_miss = [f for f in gs_miss if f not in _PYTHON_TOOL_FORMULAS] summary = { "formula_total": len(ids), "gs_covered": len(gs_hit), "gs_missing": gs_miss, "gs_coverage_pct": gs_pct, "gs_blocking_missing": block_gs_miss, "ps_required_hooks": [name for name, _ in ps_required_hooks], "ps_hook_covered": len(ps_hook_hit), "ps_hook_missing": ps_hook_miss, "ps_coverage_pct": ps_pct, "cell_coverage": cell_cov, "status": "OK" if (gs_pct >= 100.0 and ps_pct == 100.0 and cell_cov["cell_gate"] != "FAIL") else "FAIL", } out = Path(args.output_json) if not out.is_absolute(): out = ROOT / out out.parent.mkdir(parents=True, exist_ok=True) out.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") print( f"YAML_GS_PS_COVERAGE: gs={gs_pct:.2f}% " f"ps={ps_pct:.2f}% total={len(ids)} " f"cell_coverage={cell_cov['cell_coverage_pct']:.2f}% [{cell_cov['cell_gate']}]" ) if summary["status"] == "OK": print("YAML_GS_PS_COVERAGE_OK") return 0 print("YAML_GS_PS_COVERAGE_FAIL") if args.strict_100: return 1 return 0 if __name__ == "__main__": raise SystemExit(main())