"""SMART_MONEY_FLOW_SIGNAL_V2 — 외인/기관 자금 흐름 신호 산출기. data_feed의 Frg_5D / Inst_5D / Frg_20D / Inst_20D 필드를 사용하여 스마트머니 흐름 점수와 라벨을 산출한다. 점수 구성 (0~100): 외인 20일 누적: 40점 (상위권 → 40, 하위권 → 0) 기관 20일 누적: 30점 외인 5일 추세: 15점 기관 5일 추세: 15점 라벨: STRONG_INFLOW ≥ 75 INFLOW ≥ 55 NEUTRAL ≥ 40 OUTFLOW ≥ 25 STRONG_OUTFLOW < 25 출력: Temp/smart_money_flow_signal_v2.json """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "smart_money_flow_signal_v2.json" def _load(path: Path) -> dict[str, Any]: try: x = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return x if isinstance(x, dict) else {} def _rows(v: Any) -> list[dict[str, Any]]: if isinstance(v, list): return [r for r in v if isinstance(r, dict)] if isinstance(v, str): try: return _rows(json.loads(v)) except Exception: return [] return [] def _f(v: Any, default: float = 0.0) -> float: try: return float(v) except Exception: return default def _percentile_rank(val: float, all_vals: list[float]) -> float: """val이 전체 중 몇 %에 위치하는지 (0~100).""" if not all_vals: return 50.0 n = len(all_vals) rank = sum(1 for v in all_vals if v < val) return round(rank / n * 100.0, 2) def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() jp = Path(args.json) op = Path(args.out) if not jp.is_absolute(): jp = ROOT / jp if not op.is_absolute(): op = ROOT / op payload = _load(jp) data = payload.get("data") if isinstance(payload.get("data"), dict) else {} df_list = _rows(data.get("data_feed")) # 전체 종목의 흐름 데이터 수집 flow_data: list[dict[str, Any]] = [] for r in df_list: flow_data.append({ "ticker": str(r.get("Ticker") or r.get("ticker") or ""), "name": r.get("Name") or r.get("name") or "", "frg_20d": _f(r.get("Frg_20D")), "inst_20d": _f(r.get("Inst_20D")), "frg_5d": _f(r.get("Frg_5D")), "inst_5d": _f(r.get("Inst_5D")), }) # 각 지표의 백분위 계산 all_frg_20d = [d["frg_20d"] for d in flow_data] all_inst_20d = [d["inst_20d"] for d in flow_data] all_frg_5d = [d["frg_5d"] for d in flow_data] all_inst_5d = [d["inst_5d"] for d in flow_data] out_rows = [] scores: list[float] = [] for d in flow_data: pct_frg20 = _percentile_rank(d["frg_20d"], all_frg_20d) pct_inst20 = _percentile_rank(d["inst_20d"], all_inst_20d) pct_frg5 = _percentile_rank(d["frg_5d"], all_frg_5d) pct_inst5 = _percentile_rank(d["inst_5d"], all_inst_5d) score = round( pct_frg20 * 0.40 + pct_inst20 * 0.30 + pct_frg5 * 0.15 + pct_inst5 * 0.15, 2, ) scores.append(score) if score >= 75: label = "STRONG_INFLOW" elif score >= 55: label = "INFLOW" elif score >= 40: label = "NEUTRAL" elif score >= 25: label = "OUTFLOW" else: label = "STRONG_OUTFLOW" out_rows.append({ "ticker": d["ticker"], "name": d["name"], "smart_money_score": score, "label": label, "frg_20d": d["frg_20d"], "inst_20d": d["inst_20d"], "frg_5d": d["frg_5d"], "inst_5d": d["inst_5d"], "pct_frg20": pct_frg20, "pct_inst20": pct_inst20, "formula_id": "SMART_MONEY_FLOW_SIGNAL_V2", }) mean = sum(scores) / len(scores) if scores else 0.0 var = sum((s - mean) ** 2 for s in scores) / len(scores) if scores else 0.0 cv = (var ** 0.5) / mean if mean > 0 else 0.0 label_diversity = len({r["label"] for r in out_rows}) label_summary: dict[str, int] = {} for r in out_rows: lbl = r["label"] label_summary[lbl] = label_summary.get(lbl, 0) + 1 gate = "PASS" if (label_diversity >= 3 and cv >= 0.20) else ( "CAUTION" if out_rows else "FAIL" ) out = { "formula_id": "SMART_MONEY_FLOW_SIGNAL_V2", "gate": gate, "rows": out_rows, "row_count": len(out_rows), "coefficient_of_variation": round(cv, 4), "label_diversity": label_diversity, "label_summary": label_summary, } op.parent.mkdir(parents=True, exist_ok=True) op.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps({ "formula_id": out["formula_id"], "gate": gate, "rows": len(out_rows), "cv": out["coefficient_of_variation"], "label_summary": label_summary, }, ensure_ascii=False)) return 0 if __name__ == "__main__": raise SystemExit(main())