"""EARNINGS_QUALITY_SIGNAL_V1 — 이익률 품질 시그널 산출기. OPM(영업이익률) 기반 이익 질을 결정론적으로 라벨링한다. 주 소스: fundamental_raw_v1.json → opm_pct 보완 소스: GatherTradingData.json → Operating_Margin_Pct EPS 양전 프록시: EPS > 0 + Forward_PE 구간 (주 소스 없을 때 부분 신뢰도 부여) 라벨: EXPANDING ← OPM 상승 추세 / OPM ≥ 15% STABLE ← OPM 0~15% 또는 EPS 양전 + PE 합리적 CONTRACTING← OPM 하락 또는 음수 / PE 극단 고평가 VOLATILE ← OPM 데이터 존재하나 일관성 낮음 DATA_MISSING← 모든 소스 결손 buy_modifier: EXPANDING → +10 STABLE → 0 CONTRACTING→ -15 VOLATILE → -10 DATA_MISSING → -5 """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json" DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "earnings_quality_signal_v1.json" _BUY_MODIFIER: dict[str, int] = { "EXPANDING": 10, "STABLE": 0, "CONTRACTING": -15, "VOLATILE": -10, "DATA_MISSING": -5, "ETF_EXCLUDED": 0, } # OPM 기반 라벨 결정 임계값 _OPM_THRESHOLDS = { "EXPANDING": 15.0, # OPM ≥ 15% → 우수한 이익률 "STABLE_HIGH": 8.0, # 8~15% → 안정적 "STABLE_LOW": 2.0, # 2~8% → 보통 "CONTRACTING": 0.0, # 0~2% → 낮음 # < 0 → CONTRACTING (적자) } # Forward PE 기반 프록시 임계값 (OPM 없을 때) _PE_PROXY = { "STABLE_MAX": 40.0, # PE ≤ 40 → EPS 양전 시 STABLE "CONTRACTING_MIN": 60.0, # PE > 60 → 이익 대비 극단 고평가 → CONTRACTING } def _load(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: d = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return d if isinstance(d, dict) else {} def _rows(v: Any) -> list[dict[str, Any]]: if isinstance(v, list): return [x for x in v if isinstance(x, dict)] return [] def _f(v: Any, default: float | None = None) -> float | None: if v is None or v == "" or v == "N/A": return default try: return float(v) except (TypeError, ValueError): return default def _classify_from_opm(opm: float) -> tuple[str, str]: """OPM 수치에서 라벨과 근거 산출.""" if opm >= _OPM_THRESHOLDS["EXPANDING"]: return "EXPANDING", f"opm={opm:.1f}%>=15" if opm >= _OPM_THRESHOLDS["STABLE_HIGH"]: return "STABLE", f"opm={opm:.1f}%[8-15)" if opm >= _OPM_THRESHOLDS["STABLE_LOW"]: return "STABLE", f"opm={opm:.1f}%[2-8)" if opm >= _OPM_THRESHOLDS["CONTRACTING"]: return "CONTRACTING", f"opm={opm:.1f}%[0-2)" return "CONTRACTING", f"opm={opm:.1f}%<0(loss)" def _classify_proxy(eps: float | None, pe: float | None, pbr: float | None) -> tuple[str, str, str]: """EPS+PE 프록시 라벨. Returns (label, basis, confidence).""" if eps is None and pe is None: return "DATA_MISSING", "no_eps_no_pe", "NONE" if eps is not None and eps <= 0: return "CONTRACTING", f"eps_negative({eps:.0f})", "LOW" # EPS > 0 if pe is None: return "STABLE", f"eps_positive({eps:.0f}),no_pe", "VERY_LOW" pe_f = float(pe) if pe_f <= 0: return "DATA_MISSING", f"eps_positive_pe_invalid({pe_f:.1f})", "NONE" if pe_f > _PE_PROXY["CONTRACTING_MIN"]: return "CONTRACTING", f"eps>0_but_pe_extreme({pe_f:.1f})", "LOW" if pe_f > _PE_PROXY["STABLE_MAX"]: return "STABLE", f"eps>0_pe_elevated({pe_f:.1f})", "LOW" return "STABLE", f"eps>0_pe_ok({pe_f:.1f})", "LOW" def _process_ticker( ticker: str, name: str, raw: dict[str, Any] | None, df_row: dict[str, Any] | None, is_etf: bool, ) -> dict[str, Any]: """단일 종목 earnings quality 산출.""" if is_etf: return { "ticker": ticker, "name": name, "label": "ETF_EXCLUDED", "buy_modifier": 0, "confidence": "N/A", "data_source": "etf_skip", "proxy_basis": None, "missing_fields": [], "is_etf": True, } missing_fields: list[str] = [] label: str = "DATA_MISSING" confidence: str = "NONE" data_source: str = "none" proxy_basis: str | None = None # ── 1순위: fundamental_raw opm_pct ──────────────────────────────────────── opm_raw = _f(raw.get("opm_pct") if raw else None) if opm_raw is not None: label, proxy_basis = _classify_from_opm(opm_raw) confidence = "HIGH" data_source = "fundamental_raw.opm_pct" else: missing_fields.append("fundamental_raw.opm_pct") # ── 2순위: data_feed Operating_Margin_Pct ───────────────────────────── opm_df = _f(df_row.get("Operating_Margin_Pct") if df_row else None) if opm_df is not None: label, proxy_basis = _classify_from_opm(opm_df) confidence = "MEDIUM" data_source = "data_feed.Operating_Margin_Pct" else: missing_fields.append("data_feed.Operating_Margin_Pct") # ── 3순위: EPS + Forward_PE 프록시 ──────────────────────────────── eps = _f(df_row.get("EPS") if df_row else None) pe = _f(df_row.get("Forward_PE") if df_row else None) pbr = _f(df_row.get("PBR") if df_row else None) if eps is None: missing_fields.append("data_feed.EPS") if pe is None: missing_fields.append("data_feed.Forward_PE") label, proxy_basis, confidence = _classify_proxy(eps, pe, pbr) if confidence != "NONE": data_source = "proxy.eps_forward_pe" else: data_source = "none" buy_modifier = _BUY_MODIFIER.get(label, -5) return { "ticker": ticker, "name": name, "label": label, "buy_modifier": buy_modifier, "confidence": confidence, "data_source": data_source, "proxy_basis": proxy_basis, "missing_fields": missing_fields, "is_etf": False, } def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--raw", default=str(DEFAULT_RAW)) ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() raw_path = Path(args.raw) json_path = Path(args.json) out_path = Path(args.out) for p in (raw_path, json_path, out_path): if not p.is_absolute(): p = ROOT / p # noqa (unused reassign — handled below) raw_path = raw_path if raw_path.is_absolute() else ROOT / raw_path json_path = json_path if json_path.is_absolute() else ROOT / json_path out_path = out_path if out_path.is_absolute() else ROOT / out_path # 로드 raw_data = _load(raw_path) raw_rows_list = _rows(raw_data.get("rows")) raw_map: dict[str, dict[str, Any]] = { str(r.get("ticker") or ""): r for r in raw_rows_list if isinstance(r, dict) } gtd = _load(json_path) df_list = _rows((gtd.get("data") or {}).get("data_feed")) df_map: dict[str, dict[str, Any]] = { str(r.get("Ticker") or ""): r for r in df_list } # 보유 universe: data_feed 기준 tickers_seen: set[str] = set() rows: list[dict[str, Any]] = [] label_counts: dict[str, int] = {} for df_row in df_list: ticker = str(df_row.get("Ticker") or "") if not ticker or ticker in tickers_seen: continue tickers_seen.add(ticker) name = str(df_row.get("Name") or "") is_etf = bool( (df_row.get("EPS") is None and df_row.get("Forward_PE") is None) and df_row.get("PBR") is None ) raw_row = raw_map.get(ticker) if raw_row is not None: is_etf = bool(raw_row.get("is_etf", is_etf)) result = _process_ticker(ticker, name, raw_row, df_row, is_etf) rows.append(result) lbl = result["label"] label_counts[lbl] = label_counts.get(lbl, 0) + 1 # 게이트: 비-ETF 기준 라벨 다양성 점검 non_etf = [r for r in rows if not r["is_etf"]] unique_labels = {r["label"] for r in non_etf if r["label"] != "DATA_MISSING"} data_missing_pct = ( sum(1 for r in non_etf if r["label"] == "DATA_MISSING") / len(non_etf) * 100 if non_etf else 0.0 ) gate = "PASS" if (non_etf and data_missing_pct < 100.0) else "CAUTION" has_diversity = len(unique_labels) >= 2 or data_missing_pct > 50.0 # DATA_MISSING dominant은 허용 out = { "formula_id": "EARNINGS_QUALITY_SIGNAL_V1", "gate": gate, "has_diversity": has_diversity, "data_missing_pct": round(data_missing_pct, 1), "label_counts": label_counts, "row_count": len(rows), "non_etf_count": len(non_etf), "rows": rows, } out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8") status = "EARNINGS_QUALITY_SIGNAL_V1_OK" if gate != "FAIL" else "EARNINGS_QUALITY_SIGNAL_V1_FAIL" print( f"EARNINGS_QUALITY_SIGNAL_V1 gate={gate} rows={len(rows)} " f"non_etf={len(non_etf)} data_missing_pct={data_missing_pct:.1f}% labels={label_counts}" ) print(status) return 0 if __name__ == "__main__": raise SystemExit(main())