from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "predictive_alpha_engine_v2.json" DEFAULT_CAPITAL = ROOT / "Temp" / "capital_style_allocation_v1.json" DEFAULT_HORIZON = ROOT / "Temp" / "horizon_classification_v1.json" def _load(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: obj = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return obj if isinstance(obj, dict) else {} def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--capital", default=str(DEFAULT_CAPITAL)) ap.add_argument("--horizon", default=str(DEFAULT_HORIZON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() jp = Path(args.json) op = Path(args.out) if not jp.is_absolute(): jp = ROOT / jp if not op.is_absolute(): op = ROOT / op data = _load(jp) capital = _load(Path(args.capital) if Path(args.capital).is_absolute() else ROOT / args.capital) horizon = _load(Path(args.horizon) if Path(args.horizon).is_absolute() else ROOT / args.horizon) out = { "formula_id": "PREDICTIVE_ALPHA_DIALECTIC_ENGINE_V2", "gate": "PASS", "numeric_generation_allowed": 0, "rows": [] } # P1-1: PA1 팩터 캡 적용 — 단일 거시팩터 기여 ≤ antithesis_total의 50% # usd_krw_weak 등 글로벌 팩터가 종목별 antithesis를 지배하지 못하도록 캡 강제 MACRO_FACTORS = {"usd_krw_weak", "vkospi_high", "global_risk_off", "fomc_hawkish"} SINGLE_FACTOR_CAP_RATIO = 0.50 # antithesis_total 대비 50% 상한 # [NF1] 종목별 FX 민감도 베타 — 수출주(삼성전자·SK하이닉스)는 usd_krw_weak 영향 ↑ # 내수주는 FX 영향 상대적으로 낮으므로 usd_krw_weak 기여를 줄임 EXPORT_TICKERS = {"005930", "000660", "000660", "034020"} # 삼성전자, SK하이닉스, 두산에너빌리티 DOMESTIC_TICKERS = {"010120", "064350", "028050", "012450"} # 내수/IT서비스 위주 FX_BETA_EXPORT = 1.2 # 수출주: FX 민감도 20% 가중 FX_BETA_DOMESTIC = 0.7 # 내수주: FX 민감도 30% 축소 FX_BETA_DEFAULT = 1.0 hctx = data.get("data", {}).get("_harness_context", {}) if isinstance(data, dict) else {} pa_json_raw = hctx.get("predictive_alpha_json", []) if isinstance(pa_json_raw, str): try: pa_json_raw = json.loads(pa_json_raw) except Exception: pa_json_raw = [] rows_in = pa_json_raw if isinstance(pa_json_raw, list) else pa_json_raw.get("rows", []) if not rows_in: cap_rows = capital.get("rows") if isinstance(capital.get("rows"), list) else [] hz_rows = {str(r.get("ticker") or ""): r for r in (horizon.get("rows") or []) if isinstance(r, dict)} rows_in = [] for r in cap_rows: if not isinstance(r, dict): continue best = max( [s for s in (r.get("styles") or []) if isinstance(s, dict)], key=lambda s: float(s.get("conviction_score") or 0.0), default={}, ) best_style = str(best.get("style") or "UNKNOWN") conviction = float(best.get("conviction_score") or 0.0) expected_horizon = {"SCALP": "SHORT", "SWING": "SHORT", "MOMENTUM": "MID", "POSITION": "LONG"}.get(best_style, "UNKNOWN") actual_horizon = str(hz_rows.get(str(r.get("ticker") or ""), {}).get("horizon") or "UNKNOWN") rows_in.append({ "ticker": r.get("ticker"), "name": r.get("name"), "thesis_breakdown": [ {"factor": f"{best_style}_CONVICTION_PROXY", "hit": True, "score": round(conviction * 0.6, 2)}, {"factor": "DATA_QUALITY_PROXY", "hit": True, "score": 10.0 if actual_horizon != "UNKNOWN" else 0.0}, ], "antithesis_breakdown": [ {"factor": "STYLE_MISMATCH_RISK", "hit": expected_horizon != actual_horizon and actual_horizon not in ("UNKNOWN", "ETF"), "score": 25.0 if expected_horizon != actual_horizon else 0.0}, {"factor": "MACRO_RISK_PROXY", "hit": True, "score": max(0.0, 30.0 - conviction * 0.2)}, ], "synthesis_verdict": "WATCH", }) # [NF2] REBOUND_CAPTURE thesis factor — prices_json에서 조건 확인 prices_raw = hctx.get("prices_json", []) if isinstance(prices_raw, str): try: prices_raw = json.loads(prices_raw) except Exception: prices_raw = [] prices_map = {str(p.get("ticker") or ""): p for p in (prices_raw if isinstance(prices_raw, list) else [])} REBOUND_CAPTURE_WEIGHT = 15.0 # thesis bonus 점수 import statistics as _stats pac_vals = [] for r in rows_in: ticker = str(r.get("ticker") or "") # [NF1] FX beta 결정 fx_beta = ( FX_BETA_EXPORT if ticker in EXPORT_TICKERS else FX_BETA_DOMESTIC if ticker in DOMESTIC_TICKERS else FX_BETA_DEFAULT ) anti_breakdown = r.get("antithesis_breakdown") or [] anti_total_raw = sum(float(s.get("score", 0)) for s in anti_breakdown if s.get("hit")) # 캡 적용: 단일 팩터 기여 ≤ anti_total_raw × 50% # + [NF1] macro 팩터에 fx_beta 가중치 반영 anti_breakdown_capped = [] anti_total_capped = 0.0 for s in anti_breakdown: if not s.get("hit"): anti_breakdown_capped.append(dict(s, capped=False, fx_beta_applied=False)) continue raw_contrib = float(s.get("score", 0)) factor_id = str(s.get("factor") or s.get("id") or "") # [NF1] usd_krw_weak에 FX beta 적용 fx_adjusted = raw_contrib fx_applied = False if "usd_krw_weak" in factor_id or factor_id in MACRO_FACTORS: fx_adjusted = round(raw_contrib * fx_beta, 2) fx_applied = True max_allowed = anti_total_raw * SINGLE_FACTOR_CAP_RATIO if anti_total_raw > 0 else fx_adjusted capped_contrib = min(fx_adjusted, max_allowed) capped = capped_contrib < fx_adjusted anti_breakdown_capped.append(dict( s, score=round(capped_contrib, 2), score_raw=raw_contrib, capped=capped, fx_beta_applied=fx_applied, fx_beta=fx_beta if fx_applied else None, factor_share_pct=round(raw_contrib / anti_total_raw * 100, 1) if anti_total_raw > 0 else 0, )) anti_total_capped += capped_contrib # [NF2] REBOUND_CAPTURE thesis factor 조건 체크 px = prices_map.get(ticker, {}) rsi14 = float(px.get("rsi14") or px.get("RSI14") or 99) price = float(px.get("current_price") or px.get("current_price_krw") or 0) ma20 = float(px.get("ma20") or px.get("MA20") or 0) flow_credit = float(px.get("flow_credit") or 0) down_streak = int(px.get("down_streak") or 0) rebound_hit = ( 25 <= rsi14 <= 40 and ma20 > 0 and price <= ma20 * 1.03 and flow_credit >= 0.5 and down_streak >= 2 ) thesis_bd = list(r.get("thesis_breakdown") or []) if rebound_hit: thesis_bd = thesis_bd + [{ "factor": "REBOUND_CAPTURE_THESIS_NF2", "hit": True, "score": REBOUND_CAPTURE_WEIGHT, "label": f"과매도반등(rsi={rsi14},flow={flow_credit},streak={down_streak})", }] thesis_total = sum(float(s.get("score", 0)) for s in thesis_bd if s.get("hit")) dc = round(thesis_total - anti_total_capped, 2) pac_vals.append(float(dc)) # synthesis_verdict 재계산 — 단순 GAS 값 복사 대신 dc 기반 결정론 # EXIT_SIGNAL은 antithesis≥60 AND thesis<20 AND dc<=-50 에서만 (Direction SFP1) gas_verdict = str(r.get("synthesis_verdict") or "") if anti_total_capped >= 60 and thesis_total < 20 and dc <= -50: synthesis_verdict = "EXIT_SIGNAL" elif dc >= 20: synthesis_verdict = "BULLISH" if dc >= 40 else "ACCUMULATE" elif dc >= 0: synthesis_verdict = "PILOT" elif dc >= -20: synthesis_verdict = "NEUTRAL" else: synthesis_verdict = "HOLD" # 약한 약세 → EXIT 아닌 HOLD out["rows"].append({ "ticker": ticker, "name": r.get("name", ""), "thesis_score": round(thesis_total, 2), "antithesis_score": round(anti_total_capped, 2), "antithesis_score_raw": round(anti_total_raw, 2), "antithesis_breakdown_capped": anti_breakdown_capped, "thesis_breakdown": thesis_bd, "synthesis_verdict": synthesis_verdict, "synthesis_verdict_gas_original": gas_verdict, "rebound_capture_hit": rebound_hit, "direction_confidence": dc, "fx_beta": fx_beta, "allow_execution": dc > -20, }) # 단일 팩터 최대 기여율 감사 + [SFP1] SINGLE_FACTOR_DEGENERATE 감지 max_factor_share = 0.0 for r in out["rows"]: for s in r.get("antithesis_breakdown_capped", []): if s.get("hit") and s.get("factor_share_pct"): max_factor_share = max(max_factor_share, float(s.get("factor_share_pct", 0))) pac_stddev = round(_stats.stdev(pac_vals), 2) if len(pac_vals) > 1 else 0.0 # [SFP1] 전 종목 동일 verdict 감지 all_verdicts = [str(r.get("synthesis_verdict") or "") for r in out["rows"] if r.get("ticker") != "DATA_MISSING"] unique_verdicts = set(all_verdicts) is_degenerate = len(all_verdicts) > 0 and len(unique_verdicts) == 1 degenerate_verdict = list(unique_verdicts)[0] if is_degenerate else None # [NF2] rebound_capture 히트 통계 rebound_hit_count = sum(1 for r in out["rows"] if r.get("rebound_capture_hit")) rebound_hit_rate = round(rebound_hit_count / len(all_verdicts) * 100, 1) if all_verdicts else 0.0 out["factor_cap_audit"] = { "single_factor_max_share_pct": round(max_factor_share, 1), "single_factor_cap_ratio": SINGLE_FACTOR_CAP_RATIO * 100, "cap_applied": True, "pac_stddev": pac_stddev, "pac_per_ticker_distinct": pac_stddev > 0, "dod_single_factor_max_share_le_50": max_factor_share <= 50.0, "dod_pac_stddev_ge_5": pac_stddev >= 5.0, # [SFP1] 퇴화 감지 "is_degenerate": is_degenerate, "degenerate_verdict": degenerate_verdict, "unique_verdict_count": len(unique_verdicts), # [NF2] REBOUND_CAPTURE 통계 "rebound_capture_hit_count": rebound_hit_count, "rebound_capture_hit_rate_pct": rebound_hit_rate, "thesis_factor_hit_rate": round(rebound_hit_rate / 100.0, 3), "dod_thesis_factor_hit_rate_ge_015": rebound_hit_rate / 100.0 >= 0.15, # [NF1] FX beta 적용 여부 "nf1_fx_beta_applied": True, "nf1_export_tickers": list(EXPORT_TICKERS), "nf1_domestic_tickers": list(DOMESTIC_TICKERS), } # [SFP1] DEGENERATE 상태에서 gate=WARN 강제 dod_ok = max_factor_share <= 50.0 and pac_stddev >= 5.0 and not is_degenerate out["gate"] = "PASS" if dod_ok else "WARN" if is_degenerate: out["degenerate_warning"] = ( f"[SINGLE_FACTOR_DEGENERATE: 전 종목 {degenerate_verdict} — Direction SFP1 위반. 예측엔진 재보정 필요]" ) if not rows_in: # 데이터 없을 경우 placeholder out["rows"].append({ "ticker": "DATA_MISSING", "thesis_score": 0, "antithesis_score": 0, "synthesis_verdict": "UNKNOWN", "direction_confidence": 0, "allow_execution": False, }) op.parent.mkdir(parents=True, exist_ok=True) op.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(out, ensure_ascii=False, indent=2)) return 0 if __name__ == "__main__": raise SystemExit(main())