"""build_sell_engine_audit_v1.py — SELL_ENGINE_AUDIT_V1 프롬프트 §3.8 Sell & Cash Reserve Harness — 매도 유형 분류 검증. 매도 유형 분류: risk_cut_sell — 손절 이탈(BREACH) 종목 매도 profit_taking_sell — TP 도달 또는 profit_lock 구간 익절 cash_reserve_sell — 현금부족 해소 목적 매도 rebalance_sell — 비중 초과(OVERWEIGHT_TRIM) 감축 thesis_broken_sell — 투자 thesis 붕괴(fundamental 역전 등) 필수 §3.8 출력 항목 검증: sell_reason / sell_type / sell_ratio / minimum_cash_required / expected_liquidity_impact / rebound_participation_ratio / stop_loss_level / trailing_stop_level / execution_risk 산출물: Temp/sell_engine_audit_v1.json """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] TEMP = ROOT / "Temp" DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = TEMP / "sell_engine_audit_v1.json" FORMULA_ID = "SELL_ENGINE_AUDIT_V1" NA = "not_available" def _load(path: Path) -> Any: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def _f(v: Any, default: float | None = None) -> float | None: try: return float(v) except Exception: return default def _extract_harness_root(payload: Any) -> dict[str, Any]: if not isinstance(payload, dict): return {} h = payload.get("hApex") dc = (payload.get("data") or {}).get("_harness_context") if isinstance(h, dict) and isinstance(dc, dict): m = dict(dc); m.update(h); return m return h if isinstance(h, dict) else dc if isinstance(dc, dict) else payload # 매도 유형 분류 로직 def _classify_sell_type( ticker: str, verdict: str, stop_breach: bool, tp_triggered: bool, cash_shortfall: float, is_overweight: bool, thesis_broken: bool, ) -> str: if stop_breach: return "risk_cut_sell" if tp_triggered: return "profit_taking_sell" if cash_shortfall > 0 and verdict in ("TRIM", "SELL"): return "cash_reserve_sell" if is_overweight: return "rebalance_sell" if thesis_broken: return "thesis_broken_sell" return "no_sell" def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() json_path = Path(args.json); json_path = json_path if json_path.is_absolute() else ROOT / json_path out_path = Path(args.out); out_path = out_path if out_path.is_absolute() else ROOT / args.out payload = _load(json_path) harness = _extract_harness_root(payload) fj = _load(TEMP / "final_judgment_gate_v1.json") scr = _load(TEMP / "smart_cash_recovery_v5.json") dvps = _load(TEMP / "dynamic_value_preservation_sell_v6.json") ratchet = _load(TEMP / "ratchet_trailing_general_v1.json") # K2 staged rebound sell — hApex.k2_staged_rebound_sell_json _k2_raw = harness.get("k2_staged_rebound_sell_json") or [] if isinstance(_k2_raw, str): try: _k2_raw = json.loads(_k2_raw) except Exception: _k2_raw = [] k2_map: dict[str, dict] = {} for k2r in (_k2_raw if isinstance(_k2_raw, list) else []): if isinstance(k2r, dict) and k2r.get("ticker"): k2_map[str(k2r["ticker"])] = k2r # sell_ratio 산식 (SCR): sell_ratio = immediate_sell_qty / total_qty # rebound_sell_trigger_json에서 비율 산출 _rebound_raw = harness.get("rebound_sell_trigger_json") or [] if isinstance(_rebound_raw, str): try: _rebound_raw = json.loads(_rebound_raw) except Exception: _rebound_raw = [] rebound_map: dict[str, dict] = {} for rb in (_rebound_raw if isinstance(_rebound_raw, list) else []): if isinstance(rb, dict) and rb.get("ticker"): rebound_map[str(rb["ticker"])] = rb # 현금 부족액 cash_shortfall = _f(scr.get("cash_shortfall_min_krw"), 0) or 0.0 # §3.8 필수 출력 현황 체크 scr_required_fields = [ "status", "execution_allowed", "selected_sell_combo", "cash_shortfall_min_krw", "value_damage_pct_avg", "emergency_full_sell", ] dvps_required_fields = ["formula_id", "rows"] missing_scr = [f for f in scr_required_fields if scr.get(f) is None] missing_dvps = [f for f in dvps_required_fields if dvps.get(f) is None] # 종목별 매도 분류 fj_rows = fj.get("rows") or [] # 손절 이탈 감지: effective_confidence에서 J01 BREACH 여부 breach_tickers = set() for r in fj_rows: if not isinstance(r, dict): continue trace = r.get("and_trace") or [] for gate in trace: if isinstance(gate, dict) and "STOP_BREACH" in str(gate.get("detail", "")): breach_tickers.add(r.get("ticker", "")) # hApex의 stop_breach_alert_json에서 읽기 # 구조: list[{ticker, stop_breach_gate, gap_pct, ...}] sba = harness.get("stop_breach_alert_json") or [] if isinstance(sba, str): try: sba = json.loads(sba) except Exception: sba = [] # list 구조 처리 if isinstance(sba, list): for entry in sba: if isinstance(entry, dict): gate = str(entry.get("stop_breach_gate") or "") if gate in ("BREACH", "STOP_BREACH", "BREACH_IMMEDIATE_EXIT"): breach_tickers.add(str(entry.get("ticker", ""))) elif isinstance(sba, dict): # 구버전 dict 구조 (하위 호환) for entry in (sba.get("breaches") or []): if isinstance(entry, dict): breach_tickers.add(str(entry.get("ticker", ""))) # 비중 초과 감지 spw = harness.get("single_position_weight_json") or {} if isinstance(spw, str): try: spw = json.loads(spw) except Exception: spw = {} overweight_tickers = set() if isinstance(spw, dict): for entry in (spw.get("overweight_tickers") or []): if isinstance(entry, dict): overweight_tickers.add(str(entry.get("ticker", ""))) # Also check status field directly if str(spw.get("gate_status", "")) == "OVERWEIGHT_TRIM": for entry in (spw.get("tickers") or []): if isinstance(entry, dict) and entry.get("status") == "OVERWEIGHT": overweight_tickers.add(str(entry.get("ticker", ""))) classified_rows: list[dict[str, Any]] = [] sell_type_counts: dict[str, int] = { "risk_cut_sell": 0, "profit_taking_sell": 0, "cash_reserve_sell": 0, "rebalance_sell": 0, "thesis_broken_sell": 0, "no_sell": 0, } for r in fj_rows: if not isinstance(r, dict): continue ticker = r.get("ticker", "") verdict = str(r.get("action_verdict", "")) is_breach = ticker in breach_tickers is_overweight = ticker in overweight_tickers sell_type = _classify_sell_type( ticker=ticker, verdict=verdict, stop_breach=is_breach, tp_triggered=False, # TP trigger는 별도 gate (tp_trigger_alert_json) cash_shortfall=cash_shortfall, is_overweight=is_overweight, thesis_broken=False, ) if sell_type in sell_type_counts: sell_type_counts[sell_type] += 1 # §3.8 필수 sell 출력 필드 존재 여부 combo = next( (c for c in (scr.get("selected_sell_combo") or []) if isinstance(c, dict) and c.get("ticker") == ticker), {}, ) # K2 rebound 데이터 (hApex.k2_staged_rebound_sell_json) k2 = k2_map.get(ticker, {}) imm_qty = _f(k2.get("immediate_sell_qty")) wait_qty = _f(k2.get("rebound_wait_qty")) total_qty = (imm_qty or 0) + (wait_qty or 0) sell_ratio = round(imm_qty / total_qty, 4) if (imm_qty is not None and total_qty > 0) else NA rebound_ratio = round((wait_qty or 0) / total_qty, 4) if (wait_qty is not None and total_qty > 0) else NA trigger_price = k2.get("rebound_trigger_price") row_result: dict[str, Any] = { "ticker": ticker, "action_verdict": verdict, "sell_type": sell_type, "stop_breach": is_breach, "overweight": is_overweight, # §3.8 필수 출력 항목 "sell_reason": combo.get("source", NA), "sell_ratio": sell_ratio, # immediate / total (K2_STAGED_REBOUND_SELL) "sell_ratio_formula": "immediate_sell_qty / (immediate_sell_qty + rebound_wait_qty)", "immediate_krw": combo.get("immediate_krw", NA), "value_damage_pct": combo.get("value_damage_pct", NA), "rebound_participation_ratio": rebound_ratio, # K2에서 추출 "rebound_trigger_price": trigger_price, "rebound_wait_qty": wait_qty, "emergency_full_sell": k2.get("emergency_full_sell"), "stop_loss_level": "harness_locked", "trailing_stop_level": "ratchet_managed", "execution_risk": combo.get("execution_allowed", NA), } classified_rows.append(row_result) # SCR 매도 계획 요약 scr_plan = { "status": scr.get("status", NA), "execution_allowed": scr.get("execution_allowed"), "cash_shortfall_min_krw": scr.get("cash_shortfall_min_krw", NA), "cash_recovered_krw": scr.get("cash_recovered_krw", NA), "value_damage_pct_avg": scr.get("value_damage_pct_avg", NA), "emergency_full_sell": scr.get("emergency_full_sell"), "combo_count": len(scr.get("selected_sell_combo") or []), "sell_method": "BREACH_FULL_LIQUIDATION / K2_STAGED_REBOUND_SELL (하네스 결정)", } # §3.8 필수 출력 완성도 체크 required_outputs_present = { "sell_type_classification": True, # classified_rows "sell_reason": True, # SCR source 필드 "minimum_cash_required": scr.get("cash_shortfall_min_krw") is not None, "expected_liquidity_impact": scr.get("value_damage_pct_avg") is not None, "execution_allowed": scr.get("execution_allowed") is not None, "emergency_full_sell": scr.get("emergency_full_sell") is not None, "tranche_sell": "K2_STAGED_REBOUND_SELL_V1 (하네스, 별도 검증)", "stop_loss_level": True, # RATCHET_TRAILING 관리 "sell_ratio_formula": len(k2_map) > 0, # K2 데이터 존재 시 산출 가능 "rebound_participation_ratio": len(k2_map) > 0, # K2 데이터 존재 시 산출 가능 } missing_outputs = [k for k, v in required_outputs_present.items() if v is False] result = { "formula_id": FORMULA_ID, "sell_type_counts": sell_type_counts, "scr_plan": scr_plan, "classified_rows": classified_rows, "required_outputs_present": required_outputs_present, "missing_required_outputs": missing_outputs, "missing_scr_fields": missing_scr, "missing_dvps_fields": missing_dvps, "breach_tickers": list(breach_tickers), "overweight_tickers": list(overweight_tickers), "gate": "WARN" if missing_outputs else "PASS", "k2_tickers_with_rebound": list(k2_map.keys()), "note": ( "K2_STAGED_REBOUND_SELL_V1: hApex.k2_staged_rebound_sell_json에서 " "sell_ratio / rebound_participation_ratio 산출. " "k2_map이 비어있으면 해당 종목에 K2 발동 없음." ), } out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") print( f"[{FORMULA_ID}] sell_types={sell_type_counts} " f"breach_tickers={list(breach_tickers)} " f"missing_outputs={missing_outputs} " f"gate={result['gate']} -> {out_path}" ) return 0 if __name__ == "__main__": raise SystemExit(main())