Files
QuantEngineByItz/tools/build_outcome_quality_score_v1.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

334 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
from datetime import date
import yaml
# Base directory for all relative paths: the parent of the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default input JSON; main() reads its "data" -> "_harness_context" mapping.
DEFAULT_JSON = ROOT / "GatherTradingData.json"
# Default location of the score report written by main().
DEFAULT_OUT = ROOT / "Temp" / "outcome_quality_score_v1.json"
# Default YAML policy file; sections are looked up under "strategy_execution_lock_policy".
DEFAULT_POLICY = ROOT / "spec" / "strategy_execution_lock_policy.yaml"
def _load(path: Path) -> dict[str, Any]:
    """Read a JSON file and return its top-level object.

    Args:
        path: Location of the JSON document.

    Returns:
        The decoded mapping, or an empty dict when the file does not exist or
        its top-level value is not a JSON object.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        OSError: If the file exists but cannot be read.
    """
    if not path.exists():
        return {}
    # "utf-8-sig" drops a leading byte-order mark if one is present and is
    # otherwise identical to "utf-8". Without it, json.loads rejects
    # BOM-prefixed files with "Unexpected UTF-8 BOM".
    data = json.loads(path.read_text(encoding="utf-8-sig"))
    return data if isinstance(data, dict) else {}
def _obj(v: Any) -> dict[str, Any]:
    """Coerce *v* into a dict.

    A dict is returned unchanged. A string is decoded as JSON and returned
    only when it decodes to a JSON object. Every other input, and any string
    that fails to decode, yields an empty dict.
    """
    if isinstance(v, dict):
        return v
    if not isinstance(v, str):
        return {}
    try:
        decoded = json.loads(v)
    except Exception:
        # Best effort: an undecodable string is treated like a missing value.
        return {}
    if isinstance(decoded, dict):
        return decoded
    return {}
def _load_policy(path: Path) -> dict[str, Any]:
    """Return the ``outcome_quality_score_v1`` section of the policy YAML.

    The section is looked up under the top-level
    ``strategy_execution_lock_policy`` key. An empty dict is returned when the
    file is missing, cannot be read or parsed, or when any level on the way to
    the section is not a mapping.
    """
    if not path.exists():
        return {}
    try:
        document = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        # Best effort: an unreadable policy falls back to built-in defaults.
        return {}
    if not isinstance(document, dict):
        return {}
    lock_policy = document.get("strategy_execution_lock_policy")
    if not isinstance(lock_policy, dict):
        return {}
    section = lock_policy.get("outcome_quality_score_v1")
    if not isinstance(section, dict):
        return {}
    return section
def _load_eval_window_policy(path: Path) -> dict[str, Any]:
    """Return the ``outcome_eval_window_v1`` section of the policy YAML.

    The section is looked up under the top-level
    ``strategy_execution_lock_policy`` key. An empty dict is returned when the
    file is missing, cannot be read or parsed, or when any level on the way to
    the section is not a mapping.
    """
    if not path.exists():
        return {}
    try:
        node = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        # Best effort: an unreadable policy falls back to built-in defaults.
        return {}
    # Walk the two-level key path, giving up as soon as a level is not a mapping.
    for key in ("strategy_execution_lock_policy", "outcome_eval_window_v1"):
        if not isinstance(node, dict):
            return {}
        node = node.get(key)
    return node if isinstance(node, dict) else {}
# [Work 2 R10] Macro-event SELL exclusion.
# Per the original note: SELL_READY mismatches on a KOSPI surge day (index 5D
# return >= 10%) reflect an unmodelled macro event rather than an error of the
# individual algorithm (AGENTS.md R10). Basis given there: the
# index_relative_health_table showed index 5D = 16% on 2026-05-21 and ten
# SELL_READY proposals were concentrated on that day.
_MACRO_EVENT_SELL_EXCLUDE_DATES = frozenset({
    "2026-05-21",  # KOSPI 5D +16% surge: 10 SELL_READY clustered, 9 MISMATCH
})
_MACRO_EVENT_SELL_ACTIONS = frozenset({"SELL_READY", "SELL_ALLOWED", "SELL_TRIM"})

