Files
QuantEngineByItz/tools/build_rebalance_engine_v1.py
kjh2064 72f8d61244 feat: Sprint-3 완결 + Sprint-4 착수 (WBS-3.2, 3.4, 5.2)
주요 변경:
- [WBS-3.2] 리밸런싱 V2 신호 가중 목표배분 (signal_weighted_ss001_v1)
  * equal_weight -> SS001_Norm_Score 비례 버킷내 배분
  * 하네스: 삼성(36.84%) > SK하이닉스(29.16%), Core=66.00% PASS
- [WBS-3.4] logDailyAssetHistory_ SpreadsheetApp.getActiveSpreadsheet() -> getSpreadsheet_() 수정
  * run_all 컨텍스트에서 null 반환 방지
- [WBS-5.2] deploy_gas.py 전면 재작성
  * src/gas_adapter_parts/ + src/gas/ 양쪽 소스 탐색
  * gdc_01+gdc_02 -> gas_data_collect.gs 번들링
  * dry-run PASS: 17개 파일 WARN 0건
- src/gas/ 디렉토리 신규 추가 (CLASP 조직화 구조)
- tools/automate_routine.py, download_trading_data.py 신규 추가
- .gitignore: .clasprc.json OAuth 토큰 제외 추가
- ROADMAP_WBS.md: Sprint-3 [x] 완료, Sprint-4 착수 목록 추가

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 16:22:19 +09:00

