ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
299 lines
12 KiB
Python
299 lines
12 KiB
Python
"""build_smart_money_liquidity_composite_v3.py — SMART_MONEY_LIQUIDITY_COMPOSITE_V3
|
|
|
|
P0-013: 수급과 유동성의 실전 합성 V3.
|
|
모든 보유 종목에 slippage_bps를 채우고, 수급 방향성을 1D/3D/5D/20D로 분리 저장하며,
|
|
수급 약세+유동성 양호 종목의 cash_raise 후보 점수를 상향한다.
|
|
수급 약세 종목 신규 BUY를 차단한다.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DEFAULT_JSON = ROOT / "GatherTradingData.json"
|
|
DEFAULT_OUT = ROOT / "Temp" / "smart_money_liquidity_composite_v3.json"
|
|
DEFAULT_LIQUIDITY_SIGNAL = ROOT / "Temp" / "liquidity_flow_signal_v1.json"
|
|
|
|
# 슬리피지 임계값 (억 단위 기준)
|
|
SLIPPAGE_MARKET_THRESHOLD = 50_000 # 500억 이상: MARKET
|
|
SLIPPAGE_TWAP_THRESHOLD = 5_000 # 50억 이상: TWAP
|
|
# < 50억: LIMIT
|
|
|
|
|
|
def _load(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
obj = json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return obj if isinstance(obj, dict) else {}
|
|
|
|
|
|
def _rows(v: Any) -> list[dict[str, Any]]:
|
|
if isinstance(v, list):
|
|
return [x for x in v if isinstance(x, dict)]
|
|
if isinstance(v, str):
|
|
try:
|
|
return _rows(json.loads(v))
|
|
except Exception:
|
|
return []
|
|
return []
|
|
|
|
|
|
def _f(v: Any, default: float = 0.0) -> float:
|
|
try:
|
|
return float(v)
|
|
except Exception:
|
|
return default
|
|
|
|
|
|
def _tick_size(price: float) -> float:
|
|
if price < 1_000:
|
|
return 1.0
|
|
if price < 5_000:
|
|
return 5.0
|
|
if price < 10_000:
|
|
return 10.0
|
|
if price < 50_000:
|
|
return 50.0
|
|
if price < 100_000:
|
|
return 100.0
|
|
if price < 500_000:
|
|
return 500.0
|
|
return 1_000.0
|
|
|
|
|
|
def _slippage_bps(price: float, atv_m: float) -> float:
|
|
"""예상 슬리피지 (basis points). 유동성 기반 조정."""
|
|
if price <= 0:
|
|
return 0.0
|
|
tick = _tick_size(price)
|
|
base_slip = (tick / price) * 10_000 # 1tick 슬리피지
|
|
# 유동성 낮을수록 슬리피지 증가 (ATVm = 백만원 단위)
|
|
if atv_m >= SLIPPAGE_MARKET_THRESHOLD:
|
|
multiplier = 1.0
|
|
elif atv_m >= SLIPPAGE_TWAP_THRESHOLD:
|
|
multiplier = 2.0
|
|
else:
|
|
multiplier = 4.0
|
|
return round(base_slip * multiplier, 1)
|
|
|
|
|
|
def _exec_mode(atv_m: float) -> str:
|
|
if atv_m >= SLIPPAGE_MARKET_THRESHOLD:
|
|
return "MARKET"
|
|
if atv_m >= SLIPPAGE_TWAP_THRESHOLD:
|
|
return "TWAP"
|
|
return "LIMIT"
|
|
|
|
|
|
def _liquidity_state(atv_m: float) -> tuple[str, str, float]:
|
|
if atv_m >= 200_000:
|
|
return "DEEP", "MARKET_OK", 100.0
|
|
if atv_m >= 50_000:
|
|
return "NORMAL", "LIMIT_NEAR_BID", 70.0
|
|
if atv_m > 5_000:
|
|
return "THIN", "TWAP_SPLIT", 40.0
|
|
return "FROZEN", "HOLD", 0.0
|
|
|
|
|
|
def _flow_direction(flow_1d: float | None, flow_3d: float | None, flow_5d: float | None, flow_20d: float | None) -> str:
|
|
"""수급 방향성 판정."""
|
|
values = [v for v in [flow_1d, flow_3d, flow_5d, flow_20d] if v is not None]
|
|
if not values:
|
|
return "UNKNOWN"
|
|
positive = sum(1 for v in values if v > 0)
|
|
negative = sum(1 for v in values if v < 0)
|
|
if negative > positive:
|
|
return "OUTFLOW"
|
|
if positive > negative:
|
|
return "INFLOW"
|
|
return "NEUTRAL"
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--json", default=str(DEFAULT_JSON))
|
|
ap.add_argument("--out", default=str(DEFAULT_OUT))
|
|
args = ap.parse_args()
|
|
|
|
json_path = Path(args.json) if Path(args.json).is_absolute() else ROOT / args.json
|
|
payload = _load(json_path)
|
|
data = payload.get("data") if isinstance(payload.get("data"), dict) else {}
|
|
h = data.get("_harness_context") if isinstance(data.get("_harness_context"), dict) else {}
|
|
df_list = _rows(data.get("data_feed"))
|
|
liq_signal = _load(DEFAULT_LIQUIDITY_SIGNAL)
|
|
liq_rows = liq_signal.get("rows") if isinstance(liq_signal.get("rows"), list) else []
|
|
liq_by_ticker: dict[str, dict] = {str(r.get("ticker") or ""): r for r in liq_rows if isinstance(r, dict)}
|
|
|
|
# 기존 V2 Temp JSON 참조 (smart_money_score 소스)
|
|
v2_temp = _load(ROOT / "Temp" / "smart_money_liquidity_composite_v2.json")
|
|
v2_raw = v2_temp.get("rows") or _rows(h.get("smart_money_liquidity_gate_v1_json") or h.get("smart_money_liquidity_composite_v2"))
|
|
if not isinstance(v2_raw, list):
|
|
v2_raw = []
|
|
v2_by_ticker: dict[str, dict] = {str(r.get("ticker") or ""): r for r in v2_raw}
|
|
|
|
rows_out: list[dict[str, Any]] = []
|
|
smart_money_weak_buy_count = 0
|
|
|
|
for row in df_list:
|
|
ticker = str(row.get("Ticker") or row.get("ticker") or "")
|
|
if not ticker:
|
|
continue
|
|
|
|
close = _f(row.get("Close") or row.get("close"))
|
|
atv_m = _f(row.get("AvgTradeValue_20D_M"))
|
|
|
|
# 슬리피지 계산
|
|
slip_bps = _slippage_bps(close, atv_m)
|
|
exec_mode = _exec_mode(atv_m)
|
|
|
|
# 수급 방향성 (실데이터 없으면 harness 데이터 참조)
|
|
v2 = v2_by_ticker.get(ticker, {})
|
|
sm_score = _f(row.get("SmartMoney_Score") or v2.get("smart_money_score"))
|
|
liq_row = liq_by_ticker.get(ticker, {})
|
|
liq_label = str(liq_row.get("liquidity_label") or "")
|
|
liq_exec_mode = str(liq_row.get("execution_mode") or "")
|
|
if liq_label:
|
|
exec_mode = liq_exec_mode or exec_mode
|
|
_, _, liq_score = _liquidity_state(atv_m)
|
|
liquidity_state = liq_label
|
|
else:
|
|
liquidity_state, liq_exec_mode, liq_score = _liquidity_state(atv_m)
|
|
if liq_exec_mode:
|
|
exec_mode = liq_exec_mode
|
|
foreign_div = bool(v2.get("foreign_institution_divergence_flag"))
|
|
|
|
# 수급 방향성 멀티-윈도우 (gap 기반 근사)
|
|
flow_1d = _f(row.get("Flow_1D")) if row.get("Flow_1D") is not None else None
|
|
flow_3d = _f(row.get("Flow_3D")) if row.get("Flow_3D") is not None else None
|
|
flow_5d = _f(row.get("Flow_5D")) if row.get("Flow_5D") is not None else None
|
|
flow_20d = _f(row.get("Flow_20D")) if row.get("Flow_20D") is not None else None
|
|
|
|
flow_direction = _flow_direction(flow_1d, flow_3d, flow_5d, flow_20d)
|
|
|
|
# 수급 약세 판정
|
|
smart_money_weak = sm_score < 45.0 or (foreign_div and flow_direction == "OUTFLOW")
|
|
|
|
# cash_raise 점수 상향: 수급 약세 + 유동성 양호 = 매도하기 쉬운 종목
|
|
cash_raise_priority_bonus = 1 if (smart_money_weak and liq_score >= 60.0) else 0
|
|
|
|
if smart_money_weak:
|
|
smart_money_weak_buy_count += 1
|
|
|
|
rows_out.append({
|
|
"ticker": ticker,
|
|
"name": str(row.get("Name") or ""),
|
|
"smart_money_score": round(sm_score, 2),
|
|
"liquidity_score": round(liq_score, 2),
|
|
"liquidity_state": liquidity_state,
|
|
"slippage_bps": slip_bps,
|
|
"exec_mode": exec_mode,
|
|
"avg_trade_value_20d_m": atv_m,
|
|
"flow_direction": flow_direction,
|
|
"flow_1d": flow_1d,
|
|
"flow_3d": flow_3d,
|
|
"flow_5d": flow_5d,
|
|
"flow_20d": flow_20d,
|
|
"foreign_institution_divergence_flag": foreign_div,
|
|
"smart_money_weak": smart_money_weak,
|
|
"smart_money_weak_buy_blocked": smart_money_weak,
|
|
"cash_raise_priority_bonus": cash_raise_priority_bonus,
|
|
"source_path": "Temp/smart_money_liquidity_composite_v3.json",
|
|
"formula_id": "SMART_MONEY_LIQUIDITY_COMPOSITE_V3",
|
|
})
|
|
|
|
# 수용 검증
|
|
slippage_missing_count = sum(1 for r in rows_out if r.get("slippage_bps") is None)
|
|
divergence_missing_count = sum(1 for r in rows_out if r.get("foreign_institution_divergence_flag") is None)
|
|
gate = "PASS" if (slippage_missing_count == 0 and smart_money_weak_buy_count == 0) else (
|
|
"WATCH" if smart_money_weak_buy_count > 0 else "CAUTION"
|
|
)
|
|
|
|
# SMART_MONEY_LIQUIDITY_OUTCOME_LINK_V1: liquidity_label별 T+5 결과 조인
|
|
proposal_hist = {}
|
|
hist_path = ROOT / "Temp" / "proposal_evaluation_history.json"
|
|
if hist_path.exists():
|
|
try:
|
|
proposal_hist = json.loads(hist_path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
pass
|
|
hist_rows = proposal_hist.get("history") or proposal_hist.get("records") or []
|
|
if not isinstance(hist_rows, list):
|
|
hist_rows = []
|
|
|
|
liq_buckets: dict = {}
|
|
for h in hist_rows:
|
|
if not isinstance(h, dict):
|
|
continue
|
|
lbl = h.get("liquidity_label") or h.get("liquidity_state") or h.get("smart_money_liquidity_label") or "UNKNOWN"
|
|
t5 = h.get("realized_return_pct_t5")
|
|
if t5 is None:
|
|
t5 = h.get("t5_return_pct")
|
|
slip = h.get("slippage_pct")
|
|
if slip is None:
|
|
slip = h.get("slippage_bps")
|
|
if lbl not in liq_buckets:
|
|
liq_buckets[lbl] = {"returns": [], "slippages": []}
|
|
if t5 is not None:
|
|
liq_buckets[lbl]["returns"].append(float(t5))
|
|
if slip is not None:
|
|
liq_buckets[lbl]["slippages"].append(float(slip))
|
|
|
|
outcome_link_table = []
|
|
for lbl, data in liq_buckets.items():
|
|
n = len(data["returns"])
|
|
avg_ret = sum(data["returns"]) / n if n > 0 else None
|
|
win_rate = sum(1 for r in data["returns"] if r > 0) / n if n > 0 else None
|
|
n_slip = len(data["slippages"])
|
|
avg_slip = sum(data["slippages"]) / n_slip if n_slip > 0 else None
|
|
outcome_link_table.append({
|
|
"liquidity_label": lbl,
|
|
"sample_count": n,
|
|
"t5_avg_return_pct": round(avg_ret, 2) if avg_ret is not None else None,
|
|
"t5_win_rate": round(win_rate, 3) if win_rate is not None else None,
|
|
"avg_slippage_pct": round(avg_slip, 3) if avg_slip is not None else None,
|
|
"label": f"[UNVALIDATED: n={n} < 30]" if n < 30 else "VALIDATED",
|
|
})
|
|
|
|
outcome_link_result = {
|
|
"formula_id": "SMART_MONEY_LIQUIDITY_OUTCOME_LINK_V1",
|
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
|
"table": outcome_link_table,
|
|
"total_linked_samples": sum(r["sample_count"] for r in outcome_link_table),
|
|
"gate": "VALIDATED" if any(r["sample_count"] >= 30 for r in outcome_link_table) else "UNVALIDATED",
|
|
}
|
|
link_path = ROOT / "Temp" / "smart_money_liquidity_outcome_link_v1.json"
|
|
link_path.write_text(json.dumps(outcome_link_result, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
|
|
result = {
|
|
"formula_id": "SMART_MONEY_LIQUIDITY_COMPOSITE_V3",
|
|
"gate": gate,
|
|
"slippage_bps_missing_count": slippage_missing_count,
|
|
"foreign_institution_divergence_flag_missing_count": divergence_missing_count,
|
|
"smart_money_weak_buy_blocked_count": smart_money_weak_buy_count,
|
|
"sell_priority_liquidity_adjusted": True,
|
|
"ticker_count": len(rows_out),
|
|
"rows": rows_out,
|
|
"generated_at": datetime.now(timezone.utc).isoformat(),
|
|
"source_path": "Temp/smart_money_liquidity_composite_v3.json",
|
|
"outcome_link_ref": "Temp/smart_money_liquidity_outcome_link_v1.json",
|
|
}
|
|
|
|
out_path = Path(args.out) if Path(args.out).is_absolute() else ROOT / args.out
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
summary = {k: v for k, v in result.items() if k != "rows"}
|
|
print(json.dumps(summary, indent=2, ensure_ascii=False))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|