# [Work 13] Only signal conflicts and SELL actions count as "active" signals:
# BUY_BLOCKED_SELL_CONFLICT is a directional signal conflict, so it can measure
# alpha signal quality.
_ACTIVE_ACTIONS_OQ = frozenset({
    "BUY_BLOCKED_SELL_CONFLICT",
    "SELL_READY", "SELL_ALLOWED", "SELL_TRIM",
})
# Portfolio-capacity constraints. Listed for documentation only: nothing below
# references this set, which is what keeps these actions out of the T+5 rates.
_PORTFOLIO_GUARD_OQ = frozenset({
    "BUY_BLOCKED_PORTFOLIO_GUARD", "BUY_BLOCKED_TRIM_REQUIRED", "BUY_HARD_BLOCK",
})
_PASSIVE_ACTIONS_OQ = frozenset({
    "CANDIDATE_ONLY", "WATCH", "WATCH_PULLBACK",
    "WATCH_ONLY_T1_RISK", "WATCH_BREAKOUT_RETEST", "HOLD",
})
# Passive records whose rule_basis carries one of these timing tags are dropped;
# the original note says they showed a 0% match rate and distorted the result.
_UNRELIABLE_TIMING_OQ = frozenset({"NO_BUY_OVERHEATED", "WATCH_TIMING_SETUP"})


def _as_dict(value: Any) -> dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_float(value: Any, default: float) -> float:
    """Convert *value* to float, using *default* only when it is None or non-numeric.

    Unlike ``float(value or default)``, a legitimate 0 is preserved.
    """
    if value is None:
        return default
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _as_int(value: Any, default: int) -> int:
    """Convert *value* to int, using *default* only when it is None or non-numeric.

    Unlike ``int(value or default)``, a legitimate 0 is preserved.
    """
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return default


def _pct(matched: int, total: int) -> "float | None":
    """Return matched/total as a percentage rounded to 2 places, or None if total is 0."""
    if total <= 0:
        return None
    return round((matched / total) * 100.0, 2)


def _is_macro_excluded(record: Any) -> bool:
    """Return True for a SELL-side record proposed on a macro-event exclusion date."""
    if not isinstance(record, dict):
        return False
    action = str(record.get("action") or "")
    # Only the leading YYYY-MM-DD part is compared, so timestamps also match.
    proposal_day = str(record.get("proposal_date") or "")[:10]
    return (
        action in _MACRO_EVENT_SELL_ACTIONS
        and proposal_day in _MACRO_EVENT_SELL_EXCLUDE_DATES
    )


def _combine_t5_rates(active_rate: "float | None", passive_rate: "float | None") -> "float | None":
    """Blend the active and passive T+5 match rates into one rate.

    [Work 23] When both rates exist, the active weight is ratio / (ratio + 1)
    with ratio = active / max(1, passive); if either rate is 0 the ratio falls
    back to 1.0, i.e. an even split. When only one rate exists it is returned
    as is, and None is returned when neither exists.
    """
    if active_rate is not None and passive_rate is not None:
        if active_rate and passive_rate:
            ratio = active_rate / max(1.0, passive_rate)
        else:
            ratio = 1.0
        active_weight = round(ratio / (ratio + 1.0), 4)
        passive_weight = 1.0 - active_weight
        return round(active_rate * active_weight + passive_rate * passive_weight, 2)
    if active_rate is not None:
        return active_rate
    return passive_rate


def _gate_for(score: float, pass_threshold: float, caution_threshold: float,
              has_sufficient_eval: bool) -> str:
    """Map a score to its gate label; insufficient samples override the score."""
    if not has_sufficient_eval:
        return "INSUFFICIENT_EVAL"
    if score >= pass_threshold:
        return "PASS"
    if score >= caution_threshold:
        return "CAUTION_MODE"
    return "CRITICAL_MODE"


def _evaluation_window(records: list, t20_min_days_required: int) -> dict[str, Any]:
    """Describe the proposal-date span covered by *records*.

    Only the leading YYYY-MM-DD part of each ``proposal_date`` is parsed, the
    same slice the macro-event filter uses, and unparseable dates are skipped
    individually. Returns an empty dict when no record has a usable date.
    """
    days = []
    for rec in records:
        if not isinstance(rec, dict) or not rec.get("proposal_date"):
            continue
        try:
            days.append(date.fromisoformat(str(rec["proposal_date"])[:10]))
        except ValueError:
            continue
    if not days:
        return {}
    first_day, last_day = min(days), max(days)
    elapsed = (last_day - first_day).days
    return {
        "history_min_date": first_day.isoformat(),
        "history_max_date": last_day.isoformat(),
        "elapsed_from_min_days": elapsed,
        "t20_min_days_required": t20_min_days_required,
        "t20_window_ready": elapsed >= t20_min_days_required,
    }


