ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
270 lines
12 KiB
Python
270 lines
12 KiB
Python
"""build_scores_harness_v1.py — SCORES_HARNESS_V1
|
||
|
||
프롬프트 §3.6/3.7/4.2 — 기존 하네스 출력에서 4개 스코어(smart_money / liquidity /
|
||
momentum / risk)를 결정론적으로 집계하고, 투자기간별 가중치가 적용된 final_score를 산출.
|
||
|
||
산출물: Temp/scores_harness_v1.json
|
||
|
||
산식(프롬프트 §4.2, 투자기간 가중치):
|
||
SHORT: final = 0.10*F + 0.20*SM + 0.25*L + 0.30*M + 0.05*V - 0.10*R
|
||
MID/LONG: final = 0.35*F + 0.10*SM + 0.10*L + 0.10*M + 0.25*V - 0.10*R
|
||
DEFAULT: final = 0.25*F + 0.20*SM + 0.15*L + 0.15*M + 0.15*V - 0.10*R
|
||
|
||
원칙: 데이터 없는 값은 만들지 않는다. 미충족 항목은 not_available / insufficient_data.
|
||
"""
|
||
from __future__ import annotations

import argparse
import json
import math
from pathlib import Path
from typing import Any
|
||
|
||
# Repository root: one directory above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Folder holding the intermediate harness JSON files and this script's output.
TEMP = ROOT.joinpath("Temp")
DEFAULT_JSON = ROOT.joinpath("GatherTradingData.json")
DEFAULT_OUT = TEMP.joinpath("scores_harness_v1.json")

FORMULA_ID = "SCORES_HARNESS_V1"
# Marker written to the output in place of any score that could not be computed.
NA = "not_available"

# §4.2 weights per investment horizon.
# Keys: F=fundamental, SM=smart money, L=liquidity, M=momentum, V=valuation,
# R=risk (negative weight: higher risk lowers the final score).
WEIGHTS = {
    "SHORT": {"F": 0.10, "SM": 0.20, "L": 0.25, "M": 0.30, "V": 0.05, "R": -0.10},
    "MID": {"F": 0.35, "SM": 0.10, "L": 0.10, "M": 0.10, "V": 0.25, "R": -0.10},
    "LONG": {"F": 0.35, "SM": 0.10, "L": 0.10, "M": 0.10, "V": 0.25, "R": -0.10},
    "ETF": {"F": 0.15, "SM": 0.20, "L": 0.25, "M": 0.25, "V": 0.05, "R": -0.10},
    "DEFAULT": {"F": 0.25, "SM": 0.20, "L": 0.15, "M": 0.15, "V": 0.15, "R": -0.10},
}
|
||
|
||
|
||
def _load(path: Path) -> Any:
|
||
if not path.exists():
|
||
return {}
|
||
try:
|
||
return json.loads(path.read_text(encoding="utf-8"))
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _f(v: Any, default: float | None = None) -> float | None:
|
||
try:
|
||
return float(v)
|
||
except Exception:
|
||
return default
|
||
|
||
|
||
def _extract_harness_root(payload: Any) -> dict[str, Any]:
|
||
if not isinstance(payload, dict):
|
||
return {}
|
||
h = payload.get("hApex")
|
||
dc = (payload.get("data") or {}).get("_harness_context")
|
||
if isinstance(h, dict) and isinstance(dc, dict):
|
||
m = dict(dc); m.update(h); return m
|
||
return h if isinstance(h, dict) else dc if isinstance(dc, dict) else payload
|
||
|
||
|
||
# ── 스마트머니 스코어 집계 ─────────────────────────────────────────────────
|
||
def _smart_money_score(smf: dict) -> float | None:
    """Average the per-row ``smart_money_score`` values of *smf*.

    Args:
        smf: Loaded smart_money_flow_signal_v2 document; its ``rows`` entry
            is read (a missing or empty entry is treated as no rows).

    Returns:
        The mean of all convertible row scores rounded to one decimal, or
        ``None`` when no row supplies a usable score.
    """
    usable: list[float] = []
    for entry in smf.get("rows") or []:
        if not isinstance(entry, dict):
            continue
        value = _f(entry.get("smart_money_score"))
        if value is not None:
            usable.append(value)
    if not usable:
        return None
    return round(sum(usable) / len(usable), 1)
|
||
|
||
|
||
# ── 유동성 스코어 집계 ───────────────────────────────────────────────────────
|
||
# Proxy score assigned to each liquidity label when no numeric score exists.
_LIQ_MAP = {"DEEP": 90, "NORMAL": 65, "THIN": 35, "ILLIQUID": 10}


