Files
QuantEngineByItz/tools/build_smart_money_liquidity_gate_v1.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

250 lines
8.2 KiB
Python

"""SMART_MONEY_LIQUIDITY_GATE_V1
스마트머니·유동성 차단 게이트 — SM001 / SM002 / SM003 결정론 구현.
SM001: inst_5d < 0 AND frg_5d < 0 → BLOCK_BUY
SM002: avg_trade_value_5d_m < 5000 (50억 = 5,000M KRW) → LIMIT_QUANTITY
SM003: rsi14 > 70 AND flow_credit < 0.3 → BLOCK_BUY
gate_status priority: BLOCK_BUY > LIMIT_QUANTITY > DATA_MISSING (a rule input is unavailable) > PASS
"""
from __future__ import annotations

import argparse
import json
import math
import sys
from collections import Counter
from pathlib import Path
from typing import Any
def _ensure_utf8_stdio() -> None:
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"):
sys.stderr = open(sys.stderr.fileno(), mode="w", encoding="utf-8", buffering=1)
# Parent of the directory that contains this file; relative CLI paths are resolved against it.
ROOT = Path(__file__).resolve().parents[1]
# Default input / signal / output locations, all under ROOT.
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_SM_SIGNAL = ROOT / "Temp" / "smart_money_flow_signal_v2.json"
DEFAULT_OUT = ROOT / "Temp" / "smart_money_liquidity_gate_v1.json"
# SM002 threshold: 5 billion KRW = 5,000,000,000 KRW = 5,000 M KRW
AVG_TRADE_VALUE_5D_M_THRESHOLD = 5_000.0
# SM003 thresholds
RSI14_THRESHOLD = 70.0
FLOW_CREDIT_THRESHOLD = 0.3
def _load_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
obj = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def _as_float(v: Any, default: float | None = None) -> float | None:
"""None-safe float 변환. 변환 불가 시 default 반환."""
if v is None:
return default
try:
return float(v)
except Exception:
return default
def _extract_data_feed(payload: dict[str, Any]) -> list[dict[str, Any]]:
data_node = payload.get("data") or {}
if isinstance(data_node, dict):
feed = data_node.get("data_feed")
if isinstance(feed, list):
return feed
return []
def _check_sm001(row: dict[str, Any]) -> dict[str, Any] | None:
    """Rule SM001: foreign 5D and institutional 5D net flows both negative -> BLOCK_BUY.

    Reads ``Frg_5D`` and ``Inst_5D`` from *row*. Returns the fired-rule
    record, or ``None`` when the rule does not fire or either input is
    unavailable. Unavailable inputs are not reported here; the caller is
    expected to track them as missing rather than treat them as a pass.
    """
    foreign_flow = _as_float(row.get("Frg_5D"))
    institution_flow = _as_float(row.get("Inst_5D"))
    if foreign_flow is None or institution_flow is None:
        return None

    both_net_selling = foreign_flow < 0 and institution_flow < 0
    if not both_net_selling:
        return None

    observed = {"inst_5d": institution_flow, "frg_5d": foreign_flow}
    return {
        "rule": "SM001",
        "condition": "inst_5d<0 AND frg_5d<0",
        "values": observed,
        "result": "BLOCK_BUY",
    }
def _check_sm002(row: dict[str, Any]) -> dict[str, Any] | None:
    """Rule SM002: 5-day average trade value below the threshold -> LIMIT_QUANTITY.

    Reads ``AvgTradeValue_5D_M`` from *row* and fires when it is strictly
    less than ``AVG_TRADE_VALUE_5D_M_THRESHOLD``. Returns the fired-rule
    record, or ``None`` when the value is unavailable or the rule does not
    fire.
    """
    trade_value = _as_float(row.get("AvgTradeValue_5D_M"))
    is_illiquid = (
        trade_value is not None
        and trade_value < AVG_TRADE_VALUE_5D_M_THRESHOLD
    )
    if not is_illiquid:
        return None

    return {
        "rule": "SM002",
        "condition": f"avg_trade_value_5d_m<{AVG_TRADE_VALUE_5D_M_THRESHOLD}",
        "values": {"avg_trade_value_5d_m": trade_value},
        "result": "LIMIT_QUANTITY",
    }
def _check_sm003(row: dict[str, Any]) -> dict[str, Any] | None:
    """Rule SM003: RSI14 above its threshold with flow credit below its threshold -> BLOCK_BUY.

    Reads ``RSI14`` and ``Flow_Credit`` from *row*. Fires when RSI14 is
    strictly greater than ``RSI14_THRESHOLD`` and flow credit is strictly
    less than ``FLOW_CREDIT_THRESHOLD`` (overbought while money is leaving).
    Returns the fired-rule record, or ``None`` when either input is
    unavailable or the rule does not fire.
    """
    rsi_value = _as_float(row.get("RSI14"))
    credit_value = _as_float(row.get("Flow_Credit"))
    if rsi_value is None or credit_value is None:
        return None

    overbought = rsi_value > RSI14_THRESHOLD
    weak_flow = credit_value < FLOW_CREDIT_THRESHOLD
    if not (overbought and weak_flow):
        return None

    return {
        "rule": "SM003",
        "condition": f"rsi14>{RSI14_THRESHOLD} AND flow_credit<{FLOW_CREDIT_THRESHOLD}",
        "values": {"rsi14": rsi_value, "flow_credit": credit_value},
        "result": "BLOCK_BUY",
    }
