"""FUNDAMENTAL_RAW_INGEST_V1 — 한국 상장사 펀더멘털 raw 수집기. data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary source로 사용하고, 네이버 금융 HTML 스크래핑으로 ROE / OPM / OCF 등 누락 지표를 보완한다. 수집 지표(per ticker): roe_pct — ROE (%) opm_pct — 영업이익률 (%) eps_krw — EPS (원) ocf_krw — 영업현금흐름 (원) fcf_krw — 잉여현금흐름 (원) net_debt_krw — 순부채 (원) per — PER (Forward PE) pbr — PBR revenue_krw — 매출액 (원) op_income_krw — 영업이익 (원) as_of_date — 기준일 (YYYYMMDD) source — "data_feed" | "data_feed+naver" | "naver" | "fallback" is_etf — ETF 여부 (True/False) 출력: Temp/fundamental_raw_v1.json 형식: {"formula_id":"FUNDAMENTAL_RAW_INGEST_V1","gate":"PASS|CAUTION|FAIL","rows":[...]} """ from __future__ import annotations import argparse import http.cookiejar import json import re import time import urllib.parse import urllib.request from datetime import date from pathlib import Path from typing import Any import yfinance as yf ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "fundamental_raw_v1.json" def _yahoo_fundamentals_yf(ticker: str) -> dict[str, float]: """yfinance 라이브러리를 사용하여 ROE/OPM/beta/revenue/OCF/FCF/NetDebt를 가져온다.""" result: dict[str, float] = {} if re.match(r"^\d{4}[A-Z]\d$", ticker): return result # 1. Ticker 객체 획득 (KOSPI/KOSDAQ 자동 Fallback) t = None if not ticker.isdigit(): t = yf.Ticker(ticker) else: for suffix in [".KS", ".KQ"]: temp_t = yf.Ticker(f"{ticker}{suffix}") try: info = temp_t.info if info and (info.get("longName") or info.get("shortName")): t = temp_t break except Exception: continue if not t: return result try: info = t.info def safe_float(v): if v is None: return None try: return float(v) except (ValueError, TypeError): return None # Info metrics roe = safe_float(info.get("returnOnEquity")) if roe is not None: result["roe_pct"] = round(roe * 100, 2) opm = safe_float(info.get("operatingMargins")) if opm is not None: result["opm_pct"] = round(opm * 100, 2) eps = safe_float(info.get("trailingEps")) or safe_float(info.get("forwardEps")) if eps is not None: result["eps_krw"] = eps pe = safe_float(info.get("forwardPE")) or safe_float(info.get("trailingPE")) if pe is not None: result["per"] = pe pbr = safe_float(info.get("priceToBook")) if pbr is not None: result["pbr"] = pbr rev = safe_float(info.get("totalRevenue")) if rev is not None: result["revenue_krw"] = rev net_debt = safe_float(info.get("netDebt")) if net_debt is None: tot_debt = safe_float(info.get("totalDebt")) tot_cash = safe_float(info.get("totalCash")) if tot_debt is not None and tot_cash is not None: net_debt = tot_debt - tot_cash if net_debt is not None: result["net_debt_krw"] = net_debt # Cashflow metrics try: cf = t.cashflow if cf is not None and not cf.empty: fcf_idx = [idx for idx in cf.index if "Free Cash Flow" in str(idx)] if fcf_idx: fcf_val = safe_float(cf.loc[fcf_idx[0]].iloc[0]) if fcf_val is not None: result["fcf_krw"] = fcf_val ocf_idx = [idx for idx in cf.index if "Operating Cash Flow" in str(idx)] if ocf_idx: ocf_val = safe_float(cf.loc[ocf_idx[0]].iloc[0]) if ocf_val is not None: result["ocf_krw"] = ocf_val except Exception: pass except Exception as e: print(f"Error fetching yfinance details for {ticker}: {e}") return result # ETF 식별자 패턴 (이름 포함) _ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"] # ETF 종목코드 특수 패턴 (0xxxV0 형태는 ETF) _ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$') def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def _num(v: Any, default: float = 0.0) -> float: try: if v is None or v == "": return default return float(str(v).replace(",", "")) except (TypeError, ValueError): return default def _is_etf(ticker: str, name: str) -> bool: """ETF 여부 판별.""" if _ETF_TICKER_RE.match(ticker): return True name_upper = (name or "").upper() return any(p in name_upper for p in _ETF_NAME_PATTERNS) def _naver_summary(ticker: str) -> dict[str, float]: """네이버 금융 main.naver에서 PER/EPS/PBR/ROE/OPM을 가져온다.""" result: dict[str, float] = {} url = f"https://finance.naver.com/item/main.naver?code={ticker}" try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as resp: raw = resp.read() html = raw.decode("utf-8", errors="replace") except Exception: return result def _row_values(label: str) -> list[float]: pattern = re.compile( rf']*>\s*{re.escape(label)}(.*?)', re.DOTALL, ) m = pattern.search(html) if not m: return [] td_vals = [] for raw_num in re.findall(r']*>\s*(?: )?\s*([0-9,]+(?:\.[0-9]+)?)\s*', m.group(1), re.DOTALL): val = _num(raw_num) if val != 0.0: td_vals.append(val) return td_vals # 표 라벨은 cp949 디코딩된 값 기준으로 읽는다. # 가장 오른쪽 값(최근 값)을 우선 사용한다. row_label_map: dict[str, str] = { "매출액": "revenue_krw", "영업이익": "op_income_krw", "당기순이익": "net_income_krw", "영업이익률": "opm_pct", "순이익률": "net_margin_pct", "ROE(지배주주)": "roe_pct", "부채비율": "debt_ratio_pct", "당좌비율": "quick_ratio_pct", "유보율": "retention_ratio_pct", "EPS(원)": "eps_krw", "PER(배)": "per", "BPS(원)": "bps_krw", "PBR(배)": "pbr", } for label, key in row_label_map.items(): vals = _row_values(label) if vals and result.get(key) is None: result[key] = vals[-1] # 기존 summaryDetail 기반 PER/PBR/EPS가 있다면 우선 유지 for key in ("per", "pbr", "eps_krw", "roe_pct", "opm_pct", "revenue_krw", "op_income_krw"): val = result.get(key) if val is not None: try: result[key] = float(val) except Exception: pass return result def _collect_ticker( ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, current_year: int, use_yahoo: bool = True, ) -> dict[str, Any]: """per-ticker raw 수집.""" today = str(date.today().isoformat()).replace("-", "") row: dict[str, Any] = { "ticker": ticker, "name": name, "as_of_date": today, "source": "fallback", "roe_pct": None, "opm_pct": None, "eps_krw": None, "ocf_krw": None, "fcf_krw": None, "net_debt_krw": None, "per": None, "pbr": None, "revenue_krw": None, "op_income_krw": None, "data_quality": "MISSING", "is_etf": _is_etf(ticker, name), } # ETF는 펀더멘털 데이터 수집 생략 if row["is_etf"]: row["data_quality"] = "ETF_EXCLUDED" row["source"] = "etf_skip" return row # Step 1: data_feed에서 직접 가져오기 (가장 신뢰할 수 있음) df_per = _num(df_row.get("Forward_PE")) df_pbr = _num(df_row.get("PBR")) df_eps = _num(df_row.get("EPS")) df_roe = _num(df_row.get("ROE_Pct")) df_opm = _num(df_row.get("Operating_Margin_Pct")) if df_per > 0: row["per"] = df_per if df_pbr > 0: row["pbr"] = df_pbr if df_eps != 0: row["eps_krw"] = df_eps if df_roe > 0: row["roe_pct"] = df_roe if df_opm > 0: row["opm_pct"] = df_opm data_feed_ok = (row["per"] is not None or row["pbr"] is not None) if data_feed_ok: row["source"] = "data_feed" # Step 2: 네이버 fallback (ROE/OPM 누락 시) if use_naver and (row["roe_pct"] is None or row["opm_pct"] is None or row["per"] is None): try: naver = _naver_summary(ticker) time.sleep(0.3) for k, v in naver.items(): if row.get(k) is None: row[k] = v if naver: row["source"] = "data_feed+naver" if data_feed_ok else "naver" except Exception: pass # Step 3: 야후 Finance v10 폴백 (ROE/OPM/revenue 등 누락 시) # 네이버가 PE/PBR/EPS 우선, 야후가 ROE/OPM/OCF/FCF 보완 needs_yahoo = ( row["roe_pct"] is None or row["opm_pct"] is None or row["ocf_krw"] is None or row["revenue_krw"] is None ) if use_yahoo and needs_yahoo and not row["is_etf"]: try: yahoo = _yahoo_fundamentals_yf(ticker) if yahoo: for k, v in yahoo.items(): if row.get(k) is None and v is not None: row[k] = v src_prev = row["source"] row["source"] = (src_prev + "+yahoo") if src_prev != "fallback" else "yahoo" except Exception: pass # 데이터 품질 평가 (ETF 제외) filled = sum(1 for k in ("roe_pct", "opm_pct", "per", "pbr", "eps_krw") if row.get(k) not in (None, 0.0)) if filled >= 4: row["data_quality"] = "FULL" elif filled >= 2: row["data_quality"] = "PARTIAL" elif filled >= 1: row["data_quality"] = "SPARSE" else: row["data_quality"] = "MISSING" return row def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) ap.add_argument("--no-naver", action="store_true", help="네이버 스크래핑 비활성화") ap.add_argument("--no-yahoo", action="store_true", help="야후 Finance v10 폴백 비활성화") ap.add_argument("--tickers", default="", help="쉼표구분 종목코드 (빈 값이면 data_feed에서 자동 추출)") args = ap.parse_args() json_path = Path(args.json) out_path = Path(args.out) if not json_path.is_absolute(): json_path = ROOT / json_path if not out_path.is_absolute(): out_path = ROOT / out_path src = _load_json(json_path) data = src.get("data") if isinstance(src.get("data"), dict) else {} df_list = data.get("data_feed") if isinstance(data.get("data_feed"), list) else [] # data_feed를 ticker 기준 dict로 변환 df_map: dict[str, dict[str, Any]] = {} for r in df_list: if isinstance(r, dict): t = str(r.get("Ticker") or r.get("ticker") or "") if t: df_map[t] = r # 수집 대상 tickers if args.tickers.strip(): tickers_with_names = [(t.strip(), df_map.get(t.strip(), {}).get("Name", "")) for t in args.tickers.split(",") if t.strip()] else: tickers_with_names = [(t, df_map.get(t, {}).get("Name", "")) for t in sorted(df_map.keys())] use_naver = not args.no_naver use_yahoo = not args.no_yahoo current_year = date.today().year print(f"FUNDAMENTAL_RAW_INGEST_V1: collecting {len(tickers_with_names)} tickers, naver={'YES' if use_naver else 'NO'}") rows: list[dict[str, Any]] = [] for i, (ticker, name) in enumerate(tickers_with_names): print(f" [{i+1}/{len(tickers_with_names)}] {ticker} {name} ...", end=" ", flush=True) row = _collect_ticker(ticker, name, df_map.get(ticker, {}), use_naver, current_year, use_yahoo=use_yahoo) rows.append(row) print(f"{row['data_quality']} source={row['source']}") # 품질 집계 (ETF 제외) non_etf = [r for r in rows if r["data_quality"] != "ETF_EXCLUDED"] quality_counts: dict[str, int] = {} for r in rows: q = str(r.get("data_quality") or "MISSING") quality_counts[q] = quality_counts.get(q, 0) + 1 full_count = quality_counts.get("FULL", 0) partial_count = quality_counts.get("PARTIAL", 0) sparse_count = quality_counts.get("SPARSE", 0) missing_count = quality_counts.get("MISSING", 0) coverage_pct = round( (full_count + partial_count + sparse_count * 0.5) / len(non_etf) * 100.0, 2 ) if non_etf else 0.0 gate = "PASS" if coverage_pct >= 80.0 else ("CAUTION" if coverage_pct >= 30.0 else "FAIL") result = { "formula_id": "FUNDAMENTAL_RAW_INGEST_V1", "gate": gate, "as_of_date": str(date.today()), "ticker_count": len(rows), "non_etf_count": len(non_etf), "coverage_pct": coverage_pct, "quality_counts": quality_counts, "rows": rows, } out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print( f"FUNDAMENTAL_RAW_INGEST_V1 gate={gate} tickers={len(rows)} non_etf={len(non_etf)} " f"coverage={coverage_pct}% full={full_count} partial={partial_count} missing={missing_count}" ) print("FUNDAMENTAL_RAW_INGEST_V1_OK" if gate != "FAIL" else "FUNDAMENTAL_RAW_INGEST_V1_FAIL") return 0 if gate != "FAIL" else 1 if __name__ == "__main__": raise SystemExit(main())