def _liquidity_score(liq: dict) -> float | None:
    """Average liquidity score over the rows of *liq*.

    Numeric ``liquidity_score`` values take precedence. Only when no row
    carries a usable numeric score does the function fall back to mapping
    each row's ``liquidity_label`` through ``_LIQ_MAP``.

    Args:
        liq: Loaded liquidity_flow_signal_v1 document; its ``rows`` entry is
            read (a missing or empty entry is treated as no rows).

    Returns:
        The mean rounded to one decimal, or ``None`` when neither numeric
        scores nor recognised labels are available.
    """
    records = [row for row in (liq.get("rows") or []) if isinstance(row, dict)]

    numeric: list[float] = []
    for record in records:
        value = _f(record.get("liquidity_score"))
        if value is not None:
            numeric.append(value)
    if numeric:
        return round(sum(numeric) / len(numeric), 1)

    # Label-based proxy: unrecognised labels are ignored.
    proxies = []
    for record in records:
        label = record.get("liquidity_label")
        if label in _LIQ_MAP:
            proxies.append(_LIQ_MAP[label])
    if not proxies:
        return None
    return round(sum(proxies) / len(proxies), 1)
|
||
|
||
|
||
# ── 모멘텀 스코어 집계 ──────────────────────────────────────────────────────
|
||
def _momentum_score(pac: dict, pae: dict, pred: dict) -> float | None:
    """Build a momentum proxy from alpha-confidence and direction confidence.

    Two optional ingredients are averaged:

    * the mean ``pac_score`` of ``pac["rows"]``, mapped with ``(x + 100) / 2``
      (the original author documents pac_score as -100~100, so this lands
      on 0~100);
    * the mean ``direction_confidence`` of ``pae["rows"]`` (documented by the
      original author as 0~100).

    When ``pred["t5_op_rate"]`` is convertible to a number, the blend is
    multiplied by ``0.5 + 0.5 * clamp((t5 - 50) / 50, 0, 1)``. That factor is
    0.5 for t5 <= 50, 0.75 at t5 = 75 and 1.0 for t5 >= 100.

    Args:
        pac: Loaded portfolio_alpha_confidence_per_ticker document.
        pae: Loaded predictive_alpha_engine document.
        pred: Loaded prediction_accuracy_harness document.

    Returns:
        The result clamped to 0~100 and rounded to one decimal, or ``None``
        when neither ingredient has any usable value.
    """

    def _column(table: dict, field: str) -> list[float]:
        # Collect every convertible value of `field` from the dict rows.
        converted = (
            _f(row.get(field))
            for row in (table.get("rows") or [])
            if isinstance(row, dict)
        )
        return [x for x in converted if x is not None]

    pac_scores = _column(pac, "pac_score")
    confidences = _column(pae, "direction_confidence")

    proxies: list[float] = []
    if pac_scores:
        # Shift and halve the mean pac_score: (x + 100) / 2.
        proxies.append((sum(pac_scores) / len(pac_scores) + 100.0) / 2.0)
    if confidences:
        proxies.append(sum(confidences) / len(confidences))
    if not proxies:
        return None

    blended = sum(proxies) / len(proxies)

    # Dampen the blend by the t5 rate when it is available.
    t5 = _f(pred.get("t5_op_rate"))
    if t5 is not None:
        excess = (t5 - 50.0) / 50.0
        scale = 0.5 + 0.5 * max(0.0, min(1.0, excess))
        blended = blended * scale
    return round(max(0.0, min(100.0, blended)), 1)
|
||
|
||
|
||
# ── 리스크 스코어 집계 ──────────────────────────────────────────────────────
|
||
def _risk_score(harness: dict) -> float | None:
    """Combine heat, beta and macro risk into one 0~100 score (higher = riskier).

    Each available input is mapped onto 0~100 and the mapped values are
    averaged with equal weight:

    * ``total_heat_pct``: 0 % -> 0 points, 15 % or more -> 100 points;
    * ``portfolio_beta``: 0.5 -> 0 points, 2.0 -> 100 points;
    * ``macro_risk_score``: used as-is.

    Every component is clamped to 0~100, so a negative heat reading can no
    longer pull the composite below zero.

    Args:
        harness: Harness context dictionary holding the three inputs.

    Returns:
        The mean of the available components rounded to one decimal, or
        ``None`` when none of the three inputs is usable.
    """
    heat = _f(harness.get("total_heat_pct"))
    beta = _f(harness.get("portfolio_beta"))
    macro = _f(harness.get("macro_risk_score"))

    parts: list[float] = []
    if heat is not None:
        # Lower clamp added for consistency with the beta and macro components.
        parts.append(min(100.0, max(0.0, heat / 15.0 * 100.0)))
    if beta is not None:
        parts.append(min(100.0, max(0.0, (beta - 0.5) / 1.5 * 100.0)))
    if macro is not None:
        parts.append(min(100.0, max(0.0, macro)))

    return round(sum(parts) / len(parts), 1) if parts else None
