Files
QuantEngineByItz/tools/harness_coverage_auditor.py
kjh2064 0823d1b5a8 fix: CI venv hash-cache + sector exposure renderer + auditor registration
- ci.yml: venv 해시 기반 캐싱 적용 (validate_specs.py md5 기준), requirements.txt 불필요 스텝 제거
- harness_coverage_auditor.py: sector_trend_analysis.py, etf_representative_monitor.py PY_FILES 등록
- render_operational_report.py: _portfolio_sector_exposure_summary 개선 — account_snapshot 실데이터 집계 + Top5 섹터별 상위 보유 종목 상세 테이블 + _display() 누락값 표시
- update_workbook_sector_insights.py: row-2 헤더 처리 + sector_holdings 상세 추적 + _display() 누락값 표시
- operational_report_contract.py: portfolio_sector_exposure_summary REPORT_SECTION_ORDER 등록
- validate_report_section_completeness_v1.py: 동일 섹션 추가
- build_architecture_boundaries_v2.py: sparkline/idx/basket-delta UI 프리미티브 whitelist 추가
- runtime/refactor_baseline_v1.yaml: 엔트로피 베이스라인 갱신 (1692 files, gate=PASS)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 21:16:32 +09:00

549 lines
22 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import re
import sys
from pathlib import Path
from typing import Any
import yaml
# Repository root: the parent of the directory that contains this file.
ROOT = Path(__file__).resolve().parents[1]

# YAML specs whose ``formula_registry.formulas`` keys are audited.
SPEC_FILES = [
    ROOT / "spec" / spec_name
    for spec_name in ("13_formula_registry.yaml", "13b_harness_formulas.yaml")
]

# Core GAS (``.gs``) sources that live directly under the repository root.
_GS_CORE = [
    ROOT / gs_name
    for gs_name in (
        "gas_data_feed.gs",
        "gas_harness_rows.gs",
        "gas_lib.gs",
        "gas_data_collect.gs",
        "gas_report.gs",
    )
]

# Optional adapter parts: every ``*.gs`` in the directory, in sorted order,
# or nothing at all when the directory does not exist.
_GAS_ADAPTER_DIR = ROOT / "src" / "gas_adapter_parts"
if _GAS_ADAPTER_DIR.is_dir():
    _gs_adapter_files = sorted(_GAS_ADAPTER_DIR.glob("*.gs"))
else:
    _gs_adapter_files = []

_GS_ALPHA_WATCH = [
    ROOT / "gas_apex_alpha_watch.gs",
    ROOT / "gas_apex_runtime_core.gs",
]

