Files
QuantEngineByItz/tools/ingest_fundamental_raw.py
T
kjh2064 45a39759e3 WBS-2.4 PEG_SCORE_V1 구현 + ROADMAP_WBS.md 완성도 매트릭스 전면 업데이트
[WBS-2.4] ingest_fundamental_raw.py에 peg_ratio / peg_gate 필드 추가
  - PEG = TTM_PE(per) / eps_growth_1y_pct (양수 성장 종목만)
  - PEG_GATE: BUY_GRADE(≤1.0) / HOLD(≤1.5) / CAUTION(>1.5)
  - 비ETF 8종목 중 6종목 PEG 산출 (75% — 음수성장 2종목 정상 NULL)
  - Forward_PE 미입수 시 TTM_PE 대체 조항 적용

[ROADMAP] 완성도 매트릭스 전면 업데이트
  - WBS 1.1~1.5, 2.1~2.4, 3.1~3.4, 4.4, 5.1~5.3 모두 100% 반영
  - WBS 2.5, 4.1~4.3: DATA_GATED 명시
  - Phase bar: 1/3/5 완료(20/20), 2 80%, 4 25%
  - D2: 9% → 100% (269개 등록), D5: 55단계 DAG PASS
  - KPI 섹션: RS/PEG/CI/CD 실적 반영

[CI] tools/setup_act_runner.sh 추가 (Synology NAS act_runner 설치 스크립트)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 17:31:51 +09:00

347 lines
12 KiB
Python