|
||
|
||
|
||
# ── 밸류에이션 스코어 ─────────────────────────────────────────────────────
|
||
def _valuation_score(fund: dict) -> float | None:
    """Average the ``breakdown.valuation`` sub-score of non-ETF rows.

    Rows flagged ``is_etf`` are skipped, and so are sub-scores that are
    missing, non-numeric, or not strictly positive. The mean is multiplied
    by 5 (the original author documents the sub-score as 0~20, giving 0~100).

    Args:
        fund: Loaded fundamental_multifactor_v3 document; its ``rows`` entry
            is read.

    Returns:
        The scaled mean rounded to one decimal, or ``None`` when no row
        contributes a usable sub-score.
    """
    picked: list[float] = []
    for row in fund.get("rows") or []:
        if not isinstance(row, dict) or row.get("is_etf"):
            continue
        breakdown = row.get("breakdown") or {}
        sub_score = _f(breakdown.get("valuation"))
        if sub_score is not None and sub_score > 0:
            picked.append(sub_score)
    if not picked:
        return None
    return round(sum(picked) / len(picked) * 5, 1)
|
||
|
||
|
||
# ── 펀더멘털 스코어 ──────────────────────────────────────────────────────────
|
||
def _fundamental_score(fund: dict) -> tuple[float | None, str]:
    """Average the ``score`` field of non-ETF fundamental rows.

    Args:
        fund: Loaded fundamental_multifactor_v3 document; its ``rows`` entry
            is read.

    Returns:
        A ``(score, note)`` pair. ``score`` is the mean rounded to one
        decimal, or ``None`` when no non-ETF row has a convertible score.
        ``note`` is a fixed explanatory string, one for each of those two
        outcomes.
    """
    collected: list[float] = []
    for row in fund.get("rows") or []:
        if not isinstance(row, dict) or row.get("is_etf"):
            continue
        value = _f(row.get("score"))
        if value is not None:
            collected.append(value)

    if collected:
        mean = round(sum(collected) / len(collected), 1)
        return mean, "partial — core factor (ROE/OPM/OCF/FCF) 결측"
    return None, "ROE/OPM/OCF/FCF 전면 결측 — insufficient_data"
|
||
|
||
|
||
# ── 최종 스코어 산출 ─────────────────────────────────────────────────────────
|
||
def _final_score(F, SM, L, M, V, R, horizon: str) -> dict[str, Any]:
    """Blend the six component scores using the weights of *horizon*.

    Components that are ``None`` are dropped and the absolute weights of the
    remaining ones are rescaled to sum to 1, each keeping its original sign
    (so risk still subtracts). With three or more components missing, no
    score is produced.

    Args:
        F, SM, L, M, V, R: Fundamental, smart-money, liquidity, momentum,
            valuation and risk scores; ``None`` marks a missing component.
        horizon: Key into ``WEIGHTS``; unknown keys use the ``DEFAULT`` row.

    Returns:
        A dict whose ``value`` is the rounded score, or ``NA`` when too many
        components are missing or the usable weights sum to zero. On success
        it also carries the horizon, the weight row, the list of missing
        components, a note and the formula text.
    """
    weights = WEIGHTS.get(horizon, WEIGHTS["DEFAULT"])
    components = {"F": F, "SM": SM, "L": L, "M": M, "V": V, "R": R}
    missing = [name for name, value in components.items() if value is None]

    if len(missing) >= 3:
        return {"value": NA, "note": f"충분한 스코어 없음(missing={missing})", "horizon": horizon}

    # Re-weight proportionally over the components that are present.
    total_w = sum(
        abs(weight) for name, weight in weights.items() if components[name] is not None
    )
    if total_w <= 0:
        return {"value": NA, "note": "가중치 합 0", "horizon": horizon}

    score = 0.0
    for name, weight in weights.items():
        value = components[name]
        if value is not None:
            sign = 1 if weight > 0 else -1
            eff_w = (abs(weight) / total_w) * sign
            score += eff_w * value

    if missing:
        note = f"가중 합산 (missing={missing}가 제외되어 재정규화됨)"
    else:
        note = "전체 컴포넌트 사용"

    return {
        "value": round(score, 1),
        "horizon": horizon,
        "weights_used": weights,
        "missing_components": missing,
        "note": note,
        "formula": "F×wF + SM×wSM + L×wL + M×wM + V×wV + R×wR (§4.2)",
    }