def _evaluate_ticker(row: dict[str, Any]) -> dict[str, Any]:
    """Run SM001-SM003 against one data_feed row and derive its gate status.

    Args:
        row: One data_feed record. ``Ticker`` and ``Name`` identify it; the
            rule inputs are ``Frg_5D``, ``Inst_5D``, ``AvgTradeValue_5D_M``,
            ``RSI14`` and ``Flow_Credit``.

    Returns:
        A dict with ``ticker``, ``name``, ``gate_status``, ``rules_fired``
        (records of the rules that fired, in rule order), ``missing_fields``
        and ``formula_id``. ``gate_status`` is chosen by priority:
        ``BLOCK_BUY`` > ``LIMIT_QUANTITY`` > ``DATA_MISSING`` > ``PASS``.

    For each rule that did not fire, every input of that rule which
    ``_as_float`` cannot convert is listed in ``missing_fields``. Using the
    same conversion as the rule checkers means a value that is present but
    unusable (for example an empty string) is reported as missing instead of
    letting the ticker pass unnoticed.
    """
    ticker = str(row.get("Ticker") or "UNKNOWN")
    name = str(row.get("Name") or "")

    # Each rule checker paired with the input fields it depends on, in rule order.
    rule_table = (
        (_check_sm001, ("Frg_5D", "Inst_5D")),
        (_check_sm002, ("AvgTradeValue_5D_M",)),
        (_check_sm003, ("RSI14", "Flow_Credit")),
    )

    rules_fired: list[dict[str, Any]] = []
    missing_fields: list[str] = []
    for check, input_fields in rule_table:
        fired = check(row)
        if fired is not None:
            rules_fired.append(fired)
            continue
        # The rule stayed silent: record which of its inputs were unusable so
        # that "could not judge" is never confused with "judged and passed".
        missing_fields.extend(
            field for field in input_fields if _as_float(row.get(field)) is None
        )

    # gate_status priority: BLOCK_BUY > LIMIT_QUANTITY > DATA_MISSING > PASS
    results_set = {fired["result"] for fired in rules_fired}
    if "BLOCK_BUY" in results_set:
        gate_status = "BLOCK_BUY"
    elif "LIMIT_QUANTITY" in results_set:
        gate_status = "LIMIT_QUANTITY"
    elif missing_fields:
        gate_status = "DATA_MISSING"  # cannot be judged; never reported as PASS
    else:
        gate_status = "PASS"

    return {
        "ticker": ticker,
        "name": name,
        "gate_status": gate_status,
        "rules_fired": rules_fired,
        "missing_fields": missing_fields,
        "formula_id": "SMART_MONEY_LIQUIDITY_GATE_V1",
    }
def main() -> int:
    """Command-line entry point: evaluate every data_feed row and write the gate report.

    Reads the JSON file given by ``--json``, runs ``_evaluate_ticker`` on
    each ``data.data_feed`` row, writes the aggregated report to ``--out``
    as UTF-8 JSON (creating parent directories as needed) and prints a
    summary to stdout. Relative ``--json`` / ``--out`` paths are resolved
    against ``ROOT``.

    Returns:
        0 on success, 1 when no data_feed rows could be loaded.
    """
    _ensure_utf8_stdio()

    ap = argparse.ArgumentParser(description="스마트머니·유동성 차단 게이트 V1 (SM001~SM003)")
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    # NOTE(review): --sm-signal is parsed but its value is never read in this function.
    ap.add_argument("--sm-signal", default=str(DEFAULT_SM_SIGNAL))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    json_path = Path(args.json)
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    out_path = Path(args.out)
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    feed = _extract_data_feed(_load_json(json_path))
    if not feed:
        print("ERROR: data_feed 비어 있음 — GatherTradingData.json 경로 확인", file=sys.stderr)
        return 1

    rows: list[dict[str, Any]] = [_evaluate_ticker(row) for row in feed]

    # Tally by final gate status; anything that is not one of the three named
    # statuses is counted as a pass.
    status_counts = Counter(r["gate_status"] for r in rows)
    block_count = status_counts["BLOCK_BUY"]
    limit_count = status_counts["LIMIT_QUANTITY"]
    missing_count = status_counts["DATA_MISSING"]
    pass_count = len(rows) - block_count - limit_count - missing_count

    # Per-rule tally: number of rows on which each rule fired (a row counts once per rule).
    rule_hits = Counter(
        rule
        for r in rows
        for rule in {fired["rule"] for fired in r["rules_fired"]}
    )
    sm001_fired = rule_hits["SM001"]
    sm002_fired = rule_hits["SM002"]
    sm003_fired = rule_hits["SM003"]

    coverage_pct = round(100.0 * len(rows) / max(1, len(feed)), 2)

    out = {
        "formula_id": "SMART_MONEY_LIQUIDITY_GATE_V1",
        "gate": "OK",
        "coverage_pct": coverage_pct,
        "ticker_count": len(rows),
        "summary": {
            "block_buy_count": block_count,
            "limit_quantity_count": limit_count,
            "pass_count": pass_count,
            "data_missing_count": missing_count,
            "sm001_fired": sm001_fired,
            "sm002_fired": sm002_fired,
            "sm003_fired": sm003_fired,
        },
        "rows": rows,
    }
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")

    print("SMART_MONEY_LIQUIDITY_GATE_V1")
    print(f" tickers: {len(rows)}/{len(feed)} (coverage={coverage_pct}%)")
    print(f" BLOCK_BUY: {block_count} LIMIT_QUANTITY: {limit_count} PASS: {pass_count} DATA_MISSING: {missing_count}")
    print(f" SM001(동시순매도): {sm001_fired}건 SM002(저유동성): {sm002_fired}건 SM003(과매수+이탈): {sm003_fired}건")
    for r in rows:
        fired_str = ", ".join(f['rule'] for f in r['rules_fired']) or "NONE"
        print(f" [{r['ticker']}] {r['gate_status']:15s} rules={fired_str}")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())