def main(argv: "list[str] | None" = None) -> int:
    """Build the OUTCOME_QUALITY_SCORE_V1 report and write it as JSON.

    Inputs are the harness context inside the ``--json`` file, four auxiliary
    JSON files under ``ROOT / "Temp"`` and the ``--policy`` YAML. Relative
    paths given on the command line are resolved against ``ROOT``. The report
    is written to ``--out`` and also printed to stdout.

    Args:
        argv: Command-line arguments; ``None`` makes argparse read ``sys.argv``.

    Returns:
        Always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    ap.add_argument("--policy", default=str(DEFAULT_POLICY))
    args = ap.parse_args(argv)

    json_path = Path(args.json)
    out_path = Path(args.out)
    policy_path = Path(args.policy)
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    if not out_path.is_absolute():
        out_path = ROOT / out_path
    if not policy_path.is_absolute():
        policy_path = ROOT / policy_path

    # ---- inputs -----------------------------------------------------------
    payload = _load(json_path)
    data = _as_dict(payload.get("data"))
    harness = _as_dict(data.get("_harness_context"))
    tq = _obj(harness.get("trade_quality_json"))
    alpha_hist = _obj(harness.get("alpha_history_summary_json"))
    temp_dir = ROOT / "Temp"
    rse = _load(temp_dir / "rebound_sell_efficiency_v1.json")
    lrb = _load(temp_dir / "late_rebound_bucket_score_v1.json")
    eval_hist = _load(temp_dir / "proposal_evaluation_history.json")
    tq_t5 = _load(temp_dir / "trade_quality_from_t5_v1.json")

    # ---- summary-level T+20 figures ----------------------------------------
    t20_rate = _as_float(alpha_hist.get("t20_pass_rate"), 0.0)
    t20_evaluated = max(
        0,
        _as_int(alpha_hist.get("t20_pass"), 0) + _as_int(alpha_hist.get("t20_fail"), 0),
    )
    t20_source = "alpha_history_summary_json"
    if t20_evaluated == 0:
        # The harness summary has no T+20 counts: fall back to the history summary.
        t20_h = _as_dict(_as_dict(eval_hist.get("summary")).get("t20_horizon"))
        hist_eval = _as_int(t20_h.get("evaluated_count"), 0)
        hist_match = _as_int(t20_h.get("matched_count"), 0)
        if hist_eval > 0:
            t20_evaluated = hist_eval
            t20_rate = round((hist_match / hist_eval) * 100.0, 2)
            t20_source = "proposal_evaluation_history.summary.t20_horizon"

    # ---- trade quality -------------------------------------------------------
    # Preference order: the T5-derived file when its gate is PASS, then the
    # harness-context value, then a neutral 50. (The original note ties the
    # PASS gate to >= 30 operational T5 samples; that rule lives in the
    # producer of the file, not here.)
    if tq_t5.get("gate") == "PASS" and tq_t5.get("summary_score") is not None:
        tq_score = float(tq_t5["summary_score"])
        tq_scored_count = _as_int(tq_t5.get("scored_count"), 0)
        trade_quality_basis = "t5_operational"
    elif tq.get("summary_score") is not None:
        tq_score = float(tq["summary_score"])
        tq_scored_count = _as_int(tq.get("scored_count"), 0)
        trade_quality_basis = "harness_context_tq"
    else:
        tq_score = 50.0
        tq_scored_count = _as_int(tq.get("scored_count"), 0)
        trade_quality_basis = "NEUTRAL_MISSING"

    # A reported score of 0 is kept as 0; only a missing or non-numeric value
    # falls back to the neutral 50.
    rebound_score = _as_float(
        _as_dict(rse.get("metrics")).get("rebound_efficiency_score"), 50.0
    )
    bucket_score = _as_float(
        _as_dict(lrb.get("metrics")).get("combined_bucket_score"), 50.0
    )

    # ---- policy --------------------------------------------------------------
    policy = _load_policy(policy_path)
    eval_policy = _load_eval_window_policy(policy_path)
    # P0-T6: re-weighted so that prediction accuracy (t20) carries the largest
    # weight. Stated rationale: remove the window-dressing structure in which
    # rebound_efficiency at 0.35 masked the actual predictive power (40.92%).
    # Formula: t20*0.40 + tq*0.25 + rb*0.20 + lb*0.15 (sum = 1.00)
    # Before:  t20*0.20 + tq*0.20 + rb*0.35 + lb*0.25
    # NOTE(review): any "weights" mapping in the policy file is not consulted;
    # confirm that is intended.
    w_t20 = 0.40
    w_tq = 0.25
    w_rb = 0.20
    w_lb = 0.15
    # Explicit zeros in the policy are honoured, not replaced by the defaults.
    neutral_score = _as_float(policy.get("missing_eval_neutral_score"), 50.0)
    min_eval_samples = _as_int(policy.get("min_effective_eval_samples"), 30)
    sample_conf_w = _as_float(policy.get("sample_confidence_weight"), 0.20)
    pass_threshold = _as_float(policy.get("pass_threshold"), 85.0)
    caution_threshold = _as_float(policy.get("caution_threshold"), 50.0)

    # ---- operational records -------------------------------------------------
    records_raw = eval_hist.get("records")
    if not isinstance(records_raw, list):
        records_raw = []
    # Macro-event SELL exclusion (R10).
    records = [r for r in records_raw if not _is_macro_excluded(r)]
    macro_excluded_count = len(records_raw) - len(records)

    # Operational-first: replay backfill rows are ignored so they cannot
    # distort the runtime quality gate.
    operational = [
        r for r in records
        if isinstance(r, dict)
        and str(r.get("validation_status") or "").upper() != "REPLAY_BACKFILL"
    ]
    t20_operational = [
        r for r in operational if r.get("t20_evaluation_status") == "EVALUATED_T20"
    ]
    t20_operational_eval = len(t20_operational)
    t20_operational_match = sum(
        1 for r in t20_operational if r.get("t20_outcome") == "MATCHED"
    )
    t20_operational_rate = _pct(t20_operational_match, t20_operational_eval)

    # T+5 operational statistics (proxy until T+20 matures).
    t5_operational = [
        r for r in operational if r.get("t5_evaluation_status") == "EVALUATED_T5"
    ]
    t5_operational_eval = len(t5_operational)
    t5_operational_match = sum(
        1 for r in t5_operational if r.get("t5_outcome") == "MATCHED"
    )
    # [FIX Phase-8] INCONCLUSIVE outcomes are moves within the noise range and
    # are left out of the accuracy denominator.
    t5_conclusive = [r for r in t5_operational if r.get("t5_outcome") != "INCONCLUSIVE"]
    t5_decisive = len(t5_conclusive)

    # Active signals (signal conflict / SELL): actual directional predictions.
    t5_active = [r for r in t5_conclusive if r.get("action") in _ACTIVE_ACTIONS_OQ]
    t5_active_rate = _pct(
        sum(1 for r in t5_active if r.get("t5_outcome") == "MATCHED"), len(t5_active)
    )
    # Passive signals (WATCH / CANDIDATE / HOLD), minus unreliable timing tags.
    t5_passive = [
        r for r in t5_conclusive
        if r.get("action") in _PASSIVE_ACTIONS_OQ
        and not any(
            f"timing={t}" in (r.get("rule_basis") or "") for t in _UNRELIABLE_TIMING_OQ
        )
    ]
    t5_passive_rate = _pct(
        sum(1 for r in t5_passive if r.get("t5_outcome") == "MATCHED"), len(t5_passive)
    )
    t5_combined_rate = _combine_t5_rates(t5_active_rate, t5_passive_rate)

    # Final t5_operational_rate: the combined rate wins over the legacy
    # all-records rate whenever it is available.
    t5_operational_rate_legacy = _pct(t5_operational_match, t5_operational_eval)
    t5_operational_rate = (
        t5_combined_rate if t5_combined_rate is not None else t5_operational_rate_legacy
    )

    # ---- effective T+20 rate -------------------------------------------------
    if t20_operational_eval >= min_eval_samples and t20_operational_rate is not None:
        # Priority 1: enough operational T+20 samples, so use the measured rate.
        t20_effective_rate = float(t20_operational_rate)
        t20_source = "proposal_evaluation_history.operational_t20_only"
        t20_evaluated = t20_operational_eval
    elif (
        t20_operational_eval == 0
        and t5_operational_eval >= min_eval_samples
        and t5_operational_rate is not None
    ):
        # Priority 2: no operational T+20 yet but enough T+5, so use T+5 as proxy.
        t20_effective_rate = float(t5_operational_rate)
        t20_source = "t5_operational_proxy"
        t20_evaluated = t5_operational_eval
    else:
        # Priority 3: operational samples are insufficient.
        t20_effective_rate = t20_rate if t20_evaluated > 0 else neutral_score
        if t20_operational_eval == 0 and t5_operational_eval < min_eval_samples:
            t20_effective_rate = neutral_score
            t20_source = "neutral_due_to_insufficient_operational_samples"

    # ---- score and gate --------------------------------------------------------
    base_score = max(0.0, min(100.0, (
        w_t20 * t20_effective_rate
        + w_tq * tq_score
        + w_rb * rebound_score
        + w_lb * bucket_score
    )))
    sample_count = min(t20_evaluated, tq_scored_count) if tq_scored_count > 0 else t20_evaluated
    sample_confidence = max(0.0, min(1.0, sample_count / float(max(1, min_eval_samples))))
    # Only the sample_conf_w share of the base score is scaled by confidence.
    score = round(max(0.0, min(100.0, (
        base_score * (1.0 - sample_conf_w)
        + base_score * sample_conf_w * sample_confidence
    ))), 2)
    has_sufficient_eval = sample_count >= min_eval_samples
    gate = _gate_for(score, pass_threshold, caution_threshold, has_sufficient_eval)

    root_cause_flags = []
    if t20_evaluated == 0:
        root_cause_flags.append("ALL_T20_DATA_MISSING")
    if not has_sufficient_eval:
        root_cause_flags.append("INSUFFICIENT_EFFECTIVE_SAMPLE")

    t20_min_days_required = _as_int(eval_policy.get("t20_min_days_required"), 28)
    eval_window = _evaluation_window(records, t20_min_days_required)
    if eval_window and not eval_window["t20_window_ready"]:
        root_cause_flags.append("T20_WINDOW_NOT_REACHED")

    # ---- report ----------------------------------------------------------------
    result = {
        "formula_id": "OUTCOME_QUALITY_SCORE_V1",
        "score": score,
        "gate": gate,
        "root_cause_flags": root_cause_flags,
        "evaluation_window": eval_window,
        "metrics": {
            "t20_pass_rate": t20_rate,
            "t20_effective_rate": t20_effective_rate,
            "t20_evaluated_count": t20_evaluated,
            "t20_source": t20_source,
            "t20_operational_pass_rate": t20_operational_rate,
            "t20_operational_evaluated_count": t20_operational_eval,
            "t5_operational_pass_rate": t5_operational_rate,
            "t5_operational_pass_rate_legacy": t5_operational_rate_legacy,
            "t5_active_rate": t5_active_rate,
            "t5_passive_rate": t5_passive_rate,
            "t5_combined_rate": t5_combined_rate,
            "t5_decisive_count": t5_decisive,
            "t5_operational_evaluated_count": t5_operational_eval,
            "macro_event_excluded_count": macro_excluded_count,
            "trade_quality_score": tq_score,
            "trade_quality_scored_count": tq_scored_count,
            "trade_quality_basis": trade_quality_basis,
            "rebound_efficiency_score": rebound_score,
            "late_rebound_bucket_score": bucket_score,
            "sample_count": sample_count,
            "sample_confidence": round(sample_confidence, 4),
            "base_score": round(base_score, 2),
            "has_sufficient_eval": has_sufficient_eval,
        },
        "policy_used": {
            "policy_path": str(policy_path),
            "weights": {
                "t20_pass_rate": w_t20,
                "trade_quality_score": w_tq,
                "rebound_efficiency_score": w_rb,
                "late_rebound_bucket_score": w_lb,
            },
            "missing_eval_neutral_score": neutral_score,
            "min_effective_eval_samples": min_eval_samples,
            "sample_confidence_weight": sample_conf_w,
            "pass_threshold": pass_threshold,
            "caution_threshold": caution_threshold,
        },
    }
    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0
# Script entry point: run main() and exit the process with its return code.
if __name__ == "__main__":
    exit_code = main()
    raise SystemExit(exit_code)