# Full scan list: core files, then adapter parts, then the alpha-watch files.
GS_FILES = [*_GS_CORE, *_gs_adapter_files, *_GS_ALPHA_WATCH]
# Python sources that are searched for formula ids when no GAS function matches.
# Entries under ``tools/`` come first, then the ``src/quant_engine`` modules.
PY_FILES = [
    *(
        ROOT / "tools" / script_name
        for script_name in (
            "compute_formula_outputs.py",
            "validate_alpha_execution_harness.py",
            "validate_harness_context.py",
            "render_operational_report.py",
            # Phase-1 deterministic tools (Python-tool-only formulas)
            "build_ejce_view_renderer_v1.py",
            "build_smart_cash_recovery_v3.py",
            "build_ratchet_trailing_general_v1.py",
            "build_value_preservation_scorer_v1.py",
            "build_routing_execution_log_v1.py",
            "build_blank_cell_audit_v1.py",
            # Phase-2 deterministic tools
            "ingest_fundamental_raw.py",
            "build_fundamental_multifactor_v3.py",
            "build_horizon_classification_v1.py",
            # Phase-2B deterministic tools
            "build_earnings_quality_signal_v1.py",
            "build_growth_rate_signal_v1.py",
            "build_cashflow_quality_signal_v1.py",
            # Phase-3 deterministic tools
            "build_smart_money_flow_signal_v2.py",
            "build_liquidity_flow_signal_v1.py",
            "build_portfolio_alpha_confidence_per_ticker_v1.py",
            "build_market_share_signal_v2.py",
            "build_dynamic_value_preservation_sell_v6.py",
            "build_predictive_alpha_dialectic_engine_v2.py",
            "build_capital_style_time_stop_v1.py",
            "build_execution_integrity_gate_v1.py",
            # Phase-4~5 deterministic tools (Python-tool-only)
            "build_trade_quality_from_t5_v1.py",
            "build_prediction_accuracy_harness_v2.py",
            "build_sell_waterfall_engine_v2.py",
            "build_execution_method_ladder_v1.py",
            "build_llm_narrative_template_lock_v1.py",
            "build_ejce_divergence_audit_v1.py",
            "build_predictive_alpha_report_lock_v2.py",
            # Phase-6 deterministic tools (Python-tool-only)
            "build_smart_money_liquidity_gate_v1.py",
            "build_final_judgment_gate_v1.py",
            "build_verdict_consistency_lock_v1.py",
            "build_data_quality_reconciliation_v1.py",
            # Phase-7 single source of truth + cross-section consistency gate
            # (Python-tool-only)
            "build_canonical_metrics_v1.py",
            "build_cross_section_consistency_v1.py",
            # Work 7 + Work 3: AFL V2 + alpha_lead optimization analysis
            "build_alpha_feedback_loop_v2.py",
            # V9 orphan reconciliation 2026-06-03 — register all pipeline harness tools
            "apply_perf_recovery_overrides_v1.py",
            "apply_request_result_adoption_v1.py",
            "apply_strategy_execution_locks.py",
            "build_anti_late_entry_pullback_gate_v4.py",
            "build_architecture_boundaries_v2.py",
            "build_audit_replay_snapshot_v1.py",
            "build_canonical_artifact_resolver_v1.py",
            "build_cash_raise_pareto_executor_v2.py",
            "build_cash_raise_value_optimizer_v3.py",
            "build_cash_recovery_optimizer_v4.py",
            "build_confidence_calibration_v2.py",
            "build_continuous_evaluation_dashboard_v1.py",
            "build_data_integrity_100_lock_v2.py",
            "build_data_maturity_truth_gate_v1.py",
            "build_data_quality_gate_v3.py",
            "build_decision_evidence_score_v2.py",
            "build_decision_replay_snapshot_pack_v1.py",
            "build_derivation_validity_score_v1.py",
            "build_distribution_exit_presignal_v2.py",
            "build_evaluation_history_coverage_v1.py",
            "build_execution_quality_harness_v1.py",
            "build_execution_readiness_matrix_v1.py",
            "build_final_context_for_llm_v2.py",
            "build_final_execution_decision_v1.py",
            "build_formula_runtime_registry_v1.py",
            "build_horizon_allocation_guard_v2.py",
            "build_horizon_routing_lock_v6.py",
            "build_imputed_data_exposure_gate_v2.py",
            "build_late_rebound_bucket_score_v1.py",
            "build_operational_alpha_calibration_v2.py",
            "build_operational_eval_queue_v1.py",
            "build_operational_evidence_audit_v1.py",
            "build_operational_outcome_lock_v1.py",
            "build_operational_t20_outcome_ledger_v1.py",
            "build_pass_100_criteria_v1.py",
            "build_perf_recovery_harness_v1.py",
            "build_performance_monitoring_dashboard_v1.py",
            "build_performance_readiness_replay_bridge_v1.py",
            "build_realized_performance_v1.py",
            "build_root_cause_attribution_v1.py",
            "build_root_cause_recovery_plan_v1.py",
            "build_sell_execution_timing_lock_v2.py",
            "build_short_horizon_outcome_monitor_v1.py",
            "build_smart_cash_recovery_v4.py",
            "build_strategy_decision_v3.py",
            "build_strategy_hardening_harness_v1.py",
            "build_truthful_decision_ledger_v2.py",
            "build_truthfulness_guard_v1.py",
            "build_value_preservation_scorer_v2.py",
            "build_walk_forward_calibration_v1.py",
            "build_missing_formula_bridge_v1.py",
            "inject_computed_harness.py",
            "measure_semantic_formula_coverage.py",
            "pipeline_runtime_anomaly_lib_v1.py",
            "profile_pipeline_runtime.py",
            "run_phase_checks_50_60.py",
            "validate_artifact_freshness_v1.py",
            "validate_data_maturity_truth_gate_v1.py",
            "validate_pipeline_runtime_anomaly.py",
            "validate_pipeline_runtime_contract.py",
            "validate_strategy_execution_locks_regression.py",
            "build_yaml_code_coverage_v1.py",
        )
    ),
    *(
        ROOT / "src" / "quant_engine" / module_name
        for module_name in (
            # src/quant_engine canonical Python implementations
            "compute_formula_outputs.py",
            "inject_computed_harness.py",
            "exit_decisions.py",
            "orchestration_harness_v1.py",
            "run_formula_golden_cases_v2.py",
            "measure_harness_coverage.py",
            "refactor_master_helpers.py",
            "sector_trend_analysis.py",
            "etf_representative_monitor.py",
        )
    ),
]
# GAS function names used as roots of the reachability walk in
# ``_build_coverage``; names that are absent from the scanned ``.gs`` files are
# simply ignored there.
ENTRYPOINT_FUNCTIONS = [
    "buildHarnessContext_",
    "buildHarnessRows_",
    "runDataFeed",
    "runMacro",
    "runEventRisk",
    "runSellPriority",
    "runDecisionFlow_",
    "calcApexExecutionHarness_",
    "runCoreSatelliteBatch",
    "runCoreSatelliteFinalize",
] + [
    # Monthly batch trigger entry points
    "calcTradeQualityScorer_",
    "evaluatePa1FeedbackBatch_",
]
# Functions intentionally kept for feature-flagged flows. They are never listed
# in the ``dead_code`` section of the audit, even when no entry point reaches
# them, so they cannot trip a dead-code hard fail.
DEAD_CODE_ALLOWLIST = {
    "calcCapitalStyleAllocationV2_",
    "calcFcBudget_",
    "calcOrbitGap_",
    "calcSecularLeaderAutoDetect_",
    "calcValuePreservingCashRaiseV9_",
    "calcWatchBreakoutRealtimeGateV1_",
    "runCoreSatelliteFlow_",
    "runOrbitGap",
    "runRebalanceSheet_",
}
def _ensure_utf8_stdio() -> None:
    """Make ``sys.stdout`` and ``sys.stderr`` encode their output as UTF-8.

    Streams that already report a UTF-8 encoding, or that report no encoding
    at all, are left untouched.  Other streams are switched in place with
    ``reconfigure`` when the stream offers it, which flushes pending output
    and keeps a single writer on the underlying descriptor.  Streams without
    ``reconfigure`` are replaced by a new line-buffered UTF-8 writer on the
    same file descriptor; if the stream has no usable descriptor it is left
    as it is rather than raising.
    """
    for stream_name in ("stdout", "stderr"):
        stream = getattr(sys, stream_name, None)
        encoding = getattr(stream, "encoding", None)
        if not encoding or encoding.lower() in ("utf-8", "utf8"):
            continue

        reconfigure = getattr(stream, "reconfigure", None)
        if reconfigure is not None:
            # line_buffering=True mirrors the buffering=1 of the fallback below.
            reconfigure(encoding="utf-8", line_buffering=True)
            continue

        try:
            fd = stream.fileno()
        except (AttributeError, OSError, ValueError):
            # No real descriptor behind this stream (e.g. an in-memory
            # buffer): nothing can be re-opened, so keep the stream.
            continue
        stream.flush()  # do not strand text buffered by the old wrapper
        # closefd=False: the descriptor is still owned by the original stream.
        replacement = open(fd, mode="w", encoding="utf-8", buffering=1, closefd=False)
        setattr(sys, stream_name, replacement)
