Files
QuantEngineByItz/tools/build_smart_cash_recovery_v4.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

259 lines
9.8 KiB
Python

from __future__ import annotations

import argparse
import json
import math
from itertools import combinations
from pathlib import Path
from typing import Any
# Directory two levels above this file (the parent of the script's own directory).
ROOT = Path(__file__).resolve().parents[1]
# Default values for the --json, --rebound and --out command-line options.
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_REBOUND = ROOT / "Temp" / "rebound_sell_efficiency_v1.json"
DEFAULT_OUT = ROOT / "Temp" / "smart_cash_recovery_v4.json"
def _load(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return it if the top-level value is an object.

    Best-effort by design: a missing file, a read or decode failure, or a
    top-level value that is not a JSON object all produce an empty dict.
    """
    if path.exists():
        try:
            parsed = json.loads(path.read_text(encoding="utf-8"))
        except Exception:
            parsed = None
        if isinstance(parsed, dict):
            return parsed
    return {}
def _obj(v: Any) -> dict[str, Any]:
    """Coerce *v* to a dict.

    A dict is returned as-is, a string is decoded as JSON and kept only when it
    decodes to an object, and every other input (or decode failure) gives {}.
    """
    candidate = v
    if isinstance(candidate, str):
        try:
            candidate = json.loads(candidate)
        except Exception:
            return {}
    return candidate if isinstance(candidate, dict) else {}
def _rows(v: Any) -> list[dict[str, Any]]:
    """Coerce *v* to a list of dict rows.

    A list is filtered down to its dict elements. A string is decoded as JSON
    and the decoded value is processed by the same rules (so a JSON string that
    itself wraps a JSON list is unwrapped). Anything else, or a decode failure,
    gives an empty list.
    """
    if isinstance(v, str):
        try:
            return _rows(json.loads(v))
        except Exception:
            return []
    if not isinstance(v, list):
        return []
    return [item for item in v if isinstance(item, dict)]
def _f(v: Any) -> float:
    """Coerce *v* to a finite float, falling back to 0.0.

    Returns 0.0 when *v* cannot be converted (None, containers, non-numeric
    text, integers too large for a float) and also when the conversion yields
    NaN or +/-infinity. Non-finite values are rejected because ``float()``
    accepts the strings "nan"/"inf" and ``json.loads`` accepts NaN/Infinity
    literals; such a value would make every later threshold comparison false
    and makes ``round()`` raise.
    """
    try:
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    return number if math.isfinite(number) else 0.0
def _cash_krw(row: dict[str, Any]) -> float:
    """Return the cash amount of a sell row.

    Uses ``expected_immediate_krw`` when it is positive, otherwise falls back
    to ``immediate_krw`` (both coerced with ``_f``).
    """
    expected = _f(row.get("expected_immediate_krw"))
    return expected if expected > 0 else _f(row.get("immediate_krw"))
def _weighted_damage(rows: list[dict[str, Any]]) -> float:
    """Return the cash-weighted mean of ``value_damage_pct`` over *rows*.

    Each row is weighted by ``_cash_krw(row)``. The result is rounded to two
    decimals; it is 0.0 when the total weight is not positive.
    """
    weights = [_cash_krw(r) for r in rows]
    total = sum(weights)
    if total <= 0:
        return 0.0
    weighted = sum(w * _f(r.get("value_damage_pct")) for w, r in zip(weights, rows))
    return round(weighted / total, 2)
def _best_combo(
    rows: list[dict[str, Any]],
    shortfall_min: float,
    max_rows: int = 14,
) -> list[dict[str, Any]]:
    """Pick the subset of *rows* that covers *shortfall_min* with the least damage.

    The search is exhaustive over subsets, smallest size first, and stops at
    the first size that has any feasible subset (cash >= *shortfall_min*).
    Within that size the winner minimises, in order: cash-weighted damage,
    oversell (cash above the shortfall), then size.

    Args:
        rows: Candidate sell rows.
        shortfall_min: Cash that must be recovered.
        max_rows: Upper bound on rows searched. The number of subsets grows
            exponentially with this value, so keep it small.

    Returns:
        The winning subset; ``[]`` when *rows* is empty or *shortfall_min* is
        not positive; the (possibly truncated) candidate list when no subset
        is feasible.
    """
    if not rows or shortfall_min <= 0:
        return []
    if len(rows) > max_rows:
        # Keep the search bounded and deterministic: prefer low damage, then
        # larger cash. Cash is ranked with _cash_krw so rows that only carry
        # `immediate_krw` are ordered by their real amount, matching how cash
        # is measured everywhere else in this function.
        rows = sorted(
            rows,
            key=lambda r: (_f(r.get("value_damage_pct")), -_cash_krw(r)),
        )[:max_rows]
    count = len(rows)
    cash = [_cash_krw(r) for r in rows]
    for k in range(1, count + 1):
        best: list[dict[str, Any]] = []
        best_score = None
        for idx in combinations(range(count), k):
            recovered = sum(cash[i] for i in idx)
            if recovered < shortfall_min:
                continue
            cand = [rows[i] for i in idx]
            score = (_weighted_damage(cand), recovered - shortfall_min, k)
            if best_score is None or score < best_score:
                best_score, best = score, cand
        if best:
            # A feasible set exists at this size (fewest legs); do not look at
            # larger subsets.
            return best
    return rows
# JSON file that _breach_tickers() loads to find rows flagged as stop breaches.
DEFAULT_FJ = ROOT / "Temp" / "final_judgment_gate_v1.json"
def _breach_tickers(payload: dict[str, Any]) -> dict[str, float]:
    """Map each stop-breached (BREACH) ticker to its full market value.

    A BREACH position has already fallen through its stop price, so it should
    be replaced by a full market-value liquidation rather than a partial sell.

    Breach rows are read from the ``rows`` of the file at ``DEFAULT_FJ``: a row
    qualifies when ``sell_signals.stop_breach_gate == "BREACH"``. The market
    value comes from the matching ``payload["data"]["data_feed"]`` row
    (matched on ``Ticker``): ``Account_Market_Value`` when positive, otherwise
    ``Account_Holding_Qty * Close`` when both are positive.

    Args:
        payload: The loaded trading-data document.

    Returns:
        ``{ticker: market_value}`` for breach tickers with a positive value.
        Empty when the gate file is missing/empty or nothing qualifies.
    """
    fj = _load(DEFAULT_FJ)
    if not fj:
        return {}
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    data_feed = data.get("data_feed") if isinstance(data.get("data_feed"), list) else []
    feed_map: dict[str, dict[str, Any]] = {}
    for feed_row in data_feed:
        if not isinstance(feed_row, dict):
            continue
        key = str(feed_row.get("Ticker") or "")
        if key:
            # Rows without a ticker are not indexed, so a ticker-less breach
            # row can never be paired with a ticker-less feed row.
            feed_map[key] = feed_row
    result: dict[str, float] = {}
    for row in _rows(fj.get("rows")):
        # _obj tolerates a missing or malformed `sell_signals` value instead of
        # raising on `.get`.
        if _obj(row.get("sell_signals")).get("stop_breach_gate") != "BREACH":
            continue
        ticker = str(row.get("ticker") or "")
        if not ticker:
            continue
        feed = feed_map.get(ticker, {})
        market_val = _f(feed.get("Account_Market_Value"))
        if market_val <= 0.0:
            qty = _f(feed.get("Account_Holding_Qty"))
            close = _f(feed.get("Close"))
            if qty > 0 and close > 0:
                market_val = qty * close
        if market_val > 0.0:
            result[ticker] = market_val
    return result
# Maximum cash-weighted value damage (%) at which execution is still allowed.
_VALUE_DAMAGE_BLOCK_THRESHOLD = 10.0


def _resolve_path(raw: str) -> Path:
    """Return *raw* as a Path, anchoring relative paths at ROOT."""
    path = Path(raw)
    return path if path.is_absolute() else ROOT / path


def _full_liquidation_fields(market_val: float) -> dict[str, Any]:
    """Return the fields that mark a row as a full liquidation of a breach position."""
    return {
        "immediate_krw": market_val,
        "expected_immediate_krw": market_val,
        "value_damage_pct": 0.0,
        "source": "BREACH_FULL_LIQUIDATION",
    }


def _breach_candidates(
    combo: list[dict[str, Any]],
    breach_map: dict[str, float],
) -> list[dict[str, Any]]:
    """Build the candidate pool used when breach positions exist.

    Combo rows whose ticker is in *breach_map* are replaced by a copy carrying
    the full-liquidation fields; other combo rows are kept unchanged. Breach
    tickers that are not in the combo are appended as new rows.
    """
    candidates: list[dict[str, Any]] = []
    combo_tickers: set[str] = set()
    for row in combo:
        ticker = str(row.get("ticker") or "")
        combo_tickers.add(ticker)
        if ticker in breach_map:
            candidates.append({**row, **_full_liquidation_fields(breach_map[ticker])})
        else:
            candidates.append(row)
    for ticker, market_val in breach_map.items():
        if ticker not in combo_tickers:
            candidates.append({"ticker": ticker, **_full_liquidation_fields(market_val)})
    return candidates


def main(argv: list[str] | None = None) -> int:
    """Build the SMART_CASH_RECOVERY_V4 result, write it to --out and print it.

    Args:
        argv: Command-line arguments to parse; ``None`` uses ``sys.argv[1:]``.

    Returns:
        Always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--rebound", default=str(DEFAULT_REBOUND))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    payload = _load(_resolve_path(args.json))
    rebound = _load(_resolve_path(args.rebound))
    op = _resolve_path(args.out)

    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    hctx = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
    if isinstance(payload.get("hApex"), dict):
        # Keys from hApex override the harness context.
        hctx = dict(hctx) | payload["hApex"]
    scrs = _obj(hctx.get("scrs_v2_json"))
    combo = _rows(scrs.get("selected_combo"))

    # Each figure is taken from its primary source first and falls back to the
    # scrs document when the primary value is not positive.
    shortfall_min = _f(hctx.get("cash_shortfall_min_krw"))
    if shortfall_min <= 0.0:
        shortfall_min = _f(scrs.get("cash_shortfall_min_krw"))
    total_immediate = _f(scrs.get("total_immediate_sell_krw"))
    expected_rebound_gain = _f(scrs.get("expected_rebound_gain_krw"))
    # _obj keeps a malformed `metrics` value from raising on `.get`.
    value_damage_avg = _f(_obj(rebound.get("metrics")).get("value_damage_pct_avg"))
    if value_damage_avg <= 0.0:
        value_damage_avg = _f(scrs.get("value_damage_pct_avg"))

    # [V4-BREACH-FIRST] Whenever there is a cash shortfall, first search the
    # pool that contains the breach (stop-loss broken) positions. The incoming
    # combo may not include them, so breach rows both replace matching combo
    # rows and are added when missing. The breach result is adopted only if it
    # covers the shortfall; otherwise the plain combo is searched.
    optimized_combo: list[dict[str, Any]] = []
    breach_supplement_used = False
    breach_map = _breach_tickers(payload) if shortfall_min > 0.0 else {}
    if breach_map:
        breach_combo = _best_combo(_breach_candidates(combo, breach_map), shortfall_min)
        if breach_combo and sum(_cash_krw(r) for r in breach_combo) >= shortfall_min:
            optimized_combo = breach_combo
            breach_supplement_used = True
    if not breach_supplement_used:
        optimized_combo = _best_combo(combo, shortfall_min)

    recovered = sum(_cash_krw(r) for r in optimized_combo)
    if recovered <= 0.0:
        # Nothing selected (or rows carry no cash): report the scrs total.
        recovered = total_immediate
    remaining = max(0.0, shortfall_min - recovered)
    optimized_damage = _weighted_damage(optimized_combo) if optimized_combo else value_damage_avg
    shortfall_covered = shortfall_min <= 0.0 or recovered >= shortfall_min
    execution_allowed = optimized_damage <= _VALUE_DAMAGE_BLOCK_THRESHOLD and shortfall_covered
    # Flag an emergency when less than half of the shortfall can be recovered.
    emergency_full_sell = shortfall_min > 0.0 and recovered > 0.0 and recovered * 2 < shortfall_min

    result = {
        "formula_id": "SMART_CASH_RECOVERY_V4",
        # NOTE(review): the block status is also emitted when the shortfall is
        # not covered, not only when the damage threshold is exceeded.
        "status": "PASS" if execution_allowed else "CASH_RECOVERY_VALUE_DAMAGE_BLOCK",
        "execution_allowed": execution_allowed,
        "value_damage_block_threshold": _VALUE_DAMAGE_BLOCK_THRESHOLD,
        "selected_sell_combo": optimized_combo,
        "cash_recovered_krw": round(recovered),
        "cash_shortfall_min_krw": round(shortfall_min),
        "cash_shortfall_remaining_krw": round(remaining),
        "cash_shortfall_covered": shortfall_covered,
        "value_damage_pct_avg": value_damage_avg,  # P0-T1: never masked; shows the raw value
        "value_damage_pct_avg_raw": value_damage_avg,  # same raw value, kept for backward compatibility
        "value_damage_pct_avg_optimized": optimized_damage,  # damage of the selected combo (informational)
        "expected_rebound_gain_krw": round(expected_rebound_gain),
        "total_immediate_sell_krw": round(total_immediate),
        "emergency_full_sell": emergency_full_sell,
        "optimization": {
            "candidate_count": len(combo),
            "selected_count": len(optimized_combo),
            "method": "MIN_WEIGHTED_DAMAGE_SUBSET_WITH_SHORTFALL_COVER",
            "breach_supplement_used": breach_supplement_used,
        },
    }
    text = json.dumps(result, ensure_ascii=False, indent=2)
    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(text, encoding="utf-8")
    print(text)
    return 0
# When run as a script, exit the process with main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())