# File: QuantEngineByItz/tools/build_rebalance_engine_v2.py (185 lines, 6.4 KiB, Python)
# NOTE: the hosting page flagged this file as containing ambiguous Unicode characters,
# i.e. characters that might be confused with other characters.
"""build_rebalance_engine_v2.py — REBALANCE_ENGINE_V2 (Score-Based Allocation)
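Improvements:
1. Move away from equal weighting: use SS001_Norm_Score as the weight to derive each ticker's target weight dynamically.
2. Risk-budget linkage: allocate more weight to tickers with stronger signals.
3. Completes roadmap item WBS-3.2.
Logic: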
ticker_target_pct = bucket_target_pct * (SS001_Score / sum(Scores_in_bucket))
"""
from __future__ import annotations
import argparse
import json
import math
import os
from pathlib import Path
from typing import Any
# Add the project root to sys.path so V1 modules can be reused (if needed)
import sys
ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(ROOT))
# V1에서 유틸리티 함수 및 설정 상속 (직접 정의)
TEMP = ROOT / "Temp"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = TEMP / "rebalance_engine_v2.json"
FORMULA_ID = "REBALANCE_ENGINE_V2"
BUCKET_CONFIG: dict[str, dict] = {
"Core": {"target": 66.0, "min": 60.0, "max": 72.0},
"Satellite": {"target": 17.5, "min": 10.0, "max": 25.0},
"Cash": {"target": 16.5, "min": 10.0, "max": 22.0},
}
CORE_TICKERS_BASE: set[str] = {"005930", "000660", "000270"}
REGIME_BANDS: dict[str, dict] = {
"RISK_ON": {"label": "RISK_ON ±15%p", "expand": 15.0, "contract": 15.0},
"NEUTRAL": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0},
"RISK_OFF": {"label": "RISK_OFF +2/10%p", "expand": 2.0, "contract": 10.0},
"_DEFAULT": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0},
}
TX_COST_ROUNDTRIP = 0.007
COST_BENEFIT_THRESHOLD = 0.005
MIN_ACTIONABLE_DRIFT_PCT = 1.2
STAGE_RATIOS = [0.30, 0.30, 0.40]
def _load(path: Path) -> Any:
    """Read and decode a UTF-8 JSON file, falling back to an empty dict.

    Args:
        path: Location of the JSON document.

    Returns:
        The decoded JSON value, or ``{}`` when the file is missing, cannot be
        read, is not valid UTF-8, or is not valid JSON.
    """
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError covers a missing/unreadable path; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError. Anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        return {}
def _f(v: Any, default: float = 0.0) -> float:
    """Coerce *v* to a finite float, returning *default* when that fails.

    Args:
        v: Any value; numbers and numeric strings are accepted by ``float``.
        default: Value returned when *v* cannot be converted or converts to
            NaN or +/-infinity.

    Returns:
        ``float(v)`` when it is a finite number, otherwise *default*.
    """
    try:
        value = float(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None / containers; ValueError: non-numeric text;
        # OverflowError: integers too large for a float.
        return default
    # NaN compares False against everything, so it would slip through
    # "<= 0" guards and contaminate any sum it is added to.
    if not math.isfinite(value):
        return default
    return value
def _detect_force_signal(row: dict) -> str:
    """Return the forced-exit signal named in a row's sell/action fields.

    The ``Sell_Reason``, ``Final_Action`` and ``Sell_Action`` values are
    upper-cased and searched for known markers.

    Args:
        row: Mapping that may contain the three fields above.

    Returns:
        ``"ABS_FLOOR"`` if that marker is present (checked first),
        ``"TIME_STOP"`` if ``TIME_STOP`` or ``TIME_EXIT`` is present,
        otherwise an empty string.
    """
    field_names = ("Sell_Reason", "Final_Action", "Sell_Action")
    # Missing / falsy values contribute an empty string.
    haystack = " ".join(str(row.get(name) or "").upper() for name in field_names)

    if "ABS_FLOOR" in haystack:
        return "ABS_FLOOR"
    for marker in ("TIME_STOP", "TIME_EXIT"):
        if marker in haystack:
            return "TIME_STOP"
    return ""
def _extract_df_rows(payload: Any) -> list[dict]:
    """Return the ``data.data_feed`` rows of *payload* as a list of dicts.

    Args:
        payload: Decoded JSON document; expected to look like
            ``{"data": {"data_feed": [...]}}``.

    Returns:
        The dict entries of ``payload["data"]["data_feed"]``. An empty list
        is returned when the payload, ``data`` or ``data_feed`` is missing,
        null or of an unexpected type. Non-dict entries are dropped because
        callers access rows with ``.get``.
    """
    data = payload.get("data") if isinstance(payload, dict) else None
    feed = data.get("data_feed") if isinstance(data, dict) else None
    if not isinstance(feed, list):
        return []
    return [row for row in feed if isinstance(row, dict)]
def _extract_portfolio_totals(payload: Any) -> tuple[float, float]:
    """Return ``(total_asset_krw, settlement_cash_d2_krw)`` from the settings.

    Args:
        payload: Decoded JSON document; the values are read from
            ``payload["data"]["settings"]``.

    Returns:
        A ``(total_asset, cash)`` tuple of floats converted with ``_f``.
        ``(0.0, 0.0)`` is returned when the payload, ``data`` or ``settings``
        is missing, null or not a dict.
    """
    data = payload.get("data") if isinstance(payload, dict) else None
    settings = data.get("settings") if isinstance(data, dict) else None
    if not isinstance(settings, dict):
        return 0.0, 0.0
    total_asset = _f(settings.get("total_asset_krw"))
    cash = _f(settings.get("settlement_cash_d2_krw"))
    return total_asset, cash
def _extract_regime(payload: Any) -> str:
    """Return the market regime label stored in the macro section.

    The first entry of ``payload["data"]["macro"]`` whose ``Symbol`` is
    ``"REGIME_PRELIM"`` supplies the label through its ``Close`` value.

    Args:
        payload: Decoded JSON document.

    Returns:
        The ``Close`` value of the first ``REGIME_PRELIM`` entry, stripped and
        upper-cased. ``"NEUTRAL"`` is returned when there is no such entry,
        when its ``Close`` is null or blank, or when the payload does not have
        the expected structure.
    """
    data = payload.get("data") if isinstance(payload, dict) else None
    macro = data.get("macro") if isinstance(data, dict) else None
    if not isinstance(macro, list):
        return "NEUTRAL"

    for entry in macro:
        if not isinstance(entry, dict):
            continue
        if str(entry.get("Symbol")) != "REGIME_PRELIM":
            continue
        close = entry.get("Close")
        # str(None) would otherwise leak out as the bogus regime "NONE".
        label = "" if close is None else str(close).strip().upper()
        return label or "NEUTRAL"
    return "NEUTRAL"
def main(argv: "list[str] | None" = None) -> int:
    """Build the score-weighted rebalance plan and write it to a JSON file.

    For every held ticker the target weight is
    ``bucket_target * score / sum(scores in the same bucket)``. The target is
    compared with the current weight and the ticker is labelled BUY, SELL or
    HOLD; a forced-exit signal always yields SELL.

    Args:
        argv: Command-line arguments (``--json`` input path, ``--out`` output
            path). ``None`` lets argparse read ``sys.argv[1:]``.

    Returns:
        Always ``0``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args(argv)

    payload = _load(Path(args.json))
    df_rows = _extract_df_rows(payload)
    regime = _extract_regime(payload)
    total_asset, cash_krw = _extract_portfolio_totals(payload)

    # 1. Collect per-ticker data (including the signal score) for held positions.
    holdings = []
    bucket_scores: dict[str, float] = {}
    bucket_counts: dict[str, int] = {}
    for row in df_rows:
        ticker = str(row.get("Ticker", ""))
        mv = _f(row.get("Account_Market_Value"))
        # "not mv > 0" (rather than "mv <= 0") also skips a NaN market value.
        if not mv > 0:
            continue
        # Signal score (SS001_Norm_Score); rows without one get 50.
        score = _f(row.get("SS001_Norm_Score"), 50.0)
        # Negative or NaN scores must not yield negative/NaN target weights,
        # so they contribute zero to the allocation. The raw score is still
        # reported in the output.
        alloc_score = score if score > 0 else 0.0
        bucket = "Core" if ticker in CORE_TICKERS_BASE else "Satellite"
        holdings.append({
            "ticker": ticker,
            "name": row.get("Name"),
            "bucket": bucket,
            "weight_pct": _f(row.get("Weight_Pct")),
            "score": score,
            "alloc_score": alloc_score,
            "close": _f(row.get("Close")),
            "qty": _f(row.get("Account_Holding_Qty")),
            "force_signal": _detect_force_signal(row),
        })
        bucket_scores[bucket] = bucket_scores.get(bucket, 0.0) + alloc_score
        bucket_counts[bucket] = bucket_counts.get(bucket, 0) + 1

    # 2. Derive each ticker's target weight from its bucket target.
    # NOTE: a regime band (target - contract, target + expand from
    # REGIME_BANDS) used to be computed here but was never applied to the
    # action or written to the output, so that dead computation was removed.
    ticker_rows = []
    for h in holdings:
        bucket = h["bucket"]
        bucket_target = BUCKET_CONFIG[bucket]["target"]
        total_score_in_bucket = bucket_scores[bucket]
        if total_score_in_bucket > 0:
            # [V2 core] score-proportional share of the bucket target.
            target_pct = round(bucket_target * (h["alloc_score"] / total_score_in_bucket), 2)
        else:
            # No usable score in this bucket: split the bucket target equally
            # so that the ticker targets still sum to the bucket target.
            target_pct = round(bucket_target / bucket_counts[bucket], 2)

        current_pct = h["weight_pct"]
        drift = round(current_pct - target_pct, 2)

        # Decide the action: a forced-exit signal wins over drift.
        if h["force_signal"]:
            action = "SELL"
        elif drift > MIN_ACTIONABLE_DRIFT_PCT:
            action = "SELL"
        elif drift < -MIN_ACTIONABLE_DRIFT_PCT:
            action = "BUY"
        else:
            action = "HOLD"

        ticker_rows.append({
            "ticker": h["ticker"],
            "name": h["name"],
            "bucket": bucket,
            "score": h["score"],
            "target_pct": target_pct,
            "current_pct": current_pct,
            "drift_pct": drift,
            "action": action,
            "reason": h["force_signal"] or ("DRIFT" if action != "HOLD" else "OK"),
        })

    # 3. Summary; sums are rounded so float noise does not reach the output.
    core_pct = round(sum(r["current_pct"] for r in ticker_rows if r["bucket"] == "Core"), 2)
    sat_pct = round(sum(r["current_pct"] for r in ticker_rows if r["bucket"] == "Satellite"), 2)
    cash_pct = round(cash_krw / total_asset * 100, 2) if total_asset > 0 else 0
    summary = {
        "regime": regime,
        "total_asset": total_asset,
        "core_pct": core_pct,
        "satellite_pct": sat_pct,
        "cash_pct": cash_pct,
        "allocation_method": "SS001_SCORE_WEIGHTED",
    }
    out = {
        "formula_id": FORMULA_ID,
        "summary": summary,
        "tickers": ticker_rows,
    }

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"REBALANCE_ENGINE_V2 Complete. Allocation: {summary['allocation_method']}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status instead of
    # discarding it.
    raise SystemExit(main())