Files
QuantEngineByItz/tools/build_predictive_alpha_dialectic_engine_v2.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

276 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = ROOT / "Temp" / "predictive_alpha_engine_v2.json"
DEFAULT_CAPITAL = ROOT / "Temp" / "capital_style_allocation_v1.json"
DEFAULT_HORIZON = ROOT / "Temp" / "horizon_classification_v1.json"
def _load(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
obj = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--json", default=str(DEFAULT_JSON))
ap.add_argument("--capital", default=str(DEFAULT_CAPITAL))
ap.add_argument("--horizon", default=str(DEFAULT_HORIZON))
ap.add_argument("--out", default=str(DEFAULT_OUT))
args = ap.parse_args()
jp = Path(args.json)
op = Path(args.out)
if not jp.is_absolute():
jp = ROOT / jp
if not op.is_absolute():
op = ROOT / op
data = _load(jp)
capital = _load(Path(args.capital) if Path(args.capital).is_absolute() else ROOT / args.capital)
horizon = _load(Path(args.horizon) if Path(args.horizon).is_absolute() else ROOT / args.horizon)
out = {
"formula_id": "PREDICTIVE_ALPHA_DIALECTIC_ENGINE_V2",
"gate": "PASS",
"numeric_generation_allowed": 0,
"rows": []
}
# P1-1: PA1 팩터 캡 적용 — 단일 거시팩터 기여 ≤ antithesis_total의 50%
# usd_krw_weak 등 글로벌 팩터가 종목별 antithesis를 지배하지 못하도록 캡 강제
MACRO_FACTORS = {"usd_krw_weak", "vkospi_high", "global_risk_off", "fomc_hawkish"}
SINGLE_FACTOR_CAP_RATIO = 0.50 # antithesis_total 대비 50% 상한
# [NF1] 종목별 FX 민감도 베타 — 수출주(삼성전자·SK하이닉스)는 usd_krw_weak 영향 ↑
# 내수주는 FX 영향 상대적으로 낮으므로 usd_krw_weak 기여를 줄임
EXPORT_TICKERS = {"005930", "000660", "000660", "034020"} # 삼성전자, SK하이닉스, 두산에너빌리티
DOMESTIC_TICKERS = {"010120", "064350", "028050", "012450"} # 내수/IT서비스 위주
FX_BETA_EXPORT = 1.2 # 수출주: FX 민감도 20% 가중
FX_BETA_DOMESTIC = 0.7 # 내수주: FX 민감도 30% 축소
FX_BETA_DEFAULT = 1.0
hctx = data.get("data", {}).get("_harness_context", {}) if isinstance(data, dict) else {}
pa_json_raw = hctx.get("predictive_alpha_json", [])
if isinstance(pa_json_raw, str):
try:
pa_json_raw = json.loads(pa_json_raw)
except Exception:
pa_json_raw = []
rows_in = pa_json_raw if isinstance(pa_json_raw, list) else pa_json_raw.get("rows", [])
if not rows_in:
cap_rows = capital.get("rows") if isinstance(capital.get("rows"), list) else []
hz_rows = {str(r.get("ticker") or ""): r for r in (horizon.get("rows") or []) if isinstance(r, dict)}
rows_in = []
for r in cap_rows:
if not isinstance(r, dict):
continue
best = max(
[s for s in (r.get("styles") or []) if isinstance(s, dict)],
key=lambda s: float(s.get("conviction_score") or 0.0),
default={},
)
best_style = str(best.get("style") or "UNKNOWN")
conviction = float(best.get("conviction_score") or 0.0)
expected_horizon = {"SCALP": "SHORT", "SWING": "SHORT", "MOMENTUM": "MID", "POSITION": "LONG"}.get(best_style, "UNKNOWN")
actual_horizon = str(hz_rows.get(str(r.get("ticker") or ""), {}).get("horizon") or "UNKNOWN")
rows_in.append({
"ticker": r.get("ticker"),
"name": r.get("name"),
"thesis_breakdown": [
{"factor": f"{best_style}_CONVICTION_PROXY", "hit": True, "score": round(conviction * 0.6, 2)},
{"factor": "DATA_QUALITY_PROXY", "hit": True, "score": 10.0 if actual_horizon != "UNKNOWN" else 0.0},
],
"antithesis_breakdown": [
{"factor": "STYLE_MISMATCH_RISK", "hit": expected_horizon != actual_horizon and actual_horizon not in ("UNKNOWN", "ETF"), "score": 25.0 if expected_horizon != actual_horizon else 0.0},
{"factor": "MACRO_RISK_PROXY", "hit": True, "score": max(0.0, 30.0 - conviction * 0.2)},
],
"synthesis_verdict": "WATCH",
})
# [NF2] REBOUND_CAPTURE thesis factor — prices_json에서 조건 확인
prices_raw = hctx.get("prices_json", [])
if isinstance(prices_raw, str):
try:
prices_raw = json.loads(prices_raw)
except Exception:
prices_raw = []
prices_map = {str(p.get("ticker") or ""): p for p in (prices_raw if isinstance(prices_raw, list) else [])}
REBOUND_CAPTURE_WEIGHT = 15.0 # thesis bonus 점수
import statistics as _stats
pac_vals = []
for r in rows_in:
ticker = str(r.get("ticker") or "")
# [NF1] FX beta 결정
fx_beta = (
FX_BETA_EXPORT if ticker in EXPORT_TICKERS
else FX_BETA_DOMESTIC if ticker in DOMESTIC_TICKERS
else FX_BETA_DEFAULT
)
anti_breakdown = r.get("antithesis_breakdown") or []
anti_total_raw = sum(float(s.get("score", 0)) for s in anti_breakdown if s.get("hit"))
# 캡 적용: 단일 팩터 기여 ≤ anti_total_raw × 50%
# + [NF1] macro 팩터에 fx_beta 가중치 반영
anti_breakdown_capped = []
anti_total_capped = 0.0
for s in anti_breakdown:
if not s.get("hit"):
anti_breakdown_capped.append(dict(s, capped=False, fx_beta_applied=False))
continue
raw_contrib = float(s.get("score", 0))
factor_id = str(s.get("factor") or s.get("id") or "")
# [NF1] usd_krw_weak에 FX beta 적용
fx_adjusted = raw_contrib
fx_applied = False
if "usd_krw_weak" in factor_id or factor_id in MACRO_FACTORS:
fx_adjusted = round(raw_contrib * fx_beta, 2)
fx_applied = True
max_allowed = anti_total_raw * SINGLE_FACTOR_CAP_RATIO if anti_total_raw > 0 else fx_adjusted
capped_contrib = min(fx_adjusted, max_allowed)
capped = capped_contrib < fx_adjusted
anti_breakdown_capped.append(dict(
s,
score=round(capped_contrib, 2),
score_raw=raw_contrib,
capped=capped,
fx_beta_applied=fx_applied,
fx_beta=fx_beta if fx_applied else None,
factor_share_pct=round(raw_contrib / anti_total_raw * 100, 1) if anti_total_raw > 0 else 0,
))
anti_total_capped += capped_contrib
# [NF2] REBOUND_CAPTURE thesis factor 조건 체크
px = prices_map.get(ticker, {})
rsi14 = float(px.get("rsi14") or px.get("RSI14") or 99)
price = float(px.get("current_price") or px.get("current_price_krw") or 0)
ma20 = float(px.get("ma20") or px.get("MA20") or 0)
flow_credit = float(px.get("flow_credit") or 0)
down_streak = int(px.get("down_streak") or 0)
rebound_hit = (
25 <= rsi14 <= 40
and ma20 > 0 and price <= ma20 * 1.03
and flow_credit >= 0.5
and down_streak >= 2
)
thesis_bd = list(r.get("thesis_breakdown") or [])
if rebound_hit:
thesis_bd = thesis_bd + [{
"factor": "REBOUND_CAPTURE_THESIS_NF2",
"hit": True,
"score": REBOUND_CAPTURE_WEIGHT,
"label": f"과매도반등(rsi={rsi14},flow={flow_credit},streak={down_streak})",
}]
thesis_total = sum(float(s.get("score", 0)) for s in thesis_bd if s.get("hit"))
dc = round(thesis_total - anti_total_capped, 2)
pac_vals.append(float(dc))
# synthesis_verdict 재계산 — 단순 GAS 값 복사 대신 dc 기반 결정론
# EXIT_SIGNAL은 antithesis≥60 AND thesis<20 AND dc<=-50 에서만 (Direction SFP1)
gas_verdict = str(r.get("synthesis_verdict") or "")
if anti_total_capped >= 60 and thesis_total < 20 and dc <= -50:
synthesis_verdict = "EXIT_SIGNAL"
elif dc >= 20:
synthesis_verdict = "BULLISH" if dc >= 40 else "ACCUMULATE"
elif dc >= 0:
synthesis_verdict = "PILOT"
elif dc >= -20:
synthesis_verdict = "NEUTRAL"
else:
synthesis_verdict = "HOLD" # 약한 약세 → EXIT 아닌 HOLD
out["rows"].append({
"ticker": ticker,
"name": r.get("name", ""),
"thesis_score": round(thesis_total, 2),
"antithesis_score": round(anti_total_capped, 2),
"antithesis_score_raw": round(anti_total_raw, 2),
"antithesis_breakdown_capped": anti_breakdown_capped,
"thesis_breakdown": thesis_bd,
"synthesis_verdict": synthesis_verdict,
"synthesis_verdict_gas_original": gas_verdict,
"rebound_capture_hit": rebound_hit,
"direction_confidence": dc,
"fx_beta": fx_beta,
"allow_execution": dc > -20,
})
# 단일 팩터 최대 기여율 감사 + [SFP1] SINGLE_FACTOR_DEGENERATE 감지
max_factor_share = 0.0
for r in out["rows"]:
for s in r.get("antithesis_breakdown_capped", []):
if s.get("hit") and s.get("factor_share_pct"):
max_factor_share = max(max_factor_share, float(s.get("factor_share_pct", 0)))
pac_stddev = round(_stats.stdev(pac_vals), 2) if len(pac_vals) > 1 else 0.0
# [SFP1] 전 종목 동일 verdict 감지
all_verdicts = [str(r.get("synthesis_verdict") or "") for r in out["rows"] if r.get("ticker") != "DATA_MISSING"]
unique_verdicts = set(all_verdicts)
is_degenerate = len(all_verdicts) > 0 and len(unique_verdicts) == 1
degenerate_verdict = list(unique_verdicts)[0] if is_degenerate else None
# [NF2] rebound_capture 히트 통계
rebound_hit_count = sum(1 for r in out["rows"] if r.get("rebound_capture_hit"))
rebound_hit_rate = round(rebound_hit_count / len(all_verdicts) * 100, 1) if all_verdicts else 0.0
out["factor_cap_audit"] = {
"single_factor_max_share_pct": round(max_factor_share, 1),
"single_factor_cap_ratio": SINGLE_FACTOR_CAP_RATIO * 100,
"cap_applied": True,
"pac_stddev": pac_stddev,
"pac_per_ticker_distinct": pac_stddev > 0,
"dod_single_factor_max_share_le_50": max_factor_share <= 50.0,
"dod_pac_stddev_ge_5": pac_stddev >= 5.0,
# [SFP1] 퇴화 감지
"is_degenerate": is_degenerate,
"degenerate_verdict": degenerate_verdict,
"unique_verdict_count": len(unique_verdicts),
# [NF2] REBOUND_CAPTURE 통계
"rebound_capture_hit_count": rebound_hit_count,
"rebound_capture_hit_rate_pct": rebound_hit_rate,
"thesis_factor_hit_rate": round(rebound_hit_rate / 100.0, 3),
"dod_thesis_factor_hit_rate_ge_015": rebound_hit_rate / 100.0 >= 0.15,
# [NF1] FX beta 적용 여부
"nf1_fx_beta_applied": True,
"nf1_export_tickers": list(EXPORT_TICKERS),
"nf1_domestic_tickers": list(DOMESTIC_TICKERS),
}
# [SFP1] DEGENERATE 상태에서 gate=WARN 강제
dod_ok = max_factor_share <= 50.0 and pac_stddev >= 5.0 and not is_degenerate
out["gate"] = "PASS" if dod_ok else "WARN"
if is_degenerate:
out["degenerate_warning"] = (
f"[SINGLE_FACTOR_DEGENERATE: 전 종목 {degenerate_verdict} — Direction SFP1 위반. 예측엔진 재보정 필요]"
)
if not rows_in:
# 데이터 없을 경우 placeholder
out["rows"].append({
"ticker": "DATA_MISSING",
"thesis_score": 0, "antithesis_score": 0,
"synthesis_verdict": "UNKNOWN", "direction_confidence": 0,
"allow_execution": False,
})
op.parent.mkdir(parents=True, exist_ok=True)
op.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(out, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())