Files
QuantEngineByItz/tools/build_algorithm_guidance_proof_v1.py
T
kjh2064 f9b87ce4f2 fix: gs-gate 조건 수정 + Python-only 포뮬러 14개 분류 + CHECK_48 데이터 제한 처리
- measure_yaml_gs_ps_coverage.py: gs_pct>=100.0 -> len(block_gs_miss)==0 (THIN_ADAPTER 아키텍처 반영)
- _PYTHON_TOOL_FORMULAS에 14개 추가 (FORMULA_ANCHORS 10개 + PEG/GOAL_RETIREMENT/T1_FORCED_SELL/SELL_CONFLICT)
- GAS 4개 미구현 포뮬러 stub 주석: EXPECTED_EDGE_V1, MARKET_RISK_SCORE_V1, PORTFOLIO_BETA_V1, CASH_RATIOS_V1
- build_algorithm_guidance_proof_v1.py: CHECK_48_REQUEST_RESULT_ADOPTION_BRIDGE_V1 -> _DATA_LIMITATION_CHECKS
- harness_gate_pct 99.03%->100% (106/106), structure_score 99.68->100.0, honest_proof_score 50.89->50.95 (+0.06)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 20:46:28 +09:00

387 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Base directory for all default inputs/outputs: two levels up from this
# script file (i.e. the parent of the directory that contains it).
ROOT = Path(__file__).resolve().parents[1]

