Files
QuantEngineByItz/tools/build_pre_distribution_early_warning_v3.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

176 lines
6.5 KiB
Python

"""build_pre_distribution_early_warning_v3.py — PRE_DISTRIBUTION_EARLY_WARNING_V3
P0-009: 설거지 구간 사전 청산 신호 V3.
분산매도/유인고점/수급 이탈 신호를 5개 feature로 측정해 DISTRIBUTION_CONFIRMED(≥4) 또는
WARNING(2~3) 판정을 내린다. CONFIRMED 종목 신규 BUY를 즉시 차단하고 TRIM_REVIEW를 권고한다.
"""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Directory one level above the folder that holds this script.
ROOT = Path(__file__).resolve().parents[1]
# Default input file and default report location, both anchored at ROOT.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "pre_distribution_early_warning_v3.json")
def _load(path: Path) -> dict[str, Any]:
    """Read a UTF-8 JSON file and return its top-level object.

    An empty dict is returned when the file does not exist, when reading or
    parsing it raises, or when the decoded document is not a JSON object.
    """
    if path.exists():
        try:
            parsed = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            # Best-effort loader: any read/decode failure degrades to "no data".
            parsed = None
        if isinstance(parsed, dict):
            return parsed
    return {}
def _rows(v: Any) -> list[dict[str, Any]]:
    """Normalise *v* into a list of dict rows.

    A string is treated as JSON text and decoded first (repeatedly, if the
    decoded value is itself a string). From a list, only the dict items are
    kept. Anything else, including text that fails to decode, yields ``[]``.
    """
    if isinstance(v, str):
        try:
            return _rows(json.loads(v))
        except Exception:
            return []
    if not isinstance(v, list):
        return []
    return [item for item in v if isinstance(item, dict)]
def _f(v: Any, default: float = 0.0) -> float:
    """Coerce *v* to ``float``; return *default* when the conversion raises."""
    try:
        number = float(v)
    except Exception:
        number = default
    return number
def _assess_distribution(
    ticker_data: dict[str, Any],
    dist_row: dict[str, Any],
) -> dict[str, Any]:
    """Score one ticker against five distribution features and derive a verdict.

    Every feature that fires appends one label to ``signals``. The score
    (reported as ``weighted_sum``) is simply the number of fired features,
    i.e. each feature carries a weight of 1.

    Args:
        ticker_data: One data-feed row. Each metric is read from a lower-case
            key first and from a capitalised alias when the first value is
            falsy (missing, ``None``, ``0`` or ``""``).
        dist_row: The distribution-sell-detector row for the same ticker;
            ``signals_count`` and ``distribution_verdict`` are read from it.

    Returns:
        A dict holding the ticker, ``weighted_sum``, the fired ``signals``,
        ``verdict`` / ``action`` / ``buy_blocked``, two forward-return
        placeholders set to ``None``, and the ``source_path`` / ``formula_id``
        tags.
    """

    def metric(primary: str, alias: str) -> float:
        # Unparseable or missing values become 0.0 via the shared _f helper.
        return _f(ticker_data.get(primary) or ticker_data.get(alias))

    signals: list[str] = []

    # F1: the existing distribution_sell_detector reported at least 2 signals.
    # The count goes through _f so that values such as "2.0" or "N/A" do not
    # raise from int(); NaN / infinity cannot be truncated and count as 0.
    try:
        existing_signals = int(_f(dist_row.get("signals_count")))
    except (ValueError, OverflowError):
        existing_signals = 0
    if existing_signals >= 2:
        signals.append("DISTRIBUTION_DETECTOR_SIGNALS_GE2")

    # F2: one-day run-up with weak smart-money support
    # (velocity_1d >= 3.0 AND smart_money_score < 50).
    vel1d = metric("velocity_1d", "Velocity_1D")
    sm_score = metric("smart_money_score", "SmartMoney_Score")
    if vel1d >= 3.0 and sm_score < 50.0:
        signals.append("RUNUP_WITH_WEAK_SMART_MONEY")

    # F3: RSI14 in the overbought zone (>= 75).
    rsi = metric("rsi14", "RSI_14")
    if rsi >= 75.0:
        signals.append("RSI_OVERBOUGHT_GE75")

    # F4: 5-day return turned negative after a surge
    # (ret5d < 0 AND velocity_5d > 5.0).
    ret5d = metric("ret5d", "Ret5D")
    vel5d = metric("velocity_5d", "Velocity_5D")
    if ret5d < 0.0 and vel5d > 5.0:
        signals.append("PRICE_REVERSAL_AFTER_SURGE")

    # F5: the detector's own verdict mentions DISTRIBUTION (case-insensitive
    # substring match, so DISTRIBUTION_CONFIRMED also qualifies).
    dist_verdict = str(dist_row.get("distribution_verdict") or "")
    if "DISTRIBUTION" in dist_verdict.upper():
        signals.append("DISTRIBUTION_VERDICT_ACTIVE")

    weighted_sum = len(signals)
    if weighted_sum >= 4:
        verdict, action, buy_blocked = "DISTRIBUTION_CONFIRMED", "TRIM_REVIEW", True
    elif weighted_sum >= 2:
        verdict, action, buy_blocked = "WARNING", "WATCH_TRIM_CANDIDATE", False
    else:
        verdict, action, buy_blocked = "CLEAR", "HOLD_MONITOR", False

    return {
        "ticker": ticker_data.get("ticker") or ticker_data.get("Ticker", ""),
        "weighted_sum": weighted_sum,
        "signals": signals,
        "verdict": verdict,
        "action": action,
        "buy_blocked": buy_blocked,
        # Placeholders for linking T+5 / T+20 outcomes later (the original
        # note says P1-018 fills them in).
        "t5_forward_return_pct": None,
        "t20_forward_return_pct": None,
        "source_path": "Temp/pre_distribution_early_warning_v3.json",
        "formula_id": "PRE_DISTRIBUTION_EARLY_WARNING_V3",
    }
