"""FUNDAMENTAL_RAW_INGEST_V1 — 한국 상장사 펀더멘털 raw 수집기. data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary source로 사용하고, 네이버 금융 HTML 스크래핑으로 ROE / OPM / OCF 등 누락 지표를 보완한다. 수집 지표(per ticker): roe_pct — ROE (%) opm_pct — 영업이익률 (%) eps_krw — EPS (원) ocf_krw — 영업현금흐름 (원) fcf_krw — 잉여현금흐름 (원) net_debt_krw — 순부채 (원) per — PER (Forward PE) pbr — PBR revenue_krw — 매출액 (원) op_income_krw — 영업이익 (원) as_of_date — 기준일 (YYYYMMDD) source — "data_feed" | "data_feed+naver" | "naver" | "fallback" is_etf — ETF 여부 (True/False) 출력: Temp/fundamental_raw_v1.json 형식: {"formula_id":"FUNDAMENTAL_RAW_INGEST_V1","gate":"PASS|CAUTION|FAIL","rows":[...]} """ from __future__ import annotations import argparse import http.cookiejar import json import re import time import urllib.parse import urllib.request from datetime import date from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = ROOT / "Temp" / "fundamental_raw_v1.json" # ── Yahoo Finance crumb 세션 (모듈 수준 공유) ──────────────────────────────── _yahoo_cj = http.cookiejar.CookieJar() _yahoo_op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(_yahoo_cj)) _yahoo_op.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")] _yahoo_crumb: str | None = None def _yahoo_get_crumb() -> str | None: """야후 Finance crumb 획득. 실패 시 None.""" global _yahoo_crumb if _yahoo_crumb: return _yahoo_crumb try: _yahoo_op.open("https://fc.yahoo.com", timeout=8) _yahoo_op.open("https://finance.yahoo.com/quote/005930.KS", timeout=8) with _yahoo_op.open("https://query1.finance.yahoo.com/v1/test/getcrumb", timeout=8) as r: _yahoo_crumb = r.read().decode("utf-8", errors="replace").strip() return _yahoo_crumb except Exception: return None def _yahoo_fundamentals(ticker: str) -> dict[str, float]: """야후 v10 quoteSummary에서 ROE/OPM/beta/revenue를 가져온다. PE/PBR/EPS는 한국주식에서 야후가 미제공 → Naver/data_feed가 우선. """ crumb = _yahoo_get_crumb() if not crumb: return {} sym = f"{ticker}.KS" if not ticker.startswith("0") or len(ticker) != 6 else f"{ticker}.KS" # ETF-style ticker skip (0xxxX0 pattern) if re.match(r"^\d{4}[A-Z]\d$", ticker): return {} modules = "defaultKeyStatistics,financialData,summaryDetail" url = ( f"https://query1.finance.yahoo.com/v10/finance/quoteSummary/" f"{urllib.parse.quote(sym)}?modules={modules}&crumb={urllib.parse.quote(crumb)}" ) try: with _yahoo_op.open(url, timeout=10) as r: if r.status != 200: return {} d = json.loads(r.read().decode("utf-8", errors="replace")) res = (d.get("quoteSummary") or {}).get("result") or [{}] obj = res[0] if res else {} fd = obj.get("financialData") or {} ks = obj.get("defaultKeyStatistics") or {} sd = obj.get("summaryDetail") or {} def rv(o: dict, k: str) -> float | None: v = o.get(k) raw = v.get("raw") if isinstance(v, dict) else v return float(raw) if raw is not None else None result: dict[str, float] = {} if rv(fd, "returnOnEquity") is not None: result["roe_pct"] = round(rv(fd, "returnOnEquity") * 100, 2) if rv(fd, "operatingMargins") is not None: result["opm_pct"] = round(rv(fd, "operatingMargins") * 100, 2) if rv(ks, "trailingEps") is not None: result["eps_krw"] = rv(ks, "trailingEps") if rv(sd, "trailingPE") is not None: result["per"] = rv(sd, "trailingPE") if rv(ks, "priceToBook") is not None: result["pbr"] = rv(ks, "priceToBook") if rv(fd, "totalRevenue") is not None: result["revenue_krw"] = rv(fd, "totalRevenue") if rv(fd, "operatingCashflow") is not None: result["ocf_krw"] = rv(fd, "operatingCashflow") if rv(fd, "freeCashflow") is not None: result["fcf_krw"] = rv(fd, "freeCashflow") return result except Exception: return {} # ETF 식별자 패턴 (이름 포함) _ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"] # ETF 종목코드 특수 패턴 (0xxxV0 형태는 ETF) _ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$') def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def _num(v: Any, default: float = 0.0) -> float: try: if v is None or v == "": return default return float(str(v).replace(",", "")) except (TypeError, ValueError): return default def _is_etf(ticker: str, name: str) -> bool: """ETF 여부 판별.""" if _ETF_TICKER_RE.match(ticker): return True name_upper = (name or "").upper() return any(p in name_upper for p in _ETF_NAME_PATTERNS) def _naver_summary(ticker: str) -> dict[str, float]: """네이버 금융 main.naver에서 PER/EPS/PBR/ROE/OPM을 가져온다.""" result: dict[str, float] = {} url = f"https://finance.naver.com/item/main.naver?code={ticker}" try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as resp: raw = resp.read() html = raw.decode("utf-8", errors="replace") except Exception: return result def _row_values(label: str) -> list[float]: pattern = re.compile( rf']*>\s*{re.escape(label)}(.*?)', re.DOTALL, ) m = pattern.search(html) if not m: return [] td_vals = [] for raw_num in re.findall(r']*>\s*(?: )?\s*([0-9,]+(?:\.[0-9]+)?)\s*', m.group(1), re.DOTALL): val = _num(raw_num) if val != 0.0: td_vals.append(val) return td_vals # 표 라벨은 cp949 디코딩된 값 기준으로 읽는다. # 가장 오른쪽 값(최근 값)을 우선 사용한다. row_label_map: dict[str, str] = { "매출액": "revenue_krw", "영업이익": "op_income_krw", "당기순이익": "net_income_krw", "영업이익률": "opm_pct", "순이익률": "net_margin_pct", "ROE(지배주주)": "roe_pct", "부채비율": "debt_ratio_pct", "당좌비율": "quick_ratio_pct", "유보율": "retention_ratio_pct", "EPS(원)": "eps_krw", "PER(배)": "per", "BPS(원)": "bps_krw", "PBR(배)": "pbr", } for label, key in row_label_map.items(): vals = _row_values(label) if vals and result.get(key) is None: result[key] = vals[-1] # 기존 summaryDetail 기반 PER/PBR/EPS가 있다면 우선 유지 for key in ("per", "pbr", "eps_krw", "roe_pct", "opm_pct", "revenue_krw", "op_income_krw"): val = result.get(key) if val is not None: try: result[key] = float(val) except Exception: pass return result def _collect_ticker( ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, current_year: int, use_yahoo: bool = True, ) -> dict[str, Any]: """per-ticker raw 수집.""" today = str(date.today().isoformat()).replace("-", "") row: dict[str, Any] = { "ticker": ticker, "name": name, "as_of_date": today, "source": "fallback", "roe_pct": None, "opm_pct": None, "eps_krw": None, "ocf_krw": None, "fcf_krw": None, "net_debt_krw": None, "per": None, "pbr": None, "revenue_krw": None, "op_income_krw": None, "data_quality": "MISSING", "is_etf": _is_etf(ticker, name), } # ETF는 펀더멘털 데이터 수집 생략 if row["is_etf"]: row["data_quality"] = "ETF_EXCLUDED" row["source"] = "etf_skip" return row # Step 1: data_feed에서 직접 가져오기 (가장 신뢰할 수 있음) df_per = _num(df_row.get("Forward_PE")) df_pbr = _num(df_row.get("PBR")) df_eps = _num(df_row.get("EPS")) df_roe = _num(df_row.get("ROE_Pct")) df_opm = _num(df_row.get("Operating_Margin_Pct")) if df_per > 0: row["per"] = df_per if df_pbr > 0: row["pbr"] = df_pbr if df_eps != 0: row["eps_krw"] = df_eps if df_roe > 0: row["roe_pct"] = df_roe if df_opm > 0: row["opm_pct"] = df_opm data_feed_ok = (row["per"] is not None or row["pbr"] is not None) if data_feed_ok: row["source"] = "data_feed" # Step 2: 네이버 fallback (ROE/OPM 누락 시) if use_naver and (row["roe_pct"] is None or row["opm_pct"] is None or row["per"] is None): try: naver = _naver_summary(ticker) time.sleep(0.3) for k, v in naver.items(): if row.get(k) is None: row[k] = v if naver: row["source"] = "data_feed+naver" if data_feed_ok else "naver" except Exception: pass # Step 3: 야후 Finance v10 폴백 (ROE/OPM/revenue 등 누락 시) # 네이버가 PE/PBR/EPS 우선, 야후가 ROE/OPM/OCF/FCF 보완 needs_yahoo = ( row["roe_pct"] is None or row["opm_pct"] is None or row["ocf_krw"] is None or row["revenue_krw"] is None ) if use_yahoo and needs_yahoo and not row["is_etf"]: try: yahoo = _yahoo_fundamentals(ticker) if yahoo: for k, v in yahoo.items(): if row.get(k) is None and v is not None: row[k] = v src_prev = row["source"] row["source"] = (src_prev + "+yahoo") if src_prev != "fallback" else "yahoo" except Exception: pass # 데이터 품질 평가 (ETF 제외) filled = sum(1 for k in ("roe_pct", "opm_pct", "per", "pbr", "eps_krw") if row.get(k) not in (None, 0.0)) if filled >= 4: row["data_quality"] = "FULL" elif filled >= 2: row["data_quality"] = "PARTIAL" elif filled >= 1: row["data_quality"] = "SPARSE" else: row["data_quality"] = "MISSING" return row def main() -> int: ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) ap.add_argument("--no-naver", action="store_true", help="네이버 스크래핑 비활성화") ap.add_argument("--no-yahoo", action="store_true", help="야후 Finance v10 폴백 비활성화") ap.add_argument("--tickers", default="", help="쉼표구분 종목코드 (빈 값이면 data_feed에서 자동 추출)") args = ap.parse_args() json_path = Path(args.json) out_path = Path(args.out) if not json_path.is_absolute(): json_path = ROOT / json_path if not out_path.is_absolute(): out_path = ROOT / out_path src = _load_json(json_path) data = src.get("data") if isinstance(src.get("data"), dict) else {} df_list = data.get("data_feed") if isinstance(data.get("data_feed"), list) else [] # data_feed를 ticker 기준 dict로 변환 df_map: dict[str, dict[str, Any]] = {} for r in df_list: if isinstance(r, dict): t = str(r.get("Ticker") or r.get("ticker") or "") if t: df_map[t] = r # 수집 대상 tickers if args.tickers.strip(): tickers_with_names = [(t.strip(), df_map.get(t.strip(), {}).get("Name", "")) for t in args.tickers.split(",") if t.strip()] else: tickers_with_names = [(t, df_map.get(t, {}).get("Name", "")) for t in sorted(df_map.keys())] use_naver = not args.no_naver use_yahoo = not args.no_yahoo current_year = date.today().year print(f"FUNDAMENTAL_RAW_INGEST_V1: collecting {len(tickers_with_names)} tickers, naver={'YES' if use_naver else 'NO'}") rows: list[dict[str, Any]] = [] for i, (ticker, name) in enumerate(tickers_with_names): print(f" [{i+1}/{len(tickers_with_names)}] {ticker} {name} ...", end=" ", flush=True) row = _collect_ticker(ticker, name, df_map.get(ticker, {}), use_naver, current_year, use_yahoo=use_yahoo) rows.append(row) print(f"{row['data_quality']} source={row['source']}") # 품질 집계 (ETF 제외) non_etf = [r for r in rows if r["data_quality"] != "ETF_EXCLUDED"] quality_counts: dict[str, int] = {} for r in rows: q = str(r.get("data_quality") or "MISSING") quality_counts[q] = quality_counts.get(q, 0) + 1 full_count = quality_counts.get("FULL", 0) partial_count = quality_counts.get("PARTIAL", 0) sparse_count = quality_counts.get("SPARSE", 0) missing_count = quality_counts.get("MISSING", 0) coverage_pct = round( (full_count + partial_count + sparse_count * 0.5) / len(non_etf) * 100.0, 2 ) if non_etf else 0.0 gate = "PASS" if coverage_pct >= 80.0 else ("CAUTION" if coverage_pct >= 30.0 else "FAIL") result = { "formula_id": "FUNDAMENTAL_RAW_INGEST_V1", "gate": gate, "as_of_date": str(date.today()), "ticker_count": len(rows), "non_etf_count": len(non_etf), "coverage_pct": coverage_pct, "quality_counts": quality_counts, "rows": rows, } out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print( f"FUNDAMENTAL_RAW_INGEST_V1 gate={gate} tickers={len(rows)} non_etf={len(non_etf)} " f"coverage={coverage_pct}% full={full_count} partial={partial_count} missing={missing_count}" ) print("FUNDAMENTAL_RAW_INGEST_V1_OK" if gate != "FAIL" else "FUNDAMENTAL_RAW_INGEST_V1_FAIL") return 0 if gate != "FAIL" else 1 if __name__ == "__main__": raise SystemExit(main())