def _load_yaml(path: Path) -> dict[str, Any]:
    """Parse the YAML file at *path* and return its top-level mapping.

    This is a best-effort loader: any failure while reading or parsing the
    file, and any document whose top level is not a mapping, yields ``{}``.
    """
    try:
        document = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        document = None
    if isinstance(document, dict):
        return document
    return {}
def _formula_registry_ids() -> list[dict[str, str]]:
    """List every formula id declared in ``SPEC_FILES``, first occurrence wins.

    Returns:
        One ``{"formula_id", "yaml_file"}`` dict per distinct id, in the order
        the ids are first encountered.  ``yaml_file`` is the bare file name of
        the spec that declared the id first.
    """
    # Insertion-ordered mapping: formula id -> name of the first declaring spec.
    origin_by_id: dict[str, str] = {}
    for spec_file in SPEC_FILES:
        registry = _load_yaml(spec_file).get("formula_registry") or {}
        formulas = registry.get("formulas") or {}
        for raw_id in formulas.keys():
            origin_by_id.setdefault(str(raw_id), spec_file.name)
    return [
        {"formula_id": formula_id, "yaml_file": yaml_name}
        for formula_id, yaml_name in origin_by_id.items()
    ]
def _read_text(path: Path) -> str:
    """Return the UTF-8 text of *path*, or ``""`` when the path does not exist.

    Bytes that are not valid UTF-8 are dropped instead of raising.
    """
    if path.exists():
        return path.read_text(encoding="utf-8", errors="ignore")
    return ""
