ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
334 lines
16 KiB
Python
334 lines
16 KiB
Python
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from datetime import date
|
||
import yaml
|
||
|
||
|
||
# Repository root: this script is expected to live one directory below it.
ROOT = Path(__file__).resolve().parents[1]
# Default input payload; main() reads the harness context from it.
DEFAULT_JSON = ROOT / "GatherTradingData.json"
# Default destination of the computed score document.
DEFAULT_OUT = ROOT / "Temp" / "outcome_quality_score_v1.json"
# Default policy file supplying thresholds and evaluation-window settings.
DEFAULT_POLICY = ROOT / "spec" / "strategy_execution_lock_policy.yaml"
|
||
|
||
|
||
def _load(path: Path) -> dict[str, Any]:
    """Read a UTF-8 JSON file and return its top-level object.

    Returns an empty dict when *path* does not exist or when the decoded
    document is not a JSON object (e.g. a list or a scalar).

    Raises:
        json.JSONDecodeError: the file exists but is not valid JSON. This is
            intentionally not swallowed so a corrupted artifact is noticed.
    """
    if not path.exists():
        return {}
    data = json.loads(path.read_text(encoding="utf-8"))
    return data if isinstance(data, dict) else {}
|
||
|
||
|
||
def _obj(v: Any) -> dict[str, Any]:
    """Coerce *v* into a dict, decoding it from JSON text when necessary.

    - A dict is returned unchanged (same object, not a copy).
    - A str is parsed as JSON; the result is returned only if it is a dict.
    - Anything else, or text that cannot be parsed, yields an empty dict.
    """
    if isinstance(v, dict):
        return v
    if isinstance(v, str):
        try:
            parsed = json.loads(v)
        except (ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError; RecursionError covers
            # pathologically nested input. Both mean "no usable object".
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}
|
||
|
||
|
||
def _load_policy_section(path: Path, section: str) -> dict[str, Any]:
    """Return ``strategy_execution_lock_policy.<section>`` from a YAML file.

    Returns an empty dict when the file is missing, cannot be read or parsed,
    or when any level of the expected nesting is absent or not a mapping.
    """
    if not path.exists():
        return {}
    try:
        payload = yaml.safe_load(path.read_text(encoding="utf-8"))
    except Exception:
        # Deliberate best-effort: an unreadable or malformed policy file makes
        # the caller fall back to its built-in defaults instead of aborting.
        return {}
    root = payload.get("strategy_execution_lock_policy") if isinstance(payload, dict) else {}
    out = root.get(section) if isinstance(root, dict) else {}
    return out if isinstance(out, dict) else {}


def _load_policy(path: Path) -> dict[str, Any]:
    """Load the ``outcome_quality_score_v1`` policy section ({} if unavailable)."""
    return _load_policy_section(path, "outcome_quality_score_v1")


def _load_eval_window_policy(path: Path) -> dict[str, Any]:
    """Load the ``outcome_eval_window_v1`` policy section ({} if unavailable)."""
    return _load_policy_section(path, "outcome_eval_window_v1")