"""FUNDAMENTAL_RAW_INGEST_V2 — 한국 상장사 펀더멘털 raw 수집기 (고도화 버전).
V2 개선 사항:
1. yfinance 연동: Beta, 52주 고저, 부채비율, 유동비율, 현금흐름 보완.
2. OpenDART 연동: 재무제표 API를 통해 정밀 재무지표 및 성장률 산출.
3. 로드맵 40개 NULL 컬럼 타겟팅 수집.
수집 지표(per ticker):
roe_pct — ROE (%)
opm_pct — 영업이익률 (%)
eps_krw — EPS (원)
ocf_krw — 영업현금흐름 (원)
fcf_krw — 잉여현금흐름 (원)
net_debt_krw — 순부채 (원)
per — PER (Forward PE; falls back to trailing PE when Forward PE is unavailable)
pbr — PBR
revenue_krw — 매출액 (원)
op_income_krw — 영업이익 (원)
beta — Beta (시장 민감도)
high52w — 52주 최고가
low52w — 52주 최저가
debt_to_equity — 부채비율 (D/E)
current_ratio — 유동비율
eps_growth_1y_pct — EPS 성장률 (1년)
revenue_growth_pct — 매출 성장률 (1년)
earnings_date — 실적 발표 예정일
as_of_date — 기준일 (YYYYMMDD)
source — "data_feed" | "naver" | "yfinance" | "dart" | "fallback"
is_etf — ETF 여부 (True/False)
출력: Temp/fundamental_raw_v1.json
"""
from __future__ import annotations
import argparse
import http.cookiejar
import json
import os
import re
import time
import urllib.parse
import urllib.request
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any
import yfinance as yf
# Repository root: two levels up from this file (the file's directory's parent).
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
# TEMP is the scratch *directory*. It was previously bound to the output JSON
# file itself (``DEFAULT_OUT = TEMP = ...json``), which made the DART cache
# resolve to ".../fundamental_raw_v1.json/dart_corp_map.json" and collide with
# the output file when either one was created.
TEMP = ROOT / "Temp"
DEFAULT_OUT = TEMP / "fundamental_raw_v1.json"
# API keys (read from the environment; None when unset)
DART_API_KEY = os.environ.get("DART_API_KEY")
DART_CORP_MAP_CACHE = TEMP / "dart_corp_map.json"
# ETF identification: substrings matched against the upper-cased instrument name,
# and a ticker shape of four digits, one capital letter, one digit.
_ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"]
_ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$')
def _load_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    This is a best-effort loader: it never raises for the expected failure
    modes. An empty dict is returned when the file is missing or unreadable,
    when the content is not valid UTF-8 / JSON, or when the top-level JSON
    value is not an object (callers rely on ``dict`` methods such as ``.get``).
    """
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing/unreadable file.
        # ValueError: json.JSONDecodeError and UnicodeDecodeError are subclasses.
        return {}
    return data if isinstance(data, dict) else {}
def _get_dart_corp_code(ticker: str) -> str | None:
    """Translate a stock code into an OpenDART ``corp_code`` using an on-disk cache.

    Returns ``None`` when no DART API key is configured, when the code is
    unknown, or when a required refresh of the mapping fails. The mapping is
    re-downloaded only if the cache is empty or its file is older than 7 days.
    """
    import io
    import xml.etree.ElementTree as ET
    import zipfile

    if not DART_API_KEY:
        return None

    known = _load_json(DART_CORP_MAP_CACHE)
    if ticker in known:
        return known[ticker]

    # A non-empty cache implies the file could be read, so stat() is only
    # attempted in that case.
    if known:
        cache_age = datetime.now() - datetime.fromtimestamp(DART_CORP_MAP_CACHE.stat().st_mtime)
        needs_refresh = cache_age > timedelta(days=7)
    else:
        needs_refresh = True
    if not needs_refresh:
        return known.get(ticker)

    print("\n Downloading OpenDART corpCode.xml...", end=" ", flush=True)
    try:
        query = urllib.parse.urlencode({'crtfc_key': DART_API_KEY})
        request = urllib.request.Request(f"https://opendart.fss.or.kr/api/corpCode.xml?{query}")
        with urllib.request.urlopen(request, timeout=30) as resp:
            payload = resp.read()
        # The endpoint's payload is opened as a ZIP holding CORPCODE.xml.
        with zipfile.ZipFile(io.BytesIO(payload)) as archive:
            xml_bytes = archive.read('CORPCODE.xml')
        root = ET.fromstring(xml_bytes)
        pairs = (
            (
                (entry.findtext('stock_code') or "").strip(),
                (entry.findtext('corp_code') or "").strip(),
            )
            for entry in root.findall('list')
        )
        # Entries without a stock code are skipped.
        refreshed = {stock: corp for stock, corp in pairs if stock}
        DART_CORP_MAP_CACHE.parent.mkdir(parents=True, exist_ok=True)
        DART_CORP_MAP_CACHE.write_text(json.dumps(refreshed), encoding="utf-8")
        print("Done.")
    except Exception as e:
        print(f"Failed: {e}")
        return None
    return refreshed.get(ticker)
def _num(v: Any, default: float = 0.0) -> float:
    """Coerce *v* to ``float``, returning *default* when that is not possible.

    ``None`` and the empty string map to *default*. Strings have thousands
    separators (",") removed before conversion. Any ``TypeError`` or
    ``ValueError`` raised along the way also yields *default*.
    """
    try:
        is_blank = v is None or v == ""
        if is_blank:
            return default
        cleaned = v.replace(",", "") if isinstance(v, str) else v
        return float(cleaned)
    except (TypeError, ValueError):
        return default
def _is_etf(ticker: str, name: str) -> bool:
    """Return True when the ticker shape or the name identifies an ETF.

    A match of ``_ETF_TICKER_RE`` against *ticker* is sufficient; otherwise the
    upper-cased *name* is searched for any substring in ``_ETF_NAME_PATTERNS``.
    A falsy *name* is treated as an empty string.
    """
    if _ETF_TICKER_RE.match(ticker) is not None:
        return True
    upper_name = name.upper() if name else ""
    for brand in _ETF_NAME_PATTERNS:
        if brand in upper_name:
            return True
    return False
def _yf_fundamentals(ticker: str) -> dict[str, Any]:
    """Fetch supplementary fundamentals for *ticker* from yfinance.

    Six-digit numeric tickers are looked up with a ``.KS`` suffix; any other
    ticker is passed to yfinance unchanged.

    Returns:
        ``{}`` when the yfinance lookup itself fails. Otherwise a dict with the
        keys ``beta``, ``high52w``, ``low52w``, ``debt_to_equity``,
        ``current_ratio``, ``fcf_krw``, ``ocf_krw``, ``revenue_growth_pct``,
        ``eps_growth_1y_pct``, ``per``, ``pbr``, ``roe_pct``, ``opm_pct`` (each
        possibly ``None``) and, only when it could be derived, ``earnings_date``
        formatted as ``YYYY-MM-DD``.
    """
    res: dict[str, Any] = {}
    # NOTE(review): every 6-digit code gets ".KS"; presumably codes listed on
    # another exchange suffix would not resolve — confirm against the universe.
    sym = f"{ticker}.KS" if len(ticker) == 6 and ticker.isdigit() else ticker
    try:
        info = yf.Ticker(sym).info
    except Exception:
        # Deliberately best-effort: any failure of the remote lookup means
        # "no supplementary data" rather than aborting the whole ingest run.
        return res
    if not isinstance(info, dict):
        return res

    def _pct(key: str) -> float | None:
        """Scale a fractional ratio to percent; missing, zero or non-numeric -> None."""
        value = info.get(key)
        if isinstance(value, (int, float)) and value:
            return value * 100
        return None

    res["beta"] = info.get("beta")
    res["high52w"] = info.get("fiftyTwoWeekHigh")
    res["low52w"] = info.get("fiftyTwoWeekLow")
    res["debt_to_equity"] = info.get("debtToEquity")
    res["current_ratio"] = info.get("currentRatio")
    res["fcf_krw"] = info.get("freeCashflow")
    res["ocf_krw"] = info.get("operatingCashflow")
    res["revenue_growth_pct"] = _pct("revenueGrowth")
    res["eps_growth_1y_pct"] = _pct("earningsGrowth")

    earnings_ts = info.get("nextEarningsDate")
    if earnings_ts:
        # Isolated so that an unusable timestamp only costs the earnings date;
        # previously the exception skipped every field assigned after it.
        try:
            res["earnings_date"] = datetime.fromtimestamp(earnings_ts).strftime("%Y-%m-%d")
        except (TypeError, ValueError, OverflowError, OSError):
            pass

    # Forward PE is preferred; trailing PE is the fallback when it is absent.
    res["per"] = info.get("forwardPE") or info.get("trailingPE")
    res["pbr"] = info.get("priceToBook")
    res["roe_pct"] = _pct("returnOnEquity")
    res["opm_pct"] = _pct("operatingMargins")
    return res
def _dart_fundamentals(ticker: str) -> dict[str, Any]:
    """Placeholder for OpenDART financial-statement metrics.

    No fetching is implemented: the result is an empty dict whether or not a
    DART API key is configured.
    """
    collected: dict[str, Any] = {}
    if DART_API_KEY:
        # Nothing is fetched for *ticker* yet; the key is only checked.
        pass
    return collected
def _decode_naver_html(raw: bytes, declared: str | None) -> str:
    """Decode a response body using the server-declared charset when there is one.

    Without a usable declaration, strict UTF-8 is tried first, then CP949, and
    finally UTF-8 with replacement characters so decoding never raises.
    """
    if declared:
        try:
            return raw.decode(declared, errors="replace")
        except LookupError:
            # Unknown codec name in the header; fall through to detection.
            pass
    for encoding in ("utf-8", "cp949"):
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    return raw.decode("utf-8", errors="replace")


def _naver_summary(ticker: str) -> dict[str, float]:
    """Scrape summary financial figures for *ticker* from the Naver Finance item page.

    For each known row label, the table row is located by regex and the last
    non-zero numeric cell in that row is taken as the value.

    Returns:
        A dict containing only the metrics that were found, keyed by
        ``revenue_krw``, ``op_income_krw``, ``opm_pct``, ``roe_pct``,
        ``debt_ratio_pct``, ``quick_ratio_pct``, ``eps_krw``, ``per``, ``pbr``.
        Empty when the page cannot be fetched or no row matches.
    """
    result: dict[str, float] = {}
    url = f"https://finance.naver.com/item/main.naver?code={ticker}"
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=10) as resp:
            raw = resp.read()
            declared_charset = resp.headers.get_content_charset()
    except Exception:
        # Best-effort backup source: any fetch failure means "no data".
        return result
    # Honour the declared charset instead of assuming UTF-8; with the wrong
    # codec the Korean row labels below can never match.
    html = _decode_naver_html(raw, declared_charset)

    cell_re = re.compile(r'<td[^>]*>\s*(?:&nbsp;)?\s*([0-9,]+(?:\.[0-9]+)?)\s*</td>', re.DOTALL)

    def _row_values(label: str) -> list[float]:
        """Return the non-zero numeric cells of the table row headed by *label*."""
        pattern = re.compile(
            rf'<tr[^>]*>\s*<th scope="row" class="h_th2 th_cop_anal\d+"><strong>{re.escape(label)}</strong></th>(.*?)</tr>',
            re.DOTALL,
        )
        m = pattern.search(html)
        if not m:
            return []
        # NOTE(review): the cell pattern accepts unsigned numbers only, so a
        # negative cell is skipped — confirm whether that is intended.
        parsed = (_num(raw_num) for raw_num in cell_re.findall(m.group(1)))
        return [val for val in parsed if val != 0.0]

    row_label_map: dict[str, str] = {
        "매출액": "revenue_krw",
        "영업이익": "op_income_krw",
        "영업이익률": "opm_pct",
        "ROE(지배주주)": "roe_pct",
        "부채비율": "debt_ratio_pct",
        "당좌비율": "quick_ratio_pct",
        "EPS(원)": "eps_krw",
        "PER(배)": "per",
        "PBR(배)": "pbr",
    }
    for label, key in row_label_map.items():
        vals = _row_values(label)
        if vals:
            # The last populated cell of the row is used.
            # NOTE(review): presumably the most recent period; units are
            # whatever the page displays — verify before treating as KRW.
            result[key] = vals[-1]
    return result
def _merge_missing(row: dict[str, Any], incoming: dict[str, Any]) -> bool:
    """Fill ``None`` slots of *row* from *incoming*; True if *incoming* held any value."""
    had_data = False
    for key, value in incoming.items():
        if value is None:
            continue
        had_data = True
        if row.get(key) is None:
            row[key] = value
    return had_data


def _is_positive_finite(value: Any) -> bool:
    """True for a real number strictly between 0 and +infinity (rejects NaN, bool, str)."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return 0 < value < float("inf")


