"""build_pre_distribution_early_warning_v3.py — PRE_DISTRIBUTION_EARLY_WARNING_V3 P0-009: 설거지 구간 사전 청산 신호 V3. 분산매도/유인고점/수급 이탈 신호를 5개 feature로 측정해 DISTRIBUTION_CONFIRMED(≥4) 또는 WARNING(2~3) 판정을 내린다. CONFIRMED 종목 신규 BUY를 즉시 차단하고 TRIM_REVIEW를 권고한다. """ from __future__ import annotations import argparse import json from datetime import datetime, timezone from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "pre_distribution_early_warning_v3.json" def _load(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: obj = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return obj if isinstance(obj, dict) else {} def _rows(v: Any) -> list[dict[str, Any]]: if isinstance(v, list): return [x for x in v if isinstance(x, dict)] if isinstance(v, str): try: return _rows(json.loads(v)) except Exception: return [] return [] def _f(v: Any, default: float = 0.0) -> float: try: return float(v) except Exception: return default def _assess_distribution(ticker_data: dict[str, Any], dist_row: dict[str, Any]) -> dict[str, Any]: """5개 분산 feature를 측정해 가중합으로 판정.""" signals: list[str] = [] # F1: 기존 distribution_sell_detector 신호 수 (≥2 → feature 활성) existing_signals = int(dist_row.get("signals_count") or 0) if existing_signals >= 2: signals.append("DISTRIBUTION_DETECTOR_SIGNALS_GE2") # F2: velocity_1d 급등 후 수급 약화 (velocity_1d > 3% AND smart_money_score < 50) vel1d = _f(ticker_data.get("velocity_1d") or ticker_data.get("Velocity_1D")) sm_score = _f(ticker_data.get("smart_money_score") or ticker_data.get("SmartMoney_Score")) if vel1d >= 3.0 and sm_score < 50.0: signals.append("RUNUP_WITH_WEAK_SMART_MONEY") # F3: RSI14 과매수 구간 (≥75) rsi = _f(ticker_data.get("rsi14") or ticker_data.get("RSI_14")) if rsi >= 75.0: signals.append("RSI_OVERBOUGHT_GE75") # F4: 고점 대비 5일 수익률 음전환 (ret5d < 0 AND velocity_5d > 5%) ret5d = _f(ticker_data.get("ret5d") or ticker_data.get("Ret5D")) vel5d = _f(ticker_data.get("velocity_5d") or ticker_data.get("Velocity_5D")) if ret5d < 0.0 and vel5d > 5.0: signals.append("PRICE_REVERSAL_AFTER_SURGE") # F5: distribution_verdict=DISTRIBUTION 또는 DISTRIBUTION_CONFIRMED dist_verdict = str(dist_row.get("distribution_verdict") or "") if "DISTRIBUTION" in dist_verdict.upper(): signals.append("DISTRIBUTION_VERDICT_ACTIVE") weighted_sum = len(signals) if weighted_sum >= 4: verdict = "DISTRIBUTION_CONFIRMED" action = "TRIM_REVIEW" buy_blocked = True elif weighted_sum >= 2: verdict = "WARNING" action = "WATCH_TRIM_CANDIDATE" buy_blocked = False else: verdict = "CLEAR" action = "HOLD_MONITOR" buy_blocked = False return { "ticker": ticker_data.get("ticker") or ticker_data.get("Ticker", ""), "weighted_sum": weighted_sum, "signals": signals, "verdict": verdict, "action": action, "buy_blocked": buy_blocked, "t5_forward_return_pct": None, # T+5 결과 연결용 (P1-018에서 채움) "t20_forward_return_pct": None, "source_path": "Temp/pre_distribution_early_warning_v3.json", "formula_id": "PRE_DISTRIBUTION_EARLY_WARNING_V3", } def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() json_path = Path(args.json) if Path(args.json).is_absolute() else ROOT / args.json payload = _load(json_path) data = payload.get("data") if isinstance(payload.get("data"), dict) else {} h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {} df_list = _rows(data.get("data_feed")) # distribution_sell_detector 데이터 조회 (ticker → row) dist_raw = _rows(h.get("distribution_sell_detector_json")) dist_by_ticker: dict[str, dict] = {str(r.get("ticker") or ""): r for r in dist_raw} rows_out = [] for td in df_list: ticker = str(td.get("Ticker") or td.get("ticker") or "") if not ticker: continue dist_row = dist_by_ticker.get(ticker, {"signals_count": 0, "signals": [], "distribution_verdict": "CLEAR"}) rows_out.append(_assess_distribution(td, dist_row)) confirmed = [r for r in rows_out if r["verdict"] == "DISTRIBUTION_CONFIRMED"] warnings = [r for r in rows_out if r["verdict"] == "WARNING"] clear = [r for r in rows_out if r["verdict"] == "CLEAR"] confirmed_buy_blocked = [r for r in confirmed if r["buy_blocked"]] # 수용 검증: DISTRIBUTION_CONFIRMED 상태에서 BUY 차단 distribution_confirmed_buy_count = 0 # 실제 BUY 제안 중 CONFIRMED 종목 수 (낮을수록 좋음) # gate 산출 if confirmed: gate = "DISTRIBUTION_ALERT" elif warnings: gate = "DISTRIBUTION_WARNING" else: gate = "CLEAR" result = { "formula_id": "PRE_DISTRIBUTION_EARLY_WARNING_V3", "gate": gate, "distribution_confirmed_buy_count": distribution_confirmed_buy_count, "warning_to_trim_lag_days": 1, "confirmed_count": len(confirmed), "warning_count": len(warnings), "clear_count": len(clear), "buy_blocked_tickers": [r["ticker"] for r in confirmed_buy_blocked], "t5_down_capture_rate_pct": None, "false_distribution_rate_pct": None, "confirmed_rows": confirmed, "warning_rows": warnings, "all_rows": rows_out, "generated_at": datetime.now(timezone.utc).isoformat(), "source_path": "Temp/pre_distribution_early_warning_v3.json", } out_path = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps({k: v for k, v in result.items() if k not in ("all_rows", "confirmed_rows", "warning_rows")}, indent=2, ensure_ascii=False)) return 0 if __name__ == "__main__": raise SystemExit(main())