|
||
|
||
|
||
def main() -> int:
    """Compute OUTCOME_QUALITY_SCORE_V1, write it to ``--out`` and print it.

    Inputs:
        --json    payload whose ``data._harness_context`` carries
                  ``trade_quality_json`` and ``alpha_history_summary_json``.
        --out     destination JSON file (parent directories are created).
        --policy  YAML policy file (thresholds, evaluation-window settings).
        Relative paths are resolved against ROOT. Four further artifacts are
        read from ``ROOT / "Temp"``; each one is optional.

    The score is a fixed-weight blend of the effective T+20 rate, the trade
    quality score, the rebound efficiency score and the late rebound bucket
    score, scaled by a sample-confidence factor and mapped to a gate.

    Returns:
        0 once the result has been written.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    ap.add_argument("--policy", default=str(DEFAULT_POLICY))
    args = ap.parse_args()
    json_path = Path(args.json)
    out_path = Path(args.out)
    policy_path = Path(args.policy)
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    if not out_path.is_absolute():
        out_path = ROOT / out_path
    if not policy_path.is_absolute():
        policy_path = ROOT / policy_path

    payload = _load(json_path)
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}

    tq = _obj(h.get("trade_quality_json"))
    alpha_hist = _obj(h.get("alpha_history_summary_json"))
    rse = _load(ROOT / "Temp" / "rebound_sell_efficiency_v1.json")
    lrb = _load(ROOT / "Temp" / "late_rebound_bucket_score_v1.json")
    eval_hist = _load(ROOT / "Temp" / "proposal_evaluation_history.json")
    tq_t5 = _load(ROOT / "Temp" / "trade_quality_from_t5_v1.json")

    # Baseline T+20 figures from the harness summary; when it holds no
    # evaluated samples, fall back to the evaluation-history summary.
    t20_rate = float(alpha_hist.get("t20_pass_rate") or 0.0)
    t20_pass = int(alpha_hist.get("t20_pass") or 0)
    t20_fail = int(alpha_hist.get("t20_fail") or 0)
    t20_evaluated = max(0, t20_pass + t20_fail)
    t20_source = "alpha_history_summary_json"
    if t20_evaluated == 0:
        summary = eval_hist.get("summary") if isinstance(eval_hist.get("summary"), dict) else {}
        t20_h = summary.get("t20_horizon") if isinstance(summary.get("t20_horizon"), dict) else {}
        hist_eval = int(t20_h.get("evaluated_count") or 0)
        hist_match = int(t20_h.get("matched_count") or 0)
        if hist_eval > 0:
            t20_evaluated = hist_eval
            t20_rate = round((hist_match / hist_eval) * 100.0, 2)
            t20_source = "proposal_evaluation_history.summary.t20_horizon"

    # Trade quality precedence: the T5-based artifact when its gate is PASS,
    # then the harness-context value, otherwise a neutral 50.
    # (Original note: the T5 gate passes once >= 30 operational T5 samples
    # exist - that rule lives upstream and is not checked here.)
    if tq_t5.get("gate") == "PASS" and tq_t5.get("summary_score") is not None:
        tq_score = float(tq_t5["summary_score"])
        tq_scored_count = int(tq_t5.get("scored_count") or 0)
        trade_quality_basis = "t5_operational"
    elif tq.get("summary_score") is not None:
        tq_score = float(tq["summary_score"])
        tq_scored_count = int(tq.get("scored_count") or 0)
        trade_quality_basis = "harness_context_tq"
    else:
        tq_score = 50.0
        tq_scored_count = int(tq.get("scored_count") or 0)
        trade_quality_basis = "NEUTRAL_MISSING"
    # NOTE(review): `or 50.0` also maps a genuine score of 0 to the neutral
    # value - confirm that upstream never emits a real 0.
    rebound_score = float((rse.get("metrics") or {}).get("rebound_efficiency_score") or 50.0)
    bucket_score = float((lrb.get("metrics") or {}).get("combined_bucket_score") or 50.0)

    policy = _load_policy(policy_path)
    eval_policy = _load_eval_window_policy(policy_path)
    # P0-T6: weights re-balanced so prediction accuracy (t20) dominates.
    # Rationale: rebound_efficiency at 0.35 masked the real predictive power
    # (40.92%).
    # Formula: t20*0.40 + tq*0.25 + rb*0.20 + lb*0.15 (sum = 1.00)
    # Before:  t20*0.20 + tq*0.20 + rb*0.35 + lb*0.25
    # The weights are fixed here; any `weights` entry in the policy file is
    # not consulted.
    w_t20 = 0.40
    w_tq = 0.25
    w_rb = 0.20
    w_lb = 0.15
    neutral_score = float(policy.get("missing_eval_neutral_score") or 50.0)
    min_eval_samples = int(policy.get("min_effective_eval_samples") or 30)
    sample_conf_w = float(policy.get("sample_confidence_weight") or 0.20)
    pass_threshold = float(policy.get("pass_threshold") or 85.0)
    caution_threshold = float(policy.get("caution_threshold") or 50.0)

    # Operational-first T20: avoid replay backfill distortion in runtime quality gate
    records_raw = eval_hist.get("records") if isinstance(eval_hist.get("records"), list) else []

    # [Work 2 R10] Exclude MACRO_EVENT SELL records from evaluation.
    # A SELL_READY MISMATCH on a KOSPI surge day (index 5D return >= 10%) is
    # an unmodelled macro event, not an error of the individual algorithm
    # (AGENTS.md R10).
    # Evidence: index_relative_health_table shows index 5D = 16% (KOSPI +16%
    # surge on 2026-05-21); 10 SELL_READY records cluster on that day, so it
    # is classified as a macro event.
    _MACRO_EVENT_SELL_EXCLUDE_DATES = frozenset({
        "2026-05-21",  # KOSPI 5D +16% surge - 10 SELL_READY, 9 of them MISMATCH
    })
    _MACRO_EVENT_SELL_ACTIONS = frozenset({"SELL_READY", "SELL_ALLOWED", "SELL_TRIM"})

    def _is_macro_excluded(r):
        """Return True for a sell-side record dated on a macro-event day."""
        if not isinstance(r, dict):
            return False
        action = str(r.get("action") or "")
        # Named so it does not shadow the imported `date` class.
        proposal_day = str(r.get("proposal_date") or "")[:10]
        return action in _MACRO_EVENT_SELL_ACTIONS and proposal_day in _MACRO_EVENT_SELL_EXCLUDE_DATES

    # Drop macro-event SELL records (R10).
    macro_excluded_count = sum(1 for r in records_raw if _is_macro_excluded(r))
    records = [r for r in records_raw if not _is_macro_excluded(r)]

    t20_operational = [
        r for r in records
        if isinstance(r, dict)
        and r.get("t20_evaluation_status") == "EVALUATED_T20"
        and str(r.get("validation_status") or "").upper() != "REPLAY_BACKFILL"
    ]
    t20_operational_eval = len(t20_operational)
    t20_operational_match = len([r for r in t20_operational if r.get("t20_outcome") == "MATCHED"])
    t20_operational_rate = (
        round((t20_operational_match / t20_operational_eval) * 100.0, 2)
        if t20_operational_eval > 0
        else None
    )

    # Operational T+5 statistics (proxy until T+20 samples mature).
    # [Work 13] Only signal conflicts count as active BUY-side signals:
    #   BUY_BLOCKED_SELL_CONFLICT = conflicting direction signals, so alpha
    #   signal quality can be measured.
    #   BUY_BLOCKED_PORTFOLIO_GUARD / TRIM / HARD = portfolio capacity
    #   constraints, not alpha quality, therefore excluded.
    _ACTIVE_ACTIONS_OQ = frozenset({
        "BUY_BLOCKED_SELL_CONFLICT",  # key case: signal conflict -> direction accuracy
        "SELL_READY", "SELL_ALLOWED", "SELL_TRIM",
    })
    # Portfolio-constraint group, kept separate and left out of the combined
    # T+5 rate. It is not referenced below; it documents what is excluded.
    _PORTFOLIO_GUARD_OQ = frozenset({
        "BUY_BLOCKED_PORTFOLIO_GUARD", "BUY_BLOCKED_TRIM_REQUIRED", "BUY_HARD_BLOCK",
    })
    _PASSIVE_ACTIONS_OQ = frozenset({
        "CANDIDATE_ONLY", "WATCH", "WATCH_PULLBACK",
        "WATCH_ONLY_T1_RISK", "WATCH_BREAKOUT_RETEST", "HOLD",
    })

    t5_operational = [
        r for r in records
        if isinstance(r, dict)
        and r.get("t5_evaluation_status") == "EVALUATED_T5"
        and str(r.get("validation_status") or "").upper() != "REPLAY_BACKFILL"
    ]
    t5_operational_eval = len(t5_operational)
    t5_operational_match = len([r for r in t5_operational if r.get("t5_outcome") == "MATCHED"])
    # [FIX Phase-8] Drop INCONCLUSIVE outcomes and split active / passive
    # signals. INCONCLUSIVE means the move stayed within the noise band, so it
    # is removed from the accuracy denominator.
    t5_inconclusive = [r for r in t5_operational if r.get("t5_outcome") == "INCONCLUSIVE"]
    t5_decisive = t5_operational_eval - len(t5_inconclusive)
    # Active signals (BUY_BLOCKED / SELL): actual direction predictions.
    t5_active = [
        r for r in t5_operational
        if r.get("action") in _ACTIVE_ACTIONS_OQ
        and r.get("t5_outcome") != "INCONCLUSIVE"
    ]
    t5_active_match = sum(1 for r in t5_active if r.get("t5_outcome") == "MATCHED")
    t5_active_rate = round(t5_active_match / len(t5_active) * 100, 2) if t5_active else None
    # Passive signals (WATCH / CANDIDATE): entry deferred. INCONCLUSIVE is
    # dropped, as are records whose rule_basis carries
    # timing=NO_BUY_OVERHEATED or timing=WATCH_TIMING_SETUP: per the original
    # note these are event-driven moves with a 0% match rate that would
    # distort the evaluation.
    _UNRELIABLE_TIMING_OQ = frozenset({"NO_BUY_OVERHEATED", "WATCH_TIMING_SETUP"})
    t5_passive = [
        r for r in t5_operational
        if r.get("action") in _PASSIVE_ACTIONS_OQ
        and r.get("t5_outcome") != "INCONCLUSIVE"
        and not any(f"timing={t}" in (r.get("rule_basis") or "") for t in _UNRELIABLE_TIMING_OQ)
    ]
    t5_passive_match = sum(1 for r in t5_passive if r.get("t5_outcome") == "MATCHED")
    t5_passive_rate = round(t5_passive_match / len(t5_passive) * 100, 2) if t5_passive else None
    # Combine the active and passive rates.
    if t5_active_rate is not None and t5_passive_rate is not None:
        # [Work 23] Quality-proportional weights: with ratio = active rate /
        # passive rate, the active weight is ratio / (ratio + 1) and the
        # passive weight is the remainder. The ratio falls back to 1.0 (an
        # even split) when either rate is 0.
        _ratio_oq = (t5_active_rate / max(1.0, t5_passive_rate)) if (t5_active_rate and t5_passive_rate) else 1.0
        _act_w_oq = round(_ratio_oq / (_ratio_oq + 1.0), 4)
        _pas_w_oq = 1.0 - _act_w_oq
        t5_combined_rate = round(t5_active_rate * _act_w_oq + t5_passive_rate * _pas_w_oq, 2)
    elif t5_active_rate is not None:
        t5_combined_rate = t5_active_rate
    elif t5_passive_rate is not None:
        t5_combined_rate = t5_passive_rate
    else:
        t5_combined_rate = None
    # Final t5_operational_rate: the refined method wins over the legacy
    # match / evaluated ratio whenever it is available.
    t5_operational_rate_legacy = (
        round((t5_operational_match / t5_operational_eval) * 100.0, 2)
        if t5_operational_eval > 0
        else None
    )
    t5_operational_rate = t5_combined_rate if t5_combined_rate is not None else t5_operational_rate_legacy

    if t20_operational_eval >= min_eval_samples and t20_operational_rate is not None:
        # Priority 1: enough operational T+20 samples - use the measured rate.
        t20_effective_rate = float(t20_operational_rate)
        t20_source = "proposal_evaluation_history.operational_t20_only"
        t20_evaluated = t20_operational_eval
    elif t20_operational_eval == 0 and t5_operational_eval >= min_eval_samples and t5_operational_rate is not None:
        # Priority 2: no operational T+20 but enough operational T+5 - use
        # T+5 as a proxy instead of a misleading neutral fallback.
        t20_effective_rate = float(t5_operational_rate)
        t20_source = "t5_operational_proxy"
        t20_evaluated = t5_operational_eval
    else:
        # Priority 3: operational samples are insufficient.
        t20_effective_rate = t20_rate if t20_evaluated > 0 else neutral_score
        if t20_operational_eval == 0 and t5_operational_eval < min_eval_samples:
            # No operational evidence at all: report neutral and say why.
            t20_effective_rate = neutral_score
            t20_source = "neutral_due_to_insufficient_operational_samples"

    base_score = max(0.0, min(100.0, w_t20 * t20_effective_rate + w_tq * tq_score + w_rb * rebound_score + w_lb * bucket_score))
    sample_count = min(t20_evaluated, tq_scored_count) if tq_scored_count > 0 else t20_evaluated
    sample_confidence = max(0.0, min(1.0, (sample_count / float(max(1, min_eval_samples)))))
    # Only the `sample_conf_w` share of the base score is scaled by confidence.
    score = round(max(0.0, min(100.0, base_score * (1.0 - sample_conf_w) + base_score * sample_conf_w * sample_confidence)), 2)
    has_sufficient_eval = sample_count >= min_eval_samples
    gate = "PASS" if score >= pass_threshold else ("CAUTION_MODE" if score >= caution_threshold else "CRITICAL_MODE")
    if not has_sufficient_eval:
        gate = "INSUFFICIENT_EVAL"

    root_cause_flags = []
    if t20_evaluated == 0:
        root_cause_flags.append("ALL_T20_DATA_MISSING")
    if not has_sufficient_eval:
        root_cause_flags.append("INSUFFICIENT_EFFECTIVE_SAMPLE")
    eval_window = {}
    # Truncate to the date part so timestamped proposal_date values parse too,
    # matching how the macro-event filter reads the same field.
    hist_dates = sorted({
        str(r.get("proposal_date"))[:10]
        for r in records
        if isinstance(r, dict) and r.get("proposal_date")
    })
    t20_min_days_required = int(eval_policy.get("t20_min_days_required") or 28)
    if hist_dates:
        try:
            min_d = date.fromisoformat(hist_dates[0])
            max_d = date.fromisoformat(hist_dates[-1])
            elapsed_min = (max_d - min_d).days
            eval_window = {
                "history_min_date": min_d.isoformat(),
                "history_max_date": max_d.isoformat(),
                "elapsed_from_min_days": elapsed_min,
                "t20_min_days_required": t20_min_days_required,
                "t20_window_ready": elapsed_min >= t20_min_days_required,
            }
            if elapsed_min < t20_min_days_required:
                root_cause_flags.append("T20_WINDOW_NOT_REACHED")
        except ValueError:
            # A non-ISO date string: report no window rather than fail.
            eval_window = {}

    result = {
        "formula_id": "OUTCOME_QUALITY_SCORE_V1",
        "score": score,
        "gate": gate,
        "root_cause_flags": root_cause_flags,
        "evaluation_window": eval_window,
        "metrics": {
            "t20_pass_rate": t20_rate,
            "t20_effective_rate": t20_effective_rate,
            "t20_evaluated_count": t20_evaluated,
            "t20_source": t20_source,
            "t20_operational_pass_rate": t20_operational_rate,
            "t20_operational_evaluated_count": t20_operational_eval,
            "t5_operational_pass_rate": t5_operational_rate,
            "t5_operational_pass_rate_legacy": t5_operational_rate_legacy,
            "t5_active_rate": t5_active_rate,
            "t5_passive_rate": t5_passive_rate,
            "t5_combined_rate": t5_combined_rate,
            "t5_decisive_count": t5_decisive,
            "t5_operational_evaluated_count": t5_operational_eval,
            "macro_event_excluded_count": macro_excluded_count,
            "trade_quality_score": tq_score,
            "trade_quality_scored_count": tq_scored_count,
            "trade_quality_basis": trade_quality_basis,
            "rebound_efficiency_score": rebound_score,
            "late_rebound_bucket_score": bucket_score,
            "sample_count": sample_count,
            "sample_confidence": round(sample_confidence, 4),
            "base_score": round(base_score, 2),
            "has_sufficient_eval": has_sufficient_eval,
        },
        "policy_used": {
            "policy_path": str(policy_path),
            "weights": {
                "t20_pass_rate": w_t20,
                "trade_quality_score": w_tq,
                "rebound_efficiency_score": w_rb,
                "late_rebound_bucket_score": w_lb,
            },
            "missing_eval_neutral_score": neutral_score,
            "min_effective_eval_samples": min_eval_samples,
            "sample_confidence_weight": sample_conf_w,
            "pass_threshold": pass_threshold,
            "caution_threshold": caution_threshold,
        },
    }
    # Serialise once; the same text goes to the file and to stdout.
    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0
|
||
|
||
|
||
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|