Files
QuantEngineByItz/tools/build_smart_money_liquidity_composite_v3.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

299 lines
12 KiB
Python

"""build_smart_money_liquidity_composite_v3.py — SMART_MONEY_LIQUIDITY_COMPOSITE_V3
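P0-013: Practical composite of smart-money flow and liquidity, V3.
Fills slippage_bps for every held ticker, stores flow direction separately for the
1D/3D/5D/20D windows, and raises the cash_raise candidate score of tickers whose
flow is weak but whose liquidity is good.
Blocks new BUY orders for tickers with weak flow.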
"""
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
# Directory one level above the folder that contains this script; relative
# CLI paths and all default input/output paths are resolved against it.
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "smart_money_liquidity_composite_v3.json"
DEFAULT_LIQUIDITY_SIGNAL = ROOT / "Temp" / "liquidity_flow_signal_v1.json"
# Slippage / execution-mode thresholds, compared against AvgTradeValue_20D_M.
# NOTE(review): the original comment said "in units of 100M KRW", but the
# values only match the stated 50bn / 5bn KRW limits if the input is in
# million KRW -- confirm the unit of AvgTradeValue_20D_M.
SLIPPAGE_MARKET_THRESHOLD = 50_000  # >= KRW 50 billion (500 x 100M): MARKET
SLIPPAGE_TWAP_THRESHOLD = 5_000  # >= KRW 5 billion (50 x 100M): TWAP
# Below KRW 5 billion: LIMIT
def _load(path: Path) -> dict[str, Any]:
    """Read a UTF-8 JSON file and return its top-level object.

    Best-effort loader: a missing file, an unreadable file, malformed JSON or
    a top-level value that is not a JSON object all yield an empty dict.
    """
    parsed: Any = None
    if path.exists():
        try:
            raw_text = path.read_text(encoding="utf-8")
            parsed = json.loads(raw_text)
        except Exception:
            # Deliberately silent: callers treat "no data" and "bad data" alike.
            parsed = None
    if isinstance(parsed, dict):
        return parsed
    return {}
def _rows(v: Any) -> list[dict[str, Any]]:
    """Normalise ``v`` into a list of dict rows.

    A list is filtered down to its dict elements. A string is decoded as JSON
    and the decoded value is normalised recursively. Anything else, or a
    string that cannot be decoded, produces an empty list.
    """
    if isinstance(v, str):
        try:
            return _rows(json.loads(v))
        except Exception:
            return []
    if not isinstance(v, list):
        return []
    return [item for item in v if isinstance(item, dict)]
def _f(v: Any, default: float = 0.0) -> float:
    """Coerce ``v`` with ``float()``; return ``default`` if the conversion raises."""
    try:
        number = float(v)
    except Exception:
        number = default
    return number
def _tick_size(price: float) -> float:
    """Return the price tick for ``price`` from a fixed band table.

    Each band is ``(exclusive upper bound, tick)``; the first band whose bound
    is above ``price`` wins, and prices at or above the last bound use 1,000.

    NOTE(review): these bands look like the pre-2023 KRX tick table -- confirm
    whether the revised table should be used here.
    """
    bands = (
        (1_000, 1.0),
        (5_000, 5.0),
        (10_000, 10.0),
        (50_000, 50.0),
        (100_000, 100.0),
        (500_000, 500.0),
    )
    for upper_bound, tick in bands:
        if price < upper_bound:
            return tick
    return 1_000.0
def _slippage_bps(price: float, atv_m: float) -> float:
    """Estimate slippage in basis points, scaled by a liquidity tier.

    The base is one tick expressed in bps of ``price``. It is multiplied by
    1, 2 or 4 depending on which threshold ``atv_m`` reaches, and the result
    is rounded to one decimal. A non-positive ``price`` returns 0.0.
    """
    if price <= 0:
        return 0.0
    # Cost of moving one tick, in basis points of the price.
    one_tick_bps = (_tick_size(price) / price) * 10_000
    # Thinner liquidity gets a larger multiplier (the original author notes
    # that atv_m is in units of one million KRW).
    multiplier = (
        1.0
        if atv_m >= SLIPPAGE_MARKET_THRESHOLD
        else 2.0
        if atv_m >= SLIPPAGE_TWAP_THRESHOLD
        else 4.0
    )
    return round(one_tick_bps * multiplier, 1)
def _exec_mode(atv_m: float) -> str:
    """Map ``atv_m`` to an execution mode: "MARKET", "TWAP" or "LIMIT".

    Tiers are checked from the highest threshold down; a value below every
    threshold falls back to "LIMIT".
    """
    tiers = (
        (SLIPPAGE_MARKET_THRESHOLD, "MARKET"),
        (SLIPPAGE_TWAP_THRESHOLD, "TWAP"),
    )
    for floor, mode in tiers:
        if atv_m >= floor:
            return mode
    return "LIMIT"
def _liquidity_state(atv_m: float) -> tuple[str, str, float]:
    """Classify ``atv_m`` into a liquidity tier.

    Returns:
        A ``(state, execution_mode, liquidity_score)`` tuple:
        ``("DEEP", "MARKET_OK", 100.0)`` at 200,000 and above,
        ``("NORMAL", "LIMIT_NEAR_BID", 70.0)`` at 50,000 and above,
        ``("THIN", "TWAP_SPLIT", 40.0)`` at 5,000 and above,
        otherwise ``("FROZEN", "HOLD", 0.0)``.
    """
    if atv_m >= 200_000:
        return "DEEP", "MARKET_OK", 100.0
    if atv_m >= 50_000:
        return "NORMAL", "LIMIT_NEAR_BID", 70.0
    # Inclusive lower bound, like every other tier in this file. With a strict
    # ">" a value of exactly 5,000 was rated FROZEN/HOLD here while the
    # slippage and execution-mode helpers treated it as the TWAP tier.
    if atv_m >= 5_000:
        return "THIN", "TWAP_SPLIT", 40.0
    return "FROZEN", "HOLD", 0.0
def _flow_direction(flow_1d: float | None, flow_3d: float | None, flow_5d: float | None, flow_20d: float | None) -> str:
"""수급 방향성 판정."""
values = [v for v in [flow_1d, flow_3d, flow_5d, flow_20d] if v is not None]
if not values:
return "UNKNOWN"
positive = sum(1 for v in values if v > 0)
negative = sum(1 for v in values if v < 0)
if negative > positive:
return "OUTFLOW"
if positive > negative:
return "INFLOW"
return "NEUTRAL"
def _finite_float_or_none(value: Any) -> float | None:
    """Convert ``value`` to a finite float, or return None when that is impossible."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return None
    # NaN is the only value unequal to itself. Infinities are rejected too so
    # the averages (and the JSON written from them) stay well-formed.
    if number != number or number in (float("inf"), float("-inf")):
        return None
    return number


def _outcome_link_table(hist_rows: list[Any]) -> list[dict[str, Any]]:
    """Aggregate T+5 returns and slippage from history records per liquidity label."""
    buckets: dict[Any, dict[str, list[float]]] = {}
    for record in hist_rows:
        if not isinstance(record, dict):
            continue
        label = (
            record.get("liquidity_label")
            or record.get("liquidity_state")
            or record.get("smart_money_liquidity_label")
            or "UNKNOWN"
        )
        bucket = buckets.setdefault(label, {"returns": [], "slippages": []})

        # Non-numeric values are skipped rather than aborting the whole run.
        t5 = _finite_float_or_none(record.get("realized_return_pct_t5"))
        if t5 is None:
            t5 = _finite_float_or_none(record.get("t5_return_pct"))
        if t5 is not None:
            bucket["returns"].append(t5)

        slip_pct = _finite_float_or_none(record.get("slippage_pct"))
        if slip_pct is None:
            slip_bps = _finite_float_or_none(record.get("slippage_bps"))
            if slip_bps is not None:
                # The fallback field is in basis points; 1 bp = 0.01 %, so it
                # is converted before being averaged into a percent figure.
                slip_pct = slip_bps / 100.0
        if slip_pct is not None:
            bucket["slippages"].append(slip_pct)

    table: list[dict[str, Any]] = []
    for label, samples in buckets.items():
        returns = samples["returns"]
        slippages = samples["slippages"]
        n = len(returns)
        avg_ret = sum(returns) / n if n > 0 else None
        win_rate = sum(1 for r in returns if r > 0) / n if n > 0 else None
        avg_slip = sum(slippages) / len(slippages) if slippages else None
        table.append({
            "liquidity_label": label,
            "sample_count": n,
            "t5_avg_return_pct": round(avg_ret, 2) if avg_ret is not None else None,
            "t5_win_rate": round(win_rate, 3) if win_rate is not None else None,
            "avg_slippage_pct": round(avg_slip, 3) if avg_slip is not None else None,
            "label": f"[UNVALIDATED: n={n} < 30]" if n < 30 else "VALIDATED",
        })
    return table


def main() -> int:
    """Build the SMART_MONEY_LIQUIDITY_COMPOSITE_V3 report and its outcome-link table.

    Reads the trading-data JSON given by ``--json``, plus the liquidity-signal,
    V2-composite and proposal-history files under ``Temp/`` when present.
    Writes the per-ticker composite to ``--out`` and the outcome-link table to
    ``Temp/smart_money_liquidity_outcome_link_v1.json``, then prints the
    composite summary (everything except ``rows``) to stdout. Relative
    ``--json`` / ``--out`` paths are resolved against ``ROOT``.

    Returns:
        0, used as the process exit code.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    json_path = Path(args.json) if Path(args.json).is_absolute() else ROOT / args.json
    payload = _load(json_path)
    data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
    harness = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
    df_list = _rows(data.get("data_feed"))

    liq_signal = _load(DEFAULT_LIQUIDITY_SIGNAL)
    liq_rows = liq_signal.get("rows") if isinstance(liq_signal.get("rows"), list) else []
    liq_by_ticker: dict[str, dict] = {
        str(r.get("ticker") or ""): r for r in liq_rows if isinstance(r, dict)
    }

    # Existing V2 Temp JSON is the fallback source for smart_money_score; if it
    # has no rows, the harness context is consulted instead.
    v2_temp = _load(ROOT / "Temp" / "smart_money_liquidity_composite_v2.json")
    v2_raw = v2_temp.get("rows") or _rows(
        harness.get("smart_money_liquidity_gate_v1_json")
        or harness.get("smart_money_liquidity_composite_v2")
    )
    if not isinstance(v2_raw, list):
        v2_raw = []
    # Non-dict entries are dropped so a malformed row cannot crash on ".get".
    v2_by_ticker: dict[str, dict] = {
        str(r.get("ticker") or ""): r for r in v2_raw if isinstance(r, dict)
    }

    rows_out: list[dict[str, Any]] = []
    smart_money_weak_buy_count = 0
    for row in df_list:
        ticker = str(row.get("Ticker") or row.get("ticker") or "")
        if not ticker:
            continue
        close = _f(row.get("Close") or row.get("close"))
        atv_m = _f(row.get("AvgTradeValue_20D_M"))

        # Slippage estimate and default execution mode from traded value.
        slip_bps = _slippage_bps(close, atv_m)
        exec_mode = _exec_mode(atv_m)

        # Smart-money score: the feed row first, then the V2 row.
        # NOTE(review): a ticker with no score in either source gets 0.0 and is
        # therefore classified as weak below -- confirm this is intended.
        v2 = v2_by_ticker.get(ticker, {})
        sm_score = _f(row.get("SmartMoney_Score") or v2.get("smart_money_score"))

        liq_row = liq_by_ticker.get(ticker, {})
        liq_label = str(liq_row.get("liquidity_label") or "")
        liq_exec_mode = str(liq_row.get("execution_mode") or "")
        if liq_label:
            # An external liquidity signal overrides the label and, when it
            # provides one, the execution mode; the score is still computed
            # locally from traded value.
            exec_mode = liq_exec_mode or exec_mode
            _, _, liq_score = _liquidity_state(atv_m)
            liquidity_state = liq_label
        else:
            liquidity_state, liq_exec_mode, liq_score = _liquidity_state(atv_m)
            if liq_exec_mode:
                exec_mode = liq_exec_mode
        foreign_div = bool(v2.get("foreign_institution_divergence_flag"))

        # Multi-window flow values; an absent window stays None so it is
        # excluded from the direction vote.
        flow_1d = _f(row.get("Flow_1D")) if row.get("Flow_1D") is not None else None
        flow_3d = _f(row.get("Flow_3D")) if row.get("Flow_3D") is not None else None
        flow_5d = _f(row.get("Flow_5D")) if row.get("Flow_5D") is not None else None
        flow_20d = _f(row.get("Flow_20D")) if row.get("Flow_20D") is not None else None
        flow_direction = _flow_direction(flow_1d, flow_3d, flow_5d, flow_20d)

        # Weak smart money: a low score, or divergence combined with outflow.
        smart_money_weak = sm_score < 45.0 or (foreign_div and flow_direction == "OUTFLOW")
        # Weak flow plus good liquidity means the position is easy to sell, so
        # it gets a cash-raise priority bonus.
        cash_raise_priority_bonus = 1 if (smart_money_weak and liq_score >= 60.0) else 0
        if smart_money_weak:
            smart_money_weak_buy_count += 1

        rows_out.append({
            "ticker": ticker,
            "name": str(row.get("Name") or ""),
            "smart_money_score": round(sm_score, 2),
            "liquidity_score": round(liq_score, 2),
            "liquidity_state": liquidity_state,
            "slippage_bps": slip_bps,
            "exec_mode": exec_mode,
            "avg_trade_value_20d_m": atv_m,
            "flow_direction": flow_direction,
            "flow_1d": flow_1d,
            "flow_3d": flow_3d,
            "flow_5d": flow_5d,
            "flow_20d": flow_20d,
            "foreign_institution_divergence_flag": foreign_div,
            "smart_money_weak": smart_money_weak,
            "smart_money_weak_buy_blocked": smart_money_weak,
            "cash_raise_priority_bonus": cash_raise_priority_bonus,
            "source_path": "Temp/smart_money_liquidity_composite_v3.json",
            "formula_id": "SMART_MONEY_LIQUIDITY_COMPOSITE_V3",
        })

    # Acceptance checks.
    # NOTE(review): both fields are always populated above (slippage is 0.0
    # when Close is missing or non-positive, the flag is coerced with bool()),
    # so these two counts are currently always 0.
    slippage_missing_count = sum(1 for r in rows_out if r.get("slippage_bps") is None)
    divergence_missing_count = sum(
        1 for r in rows_out if r.get("foreign_institution_divergence_flag") is None
    )
    if smart_money_weak_buy_count > 0:
        gate = "WATCH"
    elif slippage_missing_count > 0:
        gate = "CAUTION"
    else:
        gate = "PASS"

    # SMART_MONEY_LIQUIDITY_OUTCOME_LINK_V1: join T+5 outcomes per liquidity label.
    # _load tolerates a missing, unreadable or non-object history file.
    proposal_hist = _load(ROOT / "Temp" / "proposal_evaluation_history.json")
    hist_rows = proposal_hist.get("history") or proposal_hist.get("records") or []
    if not isinstance(hist_rows, list):
        hist_rows = []
    outcome_link_table = _outcome_link_table(hist_rows)
    outcome_link_result = {
        "formula_id": "SMART_MONEY_LIQUIDITY_OUTCOME_LINK_V1",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "table": outcome_link_table,
        "total_linked_samples": sum(r["sample_count"] for r in outcome_link_table),
        "gate": "VALIDATED" if any(r["sample_count"] >= 30 for r in outcome_link_table) else "UNVALIDATED",
    }
    link_path = ROOT / "Temp" / "smart_money_liquidity_outcome_link_v1.json"
    # Temp/ may not exist on a fresh checkout; create it before writing.
    link_path.parent.mkdir(parents=True, exist_ok=True)
    link_path.write_text(json.dumps(outcome_link_result, ensure_ascii=False, indent=2), encoding="utf-8")

    result = {
        "formula_id": "SMART_MONEY_LIQUIDITY_COMPOSITE_V3",
        "gate": gate,
        "slippage_bps_missing_count": slippage_missing_count,
        "foreign_institution_divergence_flag_missing_count": divergence_missing_count,
        "smart_money_weak_buy_blocked_count": smart_money_weak_buy_count,
        "sell_priority_liquidity_adjusted": True,
        "ticker_count": len(rows_out),
        "rows": rows_out,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "source_path": "Temp/smart_money_liquidity_composite_v3.json",
        "outcome_link_ref": "Temp/smart_money_liquidity_outcome_link_v1.json",
    }
    out_path = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    # Print everything except the per-ticker rows as a run summary.
    summary = {k: v for k, v in result.items() if k != "rows"}
    print(json.dumps(summary, indent=2, ensure_ascii=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())