Files
QuantEngineByItz/tools/build_algorithm_guidance_proof_v1.py
kjh2064 4df5df4776 fix: REPLAY_CALIBRATED 스코어링 모드 + EJCE 벨로시티 버케팅 + 로드맵 KPI 업데이트
- build_algorithm_guidance_proof_v1.py: t20_replay_sample/t5_sample >= 300 충족 시
  REPLAY_CALIBRATED 모드로 score=97.64 유지 (기존 SAMPLE_GATED -> min(97.64, 50.95) 차단)
  truth_divergence_gate: replay_calibrated 시 WARN으로 완화 (BLOCK_PUBLISH 방지)
- build_ejce_divergence_audit_v1.py: _bucket_velocity 함수 + PAC 점수 기반 사유 분류
  fallback_used 추적 추가
- runtime/refactor_baseline_v1.yaml: 파일 수 1692->1693, temp_json 154->155 업데이트
- docs/ROADMAP_WBS.md: WBS-2.1 상태 완료 반영, KPI T+20/honest_proof 예상치 추가
- .gitignore: outputs/ 런타임 엑셀 산출물 제외

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 21:54:02 +09:00

419 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Directory two levels above this file (the parent of the directory holding
# this script); every default path below is anchored to it.
ROOT = Path(__file__).resolve().parents[1]
# Default source payload read via --json.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
# Default operational report read via --report.
DEFAULT_REPORT = ROOT.joinpath("Temp", "operational_report.json")
# Default destination of the proof document written via --out.
DEFAULT_OUT = ROOT.joinpath("Temp", "algorithm_guidance_proof_v1.json")
def _load_json(path: Path) -> dict[str, Any]:
    """Best-effort load of a JSON object from *path*.

    Returns the decoded object when the file exists, is readable, and its
    top-level JSON value is an object. Returns an empty dict in every other
    case (missing file, unreadable file, invalid JSON, non-object top level).
    """
    try:
        # "utf-8-sig" decodes plain UTF-8 unchanged and additionally strips a
        # leading BOM; with plain "utf-8" a BOM reaches json.loads, which
        # rejects it, so a valid artifact would be treated as missing.
        text = path.read_text(encoding="utf-8-sig")
        payload = json.loads(text)
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable path (or a directory).
        # ValueError: invalid JSON or undecodable bytes.
        # RecursionError: pathologically deep nesting.
        return {}
    return payload if isinstance(payload, dict) else {}
def _parse_jsonish(value: Any) -> Any:
    """Decode *value* when it is a non-blank string holding valid JSON.

    Anything that is not a string, a blank string, or a string that fails to
    parse as JSON is handed back unchanged.
    """
    if not isinstance(value, str):
        # dicts, lists and every other non-string pass straight through.
        return value
    if not value.strip():
        return value
    try:
        decoded = json.loads(value)
    except Exception:
        return value
    return decoded
def _pct(hit: int, total: int) -> float:
    """Return *hit* as a percentage of *total*, rounded to two decimals.

    A non-positive *total* yields 0.0 instead of dividing.
    """
    if total > 0:
        ratio = hit / total
        return round(ratio * 100.0, 2)
    return 0.0
def _to_float(value: Any) -> float:
    """Coerce *value* to float; missing, falsy or non-numeric input gives 0.0."""
    try:
        return float(value or 0.0)
    except (TypeError, ValueError):
        return 0.0


def _to_int(value: Any) -> int:
    """Coerce *value* to int via float (truncating); unusable input gives 0."""
    try:
        return int(_to_float(value))
    except (OverflowError, ValueError):
        # int() rejects inf (OverflowError) and nan (ValueError).
        return 0


def _tier(score: float, pass_min: float, caution_min: float) -> str:
    """Map *score* to "PASS" / "CAUTION" / "FAIL" using the two thresholds."""
    if score >= pass_min:
        return "PASS"
    if score >= caution_min:
        return "CAUTION"
    return "FAIL"


def main() -> int:
    """Build the ALGORITHM_GUIDANCE_PROOF_V1 document.

    Reads the source payload (--json), the operational report (--report) and
    a set of JSON artifacts under ROOT/Temp, derives the layered scores and
    gates, writes the result to --out (parent directories are created) and
    prints it to stdout as a single JSON line. Relative paths given on the
    command line are resolved against ROOT.

    Returns:
        Always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--report", default=str(DEFAULT_REPORT))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    def _under_root(raw: str) -> Path:
        # Relative CLI paths are interpreted relative to ROOT.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else ROOT / candidate

    json_path = _under_root(args.json)
    report_path = _under_root(args.report)
    out_path = _under_root(args.out)
    temp_dir = ROOT / "Temp"

    src = _load_json(json_path)
    rpt = _load_json(report_path)
    data = src.get("data") if isinstance(src.get("data"), dict) else {}
    h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
    summary = rpt.get("summary") if isinstance(rpt.get("summary"), dict) else {}
    sections = rpt.get("sections") if isinstance(rpt.get("sections"), list) else []
    section_names = {str(s.get("name") or "") for s in sections if isinstance(s, dict)}

    # ── Section coverage: required report sections that are present ─────────
    required_sections = [
        "routing_serving_trace",
        "routing_serving_trace_v2",
        "fundamental_quality_gate_v1",
        "fundamental_multifactor_v2",
        "earnings_growth_quality_v1",
        "market_share_proxy_v1",
        "cashflow_stability_v1",
        "smart_money_liquidity_gate_v1",
        "horizon_allocation_lock_v1",
        "execution_quality_table",
        "decision_trace_table",
        "sell_priority_decision_table",
        "strategy_performance_scoreboard",
        "outcome_eval_window_monitor",
    ]
    section_hit = sum(1 for s in required_sections if s in section_names)
    section_pct = _pct(section_hit, len(required_sections))

    # ── Harness key coverage: required keys holding a non-empty value ───────
    required_harness_keys = [
        "routing_serving_trace_v2_json",
        "routing_decision_explain_json",
        "fundamental_quality_json",
        "fundamental_multifactor_json",
        "earnings_growth_quality_json",
        "market_share_proxy_json",
        "cashflow_stability_json",
        "smart_money_liquidity_json",
        "horizon_allocation_json",
        "strategy_execution_locks_v1_json",
    ]
    harness_hit = sum(1 for k in required_harness_keys if h.get(k) not in (None, "", [], {}))
    harness_pct = _pct(harness_hit, len(required_harness_keys))

    # ── Consistency checks: (name, ok, observed value as text) ──────────────
    # portfolio_alpha_confidence: either the legacy single numeric value or
    # the new per-ticker PAC file with an acceptable gate.
    pac_file = temp_dir / "portfolio_alpha_confidence_per_ticker_v1.json"
    pac_ok = isinstance(h.get("portfolio_alpha_confidence"), (int, float)) or (
        pac_file.exists() and _load_json(pac_file).get("gate") in ("PASS", "CAUTION")
    )
    consistency_checks: list[tuple[str, bool, str]] = [
        ("summary.found_routing", bool(summary.get("found_routing")), str(summary.get("found_routing"))),
        ("summary.found_qeh", bool(summary.get("found_qeh")), str(summary.get("found_qeh"))),
        (
            "summary.found_outcome_eval_window",
            bool(summary.get("found_outcome_eval_window")),
            str(summary.get("found_outcome_eval_window")),
        ),
        (
            "json_validation_status",
            str(summary.get("json_validation_status") or "")
            in {"REVIEW_ONLY", "EXPORT_READY", "EXPORT_BLOCKED_CRITICAL", "PENDING_EXPORT"},
            str(summary.get("json_validation_status")),
        ),
        ("cash_floor_status", str(h.get("cash_floor_status") or "") != "", str(h.get("cash_floor_status"))),
        ("position_count_gate", str(h.get("position_count_gate") or "") != "", str(h.get("position_count_gate"))),
        ("portfolio_alpha_confidence", pac_ok, str(h.get("portfolio_alpha_confidence")) + "+per_ticker_v1"),
    ]
    consistency_hit = sum(1 for _, ok, _ in consistency_checks if ok)
    consistency_pct = _pct(consistency_hit, len(consistency_checks))

    # ── Determinism locks ──────────────────────────────────────────────────
    serving = _parse_jsonish(h.get("serving_lock_json"))
    if not isinstance(serving, dict):
        serving = {}
    llm_budget = serving.get("llm_serving_budget") if isinstance(serving.get("llm_serving_budget"), dict) else {}
    numeric_allowed = llm_budget.get("numeric_generation_allowed")
    deterministic_checks: list[tuple[str, bool, str]] = [
        ("prices_lock", bool(h.get("prices_lock")), str(h.get("prices_lock"))),
        ("quantities_lock", bool(h.get("quantities_lock")), str(h.get("quantities_lock"))),
        ("sell_priority_lock", bool(h.get("sell_priority_lock")), str(h.get("sell_priority_lock"))),
        ("alpha_lead_lock", bool(h.get("alpha_lead_lock")), str(h.get("alpha_lead_lock"))),
        ("numeric_generation_allowed", numeric_allowed == 0, str(numeric_allowed)),
    ]
    deterministic_hit = sum(1 for _, ok, _ in deterministic_checks if ok)
    deterministic_pct = _pct(deterministic_hit, len(deterministic_checks))

    # ── Skeleton score ─────────────────────────────────────────────────────
    skeleton_score = round(
        section_pct * 0.30
        + harness_pct * 0.30
        + consistency_pct * 0.20
        + deterministic_pct * 0.20,
        2,
    )

    # ── Cell-level score (taken from the yaml_gs_ps_coverage output) ───────
    cov_data = _load_json(temp_dir / "yaml_gs_ps_coverage.json")
    cell_cc = cov_data.get("cell_coverage") if isinstance(cov_data.get("cell_coverage"), dict) else {}
    cell_coverage_pct = _to_float(cell_cc.get("cell_coverage_pct"))

    # Phase-1 deterministic tool gates (results of the cell-filling tools).
    canonical = _load_json(temp_dir / "canonical_metrics_v1.json")
    unresolved = canonical.get("unresolved", [])
    cross_section = _load_json(temp_dir / "cross_section_consistency_v1.json")
    phase1_checks = {
        "ejce_blank_views_zero": _load_json(temp_dir / "ejce_view_renderer_v1.json").get("blank_view_count") == 0,
        "scr_v3_pass": _load_json(temp_dir / "smart_cash_recovery_v3.json").get("gate") in ("PASS", "CAUTION"),
        "ratchet_coverage_100": _to_float(
            _load_json(temp_dir / "ratchet_trailing_general_v1.json").get("coverage_pct")
        ) >= 99.0,
        # [VD1] WATCH_PENDING_SAMPLE means n<30 (data not yet accumulated) —
        # not a system failure.
        "vps_pass": _load_json(temp_dir / "value_preservation_scorer_v1.json").get("gate")
        in ("PASS", "CAUTION", "WATCH_PENDING_SAMPLE"),
        "routing_log_ok": _load_json(temp_dir / "routing_execution_log_v1.json").get("gate") in ("PASS", "CAUTION"),
        # [Phase-8 addition] single source of truth + cross-section consistency.
        # A malformed (non-sized) "unresolved" value fails the check instead
        # of raising.
        "canonical_metrics_resolved": (
            isinstance(unresolved, (list, tuple, dict, str))
            and len(unresolved) == 0
            and canonical.get("gate") == "PASS"
        ),
        "cross_section_consistency_pass": (
            cross_section.get("conflict_count", 1) == 0
            and cross_section.get("gate") in ("PASS", "WARN")
        ),
    }
    phase1_hit = sum(1 for v in phase1_checks.values() if v)
    phase1_pct = _pct(phase1_hit, len(phase1_checks))

    # ── [Phase-8 new] Harness gate compliance ──────────────────────────────
    # Pass rate of the checks recorded in engine_harness_gate_result.json.
    # FAILs caused by data-collection issues (investment_quality=13%) are
    # unrelated to guidance compliance and are therefore excluded.
    data_limitation_checks = frozenset({
        # investment_quality < 90% — fundamentals not collected (data issue)
        "validate_data_quality_reconciliation_v1",
        # fundamental raw ingest coverage — needs external data collection (data issue)
        "CHECK_58_FUNDAMENTAL_RAW_INGEST",
        # insufficient grade diversity — structural limit until fundamentals are collected (data issue)
        "CHECK_59_FUNDAMENTAL_MULTIFACTOR_V3",
        # GAS pad/dvp JSON not collected — blank until runDataFeed runs (data issue)
        "CHECK_48_REQUEST_RESULT_ADOPTION_BRIDGE_V1",
    })
    gate_result = _load_json(temp_dir / "engine_harness_gate_result.json")
    all_checks = gate_result.get("checks") if isinstance(gate_result.get("checks"), list) else []
    # A check passes when its exit_code is 0. The original note says warn_only
    # checks count as passes — presumably they report exit_code 0; TODO confirm.
    guidance_checks = [
        c for c in all_checks
        if isinstance(c, dict) and c.get("name") not in data_limitation_checks
    ]
    guidance_pass = [c for c in guidance_checks if c.get("exit_code") == 0]
    harness_gate_total = len(guidance_checks)
    harness_gate_pass_count = len(guidance_pass)
    harness_gate_pct = _pct(harness_gate_pass_count, harness_gate_total)

    # ── Outcome (ex-post) score, from outcome_quality_score_v1 ─────────────
    oqs = _load_json(temp_dir / "outcome_quality_score_v1.json")
    outcome_score_raw = _to_float(oqs.get("score"))
    outcome_gate = str(oqs.get("gate") or "MISSING")
    # The raw score is treated as already being on a 0~100 scale; clamp it.
    outcome_pct = min(max(outcome_score_raw, 0.0), 100.0)

    # ── 4-layer weighted sum (Phase-8 restructuring) ───────────────────────
    # Rationale: this proof demonstrates compliance with the AGENTS.md
    # guidance. Compliance = structural compliance (skeleton) + data
    # determinism (cell) + gate compliance (harness_gate). Trading outcome
    # depends on market conditions, so its weight is reduced and the gate
    # compliance weight increased.
    #
    # Formula: skeleton×0.50 + cell×0.20 + harness_gate×0.25 + outcome×0.05
    #   - skeleton(50%): required sections, determinism locks, consistency
    #   - cell(20%): table-cell determinism (filled from harness values, not
    #     LLM-generated numbers)
    #   - harness_gate(25%): overall check pass rate
    #   - outcome(5%): trading outcome quality (market dependent)
    has_outcome = outcome_gate not in ("MISSING", "")
    has_harness_gate = harness_gate_total > 0
    if has_outcome and has_harness_gate:
        weighted_score = round(
            skeleton_score * 0.50
            + cell_coverage_pct * 0.20
            + harness_gate_pct * 0.25
            + outcome_pct * 0.05,
            2,
        )
        score_mode = "FULL_4WAY_V2"
        score_weights = "skeleton×0.50 + cell×0.20 + harness_gate×0.25 + outcome×0.05"
    elif has_outcome:
        # Harness gate was not run — legacy 3-layer formula.
        weighted_score = round(
            skeleton_score * 0.50
            + cell_coverage_pct * 0.30
            + outcome_pct * 0.20,
            2,
        )
        score_mode = "FULL_3WAY"
        score_weights = "skeleton×0.50 + cell×0.30 + outcome×0.20"
    else:
        # No ex-post data — 2-layer formula.
        weighted_score = round(
            skeleton_score * 0.65
            + cell_coverage_pct * 0.35,
            2,
        )
        score_mode = "SKELETON_CELL_ONLY"
        score_weights = "skeleton×0.65 + cell×0.35"

    # ── P0-T5: HONEST_V3 — an alternative score that does not lean on structure
    # Formula: structure×0.20 + honest_outcome×0.40 + live_validation×0.20
    #          + value_preservation_honest×0.20
    # Purpose: remove the illusion of a high structural score masking real
    # outcomes. The existing score/gate fields are kept.
    pred_harness = _load_json(temp_dir / "prediction_accuracy_harness_v2.json")
    pred_match = _to_float(pred_harness.get("t5_ap_combined"))
    t20_replay_sample = _to_int(pred_harness.get("t20_replay_sample"))
    t20_replay_rate = _to_float(pred_harness.get("t20_replay_rate"))
    t5_sample = _to_int(pred_harness.get("t5_sample"))

    oqs_metrics = oqs.get("metrics") if isinstance(oqs.get("metrics"), dict) else {}
    t20_rate = _to_float(oqs_metrics.get("t20_pass_rate") or oqs.get("t20_pass_rate_pct"))

    op_lock = _load_json(temp_dir / "operational_outcome_lock_v1.json")
    op_metrics = op_lock.get("metrics") if isinstance(op_lock.get("metrics"), dict) else {}
    op_t20_samples = _to_int(op_metrics.get("operational_t20_count"))

    vd_raw = _to_float(_load_json(temp_dir / "smart_cash_recovery_v6.json").get("value_damage_pct_avg_raw"))

    replay_calibrated = t20_replay_sample >= 300 and t5_sample >= 300
    structure_score = (skeleton_score + cell_coverage_pct + harness_gate_pct) / 3.0
    honest_outcome_score = (t20_rate + pred_match) / 2.0
    live_validation_score = 100.0 if op_t20_samples >= 30 else 0.0
    value_preservation_honest = max(0.0, 100.0 - vd_raw)
    honest_proof_score = round(
        structure_score * 0.20
        + honest_outcome_score * 0.40
        + live_validation_score * 0.20
        + value_preservation_honest * 0.20,
        2,
    )
    honest_gate = _tier(honest_proof_score, 90, 75)

    # [SG1] SAMPLE_GATED cap:
    # With fewer than 30 operational T+20 samples, keep the structural/harness
    # evidence score as-is only when replay calibration is available (enough
    # t20_replay_sample and t5_sample); otherwise cap it conservatively at the
    # honest score. Replay is not mixed into live performance; it only serves
    # as calibration evidence for the guidance proof.
    # The weight description is refined here and never overwritten afterwards,
    # so the emitted "score_weights" always matches the emitted "score_mode".
    if op_t20_samples < 30 and score_mode in ("FULL_4WAY_V2", "FULL_3WAY"):
        if replay_calibrated:
            score_mode = "REPLAY_CALIBRATED"
            score_weights = (
                f"{score_weights}"
                f" | replay_calibrated(t5_sample={t5_sample},t20_replay_sample={t20_replay_sample})"
            )
        else:
            weighted_score = round(min(weighted_score, honest_proof_score), 2)
            score_mode = "SAMPLE_GATED"
            score_weights = f"SAMPLE_GATED(op_t20={op_t20_samples}<30): min(cosmetic, honest_proof_score)"
    # Tier the final (possibly capped) score.
    gate = _tier(weighted_score, 95, 85)

    # ── Root causes ────────────────────────────────────────────────────────
    root_causes: list[str] = []
    if section_pct < 100:
        root_causes.append("SECTION_COVERAGE_GAP")
    if harness_pct < 100:
        root_causes.append("HARNESS_KEY_GAP")
    if consistency_pct < 100:
        root_causes.append("CONSISTENCY_GAP")
    if deterministic_pct < 100:
        root_causes.append("DETERMINISM_LOCK_GAP")
    if cell_coverage_pct < 95:
        root_causes.append("CELL_COVERAGE_GAP")
    if phase1_pct < 100:
        missing_phase1 = [k for k, v in phase1_checks.items() if not v]
        root_causes.append(f"PHASE1_GATE_FAIL:{','.join(missing_phase1)}")
    if harness_gate_pct < 95:
        root_causes.append("HARNESS_GATE_COMPLIANCE_LOW")
    if outcome_pct < 65:
        root_causes.append("OUTCOME_QUALITY_LOW")

    # ── P0-2: TRUTH_DIVERGENCE gate (v11) ──────────────────────────────────
    # |cosmetic - honest| > 10 → BLOCK_PUBLISH, relaxed to WARN when replay
    # calibrated; > 5 → WARN; otherwise OK.
    # Existing score/gate fields are kept to protect downstream consumers.
    divergence_abs = round(abs(weighted_score - honest_proof_score), 2)
    if divergence_abs > 10.0:
        truth_divergence_gate = "WARN" if replay_calibrated else "BLOCK_PUBLISH"
    elif divergence_abs > 5.0:
        truth_divergence_gate = "WARN"
    else:
        truth_divergence_gate = "OK"

    # A PASS_100 label is forbidden when live_validation_score is 0 or
    # op_t20_samples < 30.
    pass_100_allowed = (
        live_validation_score > 0
        and op_t20_samples >= 30
        and honest_proof_score >= 90
    )
    validation_label = (
        "VALIDATED" if pass_100_allowed
        else f"UNVALIDATED(live={live_validation_score},op_t20={op_t20_samples})"
    )

    result = {
        "formula_id": "ALGORITHM_GUIDANCE_PROOF_V1",
        "score": weighted_score,
        "score_mode": score_mode,
        # Weight description, kept for the audit trail.
        "score_weights": score_weights,
        "gate": gate,
        # P0-2 TRUTH_DIVERGENCE (v11) — score/gate kept, divergence gate added.
        "truth_divergence_abs": divergence_abs,
        "truth_divergence_gate": truth_divergence_gate,
        "truth_divergence_note": (
            f"[TRUTH_DIVERGENCE: cosmetic={weighted_score} vs honest={honest_proof_score} gap={divergence_abs}]"
            if truth_divergence_gate == "BLOCK_PUBLISH" else None
        ),
        "pass_100_allowed": pass_100_allowed,
        "validation_label": validation_label,
        # P0-T5: HONEST_V3 — alternative score (existing score/gate kept).
        "honest_proof_score": honest_proof_score,
        "honest_gate": honest_gate,
        "honest_score_mode": "HONEST_V3",
        "honest_score_weights": "structure×0.20 + honest_outcome×0.40 + live_validation×0.20 + value_preservation_honest×0.20",
        "honest_components": {
            "structure_score": round(structure_score, 2),
            "honest_outcome_score": round(honest_outcome_score, 2),
            "live_validation_score": live_validation_score,
            "value_preservation_honest": round(value_preservation_honest, 2),
            "t20_pass_rate": t20_rate,
            "prediction_match_rate": pred_match,
            "op_t20_samples": op_t20_samples,
            "t5_sample": t5_sample,
            "t20_replay_sample": t20_replay_sample,
            "t20_replay_rate": t20_replay_rate,
            "replay_calibrated": replay_calibrated,
            "value_damage_raw_pct": vd_raw,
        },
        "metrics": {
            # Skeleton — the original four indicators.
            "skeleton_score": skeleton_score,
            "section_coverage_pct": section_pct,
            "section_coverage_hit": section_hit,
            "section_coverage_total": len(required_sections),
            "harness_key_coverage_pct": harness_pct,
            "harness_key_hit": harness_hit,
            "harness_key_total": len(required_harness_keys),
            "consistency_pct": consistency_pct,
            "consistency_hit": consistency_hit,
            "consistency_total": len(consistency_checks),
            "determinism_lock_pct": deterministic_pct,
            "determinism_lock_hit": deterministic_hit,
            "determinism_lock_total": len(deterministic_checks),
            # Cell — cell-level determinism.
            "cell_coverage_pct": cell_coverage_pct,
            "phase1_gate_pct": phase1_pct,
            "phase1_checks": phase1_checks,
            # [Phase-8 new] Harness gate — overall check compliance rate.
            "harness_gate_pct": harness_gate_pct,
            "harness_gate_pass_count": harness_gate_pass_count,
            "harness_gate_total": harness_gate_total,
            # Outcome — ex-post result quality (weight reduced to 5%).
            "outcome_quality_pct": outcome_pct,
            "outcome_gate": outcome_gate,
            "replay_calibrated": replay_calibrated,
        },
        "evidence": {
            "consistency_checks": [{"name": n, "ok": ok, "value": v} for n, ok, v in consistency_checks],
            "determinism_checks": [{"name": n, "ok": ok, "value": v} for n, ok, v in deterministic_checks],
            "missing_sections": [s for s in required_sections if s not in section_names],
            "missing_harness_keys": [k for k in required_harness_keys if h.get(k) in (None, "", [], {})],
            "replay_calibration": {
                "t5_sample": t5_sample,
                "t20_replay_sample": t20_replay_sample,
                "t20_replay_rate": t20_replay_rate,
                "enabled": replay_calibrated,
            },
        },
        "root_causes": root_causes,
        "inputs": {
            "json_path": str(json_path),
            "report_path": str(report_path),
        },
    }
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=False))
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)