def _files_containing(term: str, paths: list[Path]) -> list[str]:
    """Return the bare file names of the *paths* whose text contains *term*.

    The match is a plain substring test and the result keeps the order of
    *paths*.  Paths that do not exist read as empty text.
    """
    return [candidate.name for candidate in paths if term in _read_text(candidate)]
def _function_catalog() -> list[dict[str, Any]]:
    """Index every ``function name(`` declaration found in ``GS_FILES``.

    A function's block runs from its declaration line up to the line before
    the next declaration in the same file; the last block runs to the end of
    the file.  Files that are missing or empty contribute nothing.

    Returns:
        One dict per declaration with the keys ``function_name``, ``gs_file``
        (bare file name), ``line`` and ``end_line`` (1-based, inclusive) and
        ``block_text`` (the block's lines joined with ``"\\n"``).
    """
    fn_re = re.compile(r"^\s*function\s+([A-Za-z0-9_]+)\s*\(", re.M)
    entries: list[dict[str, Any]] = []
    for gs_file in GS_FILES:
        source = _read_text(gs_file)
        if not source:
            continue
        lines = source.splitlines()
        probes = (
            (line_no, fn_re.match(line))
            for line_no, line in enumerate(lines, start=1)
        )
        starts = [(line_no, hit.group(1)) for line_no, hit in probes if hit]
        # Each block stops right before the next declaration; the final block
        # stops at the last line of the file.
        stops = [next_start - 1 for next_start, _ in starts[1:]] + [len(lines)]
        for (start, name), stop in zip(starts, stops):
            entries.append(
                {
                    "function_name": name,
                    "gs_file": gs_file.name,
                    "line": start,
                    "end_line": stop,
                    "block_text": "\n".join(lines[start - 1:stop]),
                }
            )
    return entries
def _pascal_case(value: str) -> str:
    """Join the underscore/whitespace-separated words of *value* in PascalCase.

    Each word gets an upper-cased first character and a lower-cased remainder;
    empty words produced by repeated or leading/trailing separators are skipped.
    """
    words = filter(None, re.split(r"[_\s]+", value.strip()))
    return "".join(word[:1].upper() + word[1:].lower() for word in words)
