ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
400 lines
14 KiB
Python
400 lines
14 KiB
Python
"""FUNDAMENTAL_RAW_INGEST_V1 — 한국 상장사 펀더멘털 raw 수집기.
|
|
|
|
data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary source로 사용하고,
|
|
네이버 금융 HTML 스크래핑으로 ROE / OPM / OCF 등 누락 지표를 보완한다.
|
|
|
|
수집 지표(per ticker):
|
|
roe_pct — ROE (%)
|
|
opm_pct — 영업이익률 (%)
|
|
eps_krw — EPS (원)
|
|
ocf_krw — 영업현금흐름 (원)
|
|
fcf_krw — 잉여현금흐름 (원)
|
|
net_debt_krw — 순부채 (원)
|
|
per — PER (Forward PE)
|
|
pbr — PBR
|
|
revenue_krw — 매출액 (원)
|
|
op_income_krw — 영업이익 (원)
|
|
as_of_date — 기준일 (YYYYMMDD)
|
|
source — "data_feed" | "data_feed+naver" | "naver" | "fallback"
|
|
is_etf — ETF 여부 (True/False)
|
|
|
|
출력: Temp/fundamental_raw_v1.json
|
|
형식: {"formula_id":"FUNDAMENTAL_RAW_INGEST_V1","gate":"PASS|CAUTION|FAIL","rows":[...]}
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import http.cookiejar
|
|
import json
|
|
import re
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from datetime import date
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
DEFAULT_JSON = ROOT / "GatherTradingData.json"
|
|
DEFAULT_OUT = ROOT / "Temp" / "fundamental_raw_v1.json"
|
|
|
|
# ── Yahoo Finance crumb 세션 (모듈 수준 공유) ────────────────────────────────
|
|
_yahoo_cj = http.cookiejar.CookieJar()
|
|
_yahoo_op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(_yahoo_cj))
|
|
_yahoo_op.addheaders = [("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")]
|
|
_yahoo_crumb: str | None = None
|
|
|
|
|
|
def _yahoo_get_crumb() -> str | None:
|
|
"""야후 Finance crumb 획득. 실패 시 None."""
|
|
global _yahoo_crumb
|
|
if _yahoo_crumb:
|
|
return _yahoo_crumb
|
|
try:
|
|
_yahoo_op.open("https://fc.yahoo.com", timeout=8)
|
|
_yahoo_op.open("https://finance.yahoo.com/quote/005930.KS", timeout=8)
|
|
with _yahoo_op.open("https://query1.finance.yahoo.com/v1/test/getcrumb", timeout=8) as r:
|
|
_yahoo_crumb = r.read().decode("utf-8", errors="replace").strip()
|
|
return _yahoo_crumb
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _yahoo_fundamentals(ticker: str) -> dict[str, float]:
|
|
"""야후 v10 quoteSummary에서 ROE/OPM/beta/revenue를 가져온다.
|
|
PE/PBR/EPS는 한국주식에서 야후가 미제공 → Naver/data_feed가 우선.
|
|
"""
|
|
crumb = _yahoo_get_crumb()
|
|
if not crumb:
|
|
return {}
|
|
sym = f"{ticker}.KS" if not ticker.startswith("0") or len(ticker) != 6 else f"{ticker}.KS"
|
|
# ETF-style ticker skip (0xxxX0 pattern)
|
|
if re.match(r"^\d{4}[A-Z]\d$", ticker):
|
|
return {}
|
|
modules = "defaultKeyStatistics,financialData,summaryDetail"
|
|
url = (
|
|
f"https://query1.finance.yahoo.com/v10/finance/quoteSummary/"
|
|
f"{urllib.parse.quote(sym)}?modules={modules}&crumb={urllib.parse.quote(crumb)}"
|
|
)
|
|
try:
|
|
with _yahoo_op.open(url, timeout=10) as r:
|
|
if r.status != 200:
|
|
return {}
|
|
d = json.loads(r.read().decode("utf-8", errors="replace"))
|
|
res = (d.get("quoteSummary") or {}).get("result") or [{}]
|
|
obj = res[0] if res else {}
|
|
fd = obj.get("financialData") or {}
|
|
ks = obj.get("defaultKeyStatistics") or {}
|
|
sd = obj.get("summaryDetail") or {}
|
|
|
|
def rv(o: dict, k: str) -> float | None:
|
|
v = o.get(k)
|
|
raw = v.get("raw") if isinstance(v, dict) else v
|
|
return float(raw) if raw is not None else None
|
|
|
|
result: dict[str, float] = {}
|
|
if rv(fd, "returnOnEquity") is not None:
|
|
result["roe_pct"] = round(rv(fd, "returnOnEquity") * 100, 2)
|
|
if rv(fd, "operatingMargins") is not None:
|
|
result["opm_pct"] = round(rv(fd, "operatingMargins") * 100, 2)
|
|
if rv(ks, "trailingEps") is not None:
|
|
result["eps_krw"] = rv(ks, "trailingEps")
|
|
if rv(sd, "trailingPE") is not None:
|
|
result["per"] = rv(sd, "trailingPE")
|
|
if rv(ks, "priceToBook") is not None:
|
|
result["pbr"] = rv(ks, "priceToBook")
|
|
if rv(fd, "totalRevenue") is not None:
|
|
result["revenue_krw"] = rv(fd, "totalRevenue")
|
|
if rv(fd, "operatingCashflow") is not None:
|
|
result["ocf_krw"] = rv(fd, "operatingCashflow")
|
|
if rv(fd, "freeCashflow") is not None:
|
|
result["fcf_krw"] = rv(fd, "freeCashflow")
|
|
return result
|
|
except Exception:
|
|
return {}
|
|
|
|
# ETF 식별자 패턴 (이름 포함)
|
|
_ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"]
|
|
# ETF 종목코드 특수 패턴 (0xxxV0 형태는 ETF)
|
|
_ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$')
|
|
|
|
|
|
def _load_json(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
try:
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _num(v: Any, default: float = 0.0) -> float:
|
|
try:
|
|
if v is None or v == "":
|
|
return default
|
|
return float(str(v).replace(",", ""))
|
|
except (TypeError, ValueError):
|
|
return default
|
|
|
|
|
|
def _is_etf(ticker: str, name: str) -> bool:
|
|
"""ETF 여부 판별."""
|
|
if _ETF_TICKER_RE.match(ticker):
|
|
return True
|
|
name_upper = (name or "").upper()
|
|
return any(p in name_upper for p in _ETF_NAME_PATTERNS)
|
|
|
|
|
|
def _naver_summary(ticker: str) -> dict[str, float]:
|
|
"""네이버 금융 main.naver에서 PER/EPS/PBR/ROE/OPM을 가져온다."""
|
|
result: dict[str, float] = {}
|
|
url = f"https://finance.naver.com/item/main.naver?code={ticker}"
|
|
try:
|
|
req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0"})
|
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
|
raw = resp.read()
|
|
html = raw.decode("utf-8", errors="replace")
|
|
except Exception:
|
|
return result
|
|
|
|
def _row_values(label: str) -> list[float]:
|
|
pattern = re.compile(
|
|
rf'<tr[^>]*>\s*<th scope="row" class="h_th2 th_cop_anal\d+"><strong>{re.escape(label)}</strong></th>(.*?)</tr>',
|
|
re.DOTALL,
|
|
)
|
|
m = pattern.search(html)
|
|
if not m:
|
|
return []
|
|
td_vals = []
|
|
for raw_num in re.findall(r'<td[^>]*>\s*(?: )?\s*([0-9,]+(?:\.[0-9]+)?)\s*</td>', m.group(1), re.DOTALL):
|
|
val = _num(raw_num)
|
|
if val != 0.0:
|
|
td_vals.append(val)
|
|
return td_vals
|
|
|
|
# 표 라벨은 cp949 디코딩된 값 기준으로 읽는다.
|
|
# 가장 오른쪽 값(최근 값)을 우선 사용한다.
|
|
row_label_map: dict[str, str] = {
|
|
"매출액": "revenue_krw",
|
|
"영업이익": "op_income_krw",
|
|
"당기순이익": "net_income_krw",
|
|
"영업이익률": "opm_pct",
|
|
"순이익률": "net_margin_pct",
|
|
"ROE(지배주주)": "roe_pct",
|
|
"부채비율": "debt_ratio_pct",
|
|
"당좌비율": "quick_ratio_pct",
|
|
"유보율": "retention_ratio_pct",
|
|
"EPS(원)": "eps_krw",
|
|
"PER(배)": "per",
|
|
"BPS(원)": "bps_krw",
|
|
"PBR(배)": "pbr",
|
|
}
|
|
|
|
for label, key in row_label_map.items():
|
|
vals = _row_values(label)
|
|
if vals and result.get(key) is None:
|
|
result[key] = vals[-1]
|
|
|
|
# 기존 summaryDetail 기반 PER/PBR/EPS가 있다면 우선 유지
|
|
for key in ("per", "pbr", "eps_krw", "roe_pct", "opm_pct", "revenue_krw", "op_income_krw"):
|
|
val = result.get(key)
|
|
if val is not None:
|
|
try:
|
|
result[key] = float(val)
|
|
except Exception:
|
|
pass
|
|
|
|
return result
|
|
|
|
|
|
def _collect_ticker(
|
|
ticker: str,
|
|
name: str,
|
|
df_row: dict[str, Any],
|
|
use_naver: bool,
|
|
current_year: int,
|
|
use_yahoo: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""per-ticker raw 수집."""
|
|
today = str(date.today().isoformat()).replace("-", "")
|
|
row: dict[str, Any] = {
|
|
"ticker": ticker,
|
|
"name": name,
|
|
"as_of_date": today,
|
|
"source": "fallback",
|
|
"roe_pct": None,
|
|
"opm_pct": None,
|
|
"eps_krw": None,
|
|
"ocf_krw": None,
|
|
"fcf_krw": None,
|
|
"net_debt_krw": None,
|
|
"per": None,
|
|
"pbr": None,
|
|
"revenue_krw": None,
|
|
"op_income_krw": None,
|
|
"data_quality": "MISSING",
|
|
"is_etf": _is_etf(ticker, name),
|
|
}
|
|
|
|
# ETF는 펀더멘털 데이터 수집 생략
|
|
if row["is_etf"]:
|
|
row["data_quality"] = "ETF_EXCLUDED"
|
|
row["source"] = "etf_skip"
|
|
return row
|
|
|
|
# Step 1: data_feed에서 직접 가져오기 (가장 신뢰할 수 있음)
|
|
df_per = _num(df_row.get("Forward_PE"))
|
|
df_pbr = _num(df_row.get("PBR"))
|
|
df_eps = _num(df_row.get("EPS"))
|
|
df_roe = _num(df_row.get("ROE_Pct"))
|
|
df_opm = _num(df_row.get("Operating_Margin_Pct"))
|
|
|
|
if df_per > 0:
|
|
row["per"] = df_per
|
|
if df_pbr > 0:
|
|
row["pbr"] = df_pbr
|
|
if df_eps != 0:
|
|
row["eps_krw"] = df_eps
|
|
if df_roe > 0:
|
|
row["roe_pct"] = df_roe
|
|
if df_opm > 0:
|
|
row["opm_pct"] = df_opm
|
|
|
|
data_feed_ok = (row["per"] is not None or row["pbr"] is not None)
|
|
if data_feed_ok:
|
|
row["source"] = "data_feed"
|
|
|
|
# Step 2: 네이버 fallback (ROE/OPM 누락 시)
|
|
if use_naver and (row["roe_pct"] is None or row["opm_pct"] is None or row["per"] is None):
|
|
try:
|
|
naver = _naver_summary(ticker)
|
|
time.sleep(0.3)
|
|
for k, v in naver.items():
|
|
if row.get(k) is None:
|
|
row[k] = v
|
|
if naver:
|
|
row["source"] = "data_feed+naver" if data_feed_ok else "naver"
|
|
except Exception:
|
|
pass
|
|
|
|
# Step 3: 야후 Finance v10 폴백 (ROE/OPM/revenue 등 누락 시)
|
|
# 네이버가 PE/PBR/EPS 우선, 야후가 ROE/OPM/OCF/FCF 보완
|
|
needs_yahoo = (
|
|
row["roe_pct"] is None or row["opm_pct"] is None
|
|
or row["ocf_krw"] is None or row["revenue_krw"] is None
|
|
)
|
|
if use_yahoo and needs_yahoo and not row["is_etf"]:
|
|
try:
|
|
yahoo = _yahoo_fundamentals(ticker)
|
|
if yahoo:
|
|
for k, v in yahoo.items():
|
|
if row.get(k) is None and v is not None:
|
|
row[k] = v
|
|
src_prev = row["source"]
|
|
row["source"] = (src_prev + "+yahoo") if src_prev != "fallback" else "yahoo"
|
|
except Exception:
|
|
pass
|
|
|
|
# 데이터 품질 평가 (ETF 제외)
|
|
filled = sum(1 for k in ("roe_pct", "opm_pct", "per", "pbr", "eps_krw") if row.get(k) not in (None, 0.0))
|
|
if filled >= 4:
|
|
row["data_quality"] = "FULL"
|
|
elif filled >= 2:
|
|
row["data_quality"] = "PARTIAL"
|
|
elif filled >= 1:
|
|
row["data_quality"] = "SPARSE"
|
|
else:
|
|
row["data_quality"] = "MISSING"
|
|
|
|
return row
|
|
|
|
|
|
def main() -> int:
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--json", default=str(DEFAULT_JSON))
|
|
ap.add_argument("--out", default=str(DEFAULT_OUT))
|
|
ap.add_argument("--no-naver", action="store_true", help="네이버 스크래핑 비활성화")
|
|
ap.add_argument("--no-yahoo", action="store_true", help="야후 Finance v10 폴백 비활성화")
|
|
ap.add_argument("--tickers", default="", help="쉼표구분 종목코드 (빈 값이면 data_feed에서 자동 추출)")
|
|
args = ap.parse_args()
|
|
|
|
json_path = Path(args.json)
|
|
out_path = Path(args.out)
|
|
if not json_path.is_absolute():
|
|
json_path = ROOT / json_path
|
|
if not out_path.is_absolute():
|
|
out_path = ROOT / out_path
|
|
|
|
src = _load_json(json_path)
|
|
data = src.get("data") if isinstance(src.get("data"), dict) else {}
|
|
df_list = data.get("data_feed") if isinstance(data.get("data_feed"), list) else []
|
|
|
|
# data_feed를 ticker 기준 dict로 변환
|
|
df_map: dict[str, dict[str, Any]] = {}
|
|
for r in df_list:
|
|
if isinstance(r, dict):
|
|
t = str(r.get("Ticker") or r.get("ticker") or "")
|
|
if t:
|
|
df_map[t] = r
|
|
|
|
# 수집 대상 tickers
|
|
if args.tickers.strip():
|
|
tickers_with_names = [(t.strip(), df_map.get(t.strip(), {}).get("Name", "")) for t in args.tickers.split(",") if t.strip()]
|
|
else:
|
|
tickers_with_names = [(t, df_map.get(t, {}).get("Name", "")) for t in sorted(df_map.keys())]
|
|
|
|
use_naver = not args.no_naver
|
|
use_yahoo = not args.no_yahoo
|
|
current_year = date.today().year
|
|
|
|
print(f"FUNDAMENTAL_RAW_INGEST_V1: collecting {len(tickers_with_names)} tickers, naver={'YES' if use_naver else 'NO'}")
|
|
|
|
rows: list[dict[str, Any]] = []
|
|
for i, (ticker, name) in enumerate(tickers_with_names):
|
|
print(f" [{i+1}/{len(tickers_with_names)}] {ticker} {name} ...", end=" ", flush=True)
|
|
row = _collect_ticker(ticker, name, df_map.get(ticker, {}), use_naver, current_year, use_yahoo=use_yahoo)
|
|
rows.append(row)
|
|
print(f"{row['data_quality']} source={row['source']}")
|
|
|
|
# 품질 집계 (ETF 제외)
|
|
non_etf = [r for r in rows if r["data_quality"] != "ETF_EXCLUDED"]
|
|
quality_counts: dict[str, int] = {}
|
|
for r in rows:
|
|
q = str(r.get("data_quality") or "MISSING")
|
|
quality_counts[q] = quality_counts.get(q, 0) + 1
|
|
|
|
full_count = quality_counts.get("FULL", 0)
|
|
partial_count = quality_counts.get("PARTIAL", 0)
|
|
sparse_count = quality_counts.get("SPARSE", 0)
|
|
missing_count = quality_counts.get("MISSING", 0)
|
|
|
|
coverage_pct = round(
|
|
(full_count + partial_count + sparse_count * 0.5) / len(non_etf) * 100.0, 2
|
|
) if non_etf else 0.0
|
|
|
|
gate = "PASS" if coverage_pct >= 80.0 else ("CAUTION" if coverage_pct >= 30.0 else "FAIL")
|
|
|
|
result = {
|
|
"formula_id": "FUNDAMENTAL_RAW_INGEST_V1",
|
|
"gate": gate,
|
|
"as_of_date": str(date.today()),
|
|
"ticker_count": len(rows),
|
|
"non_etf_count": len(non_etf),
|
|
"coverage_pct": coverage_pct,
|
|
"quality_counts": quality_counts,
|
|
"rows": rows,
|
|
}
|
|
|
|
out_path.parent.mkdir(parents=True, exist_ok=True)
|
|
out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(
|
|
f"FUNDAMENTAL_RAW_INGEST_V1 gate={gate} tickers={len(rows)} non_etf={len(non_etf)} "
|
|
f"coverage={coverage_pct}% full={full_count} partial={partial_count} missing={missing_count}"
|
|
)
|
|
print("FUNDAMENTAL_RAW_INGEST_V1_OK" if gate != "FAIL" else "FUNDAMENTAL_RAW_INGEST_V1_FAIL")
|
|
return 0 if gate != "FAIL" else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|