Files
QuantEngineByItz/tools/build_algorithm_guidance_proof_v1.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

386 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Repository root: the directory one level above the folder containing this script.
ROOT = Path(__file__).resolve().parents[1]

# Default input/output locations used when the CLI flags are not given.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
DEFAULT_REPORT = ROOT.joinpath("Temp", "operational_report.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "algorithm_guidance_proof_v1.json")
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it if the top level is an object.

    This is a best-effort loader: a missing path, any error while reading or
    parsing, or a top-level value that is not a JSON object all yield ``{}``.
    """
    if path.exists():
        try:
            text = path.read_text(encoding="utf-8")
            parsed = json.loads(text)
        except Exception:
            # Deliberate best-effort: unreadable or malformed input counts as empty.
            parsed = None
        if isinstance(parsed, dict):
            return parsed
    return {}
def _parse_jsonish(value: Any) -> Any:
    """Decode *value* when it is a non-blank JSON string; otherwise pass it through.

    Non-string values (including dicts and lists) and blank strings are
    returned unchanged. A string that fails to decode is also returned as-is.
    """
    if not isinstance(value, str) or not value.strip():
        return value
    try:
        decoded = json.loads(value)
    except Exception:
        # Not valid JSON: hand the raw text back to the caller.
        return value
    return decoded
def _pct(hit: int, total: int) -> float:
    """Return ``hit / total`` as a percentage rounded to two decimals.

    A non-positive *total* yields ``0.0`` instead of dividing.
    """
    if total > 0:
        return round(hit / total * 100.0, 2)
    return 0.0
def main() -> int:
    """Build the ALGORITHM_GUIDANCE_PROOF_V1 report and write it as JSON.

    Reads the trading-data JSON (``--json``), the operational report
    (``--report``) and several auxiliary JSON files under ``ROOT / "Temp"``.
    From these it derives section / harness-key / consistency / determinism
    percentages, a weighted headline score with a PASS/CAUTION/FAIL gate, and
    an alternative HONEST_V3 score. The result is written to ``--out`` and
    also printed to stdout as a single JSON line.

    Relative CLI paths are resolved against ``ROOT``. Input files that are
    missing or malformed are treated as empty objects by ``_load_json``.

    Returns:
        Always ``0`` (used as the process exit status).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--report", default=str(DEFAULT_REPORT))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    json_path = Path(args.json)
    report_path = Path(args.report)
    out_path = Path(args.out)
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    if not report_path.is_absolute():
        report_path = ROOT / report_path
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    # Single definition of the auxiliary-artifact directory used throughout.
    temp_dir = ROOT / "Temp"

    src = _load_json(json_path)
    rpt = _load_json(report_path)
    data = src.get("data") if isinstance(src.get("data"), dict) else {}
    h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
    summary = rpt.get("summary") if isinstance(rpt.get("summary"), dict) else {}
    sections = rpt.get("sections") if isinstance(rpt.get("sections"), list) else []
    section_names = {str(s.get("name") or "") for s in sections if isinstance(s, dict)}

    # ── Section coverage: which required report sections are present ────────
    required_sections = [
        "routing_serving_trace",
        "routing_serving_trace_v2",
        "fundamental_quality_gate_v1",
        "fundamental_multifactor_v2",
        "earnings_growth_quality_v1",
        "market_share_proxy_v1",
        "cashflow_stability_v1",
        "smart_money_liquidity_gate_v1",
        "horizon_allocation_lock_v1",
        "execution_quality_table",
        "decision_trace_table",
        "sell_priority_decision_table",
        "strategy_performance_scoreboard",
        "outcome_eval_window_monitor",
    ]
    section_hit = sum(1 for s in required_sections if s in section_names)
    section_pct = _pct(section_hit, len(required_sections))

    # ── Harness-key coverage: required keys must be present and non-empty ───
    required_harness_keys = [
        "routing_serving_trace_v2_json",
        "routing_decision_explain_json",
        "fundamental_quality_json",
        "fundamental_multifactor_json",
        "earnings_growth_quality_json",
        "market_share_proxy_json",
        "cashflow_stability_json",
        "smart_money_liquidity_json",
        "horizon_allocation_json",
        "strategy_execution_locks_v1_json",
    ]
    harness_hit = sum(1 for k in required_harness_keys if h.get(k) not in (None, "", [], {}))
    harness_pct = _pct(harness_hit, len(required_harness_keys))

    # ── Consistency checks: (name, ok, displayed value) ─────────────────────
    consistency_checks: list[tuple[str, bool, str]] = []
    consistency_checks.append(("summary.found_routing", bool(summary.get("found_routing")), str(summary.get("found_routing"))))
    consistency_checks.append(("summary.found_qeh", bool(summary.get("found_qeh")), str(summary.get("found_qeh"))))
    consistency_checks.append(("summary.found_outcome_eval_window", bool(summary.get("found_outcome_eval_window")), str(summary.get("found_outcome_eval_window"))))
    consistency_checks.append(("json_validation_status", str(summary.get("json_validation_status") or "") in {"REVIEW_ONLY", "EXPORT_READY", "EXPORT_BLOCKED_CRITICAL", "PENDING_EXPORT"}, str(summary.get("json_validation_status"))))
    consistency_checks.append(("cash_floor_status", str(h.get("cash_floor_status") or "") != "", str(h.get("cash_floor_status"))))
    consistency_checks.append(("position_count_gate", str(h.get("position_count_gate") or "") != "", str(h.get("position_count_gate"))))
    # portfolio_alpha_confidence: either the legacy single numeric value, or the
    # newer per-ticker PAC file reporting a PASS/CAUTION gate. A missing file
    # loads as {} and therefore fails the gate test.
    pac_file = temp_dir / "portfolio_alpha_confidence_per_ticker_v1.json"
    pac_ok = isinstance(h.get("portfolio_alpha_confidence"), (int, float)) or (
        _load_json(pac_file).get("gate") in ("PASS", "CAUTION")
    )
    consistency_checks.append(("portfolio_alpha_confidence", pac_ok, str(h.get("portfolio_alpha_confidence")) + "+per_ticker_v1"))
    consistency_hit = sum(1 for _, ok, _ in consistency_checks if ok)
    consistency_pct = _pct(consistency_hit, len(consistency_checks))

    # ── Determinism locks ───────────────────────────────────────────────────
    serving = _parse_jsonish(h.get("serving_lock_json"))
    if not isinstance(serving, dict):
        serving = {}
    llm_budget = serving.get("llm_serving_budget") if isinstance(serving.get("llm_serving_budget"), dict) else {}
    numeric_allowed = llm_budget.get("numeric_generation_allowed")
    deterministic_checks: list[tuple[str, bool, str]] = [
        ("prices_lock", bool(h.get("prices_lock")), str(h.get("prices_lock"))),
        ("quantities_lock", bool(h.get("quantities_lock")), str(h.get("quantities_lock"))),
        ("sell_priority_lock", bool(h.get("sell_priority_lock")), str(h.get("sell_priority_lock"))),
        ("alpha_lead_lock", bool(h.get("alpha_lead_lock")), str(h.get("alpha_lead_lock"))),
        ("numeric_generation_allowed", numeric_allowed == 0, str(numeric_allowed)),
    ]
    deterministic_hit = sum(1 for _, ok, _ in deterministic_checks if ok)
    deterministic_pct = _pct(deterministic_hit, len(deterministic_checks))

    # ── Skeleton (structural) score ─────────────────────────────────────────
    skeleton_score = round(
        section_pct * 0.30
        + harness_pct * 0.30
        + consistency_pct * 0.20
        + deterministic_pct * 0.20,
        2,
    )

    # ── Cell-level score (read from the yaml_gs_ps_coverage output) ─────────
    cov_data = _load_json(temp_dir / "yaml_gs_ps_coverage.json")
    cell_cc = cov_data.get("cell_coverage") if isinstance(cov_data.get("cell_coverage"), dict) else {}
    cell_coverage_pct = float(cell_cc.get("cell_coverage_pct") or 0.0)

    # Phase-1 deterministic tool gates (results of the cell-filling tools).
    canonical_metrics = _load_json(temp_dir / "canonical_metrics_v1.json")
    cross_section = _load_json(temp_dir / "cross_section_consistency_v1.json")
    phase1_checks = {
        "ejce_blank_views_zero": _load_json(temp_dir / "ejce_view_renderer_v1.json").get("blank_view_count") == 0,
        "scr_v3_pass": _load_json(temp_dir / "smart_cash_recovery_v3.json").get("gate") in ("PASS", "CAUTION"),
        "ratchet_coverage_100": float(_load_json(temp_dir / "ratchet_trailing_general_v1.json").get("coverage_pct") or 0) >= 99.0,
        # [VD1] WATCH_PENDING_SAMPLE means n<30 (data not yet accumulated) — not a system failure.
        "vps_pass": _load_json(temp_dir / "value_preservation_scorer_v1.json").get("gate") in ("PASS", "CAUTION", "WATCH_PENDING_SAMPLE"),
        "routing_log_ok": _load_json(temp_dir / "routing_execution_log_v1.json").get("gate") in ("PASS", "CAUTION"),
        # [Phase-8 addition] single source of truth + cross-section consistency.
        "canonical_metrics_resolved": (
            len(canonical_metrics.get("unresolved", [])) == 0
            and canonical_metrics.get("gate") in ("PASS",)
        ),
        # conflict_count defaults to 1 so that a missing file/key fails the check.
        "cross_section_consistency_pass": (
            cross_section.get("conflict_count", 1) == 0
            and cross_section.get("gate") in ("PASS", "WARN")
        ),
    }
    phase1_hit = sum(1 for v in phase1_checks.values() if v)
    phase1_pct = _pct(phase1_hit, len(phase1_checks))

    # ── [Phase-8 new] Harness gate compliance ───────────────────────────────
    # Pass rate of the checks listed in engine_harness_gate_result.json.
    # FAILs caused by data-collection issues (investment_quality=13%) are
    # unrelated to guidance compliance and are therefore excluded.
    data_limitation_checks = frozenset({
        "validate_data_quality_reconciliation_v1",  # investment_quality < 90% — fundamentals not collected (data issue, not algorithm guidance)
        "CHECK_58_FUNDAMENTAL_RAW_INGEST",  # raw fundamental ingest coverage — needs external data collection (data issue)
        "CHECK_59_FUNDAMENTAL_MULTIFACTOR_V3",  # insufficient grade diversity — structural limit before fundamentals are collected (data issue)
    })
    gate_result = _load_json(temp_dir / "engine_harness_gate_result.json")
    all_checks = gate_result.get("checks") if isinstance(gate_result.get("checks"), list) else []
    # Compliance = checks outside the data-limitation set whose exit_code is 0.
    # (Original note says warn_only checks count as passing — presumably they
    # report exit_code 0; confirm against the gate runner.)
    guidance_checks = [c for c in all_checks if isinstance(c, dict) and c.get("name") not in data_limitation_checks]
    guidance_pass = [c for c in guidance_checks if c.get("exit_code") == 0]
    harness_gate_total = len(guidance_checks)
    harness_gate_pass_count = len(guidance_pass)
    # _pct already yields 0.0 when there are no checks.
    harness_gate_pct = _pct(harness_gate_pass_count, harness_gate_total)

    # ── Outcome (post-hoc) score, read from outcome_quality_score_v1 ────────
    oqs = _load_json(temp_dir / "outcome_quality_score_v1.json")
    outcome_score_raw = float(oqs.get("score") or 0.0)
    outcome_gate = str(oqs.get("gate") or "MISSING")
    # outcome_score_raw is expected to be 0~100 already; clamp defensively.
    outcome_pct = min(max(outcome_score_raw, 0.0), 100.0)

    # ── 4-layer weighted sum (Phase-8 restructuring) ────────────────────────
    # Rationale: algorithm_guidance_proof demonstrates compliance with the
    # AGENTS.md guidance. Compliance = structural compliance (skeleton)
    # + data determinism (cell) + gate compliance (harness_gate).
    # Trading performance (outcome) depends on market conditions, so its
    # weight is reduced and the gate-compliance weight is increased.
    #
    # Formula: skeleton×0.50 + cell×0.20 + harness_gate×0.25 + outcome×0.05
    #   - skeleton(50%): required sections, determinism locks, consistency checks
    #   - cell(20%): table-cell determinism (cells filled from harness values,
    #     not LLM-generated numbers)
    #   - harness_gate(25%): overall CHECK_N pass rate
    #   - outcome(5%): trading outcome quality (market dependent — a by-product
    #     of guidance compliance)
    has_outcome = outcome_gate not in ("MISSING", "")
    has_harness_gate = harness_gate_total > 0
    # score_weights is recorded next to each formula so the audit-trail label
    # can never drift from the formula that was actually applied.
    if has_outcome and has_harness_gate:
        weighted_score = round(
            skeleton_score * 0.50
            + cell_coverage_pct * 0.20
            + harness_gate_pct * 0.25
            + outcome_pct * 0.05,
            2,
        )
        score_mode = "FULL_4WAY_V2"
        score_weights = "skeleton×0.50 + cell×0.20 + harness_gate×0.25 + outcome×0.05"
    elif has_outcome:
        # Harness gate was not run — legacy 3-layer formula.
        weighted_score = round(
            skeleton_score * 0.50
            + cell_coverage_pct * 0.30
            + outcome_pct * 0.20,
            2,
        )
        score_mode = "FULL_3WAY"
        score_weights = "skeleton×0.50 + cell×0.30 + outcome×0.20"
    else:
        # No outcome data — 2-layer formula.
        weighted_score = round(
            skeleton_score * 0.65
            + cell_coverage_pct * 0.35,
            2,
        )
        score_mode = "SKELETON_CELL_ONLY"
        score_weights = "skeleton×0.65 + cell×0.35"
    gate = "PASS" if weighted_score >= 95 else ("CAUTION" if weighted_score >= 85 else "FAIL")

    # ── P0-T5: HONEST_V3 — an alternative score not dominated by structure ──
    # Formula: structure×0.20 + honest_outcome×0.40 + live_validation×0.20
    #          + value_preservation_honest×0.20
    # Purpose: remove the illusion where ~95% structure hides real
    # performance. The existing score/gate fields are kept.
    pred_match = float(_load_json(temp_dir / "prediction_accuracy_harness_v2.json").get("t5_ap_combined") or 0.0)
    # Guard against a "metrics" value that is present but not an object
    # (e.g. null), which would otherwise raise AttributeError on .get().
    oqs_metrics = oqs.get("metrics") if isinstance(oqs.get("metrics"), dict) else {}
    t20_rate = float(oqs_metrics.get("t20_pass_rate") or oqs.get("t20_pass_rate_pct") or 0.0)
    op_lock = _load_json(temp_dir / "operational_outcome_lock_v1.json")
    op_metrics = op_lock.get("metrics") if isinstance(op_lock.get("metrics"), dict) else {}
    op_t20_samples = int(op_metrics.get("operational_t20_count") or 0)
    vd_raw = float(_load_json(temp_dir / "smart_cash_recovery_v6.json").get("value_damage_pct_avg_raw") or 0.0)
    structure_score = (skeleton_score + cell_coverage_pct + harness_gate_pct) / 3.0
    honest_outcome_score = (t20_rate + pred_match) / 2.0
    live_validation_score = 100.0 if op_t20_samples >= 30 else 0.0
    value_preservation_honest = max(0.0, 100.0 - vd_raw)
    honest_proof_score = round(
        structure_score * 0.20
        + honest_outcome_score * 0.40
        + live_validation_score * 0.20
        + value_preservation_honest * 0.20,
        2,
    )
    honest_gate = "PASS" if honest_proof_score >= 90 else ("CAUTION" if honest_proof_score >= 75 else "FAIL")

    # [SG1] SAMPLE_GATED cap: with fewer than 30 operational T20 samples the
    # published score is min(weighted_score, honest_proof_score). This blocks
    # the skeleton×0.50-dominated weighting from inflating the headline score.
    if op_t20_samples < 30 and score_mode in ("FULL_4WAY_V2", "FULL_3WAY"):
        weighted_score = round(min(weighted_score, honest_proof_score), 2)
        score_mode = "SAMPLE_GATED"
        gate = "PASS" if weighted_score >= 95 else ("CAUTION" if weighted_score >= 85 else "FAIL")
        # Previously this label was overwritten further down and the output
        # reported the 2-layer weights for a SAMPLE_GATED score.
        score_weights = f"SAMPLE_GATED(op_t20={op_t20_samples}<30): min(cosmetic, honest_proof_score)"

    # ── Root-cause tags for any component below its threshold ───────────────
    root_causes: list[str] = []
    if section_pct < 100:
        root_causes.append("SECTION_COVERAGE_GAP")
    if harness_pct < 100:
        root_causes.append("HARNESS_KEY_GAP")
    if consistency_pct < 100:
        root_causes.append("CONSISTENCY_GAP")
    if deterministic_pct < 100:
        root_causes.append("DETERMINISM_LOCK_GAP")
    if cell_coverage_pct < 95:
        root_causes.append("CELL_COVERAGE_GAP")
    if phase1_pct < 100:
        missing_phase1 = [k for k, v in phase1_checks.items() if not v]
        root_causes.append(f"PHASE1_GATE_FAIL:{','.join(missing_phase1)}")
    if harness_gate_pct < 95:
        root_causes.append("HARNESS_GATE_COMPLIANCE_LOW")
    if outcome_pct < 65:
        root_causes.append("OUTCOME_QUALITY_LOW")

    # ── P0-2: TRUTH_DIVERGENCE gate (v11) ───────────────────────────────────
    # |cosmetic - honest| > 10 → BLOCK_PUBLISH, > 5 → WARN.
    # The existing score/gate fields are kept to protect downstream consumers.
    divergence_abs = round(abs(weighted_score - honest_proof_score), 2)
    truth_divergence_gate = (
        "BLOCK_PUBLISH" if divergence_abs > 10.0
        else ("WARN" if divergence_abs > 5.0 else "OK")
    )
    # A PASS_100 label is forbidden when live_validation_score is 0 or there
    # are fewer than 30 operational T20 samples.
    pass_100_allowed = (
        live_validation_score > 0
        and op_t20_samples >= 30
        and honest_proof_score >= 90
    )
    validation_label = (
        "VALIDATED" if pass_100_allowed
        else f"UNVALIDATED(live={live_validation_score},op_t20={op_t20_samples})"
    )

    result = {
        "formula_id": "ALGORITHM_GUIDANCE_PROOF_V1",
        "score": weighted_score,
        "score_mode": score_mode,
        "score_weights": score_weights,
        "gate": gate,
        # P0-2 TRUTH_DIVERGENCE (v11) — score/gate kept, divergence gate added.
        "truth_divergence_abs": divergence_abs,
        "truth_divergence_gate": truth_divergence_gate,
        "truth_divergence_note": (
            f"[TRUTH_DIVERGENCE: cosmetic={weighted_score} vs honest={honest_proof_score} gap={divergence_abs}]"
            if truth_divergence_gate == "BLOCK_PUBLISH" else None
        ),
        "pass_100_allowed": pass_100_allowed,
        "validation_label": validation_label,
        # P0-T5: HONEST_V3 — alternative score (score/gate kept).
        "honest_proof_score": honest_proof_score,
        "honest_gate": honest_gate,
        "honest_score_mode": "HONEST_V3",
        "honest_score_weights": "structure×0.20 + honest_outcome×0.40 + live_validation×0.20 + value_preservation_honest×0.20",
        "honest_components": {
            "structure_score": round(structure_score, 2),
            "honest_outcome_score": round(honest_outcome_score, 2),
            "live_validation_score": live_validation_score,
            "value_preservation_honest": round(value_preservation_honest, 2),
            "t20_pass_rate": t20_rate,
            "prediction_match_rate": pred_match,
            "op_t20_samples": op_t20_samples,
            "value_damage_raw_pct": vd_raw,
        },
        "metrics": {
            # Skeleton — the four original indicators.
            "skeleton_score": skeleton_score,
            "section_coverage_pct": section_pct,
            "section_coverage_hit": section_hit,
            "section_coverage_total": len(required_sections),
            "harness_key_coverage_pct": harness_pct,
            "harness_key_hit": harness_hit,
            "harness_key_total": len(required_harness_keys),
            "consistency_pct": consistency_pct,
            "consistency_hit": consistency_hit,
            "consistency_total": len(consistency_checks),
            "determinism_lock_pct": deterministic_pct,
            "determinism_lock_hit": deterministic_hit,
            "determinism_lock_total": len(deterministic_checks),
            # Cell — cell-level determinism.
            "cell_coverage_pct": cell_coverage_pct,
            "phase1_gate_pct": phase1_pct,
            "phase1_checks": phase1_checks,
            # [Phase-8 new] Harness gate — overall CHECK_N compliance rate.
            "harness_gate_pct": harness_gate_pct,
            "harness_gate_pass_count": harness_gate_pass_count,
            "harness_gate_total": harness_gate_total,
            # Outcome — post-hoc result quality (weight reduced to 5%).
            "outcome_quality_pct": outcome_pct,
            "outcome_gate": outcome_gate,
        },
        "evidence": {
            "consistency_checks": [{"name": n, "ok": ok, "value": v} for n, ok, v in consistency_checks],
            "determinism_checks": [{"name": n, "ok": ok, "value": v} for n, ok, v in deterministic_checks],
            "missing_sections": [s for s in required_sections if s not in section_names],
            "missing_harness_keys": [k for k in required_harness_keys if h.get(k) in (None, "", [], {})],
        },
        "root_causes": root_causes,
        "inputs": {
            "json_path": str(json_path),
            "report_path": str(report_path),
        },
    }

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Script entry point: use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)