def _strip_version_suffix(value: str) -> str:
    """Drop a trailing ``_V<digits>`` marker (matched case-insensitively) from *value*."""
    version_tail = re.compile(r"_V\d+$", flags=re.IGNORECASE)
    return version_tail.sub("", value)
def _candidate_function_names(formula_id: str) -> list[str]:
    """Return the GAS function names a formula id is expected to map to.

    The candidates are, in priority order: ``calc<Id>_``, ``calc<IdWithoutVersion>_``,
    ``run<Id>_`` and ``validate<Id>_``, where ``<Id>`` is the PascalCase form of
    *formula_id*.  Duplicates (e.g. when the id carries no version suffix) are
    removed while keeping the first occurrence.
    """
    full_name = _pascal_case(formula_id)
    unversioned_name = _pascal_case(_strip_version_suffix(formula_id))
    ordered = (
        f"calc{full_name}_",
        f"calc{unversioned_name}_",
        f"run{full_name}_",
        f"validate{full_name}_",
    )
    # dict keys keep insertion order, so this is an order-preserving dedupe.
    return list(dict.fromkeys(ordered))
def _anchor_lookup(
    formula_id: str,
    gs_texts: dict[str, str],
    fn_catalog: list[dict[str, Any]],
) -> dict[str, Any] | None:
    """Find the first cataloged function whose block text mentions *formula_id*.

    Args:
        formula_id: Text searched for with a plain substring test.
        gs_texts: Accepted for interface compatibility; not consulted here.
        fn_catalog: Rows as produced by ``_function_catalog``.

    Returns:
        A match dict (``function_name``, ``gs_file``, ``line`` and
        ``match_source="block"``) for the first hit in catalog order, or
        ``None`` when no block contains the id.
    """
    hit = next(
        (entry for entry in fn_catalog if formula_id in entry.get("block_text", "")),
        None,
    )
    if hit is None:
        return None
    return {
        "function_name": hit["function_name"],
        "gs_file": hit["gs_file"],
        "line": hit["line"],
        "match_source": "block",
    }
def _load_python_harness_supplements() -> set[str]:
    """Return the formula ids registered under ``python_harness_supplements``.

    Ids listed at ``formula_registry.policy.python_harness_supplements.formulas``
    in ``spec/13_formula_registry.yaml`` are treated by the audit as already
    implemented on the Python side.

    Returns:
        The listed ids, each normalised with ``str`` so they compare equal to
        the ids produced by ``_formula_registry_ids``.  An empty set is
        returned when the file cannot be loaded or any level of that path is
        missing or has an unexpected shape.
    """
    registry_path = ROOT / "spec" / "13_formula_registry.yaml"
    node: Any = _load_yaml(registry_path)
    # Walk down explicitly instead of relying on a blanket ``except`` to
    # absorb AttributeError/TypeError from a malformed level.
    for key in ("formula_registry", "policy", "python_harness_supplements", "formulas"):
        if not isinstance(node, dict):
            return set()
        node = node.get(key)
    if not isinstance(node, (list, tuple, set, dict)):
        return set()
    return {str(formula_id) for formula_id in node if formula_id is not None}
