#!/usr/bin/env python3
from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any

import yaml

# Repository root: two levels up from this file (the parent of its directory).
ROOT = Path(__file__).resolve().parents[1]

# YAML specs whose ``formula_registry.formulas`` keys are audited.
_SPEC_DIR = ROOT / "spec"
SPEC_FILES = [
    _SPEC_DIR / "13_formula_registry.yaml",
    _SPEC_DIR / "13b_harness_formulas.yaml",
]

# Google Apps Script sources scanned for function definitions.
_GS_CORE = [
    ROOT / gs_name
    for gs_name in (
        "gas_data_feed.gs",
        "gas_harness_rows.gs",
        "gas_lib.gs",
        "gas_data_collect.gs",
        "gas_report.gs",
    )
]
_GAS_ADAPTER_DIR = ROOT / "src" / "gas_adapter_parts"
if _GAS_ADAPTER_DIR.is_dir():
    _gs_adapter_files = sorted(_GAS_ADAPTER_DIR.glob("*.gs"))
else:
    _gs_adapter_files = []
_GS_ALPHA_WATCH = [
    ROOT / "gas_apex_alpha_watch.gs",
    ROOT / "gas_apex_runtime_core.gs",
]
GS_FILES = _GS_CORE + _gs_adapter_files + _GS_ALPHA_WATCH

# Python sources searched for formula ids. Order is significant: it is the
# order in which matching file names are reported.
_TOOLS_DIR = ROOT / "tools"
_QUANT_ENGINE_DIR = ROOT / "src" / "quant_engine"

_TOOL_SCRIPT_NAMES = (
    "compute_formula_outputs.py",
    "validate_alpha_execution_harness.py",
    "validate_harness_context.py",
    "render_operational_report.py",
    # Phase-1 deterministic tools (Python-tool-only formulas)
    "build_ejce_view_renderer_v1.py",
    "build_smart_cash_recovery_v3.py",
    "build_ratchet_trailing_general_v1.py",
    "build_value_preservation_scorer_v1.py",
    "build_routing_execution_log_v1.py",
    "build_blank_cell_audit_v1.py",
    # Phase-2 deterministic tools
    "ingest_fundamental_raw.py",
    "build_fundamental_multifactor_v3.py",
    "build_horizon_classification_v1.py",
    # Phase-2B deterministic tools
    "build_earnings_quality_signal_v1.py",
    "build_growth_rate_signal_v1.py",
    "build_cashflow_quality_signal_v1.py",
    # Phase-3 deterministic tools
    "build_smart_money_flow_signal_v2.py",
    "build_liquidity_flow_signal_v1.py",
    "build_portfolio_alpha_confidence_per_ticker_v1.py",
    "build_market_share_signal_v2.py",
    "build_dynamic_value_preservation_sell_v6.py",
    "build_predictive_alpha_dialectic_engine_v2.py",
    "build_capital_style_time_stop_v1.py",
    "build_execution_integrity_gate_v1.py",
    # Phase-4~5 deterministic tools (Python-tool-only)
    "build_trade_quality_from_t5_v1.py",
    "build_prediction_accuracy_harness_v2.py",
    "build_sell_waterfall_engine_v2.py",
    "build_execution_method_ladder_v1.py",
    "build_llm_narrative_template_lock_v1.py",
    "build_ejce_divergence_audit_v1.py",
    "build_predictive_alpha_report_lock_v2.py",
    # Phase-6 deterministic tools (Python-tool-only)
    "build_smart_money_liquidity_gate_v1.py",
    "build_final_judgment_gate_v1.py",
    "build_verdict_consistency_lock_v1.py",
    "build_data_quality_reconciliation_v1.py",
    # Phase-7 single source of truth + cross-section consistency gate (Python-tool-only)
    "build_canonical_metrics_v1.py",
    "build_cross_section_consistency_v1.py",
    # Work 7 + Work 3: AFL V2 + alpha_lead optimization analysis
    "build_alpha_feedback_loop_v2.py",
    # V9 orphan reconciliation 2026-06-03 — register all pipeline harness tools
    "apply_perf_recovery_overrides_v1.py",
    "apply_request_result_adoption_v1.py",
    "apply_strategy_execution_locks.py",
    "build_anti_late_entry_pullback_gate_v4.py",
    "build_architecture_boundaries_v2.py",
    "build_audit_replay_snapshot_v1.py",
    "build_canonical_artifact_resolver_v1.py",
    "build_cash_raise_pareto_executor_v2.py",
    "build_cash_raise_value_optimizer_v3.py",
    "build_cash_recovery_optimizer_v4.py",
    "build_confidence_calibration_v2.py",
    "build_continuous_evaluation_dashboard_v1.py",
    "build_data_integrity_100_lock_v2.py",
    "build_data_maturity_truth_gate_v1.py",
    "build_data_quality_gate_v3.py",
    "build_decision_evidence_score_v2.py",
    "build_decision_replay_snapshot_pack_v1.py",
    "build_derivation_validity_score_v1.py",
    "build_distribution_exit_presignal_v2.py",
    "build_evaluation_history_coverage_v1.py",
    "build_execution_quality_harness_v1.py",
    "build_execution_readiness_matrix_v1.py",
    "build_final_context_for_llm_v2.py",
    "build_final_execution_decision_v1.py",
    "build_formula_runtime_registry_v1.py",
    "build_horizon_allocation_guard_v2.py",
    "build_horizon_routing_lock_v6.py",
    "build_imputed_data_exposure_gate_v2.py",
    "build_late_rebound_bucket_score_v1.py",
    "build_operational_alpha_calibration_v2.py",
    "build_operational_eval_queue_v1.py",
    "build_operational_evidence_audit_v1.py",
    "build_operational_outcome_lock_v1.py",
    "build_operational_t20_outcome_ledger_v1.py",
    "build_pass_100_criteria_v1.py",
    "build_perf_recovery_harness_v1.py",
    "build_performance_monitoring_dashboard_v1.py",
    "build_performance_readiness_replay_bridge_v1.py",
    "build_realized_performance_v1.py",
    "build_root_cause_attribution_v1.py",
    "build_root_cause_recovery_plan_v1.py",
    "build_sell_execution_timing_lock_v2.py",
    "build_short_horizon_outcome_monitor_v1.py",
    "build_smart_cash_recovery_v4.py",
    "build_strategy_decision_v3.py",
    "build_strategy_hardening_harness_v1.py",
    "build_truthful_decision_ledger_v2.py",
    "build_truthfulness_guard_v1.py",
    "build_value_preservation_scorer_v2.py",
    "build_walk_forward_calibration_v1.py",
    "inject_computed_harness.py",
    "measure_semantic_formula_coverage.py",
    "pipeline_runtime_anomaly_lib_v1.py",
    "profile_pipeline_runtime.py",
    "run_phase_checks_50_60.py",
    "validate_artifact_freshness_v1.py",
    "validate_data_maturity_truth_gate_v1.py",
    "validate_pipeline_runtime_anomaly.py",
    "validate_pipeline_runtime_contract.py",
    "validate_strategy_execution_locks_regression.py",
    "build_yaml_code_coverage_v1.py",
)