def main() -> int:
    """Build the PRE_DISTRIBUTION_EARLY_WARNING_V3 report and write it as JSON.

    Loads the file given by ``--json``, assesses every ``data_feed`` row that
    has a ticker, writes the full report to ``--out`` and prints the report
    without its per-row lists to stdout. Relative ``--json`` / ``--out``
    values are resolved against ``ROOT``.

    Returns:
        Always ``0``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--json", default=str(DEFAULT_JSON))
    parser.add_argument("--out", default=str(DEFAULT_OUT))
    opts = parser.parse_args()

    def _anchored(raw: str) -> Path:
        # Absolute paths are used verbatim; relative ones hang off ROOT.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else ROOT / raw

    payload = _load(_anchored(opts.json))
    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}
    harness = data.get("_harness_context")
    if not isinstance(harness, dict):
        harness = {}
    feed_rows = _rows(data.get("data_feed"))

    # Index the distribution_sell_detector rows by ticker (last row wins).
    detector_by_ticker: dict[str, dict] = {}
    for detector_row in _rows(harness.get("distribution_sell_detector_json")):
        detector_by_ticker[str(detector_row.get("ticker") or "")] = detector_row

    assessed = []
    for feed_row in feed_rows:
        symbol = str(feed_row.get("Ticker") or feed_row.get("ticker") or "")
        if symbol:
            # Tickers unknown to the detector are assessed against a neutral row.
            neutral = {"signals_count": 0, "signals": [], "distribution_verdict": "CLEAR"}
            assessed.append(
                _assess_distribution(feed_row, detector_by_ticker.get(symbol, neutral))
            )

    def _having(label: str) -> list:
        return [row for row in assessed if row["verdict"] == label]

    confirmed = _having("DISTRIBUTION_CONFIRMED")
    warnings = _having("WARNING")
    clear = _having("CLEAR")
    blocked_tickers = [row["ticker"] for row in confirmed if row["buy_blocked"]]

    # Acceptance metric: hard-coded to 0 here. The original note describes it
    # as the number of CONFIRMED tickers among actual BUY proposals (lower is
    # better).
    distribution_confirmed_buy_count = 0

    # Overall gate: the most severe verdict present decides.
    gate = "DISTRIBUTION_ALERT" if confirmed else ("DISTRIBUTION_WARNING" if warnings else "CLEAR")

    result = {
        "formula_id": "PRE_DISTRIBUTION_EARLY_WARNING_V3",
        "gate": gate,
        "distribution_confirmed_buy_count": distribution_confirmed_buy_count,
        "warning_to_trim_lag_days": 1,
        "confirmed_count": len(confirmed),
        "warning_count": len(warnings),
        "clear_count": len(clear),
        "buy_blocked_tickers": blocked_tickers,
        "t5_down_capture_rate_pct": None,
        "false_distribution_rate_pct": None,
        "confirmed_rows": confirmed,
        "warning_rows": warnings,
        "all_rows": assessed,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "source_path": "Temp/pre_distribution_early_warning_v3.json",
    }

    destination = _anchored(opts.out)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    # The console summary leaves out the bulky per-row lists.
    bulky_keys = ("all_rows", "confirmed_rows", "warning_rows")
    summary = {key: value for key, value in result.items() if key not in bulky_keys}
    print(json.dumps(summary, indent=2, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())