def _load_data_gated_formula_ids() -> set[str]:
    """Return the ids of formulas whose ``lifecycle_state`` is ``DATA_GATED``.

    These formulas are still waiting for implementation, so the audit excludes
    them from its ``true_missing`` list.

    The rows are read from ``spec/51_formula_lifecycle_registry.yaml``: either
    the ``formulas`` entry of a top-level mapping, or a top-level list.

    Returns:
        The matching ids, normalised with ``str``.  A file that cannot be read
        or parsed, or that has an unexpected shape, yields an empty set.  A
        DATA_GATED row without a ``formula_id`` is skipped on its own; it no
        longer causes every other id to be discarded.
    """
    lifecycle_path = ROOT / "spec" / "51_formula_lifecycle_registry.yaml"
    try:
        payload = yaml.safe_load(lifecycle_path.read_text(encoding="utf-8"))
    except Exception:
        # Best-effort by design: a missing or invalid registry means "nothing
        # is data-gated", it must not abort the audit.
        return set()

    if isinstance(payload, dict):
        rows = payload.get("formulas") or []
    elif isinstance(payload, list):
        rows = payload
    else:
        rows = []
    if not isinstance(rows, (list, tuple)):
        return set()

    gated: set[str] = set()
    for row in rows:
        if not isinstance(row, dict) or row.get("lifecycle_state") != "DATA_GATED":
            continue
        formula_id = row.get("formula_id")
        if formula_id is None:
            continue
        gated.add(str(formula_id))
    return gated
# Captures the identifier of every ``identifier(`` call site.  The lookbehind
# anchors the capture at the start of an identifier, so a cataloged name is
# only reported when it appears as a whole identifier followed by ``(``.
_CALL_SITE_RE = re.compile(r"(?<![A-Za-z0-9_])([A-Za-z0-9_]+)\s*\(")


def _build_call_graph(
    fn_catalog: list[dict[str, Any]], function_names: set[str]
) -> dict[str, set[str]]:
    """Map each cataloged function to the other cataloged functions its block calls."""
    call_graph: dict[str, set[str]] = {}
    for row in fn_catalog:
        block_text = row.get("block_text", "")
        if not isinstance(block_text, str) or not block_text:
            continue
        callees = set(_CALL_SITE_RE.findall(block_text)) & function_names
        # The block starts with the function's own declaration; that is not a call.
        callees.discard(row["function_name"])
        call_graph[row["function_name"]] = callees
    return call_graph


def _reachable_functions(call_graph: dict[str, set[str]], roots: list[str]) -> set[str]:
    """Return every function reachable from *roots* by following *call_graph*."""
    reachable: set[str] = set()
    stack = list(roots)
    while stack:
        current = stack.pop()
        if current in reachable:
            continue
        reachable.add(current)
        stack.extend(
            callee for callee in call_graph.get(current, ()) if callee not in reachable
        )
    return reachable


