ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
278 lines
9.6 KiB
Python
278 lines
9.6 KiB
Python
"""EARNINGS_QUALITY_SIGNAL_V1 — 이익률 품질 시그널 산출기.
|
|
|
|
OPM(영업이익률) 기반 이익 질을 결정론적으로 라벨링한다.
|
|
|
|
주 소스: fundamental_raw_v1.json → opm_pct
|
|
보완 소스: GatherTradingData.json → Operating_Margin_Pct
|
|
EPS 양전 프록시: EPS > 0 + Forward_PE 구간 (주 소스 없을 때 부분 신뢰도 부여)
|
|
|
|
라벨:
|
|
EXPANDING ← OPM 상승 추세 / OPM ≥ 15%
|
|
STABLE ← OPM 0~15% 또는 EPS 양전 + PE 합리적
|
|
CONTRACTING← OPM 하락 또는 음수 / PE 극단 고평가
|
|
VOLATILE ← OPM 데이터 존재하나 일관성 낮음
|
|
DATA_MISSING← 모든 소스 결손
|
|
|
|
buy_modifier:
|
|
EXPANDING → +10
|
|
STABLE → 0
|
|
CONTRACTING→ -15
|
|
VOLATILE → -10
|
|
DATA_MISSING → -5
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json"
|
|
DEFAULT_JSON = ROOT / "GatherTradingData.json"
|
|
DEFAULT_OUT = ROOT / "Temp" / "earnings_quality_signal_v1.json"
|
|
|
|
_BUY_MODIFIER: dict[str, int] = {
|
|
"EXPANDING": 10,
|
|
"STABLE": 0,
|
|
"CONTRACTING": -15,
|
|
"VOLATILE": -10,
|
|
"DATA_MISSING": -5,
|
|
"ETF_EXCLUDED": 0,
|
|
}
|
|
|
|
# OPM 기반 라벨 결정 임계값
|
|
_OPM_THRESHOLDS = {
|
|
"EXPANDING": 15.0, # OPM ≥ 15% → 우수한 이익률
|
|
"STABLE_HIGH": 8.0, # 8~15% → 안정적
|
|
"STABLE_LOW": 2.0, # 2~8% → 보통
|
|
"CONTRACTING": 0.0, # 0~2% → 낮음
|
|
# < 0 → CONTRACTING (적자)
|
|
}
|
|
|
|
# Forward PE 기반 프록시 임계값 (OPM 없을 때)
|
|
_PE_PROXY = {
|
|
"STABLE_MAX": 40.0, # PE ≤ 40 → EPS 양전 시 STABLE
|
|
"CONTRACTING_MIN": 60.0, # PE > 60 → 이익 대비 극단 고평가 → CONTRACTING
|
|
}
|
|
|
|
|
|
def _load(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
d = json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
return d if isinstance(d, dict) else {}
|
|
|
|
|
|
def _rows(v: Any) -> list[dict[str, Any]]:
|
|
if isinstance(v, list):
|
|
return [x for x in v if isinstance(x, dict)]
|
|
return []
|
|
|
|
|
|
def _f(v: Any, default: float | None = None) -> float | None:
|
|
if v is None or v == "" or v == "N/A":
|
|
return default
|
|
try:
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _classify_from_opm(opm: float) -> tuple[str, str]:
|
|
"""OPM 수치에서 라벨과 근거 산출."""
|
|
if opm >= _OPM_THRESHOLDS["EXPANDING"]:
|
|
return "EXPANDING", f"opm={opm:.1f}%>=15"
|
|
if opm >= _OPM_THRESHOLDS["STABLE_HIGH"]:
|
|
return "STABLE", f"opm={opm:.1f}%[8-15)"
|
|
if opm >= _OPM_THRESHOLDS["STABLE_LOW"]:
|
|
return "STABLE", f"opm={opm:.1f}%[2-8)"
|
|
if opm >= _OPM_THRESHOLDS["CONTRACTING"]:
|
|
return "CONTRACTING", f"opm={opm:.1f}%[0-2)"
|
|
return "CONTRACTING", f"opm={opm:.1f}%<0(loss)"
|
|
|
|
|
|
def _classify_proxy(eps: float | None, pe: float | None, pbr: float | None) -> tuple[str, str, str]:
|
|
"""EPS+PE 프록시 라벨. Returns (label, basis, confidence)."""
|
|
if eps is None and pe is None:
|
|
return "DATA_MISSING", "no_eps_no_pe", "NONE"
|
|
if eps is not None and eps <= 0:
|
|
return "CONTRACTING", f"eps_negative({eps:.0f})", "LOW"
|
|
# EPS > 0
|
|
if pe is None:
|
|
return "STABLE", f"eps_positive({eps:.0f}),no_pe", "VERY_LOW"
|
|
pe_f = float(pe)
|
|
if pe_f <= 0:
|
|
return "DATA_MISSING", f"eps_positive_pe_invalid({pe_f:.1f})", "NONE"
|
|
if pe_f > _PE_PROXY["CONTRACTING_MIN"]:
|
|
return "CONTRACTING", f"eps>0_but_pe_extreme({pe_f:.1f})", "LOW"
|
|
if pe_f > _PE_PROXY["STABLE_MAX"]:
|
|
return "STABLE", f"eps>0_pe_elevated({pe_f:.1f})", "LOW"
|
|
return "STABLE", f"eps>0_pe_ok({pe_f:.1f})", "LOW"
|
|
|
|
|
|
def _process_ticker(
|
|
ticker: str,
|
|
name: str,
|
|
raw: dict[str, Any] | None,
|
|
df_row: dict[str, Any] | None,
|
|
is_etf: bool,
|
|
) -> dict[str, Any]:
|
|
"""단일 종목 earnings quality 산출."""
|
|
if is_etf:
|
|
return {
|
|
"ticker": ticker,
|
|
"name": name,
|
|
"label": "ETF_EXCLUDED",
|
|
"buy_modifier": 0,
|
|
"confidence": "N/A",
|
|
"data_source": "etf_skip",
|
|
"proxy_basis": None,
|
|
"missing_fields": [],
|
|
"is_etf": True,
|
|
}
|
|
|
|
missing_fields: list[str] = []
|
|
label: str = "DATA_MISSING"
|
|
confidence: str = "NONE"
|
|
data_source: str = "none"
|
|
proxy_basis: str | None = None
|
|
|
|
# ── 1순위: fundamental_raw opm_pct ────────────────────────────────────────
|
|
opm_raw = _f(raw.get("opm_pct") if raw else None)
|
|
if opm_raw is not None:
|
|
label, proxy_basis = _classify_from_opm(opm_raw)
|
|
confidence = "HIGH"
|
|
data_source = "fundamental_raw.opm_pct"
|
|
else:
|
|
missing_fields.append("fundamental_raw.opm_pct")
|
|
|
|
# ── 2순위: data_feed Operating_Margin_Pct ─────────────────────────────
|
|
opm_df = _f(df_row.get("Operating_Margin_Pct") if df_row else None)
|
|
if opm_df is not None:
|
|
label, proxy_basis = _classify_from_opm(opm_df)
|
|
confidence = "MEDIUM"
|
|
data_source = "data_feed.Operating_Margin_Pct"
|
|
else:
|
|
missing_fields.append("data_feed.Operating_Margin_Pct")
|
|
|
|
# ── 3순위: EPS + Forward_PE 프록시 ────────────────────────────────
|
|
eps = _f(df_row.get("EPS") if df_row else None)
|
|
pe = _f(df_row.get("Forward_PE") if df_row else None)
|
|
pbr = _f(df_row.get("PBR") if df_row else None)
|
|
if eps is None:
|
|
missing_fields.append("data_feed.EPS")
|
|
if pe is None:
|
|
missing_fields.append("data_feed.Forward_PE")
|
|
|
|
label, proxy_basis, confidence = _classify_proxy(eps, pe, pbr)
|
|
if confidence != "NONE":
|
|
data_source = "proxy.eps_forward_pe"
|
|
else:
|
|
data_source = "none"
|
|
|
|
buy_modifier = _BUY_MODIFIER.get(label, -5)
|
|
return {
|
|
"ticker": ticker,
|
|
"name": name,
|
|
"label": label,
|
|
"buy_modifier": buy_modifier,
|
|
"confidence": confidence,
|
|
"data_source": data_source,
|
|
"proxy_basis": proxy_basis,
|
|
"missing_fields": missing_fields,
|
|
"is_etf": False,
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--raw", default=str(DEFAULT_RAW))
|
|
ap.add_argument("--json", default=str(DEFAULT_JSON))
|
|
ap.add_argument("--out", default=str(DEFAULT_OUT))
|
|
args = ap.parse_args()
|
|
|
|
raw_path = Path(args.raw)
|
|
json_path = Path(args.json)
|
|
out_path = Path(args.out)
|
|
for p in (raw_path, json_path, out_path):
|
|
if not p.is_absolute():
|
|
p = ROOT / p # noqa (unused reassign — handled below)
|
|
|
|
raw_path = raw_path if raw_path.is_absolute() else ROOT / raw_path
|
|
json_path = json_path if json_path.is_absolute() else ROOT / json_path
|
|
out_path = out_path if out_path.is_absolute() else ROOT / out_path
|
|
|
|
# 로드
|
|
raw_data = _load(raw_path)
|
|
raw_rows_list = _rows(raw_data.get("rows"))
|
|
raw_map: dict[str, dict[str, Any]] = {
|
|
str(r.get("ticker") or ""): r for r in raw_rows_list if isinstance(r, dict)
|
|
}
|
|
|
|
gtd = _load(json_path)
|
|
df_list = _rows((gtd.get("data") or {}).get("data_feed"))
|
|
df_map: dict[str, dict[str, Any]] = {
|
|
str(r.get("Ticker") or ""): r for r in df_list
|
|
}
|
|
|
|
# 보유 universe: data_feed 기준
|
|
tickers_seen: set[str] = set()
|
|
rows: list[dict[str, Any]] = []
|
|
label_counts: dict[str, int] = {}
|
|
|
|
for df_row in df_list:
|
|
ticker = str(df_row.get("Ticker") or "")
|
|
if not ticker or ticker in tickers_seen:
|
|
continue
|
|
tickers_seen.add(ticker)
|
|
name = str(df_row.get("Name") or "")
|
|
is_etf = bool(
|
|
(df_row.get("EPS") is None and df_row.get("Forward_PE") is None)
|
|
and df_row.get("PBR") is None
|
|
)
|
|
raw_row = raw_map.get(ticker)
|
|
if raw_row is not None:
|
|
is_etf = bool(raw_row.get("is_etf", is_etf))
|
|
result = _process_ticker(ticker, name, raw_row, df_row, is_etf)
|
|
rows.append(result)
|
|
lbl = result["label"]
|
|
label_counts[lbl] = label_counts.get(lbl, 0) + 1
|
|
|
|
# 게이트: 비-ETF 기준 라벨 다양성 점검
|
|
non_etf = [r for r in rows if not r["is_etf"]]
|
|
unique_labels = {r["label"] for r in non_etf if r["label"] != "DATA_MISSING"}
|
|
data_missing_pct = (
|
|
sum(1 for r in non_etf if r["label"] == "DATA_MISSING") / len(non_etf) * 100
|
|
if non_etf else 0.0
|
|
)
|
|
gate = "PASS" if (non_etf and data_missing_pct < 100.0) else "CAUTION"
|
|
has_diversity = len(unique_labels) >= 2 or data_missing_pct > 50.0 # DATA_MISSING dominant은 허용
|
|
|
|
out = {
|
|
"formula_id": "EARNINGS_QUALITY_SIGNAL_V1",
|
|
"gate": gate,
|
|
"has_diversity": has_diversity,
|
|
"data_missing_pct": round(data_missing_pct, 1),
|
|
"label_counts": label_counts,
|
|
"row_count": len(rows),
|
|
"non_etf_count": len(non_etf),
|
|
"rows": rows,
|
|
}
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
status = "EARNINGS_QUALITY_SIGNAL_V1_OK" if gate != "FAIL" else "EARNINGS_QUALITY_SIGNAL_V1_FAIL"
|
|
print(
|
|
f"EARNINGS_QUALITY_SIGNAL_V1 gate={gate} rows={len(rows)} "
|
|
f"non_etf={len(non_etf)} data_missing_pct={data_missing_pct:.1f}% labels={label_counts}"
|
|
)
|
|
print(status)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|