# src/quant_engine canonical Python implementations
_QUANT_ENGINE_MODULE_NAMES = (
    "compute_formula_outputs.py",
    "inject_computed_harness.py",
    "exit_decisions.py",
    "orchestration_harness_v1.py",
    "run_formula_golden_cases_v2.py",
    "measure_harness_coverage.py",
    "refactor_master_helpers.py",
)

PY_FILES = [
    *(_TOOLS_DIR / script_name for script_name in _TOOL_SCRIPT_NAMES),
    *(_QUANT_ENGINE_DIR / module_name for module_name in _QUANT_ENGINE_MODULE_NAMES),
]

# GAS function names used as roots of the reachability graph.
ENTRYPOINT_FUNCTIONS = [
    "buildHarnessContext_",
    "buildHarnessRows_",
    "runDataFeed",
    "runMacro",
    "runEventRisk",
    "runSellPriority",
    "runDecisionFlow_",
    "calcApexExecutionHarness_",
    "runCoreSatelliteBatch",
    "runCoreSatelliteFinalize",
    # Monthly batch trigger entry points
    "calcTradeQualityScorer_",
    "evaluatePa1FeedbackBatch_",
]

# Functions intentionally reserved for feature-flagged flows are excluded from dead-code hard fail.
DEAD_CODE_ALLOWLIST = {
    "calcSecularLeaderAutoDetect_",
}