def _build_coverage() -> dict[str, Any]:
    """Audit how the YAML formula ids are covered by GAS and Python sources.

    Each formula id from ``_formula_registry_ids`` is classified as:

    * ``COVERED`` - a GAS function matches, either by one of the names from
      ``_candidate_function_names`` (``match_source="name"``) or because a
      function block mentions the id (``match_source="block"``);
    * ``PYTHON_HARNESS`` - no GAS match, but the id is registered in
      ``python_harness_supplements``;
    * ``GAP`` - neither of the above (``python_files`` lists the ``PY_FILES``
      entries whose text mentions the id, if any).

    The function also walks the GAS call graph from ``ENTRYPOINT_FUNCTIONS``
    and reports as dead code every ``calc*``/``run*`` function that is not
    reachable, not mapped to a formula and not in ``DEAD_CODE_ALLOWLIST``.

    Returns:
        The summary dict that ``main`` serialises.  ``min_coverage_pct`` is
        left as ``None`` for the caller to fill in.  Notable fields:

        * ``python_implemented_formula_ids`` - every supplement id (sorted, so
          the output is reproducible) followed by the ``GAP`` ids that are
          mentioned in at least one Python file;
        * ``effective_covered_count`` - the number of distinct registry
          formulas that are ``COVERED`` or Python-implemented.  Each formula
          is counted once, so a formula that is both GAS-covered and listed
          as a supplement no longer inflates the figure;
        * ``true_missing_formula_ids`` - unmatched ids that are neither
          Python-implemented nor DATA_GATED.
    """
    formula_rows = _formula_registry_ids()
    fn_catalog = _function_catalog()
    gs_texts = {path.name: _read_text(path) for path in GS_FILES}
    # Read every Python source once.  Pairs (not a dict) because several
    # PY_FILES entries share a base name and each must still be reported.
    py_sources = [(path.name, _read_text(path)) for path in PY_FILES]

    function_names = {row["function_name"] for row in fn_catalog}
    # First declaration wins when a name is declared more than once.
    first_row_by_name: dict[str, dict[str, Any]] = {}
    for fn_row in fn_catalog:
        first_row_by_name.setdefault(fn_row["function_name"], fn_row)

    # [python_harness_supplements] Python-only formulas that are excluded from
    # the GAS execution order are pre-registered as implemented.
    harness_supplement_ids = {str(fid) for fid in _load_python_harness_supplements()}

    coverage_map: list[dict[str, Any]] = []
    mapped_functions: set[str] = set()
    missing_formula_ids: list[str] = []
    python_implemented_ids: list[str] = sorted(harness_supplement_ids)
    python_implemented: set[str] = set(harness_supplement_ids)

    for row in formula_rows:
        formula_id = row["formula_id"]

        match: dict[str, Any] | None = None
        for candidate in _candidate_function_names(formula_id):
            named_row = first_row_by_name.get(candidate)
            if named_row is not None:
                match = {
                    "function_name": candidate,
                    "gs_file": named_row["gs_file"],
                    "line": named_row["line"],
                    "match_source": "name",
                }
                break
        if match is None:
            match = _anchor_lookup(formula_id, gs_texts, fn_catalog)

        if match is not None:
            mapped_functions.add(match["function_name"])
            coverage_map.append(
                {
                    "formula_id": formula_id,
                    "yaml_file": row["yaml_file"],
                    "status": "COVERED",
                    "function_name": match["function_name"],
                    "gs_file": match["gs_file"],
                    "line": match["line"],
                    "match_source": match["match_source"],
                }
            )
            continue

        py_hits = [name for name, text in py_sources if formula_id in text]
        missing_formula_ids.append(formula_id)

        # Formulas registered in python_harness_supplements count as
        # Python-only implementations.
        if formula_id in harness_supplement_ids:
            coverage_map.append(
                {
                    "formula_id": formula_id,
                    "yaml_file": row["yaml_file"],
                    "status": "PYTHON_HARNESS",
                    "function_name": None,
                    "gs_file": None,
                    "line": None,
                    "match_source": "python_harness_supplements",
                    "python_files": py_hits or ["[python_harness_supplements]"],
                }
            )
            continue

        if py_hits:
            python_implemented_ids.append(formula_id)
            python_implemented.add(formula_id)
        coverage_map.append(
            {
                "formula_id": formula_id,
                "yaml_file": row["yaml_file"],
                "status": "GAP",
                "function_name": None,
                "gs_file": None,
                "line": None,
                "match_source": None,
                "python_files": py_hits,
            }
        )

    # Reachability graph: entrypoints -> called functions.
    call_graph = _build_call_graph(fn_catalog, function_names)
    present_entrypoints = [name for name in ENTRYPOINT_FUNCTIONS if name in function_names]
    reachable = _reachable_functions(call_graph, present_entrypoints)

    dead_code = [
        {
            "function_name": row["function_name"],
            "gs_file": row["gs_file"],
            "line": row["line"],
        }
        for row in fn_catalog
        if row["function_name"].startswith(("calc", "run"))
        and row["function_name"] not in reachable
        and row["function_name"] not in mapped_functions
        and row["function_name"] not in DEAD_CODE_ALLOWLIST
    ]

    covered = sum(1 for row in coverage_map if row["status"] == "COVERED")
    total = len(coverage_map) or 1  # avoid dividing by zero on an empty registry
    coverage_pct = round(covered / total * 100, 2)
    python_coverage_pct = round(len(python_implemented_ids) / total * 100, 2)

    data_gated_ids = {str(fid) for fid in _load_data_gated_formula_ids()}
    true_missing_ids = [
        fid
        for fid in missing_formula_ids
        if fid not in python_implemented and fid not in data_gated_ids
    ]

    # effective_coverage: "implemented in GAS or in Python" counts as covered.
    # Count per registry formula so that nothing is counted twice.
    effective_covered = sum(
        1
        for row in coverage_map
        if row["status"] == "COVERED" or row["formula_id"] in python_implemented
    )
    effective_coverage_pct = round(effective_covered / total * 100, 2)

    return {
        "formula_total": len(coverage_map),
        "covered_count": covered,
        "missing_count": len(missing_formula_ids),
        "coverage_pct": coverage_pct,
        "python_implemented_count": len(python_implemented_ids),
        "python_coverage_pct": python_coverage_pct,
        "effective_covered_count": effective_covered,
        "effective_coverage_pct": effective_coverage_pct,
        "true_missing_count": len(true_missing_ids),
        "true_missing_formula_ids": true_missing_ids,
        "min_coverage_pct": None,
        "coverage_map": coverage_map,
        "missing_formula_ids": missing_formula_ids,
        "python_implemented_formula_ids": python_implemented_ids,
        "dead_code": dead_code,
        "dead_code_count": len(dead_code),
        "reachable_function_count": len(reachable),
        "entrypoint_function_count": len(present_entrypoints),
        "function_catalog_size": len(fn_catalog),
    }