649 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""build_rebalance_engine_v1.py — REBALANCE_ENGINE_V1
리밸런싱 엔진: bucket drift 측정 → 레짐 적응 밴드 → 비용효익 게이트 → 3단계 분할 실행 계획.
ABS_FLOOR / TIME_STOP 강제 매도 신호 통합.
P3: 레짐 적응 밴드 (Risk-On ±15%p, Neutral ±5%p, Risk-Off +2%p/10%p)
P4: 비용효익 게이트 (|drift| TX_COST_ROUNDTRIP > 0.5%p)
P5: 3단계 분할 실행 (Stage1 30%, Stage2 30%, Stage3 40%)
P6: ABS_FLOOR / TIME_STOP 강제 매도 통합
입력: GatherTradingData.json + Temp/computed_harness_v1.json (optional)
출력: Temp/rebalance_engine_v1.json
"""
from __future__ import annotations
import argparse
import json
import math
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
TEMP = ROOT / "Temp"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = TEMP / "rebalance_engine_v1.json"
FORMULA_ID = "REBALANCE_ENGINE_V1"
# ── 버킷 목표 (gdf_01_price_metrics.gs THRESHOLDS 와 동기화) ──────────────────
BUCKET_CONFIG: dict[str, dict] = {
"Core": {"target": 66.0, "min": 60.0, "max": 72.0},
"Satellite": {"target": 17.5, "min": 10.0, "max": 25.0},
"Cash": {"target": 16.5, "min": 10.0, "max": 22.0},
}
# 직접 코어 주도주 (gdc_02_account_satellite.gs isCoreLeader 기준)
CORE_TICKERS_BASE: set[str] = {"005930", "000660", "000270"}
# ── 레짐 적응 밴드 (P3) ──────────────────────────────────────────────────────
# expand = 목표 대비 상단 확장%p, contract = 목표 대비 하단 축소%p (절대값)
REGIME_BANDS: dict[str, dict] = {
"RISK_ON": {"label": "RISK_ON ±15%p", "expand": 15.0, "contract": 15.0},
"SECULAR_LEADER_RISK_ON": {"label": "RISK_ON ±15%p", "expand": 15.0, "contract": 15.0},
"NEUTRAL": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0},
"RISK_OFF_CANDIDATE": {"label": "RISK_OFF_CANDIDATE +2/10%p", "expand": 2.0, "contract": 10.0},
"RISK_OFF": {"label": "RISK_OFF +2/10%p", "expand": 2.0, "contract": 10.0},
"EVENT_SHOCK": {"label": "RISK_OFF +2/10%p", "expand": 2.0, "contract": 10.0},
"_DEFAULT": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0},
}
# ── 비용효익 게이트 (P4) ─────────────────────────────────────────────────────
TX_COST_ONE_WAY = 0.0035 # 0.35% (수수료 + 세금)
TX_COST_ROUNDTRIP = TX_COST_ONE_WAY * 2 # 0.70%
COST_BENEFIT_THRESHOLD = 0.005 # 0.50%p — 이보다 클 때만 주문 생성
MIN_ACTIONABLE_DRIFT_PCT = (TX_COST_ROUNDTRIP + COST_BENEFIT_THRESHOLD) * 100 # 1.20%p
# ── 3단계 분할 실행 비율 (P5) ────────────────────────────────────────────────
STAGE_RATIOS = [0.30, 0.30, 0.40] # Stage1, Stage2, Stage3
# 매도 지정가 할인율 (직전 종가의 0.2% 아래로 지정)
LIMIT_PRICE_DISCOUNT = 0.002
# ── 유틸 ─────────────────────────────────────────────────────────────────────
def _load(path: Path) -> Any:
if not path.exists():
return {}
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
def _f(v: Any, default: float = 0.0) -> float:
try:
return float(v)
except (TypeError, ValueError):
return default
def _i(v: Any, default: int = 0) -> int:
try:
return int(float(v))
except (TypeError, ValueError):
return default
def _s(v: Any, default: str = "") -> str:
return str(v) if v is not None else default
def _round2(v: float) -> float:
return round(v, 2)
def _band_for_regime(regime: str) -> dict:
return REGIME_BANDS.get(regime.upper(), REGIME_BANDS["_DEFAULT"])
def _detect_force_signal(row: dict) -> str:
"""Sell_Reason 또는 Final_Action에서 ABS_FLOOR / TIME_STOP 강제 신호 추출."""
combined = " ".join([
_s(row.get("Sell_Reason")),
_s(row.get("Final_Action")),
_s(row.get("Sell_Action")),
]).upper()
if "ABS_FLOOR" in combined:
return "ABS_FLOOR"
if "TIME_STOP" in combined or "TIME_EXIT" in combined or "TIME_TRIM" in combined:
return "TIME_STOP"
return ""
def _stage_qty_split(total_qty: int) -> list[int]:
"""3단계 수량 분할: [30%, 30%, 40%]. 각 stage는 최소 1주."""
if total_qty <= 0:
return [0, 0, 0]
if total_qty < 3:
return [total_qty, 0, 0]
s1 = max(1, math.floor(total_qty * STAGE_RATIOS[0]))
s2 = max(1, math.floor(total_qty * STAGE_RATIOS[1]))
s3 = total_qty - s1 - s2
if s3 < 0:
s2 = total_qty - s1
s3 = 0
return [s1, s2, max(0, s3)]
def _extract_trim_guidance(hc: dict) -> dict:
tg = hc.get("regime_trim_guidance_json") or {}
if isinstance(tg, str):
try:
tg = json.loads(tg)
except Exception:
tg = {}
return tg if isinstance(tg, dict) else {}
def _extract_df_rows(payload: Any) -> list[dict]:
"""GatherTradingData.json 에서 data_feed 행 배열 추출."""
if not isinstance(payload, dict):
return []
data = payload.get("data") or {}
df = data.get("data_feed") or []
if not isinstance(df, list):
return []
return [r for r in df if isinstance(r, dict)]
def _build_snap_position_map(payload: Any, total_krw: float) -> dict[str, dict]:
"""
account_snapshot 행 배열을 티커별로 합산한 맵 반환.
동일 티커 복수 행(소수 분리 계좌 포함) → qty·MV·원가 합산.
GAS readAccountSnapshotMap_ 의 _mergePositionRecord_ 와 동일 로직.
반환: { ticker: { qty, market_value, weight_pct, position_type, name } }
"""
data = payload.get("data") or {} if isinstance(payload, dict) else {}
snap = data.get("account_snapshot") or []
if not isinstance(snap, list):
return {}
merged: dict[str, dict] = {}
for row in snap:
if not isinstance(row, dict):
continue
if _s(row.get("parse_status")).strip().upper() != "CAPTURE_READ_OK":
continue
# 연금저축 계좌는 주문 불가 — 건너뜀
if "연금저축" in _s(row.get("account_type")):
continue
ticker = _s(row.get("ticker")).strip()
if not ticker:
continue
# US 주식(알파벳만): Naver 가격 없음 → weight 0 → 건너뜀
if ticker.replace(".", "").isalpha():
continue
qty = _f(row.get("holding_quantity"))
if qty <= 0:
continue
mv = _f(row.get("market_value"))
cost = _f(row.get("total_cost"))
pl = _f(row.get("profit_loss"))
name = _s(row.get("name"))
pt = _s(row.get("position_type")).lower()
if ticker not in merged:
merged[ticker] = {
"qty": qty, "market_value": mv,
"total_cost": cost, "profit_loss": pl,
"name": name, "position_type": pt,
}
else:
ex = merged[ticker]
ex["qty"] += qty
ex["market_value"] += mv
ex["total_cost"] += cost
ex["profit_loss"] += pl
# "(소수)" 없는 이름 우선
if "소수" in ex["name"] and "소수" not in name:
ex["name"] = name
if not ex["position_type"] and pt:
ex["position_type"] = pt
# weight_pct 계산
for v in merged.values():
v["weight_pct"] = _round2(v["market_value"] / total_krw * 100) if total_krw > 0 else 0.0
return merged
def _extract_portfolio_totals(payload: Any) -> tuple[float, float]:
"""
(total_portfolio_krw, cash_krw) 추출 우선순위:
1. settings.total_asset_krw / settings.settlement_cash_d2_krw (가장 정확)
2. account_snapshot cash row 의 settlement_cash_d2
3. holdings 합산 (fallback)
"""
data = payload.get("data") or {} if isinstance(payload, dict) else {}
settings = data.get("settings") or {}
# settings 에서 읽기 (ChatGPT → settings 탭에 기록된 총자산)
total_krw = _f(settings.get("total_asset_krw"))
cash_krw = _f(settings.get("settlement_cash_d2_krw"))
if total_krw <= 0:
# account_snapshot cash row fallback
snap = data.get("account_snapshot") or []
if isinstance(snap, list):
for row in snap:
if isinstance(row, dict) and _s(row.get("position_type")).lower() == "cash":
cash_krw = _f(row.get("settlement_cash_d2") or row.get("immediate_cash"))
break
# holdings 합산 (최후 fallback)
df_rows = _extract_df_rows(payload)
holdings_total = sum(_f(r.get("Account_Market_Value")) for r in df_rows)
total_krw = holdings_total + cash_krw
return total_krw, cash_krw
_REGIME_NORMALIZE: dict[str, str] = {
"BREAKDOWN": "RISK_OFF",
"EVENT_SHOCK": "RISK_OFF",
"RISK_OFF_CANDIDATE": "RISK_OFF_CANDIDATE",
"RISK_OFF": "RISK_OFF",
"NEUTRAL": "NEUTRAL",
"RISK_ON": "RISK_ON",
"SECULAR_LEADER_RISK_ON": "RISK_ON",
}
def _normalize_regime(raw: str) -> str:
return _REGIME_NORMALIZE.get(raw.upper(), raw.upper())
def _extract_regime(harness: Any, payload: Any) -> str:
"""
레짐 추출 우선순위:
1. payload.data.macro[Symbol=REGIME_PRELIM].Close — GAS가 실제 사용하는 소스(최신)
2. payload.data.settings.prev_market_regime — ChatGPT 설정값
3. payload.data._harness_context.market_regime_state — harness (최대 수일 stale 가능)
4. payload.data._harness_context.regime_transition_json.current_regime
5. computed_harness_v1.json 의 market_regime 키
6. 기본값 "NEUTRAL"
BREAKDOWN → RISK_OFF 정규화.
GAS gdc_01_fetch_fundamentals.gs:2237 과 동일한 소스 사용.
"""
data = payload.get("data") or {} if isinstance(payload, dict) else {}
# 1. macro 시트 REGIME_PRELIM 행 (GAS 와 동일한 소스 — 항상 최신)
macro_rows = data.get("macro") or []
if isinstance(macro_rows, list):
for r in macro_rows:
if isinstance(r, dict) and _s(r.get("Symbol")).strip() == "REGIME_PRELIM":
v = _s(r.get("Close")).strip()
if v:
return _normalize_regime(v)
# 2. settings.prev_market_regime
settings = data.get("settings") or {}
v = settings.get("prev_market_regime") or settings.get("REGIME_PRELIM")
if v and isinstance(v, str) and v.strip():
return _normalize_regime(v.strip())
# 3-4. _harness_context (수일 stale 가능 — fallback 전용)
hc = data.get("_harness_context") or {}
if isinstance(hc, dict):
v = hc.get("market_regime_state")
if v and isinstance(v, str):
return _normalize_regime(v)
rt = hc.get("regime_transition_json") or {}
if isinstance(rt, str):
try:
rt = json.loads(rt)
except Exception:
rt = {}
if isinstance(rt, dict):
v = rt.get("current_regime")
if v and isinstance(v, str):
return _normalize_regime(v)
# 5. computed_harness_v1.json
if isinstance(harness, dict):
for key in ("market_regime", "Market_Regime", "regime"):
v = harness.get(key)
if v and isinstance(v, str):
return _normalize_regime(v)
return "NEUTRAL"
def _assign_bucket(ticker: str, row: dict, snap_position_type: str = "") -> str:
"""
우선순위: snap_position_type (account_snapshot) > row.position_type > CORE_TICKERS_BASE.
ETF/펀드 코드(알파벳 포함)는 Satellite.
"""
for pt_raw in (snap_position_type, _s(row.get("position_type") or row.get("Position_Type"))):
pt = pt_raw.lower().strip()
if pt == "core":
return "Core"
if pt == "satellite":
return "Satellite"
return "Core" if ticker in CORE_TICKERS_BASE else "Satellite"
def _compute_limit_price(close: float, action: str) -> float:
"""매도 지정가: 종가의 (1 - LIMIT_PRICE_DISCOUNT). 매수는 종가 그대로."""
if close <= 0:
return 0.0
if action == "SELL":
return round(close * (1 - LIMIT_PRICE_DISCOUNT))
return round(close)
# ── 메인 엔진 ─────────────────────────────────────────────────────────────────
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--json", default=str(DEFAULT_JSON))
ap.add_argument("--harness", default=str(TEMP / "computed_harness_v1.json"))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
json_path = Path(args.json)
harness_path = Path(args.harness)
out_path = Path(args.out)
for p in (json_path, harness_path, out_path):
if not p.is_absolute():
p = ROOT / p
payload = _load(json_path)
harness = _load(harness_path)
df_rows = _extract_df_rows(payload)
regime = _extract_regime(harness, payload)
band = _band_for_regime(regime)
total_portfolio_krw, cash_krw = _extract_portfolio_totals(payload)
# harness_context 에서 현금 바닥 목표 읽기 (BUCKET_CONFIG.Cash.min 보다 우선)
hc_data = (payload.get("data") or {}).get("_harness_context") or {}
cash_floor_pct = float(hc_data.get("cash_floor_min_pct") or BUCKET_CONFIG["Cash"]["min"])
cash_shortfall_krw = float(hc_data.get("cash_shortfall_min_krw") or 0)
# Cash 버킷 최소값을 harness 현금바닥으로 override
BUCKET_CONFIG["Cash"]["min"] = max(BUCKET_CONFIG["Cash"]["min"], cash_floor_pct)
# ── 1. account_snapshot 합산 맵 (Weight_Pct 재계산 기준) ─────────────────
# GAS readAccountSnapshotMap_ 에 소수 분리 행 덮어쓰기 버그가 있는 경우를 대비,
# Python 엔진은 account_snapshot 을 직접 합산하여 올바른 weight_pct 를 사용한다.
snap_map = _build_snap_position_map(payload, total_portfolio_krw)
# ── 2. 보유 종목 필터링 (snap_map 우선, data_feed 보조) ───────────────────
holdings = []
seen_tickers: set[str] = set()
# snap_map 에 있는 종목이 authoritative (GAS 소수 분리 버그 무관)
df_row_map = {_s(r.get("Ticker")).strip(): r for r in df_rows if _s(r.get("Ticker")).strip()}
for ticker, sp in snap_map.items():
if sp["weight_pct"] <= 0:
continue
seen_tickers.add(ticker)
df_row = df_row_map.get(ticker, {})
# SS001_Norm_Score: 0~100 신호 강도. 0은 데이터 없음 → 최소 가중치 1.0으로 폴백
ss001_raw = _f(df_row.get("SS001_Norm_Score"), default=-1.0)
ss001_norm = max(1.0, ss001_raw) if ss001_raw > 0 else 1.0
holdings.append({
"ticker": ticker,
"name": sp["name"] or _s(df_row.get("Name")),
"bucket": _assign_bucket(ticker, df_row, sp.get("position_type", "")),
"weight_pct": sp["weight_pct"],
"acct_mv_krw": sp["market_value"],
"holding_qty": int(round(sp["qty"])), # 주문 수량은 정수
"close": _f(df_row.get("Close")),
"final_action": _s(df_row.get("Final_Action")),
"sell_reason": _s(df_row.get("Sell_Reason")),
"force_signal": _detect_force_signal(df_row),
"ss001_norm": ss001_norm,
})
# data_feed 에만 있는 보유 종목 보완 (snap 에 없는 경우 — 비정상이지만 방어)
for row in df_rows:
ticker = _s(row.get("Ticker")).strip()
if not ticker or ticker in seen_tickers:
continue
weight_pct = _f(row.get("Weight_Pct"))
acct_mv = _f(row.get("Account_Market_Value"))
if weight_pct <= 0 and acct_mv <= 0:
continue
ss001_raw = _f(row.get("SS001_Norm_Score"), default=-1.0)
ss001_norm = max(1.0, ss001_raw) if ss001_raw > 0 else 1.0
holdings.append({
"ticker": ticker,
"name": _s(row.get("Name")),
"bucket": _assign_bucket(ticker, row),
"weight_pct": weight_pct,
"acct_mv_krw": acct_mv,
"holding_qty": _i(row.get("Account_Holding_Qty")),
"close": _f(row.get("Close")),
"final_action": _s(row.get("Final_Action")),
"sell_reason": _s(row.get("Sell_Reason")),
"force_signal": _detect_force_signal(row),
"ss001_norm": ss001_norm,
})
# ── 3. 버킷별 현재 비중 집계 ─────────────────────────────────────────────
# weight_pct 는 account_snapshot MV / total_asset_krw 기반 (재계산 완료).
# Cash: settings.settlement_cash_d2_krw / total_asset_krw 로 직접 계산 (정확).
core_pct = sum(h["weight_pct"] for h in holdings if h["bucket"] == "Core")
sat_pct = sum(h["weight_pct"] for h in holdings if h["bucket"] == "Satellite")
if total_portfolio_krw > 0 and cash_krw > 0:
cash_pct = _round2(cash_krw / total_portfolio_krw * 100)
else:
cash_pct = max(0.0, 100.0 - core_pct - sat_pct)
bucket_current = {"Core": core_pct, "Satellite": sat_pct, "Cash": cash_pct}
# ── 3. 버킷 drift + 밴드 계산 ────────────────────────────────────────────
bucket_rows: list[dict] = []
rebalance_needed = False
for bname, bcfg in BUCKET_CONFIG.items():
target = bcfg["target"]
current = _round2(bucket_current[bname])
drift = _round2(current - target)
band_min = _round2(target - band["contract"])
band_max = _round2(target + band["expand"])
if current < band_min:
drift_status = "BREACH_LOW"
rebalance_needed = True
elif current > band_max:
drift_status = "BREACH_HIGH"
rebalance_needed = True
elif abs(drift) >= MIN_ACTIONABLE_DRIFT_PCT / 2:
drift_status = "WARN"
else:
drift_status = "NORMAL"
bucket_rows.append({
"bucket": bname,
"target_pct": target,
"current_pct": current,
"drift_pct": drift,
"band_min": band_min,
"band_max": band_max,
"regime_band": band["label"],
"drift_status": drift_status,
})
# ── 4. 종목별 분석 ───────────────────────────────────────────────────────
# [WBS-3.2] 신호 가중 목표배분 (SS001_Norm_Score 기반)
# ticker_target_pct = bucket_target × (ss001_norm / Σ ss001_norm in bucket)
# 폴백: ss001_norm 모두 1.0 → equal_weight 와 동일
bucket_ss001_total: dict[str, float] = {}
for h in holdings:
bucket_ss001_total[h["bucket"]] = (
bucket_ss001_total.get(h["bucket"], 0.0) + h.get("ss001_norm", 1.0)
)
ticker_rows: list[dict] = []
for h in holdings:
bname = h["bucket"]
bcfg = BUCKET_CONFIG.get(bname, BUCKET_CONFIG["Satellite"])
ss001_w = h.get("ss001_norm", 1.0)
bucket_ss001 = max(bucket_ss001_total.get(bname, ss001_w), 0.01)
target_pct = _round2(bcfg["target"] * ss001_w / bucket_ss001)
current_pct = _round2(h["weight_pct"])
drift = _round2(current_pct - target_pct)
band_min = _round2(target_pct - band["contract"])
band_max = _round2(target_pct + band["expand"])
# 드리프트 상태
force = h["force_signal"]
if force:
drift_status = "FORCE_" + force
action = "SELL"
gate_status = "FORCE_OVERRIDE"
elif current_pct > band_max:
drift_status = "BREACH_HIGH"
action = "SELL" if abs(drift) >= MIN_ACTIONABLE_DRIFT_PCT else "WATCH"
gate_status = "PASS" if abs(drift) >= MIN_ACTIONABLE_DRIFT_PCT else "BLOCKED_BY_COST"
elif current_pct < band_min:
drift_status = "BREACH_LOW"
action = "BUY" if abs(drift) >= MIN_ACTIONABLE_DRIFT_PCT else "WATCH"
gate_status = "PASS" if abs(drift) >= MIN_ACTIONABLE_DRIFT_PCT else "BLOCKED_BY_COST"
elif abs(drift) >= MIN_ACTIONABLE_DRIFT_PCT / 2:
drift_status = "WARN"
gate_status = "BLOCKED_BY_COST"
action = "WATCH"
else:
drift_status = "NORMAL"
gate_status = "BLOCKED_BY_COST"
action = "HOLD"
# 3단계 분할 수량 계산 (P5) — SELL/BUY 액션에만 적용
stage_qtys: list[int] = [0, 0, 0]
limit_prices: list[float] = [0.0, 0.0, 0.0]
trade_value_krw = 0.0
cost_est_krw = 0.0
net_benefit_pct = 0.0
if action in ("SELL", "BUY") and h["holding_qty"] > 0 and h["close"] > 0:
# 매도/매수 수량: 드리프트 해소에 필요한 수량
# |drift|%p ÷ current_pct * 보유수량 근사
if action == "SELL" and current_pct > 0:
adjust_ratio = min(abs(drift) / current_pct, 1.0)
adjust_qty = max(1, round(h["holding_qty"] * adjust_ratio))
else:
adjust_qty = max(1, round(h["holding_qty"] * 0.10)) # BUY: 10% 추가 근사
stage_qtys = _stage_qty_split(adjust_qty)
limit_price = _compute_limit_price(h["close"], action)
limit_prices = [limit_price, limit_price, limit_price]
total_action_qty = sum(stage_qtys)
trade_value_krw = _round2(total_action_qty * limit_price)
cost_est_krw = _round2(trade_value_krw * TX_COST_ROUNDTRIP)
# 비용 차감 후 드리프트 개선 효과
net_benefit_pct = _round2(abs(drift) - TX_COST_ROUNDTRIP * 100)
ticker_rows.append({
"ticker": h["ticker"],
"name": h["name"],
"bucket": bname,
"target_pct": target_pct,
"current_pct": current_pct,
"drift_pct": drift,
"band_min": band_min,
"band_max": band_max,
"regime_band": band["label"],
"drift_status": drift_status,
"force_signal": force,
"gate_status": gate_status,
"action": action,
"stage1_qty": stage_qtys[0],
"stage1_price": limit_prices[0],
"stage2_qty": stage_qtys[1],
"stage2_price": limit_prices[1],
"stage3_qty": stage_qtys[2],
"stage3_price": limit_prices[2],
"trade_value_krw": trade_value_krw,
"cost_est_krw": cost_est_krw,
"net_benefit_pct": net_benefit_pct,
"close": h["close"],
})
# ── 5. ORDERS 생성 (P4: gate_status=PASS 또는 FORCE_OVERRIDE のみ) ────────
order_rows: list[dict] = []
order_no = 1
# Priority: FORCE_OVERRIDE > BREACH_HIGH(SELL) > BREACH_LOW(BUY)
priority_map = {"FORCE_OVERRIDE": 0, "PASS": 1}
sorted_tickers = sorted(
[t for t in ticker_rows if t["gate_status"] in ("PASS", "FORCE_OVERRIDE")],
key=lambda t: (priority_map.get(t["gate_status"], 9), -abs(t["drift_pct"]))
)
for t in sorted_tickers:
for stage_idx, (qty, price) in enumerate(zip(
[t["stage1_qty"], t["stage2_qty"], t["stage3_qty"]],
[t["stage1_price"], t["stage2_price"], t["stage3_price"]],
), start=1):
if qty <= 0:
continue
reason = t["force_signal"] if t["force_signal"] else t["drift_status"]
order_rows.append({
"order_no": order_no,
"ticker": t["ticker"],
"name": t["name"],
"bucket": t["bucket"],
"action": t["action"],
"stage": stage_idx,
"qty": qty,
"limit_price_krw": price,
"trade_value_krw": round(qty * price),
"reason": reason,
})
order_no += 1
# ── 6. SUMMARY ───────────────────────────────────────────────────────────
summary = {
"run_date": _s((payload.get("metadata") or {}).get("converted_at", "")),
"regime": regime,
"regime_band": band["label"],
"total_portfolio_krw": total_portfolio_krw,
"cash_krw": cash_krw,
"core_pct": _round2(core_pct),
"satellite_pct": _round2(sat_pct),
"cash_pct": _round2(cash_pct),
"target_core_pct": BUCKET_CONFIG["Core"]["target"],
"target_sat_pct": BUCKET_CONFIG["Satellite"]["target"],
"target_cash_pct": BUCKET_CONFIG["Cash"]["target"],
"rebalance_needed": rebalance_needed,
"holdings_count": len(holdings),
"orders_count": len(order_rows),
"min_actionable_drift_pct": MIN_ACTIONABLE_DRIFT_PCT,
"cash_floor_min_pct": cash_floor_pct,
"cash_shortfall_krw": cash_shortfall_krw,
"regime_trim_lock": bool(hc_data.get("regime_trim_lock", False)),
"new_buy_gate": _s((_extract_trim_guidance(hc_data)).get("new_buy_gate", "")),
}
# ── 7. 출력 ──────────────────────────────────────────────────────────────
out = {
"formula_id": FORMULA_ID,
"metadata": {
"per_ticker_target_method": "signal_weighted_ss001_v1",
"regime_source": "macro.REGIME_PRELIM > settings > harness_context > computed_harness",
},
"summary": summary,
"buckets": bucket_rows,
"tickers": ticker_rows,
"orders": order_rows,
}
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
action_count = len([t for t in ticker_rows if t["action"] not in ("HOLD", "WATCH")])
print(f"[{FORMULA_ID}] regime={regime} rebalance_needed={rebalance_needed} "
f"holdings={len(holdings)} action_tickers={action_count} orders={len(order_rows)}")
print(f" core={core_pct:.1f}% sat={sat_pct:.1f}% cash={cash_pct:.1f}%")
print(f" -> {out_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())