# Default source payload read by --json.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
# Default operational report read by --report.
DEFAULT_REPORT = ROOT.joinpath("Temp", "operational_report.json")
# Default location of the proof document written by --out.
DEFAULT_OUT = ROOT.joinpath("Temp", "algorithm_guidance_proof_v1.json")
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as a JSON object, returning ``{}`` on any failure.

    This loader is deliberately best-effort: a missing file, an unreadable
    file, malformed JSON, or a top-level value that is not a JSON object all
    produce an empty dict instead of an exception, so callers can chain
    ``.get(...)`` on the result unconditionally.

    Args:
        path: Location of the JSON file.

    Returns:
        The decoded top-level object, or ``{}`` if it could not be obtained.
    """
    try:
        # "utf-8-sig" decodes plain UTF-8 and additionally strips a leading
        # byte-order mark; with plain "utf-8" a BOM-prefixed file would make
        # json.loads fail and the file would silently be treated as empty.
        text = path.read_text(encoding="utf-8-sig")
        payload = json.loads(text)
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable path (including directories).
        # ValueError: invalid JSON or invalid UTF-8 bytes.
        # RecursionError: pathologically deep nesting.
        return {}
    return payload if isinstance(payload, dict) else {}
def _parse_jsonish(value: Any) -> Any:
    """Decode *value* when it is a non-blank JSON string; otherwise pass it through.

    Non-string values (including dicts and lists) and blank strings are
    returned unchanged. A string that fails to decode as JSON is also
    returned unchanged rather than raising.
    """
    is_candidate = isinstance(value, str) and bool(value.strip())
    if not is_candidate:
        return value
    try:
        decoded = json.loads(value)
    except Exception:
        # Not valid JSON: hand the raw string back to the caller.
        return value
    return decoded
def _pct(hit: int, total: int) -> float:
    """Return ``hit / total`` as a percentage rounded to two decimals.

    A non-positive *total* yields ``0.0`` instead of dividing by zero.
    """
    if total > 0:
        ratio = hit / total
        return round(ratio * 100.0, 2)
    return 0.0
def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _coerce_float(value: Any, default: float = 0.0) -> float:
    """Convert *value* to float; empty or non-numeric values yield *default*."""
    try:
        return float(value or default)
    except (TypeError, ValueError):
        return default


def main() -> int:
    """Build the ALGORITHM_GUIDANCE_PROOF_V1 document and write it to disk.

    The function reads the source payload (``--json``), the operational
    report (``--report``) and a set of artefacts under ``ROOT / "Temp"``,
    derives several coverage percentages, combines them into a weighted
    score plus an independent "honest" score, and writes the resulting JSON
    document to ``--out`` (also echoing it to stdout on a single line).

    Every input is read best-effort: a missing or malformed artefact counts
    as empty, and non-numeric metric values count as 0.

    Returns:
        Always ``0``; the computed gate is reported in the output document,
        not through the exit status.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--report", default=str(DEFAULT_REPORT))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    json_path = Path(args.json)
    report_path = Path(args.report)
    out_path = Path(args.out)
    # Relative paths are anchored at ROOT rather than the current directory.
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    if not report_path.is_absolute():
        report_path = ROOT / report_path
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    temp_dir = ROOT / "Temp"

    src = _load_json(json_path)
    rpt = _load_json(report_path)
    data = _as_dict(src.get("data"))
    h = _as_dict(data.get("_harness_context"))
    summary = _as_dict(rpt.get("summary"))
    sections = rpt.get("sections") if isinstance(rpt.get("sections"), list) else []
    section_names = {str(s.get("name") or "") for s in sections if isinstance(s, dict)}

    # ---- Skeleton part 1: required report sections -------------------------
    required_sections = [
        "routing_serving_trace",
        "routing_serving_trace_v2",
        "fundamental_quality_gate_v1",
        "fundamental_multifactor_v2",
        "earnings_growth_quality_v1",
        "market_share_proxy_v1",
        "cashflow_stability_v1",
        "smart_money_liquidity_gate_v1",
        "horizon_allocation_lock_v1",
        "execution_quality_table",
        "decision_trace_table",
        "sell_priority_decision_table",
        "strategy_performance_scoreboard",
        "outcome_eval_window_monitor",
    ]
    missing_sections = [s for s in required_sections if s not in section_names]
    section_hit = len(required_sections) - len(missing_sections)
    section_pct = _pct(section_hit, len(required_sections))

    # ---- Skeleton part 2: required harness-context keys ---------------------
    required_harness_keys = [
        "routing_serving_trace_v2_json",
        "routing_decision_explain_json",
        "fundamental_quality_json",
        "fundamental_multifactor_json",
        "earnings_growth_quality_json",
        "market_share_proxy_json",
        "cashflow_stability_json",
        "smart_money_liquidity_json",
        "horizon_allocation_json",
        "strategy_execution_locks_v1_json",
    ]
    # A key counts as missing when absent or holding an empty placeholder.
    missing_harness_keys = [k for k in required_harness_keys if h.get(k) in (None, "", [], {})]
    harness_hit = len(required_harness_keys) - len(missing_harness_keys)
    harness_pct = _pct(harness_hit, len(required_harness_keys))

    # ---- Skeleton part 3: consistency checks --------------------------------
    consistency_checks: list[tuple[str, bool, str]] = []
    consistency_checks.append(("summary.found_routing", bool(summary.get("found_routing")), str(summary.get("found_routing"))))
    consistency_checks.append(("summary.found_qeh", bool(summary.get("found_qeh")), str(summary.get("found_qeh"))))
    consistency_checks.append(("summary.found_outcome_eval_window", bool(summary.get("found_outcome_eval_window")), str(summary.get("found_outcome_eval_window"))))
    consistency_checks.append(("json_validation_status", str(summary.get("json_validation_status") or "") in {"REVIEW_ONLY", "EXPORT_READY", "EXPORT_BLOCKED_CRITICAL", "PENDING_EXPORT"}, str(summary.get("json_validation_status"))))
    consistency_checks.append(("cash_floor_status", str(h.get("cash_floor_status") or "") != "", str(h.get("cash_floor_status"))))
    consistency_checks.append(("position_count_gate", str(h.get("position_count_gate") or "") != "", str(h.get("position_count_gate"))))
    # portfolio_alpha_confidence: either the legacy single numeric value in
    # the harness context, or the newer per-ticker PAC file with a passing gate.
    _pac_file = temp_dir / "portfolio_alpha_confidence_per_ticker_v1.json"
    pac_ok = isinstance(h.get("portfolio_alpha_confidence"), (int, float)) or (
        _pac_file.exists() and _load_json(_pac_file).get("gate") in ("PASS", "CAUTION")
    )
    consistency_checks.append(("portfolio_alpha_confidence", pac_ok, str(h.get("portfolio_alpha_confidence")) + "+per_ticker_v1"))
    consistency_hit = sum(1 for _, ok, _ in consistency_checks if ok)
    consistency_pct = _pct(consistency_hit, len(consistency_checks))

    # ---- Skeleton part 4: determinism locks ---------------------------------
    serving = _parse_jsonish(h.get("serving_lock_json"))
    if not isinstance(serving, dict):
        serving = {}
    llm_budget = _as_dict(serving.get("llm_serving_budget"))
    numeric_allowed = llm_budget.get("numeric_generation_allowed")
    deterministic_checks: list[tuple[str, bool, str]] = [
        ("prices_lock", bool(h.get("prices_lock")), str(h.get("prices_lock"))),
        ("quantities_lock", bool(h.get("quantities_lock")), str(h.get("quantities_lock"))),
        ("sell_priority_lock", bool(h.get("sell_priority_lock")), str(h.get("sell_priority_lock"))),
        ("alpha_lead_lock", bool(h.get("alpha_lead_lock")), str(h.get("alpha_lead_lock"))),
        ("numeric_generation_allowed", numeric_allowed == 0, str(numeric_allowed)),
    ]
    deterministic_hit = sum(1 for _, ok, _ in deterministic_checks if ok)
    deterministic_pct = _pct(deterministic_hit, len(deterministic_checks))

    # ---- Skeleton score ------------------------------------------------------
    skeleton_score = round(
        section_pct * 0.30
        + harness_pct * 0.30
        + consistency_pct * 0.20
        + deterministic_pct * 0.20,
        2,
    )

    # ---- Cell-level score (read from the yaml_gs_ps_coverage output) --------
    cov_data = _load_json(temp_dir / "yaml_gs_ps_coverage.json")
    cell_cc = _as_dict(cov_data.get("cell_coverage"))
    cell_coverage_pct = _coerce_float(cell_cc.get("cell_coverage_pct"))

    # Phase-1 deterministic tool gates (results of the cell-filling tools).
    canonical = _load_json(temp_dir / "canonical_metrics_v1.json")
    cross_section = _load_json(temp_dir / "cross_section_consistency_v1.json")
    phase1_checks = {
        "ejce_blank_views_zero": _load_json(temp_dir / "ejce_view_renderer_v1.json").get("blank_view_count") == 0,
        "scr_v3_pass": _load_json(temp_dir / "smart_cash_recovery_v3.json").get("gate") in ("PASS", "CAUTION"),
        # Threshold is 99.0 even though the key name says 100.
        "ratchet_coverage_100": _coerce_float(_load_json(temp_dir / "ratchet_trailing_general_v1.json").get("coverage_pct")) >= 99.0,
        # [VD1] WATCH_PENDING_SAMPLE means n<30 samples accumulated so far;
        # it is not treated as a system failure.
        "vps_pass": _load_json(temp_dir / "value_preservation_scorer_v1.json").get("gate") in ("PASS", "CAUTION", "WATCH_PENDING_SAMPLE"),
        "routing_log_ok": _load_json(temp_dir / "routing_execution_log_v1.json").get("gate") in ("PASS", "CAUTION"),
        # [Phase-8] single source of truth + cross-section consistency.
        # Truthiness is used instead of len() so a null/numeric "unresolved"
        # field cannot raise.
        "canonical_metrics_resolved": (not canonical.get("unresolved", [])) and canonical.get("gate") == "PASS",
        "cross_section_consistency_pass": cross_section.get("conflict_count", 1) == 0 and cross_section.get("gate") in ("PASS", "WARN"),
    }
    phase1_hit = sum(1 for v in phase1_checks.values() if v)
    phase1_pct = _pct(phase1_hit, len(phase1_checks))

    # ---- [Phase-8] Harness gate compliance -----------------------------------
    # Pass rate of the checks in engine_harness_gate_result.json.
    # Failures caused by data-collection gaps are unrelated to guidance
    # compliance and are excluded from the denominator.
    _DATA_LIMITATION_CHECKS = frozenset({
        "validate_data_quality_reconciliation_v1",  # investment_quality < 90% - fundamentals not collected (data issue)
        "CHECK_58_FUNDAMENTAL_RAW_INGEST",  # raw fundamental ingest coverage - needs external data collection (data issue)
        "CHECK_59_FUNDAMENTAL_MULTIFACTOR_V3",  # insufficient grade diversity before fundamentals are collected (data issue)
        "CHECK_48_REQUEST_RESULT_ADOPTION_BRIDGE_V1",  # GAS pad/dvp JSON not collected - blank until runDataFeed runs (data issue)
    })
    gate_result = _load_json(temp_dir / "engine_harness_gate_result.json")
    all_checks = gate_result.get("checks") if isinstance(gate_result.get("checks"), list) else []
    # Compliance set: data-limitation checks removed; any check with
    # exit_code == 0 counts as passing.
    guidance_checks = [c for c in all_checks if isinstance(c, dict) and c.get("name") not in _DATA_LIMITATION_CHECKS]
    guidance_pass = [c for c in guidance_checks if c.get("exit_code") == 0]
    harness_gate_total = len(guidance_checks)
    harness_gate_pass_count = len(guidance_pass)
    # _pct already returns 0.0 for an empty denominator.
    harness_gate_pct = _pct(harness_gate_pass_count, harness_gate_total)

    # ---- Outcome (ex-post) score, from outcome_quality_score_v1 -------------
    oqs = _load_json(temp_dir / "outcome_quality_score_v1.json")
    outcome_score_raw = _coerce_float(oqs.get("score"))
    outcome_gate = str(oqs.get("gate") or "MISSING")
    # The raw score is expected to be on a 0-100 scale already; clamp defensively.
    outcome_pct = min(max(outcome_score_raw, 0.0), 100.0)

    # ---- Four-layer weighted sum (Phase-8 restructuring) ---------------------
    # Rationale recorded by the original author: this proof measures
    # compliance with the guidance, i.e. structural compliance (skeleton) +
    # data determinism (cell) + gate compliance (harness_gate). Trading
    # outcome depends on market conditions, so its weight is kept small.
    #   skeleton(50%)     required sections, determinism locks, consistency
    #   cell(20%)         table cells filled from harness values
    #   harness_gate(25%) pass rate of the harness checks
    #   outcome(5%)       ex-post trading outcome quality
    has_outcome = outcome_gate not in ("MISSING", "")
    has_harness_gate = harness_gate_total > 0
    if has_outcome and has_harness_gate:
        weighted_score = round(
            skeleton_score * 0.50
            + cell_coverage_pct * 0.20
            + harness_gate_pct * 0.25
            + outcome_pct * 0.05,
            2,
        )
        score_mode = "FULL_4WAY_V2"
        _score_weights = "skeleton×0.50 + cell×0.20 + harness_gate×0.25 + outcome×0.05"
    elif has_outcome:
        # Harness gate result unavailable: legacy three-layer formula.
        weighted_score = round(
            skeleton_score * 0.50
            + cell_coverage_pct * 0.30
            + outcome_pct * 0.20,
            2,
        )
        score_mode = "FULL_3WAY"
        _score_weights = "skeleton×0.50 + cell×0.30 + outcome×0.20"
    else:
        # No ex-post data: two-layer formula.
        weighted_score = round(
            skeleton_score * 0.65
            + cell_coverage_pct * 0.35,
            2,
        )
        score_mode = "SKELETON_CELL_ONLY"
        _score_weights = "skeleton×0.65 + cell×0.35"
    gate = "PASS" if weighted_score >= 95 else ("CAUTION" if weighted_score >= 85 else "FAIL")

    # ---- P0-T5: HONEST_V3 score -----------------------------------------------
    # structure×0.20 + honest_outcome×0.40 + live_validation×0.20
    #   + value_preservation_honest×0.20
    # Purpose: keep a high structural score from masking actual outcomes.
    pred_match = _coerce_float(_load_json(temp_dir / "prediction_accuracy_harness_v2.json").get("t5_ap_combined"))
    # "metrics" may be absent or not an object; treat both as empty.
    oqs_metrics = _as_dict(oqs.get("metrics"))
    t20_rate = _coerce_float(oqs_metrics.get("t20_pass_rate") or oqs.get("t20_pass_rate_pct"))
    op_lock_metrics = _as_dict(_load_json(temp_dir / "operational_outcome_lock_v1.json").get("metrics"))
    try:
        op_t20_samples = int(_coerce_float(op_lock_metrics.get("operational_t20_count")))
    except (OverflowError, ValueError):
        # inf / nan cannot be converted to an integer sample count.
        op_t20_samples = 0
    vd_raw = _coerce_float(_load_json(temp_dir / "smart_cash_recovery_v6.json").get("value_damage_pct_avg_raw"))

    structure_score = (skeleton_score + cell_coverage_pct + harness_gate_pct) / 3.0
    honest_outcome_score = (t20_rate + pred_match) / 2.0
    live_validation_score = 100.0 if op_t20_samples >= 30 else 0.0
    value_preservation_honest = max(0.0, 100.0 - vd_raw)
    honest_proof_score = round(
        structure_score * 0.20
        + honest_outcome_score * 0.40
        + live_validation_score * 0.20
        + value_preservation_honest * 0.20,
        2,
    )
    honest_gate = "PASS" if honest_proof_score >= 90 else ("CAUTION" if honest_proof_score >= 75 else "FAIL")

    # [SG1] SAMPLE_GATED cap: with fewer than 30 operational T20 samples the
    # published score is min(weighted_score, honest_proof_score), so the
    # skeleton-dominated weighting cannot inflate the headline number.
    # The weight description set here is final; it is no longer overwritten
    # by the per-mode description chosen above.
    if op_t20_samples < 30 and score_mode in ("FULL_4WAY_V2", "FULL_3WAY"):
        weighted_score = round(min(weighted_score, honest_proof_score), 2)
        score_mode = "SAMPLE_GATED"
        gate = "PASS" if weighted_score >= 95 else ("CAUTION" if weighted_score >= 85 else "FAIL")
        _score_weights = f"SAMPLE_GATED(op_t20={op_t20_samples}<30): min(cosmetic, honest_proof_score)"

    # ---- Root causes ------------------------------------------------------------
    root_causes: list[str] = []
    if section_pct < 100:
        root_causes.append("SECTION_COVERAGE_GAP")
    if harness_pct < 100:
        root_causes.append("HARNESS_KEY_GAP")
    if consistency_pct < 100:
        root_causes.append("CONSISTENCY_GAP")
    if deterministic_pct < 100:
        root_causes.append("DETERMINISM_LOCK_GAP")
    if cell_coverage_pct < 95:
        root_causes.append("CELL_COVERAGE_GAP")
    if phase1_pct < 100:
        missing_phase1 = [k for k, v in phase1_checks.items() if not v]
        root_causes.append(f"PHASE1_GATE_FAIL:{','.join(missing_phase1)}")
    if harness_gate_pct < 95:
        root_causes.append("HARNESS_GATE_COMPLIANCE_LOW")
    if outcome_pct < 65:
        root_causes.append("OUTCOME_QUALITY_LOW")

    # ---- P0-2: TRUTH_DIVERGENCE gate (v11) ---------------------------------------
    # |cosmetic - honest| > 10 => BLOCK_PUBLISH; > 5 => WARN.
    # The existing score/gate fields are kept for downstream consumers.
    # NOTE(review): in SAMPLE_GATED mode weighted_score has already been capped
    # at honest_proof_score, so the gap is 0 whenever the honest score is the
    # lower one - confirm whether the uncapped score should be compared instead.
    _divergence_abs = round(abs(weighted_score - honest_proof_score), 2)
    _truth_divergence_gate = (
        "BLOCK_PUBLISH" if _divergence_abs > 10.0
        else ("WARN" if _divergence_abs > 5.0 else "OK")
    )
    # A validated label requires live validation, >= 30 samples and an honest
    # score of at least 90.
    _pass_100_allowed = (
        live_validation_score > 0
        and op_t20_samples >= 30
        and honest_proof_score >= 90
    )
    _validation_label = (
        "VALIDATED" if _pass_100_allowed
        else f"UNVALIDATED(live={live_validation_score},op_t20={op_t20_samples})"
    )

    result = {
        "formula_id": "ALGORITHM_GUIDANCE_PROOF_V1",
        "score": weighted_score,
        "score_mode": score_mode,
        "score_weights": _score_weights,
        "gate": gate,
        # P0-2 TRUTH_DIVERGENCE (v11): divergence gate added alongside score/gate.
        "truth_divergence_abs": _divergence_abs,
        "truth_divergence_gate": _truth_divergence_gate,
        "truth_divergence_note": (
            f"[TRUTH_DIVERGENCE: cosmetic={weighted_score} vs honest={honest_proof_score} gap={_divergence_abs}]"
            if _truth_divergence_gate == "BLOCK_PUBLISH" else None
        ),
        "pass_100_allowed": _pass_100_allowed,
        "validation_label": _validation_label,
        # P0-T5 HONEST_V3: alternative score that does not lean on structure.
        "honest_proof_score": honest_proof_score,
        "honest_gate": honest_gate,
        "honest_score_mode": "HONEST_V3",
        "honest_score_weights": "structure×0.20 + honest_outcome×0.40 + live_validation×0.20 + value_preservation_honest×0.20",
        "honest_components": {
            "structure_score": round(structure_score, 2),
            "honest_outcome_score": round(honest_outcome_score, 2),
            "live_validation_score": live_validation_score,
            "value_preservation_honest": round(value_preservation_honest, 2),
            "t20_pass_rate": t20_rate,
            "prediction_match_rate": pred_match,
            "op_t20_samples": op_t20_samples,
            "value_damage_raw_pct": vd_raw,
        },
        "metrics": {
            # Skeleton: the four structural indicators.
            "skeleton_score": skeleton_score,
            "section_coverage_pct": section_pct,
            "section_coverage_hit": section_hit,
            "section_coverage_total": len(required_sections),
            "harness_key_coverage_pct": harness_pct,
            "harness_key_hit": harness_hit,
            "harness_key_total": len(required_harness_keys),
            "consistency_pct": consistency_pct,
            "consistency_hit": consistency_hit,
            "consistency_total": len(consistency_checks),
            "determinism_lock_pct": deterministic_pct,
            "determinism_lock_hit": deterministic_hit,
            "determinism_lock_total": len(deterministic_checks),
            # Cell: cell-level determinism.
            "cell_coverage_pct": cell_coverage_pct,
            "phase1_gate_pct": phase1_pct,
            "phase1_checks": phase1_checks,
            # [Phase-8] Harness gate: compliance rate over the harness checks.
            "harness_gate_pct": harness_gate_pct,
            "harness_gate_pass_count": harness_gate_pass_count,
            "harness_gate_total": harness_gate_total,
            # Outcome: ex-post result quality.
            "outcome_quality_pct": outcome_pct,
            "outcome_gate": outcome_gate,
        },
        "evidence": {
            "consistency_checks": [{"name": n, "ok": ok, "value": v} for n, ok, v in consistency_checks],
            "determinism_checks": [{"name": n, "ok": ok, "value": v} for n, ok, v in deterministic_checks],
            "missing_sections": missing_sections,
            "missing_harness_keys": missing_harness_keys,
        },
        "root_causes": root_causes,
        "inputs": {
            "json_path": str(json_path),
            "report_path": str(report_path),
        },
    }

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Run the builder and hand its return value to the interpreter as the
    # process exit status.
    exit_status = main()
    raise SystemExit(exit_status)