def main() -> int:
    """Run the coverage audit from the command line.

    The summary is written as JSON to ``--output-json`` (relative paths are
    resolved against ``ROOT``) and a short report is printed.

    Returns:
        ``0`` when the effective coverage reaches ``--min-coverage`` and no
        formula is truly missing, ``1`` otherwise.
    """
    _ensure_utf8_stdio()

    parser = argparse.ArgumentParser(description="Audit YAML formula coverage against GAS functions.")
    parser.add_argument("--min-coverage", type=float, default=80.0)
    parser.add_argument("--output-json", default=str(ROOT / "Temp" / "harness_coverage_audit.json"))
    args = parser.parse_args()

    summary = _build_coverage()
    summary["min_coverage_pct"] = float(args.min_coverage)

    # effective_coverage: judged on "implemented in GAS or Python"
    # (true_missing=0 -> 100%).
    passed = (
        summary["effective_coverage_pct"] >= args.min_coverage
        and summary["true_missing_count"] == 0
    )
    summary["status"] = "OK" if passed else "FAIL"

    output_path = Path(args.output_json)
    if not output_path.is_absolute():
        output_path = ROOT / output_path
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")

    report_lines = [
        "HARNESS_COVERAGE_AUDIT",
        f" formula_total: {summary['formula_total']}",
        f" covered_count(GAS): {summary['covered_count']}",
        f" python_implemented_count: {summary['python_implemented_count']}",
        f" effective_covered_count: {summary['effective_covered_count']} (GAS+Python)",
        f" coverage_pct(GAS-only): {summary['coverage_pct']:.2f}%",
        f" effective_coverage_pct: {summary['effective_coverage_pct']:.2f}% ← 공식 커버리지",
        f" true_missing_count: {summary['true_missing_count']} (반드시 0)",
        f" missing_count: {summary['missing_count']}",
        f" reachable_function_count: {summary['reachable_function_count']}",
        f" dead_code_count: {len(summary['dead_code'])}",
        f" output_json: {output_path}",
    ]
    for report_line in report_lines:
        print(report_line)

    if summary["status"] != "OK":
        print("HARNESS_COVERAGE_AUDIT_FAIL")
        print(f" min_coverage_pct: {args.min_coverage:.2f}%")
        if summary["true_missing_formula_ids"]:
            print(" true_missing_formula_ids:")
            for formula_id in summary["true_missing_formula_ids"]:
                print(f" - {formula_id}")
        return 1

    print("HARNESS_COVERAGE_AUDIT_OK")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())