"""FUNDAMENTAL_MULTIFACTOR_V3 — 종목별 펀더멘털 등급 산출기. fundamental_raw_v1.json(FUNDAMENTAL_RAW_INGEST_V1 출력)에서 per-ticker 지표를 결정론 공식으로 합산하여 종목별 등급을 산출한다. 점수 (총 100점): ROE 25점 (roe_pct) OPM 20점 (opm_pct) OCF/매출 15점 (ocf_krw / revenue_krw) FCF 15점 (fcf_krw) Debt 10점 (net_debt_krw) 밸류에이션 15점 (per/pbr) 누락 필드 정규화: 보유 필드 기준 100점 환산 후 품질 계수 적용 data_quality=FULL → multiplier 1.00 data_quality=PARTIAL → multiplier 0.90 data_quality=SPARSE → multiplier 0.80 data_quality=MISSING → multiplier 0.00 data_quality=ETF_EXCLUDED → ETF 등급 별도 부여 (점수 없음) 등급: A ≥ 80점 B ≥ 65점 C ≥ 50점 D ≥ 35점 F < 35점 ETF — ETF 종목 (펀더멘털 미적용) buy_allowed = grade ∈ {A, B} AND len(critical_fail_reasons) == 0. """ from __future__ import annotations import argparse import json from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_RAW = ROOT / "Temp" / "fundamental_raw_v1.json" DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "fundamental_multifactor_v3.json" _QUALITY_MULTIPLIER = { "FULL_ADVANCED": 1.00, "FULL": 1.00, "PARTIAL": 0.90, "SPARSE": 0.80, "MISSING": 0.00, "ETF_EXCLUDED": None, # ETF는 별도 처리 } _FIELD_MAX = { "roe": 25.0, "opm": 20.0, "ocf": 15.0, "fcf": 15.0, "debt": 10.0, "valuation": 15.0, } def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def _num(v: Any, default: float = 0.0) -> float: try: if v is None or v == "" or v == "N/A": return default return float(v) except (TypeError, ValueError): return default def _clip(v: float, lo: float = 0.0, hi: float = 100.0) -> float: return max(lo, min(hi, v)) def _grade_from_score(score: float) -> str: if score >= 80: return "A" if score >= 65: return "B" if score >= 50: return "C" if score >= 35: return "D" return "F" def _score_component(raw: dict[str, Any]) -> tuple[dict[str, float], list[str], dict[str, bool]]: """각 컴포넌트 점수를 산출. Returns (scores, fail_reasons, has_data).""" scores: dict[str, float] = {} fail_reasons: list[str] = [] has_data: dict[str, bool] = {} # ── ROE (25점) ──────────────────────────────────────────────────────────── roe = _num(raw.get("roe_pct")) has_data["roe"] = raw.get("roe_pct") is not None and roe != 0.0 if not has_data["roe"]: scores["roe"] = 0.0 elif roe >= 20: scores["roe"] = 25.0 elif roe >= 15: scores["roe"] = 20.0 elif roe >= 10: scores["roe"] = 15.0 elif roe >= 5: scores["roe"] = 8.0 elif roe > 0: scores["roe"] = 3.0 else: scores["roe"] = 0.0 if has_data["roe"]: fail_reasons.append(f"ROE_NEGATIVE({roe:.1f}%)") # ── OPM (20점) ─────────────────────────────────────────────────────────── opm = _num(raw.get("opm_pct")) has_data["opm"] = raw.get("opm_pct") is not None and opm != 0.0 if not has_data["opm"]: scores["opm"] = 0.0 elif opm >= 20: scores["opm"] = 20.0 elif opm >= 15: scores["opm"] = 16.0 elif opm >= 10: scores["opm"] = 12.0 elif opm >= 5: scores["opm"] = 7.0 elif opm > 0: scores["opm"] = 3.0 else: scores["opm"] = 0.0 if has_data["opm"]: fail_reasons.append(f"OPM_NEGATIVE({opm:.1f}%)") # ── OCF/매출 (15점) ─────────────────────────────────────────────────────── ocf = _num(raw.get("ocf_krw")) revenue = _num(raw.get("revenue_krw")) has_data["ocf"] = raw.get("ocf_krw") is not None and ocf != 0.0 if not has_data["ocf"]: scores["ocf"] = 0.0 elif revenue > 0 and ocf > 0: ocf_margin = ocf / revenue * 100.0 if ocf_margin >= 20: scores["ocf"] = 15.0 elif ocf_margin >= 12: scores["ocf"] = 12.0 elif ocf_margin >= 7: scores["ocf"] = 8.0 elif ocf_margin >= 3: scores["ocf"] = 4.0 else: scores["ocf"] = 1.0 elif ocf > 0: scores["ocf"] = 7.0 # 매출 없어도 양전 else: scores["ocf"] = 0.0 if has_data["ocf"]: fail_reasons.append("OCF_NEGATIVE") # ── FCF (15점) ──────────────────────────────────────────────────────────── fcf = _num(raw.get("fcf_krw")) has_data["fcf"] = raw.get("fcf_krw") is not None and fcf != 0.0 if not has_data["fcf"]: scores["fcf"] = 0.0 elif fcf > 0: scores["fcf"] = 12.0 else: scores["fcf"] = 0.0 if has_data["fcf"]: fail_reasons.append("FCF_NEGATIVE") # ── 부채비율 (10점) ─────────────────────────────────────────────────────── net_debt = _num(raw.get("net_debt_krw")) has_data["debt"] = raw.get("net_debt_krw") is not None and net_debt != 0.0 if not has_data["debt"]: scores["debt"] = 5.0 # 알 수 없으면 중립 (5점) elif net_debt <= 0: scores["debt"] = 10.0 # 무부채 elif revenue > 0 and net_debt / revenue < 0.5: scores["debt"] = 8.0 elif revenue > 0 and net_debt / revenue < 1.5: scores["debt"] = 5.0 else: scores["debt"] = 2.0 fail_reasons.append("HIGH_NET_DEBT") # ── 밸류에이션 (15점) ───────────────────────────────────────────────────── per = _num(raw.get("per")) pbr = _num(raw.get("pbr")) has_data["valuation"] = (raw.get("per") is not None and per > 0) or (raw.get("pbr") is not None and pbr > 0) val_score = 0.0 if not has_data["valuation"]: scores["valuation"] = 0.0 else: if 0 < per <= 15: val_score += 8.0 elif 0 < per <= 25: val_score += 5.0 elif 0 < per <= 40: val_score += 2.0 elif per > 40: fail_reasons.append(f"HIGH_PER({per:.1f})") if 0 < pbr <= 1.5: val_score += 7.0 elif 0 < pbr <= 3.0: val_score += 4.0 elif 0 < pbr <= 6.0: val_score += 2.0 elif pbr > 6: pass # 0점 scores["valuation"] = _clip(val_score, 0, 15) return scores, fail_reasons, has_data def _normalize_score( scores: dict[str, float], has_data: dict[str, bool], data_quality: str, ) -> float: """보유 데이터 기준 100점 환산.""" multiplier = _QUALITY_MULTIPLIER.get(data_quality, 0.0) if multiplier is None or multiplier == 0.0: return 0.0 # 실제 점수 합산 raw_total = sum(scores.values()) # 보유 필드의 최대 가능 점수 계산 available_max = 0.0 for field, max_pts in _FIELD_MAX.items(): if field == "debt": # debt는 데이터 유무 관계없이 항상 5~10점 부여 available_max += max_pts elif has_data.get(field): available_max += max_pts if available_max <= 0: return 0.0 # 100점 환산 normalized = (raw_total / available_max) * 100.0 # 품질 계수 적용 final = normalized * multiplier return _clip(final, 0.0, 100.0) def _score_ticker(raw: dict[str, Any]) -> tuple[float, str, list[str], dict[str, float], bool]: """결정론 점수 산출. Returns (score, grade, fail_reasons, breakdown, buy_allowed).""" data_quality = str(raw.get("data_quality") or "MISSING") # ETF 별도 처리 if data_quality == "ETF_EXCLUDED" or raw.get("is_etf"): return 0.0, "ETF", [], {}, False scores, fail_reasons, has_data = _score_component(raw) score = _normalize_score(scores, has_data, data_quality) score = round(score, 2) grade = _grade_from_score(score) # 치명적 실패 사유 필터 critical_fails = [r for r in fail_reasons if any( kw in r for kw in ("NEGATIVE", "HIGH_NET_DEBT") )] buy_allowed = grade in ("A", "B") and len(critical_fails) == 0 breakdown = {k: round(v, 2) for k, v in scores.items()} return score, grade, fail_reasons, breakdown, buy_allowed def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--raw", default=str(DEFAULT_RAW)) ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() raw_path = Path(args.raw) out_path = Path(args.out) if not raw_path.is_absolute(): raw_path = ROOT / raw_path if not out_path.is_absolute(): out_path = ROOT / out_path raw_data = _load_json(raw_path) raw_rows = raw_data.get("rows") if isinstance(raw_data.get("rows"), list) else [] # data_feed에서 이름 맵 구성 (보완) json_path = Path(args.json) if not json_path.is_absolute(): json_path = ROOT / json_path gtd = _load_json(json_path) df_list = (gtd.get("data") or {}).get("data_feed") or [] name_map = {str(r.get("Ticker") or ""): str(r.get("Name") or "") for r in df_list if isinstance(r, dict)} grade_counts: dict[str, int] = {} rows: list[dict[str, Any]] = [] for raw in raw_rows: if not isinstance(raw, dict): continue ticker = str(raw.get("ticker") or "") if not ticker: continue score, grade, fail_reasons, breakdown, buy_allowed = _score_ticker(raw) name = raw.get("name") or name_map.get(ticker, "") rows.append({ "ticker": ticker, "name": name, "score": score, "grade": grade, "buy_allowed": buy_allowed, "fail_reasons": fail_reasons, "breakdown": breakdown, "data_quality": raw.get("data_quality", "MISSING"), "is_etf": bool(raw.get("is_etf")), "as_of_date": raw.get("as_of_date"), "source": raw.get("source", "fallback"), }) grade_counts[grade] = grade_counts.get(grade, 0) + 1 # Gate: 비-ETF 종목 기준 non_etf_rows = [r for r in rows if r["grade"] != "ETF"] data_missing_count = sum(1 for r in non_etf_rows if r["data_quality"] == "MISSING") unique_non_etf_grades = {r["grade"] for r in non_etf_rows} grade_diverse = len(unique_non_etf_grades) >= 2 gate = "PASS" if (non_etf_rows and data_missing_count == 0 and grade_diverse) else ( "CAUTION" if non_etf_rows else "FAIL" ) result = { "formula_id": "FUNDAMENTAL_MULTIFACTOR_V3", "gate": gate, "row_count": len(rows), "non_etf_count": len(non_etf_rows), "data_missing_count": data_missing_count, "grade_counts": grade_counts, "grade_diverse": grade_diverse, "rows": rows, } out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print( f"FUNDAMENTAL_MULTIFACTOR_V3 gate={gate} rows={len(rows)} non_etf={len(non_etf_rows)} " f"missing={data_missing_count} grades={grade_counts}" ) print("FUNDAMENTAL_MULTIFACTOR_V3_OK" if gate != "FAIL" else "FUNDAMENTAL_MULTIFACTOR_V3_FAIL") return 0 if gate != "FAIL" else 1 if __name__ == "__main__": raise SystemExit(main())