ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
250 lines
8.2 KiB
Python
250 lines
8.2 KiB
Python
"""SMART_MONEY_LIQUIDITY_GATE_V1
|
|
Smart-money / liquidity blocking gate — deterministic implementation of rules SM001 / SM002 / SM003.

SM001: inst_5d < 0 AND frg_5d < 0 → BLOCK_BUY
SM002: avg_trade_value_5d_m < 5000 (KRW 5 billion = 5,000M KRW) → LIMIT_QUANTITY
SM003: rsi14 > 70 AND flow_credit < 0.3 → BLOCK_BUY

gate_status priority: BLOCK_BUY > LIMIT_QUANTITY > PASS
(DATA_MISSING is reported instead of PASS when no rule fired and a required input field is absent.)
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
def _ensure_utf8_stdio() -> None:
|
|
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
|
|
sys.stdout = open(sys.stdout.fileno(), mode="w", encoding="utf-8", buffering=1)
|
|
if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"):
|
|
sys.stderr = open(sys.stderr.fileno(), mode="w", encoding="utf-8", buffering=1)
|
|
|
|
|
|
# Directory two levels above this file (the parent of the folder that holds
# this script) — presumably the project root; relative CLI paths resolve here.
ROOT = Path(__file__).resolve().parents[1]

# Default input / output locations used by the command-line interface.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
DEFAULT_SM_SIGNAL = ROOT.joinpath("Temp", "smart_money_flow_signal_v2.json")
DEFAULT_OUT = ROOT.joinpath("Temp", "smart_money_liquidity_gate_v1.json")

# SM002 threshold: KRW 5 billion = 5,000,000,000 KRW = 5,000M KRW.
AVG_TRADE_VALUE_5D_M_THRESHOLD = 5000.0

# SM003 thresholds.
RSI14_THRESHOLD = 70.0
FLOW_CREDIT_THRESHOLD = 0.3
|
|
|
|
|
|
def _load_json(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
obj = json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return obj if isinstance(obj, dict) else {}
|
|
|
|
|
|
def _as_float(v: Any, default: float | None = None) -> float | None:
|
|
"""None-safe float 변환. 변환 불가 시 default 반환."""
|
|
if v is None:
|
|
return default
|
|
try:
|
|
return float(v)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _extract_data_feed(payload: dict[str, Any]) -> list[dict[str, Any]]:
|
|
data_node = payload.get("data") or {}
|
|
if isinstance(data_node, dict):
|
|
feed = data_node.get("data_feed")
|
|
if isinstance(feed, list):
|
|
return feed
|
|
return []
|
|
|
|
|
|
def _check_sm001(row: dict[str, Any]) -> dict[str, Any] | None:
    """SM001: foreign and institutional 5-day flows both negative → BLOCK_BUY.

    Args:
        row: One data-feed entry; reads ``Frg_5D`` and ``Inst_5D``.

    Returns:
        The fired-rule record, or ``None`` when the rule does not fire or
        either input cannot be read as a number. A ``None`` caused by absent
        data must not be taken as a pass — the caller records missing fields.
    """
    foreign_5d = _as_float(row.get("Frg_5D"))
    institution_5d = _as_float(row.get("Inst_5D"))

    inputs_present = foreign_5d is not None and institution_5d is not None
    if not (inputs_present and foreign_5d < 0 and institution_5d < 0):
        return None

    return {
        "rule": "SM001",
        "condition": "inst_5d<0 AND frg_5d<0",
        "values": {"inst_5d": institution_5d, "frg_5d": foreign_5d},
        "result": "BLOCK_BUY",
    }
|
|
|
|
|
|
def _check_sm002(row: dict[str, Any]) -> dict[str, Any] | None:
    """SM002: 5-day average trade value below the threshold → LIMIT_QUANTITY.

    The threshold is ``AVG_TRADE_VALUE_5D_M_THRESHOLD`` (5,000M KRW, i.e.
    KRW 5 billion).

    Args:
        row: One data-feed entry; reads ``AvgTradeValue_5D_M``.

    Returns:
        The fired-rule record, or ``None`` when the value is at or above the
        threshold or cannot be read as a number.
    """
    avg_value_m = _as_float(row.get("AvgTradeValue_5D_M"))

    below_threshold = (
        avg_value_m is not None and avg_value_m < AVG_TRADE_VALUE_5D_M_THRESHOLD
    )
    if not below_threshold:
        return None

    return {
        "rule": "SM002",
        "condition": f"avg_trade_value_5d_m<{AVG_TRADE_VALUE_5D_M_THRESHOLD}",
        "values": {"avg_trade_value_5d_m": avg_value_m},
        "result": "LIMIT_QUANTITY",
    }
|
|
|
|
|
|
def _check_sm003(row: dict[str, Any]) -> dict[str, Any] | None:
    """SM003: RSI14 above 70 with flow_credit below 0.3 → BLOCK_BUY.

    Described in the original as "overbought + capital outflow". The limits
    come from ``RSI14_THRESHOLD`` and ``FLOW_CREDIT_THRESHOLD``.

    Args:
        row: One data-feed entry; reads ``RSI14`` and ``Flow_Credit``.

    Returns:
        The fired-rule record, or ``None`` when the rule does not fire or
        either input cannot be read as a number.
    """
    rsi_value = _as_float(row.get("RSI14"))
    flow_credit = _as_float(row.get("Flow_Credit"))

    inputs_present = rsi_value is not None and flow_credit is not None
    if not (
        inputs_present
        and rsi_value > RSI14_THRESHOLD
        and flow_credit < FLOW_CREDIT_THRESHOLD
    ):
        return None

    return {
        "rule": "SM003",
        "condition": f"rsi14>{RSI14_THRESHOLD} AND flow_credit<{FLOW_CREDIT_THRESHOLD}",
        "values": {"rsi14": rsi_value, "flow_credit": flow_credit},
        "result": "BLOCK_BUY",
    }
|
|
|
|
|
|
def _evaluate_ticker(row: dict[str, Any]) -> dict[str, Any]:
    """Run SM001-SM003 on one data-feed row and derive its gate status.

    Args:
        row: One data-feed entry. Reads ``Ticker``, ``Name`` and the inputs of
            the three rule checks.

    Returns:
        A dict with ``ticker``, ``name``, ``gate_status``, ``rules_fired``,
        ``missing_fields`` and ``formula_id``. ``gate_status`` follows the
        priority BLOCK_BUY > LIMIT_QUANTITY > DATA_MISSING > PASS.

    For every rule that did not fire, its input fields are listed in
    ``missing_fields`` when they hold no usable number. "Usable" is judged on
    the parsed value, not on the raw value being ``None``: the rule checks
    treat anything ``_as_float`` cannot parse (for example ``""`` or
    ``"N/A"``) as absent, and NaN never satisfies a comparison. Testing only
    ``is None`` would let such rows fall through to a silent PASS.
    """
    import math

    def _unusable(field: str) -> bool:
        # True when the field cannot take part in a numeric comparison.
        value = _as_float(row.get(field))
        return value is None or not math.isfinite(value)

    ticker = str(row.get("Ticker") or "UNKNOWN")
    name = str(row.get("Name") or "")

    rules_fired: list[dict[str, Any]] = []
    missing_fields: list[str] = []

    # Each rule is paired with the input fields it reads, in reporting order.
    rule_inputs = (
        (_check_sm001, ("Frg_5D", "Inst_5D")),
        (_check_sm002, ("AvgTradeValue_5D_M",)),
        (_check_sm003, ("RSI14", "Flow_Credit")),
    )
    for check, fields in rule_inputs:
        fired = check(row)
        if fired is not None:
            rules_fired.append(fired)
        else:
            # Track explicitly which inputs kept the rule from being evaluated.
            missing_fields.extend(field for field in fields if _unusable(field))

    # gate_status decision: BLOCK_BUY > LIMIT_QUANTITY > PASS
    outcomes = {fired["result"] for fired in rules_fired}
    if "BLOCK_BUY" in outcomes:
        gate_status = "BLOCK_BUY"
    elif "LIMIT_QUANTITY" in outcomes:
        gate_status = "LIMIT_QUANTITY"
    elif missing_fields:
        gate_status = "DATA_MISSING"  # cannot decide — not treated as PASS
    else:
        gate_status = "PASS"

    return {
        "ticker": ticker,
        "name": name,
        "gate_status": gate_status,
        "rules_fired": rules_fired,
        "missing_fields": missing_fields,
        "formula_id": "SMART_MONEY_LIQUIDITY_GATE_V1",
    }
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: evaluate the data feed and write the gate report.

    Loads the JSON named by ``--json``, runs ``_evaluate_ticker`` on every
    ``data_feed`` entry, writes the aggregated report to ``--out`` and prints
    a summary plus one line per ticker to stdout. Relative ``--json`` and
    ``--out`` paths are resolved against ``ROOT``. ``--sm-signal`` is accepted
    but its value is not read in this function.

    Returns:
        0 on success; 1 when no data feed could be obtained from the input.
    """
    _ensure_utf8_stdio()

    parser = argparse.ArgumentParser(description="스마트머니·유동성 차단 게이트 V1 (SM001~SM003)")
    for flag, fallback in (
        ("--json", DEFAULT_JSON),
        ("--sm-signal", DEFAULT_SM_SIGNAL),
        ("--out", DEFAULT_OUT),
    ):
        parser.add_argument(flag, default=str(fallback))
    options = parser.parse_args()

    def _under_root(raw: str) -> Path:
        # Absolute paths are used as given; relative ones are anchored at ROOT.
        candidate = Path(raw)
        return candidate if candidate.is_absolute() else ROOT / candidate

    source_path = _under_root(options.json)
    report_path = _under_root(options.out)

    feed = _extract_data_feed(_load_json(source_path))
    if not feed:
        print("ERROR: data_feed 비어 있음 — GatherTradingData.json 경로 확인", file=sys.stderr)
        return 1

    verdicts: list[dict[str, Any]] = [_evaluate_ticker(entry) for entry in feed]

    status_totals = {"BLOCK_BUY": 0, "LIMIT_QUANTITY": 0, "DATA_MISSING": 0, "PASS": 0}
    for verdict in verdicts:
        status = verdict["gate_status"]
        # Any status other than the three named ones is tallied as PASS.
        bucket = status if status in status_totals else "PASS"
        status_totals[bucket] += 1

    def _tickers_firing(rule_id: str) -> int:
        # Number of tickers on which the given rule fired.
        return sum(
            1
            for verdict in verdicts
            if any(hit["rule"] == rule_id for hit in verdict["rules_fired"])
        )

    sm001_hits = _tickers_firing("SM001")
    sm002_hits = _tickers_firing("SM002")
    sm003_hits = _tickers_firing("SM003")

    coverage = round(100.0 * len(verdicts) / max(1, len(feed)), 2)

    report = {
        "formula_id": "SMART_MONEY_LIQUIDITY_GATE_V1",
        "gate": "OK",
        "coverage_pct": coverage,
        "ticker_count": len(verdicts),
        "summary": {
            "block_buy_count": status_totals["BLOCK_BUY"],
            "limit_quantity_count": status_totals["LIMIT_QUANTITY"],
            "pass_count": status_totals["PASS"],
            "data_missing_count": status_totals["DATA_MISSING"],
            "sm001_fired": sm001_hits,
            "sm002_fired": sm002_hits,
            "sm003_fired": sm003_hits,
        },
        "rows": verdicts,
    }

    report_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(report, ensure_ascii=False, indent=2)
    report_path.write_text(serialized, encoding="utf-8")

    print("SMART_MONEY_LIQUIDITY_GATE_V1")
    print(f" tickers: {len(verdicts)}/{len(feed)} (coverage={coverage}%)")
    print(
        f" BLOCK_BUY: {status_totals['BLOCK_BUY']} LIMIT_QUANTITY: {status_totals['LIMIT_QUANTITY']}"
        f" PASS: {status_totals['PASS']} DATA_MISSING: {status_totals['DATA_MISSING']}"
    )
    print(f" SM001(동시순매도): {sm001_hits}건 SM002(저유동성): {sm002_hits}건 SM003(과매수+이탈): {sm003_hits}건")
    for verdict in verdicts:
        fired_rules = ", ".join(hit["rule"] for hit in verdict["rules_fired"]) or "NONE"
        print(f" [{verdict['ticker']}] {verdict['gate_status']:15s} rules={fired_rules}")
    return 0
|
|
|
|
|
|
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
|