def _ensure_utf8_stdio() -> None:
    """Make ``sys.stdout`` and ``sys.stderr`` emit UTF-8 with line buffering.

    Streams that report no encoding, or already report UTF-8, are left alone.
    Other streams are reconfigured in place so buffered output and the
    underlying file descriptor keep a single owner.
    """
    for stream_name in ("stdout", "stderr"):
        stream = getattr(sys, stream_name, None)
        encoding = getattr(stream, "encoding", None)
        if not encoding or encoding.lower() in ("utf-8", "utf8"):
            continue
        reconfigure = getattr(stream, "reconfigure", None)
        if callable(reconfigure):
            # In-place switch: works for streams without a real fileno() and
            # avoids a second wrapper that would close the descriptor on GC.
            reconfigure(encoding="utf-8", line_buffering=True)
            continue
        # Fallback for stream objects without reconfigure(); closefd=False so
        # the new wrapper never closes a descriptor it does not own.
        replacement = open(
            stream.fileno(), mode="w", encoding="utf-8", buffering=1, closefd=False
        )
        setattr(sys, stream_name, replacement)


def _load_yaml(path: Path) -> dict[str, Any]:
    """Parse *path* as YAML and return the top-level mapping.

    Returns an empty dict when the file cannot be read or parsed, or when the
    document's top level is not a mapping.
    """
    try:
        payload = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberate best-effort: an unreadable spec contributes no formulas.
        return {}
    return payload if isinstance(payload, dict) else {}


def _formula_registry_ids() -> list[dict[str, str]]:
    """Collect the unique formula ids declared across ``SPEC_FILES``.

    Returns:
        One ``{"formula_id", "yaml_file"}`` row per id, in first-seen order.
        An id present in several spec files is attributed to the first one.
    """
    rows: list[dict[str, str]] = []
    seen: set[str] = set()
    for spec_file in SPEC_FILES:
        registry = _load_yaml(spec_file).get("formula_registry")
        formulas = registry.get("formulas") if isinstance(registry, dict) else None
        if not isinstance(formulas, dict):
            # Missing or malformed section: nothing to register from this file.
            continue
        for formula_id in formulas:
            fid = str(formula_id)
            if fid in seen:
                continue
            seen.add(fid)
            rows.append({"formula_id": fid, "yaml_file": spec_file.name})
    return rows


def _read_text(path: Path) -> str:
    """Return the UTF-8 text of *path*, or ``""`` when it is not a regular file.

    Undecodable bytes are dropped (``errors="ignore"``).
    """
    if not path.is_file():
        return ""
    return path.read_text(encoding="utf-8", errors="ignore")


def _files_containing(term: str, paths: list[Path]) -> list[str]:
    """Return the file names (not full paths) of *paths* whose text contains *term*."""
    return [path.name for path in paths if term in _read_text(path)]


def _function_catalog() -> list[dict[str, Any]]:
    """Index every ``function NAME(`` declaration found in ``GS_FILES``.

    Returns:
        One row per declaration with ``function_name``, ``gs_file``, 1-based
        ``line`` and ``end_line``, and ``block_text``. A block runs from the
        declaration line up to the line before the next declaration in the
        same file (or the end of the file), so it also contains whatever sits
        between two functions.
    """
    catalog: list[dict[str, Any]] = []
    fn_re = re.compile(r"^\s*function\s+([A-Za-z0-9_]+)\s*\(", re.M)
    for gs_file in GS_FILES:
        text = _read_text(gs_file)
        if not text:
            continue
        lines = text.splitlines()
        starts: list[tuple[int, str]] = []
        for line_no, line in enumerate(lines, start=1):
            found = fn_re.match(line)
            if found:
                starts.append((line_no, found.group(1)))
        for pos, (line_no, name) in enumerate(starts):
            is_last = pos + 1 >= len(starts)
            end_line = len(lines) if is_last else starts[pos + 1][0] - 1
            catalog.append(
                {
                    "function_name": name,
                    "gs_file": gs_file.name,
                    "line": line_no,
                    "end_line": end_line,
                    "block_text": "\n".join(lines[line_no - 1:end_line]),
                }
            )
    return catalog


def _pascal_case(value: str) -> str:
    """Convert an underscore/whitespace separated identifier to PascalCase."""
    parts = [p for p in re.split(r"[_\s]+", value.strip()) if p]
    return "".join(part[:1].upper() + part[1:].lower() for part in parts)


def _strip_version_suffix(value: str) -> str:
    """Remove a trailing ``_V<digits>`` suffix (case-insensitive), if present."""
    return re.sub(r"_V\d+$", "", value, flags=re.IGNORECASE)


