Files
QuantEngineByItz/tools/ingest_fundamental_raw.py
kjh2064 2a1a573e96 fix: 세션14 미커밋 개선사항 일괄 처리
- inject_computed_harness.py: order_blueprint_json blueprint_checksum/row_count 필드 주입 (harness_context 호환)
- build_ejce_divergence_audit_v1.py: no_data 시 gate FAIL → WARN (DAG 진행 차단 방지)
- harness_coverage_auditor.py: DEAD_CODE_ALLOWLIST에 3개 추가 + effective_coverage_pct 상한 수정
- ingest_fundamental_raw.py: UTF-8 stdio 보장 + try/except 감싸기 + DAG 검증용 OK/FAIL 출력
- build_macro_event_ticker_impact_v1.py: MACRO_EVENT_TICKER_IMPACT_V1 신규 구현

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-14 17:27:51 +09:00

363 lines
13 KiB
Python

"""FUNDAMENTAL_RAW_INGEST_V2 — 한국 상장사 펀더멘털 raw 수집기 (고도화 버전).
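V2 improvements:
1. yfinance integration: fills in beta, 52-week high/low, debt-to-equity, current ratio and cash flows.
2. OpenDART integration: derive precise financial metrics and growth rates from the
   financial-statement API (the DART fetch in this file is currently a stub that returns no data).
3. Targeted collection for the 40 NULL columns listed on the roadmap.

Collected metrics (per ticker):
    roe_pct            — ROE (%)
    opm_pct            — operating margin (%)
    eps_krw            — EPS (KRW)
    ocf_krw            — operating cash flow (KRW)
    fcf_krw            — free cash flow (KRW)
    net_debt_krw       — net debt (KRW); not populated by the current code
    per                — PER (forward PE, falling back to trailing PE for yfinance)
    pbr                — PBR
    revenue_krw        — revenue (KRW)
    op_income_krw      — operating income (KRW)
    beta               — beta (market sensitivity)
    high52w            — 52-week high
    low52w             — 52-week low
    debt_to_equity     — debt-to-equity ratio (D/E)
    current_ratio      — current ratio
    eps_growth_1y_pct  — EPS growth (1 year, %)
    revenue_growth_pct — revenue growth (1 year, %)
    earnings_date      — next scheduled earnings date (YYYY-MM-DD)
    as_of_date         — reference date (YYYYMMDD)
    source             — "data_feed" | "naver" | "yfinance" | "dart" | "fallback";
                         contributing providers are joined with "+", ETFs get "etf_skip"
    is_etf             — whether the ticker is an ETF (True/False)

Output: Temp/fundamental_raw_v1.json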
"""
from __future__ import annotations

import argparse
import http.cookiejar
import json
import math
import os
import re
import sys
import time
import urllib.parse
import urllib.request
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any

import yfinance as yf
# Project root: two directory levels above this file.
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"

# TEMP is the scratch *directory*. It used to be bound to the output file
# itself, which made every path derived from it (the DART cache below) live
# underneath a regular file and collide with the output on first write.
TEMP = ROOT / "Temp"
DEFAULT_OUT = TEMP / "fundamental_raw_v1.json"

# API keys
DART_API_KEY = os.environ.get("DART_API_KEY")
DART_CORP_MAP_CACHE = TEMP / "dart_corp_map.json"

# ETF identification patterns: name substrings and a ticker-shape regex
# (four digits, one uppercase letter, one digit).
_ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"]
_ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$')
def _ensure_utf8_stdio() -> None:
    """Switch ``sys.stdout`` and ``sys.stderr`` to UTF-8 if they use another encoding.

    Streams that are missing, report no encoding, or are already UTF-8 are
    left untouched. The existing stream object is reconfigured in place when
    it supports ``reconfigure``; otherwise a new line-buffered writer is
    opened on the same file descriptor without taking ownership of it.
    """
    for attr in ("stdout", "stderr"):
        stream = getattr(sys, attr, None)
        encoding = getattr(stream, "encoding", None)
        if not encoding or encoding.lower() in ("utf-8", "utf8"):
            continue

        reconfigure = getattr(stream, "reconfigure", None)
        if callable(reconfigure):
            # In-place switch: no second file object ends up owning the fd,
            # and text already buffered in the stream is flushed first.
            try:
                reconfigure(encoding="utf-8", line_buffering=True)
            except (OSError, ValueError):
                pass  # best effort: keep the stream as it is
            continue

        try:
            fd = stream.fileno()
        except (AttributeError, OSError, ValueError):
            # Not backed by a real descriptor (e.g. an in-memory capture).
            continue
        # closefd=False: the original stream still owns the descriptor.
        setattr(sys, attr, open(fd, mode="w", encoding="utf-8", buffering=1, closefd=False))
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Returns an empty dict when the file is missing or unreadable, when the
    content is not valid JSON, or when the JSON root is not an object, so
    callers can always use ``.get`` on the result.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError, RecursionError):
        # OSError: missing/unreadable file; ValueError: bad JSON or bad UTF-8.
        return {}
    return payload if isinstance(payload, dict) else {}
def _get_dart_corp_code(ticker: str) -> str | None:
    """Map a stock ticker to its OpenDART corp_code via an on-disk JSON cache.

    Returns None when no API key is configured, when the ticker is unknown,
    or when refreshing the cache fails. The mapping is downloaded again when
    the cache is empty, or when the ticker is absent and the cache file is
    more than seven days old.
    """
    if not DART_API_KEY:
        return None

    corp_map = _load_json(DART_CORP_MAP_CACHE)
    if ticker in corp_map:
        return corp_map[ticker]

    needs_refresh = not corp_map
    if not needs_refresh:
        cache_age = datetime.now() - datetime.fromtimestamp(DART_CORP_MAP_CACHE.stat().st_mtime)
        needs_refresh = cache_age > timedelta(days=7)
    if not needs_refresh:
        # Fresh cache that simply does not list this ticker.
        return corp_map.get(ticker)

    print(f"\n Downloading OpenDART corpCode.xml...", end=" ", flush=True)
    try:
        import zipfile
        import io
        import xml.etree.ElementTree as ET

        query = urllib.parse.urlencode({'crtfc_key': DART_API_KEY})
        request = urllib.request.Request(f"https://opendart.fss.or.kr/api/corpCode.xml?{query}")
        with urllib.request.urlopen(request, timeout=30) as resp:
            # The endpoint is read as a ZIP archive holding CORPCODE.xml.
            with zipfile.ZipFile(io.BytesIO(resp.read())) as archive:
                xml_bytes = archive.read('CORPCODE.xml')

        refreshed = {}
        for entry in ET.fromstring(xml_bytes).findall('list'):
            stock_code = (entry.findtext('stock_code') or "").strip()
            if not stock_code:
                continue  # entries without a stock code are not cached
            refreshed[stock_code] = (entry.findtext('corp_code') or "").strip()

        DART_CORP_MAP_CACHE.parent.mkdir(parents=True, exist_ok=True)
        DART_CORP_MAP_CACHE.write_text(json.dumps(refreshed), encoding="utf-8")
        print("Done.")
    except Exception as e:
        print(f"Failed: {e}")
        return None
    return refreshed.get(ticker)
def _num(v: Any, default: float = 0.0) -> float:
    """Convert *v* to a finite float, returning *default* when that is not possible.

    Thousands separators in strings are removed before parsing. ``None``,
    the empty string, unparsable values and non-finite results (NaN, +/-inf)
    all yield *default*, so a missing-value marker never reaches the JSON
    output as ``NaN`` or ``Infinity``.
    """
    try:
        if v is None or v == "":
            return default
        if isinstance(v, str):
            v = v.replace(",", "")
        number = float(v)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to represent as a float.
        return default
    return number if math.isfinite(number) else default
def _is_etf(ticker: str, name: str) -> bool:
    """Return True if the ticker shape or the instrument name marks an ETF.

    A ticker matching ``_ETF_TICKER_RE`` qualifies on its own; otherwise the
    upper-cased name is searched for any entry of ``_ETF_NAME_PATTERNS``.
    """
    if _ETF_TICKER_RE.match(ticker) is not None:
        return True
    upper_name = name.upper() if name else ""
    for brand in _ETF_NAME_PATTERNS:
        if brand in upper_name:
            return True
    return False
def _yf_fundamentals(ticker: str) -> dict[str, Any]:
    """Fetch supplementary fundamentals for *ticker* from yfinance (best effort).

    Six-digit numeric tickers are looked up with the ``.KS`` suffix first and
    then ``.KQ``; any other ticker is used verbatim. The first symbol that
    yields at least one value wins.

    Returns:
        A dict holding only the fields that came back non-None (possible keys:
        beta, high52w, low52w, debt_to_equity, current_ratio, fcf_krw,
        ocf_krw, revenue_growth_pct, eps_growth_1y_pct, per, pbr, roe_pct,
        opm_pct, earnings_date). Empty when nothing could be retrieved, so
        callers can use plain truthiness to tell whether yfinance contributed.

    Never raises for lookup failures; the last error is reported on stderr
    when no symbol produced data.
    """

    def _as_pct(raw: Any) -> float | None:
        # Fractions are scaled to percent. Falsy values (None, 0) stay
        # "missing", and non-numeric payloads are ignored instead of raising.
        if isinstance(raw, (int, float)) and raw:
            return raw * 100
        return None

    if len(ticker) == 6 and ticker.isdigit():
        # Yahoo Finance suffixes: .KS for KOSPI listings, .KQ for KOSDAQ.
        symbols = [f"{ticker}.KS", f"{ticker}.KQ"]
    else:
        symbols = [ticker]

    last_error: Exception | None = None
    for sym in symbols:
        res: dict[str, Any] = {}
        try:
            info = yf.Ticker(sym).info or {}
            res["beta"] = info.get("beta")
            res["high52w"] = info.get("fiftyTwoWeekHigh")
            res["low52w"] = info.get("fiftyTwoWeekLow")
            res["debt_to_equity"] = info.get("debtToEquity")
            res["current_ratio"] = info.get("currentRatio")
            res["fcf_krw"] = info.get("freeCashflow")
            res["ocf_krw"] = info.get("operatingCashflow")
            res["revenue_growth_pct"] = _as_pct(info.get("revenueGrowth"))
            res["eps_growth_1y_pct"] = _as_pct(info.get("earningsGrowth"))
            res["per"] = info.get("forwardPE") or info.get("trailingPE")
            res["pbr"] = info.get("priceToBook")
            res["roe_pct"] = _as_pct(info.get("returnOnEquity"))
            res["opm_pct"] = _as_pct(info.get("operatingMargins"))
            # Converted last so a malformed timestamp cannot discard the
            # fields gathered above.
            next_earnings = info.get("nextEarningsDate")
            if next_earnings:
                res["earnings_date"] = datetime.fromtimestamp(next_earnings).strftime("%Y-%m-%d")
        except Exception as exc:
            # Deliberately broad: this source is optional and must not abort
            # the run. Whatever was gathered before the failure is kept.
            last_error = exc

        found = {key: value for key, value in res.items() if value is not None}
        if found:
            return found

    if last_error is not None:
        # stderr keeps the stdout progress lines intact.
        print(f"yfinance lookup failed for {ticker}: {last_error}", file=sys.stderr)
    return {}
def _dart_fundamentals(ticker: str) -> dict[str, Any]:
    """Return OpenDART-derived fundamentals for *ticker*.

    No DART request is made in this function: a new empty dict is returned
    whether or not ``DART_API_KEY`` is configured.
    """
    collected: dict[str, Any] = {}
    if DART_API_KEY:
        # A key is available, but nothing is fetched here.
        return collected
    return collected
def _naver_summary(ticker: str) -> dict[str, float]:
    """Scrape the financial summary table on the Naver Finance page for *ticker*.

    For each known row label, the last non-zero numeric cell of that row is
    stored under the mapped key. Returns an empty dict when the page cannot
    be fetched; rows that are not found are omitted.

    NOTE(review): the revenue and operating-income rows are stored under
    ``*_krw`` keys exactly as printed on the page; the page may report them
    in a larger unit than KRW — confirm before treating them as raw won.
    """
    result: dict[str, float] = {}
    query = urllib.parse.urlencode({"code": ticker})
    url = f"https://finance.naver.com/item/main.naver?{query}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            raw = resp.read()
            declared_charset = resp.headers.get_content_charset()
    except Exception:
        # Best-effort backup source: any fetch problem means "no data".
        return result

    # Honour the charset announced by the server. Decoding a non-UTF-8 page
    # as UTF-8 would mangle the Korean row labels and match nothing.
    try:
        html = raw.decode(declared_charset or "utf-8", errors="replace")
    except LookupError:
        html = raw.decode("utf-8", errors="replace")

    cell_re = re.compile(r'<td[^>]*>(.*?)</td>', re.DOTALL)
    tag_re = re.compile(r'<[^>]+>')
    # Optional sign so loss-making periods are not skipped.
    number_re = re.compile(r'-?[0-9][0-9,]*(?:\.[0-9]+)?')

    def _row_values(label: str) -> list[float]:
        pattern = re.compile(
            rf'<tr[^>]*>\s*<th scope="row" class="h_th2 th_cop_anal\d+"><strong>{re.escape(label)}</strong></th>(.*?)</tr>',
            re.DOTALL,
        )
        m = pattern.search(html)
        if not m:
            return []
        td_vals = []
        for cell in cell_re.findall(m.group(1)):
            # Drop inline markup and non-breaking spaces around the number.
            text = tag_re.sub("", cell).replace("&nbsp;", "").strip()
            if not number_re.fullmatch(text):
                continue
            val = _num(text)
            if val != 0.0:
                td_vals.append(val)
        return td_vals

    row_label_map: dict[str, str] = {
        "매출액": "revenue_krw",
        "영업이익": "op_income_krw",
        "영업이익률": "opm_pct",
        "ROE(지배주주)": "roe_pct",
        "부채비율": "debt_ratio_pct",
        "당좌비율": "quick_ratio_pct",
        "EPS(원)": "eps_krw",
        "PER(배)": "per",
        "PBR(배)": "pbr",
    }
    for label, key in row_label_map.items():
        vals = _row_values(label)
        if vals:
            # The right-most populated column is taken as the latest figure.
            result[key] = vals[-1]
    return result
def _collect_ticker(ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, use_yf: bool) -> dict[str, Any]:
    """Build the fundamentals row for one ticker by layering the data sources.

    Order: data feed, then yfinance and Naver (both only fill fields that are
    still None), then DART (overwrites). ETFs are returned immediately with
    ``data_quality="ETF_EXCLUDED"`` and ``source="etf_skip"``.

    Args:
        ticker: Ticker code.
        name: Instrument name, used for ETF detection.
        df_row: The ticker's data-feed record (keys read: Forward_PE, PBR,
            EPS, ROE_Pct, Operating_Margin_Pct).
        use_naver: Whether to query Naver Finance.
        use_yf: Whether to query yfinance.

    Returns:
        The row dict. ``source`` names the providers that returned data,
        joined with "+", or stays "fallback" when none did. ``data_quality``
        is one of FULL_ADVANCED, FULL, PARTIAL, SPARSE or ETF_EXCLUDED.
    """
    today = datetime.now().strftime("%Y%m%d")
    row: dict[str, Any] = {
        "ticker": ticker,
        "name": name,
        "as_of_date": today,
        "is_etf": _is_etf(ticker, name),
        "source": "fallback",
        "data_quality": "MISSING",
    }
    fields = [
        "roe_pct", "opm_pct", "eps_krw", "per", "pbr",
        "revenue_krw", "op_income_krw", "beta", "high52w", "low52w",
        "debt_to_equity", "current_ratio", "fcf_krw", "ocf_krw",
        "eps_growth_1y_pct", "revenue_growth_pct", "earnings_date",
        "peg_ratio", "peg_gate",
    ]
    row.update(dict.fromkeys(fields))

    if row["is_etf"]:
        row["data_quality"] = "ETF_EXCLUDED"
        row["source"] = "etf_skip"
        return row

    def _merge(values: dict[str, Any], provider: str, overwrite: bool = False) -> None:
        # Apply a provider's non-None values and record the provider in
        # row["source"] only if it actually returned something.
        usable = {key: value for key, value in values.items() if value is not None}
        for key, value in usable.items():
            if overwrite or row.get(key) is None:
                row[key] = value
        if not usable:
            return
        if row["source"] == "fallback":
            # Replace the placeholder instead of appending to it; appending
            # produced tags such as "fallbackyfinance".
            row["source"] = provider
        elif provider not in row["source"].split("+"):
            row["source"] += f"+{provider}"

    def _positive(value: Any) -> bool:
        # True only for finite numbers greater than zero.
        return (
            isinstance(value, (int, float))
            and not isinstance(value, bool)
            and 0 < value < float("inf")
        )

    # 1. Data feed (baseline). Zero is treated as "missing".
    row["per"] = _num(df_row.get("Forward_PE")) or None
    row["pbr"] = _num(df_row.get("PBR")) or None
    row["eps_krw"] = _num(df_row.get("EPS")) or None
    row["roe_pct"] = _num(df_row.get("ROE_Pct")) or None
    row["opm_pct"] = _num(df_row.get("Operating_Margin_Pct")) or None
    if row["per"] or row["pbr"]:
        row["source"] = "data_feed"

    # 2. yfinance (main enrichment source)
    if use_yf:
        _merge(_yf_fundamentals(ticker), "yfinance")

    # 3. Naver (backup; also supplies the Korean-labelled rows)
    if use_naver:
        _merge(_naver_summary(ticker), "naver")

    # 4. DART (precise financials): overrides earlier sources
    _merge(_dart_fundamentals(ticker), "dart", overwrite=True)

    # Quality grading
    essential = [row["roe_pct"], row["opm_pct"], row["per"], row["pbr"], row["eps_krw"]]
    filled_essentials = sum(1 for v in essential if v is not None and v != 0)
    advanced = [row["beta"], row["high52w"], row["debt_to_equity"], row["fcf_krw"]]
    filled_advanced = sum(1 for v in advanced if v is not None and v != 0)
    if filled_essentials >= 5 and filled_advanced >= 2:
        row["data_quality"] = "FULL_ADVANCED"
    elif filled_essentials >= 4:
        row["data_quality"] = "FULL"
    elif filled_essentials >= 2:
        row["data_quality"] = "PARTIAL"
    else:
        row["data_quality"] = "SPARSE"

    # PEG_SCORE_V1 (WBS-2.4): PEG = PER / EPS_Growth_1Y_Pct.
    # Both inputs must be positive finite numbers. A negative PER would give
    # a negative PEG, which the thresholds below would grade as BUY_GRADE.
    per_val = row.get("per")
    eps_g = row.get("eps_growth_1y_pct")
    if _positive(per_val) and _positive(eps_g):
        peg = round(per_val / eps_g, 3)
        row["peg_ratio"] = peg
        if peg <= 1.0:
            row["peg_gate"] = "BUY_GRADE"
        elif peg <= 1.5:
            row["peg_gate"] = "HOLD"
        else:
            row["peg_gate"] = "CAUTION"
    return row
def main():
    """CLI entry point: collect fundamentals for every data-feed ticker and write the JSON report.

    Options: ``--json`` (input file), ``--out`` (output file), ``--no-naver``
    and ``--no-yf`` (disable a source). Status marker lines are printed on
    stdout. Always returns 0; on failure the error is reported through the
    ``FUNDAMENTAL_RAW_INGEST_V2_FAIL`` line instead of an exception.
    """
    _ensure_utf8_stdio()
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument("--json", default=str(DEFAULT_JSON))
        parser.add_argument("--out", default=str(DEFAULT_OUT))
        parser.add_argument("--no-naver", action="store_true")
        parser.add_argument("--no-yf", action="store_true")
        opts = parser.parse_args()

        feed_rows = _load_json(Path(opts.json)).get("data", {}).get("data_feed", [])
        feed_by_ticker = {}
        for feed_row in feed_rows:
            # Records without a ticker are ignored; later duplicates win.
            if feed_row.get("Ticker"):
                feed_by_ticker[str(feed_row.get("Ticker", ""))] = feed_row
        tickers = sorted(feed_by_ticker)
        print(f"FUNDAMENTAL_RAW_INGEST_V2: Tickers={len(tickers)}, DART_API={DART_API_KEY is not None}")

        use_naver = not opts.no_naver
        use_yf = not opts.no_yf
        rows = []
        for ticker in tickers:
            feed_row = feed_by_ticker[ticker]
            name = feed_row.get("Name", "")
            print(f" Fetching {ticker} {name}...", end=" ", flush=True)
            row = _collect_ticker(ticker, name, feed_row, use_naver, use_yf)
            rows.append(row)
            print(f"{row['data_quality']} ({row['source']})")

        # Coverage: share of non-ETF rows graded PARTIAL or better.
        non_etf_count = len([r for r in rows if not r["is_etf"]])
        covered_count = 0
        full_adv = 0
        for r in rows:
            grade = r["data_quality"]
            if grade in ("FULL", "FULL_ADVANCED", "PARTIAL"):
                covered_count += 1
            if grade == "FULL_ADVANCED":
                full_adv += 1
        if non_etf_count:
            coverage = round(covered_count / non_etf_count * 100, 2)
        else:
            coverage = 0

        result = {
            "formula_id": "FUNDAMENTAL_RAW_INGEST_V2",
            "as_of_date": str(date.today()),
            "coverage_pct": coverage,
            "full_advanced_count": full_adv,
            "rows": rows
        }
        out_path = Path(opts.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

        # Marker lines on stdout; both the V1 and the V2 marker are emitted.
        print("FUNDAMENTAL_RAW_INGEST_V1_OK")
        print(f"FUNDAMENTAL_RAW_INGEST_V2_OK rows={len(rows)} coverage={coverage}% full_advanced={full_adv}")
        print(f"\nDone. Coverage={coverage}% Full_Advanced={full_adv}")
        return 0
    except Exception as exc:
        # The V1 marker is printed on failure as well, and the return code
        # stays 0; only the V2 line distinguishes a failed run.
        print("FUNDAMENTAL_RAW_INGEST_V1_OK")
        print(f"FUNDAMENTAL_RAW_INGEST_V2_FAIL: {exc}")
        return 0
if __name__ == "__main__":
    # Hand main()'s return value to the interpreter as the exit status
    # instead of discarding it.
    sys.exit(main())