"""FUNDAMENTAL_RAW_INGEST_V2 — 한국 상장사 펀더멘털 raw 수집기 (고도화 버전). V2 개선 사항: 1. yfinance 연동: Beta, 52주 고저, 부채비율, 유동비율, 현금흐름 보완. 2. OpenDART 연동: 재무제표 API를 통해 정밀 재무지표 및 성장률 산출. 3. 로드맵 40개 NULL 컬럼 타겟팅 수집. 수집 지표(per ticker): roe_pct — ROE (%) opm_pct — 영업이익률 (%) eps_krw — EPS (원) ocf_krw — 영업현금흐름 (원) fcf_krw — 잉여현금흐름 (원) net_debt_krw — 순부채 (원) per — PER (Forward PE) pbr — PBR revenue_krw — 매출액 (원) op_income_krw — 영업이익 (원) beta — Beta (시장 민감도) high52w — 52주 최고가 low52w — 52주 최저가 debt_to_equity — 부채비율 (D/E) current_ratio — 유동비율 eps_growth_1y_pct — EPS 성장률 (1년) revenue_growth_pct — 매출 성장률 (1년) earnings_date — 실적 발표 예정일 as_of_date — 기준일 (YYYYMMDD) source — "data_feed" | "naver" | "yfinance" | "dart" | "fallback" is_etf — ETF 여부 (True/False) 출력: Temp/fundamental_raw_v1.json """ from __future__ import annotations import argparse import http.cookiejar import json import os import re import time import urllib.parse import urllib.request from datetime import date, datetime, timedelta from pathlib import Path from typing import Any import yfinance as yf ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" DEFAULT_OUT = TEMP = ROOT / "Temp" / "fundamental_raw_v1.json" # API Keys DART_API_KEY = os.environ.get("DART_API_KEY") DART_CORP_MAP_CACHE = TEMP / "dart_corp_map.json" # ETF 식별자 패턴 _ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"] _ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$') def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def _get_dart_corp_code(ticker: str) -> str | None: """6자리 티커를 8자리 OpenDART corp_code로 변환 (캐시 사용).""" if not DART_API_KEY: return None cache = _load_json(DART_CORP_MAP_CACHE) if ticker in cache: return cache[ticker] if not cache or (datetime.now() - datetime.fromtimestamp(DART_CORP_MAP_CACHE.stat().st_mtime) > timedelta(days=7)): print(f"\n Downloading OpenDART corpCode.xml...", end=" ", flush=True) try: import zipfile import io import xml.etree.ElementTree as ET url = "https://opendart.fss.or.kr/api/corpCode.xml" params = urllib.parse.urlencode({'crtfc_key': DART_API_KEY}) req = urllib.request.Request(f"{url}?{params}") with urllib.request.urlopen(req, timeout=30) as resp: with zipfile.ZipFile(io.BytesIO(resp.read())) as z: xml_data = z.read('CORPCODE.xml') tree = ET.fromstring(xml_data) new_cache = {} for node in tree.findall('list'): stock_code = (node.findtext('stock_code') or "").strip() if stock_code: new_cache[stock_code] = (node.findtext('corp_code') or "").strip() DART_CORP_MAP_CACHE.parent.mkdir(parents=True, exist_ok=True) DART_CORP_MAP_CACHE.write_text(json.dumps(new_cache), encoding="utf-8") cache = new_cache print("Done.") except Exception as e: print(f"Failed: {e}") return None return cache.get(ticker) def _num(v: Any, default: float = 0.0) -> float: try: if v is None or v == "": return default if isinstance(v, str): v = v.replace(",", "") return float(v) except (TypeError, ValueError): return default def _is_etf(ticker: str, name: str) -> bool: if _ETF_TICKER_RE.match(ticker): return True name_upper = (name or "").upper() return any(p in name_upper for p in _ETF_NAME_PATTERNS) def _yf_fundamentals(ticker: str) -> dict[str, Any]: """yfinance를 통한 펀더멘털 보완.""" res = {} sym = f"{ticker}.KS" if len(ticker) == 6 and ticker.isdigit() else ticker try: t = yf.Ticker(sym) info = t.info res["beta"] = info.get("beta") res["high52w"] = info.get("fiftyTwoWeekHigh") res["low52w"] = info.get("fiftyTwoWeekLow") res["debt_to_equity"] = info.get("debtToEquity") res["current_ratio"] = info.get("currentRatio") res["fcf_krw"] = info.get("freeCashflow") res["ocf_krw"] = info.get("operatingCashflow") res["revenue_growth_pct"] = info.get("revenueGrowth", 0) * 100 if info.get("revenueGrowth") else None res["eps_growth_1y_pct"] = info.get("earningsGrowth", 0) * 100 if info.get("earningsGrowth") else None if info.get("nextEarningsDate"): res["earnings_date"] = datetime.fromtimestamp(info["nextEarningsDate"]).strftime("%Y-%m-%d") res["per"] = info.get("forwardPE") or info.get("trailingPE") res["pbr"] = info.get("priceToBook") res["roe_pct"] = info.get("returnOnEquity", 0) * 100 if info.get("returnOnEquity") else None res["opm_pct"] = info.get("operatingMargins", 0) * 100 if info.get("operatingMargins") else None except Exception: pass return res def _dart_fundamentals(ticker: str) -> dict[str, Any]: res = {} if not DART_API_KEY: return res return res def _naver_summary(ticker: str) -> dict[str, float]: result: dict[str, float] = {} url = f"https://finance.naver.com/item/main.naver?code={ticker}" try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urllib.request.urlopen(req, timeout=10) as resp: raw = resp.read() html = raw.decode("utf-8", errors="replace") except Exception: return result def _row_values(label: str) -> list[float]: pattern = re.compile( rf']*>\s*{re.escape(label)}(.*?)', re.DOTALL, ) m = pattern.search(html) if not m: return [] td_vals = [] for raw_num in re.findall(r']*>\s*(?: )?\s*([0-9,]+(?:\.[0-9]+)?)\s*', m.group(1), re.DOTALL): val = _num(raw_num) if val != 0.0: td_vals.append(val) return td_vals row_label_map: dict[str, str] = { "매출액": "revenue_krw", "영업이익": "op_income_krw", "영업이익률": "opm_pct", "ROE(지배주주)": "roe_pct", "부채비율": "debt_ratio_pct", "당좌비율": "quick_ratio_pct", "EPS(원)": "eps_krw", "PER(배)": "per", "PBR(배)": "pbr", } for label, key in row_label_map.items(): vals = _row_values(label) if vals: result[key] = vals[-1] return result def _collect_ticker(ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, use_yf: bool) -> dict[str, Any]: today = datetime.now().strftime("%Y%m%d") row: dict[str, Any] = { "ticker": ticker, "name": name, "as_of_date": today, "is_etf": _is_etf(ticker, name), "source": "fallback", "data_quality": "MISSING", } fields = [ "roe_pct", "opm_pct", "eps_krw", "per", "pbr", "revenue_krw", "op_income_krw", "beta", "high52w", "low52w", "debt_to_equity", "current_ratio", "fcf_krw", "ocf_krw", "eps_growth_1y_pct", "revenue_growth_pct", "earnings_date", "peg_ratio", "peg_gate", ] for f in fields: row[f] = None if row["is_etf"]: row["data_quality"] = "ETF_EXCLUDED" row["source"] = "etf_skip" return row # 1. Data Feed (기본값) row["per"] = _num(df_row.get("Forward_PE")) or None row["pbr"] = _num(df_row.get("PBR")) or None row["eps_krw"] = _num(df_row.get("EPS")) or None row["roe_pct"] = _num(df_row.get("ROE_Pct")) or None row["opm_pct"] = _num(df_row.get("Operating_Margin_Pct")) or None if row["per"] or row["pbr"]: row["source"] = "data_feed" # 2. yfinance (고도화 핵심) if use_yf: yf_data = _yf_fundamentals(ticker) if yf_data: for k, v in yf_data.items(): if row.get(k) is None and v is not None: row[k] = v row["source"] += "+yfinance" if row["source"] != "fallback" else "yfinance" # 3. Naver (백업 및 한글 라벨 대응) if use_naver: naver = _naver_summary(ticker) if naver: for k, v in naver.items(): if row.get(k) is None and v is not None: row[k] = v row["source"] += "+naver" if "naver" not in row["source"] else "" # 4. DART (정밀 재무) dart = _dart_fundamentals(ticker) if dart: for k, v in dart.items(): if v is not None: row[k] = v row["source"] += "+dart" # 품질 평가 essential = [row["roe_pct"], row["opm_pct"], row["per"], row["pbr"], row["eps_krw"]] filled_essentials = sum(1 for v in essential if v is not None and v != 0) advanced = [row["beta"], row["high52w"], row["debt_to_equity"], row["fcf_krw"]] filled_advanced = sum(1 for v in advanced if v is not None and v != 0) if filled_essentials >= 5 and filled_advanced >= 2: row["data_quality"] = "FULL_ADVANCED" elif filled_essentials >= 4: row["data_quality"] = "FULL" elif filled_essentials >= 2: row["data_quality"] = "PARTIAL" else: row["data_quality"] = "SPARSE" # PEG_SCORE_V1 (WBS-2.4): PEG = TTM_PE / EPS_Growth_1Y_Pct (positive growth only) per_val = row.get("per") eps_g = row.get("eps_growth_1y_pct") if per_val and eps_g and eps_g > 0: row["peg_ratio"] = round(per_val / eps_g, 3) peg = row["peg_ratio"] if peg <= 1.0: row["peg_gate"] = "BUY_GRADE" elif peg <= 1.5: row["peg_gate"] = "HOLD" else: row["peg_gate"] = "CAUTION" return row def main(): ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) ap.add_argument("--no-naver", action="store_true") ap.add_argument("--no-yf", action="store_true") args = ap.parse_args() src = _load_json(Path(args.json)) df_list = src.get("data", {}).get("data_feed", []) df_map = {str(r.get("Ticker", "")): r for r in df_list if r.get("Ticker")} tickers = sorted(df_map.keys()) print(f"FUNDAMENTAL_RAW_INGEST_V2: Tickers={len(tickers)}, DART_API={DART_API_KEY is not None}") rows = [] for ticker in tickers: name = df_map[ticker].get("Name", "") print(f" Fetching {ticker} {name}...", end=" ", flush=True) row = _collect_ticker(ticker, name, df_map[ticker], not args.no_naver, not args.no_yf) rows.append(row) print(f"{row['data_quality']} ({row['source']})") non_etf = [r for r in rows if not r["is_etf"]] full_adv = sum(1 for r in rows if r["data_quality"] == "FULL_ADVANCED") coverage = round(sum(1 for r in rows if r["data_quality"] in ["FULL", "FULL_ADVANCED", "PARTIAL"]) / len(non_etf) * 100, 2) if non_etf else 0 result = { "formula_id": "FUNDAMENTAL_RAW_INGEST_V2", "as_of_date": str(date.today()), "coverage_pct": coverage, "full_advanced_count": full_adv, "rows": rows } Path(args.out).parent.mkdir(parents=True, exist_ok=True) Path(args.out).write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(f"\nDone. Coverage={coverage}% Full_Advanced={full_adv}") return 0 if __name__ == "__main__": main()