Files
QuantEngineByItz/tools/build_smart_money_flow_signal_v2.py
T
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

181 lines
5.3 KiB
Python

"""SMART_MONEY_FLOW_SIGNAL_V2 — 외인/기관 자금 흐름 신호 산출기.
data_feed의 Frg_5D / Inst_5D / Frg_20D / Inst_20D 필드를 사용하여
스마트머니 흐름 점수와 라벨을 산출한다.
점수 구성 (0~100):
외인 20일 누적: 40점 (상위권 → 40, 하위권 → 0)
기관 20일 누적: 30점
외인 5일 추세: 15점
기관 5일 추세: 15점
라벨:
STRONG_INFLOW ≥ 75
INFLOW ≥ 55
NEUTRAL ≥ 40
OUTFLOW ≥ 25
STRONG_OUTFLOW < 25
출력: Temp/smart_money_flow_signal_v2.json
"""
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any
# Project root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Default input payload, used when --json is not given.
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
# Default destination of the signal report, used when --out is not given.
DEFAULT_OUT = ROOT.joinpath("Temp", "smart_money_flow_signal_v2.json")
def _load(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Loading is best-effort: an empty dict is returned when the file cannot
    be read, is not valid UTF-8, is not valid JSON, or when the decoded
    top-level value is not a JSON object.

    Args:
        path: Location of the JSON file to read.

    Returns:
        The decoded top-level dict, or ``{}`` on any of the failures above.
    """
    try:
        parsed = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing or unreadable file.
        # ValueError: covers UnicodeDecodeError and json.JSONDecodeError.
        # RecursionError: pathologically deep nesting in the document.
        # Anything else is a programming error and is allowed to surface.
        return {}
    return parsed if isinstance(parsed, dict) else {}
def _rows(v: Any) -> list[dict[str, Any]]:
    """Normalize a table-like value into a list of dict rows.

    A string is treated as JSON text and decoded first; this repeats while
    the decoded value is itself a string, so a JSON document that was
    serialized more than once is still unwrapped. Once a list is reached,
    only its dict elements are kept.

    Args:
        v: A list of rows, a JSON string encoding such a list, or anything
            else.

    Returns:
        The dict elements of the list, in order. An empty list when *v* is
        neither a list nor a string that decodes to one, or when decoding
        fails.
    """
    while isinstance(v, str):
        try:
            v = json.loads(v)
        except (ValueError, RecursionError):
            # ValueError covers json.JSONDecodeError; RecursionError covers
            # absurdly deep nesting. Other exceptions indicate a bug and
            # are not swallowed.
            return []
    if isinstance(v, list):
        return [row for row in v if isinstance(row, dict)]
    return []
def _f(v: Any, default: float = 0.0) -> float:
    """Coerce *v* to a finite float, falling back to *default*.

    Args:
        v: Value to convert (number, numeric string, ``None``, ...).
        default: Value returned when *v* cannot be used.

    Returns:
        ``float(v)`` when the conversion succeeds and the result is finite;
        otherwise *default*.
    """
    try:
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / unsupported types; ValueError: non-numeric text;
        # OverflowError: integers too large to represent as a float.
        return default
    # NaN and +/-inf would otherwise be copied into the report, where
    # json.dumps emits them as non-standard NaN/Infinity tokens.
    return number if math.isfinite(number) else default
def _percentile_rank(val: float, all_vals: list[float]) -> float:
    """Return the share (0-100) of *all_vals* that is strictly below *val*.

    An empty population yields the neutral value 50.0. Entries equal to
    *val* are not counted, so when every entry is identical each one ranks
    at 0.0, and the largest entry of an n-element population ranks at
    (n - 1) / n * 100. The result is rounded to two decimals.
    """
    if not all_vals:
        return 50.0
    population_size = len(all_vals)
    strictly_below = sum(other < val for other in all_vals)
    return round(strictly_below / population_size * 100.0, 2)
_FORMULA_ID = "SMART_MONEY_FLOW_SIGNAL_V2"

# Flow metrics carried on every collected row, in report order.
_FLOW_KEYS = ("frg_20d", "inst_20d", "frg_5d", "inst_5d")

# Minimum score for each label, checked from the highest floor down. A score
# below the last floor is labelled STRONG_OUTFLOW.
_LABEL_FLOORS = (
    (75, "STRONG_INFLOW"),
    (55, "INFLOW"),
    (40, "NEUTRAL"),
    (25, "OUTFLOW"),
)


def _resolve_path(raw: str) -> Path:
    """Return *raw* as a Path, anchoring relative paths at ROOT."""
    candidate = Path(raw)
    return candidate if candidate.is_absolute() else ROOT / candidate


def _collect_flows(feed_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Extract ticker, name and the four flow metrics from data_feed rows."""
    return [
        {
            "ticker": str(r.get("Ticker") or r.get("ticker") or ""),
            "name": r.get("Name") or r.get("name") or "",
            "frg_20d": _f(r.get("Frg_20D")),
            "inst_20d": _f(r.get("Inst_20D")),
            "frg_5d": _f(r.get("Frg_5D")),
            "inst_5d": _f(r.get("Inst_5D")),
        }
        for r in feed_rows
    ]


def _label_for_score(score: float) -> str:
    """Map a smart-money score to its flow label."""
    for floor, label in _LABEL_FLOORS:
        if score >= floor:
            return label
    return "STRONG_OUTFLOW"


def _score_flows(flow_data: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Rank every metric across all rows and build the scored report rows."""
    # Each metric is ranked against the same metric of every collected row.
    population = {key: [d[key] for d in flow_data] for key in _FLOW_KEYS}
    scored: list[dict[str, Any]] = []
    for d in flow_data:
        pct = {
            key: _percentile_rank(d[key], population[key])
            for key in _FLOW_KEYS
        }
        # Weights: 20-day foreign 40%, 20-day institutional 30%,
        # 5-day foreign 15%, 5-day institutional 15%.
        score = round(
            pct["frg_20d"] * 0.40
            + pct["inst_20d"] * 0.30
            + pct["frg_5d"] * 0.15
            + pct["inst_5d"] * 0.15,
            2,
        )
        scored.append({
            "ticker": d["ticker"],
            "name": d["name"],
            "smart_money_score": score,
            "label": _label_for_score(score),
            "frg_20d": d["frg_20d"],
            "inst_20d": d["inst_20d"],
            "frg_5d": d["frg_5d"],
            "inst_5d": d["inst_5d"],
            "pct_frg20": pct["frg_20d"],
            "pct_inst20": pct["inst_20d"],
            "formula_id": _FORMULA_ID,
        })
    return scored


def _coefficient_of_variation(scores: list[float]) -> float:
    """Return population std-dev / mean of *scores*; 0.0 if the mean is not positive."""
    if not scores:
        return 0.0
    mean = sum(scores) / len(scores)
    var = sum((s - mean) ** 2 for s in scores) / len(scores)
    return (var ** 0.5) / mean if mean > 0 else 0.0


def main(argv: list[str] | None = None) -> int:
    """Build the SMART_MONEY_FLOW_SIGNAL_V2 report and write it as JSON.

    Reads the ``data.data_feed`` rows of the input payload, scores each row
    from the percentile ranks of its four flow metrics, labels it, and
    writes the rows plus summary statistics to the output file. A one-line
    JSON summary is printed to stdout.

    The gate is ``PASS`` when at least three distinct labels occur and the
    coefficient of variation of the scores is at least 0.20, ``CAUTION``
    when rows exist but that test fails, and ``FAIL`` when there are no
    rows.

    Args:
        argv: Command-line arguments (``--json``, ``--out``). ``None`` makes
            argparse read ``sys.argv[1:]``, which is what the script entry
            point relies on.

    Returns:
        Always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    jp = _resolve_path(args.json)
    op = _resolve_path(args.out)

    payload = _load(jp)
    data = payload.get("data")
    if not isinstance(data, dict):
        data = {}

    out_rows = _score_flows(_collect_flows(_rows(data.get("data_feed"))))
    cv = _coefficient_of_variation([r["smart_money_score"] for r in out_rows])

    # Label counts, keyed in order of first appearance.
    label_summary: dict[str, int] = {}
    for r in out_rows:
        lbl = r["label"]
        label_summary[lbl] = label_summary.get(lbl, 0) + 1
    label_diversity = len(label_summary)

    if label_diversity >= 3 and cv >= 0.20:
        gate = "PASS"
    elif out_rows:
        gate = "CAUTION"
    else:
        gate = "FAIL"

    out = {
        "formula_id": _FORMULA_ID,
        "gate": gate,
        "rows": out_rows,
        "row_count": len(out_rows),
        "coefficient_of_variation": round(cv, 4),
        "label_diversity": label_diversity,
        "label_summary": label_summary,
    }
    op.parent.mkdir(parents=True, exist_ok=True)
    op.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps({
        "formula_id": out["formula_id"],
        "gate": gate,
        "rows": len(out_rows),
        "cv": out["coefficient_of_variation"],
        "label_summary": label_summary,
    }, ensure_ascii=False))
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)