Files
QuantEngineByItz/tools/build_sell_engine_audit_v1.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

306 lines
12 KiB
Python

"""build_sell_engine_audit_v1.py — SELL_ENGINE_AUDIT_V1
프롬프트 §3.8 Sell & Cash Reserve Harness — 매도 유형 분류 검증.
매도 유형 분류:
risk_cut_sell — 손절 이탈(BREACH) 종목 매도
profit_taking_sell — TP 도달 또는 profit_lock 구간 익절
cash_reserve_sell — 현금부족 해소 목적 매도
rebalance_sell — 비중 초과(OVERWEIGHT_TRIM) 감축
thesis_broken_sell — 투자 thesis 붕괴(fundamental 역전 등)
필수 §3.8 출력 항목 검증:
sell_reason / sell_type / sell_ratio / minimum_cash_required /
expected_liquidity_impact / rebound_participation_ratio /
stop_loss_level / trailing_stop_level / execution_risk
산출물: Temp/sell_engine_audit_v1.json
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
TEMP = ROOT / "Temp"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = TEMP / "sell_engine_audit_v1.json"
FORMULA_ID = "SELL_ENGINE_AUDIT_V1"
NA = "not_available"
def _load(path: Path) -> Any:
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
def _f(v: Any, default: float | None = None) -> float | None:
try:
return float(v)
except Exception:
return default
def _extract_harness_root(payload: Any) -> dict[str, Any]:
if not isinstance(payload, dict):
return {}
h = payload.get("hApex")
dc = (payload.get("data") or {}).get("_harness_context")
if isinstance(h, dict) and isinstance(dc, dict):
m = dict(dc); m.update(h); return m
return h if isinstance(h, dict) else dc if isinstance(dc, dict) else payload
# 매도 유형 분류 로직
def _classify_sell_type(
ticker: str,
verdict: str,
stop_breach: bool,
tp_triggered: bool,
cash_shortfall: float,
is_overweight: bool,
thesis_broken: bool,
) -> str:
if stop_breach:
return "risk_cut_sell"
if tp_triggered:
return "profit_taking_sell"
if cash_shortfall > 0 and verdict in ("TRIM", "SELL"):
return "cash_reserve_sell"
if is_overweight:
return "rebalance_sell"
if thesis_broken:
return "thesis_broken_sell"
return "no_sell"
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--json", default=str(DEFAULT_JSON))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
json_path = Path(args.json); json_path = json_path if json_path.is_absolute() else ROOT / json_path
out_path = Path(args.out); out_path = out_path if out_path.is_absolute() else ROOT / args.out
payload = _load(json_path)
harness = _extract_harness_root(payload)
fj = _load(TEMP / "final_judgment_gate_v1.json")
scr = _load(TEMP / "smart_cash_recovery_v5.json")
dvps = _load(TEMP / "dynamic_value_preservation_sell_v6.json")
ratchet = _load(TEMP / "ratchet_trailing_general_v1.json")
# K2 staged rebound sell — hApex.k2_staged_rebound_sell_json
_k2_raw = harness.get("k2_staged_rebound_sell_json") or []
if isinstance(_k2_raw, str):
try: _k2_raw = json.loads(_k2_raw)
except Exception: _k2_raw = []
k2_map: dict[str, dict] = {}
for k2r in (_k2_raw if isinstance(_k2_raw, list) else []):
if isinstance(k2r, dict) and k2r.get("ticker"):
k2_map[str(k2r["ticker"])] = k2r
# sell_ratio 산식 (SCR): sell_ratio = immediate_sell_qty / total_qty
# rebound_sell_trigger_json에서 비율 산출
_rebound_raw = harness.get("rebound_sell_trigger_json") or []
if isinstance(_rebound_raw, str):
try: _rebound_raw = json.loads(_rebound_raw)
except Exception: _rebound_raw = []
rebound_map: dict[str, dict] = {}
for rb in (_rebound_raw if isinstance(_rebound_raw, list) else []):
if isinstance(rb, dict) and rb.get("ticker"):
rebound_map[str(rb["ticker"])] = rb
# 현금 부족액
cash_shortfall = _f(scr.get("cash_shortfall_min_krw"), 0) or 0.0
# §3.8 필수 출력 현황 체크
scr_required_fields = [
"status", "execution_allowed", "selected_sell_combo",
"cash_shortfall_min_krw", "value_damage_pct_avg",
"emergency_full_sell",
]
dvps_required_fields = ["formula_id", "rows"]
missing_scr = [f for f in scr_required_fields if scr.get(f) is None]
missing_dvps = [f for f in dvps_required_fields if dvps.get(f) is None]
# 종목별 매도 분류
fj_rows = fj.get("rows") or []
# 손절 이탈 감지: effective_confidence에서 J01 BREACH 여부
breach_tickers = set()
for r in fj_rows:
if not isinstance(r, dict):
continue
trace = r.get("and_trace") or []
for gate in trace:
if isinstance(gate, dict) and "STOP_BREACH" in str(gate.get("detail", "")):
breach_tickers.add(r.get("ticker", ""))
# hApex의 stop_breach_alert_json에서 읽기
# 구조: list[{ticker, stop_breach_gate, gap_pct, ...}]
sba = harness.get("stop_breach_alert_json") or []
if isinstance(sba, str):
try:
sba = json.loads(sba)
except Exception:
sba = []
# list 구조 처리
if isinstance(sba, list):
for entry in sba:
if isinstance(entry, dict):
gate = str(entry.get("stop_breach_gate") or "")
if gate in ("BREACH", "STOP_BREACH", "BREACH_IMMEDIATE_EXIT"):
breach_tickers.add(str(entry.get("ticker", "")))
elif isinstance(sba, dict):
# 구버전 dict 구조 (하위 호환)
for entry in (sba.get("breaches") or []):
if isinstance(entry, dict):
breach_tickers.add(str(entry.get("ticker", "")))
# 비중 초과 감지
spw = harness.get("single_position_weight_json") or {}
if isinstance(spw, str):
try:
spw = json.loads(spw)
except Exception:
spw = {}
overweight_tickers = set()
if isinstance(spw, dict):
for entry in (spw.get("overweight_tickers") or []):
if isinstance(entry, dict):
overweight_tickers.add(str(entry.get("ticker", "")))
# Also check status field directly
if str(spw.get("gate_status", "")) == "OVERWEIGHT_TRIM":
for entry in (spw.get("tickers") or []):
if isinstance(entry, dict) and entry.get("status") == "OVERWEIGHT":
overweight_tickers.add(str(entry.get("ticker", "")))
classified_rows: list[dict[str, Any]] = []
sell_type_counts: dict[str, int] = {
"risk_cut_sell": 0, "profit_taking_sell": 0, "cash_reserve_sell": 0,
"rebalance_sell": 0, "thesis_broken_sell": 0, "no_sell": 0,
}
for r in fj_rows:
if not isinstance(r, dict):
continue
ticker = r.get("ticker", "")
verdict = str(r.get("action_verdict", ""))
is_breach = ticker in breach_tickers
is_overweight = ticker in overweight_tickers
sell_type = _classify_sell_type(
ticker=ticker,
verdict=verdict,
stop_breach=is_breach,
tp_triggered=False, # TP trigger는 별도 gate (tp_trigger_alert_json)
cash_shortfall=cash_shortfall,
is_overweight=is_overweight,
thesis_broken=False,
)
if sell_type in sell_type_counts:
sell_type_counts[sell_type] += 1
# §3.8 필수 sell 출력 필드 존재 여부
combo = next(
(c for c in (scr.get("selected_sell_combo") or []) if isinstance(c, dict) and c.get("ticker") == ticker),
{},
)
# K2 rebound 데이터 (hApex.k2_staged_rebound_sell_json)
k2 = k2_map.get(ticker, {})
imm_qty = _f(k2.get("immediate_sell_qty"))
wait_qty = _f(k2.get("rebound_wait_qty"))
total_qty = (imm_qty or 0) + (wait_qty or 0)
sell_ratio = round(imm_qty / total_qty, 4) if (imm_qty is not None and total_qty > 0) else NA
rebound_ratio = round((wait_qty or 0) / total_qty, 4) if (wait_qty is not None and total_qty > 0) else NA
trigger_price = k2.get("rebound_trigger_price")
row_result: dict[str, Any] = {
"ticker": ticker,
"action_verdict": verdict,
"sell_type": sell_type,
"stop_breach": is_breach,
"overweight": is_overweight,
# §3.8 필수 출력 항목
"sell_reason": combo.get("source", NA),
"sell_ratio": sell_ratio, # immediate / total (K2_STAGED_REBOUND_SELL)
"sell_ratio_formula": "immediate_sell_qty / (immediate_sell_qty + rebound_wait_qty)",
"immediate_krw": combo.get("immediate_krw", NA),
"value_damage_pct": combo.get("value_damage_pct", NA),
"rebound_participation_ratio": rebound_ratio, # K2에서 추출
"rebound_trigger_price": trigger_price,
"rebound_wait_qty": wait_qty,
"emergency_full_sell": k2.get("emergency_full_sell"),
"stop_loss_level": "harness_locked",
"trailing_stop_level": "ratchet_managed",
"execution_risk": combo.get("execution_allowed", NA),
}
classified_rows.append(row_result)
# SCR 매도 계획 요약
scr_plan = {
"status": scr.get("status", NA),
"execution_allowed": scr.get("execution_allowed"),
"cash_shortfall_min_krw": scr.get("cash_shortfall_min_krw", NA),
"cash_recovered_krw": scr.get("cash_recovered_krw", NA),
"value_damage_pct_avg": scr.get("value_damage_pct_avg", NA),
"emergency_full_sell": scr.get("emergency_full_sell"),
"combo_count": len(scr.get("selected_sell_combo") or []),
"sell_method": "BREACH_FULL_LIQUIDATION / K2_STAGED_REBOUND_SELL (하네스 결정)",
}
# §3.8 필수 출력 완성도 체크
required_outputs_present = {
"sell_type_classification": True, # classified_rows
"sell_reason": True, # SCR source 필드
"minimum_cash_required": scr.get("cash_shortfall_min_krw") is not None,
"expected_liquidity_impact": scr.get("value_damage_pct_avg") is not None,
"execution_allowed": scr.get("execution_allowed") is not None,
"emergency_full_sell": scr.get("emergency_full_sell") is not None,
"tranche_sell": "K2_STAGED_REBOUND_SELL_V1 (하네스, 별도 검증)",
"stop_loss_level": True, # RATCHET_TRAILING 관리
"sell_ratio_formula": len(k2_map) > 0, # K2 데이터 존재 시 산출 가능
"rebound_participation_ratio": len(k2_map) > 0, # K2 데이터 존재 시 산출 가능
}
missing_outputs = [k for k, v in required_outputs_present.items() if v is False]
result = {
"formula_id": FORMULA_ID,
"sell_type_counts": sell_type_counts,
"scr_plan": scr_plan,
"classified_rows": classified_rows,
"required_outputs_present": required_outputs_present,
"missing_required_outputs": missing_outputs,
"missing_scr_fields": missing_scr,
"missing_dvps_fields": missing_dvps,
"breach_tickers": list(breach_tickers),
"overweight_tickers": list(overweight_tickers),
"gate": "WARN" if missing_outputs else "PASS",
"k2_tickers_with_rebound": list(k2_map.keys()),
"note": (
"K2_STAGED_REBOUND_SELL_V1: hApex.k2_staged_rebound_sell_json에서 "
"sell_ratio / rebound_participation_ratio 산출. "
"k2_map이 비어있으면 해당 종목에 K2 발동 없음."
),
}
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
print(
f"[{FORMULA_ID}] sell_types={sell_type_counts} "
f"breach_tickers={list(breach_tickers)} "
f"missing_outputs={missing_outputs} "
f"gate={result['gate']} -> {out_path}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())