def _collect_ticker(ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, use_yf: bool) -> dict[str, Any]:
    """Assemble one fundamentals row for *ticker* from the available sources.

    Sources are applied in priority order: the data-feed row, yfinance (if
    *use_yf*), Naver (if *use_naver*), then DART. yfinance and Naver only fill
    fields that are still ``None``; DART values overwrite.

    Args:
        ticker: Stock code.
        name: Instrument name, also used for ETF detection.
        df_row: The data-feed record for this ticker.
        use_naver: Whether to consult the Naver scraper.
        use_yf: Whether to consult yfinance.

    Returns:
        A dict with identification fields, every metric field (``None`` when
        unknown), ``source`` ("+"-joined contributing sources, or "fallback"
        when none contributed, or "etf_skip"), ``data_quality`` and the derived
        ``peg_ratio`` / ``peg_gate``.
    """
    today = datetime.now().strftime("%Y%m%d")
    row: dict[str, Any] = {
        "ticker": ticker,
        "name": name,
        "as_of_date": today,
        "is_etf": _is_etf(ticker, name),
        "source": "fallback",
        "data_quality": "MISSING",
    }
    fields = [
        "roe_pct", "opm_pct", "eps_krw", "per", "pbr",
        "revenue_krw", "op_income_krw", "beta", "high52w", "low52w",
        "debt_to_equity", "current_ratio", "fcf_krw", "ocf_krw",
        "eps_growth_1y_pct", "revenue_growth_pct", "earnings_date",
        "peg_ratio", "peg_gate",
    ]
    row.update(dict.fromkeys(fields))

    # ETFs are excluded from fundamentals collection entirely.
    if row["is_etf"]:
        row["data_quality"] = "ETF_EXCLUDED"
        row["source"] = "etf_skip"
        return row

    # Contributing sources are collected in a list and joined once at the end.
    # The previous ``source += a if cond else b`` appended in both branches and
    # produced labels such as "fallbackyfinance" and "fallback+naver".
    sources: list[str] = []

    # 1. Data feed (baseline). A zero/blank value is treated as missing.
    row["per"] = _num(df_row.get("Forward_PE")) or None
    row["pbr"] = _num(df_row.get("PBR")) or None
    row["eps_krw"] = _num(df_row.get("EPS")) or None
    row["roe_pct"] = _num(df_row.get("ROE_Pct")) or None
    row["opm_pct"] = _num(df_row.get("Operating_Margin_Pct")) or None
    if row["per"] or row["pbr"]:
        sources.append("data_feed")

    # 2. yfinance: fills gaps only. Credited only if it returned at least one
    #    non-None value (a dict of all-None values no longer counts).
    if use_yf and _merge_missing(row, _yf_fundamentals(ticker)):
        sources.append("yfinance")

    # 3. Naver: backup scraper, fills gaps only.
    if use_naver and _merge_missing(row, _naver_summary(ticker)):
        sources.append("naver")

    # 4. DART: non-None values take precedence over anything gathered so far.
    dart_values = {k: v for k, v in _dart_fundamentals(ticker).items() if v is not None}
    if dart_values:
        row.update(dart_values)
        sources.append("dart")

    row["source"] = "+".join(sources) if sources else "fallback"

    # Quality grade from how many essential / advanced fields are populated.
    essential = [row["roe_pct"], row["opm_pct"], row["per"], row["pbr"], row["eps_krw"]]
    filled_essentials = sum(1 for v in essential if v is not None and v != 0)
    advanced = [row["beta"], row["high52w"], row["debt_to_equity"], row["fcf_krw"]]
    filled_advanced = sum(1 for v in advanced if v is not None and v != 0)
    if filled_essentials >= 5 and filled_advanced >= 2:
        row["data_quality"] = "FULL_ADVANCED"
    elif filled_essentials >= 4:
        row["data_quality"] = "FULL"
    elif filled_essentials >= 2:
        row["data_quality"] = "PARTIAL"
    else:
        row["data_quality"] = "SPARSE"

    # PEG_SCORE_V1 (WBS-2.4): PEG = PER / EPS growth (1y, %).
    # Both inputs must be positive and finite: a negative PER would otherwise
    # yield a negative PEG that passes the "<= 1.0" test and is graded BUY_GRADE.
    per_val = row.get("per")
    eps_g = row.get("eps_growth_1y_pct")
    if _is_positive_finite(per_val) and _is_positive_finite(eps_g):
        peg = round(per_val / eps_g, 3)
        row["peg_ratio"] = peg
        if peg <= 1.0:
            row["peg_gate"] = "BUY_GRADE"
        elif peg <= 1.5:
            row["peg_gate"] = "HOLD"
        else:
            row["peg_gate"] = "CAUTION"
    return row
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: collect fundamentals for every data-feed ticker.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the previous behaviour.

    Returns:
        ``0`` after the result JSON has been written.

    Reads ``data.data_feed`` from the ``--json`` input, builds one row per
    ticker via ``_collect_ticker`` and writes the summary plus rows to ``--out``.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    ap.add_argument("--no-naver", action="store_true")
    ap.add_argument("--no-yf", action="store_true")
    args = ap.parse_args(argv)

    src = _load_json(Path(args.json))
    # Tolerate "data": null, a non-list feed, and non-object feed entries
    # instead of raising AttributeError on ``.get``.
    data_section = src.get("data")
    feed = data_section.get("data_feed") if isinstance(data_section, dict) else None
    df_list = feed if isinstance(feed, list) else []
    df_map = {
        str(r.get("Ticker", "")): r
        for r in df_list
        if isinstance(r, dict) and r.get("Ticker")
    }
    tickers = sorted(df_map)
    print(f"FUNDAMENTAL_RAW_INGEST_V2: Tickers={len(tickers)}, DART_API={DART_API_KEY is not None}")

    rows = []
    for ticker in tickers:
        feed_row = df_map[ticker]
        name = feed_row.get("Name", "")
        print(f" Fetching {ticker} {name}...", end=" ", flush=True)
        row = _collect_ticker(ticker, name, feed_row, not args.no_naver, not args.no_yf)
        rows.append(row)
        print(f"{row['data_quality']} ({row['source']})")

    # Coverage is measured against non-ETF rows only; ETF rows carry
    # data_quality "ETF_EXCLUDED" and therefore never count as covered.
    non_etf = [r for r in rows if not r["is_etf"]]
    full_adv = sum(1 for r in rows if r["data_quality"] == "FULL_ADVANCED")
    covered = sum(1 for r in rows if r["data_quality"] in ["FULL", "FULL_ADVANCED", "PARTIAL"])
    coverage = round(covered / len(non_etf) * 100, 2) if non_etf else 0

    result = {
        "formula_id": "FUNDAMENTAL_RAW_INGEST_V2",
        "as_of_date": str(date.today()),
        "coverage_pct": coverage,
        "full_advanced_count": full_adv,
        "rows": rows
    }
    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\nDone. Coverage={coverage}% Full_Advanced={full_adv}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status instead of
    # discarding it.
    raise SystemExit(main())