|
||
|
||
|
||
def main() -> int:
    """Compute all harness scores and write them to the output JSON file.

    Command-line options:
        --json: Path of the gathered trading data JSON (default ``DEFAULT_JSON``).
        --out:  Path of the result file (default ``DEFAULT_OUT``).
        Relative paths are resolved against ``ROOT``.

    The harness context comes from the ``--json`` payload; the remaining
    inputs are read from fixed file names under ``TEMP``. The horizon with
    the largest ``allocation_pct`` selects the weight row, falling back to
    ``"DEFAULT"`` when no allocation is available.

    Returns:
        Process exit status; always 0.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    json_path = Path(args.json)
    if not json_path.is_absolute():
        json_path = ROOT / json_path
    out_path = Path(args.out)
    if not out_path.is_absolute():
        out_path = ROOT / out_path

    def load_temp(name: str) -> dict[str, Any]:
        # The scorers call .get() on their input; a JSON file whose top level
        # is a list or scalar is therefore treated as "no data".
        loaded = _load(TEMP / name)
        return loaded if isinstance(loaded, dict) else {}

    payload = _load(json_path)
    harness = _extract_harness_root(payload)

    smf = load_temp("smart_money_flow_signal_v2.json")
    liq = load_temp("liquidity_flow_signal_v1.json")
    pac = load_temp("portfolio_alpha_confidence_per_ticker_v1.json")
    pae = load_temp("predictive_alpha_engine_v2.json")
    pred = load_temp("prediction_accuracy_harness_v2.json")
    fund = load_temp("fundamental_multifactor_v3.json")
    horizon_cls = load_temp("horizon_classification_v1.json")

    # Dominant investment horizon = bucket with the largest allocation.
    # Values are coerced through _f so that null or non-numeric entries rank
    # as 0 instead of raising on comparison.
    alloc = horizon_cls.get("allocation_pct")
    if isinstance(alloc, dict) and alloc:
        dominant_horizon = max(alloc, key=lambda k: _f(alloc[k], 0.0))
    else:
        dominant_horizon = "DEFAULT"

    sm = _smart_money_score(smf)
    liq_s = _liquidity_score(liq)
    mom = _momentum_score(pac, pae, pred)
    risk = _risk_score(harness)
    val = _valuation_score(fund)
    f_s, f_note = _fundamental_score(fund)

    final = _final_score(f_s, sm, liq_s, mom, val, risk, dominant_horizon)

    result = {
        "formula_id": FORMULA_ID,
        "dominant_horizon": dominant_horizon,
        "scores": {
            "fundamental_score": f_s if f_s is not None else NA,
            "fundamental_note": f_note,
            "smart_money_score": sm if sm is not None else NA,
            "smart_money_source": "smart_money_flow_signal_v2.json (per-ticker avg)",
            "liquidity_score": liq_s if liq_s is not None else NA,
            "liquidity_source": "liquidity_flow_signal_v1.json (label proxy)",
            "momentum_score": mom if mom is not None else NA,
            "momentum_source": "pac_score(PAC) + direction_confidence(PAE) × t5_op_rate 보정",
            "valuation_score": val if val is not None else NA,
            "valuation_source": "fundamental_multifactor_v3 breakdown.valuation (부분)",
            "risk_score": risk if risk is not None else NA,
            "risk_components": {
                "total_heat_pct": harness.get("total_heat_pct"),
                "portfolio_beta": harness.get("portfolio_beta"),
                "macro_risk_score": harness.get("macro_risk_score"),
            },
        },
        "final_score": final,
        "formula_ref": "spec/13b_harness_formulas.yaml § SCORES_HARNESS_V1 (§4.2)",
        "weight_table": WEIGHTS,
    }

    # Create the destination folder first: on a fresh checkout (or with a
    # custom --out) it may not exist, and write_text would fail.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")

    fs_v = final.get("value")
    print(
        f"[{FORMULA_ID}] horizon={dominant_horizon} "
        f"F={f_s} SM={sm} L={liq_s} M={mom} V={val} R={risk} "
        f"-> final_score={fs_v} -> {out_path}"
    )
    return 0
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|