Files
QuantEngineByItz/tools/build_scores_harness_v1.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

270 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""build_scores_harness_v1.py — SCORES_HARNESS_V1
프롬프트 §3.6/3.7/4.2 — 기존 하네스 출력에서 4개 스코어(smart_money / liquidity /
momentum / risk)를 결정론적으로 집계하고, 투자기간별 가중치가 적용된 final_score를 산출.
산출물: Temp/scores_harness_v1.json
산식(프롬프트 §4.2, 투자기간 가중치):
SHORT: final = 0.10*F + 0.20*SM + 0.25*L + 0.30*M + 0.05*V - 0.10*R
MID/LONG: final = 0.35*F + 0.10*SM + 0.10*L + 0.10*M + 0.25*V - 0.10*R
ETF: final = 0.15*F + 0.20*SM + 0.25*L + 0.25*M + 0.05*V - 0.10*R
DEFAULT: final = 0.25*F + 0.20*SM + 0.15*L + 0.15*M + 0.15*V - 0.10*R
원칙: 데이터 없는 값은 만들지 않는다. 미충족 항목은 not_available / insufficient_data.
"""
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any
# Parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
TEMP = ROOT / "Temp"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = TEMP / "scores_harness_v1.json"
FORMULA_ID = "SCORES_HARNESS_V1"
# Placeholder written to the output wherever a score could not be computed.
NA = "not_available"

# §4.2 per-horizon component weights.
# Keys: F=fundamental, SM=smart money, L=liquidity, M=momentum, V=valuation,
# R=risk (negative weight, i.e. a penalty). Unknown horizons use "DEFAULT".
WEIGHTS = {
    "SHORT": {"F": 0.10, "SM": 0.20, "L": 0.25, "M": 0.30, "V": 0.05, "R": -0.10},
    "MID": {"F": 0.35, "SM": 0.10, "L": 0.10, "M": 0.10, "V": 0.25, "R": -0.10},
    "LONG": {"F": 0.35, "SM": 0.10, "L": 0.10, "M": 0.10, "V": 0.25, "R": -0.10},
    "ETF": {"F": 0.15, "SM": 0.20, "L": 0.25, "M": 0.25, "V": 0.05, "R": -0.10},
    "DEFAULT": {"F": 0.25, "SM": 0.20, "L": 0.15, "M": 0.15, "V": 0.15, "R": -0.10},
}
def _load(path: Path) -> Any:
    """Read *path* as UTF-8 JSON and return the decoded value.

    Best-effort loader: a path that does not exist, cannot be read, or does
    not contain valid JSON yields an empty dict instead of raising.
    """
    if not path.exists():
        return {}
    try:
        raw_text = path.read_text(encoding="utf-8")
        decoded = json.loads(raw_text)
    except Exception:
        # Deliberately broad: any read/parse failure is treated as "no data".
        return {}
    return decoded
def _f(v: Any, default: float | None = None) -> float | None:
    """Convert *v* to a finite float, or return *default* when that fails.

    Args:
        v: Any value; numbers and numeric strings are accepted by ``float``.
        default: Value returned when *v* cannot be converted or is not finite.

    Returns:
        The converted float, or *default*.

    NaN and +/-infinity are rejected as well: ``json.loads`` accepts the
    ``NaN``/``Infinity`` literals, and letting them through would poison the
    averages built on top of this helper and make ``json.dumps`` emit
    non-standard JSON.
    """
    try:
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        # TypeError: None/list/dict; ValueError: non-numeric text;
        # OverflowError: an int too large for a float.
        return default
    return number if math.isfinite(number) else default
def _extract_harness_root(payload: Any) -> dict[str, Any]:
    """Pick the harness dict out of a loaded payload.

    Looks at ``payload["hApex"]`` and ``payload["data"]["_harness_context"]``:

    * both are dicts -> a merged copy, with ``hApex`` keys overriding the
      context keys;
    * only one is a dict -> that dict;
    * neither is a dict -> *payload* itself.

    A non-dict *payload* yields ``{}``. A ``data`` entry that is not a dict
    (for example a list or a string) is treated as having no harness context
    instead of raising ``AttributeError``.
    """
    if not isinstance(payload, dict):
        return {}

    apex = payload.get("hApex")
    data = payload.get("data")
    context = data.get("_harness_context") if isinstance(data, dict) else None

    apex_ok = isinstance(apex, dict)
    context_ok = isinstance(context, dict)
    if apex_ok and context_ok:
        # hApex wins on key collisions.
        return {**context, **apex}
    if apex_ok:
        return apex
    if context_ok:
        return context
    return payload
# ── 스마트머니 스코어 집계 ─────────────────────────────────────────────────
def _smart_money_score(smf: dict) -> float | None:
    """Average the per-row ``smart_money_score`` of smart_money_flow_signal_v2.

    Non-dict rows and rows whose score cannot be parsed as a number are
    skipped. Returns the mean rounded to one decimal, or ``None`` when no row
    contributes a value. The 0-100 range is documented upstream and is not
    enforced here.
    """
    collected = []
    for row in smf.get("rows") or []:
        if not isinstance(row, dict):
            continue
        value = _f(row.get("smart_money_score"))
        if value is not None:
            collected.append(value)
    if not collected:
        return None
    return round(sum(collected) / len(collected), 1)
# ── 유동성 스코어 집계 ───────────────────────────────────────────────────────
# Proxy score used for a row's ``liquidity_label`` when no numeric score exists.
_LIQ_MAP = {
    "DEEP": 90,
    "NORMAL": 65,
    "THIN": 35,
    "ILLIQUID": 10,
}


def _liquidity_score(liq: dict) -> float | None:
    """Average liquidity score over the rows of liquidity_flow_signal_v1.

    Numeric ``liquidity_score`` values take precedence: if at least one row
    has a parseable value, their mean is returned. Otherwise each row's
    ``liquidity_label`` is mapped through ``_LIQ_MAP`` and the mean of the
    mapped values is returned. Returns ``None`` when neither source yields a
    value. Results are rounded to one decimal.
    """
    dict_rows = [row for row in (liq.get("rows") or []) if isinstance(row, dict)]

    numeric = [
        score
        for score in (_f(row.get("liquidity_score")) for row in dict_rows)
        if score is not None
    ]
    if numeric:
        return round(sum(numeric) / len(numeric), 1)

    # Fallback: label-based proxy.
    proxy = [
        _LIQ_MAP[label]
        for label in (row.get("liquidity_label") for row in dict_rows)
        if label in _LIQ_MAP
    ]
    if not proxy:
        return None
    return round(sum(proxy) / len(proxy), 1)
# ── 모멘텀 스코어 집계 ──────────────────────────────────────────────────────
def _momentum_score(pac: dict, pae: dict, pred: dict) -> float | None:
    """Build a momentum proxy from PAC scores and PAE direction confidence.

    Components (each used only when it has at least one parseable value):

    * mean ``pac_score`` over ``pac["rows"]``, mapped with ``(x + 100) / 2``
      (i.e. a -100..100 input lands on 0..100);
    * mean ``direction_confidence`` over ``pae["rows"]``, used as-is.

    The available components are averaged. When ``pred["t5_op_rate"]`` is
    numeric the average is multiplied by
    ``0.5 + 0.5 * clamp((t5 - 50) / 50, 0, 1)``: t5 <= 50 gives 0.5,
    t5 = 75 gives 0.75 and t5 >= 100 gives 1.0. The result is clamped to
    0..100 and rounded to one decimal. Returns ``None`` when neither
    component is available.

    NOTE(review): an earlier inline comment stated that t5=50 keeps a 0.75
    factor; the implemented formula yields 0.5 at t5=50. Confirm which one
    the spec intends.
    """

    def _row_values(source: dict, key: str) -> list:
        # Parseable numeric values of *key* across the dict rows of *source*.
        parsed = (
            _f(row.get(key))
            for row in (source.get("rows") or [])
            if isinstance(row, dict)
        )
        return [value for value in parsed if value is not None]

    pac_values = _row_values(pac, "pac_score")
    confidence_values = _row_values(pae, "direction_confidence")

    components = []
    if pac_values:
        pac_mean = sum(pac_values) / len(pac_values)
        # Shift/scale -100..100 onto 0..100.
        components.append((pac_mean + 100.0) / 2.0)
    if confidence_values:
        components.append(sum(confidence_values) / len(confidence_values))
    if not components:
        return None

    blended = sum(components) / len(components)

    # Accuracy adjustment based on the t5 hit rate (see docstring).
    t5 = _f(pred.get("t5_op_rate"))
    if t5 is not None:
        scale = 0.5 + 0.5 * max(0.0, min(1.0, (t5 - 50.0) / 50.0))
        blended = blended * scale
    return round(max(0.0, min(100.0, blended)), 1)
# ── 리스크 스코어 집계 ──────────────────────────────────────────────────────
def _risk_score(harness: dict) -> float | None:
    """Combine heat, beta and macro risk into one 0-100 score (higher = riskier).

    Reads ``total_heat_pct``, ``portfolio_beta`` and ``macro_risk_score`` from
    *harness*. Each value that parses as a number is mapped onto 0..100 and
    the mapped values are averaged; missing values are left out.

    Returns:
        The mean rounded to one decimal, or ``None`` when none of the three
        inputs is available.

    Every sub-score is clamped to 0..100. This includes the heat term, so a
    negative ``total_heat_pct`` can no longer pull the composite below the
    documented range.
    """
    heat = _f(harness.get("total_heat_pct"))
    beta = _f(harness.get("portfolio_beta"))
    macro = _f(harness.get("macro_risk_score"))

    parts = []
    # Heat: 0% -> 0 points, 15% or more -> 100 points.
    if heat is not None:
        parts.append(min(100.0, max(0.0, heat / 15.0 * 100.0)))
    # Beta: 0.5 -> 0 points, 2.0 -> 100 points.
    if beta is not None:
        parts.append(min(100.0, max(0.0, (beta - 0.5) / 1.5 * 100.0)))
    # Macro: already on a 0..100 scale, only clamped.
    if macro is not None:
        parts.append(min(100.0, max(0.0, macro)))

    if not parts:
        return None
    return round(sum(parts) / len(parts), 1)
# ── 밸류에이션 스코어 ─────────────────────────────────────────────────────
def _valuation_score(fund: dict) -> float | None:
    """Average the ``breakdown.valuation`` sub-score of non-ETF rows.

    Rows of fundamental_multifactor_v3 that are not dicts or are flagged
    ``is_etf`` are skipped, as are sub-scores that are missing, unparseable,
    zero or negative. The mean is multiplied by 5 (the sub-score is treated
    as a 0-20 scale mapped to 0-100) and rounded to one decimal. Returns
    ``None`` when no row qualifies.
    """
    positives = []
    for row in fund.get("rows") or []:
        if not isinstance(row, dict) or row.get("is_etf"):
            continue
        sub_score = _f((row.get("breakdown") or {}).get("valuation"))
        if sub_score is not None and sub_score > 0:
            positives.append(sub_score)
    if not positives:
        return None
    # 0~20 -> 0~100
    return round(sum(positives) / len(positives) * 5, 1)
# ── 펀더멘털 스코어 ──────────────────────────────────────────────────────────
def _fundamental_score(fund: dict) -> tuple[float | None, str]:
    """Average the ``score`` of non-ETF rows and return it with a status note.

    Returns:
        ``(mean rounded to one decimal, note)`` when at least one non-ETF row
        has a parseable ``score``; otherwise ``(None, note)`` with an
        insufficient-data note.
    """
    usable = []
    for row in fund.get("rows") or []:
        if not isinstance(row, dict) or row.get("is_etf"):
            continue
        parsed = _f(row.get("score"))
        if parsed is not None:
            usable.append(parsed)

    if usable:
        mean = round(sum(usable) / len(usable), 1)
        return mean, "partial — core factor (ROE/OPM/OCF/FCF) 결측"
    return None, "ROE/OPM/OCF/FCF 전면 결측 — insufficient_data"
# ── 최종 스코어 산출 ─────────────────────────────────────────────────────────
def _final_score(F, SM, L, M, V, R, horizon: str) -> dict[str, Any]:
    """Blend the six component scores with the weights of *horizon*.

    Args:
        F, SM, L, M, V, R: Component scores, or ``None`` when unavailable.
        horizon: Key into ``WEIGHTS``; unknown keys use ``WEIGHTS["DEFAULT"]``.

    Returns:
        A dict describing the result. ``"value"`` is ``NA`` when three or
        more components are missing (or the usable weights sum to zero);
        otherwise it is the weighted sum rounded to one decimal. Components
        that are ``None`` are dropped and the absolute weights of the
        remaining ones are rescaled to sum to 1, each keeping its sign.
    """
    weights = WEIGHTS.get(horizon, WEIGHTS["DEFAULT"])
    inputs = {"F": F, "SM": SM, "L": L, "M": M, "V": V, "R": R}
    missing = [name for name, value in inputs.items() if value is None]
    if len(missing) >= 3:
        return {"value": NA, "note": f"충분한 스코어 없음(missing={missing})", "horizon": horizon}

    # Proportional re-weighting over the components that are present.
    present = [(name, wt) for name, wt in weights.items() if inputs[name] is not None]
    total_w = sum(abs(wt) for _, wt in present)
    if total_w <= 0:
        return {"value": NA, "note": "가중치 합 0", "horizon": horizon}

    score = 0.0
    for name, wt in present:
        sign = 1 if wt > 0 else -1
        score += (abs(wt) / total_w) * sign * inputs[name]

    if missing:
        note = f"가중 합산 (missing={missing}가 제외되어 재정규화됨)"
    else:
        note = "전체 컴포넌트 사용"
    return {
        "value": round(score, 1),
        "horizon": horizon,
        "weights_used": weights,
        "missing_components": missing,
        "note": note,
        "formula": "F×wF + SM×wSM + L×wL + M×wM + V×wV + R×wR (§4.2)",
    }
def main() -> int:
    """Compute the component scores and the final score, then write the JSON.

    Command-line options:
        --json  Path of the gathered trading-data JSON (default ``DEFAULT_JSON``).
        --out   Path of the output JSON (default ``DEFAULT_OUT``).
    Relative paths are resolved against ``ROOT``.

    The harness outputs are read from fixed file names under ``TEMP``. A file
    that is missing, unreadable, or whose top-level JSON value is not an
    object is treated as empty, so the affected scores come out as ``NA``
    instead of aborting the run.

    Returns:
        ``0`` after the result has been written and the summary line printed.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    json_path = Path(args.json)
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    out_path = Path(args.out)
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    def _load_dict(name: str) -> dict[str, Any]:
        # The scorers call .get() on their input, so anything that is not a
        # JSON object (list, string, number) is handled like a missing file.
        loaded = _load(TEMP / name)
        return loaded if isinstance(loaded, dict) else {}

    payload = _load(json_path)
    harness = _extract_harness_root(payload)
    smf = _load_dict("smart_money_flow_signal_v2.json")
    liq = _load_dict("liquidity_flow_signal_v1.json")
    pac = _load_dict("portfolio_alpha_confidence_per_ticker_v1.json")
    pae = _load_dict("predictive_alpha_engine_v2.json")
    pred = _load_dict("prediction_accuracy_harness_v2.json")
    fund = _load_dict("fundamental_multifactor_v3.json")
    horizon_cls = _load_dict("horizon_classification_v1.json")

    # Dominant horizon = bucket with the largest allocation. Non-numeric or
    # missing allocation values count as 0 so they cannot break the comparison.
    alloc = horizon_cls.get("allocation_pct")
    if isinstance(alloc, dict) and alloc:
        dominant_horizon = max(alloc, key=lambda k: _f(alloc[k], 0.0))
    else:
        dominant_horizon = "DEFAULT"

    sm = _smart_money_score(smf)
    liq_s = _liquidity_score(liq)
    mom = _momentum_score(pac, pae, pred)
    risk = _risk_score(harness)
    val = _valuation_score(fund)
    f_s, f_note = _fundamental_score(fund)
    final = _final_score(f_s, sm, liq_s, mom, val, risk, dominant_horizon)

    result = {
        "formula_id": FORMULA_ID,
        "dominant_horizon": dominant_horizon,
        "scores": {
            "fundamental_score": f_s if f_s is not None else NA,
            "fundamental_note": f_note,
            "smart_money_score": sm if sm is not None else NA,
            "smart_money_source": "smart_money_flow_signal_v2.json (per-ticker avg)",
            "liquidity_score": liq_s if liq_s is not None else NA,
            "liquidity_source": "liquidity_flow_signal_v1.json (label proxy)",
            "momentum_score": mom if mom is not None else NA,
            "momentum_source": "pac_score(PAC) + direction_confidence(PAE) × t5_op_rate 보정",
            "valuation_score": val if val is not None else NA,
            "valuation_source": "fundamental_multifactor_v3 breakdown.valuation (부분)",
            "risk_score": risk if risk is not None else NA,
            "risk_components": {
                "total_heat_pct": harness.get("total_heat_pct"),
                "portfolio_beta": harness.get("portfolio_beta"),
                "macro_risk_score": harness.get("macro_risk_score"),
            },
        },
        "final_score": final,
        "formula_ref": "spec/13b_harness_formulas.yaml § SCORES_HARNESS_V1 (§4.2)",
        "weight_table": WEIGHTS,
    }

    # The output directory may not exist yet (fresh checkout or custom --out).
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")

    fs_v = final.get("value")
    print(
        f"[{FORMULA_ID}] horizon={dominant_horizon} "
        f"F={f_s} SM={sm} L={liq_s} M={mom} V={val} R={risk} "
        f"-> final_score={fs_v} -> {out_path}"
    )
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)