def _candidate_function_names(formula_id: str) -> list[str]:
    """Return the GAS function names that would implement *formula_id* by convention.

    Candidates are ``calc<Id>_``, ``calc<IdWithoutVersion>_``, ``run<Id>_`` and
    ``validate<Id>_``, in that order, with duplicates removed.
    """
    full_name = _pascal_case(formula_id)
    base_name = _pascal_case(_strip_version_suffix(formula_id))
    candidates = [
        f"calc{full_name}_",
        f"calc{base_name}_",
        f"run{full_name}_",
        f"validate{full_name}_",
    ]
    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(candidates))


def _anchor_lookup(formula_id: str, gs_texts: dict[str, str], fn_catalog: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Find the first catalogued function whose block text mentions *formula_id*.

    ``gs_texts`` is accepted for interface compatibility but is not consulted.

    Returns:
        A match dict with ``match_source`` set to ``"block"``, or ``None``.
    """
    for row in fn_catalog:
        if formula_id in row.get("block_text", ""):
            return {
                "function_name": row["function_name"],
                "gs_file": row["gs_file"],
                "line": row["line"],
                "match_source": "block",
            }
    return None


def _load_python_harness_supplements() -> set[str]:
    """Return the formula ids listed under ``policy.python_harness_supplements``.

    These ids are treated as already implemented in Python only. Any read,
    parse, or shape problem yields an empty set.
    """
    registry_path = ROOT / "spec" / "13_formula_registry.yaml"
    try:
        payload = yaml.safe_load(registry_path.read_text(encoding="utf-8"))
        supplements = (
            (payload.get("formula_registry") or {})
            .get("policy", {})
            .get("python_harness_supplements", {})
        )
        return set(supplements.get("formulas") or [])
    except Exception:
        # Deliberate best-effort: a broken registry means "no supplements".
        return set()


def _data_gated_ids_from_payload(payload: Any) -> set[str]:
    """Extract the ``formula_id`` of every ``DATA_GATED`` row in *payload*.

    *payload* may be a mapping with a ``formulas`` list or a bare list of rows.
    Rows that are not dicts, or that carry no ``formula_id``, are skipped so a
    single malformed row cannot discard the valid ones.
    """
    if isinstance(payload, dict):
        rows = payload.get("formulas") or []
    elif isinstance(payload, list):
        rows = payload
    else:
        rows = []
    if not isinstance(rows, (list, tuple)):
        return set()
    return {
        row["formula_id"]
        for row in rows
        if isinstance(row, dict)
        and row.get("lifecycle_state") == "DATA_GATED"
        and row.get("formula_id") is not None
    }


def _load_data_gated_formula_ids() -> set[str]:
    """Return ids of formulas whose ``lifecycle_state`` is ``DATA_GATED``.

    Such formulas are awaiting implementation and are therefore excluded from
    ``true_missing``. Any read or parse problem yields an empty set.
    """
    lifecycle_path = ROOT / "spec" / "51_formula_lifecycle_registry.yaml"
    try:
        payload = yaml.safe_load(lifecycle_path.read_text(encoding="utf-8")) or {}
        return _data_gated_ids_from_payload(payload)
    except Exception:
        # Deliberate best-effort: no lifecycle registry means nothing is gated.
        return set()


def _build_coverage() -> dict[str, Any]:
    # NOTE(review): only the opening part of this function is reproduced here;
    # the statements that follow this block continue the same loop body.
    # Nesting below is inferred from a whitespace-collapsed source — confirm.
    formula_rows = _formula_registry_ids()
    fn_catalog = _function_catalog()
    gs_texts = {path.name: _read_text(path) for path in GS_FILES}
    py_texts = {path.name: _read_text(path) for path in PY_FILES}
    function_names = {row["function_name"] for row in fn_catalog}
    function_name_list = sorted(function_names, key=len, reverse=True)
    function_rows_by_name = {row["function_name"]: row for row in fn_catalog}

    # [python_harness_supplements] Python-only formulas excluded from the GAS
    # execution_order are pre-registered as implemented.
    harness_supplement_ids = _load_python_harness_supplements()

    coverage_map: list[dict[str, Any]] = []
    mapped_functions: set[str] = set()
    missing_formula_ids: list[str] = []
    python_implemented_ids: list[str] = list(harness_supplement_ids)

    for row in formula_rows:
        formula_id = row["formula_id"]
        match: dict[str, Any] | None = None
        # First try the naming convention, then fall back to a text anchor.
        for candidate in _candidate_function_names(formula_id):
            if candidate in function_names:
                fn_row = next(item for item in fn_catalog if item["function_name"] == candidate)
                match = {
                    "function_name": candidate,
                    "gs_file": fn_row["gs_file"],
                    "line": fn_row["line"],
                    "match_source": "name",
                }
                break
        if match is None:
            match = _anchor_lookup(formula_id, gs_texts, fn_catalog)
        if match is None:
            py_hits = _files_containing(formula_id, PY_FILES)
            # Formulas registered in python_harness_supplements are treated as
            # Python-only implementations.
            if formula_id in harness_supplement_ids:
                if formula_id not in python_implemented_ids:
                    python_implemented_ids.append(formula_id)
                supplement_info = _load_python_harness_supplements.__dict__.get(
                    "_impl_map", {}
                )
                missing_formula_ids.append(formula_id)
                coverage_map.append(
                    {
                        "formula_id": formula_id,
                        "yaml_file": row["yaml_file"],
                        "status": "PYTHON_HARNESS",
                        "function_name": None,
                        "gs_file": None,
                        "line": None,
                        "match_source": "python_harness_supplements",
                        "python_files": py_hits if py_hits else ["[python_harness_supplements]"],
                    }
                )
                continue
            if py_hits:
                python_implemented_ids.append(formula_id)
missing_formula_ids.append(formula_id) coverage_map.append( { "formula_id": formula_id, "yaml_file": row["yaml_file"], "status": "GAP", "function_name": None, "gs_file": None, "line": None, "match_source": None, "python_files": py_hits if py_hits else [], } ) continue mapped_functions.add(match["function_name"]) coverage_map.append( { "formula_id": formula_id, "yaml_file": row["yaml_file"], "status": "COVERED", "function_name": match["function_name"], "gs_file": match["gs_file"], "line": match["line"], "match_source": match["match_source"], } ) # Reachability graph: entrypoints -> called functions. call_graph: dict[str, set[str]] = {} call_pattern_cache: dict[str, re.Pattern[str]] = {} for row in fn_catalog: block_text = row.get("block_text", "") if not isinstance(block_text, str) or not block_text: continue callers = set() for callee in function_name_list: if callee == row["function_name"]: continue pattern = call_pattern_cache.get(callee) if pattern is None: pattern = re.compile(r"(? int: _ensure_utf8_stdio() parser = argparse.ArgumentParser(description="Audit YAML formula coverage against GAS functions.") parser.add_argument("--min-coverage", type=float, default=80.0) parser.add_argument("--output-json", default=str(ROOT / "Temp" / "harness_coverage_audit.json")) args = parser.parse_args() summary = _build_coverage() summary["min_coverage_pct"] = float(args.min_coverage) # effective_coverage: GAS-or-Python 구현 기준 (true_missing=0 → 100%) summary["status"] = ( "OK" if summary["effective_coverage_pct"] >= args.min_coverage and summary["true_missing_count"] == 0 else "FAIL" ) output_path = Path(args.output_json) if not output_path.is_absolute(): output_path = ROOT / output_path output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") print("HARNESS_COVERAGE_AUDIT") print(f" formula_total: {summary['formula_total']}") print(f" covered_count(GAS): {summary['covered_count']}") 
print(f" python_implemented_count: {summary['python_implemented_count']}") print(f" effective_covered_count: {summary['effective_covered_count']} (GAS+Python)") print(f" coverage_pct(GAS-only): {summary['coverage_pct']:.2f}%") print(f" effective_coverage_pct: {summary['effective_coverage_pct']:.2f}% ← 공식 커버리지") print(f" true_missing_count: {summary['true_missing_count']} (반드시 0)") print(f" missing_count: {summary['missing_count']}") print(f" reachable_function_count: {summary['reachable_function_count']}") print(f" dead_code_count: {len(summary['dead_code'])}") print(f" output_json: {output_path}") if summary["status"] == "OK": print("HARNESS_COVERAGE_AUDIT_OK") return 0 print("HARNESS_COVERAGE_AUDIT_FAIL") print(f" min_coverage_pct: {args.min_coverage:.2f}%") if summary["true_missing_formula_ids"]: print(" true_missing_formula_ids:") for formula_id in summary["true_missing_formula_ids"]: print(f" - {formula_id}") return 1 if __name__ == "__main__": raise SystemExit(main())