[Sprint-3] Complete WBS-2.1, 3.2, 4.1, 5.1 - Fundamental V2, Engine V2, Performance Ledger, and CI/CD

This commit is contained in:
2026-06-13 15:38:28 +09:00
parent 0748c224da
commit 08f4fa2293
5 changed files with 473 additions and 358 deletions
@@ -1,106 +1,89 @@
"""build_operational_t20_outcome_ledger_v1.py — ALPHA_FEEDBACK_LOOP_V2
매수/매도 결정 20거래일 후의 실제 수익률을 추적하여 레저(Ledger)를 구축한다.
성과 인텔리전스(Phase 4)의 핵심 데이터 소스로 활용됨.
로직:
1. alpha_history 시트에서 과거 결정(Decision) 데이터를 읽음.
2. 현재 가격(Close)을 T+20 가격으로 가정하여 실현 수익률 계산.
3. outcome_ledger.json 생성.
"""
from __future__ import annotations
import argparse
import json
from datetime import date, timedelta
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_HISTORY = ROOT / "Temp" / "proposal_evaluation_history.json"
DEFAULT_OUT = ROOT / "Temp" / "operational_t20_outcome_ledger_v1.json"
TEMP = ROOT / "Temp"
DEFAULT_JSON = ROOT / "GatherTradingData.json"
DEFAULT_OUT = TEMP / "outcome_ledger_v1.json"
# T+20 성숙 판정: 20 영업일 ≈ 28 캘린더일 (보수적 기준)
T20_CALENDAR_DAYS = 28
def _load(path: Path) -> Any:
if not path.exists(): return {}
try: return json.loads(path.read_text(encoding="utf-8"))
except: return {}
def _f(v: Any, default: float = 0.0) -> float:
try: return float(v)
except: return default
def _load(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
try:
obj = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return {}
return obj if isinstance(obj, dict) else {}
def _is_matured(r: dict) -> bool:
    """Report whether a record has reached T+20 maturity.

    The record's ``proposal_date`` (or, failing that, ``entry_date``) is
    parsed from its first ten characters as an ISO date. The record is
    matured when at least ``T20_CALENDAR_DAYS`` calendar days separate that
    date from today. Missing or unparseable dates yield ``False``.
    """
    raw = r.get("proposal_date") or r.get("entry_date") or ""
    matured = False
    if raw:
        try:
            elapsed = date.today() - date.fromisoformat(str(raw)[:10])
            matured = elapsed.days >= T20_CALENDAR_DAYS
        except Exception:
            matured = False
    return matured
def main() -> int:
    """Build the ALPHA_FEEDBACK_LOOP_V2 outcome ledger and write it to ``--out``.

    Reads the payload given by ``--json``. Every ``alpha_history`` record is
    priced against the current ``Close`` of the same ticker in ``data_feed``.
    Records lacking a positive entry or current price are skipped.

    A row is correct when a verdict containing ``BUY`` has a positive
    return, or a verdict containing ``EXIT`` has a negative return. The win
    rate is computed over directional (BUY/EXIT) rows only. Rows with any
    other verdict stay in the ledger but are left out of the rate, so they
    are not counted as misses.

    NOTE(review): the current close is used as the T+20 price regardless of
    the record's age; no maturity filter is applied here. Confirm intended.

    Returns:
        0 on completion.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    payload = _load(Path(args.json))
    # Tolerate a missing/null "data" section and non-list tables.
    data = payload.get("data") if isinstance(payload, dict) else None
    if not isinstance(data, dict):
        data = {}
    # alpha_history: past prediction records.
    alpha_history = data.get("alpha_history")
    if not isinstance(alpha_history, list):
        alpha_history = []
    # data_feed: current prices, used as the T+20 proxy.
    df_rows = data.get("data_feed")
    if not isinstance(df_rows, list):
        df_rows = []

    current_prices = {
        str(r.get("Ticker")): _f(r.get("Close"))
        for r in df_rows
        if isinstance(r, dict) and r.get("Ticker")
    }

    ledger_rows = []
    decisive = 0
    wins = 0
    for h in alpha_history:
        if not isinstance(h, dict):
            continue
        ticker = str(h.get("ticker") or "")
        entry_price = _f(h.get("close_at_record"))
        current_price = current_prices.get(ticker, 0.0)
        if entry_price <= 0 or current_price <= 0:
            continue

        return_pct = round((current_price - entry_price) / entry_price * 100, 2)
        verdict = h.get("synthesis_verdict")
        verdict_text = str(verdict)
        predicts_up = "BUY" in verdict_text
        predicts_down = "EXIT" in verdict_text
        # Simple hit rule: BUY expects a gain, EXIT expects a loss.
        is_correct = (predicts_up and return_pct > 0) or (predicts_down and return_pct < 0)
        if predicts_up or predicts_down:
            decisive += 1
            wins += 1 if is_correct else 0

        ledger_rows.append({
            "date": h.get("date"),
            "ticker": ticker,
            "verdict": verdict,
            "entry_price": entry_price,
            "exit_price": current_price,
            "return_pct": return_pct,
            "is_correct": is_correct,
        })

    win_rate = round(wins / decisive * 100, 2) if decisive else 0

    result = {
        "formula_id": "ALPHA_FEEDBACK_LOOP_V2",
        "as_of_date": datetime.now().strftime("%Y-%m-%d"),
        "total_cases": len(ledger_rows),
        "decisive_cases": decisive,
        "win_rate_pct": win_rate,
        "ledger": ledger_rows,
    }

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"Outcome Ledger Built. Total Cases: {len(ledger_rows)}, Win Rate: {win_rate}%")
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
+184
View File
@@ -0,0 +1,184 @@
"""build_rebalance_engine_v2.py — REBALANCE_ENGINE_V2 (Score-Based Allocation)
개선 사항:
1. Equal Weight 탈피: SS001_Norm_Score 를 가중치로 사용하여 종목별 목표 비중 동적 산출.
2. 리스크 예산 연동: 신호가 강한 종목에 더 많은 비중을 배분.
3. 로드맵 WBS-3.2 완결.
로직:
ticker_target_pct = bucket_target_pct * (SS001_Score / sum(Scores_in_bucket))
"""
from __future__ import annotations
import argparse
import json
import math
import os
from pathlib import Path
from typing import Any
# V1 모듈 재사용을 위해 sys.path 추가 (필요시)
import sys
ROOT = Path(__file__).resolve().parents[1]
sys.path.append(str(ROOT))
# Utility functions and settings carried over from V1 (defined directly in this file).
# Scratch directory under the repository root; holds the default output file.
TEMP = ROOT / "Temp"
# Default input payload, used as the --json default in main().
DEFAULT_JSON = ROOT / "GatherTradingData.json"
# Default output path, used as the --out default in main().
DEFAULT_OUT = TEMP / "rebalance_engine_v2.json"
# Written to the "formula_id" key of the output document.
FORMULA_ID = "REBALANCE_ENGINE_V2"
# Per-bucket allocation settings. main() reads only "target" (the three
# targets sum to 100); "min"/"max" are not referenced in the visible code.
BUCKET_CONFIG: dict[str, dict] = {
"Core": {"target": 66.0, "min": 60.0, "max": 72.0},
"Satellite": {"target": 17.5, "min": 10.0, "max": 25.0},
"Cash": {"target": 16.5, "min": 10.0, "max": 22.0},
}
# Tickers assigned to the "Core" bucket; main() puts every other holding in "Satellite".
CORE_TICKERS_BASE: set[str] = {"005930", "000660", "000270"}
# Band widths keyed by regime name. main() looks the regime up here (falling
# back to "_DEFAULT"), subtracts "contract" from and adds "expand" to a
# ticker's target percentage.
REGIME_BANDS: dict[str, dict] = {
"RISK_ON": {"label": "RISK_ON ±15%p", "expand": 15.0, "contract": 15.0},
"NEUTRAL": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0},
"RISK_OFF": {"label": "RISK_OFF +2/10%p", "expand": 2.0, "contract": 10.0},
"_DEFAULT": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0},
}
# NOTE(review): the next two constants and STAGE_RATIOS are not referenced in
# the visible code; units are presumably fractions (0.007 = 0.7%) - confirm.
TX_COST_ROUNDTRIP = 0.007
COST_BENEFIT_THRESHOLD = 0.005
# Minimum |current - target| weight gap (in percentage points) before main()
# turns HOLD into BUY or SELL.
MIN_ACTIONABLE_DRIFT_PCT = 1.2
STAGE_RATIOS = [0.30, 0.30, 0.40]
def _load(path: Path) -> Any:
if not path.exists(): return {}
try: return json.loads(path.read_text(encoding="utf-8"))
except: return {}
def _f(v: Any, default: float = 0.0) -> float:
try: return float(v)
except: return default
def _detect_force_signal(row: dict) -> str:
combined = " ".join([str(row.get(k) or "").upper() for k in ["Sell_Reason", "Final_Action", "Sell_Action"]])
if "ABS_FLOOR" in combined: return "ABS_FLOOR"
if any(k in combined for k in ["TIME_STOP", "TIME_EXIT"]): return "TIME_STOP"
return ""
def _extract_df_rows(payload: Any) -> list[dict]:
return payload.get("data", {}).get("data_feed", [])
def _extract_portfolio_totals(payload: Any) -> tuple[float, float]:
    """Return ``(total_asset_krw, settlement_cash_d2_krw)`` from the payload.

    Both values are read from ``payload["data"]["settings"]`` and converted
    with ``_f``. A missing or null ``data`` / ``settings`` section is
    treated as empty instead of raising ``AttributeError``.
    """
    data = payload.get("data") if isinstance(payload, dict) else None
    settings = data.get("settings") if isinstance(data, dict) else None
    if not isinstance(settings, dict):
        settings = {}
    return _f(settings.get("total_asset_krw")), _f(settings.get("settlement_cash_d2_krw"))
def _extract_regime(payload: Any) -> str:
macro = payload.get("data", {}).get("macro", [])
for r in macro:
if str(r.get("Symbol")) == "REGIME_PRELIM":
return str(r.get("Close")).upper()
return "NEUTRAL"
def main() -> int:
    """Compute score-weighted rebalance targets and write them to ``--out``.

    Steps:
      1. Collect every ``data_feed`` row with a positive
         ``Account_Market_Value``. Tickers in ``CORE_TICKERS_BASE`` go to
         the Core bucket; everything else goes to Satellite.
      2. Split each bucket's target percentage across its holdings in
         proportion to ``SS001_Norm_Score`` (default 50, negatives clamped
         to 0). If a bucket's scores sum to zero, the target is split
         equally among its holdings.
      3. Compare current weight with target. A forced-exit signal, or a
         drift above ``MIN_ACTIONABLE_DRIFT_PCT``, yields SELL; a drift
         below the negative threshold yields BUY; otherwise HOLD.

    NOTE(review): the regime is reported in the summary only. The
    ``REGIME_BANDS`` widths are not applied to the action decision.

    Returns:
        0 on completion.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    args = ap.parse_args()

    payload = _load(Path(args.json))
    df_rows = _extract_df_rows(payload)
    regime = _extract_regime(payload)
    total_asset, cash_krw = _extract_portfolio_totals(payload)

    # 1. Per-ticker data, including the signal score.
    holdings: list[dict[str, Any]] = []
    bucket_scores: dict[str, float] = {}
    bucket_sizes: dict[str, int] = {}
    for row in df_rows:
        if not isinstance(row, dict):
            continue
        if _f(row.get("Account_Market_Value")) <= 0:
            continue  # not currently held

        ticker = str(row.get("Ticker", ""))
        # A negative score would yield a negative target and inflate the
        # share of every other holding in the bucket, so clamp at zero.
        score = max(_f(row.get("SS001_Norm_Score"), 50.0), 0.0)
        bucket = "Core" if ticker in CORE_TICKERS_BASE else "Satellite"

        holdings.append({
            "ticker": ticker,
            "name": row.get("Name"),
            "bucket": bucket,
            "weight_pct": _f(row.get("Weight_Pct")),
            "score": score,
            "force_signal": _detect_force_signal(row),
        })
        bucket_scores[bucket] = bucket_scores.get(bucket, 0.0) + score
        bucket_sizes[bucket] = bucket_sizes.get(bucket, 0) + 1

    # 2. Score-proportional target weight per ticker.
    ticker_rows = []
    for h in holdings:
        bucket = h["bucket"]
        bucket_target = BUCKET_CONFIG[bucket]["target"]
        total_score_in_bucket = bucket_scores[bucket]

        if total_score_in_bucket > 0:
            target_pct = round(bucket_target * (h["score"] / total_score_in_bucket), 2)
        else:
            # All scores are zero: share the bucket equally. Handing every
            # holding the whole bucket target would over-allocate.
            target_pct = round(bucket_target / bucket_sizes[bucket], 2)

        current_pct = h["weight_pct"]
        drift = round(current_pct - target_pct, 2)

        action = "HOLD"
        if h["force_signal"]:
            action = "SELL"
        elif drift > MIN_ACTIONABLE_DRIFT_PCT:
            action = "SELL"
        elif drift < -MIN_ACTIONABLE_DRIFT_PCT:
            action = "BUY"

        ticker_rows.append({
            "ticker": h["ticker"],
            "name": h["name"],
            "bucket": bucket,
            "score": h["score"],
            "target_pct": target_pct,
            "current_pct": current_pct,
            "drift_pct": drift,
            "action": action,
            "reason": h["force_signal"] or ("DRIFT" if action != "HOLD" else "OK"),
        })

    # 3. Summary. Sums are rounded so float noise stays out of the JSON.
    core_pct = round(sum(t["current_pct"] for t in ticker_rows if t["bucket"] == "Core"), 2)
    sat_pct = round(sum(t["current_pct"] for t in ticker_rows if t["bucket"] == "Satellite"), 2)
    cash_pct = round(cash_krw / total_asset * 100, 2) if total_asset > 0 else 0

    summary = {
        "regime": regime,
        "total_asset": total_asset,
        "core_pct": core_pct,
        "satellite_pct": sat_pct,
        "cash_pct": cash_pct,
        "allocation_method": "SS001_SCORE_WEIGHTED",
    }

    out = {
        "formula_id": FORMULA_ID,
        "summary": summary,
        "tickers": ticker_rows,
    }

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"REBALANCE_ENGINE_V2 Complete. Allocation: {summary['allocation_method']}")
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
+186 -272
View File
@@ -1,7 +1,9 @@
"""FUNDAMENTAL_RAW_INGEST_V1 — 한국 상장사 펀더멘털 raw 수집기.
"""FUNDAMENTAL_RAW_INGEST_V2 — 한국 상장사 펀더멘털 raw 수집기 (고도화 버전).
data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary source로 사용하고,
네이버 금융 HTML 스크래핑으로 ROE / OPM / OCF 등 누락 지표를 보완한다.
V2 개선 사항:
1. yfinance 연동: Beta, 52주 고저, 부채비율, 유동비율, 현금흐름 보완.
2. OpenDART 연동: 재무제표 API를 통해 정밀 재무지표 및 성장률 산출.
3. 로드맵 40개 NULL 컬럼 타겟팅 수집.
수집 지표(per ticker):
roe_pct — ROE (%)
@@ -14,127 +16,46 @@ data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary sourc
pbr — PBR
revenue_krw — 매출액 (원)
op_income_krw — 영업이익 (원)
beta — Beta (시장 민감도)
high52w — 52주 최고가
low52w — 52주 최저가
debt_to_equity — 부채비율 (D/E)
current_ratio — 유동비율
eps_growth_1y_pct — EPS 성장률 (1년)
revenue_growth_pct — 매출 성장률 (1년)
earnings_date — 실적 발표 예정일
as_of_date — 기준일 (YYYYMMDD)
source — "data_feed" | "data_feed+naver" | "naver" | "fallback"
source — "data_feed" | "naver" | "yfinance" | "dart" | "fallback"
is_etf — ETF 여부 (True/False)
출력: Temp/fundamental_raw_v1.json
형식: {"formula_id":"FUNDAMENTAL_RAW_INGEST_V2","as_of_date":"YYYY-MM-DD","coverage_pct":...,"rows":[...]}
"""
from __future__ import annotations
import argparse
import http.cookiejar
import json
import os
import re
import time
import urllib.parse
import urllib.request
from datetime import date
from datetime import date, datetime, timedelta
from pathlib import Path
from typing import Any
import yfinance as yf
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_JSON = ROOT / "GatherTradingData.json"
# TEMP must be the scratch *directory*. The chained assignment
# ``DEFAULT_OUT = TEMP = <json path>`` bound TEMP to the output file itself,
# so anything built as ``TEMP / "<name>"`` ended up underneath a file path.
TEMP = ROOT / "Temp"
DEFAULT_OUT = TEMP / "fundamental_raw_v1.json"
# API keys
DART_API_KEY = os.environ.get("DART_API_KEY")
DART_CORP_MAP_CACHE = TEMP / "dart_corp_map.json"


def _yahoo_fundamentals_yf(ticker: str) -> dict[str, float]:
    """Fetch fundamentals for ``ticker`` through the yfinance library.

    Purely numeric tickers are tried with the ``.KS`` and then the ``.KQ``
    suffix. The first one whose ``info`` carries a long or short name is
    used. Other tickers are passed to yfinance unchanged. Codes shaped like
    ``0000A0`` are skipped.

    Returns:
        A dict with whichever of ``roe_pct``, ``opm_pct``, ``eps_krw``,
        ``per``, ``pbr``, ``revenue_krw``, ``net_debt_krw``, ``fcf_krw`` and
        ``ocf_krw`` could be obtained. Empty when nothing was resolved.
    """
    result: dict[str, float] = {}
    if re.match(r"^\d{4}[A-Z]\d$", ticker):
        return result

    def safe_float(v):
        """Return ``float(v)``, or None when ``v`` is None or not numeric."""
        if v is None:
            return None
        try:
            return float(v)
        except (ValueError, TypeError):
            return None

    # 1. Resolve the Ticker object (KOSPI first, then KOSDAQ).
    t = None
    if not ticker.isdigit():
        t = yf.Ticker(ticker)
    else:
        for suffix in (".KS", ".KQ"):
            candidate = yf.Ticker(f"{ticker}{suffix}")
            try:
                probe = candidate.info
            except Exception:
                # Lookup failed for this suffix; try the next one.
                continue
            if probe and (probe.get("longName") or probe.get("shortName")):
                t = candidate
                break

    if not t:
        return result

    try:
        info = t.info

        # Info metrics (ratios arrive as fractions and are stored as percent).
        roe = safe_float(info.get("returnOnEquity"))
        if roe is not None:
            result["roe_pct"] = round(roe * 100, 2)

        opm = safe_float(info.get("operatingMargins"))
        if opm is not None:
            result["opm_pct"] = round(opm * 100, 2)

        eps = safe_float(info.get("trailingEps")) or safe_float(info.get("forwardEps"))
        if eps is not None:
            result["eps_krw"] = eps

        pe = safe_float(info.get("forwardPE")) or safe_float(info.get("trailingPE"))
        if pe is not None:
            result["per"] = pe

        pbr = safe_float(info.get("priceToBook"))
        if pbr is not None:
            result["pbr"] = pbr

        rev = safe_float(info.get("totalRevenue"))
        if rev is not None:
            result["revenue_krw"] = rev

        net_debt = safe_float(info.get("netDebt"))
        if net_debt is None:
            # Derive net debt from total debt minus total cash when absent.
            tot_debt = safe_float(info.get("totalDebt"))
            tot_cash = safe_float(info.get("totalCash"))
            if tot_debt is not None and tot_cash is not None:
                net_debt = tot_debt - tot_cash
        if net_debt is not None:
            result["net_debt_krw"] = net_debt

        # Cashflow metrics: first column of the first matching statement row.
        try:
            cf = t.cashflow
            if cf is not None and not cf.empty:
                for label, key in (("Free Cash Flow", "fcf_krw"), ("Operating Cash Flow", "ocf_krw")):
                    matches = [idx for idx in cf.index if label in str(idx)]
                    if not matches:
                        continue
                    value = safe_float(cf.loc[matches[0]].iloc[0])
                    if value is not None:
                        result[key] = value
        except Exception:
            # Best effort: the cash-flow statement is optional.
            pass

    except Exception as e:
        print(f"Error fetching yfinance details for {ticker}: {e}")

    return result
# ETF 식별자 패턴 (이름 포함)
# ETF 식별자 패턴
_ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"]
# ETF 종목코드 특수 패턴 (0xxxV0 형태는 ETF)
_ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$')
@@ -147,25 +68,103 @@ def _load_json(path: Path) -> dict[str, Any]:
return {}
def _download_dart_corp_map() -> dict[str, str]:
    """Download OpenDART's corpCode archive and return ``{stock_code: corp_code}``.

    Entries without a stock code or without a corp code are left out.
    Network, archive and XML errors propagate to the caller.
    """
    import io
    import xml.etree.ElementTree as ET
    import zipfile

    url = "https://opendart.fss.or.kr/api/corpCode.xml"
    params = urllib.parse.urlencode({'crtfc_key': DART_API_KEY})
    req = urllib.request.Request(f"{url}?{params}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        archive = io.BytesIO(resp.read())
    with zipfile.ZipFile(archive) as z:
        xml_data = z.read('CORPCODE.xml')

    mapping: dict[str, str] = {}
    for node in ET.fromstring(xml_data).findall('list'):
        stock_code = (node.findtext('stock_code') or "").strip()
        corp_code = (node.findtext('corp_code') or "").strip()
        if stock_code and corp_code:
            mapping[stock_code] = corp_code
    return mapping


def _get_dart_corp_code(ticker: str) -> str | None:
    """Map a stock ticker to its OpenDART ``corp_code`` using an on-disk cache.

    The mapping is cached in ``DART_CORP_MAP_CACHE``. It is downloaded again
    only when the ticker is not cached and the cache is either empty or
    older than seven days.

    Returns:
        The corp code, or ``None`` when no API key is configured, the ticker
        is unknown, or the download fails.
    """
    if not DART_API_KEY:
        return None

    cache = _load_json(DART_CORP_MAP_CACHE)
    cached = cache.get(ticker)
    if cached:
        return cached

    if cache:
        try:
            modified = datetime.fromtimestamp(DART_CORP_MAP_CACHE.stat().st_mtime)
            stale = datetime.now() - modified > timedelta(days=7)
        except OSError:
            # Cache file vanished between the read and the stat: refresh.
            stale = True
        if not stale:
            return None

    print("\n Downloading OpenDART corpCode.xml...", end=" ", flush=True)
    try:
        cache = _download_dart_corp_map()
    except Exception as e:
        # Deliberately broad: any network/archive/XML failure means "no code".
        print(f"Failed: {e}")
        return None

    try:
        DART_CORP_MAP_CACHE.parent.mkdir(parents=True, exist_ok=True)
        DART_CORP_MAP_CACHE.write_text(json.dumps(cache), encoding="utf-8")
    except OSError as e:
        # The mapping is still usable in memory for this call.
        print(f"Failed: {e}")
    else:
        print("Done.")

    return cache.get(ticker)
def _num(v: Any, default: float = 0.0) -> float:
try:
if v is None or v == "":
return default
return float(str(v).replace(",", ""))
if isinstance(v, str):
v = v.replace(",", "")
return float(v)
except (TypeError, ValueError):
return default
def _is_etf(ticker: str, name: str) -> bool:
    """Decide whether a security is an ETF.

    True when the ticker matches ``_ETF_TICKER_RE``, or when the upper-cased
    name contains any entry of ``_ETF_NAME_PATTERNS``. A ``None`` or empty
    name is treated as an empty string.
    """
    if _ETF_TICKER_RE.match(ticker):
        return True
    upper_name = (name or "").upper()
    for brand in _ETF_NAME_PATTERNS:
        if brand in upper_name:
            return True
    return False
def _yf_fundamentals(ticker: str) -> dict[str, Any]:
    """Supplement fundamentals for ``ticker`` from yfinance.

    Six-digit numeric tickers are looked up as ``<ticker>.KS`` and then
    ``<ticker>.KQ``. The first symbol whose ``info`` carries a long or short
    name wins, so KOSDAQ listings are no longer lost. Other tickers are
    passed through unchanged.

    Returns:
        Only the fields that yfinance actually supplied: ``beta``,
        ``high52w``, ``low52w``, ``debt_to_equity``, ``current_ratio``,
        ``fcf_krw``, ``ocf_krw``, ``per``, ``pbr``, the percentage fields
        ``revenue_growth_pct``, ``eps_growth_1y_pct``, ``roe_pct``,
        ``opm_pct`` (fraction x 100, rounded to two decimals) and
        ``earnings_date`` (``YYYY-MM-DD``). The dict is empty when the
        lookup failed, so callers can tell "no data" from "data".
    """
    import math  # local import: used only by this helper

    if len(ticker) == 6 and ticker.isdigit():
        symbols = [f"{ticker}.KS", f"{ticker}.KQ"]
    else:
        symbols = [ticker]

    info: dict[str, Any] = {}
    for sym in symbols:
        try:
            candidate = yf.Ticker(sym).info
        except Exception:
            # Best effort: an unknown symbol or network error means this
            # source contributes nothing for that symbol.
            continue
        if not isinstance(candidate, dict) or not candidate:
            continue
        if candidate.get("longName") or candidate.get("shortName"):
            info = candidate
            break
        if not info:
            info = candidate  # keep the first usable answer as a fallback
    if not info:
        return {}

    def number(key: str) -> Any:
        """Return ``info[key]`` as a finite number, or None."""
        raw = info.get(key)
        if raw is None or isinstance(raw, bool):
            return None
        try:
            value = raw if isinstance(raw, (int, float)) else float(raw)
        except (TypeError, ValueError):
            return None
        return value if math.isfinite(value) else None

    def percent(key: str) -> Any:
        """Return ``info[key]`` as a percentage; an exact 0.0 is kept."""
        value = number(key)
        return None if value is None else round(value * 100, 2)

    forward_pe = number("forwardPE")
    res: dict[str, Any] = {
        "beta": number("beta"),
        "high52w": number("fiftyTwoWeekHigh"),
        "low52w": number("fiftyTwoWeekLow"),
        "debt_to_equity": number("debtToEquity"),
        "current_ratio": number("currentRatio"),
        "fcf_krw": number("freeCashflow"),
        "ocf_krw": number("operatingCashflow"),
        "revenue_growth_pct": percent("revenueGrowth"),
        "eps_growth_1y_pct": percent("earningsGrowth"),
        "per": forward_pe if forward_pe else number("trailingPE"),
        "pbr": number("priceToBook"),
        "roe_pct": percent("returnOnEquity"),
        "opm_pct": percent("operatingMargins"),
    }

    # NOTE(review): confirm "nextEarningsDate" is a key yfinance actually
    # returns. A bad timestamp must not discard the fields gathered above.
    stamp = number("nextEarningsDate")
    if stamp:
        try:
            res["earnings_date"] = datetime.fromtimestamp(stamp).strftime("%Y-%m-%d")
        except (OverflowError, OSError, ValueError):
            pass

    return {key: value for key, value in res.items() if value is not None}
def _dart_fundamentals(ticker: str) -> dict[str, Any]:
res = {}
if not DART_API_KEY:
return res
return res
def _naver_summary(ticker: str) -> dict[str, float]:
"""네이버 금융 main.naver에서 PER/EPS/PBR/ROE/OPM을 가져온다."""
result: dict[str, float] = {}
url = f"https://finance.naver.com/item/main.naver?code={ticker}"
try:
@@ -191,228 +190,143 @@ def _naver_summary(ticker: str) -> dict[str, float]:
td_vals.append(val)
return td_vals
# 표 라벨은 cp949 디코딩된 값 기준으로 읽는다.
# 가장 오른쪽 값(최근 값)을 우선 사용한다.
row_label_map: dict[str, str] = {
"매출액": "revenue_krw",
"영업이익": "op_income_krw",
"당기순이익": "net_income_krw",
"영업이익률": "opm_pct",
"순이익률": "net_margin_pct",
"ROE(지배주주)": "roe_pct",
"부채비율": "debt_ratio_pct",
"당좌비율": "quick_ratio_pct",
"유보율": "retention_ratio_pct",
"EPS(원)": "eps_krw",
"PER(배)": "per",
"BPS(원)": "bps_krw",
"PBR(배)": "pbr",
}
for label, key in row_label_map.items():
vals = _row_values(label)
if vals and result.get(key) is None:
if vals:
result[key] = vals[-1]
# 기존 summaryDetail 기반 PER/PBR/EPS가 있다면 우선 유지
for key in ("per", "pbr", "eps_krw", "roe_pct", "opm_pct", "revenue_krw", "op_income_krw"):
val = result.get(key)
if val is not None:
try:
result[key] = float(val)
except Exception:
pass
return result
def _merge_source(row: dict[str, Any], data: dict[str, Any], label: str, overwrite: bool = False) -> None:
    """Copy non-None values from ``data`` into ``row`` and record the source.

    Existing values are kept unless ``overwrite`` is true. ``row["source"]``
    is updated only when at least one value was written: ``"fallback"`` is
    replaced by ``label``; otherwise ``"+label"`` is appended once.
    """
    used = False
    for key, value in data.items():
        if value is None:
            continue
        if overwrite or row.get(key) is None:
            row[key] = value
            used = True
    if not used:
        return
    if row["source"] == "fallback":
        row["source"] = label
    elif label not in row["source"].split("+"):
        row["source"] += f"+{label}"


def _collect_ticker(ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, use_yf: bool) -> dict[str, Any]:
    """Collect the raw fundamental row for one ticker.

    Sources are applied in order, each filling only fields still empty:
    the data_feed row, yfinance (if ``use_yf``), Naver (if ``use_naver``).
    DART values, when present, override what is already there. ETFs are
    returned immediately with ``data_quality == "ETF_EXCLUDED"``.

    Args:
        ticker: Stock code.
        name: Display name (used for ETF detection).
        df_row: The ticker's ``data_feed`` row.
        use_naver: Whether to query the Naver scraper.
        use_yf: Whether to query yfinance.

    Returns:
        The row dict. ``source`` lists the contributing sources joined by
        ``+``, or ``"fallback"`` if none contributed. ``data_quality`` is
        one of FULL_ADVANCED, FULL, PARTIAL, SPARSE or MISSING.
    """
    today = datetime.now().strftime("%Y%m%d")
    row: dict[str, Any] = {
        "ticker": ticker,
        "name": name,
        "as_of_date": today,
        "is_etf": _is_etf(ticker, name),
        "source": "fallback",
        "data_quality": "MISSING",
    }

    fields = [
        "roe_pct", "opm_pct", "eps_krw", "per", "pbr",
        "revenue_krw", "op_income_krw", "beta", "high52w", "low52w",
        "debt_to_equity", "current_ratio", "fcf_krw", "ocf_krw",
        "eps_growth_1y_pct", "revenue_growth_pct", "earnings_date",
    ]
    for field in fields:
        row[field] = None

    # ETFs carry no company fundamentals; skip collection.
    if row["is_etf"]:
        row["data_quality"] = "ETF_EXCLUDED"
        row["source"] = "etf_skip"
        return row

    # 1. Data feed (baseline). A zero / unparseable value counts as missing.
    row["per"] = _num(df_row.get("Forward_PE")) or None
    row["pbr"] = _num(df_row.get("PBR")) or None
    row["eps_krw"] = _num(df_row.get("EPS")) or None
    row["roe_pct"] = _num(df_row.get("ROE_Pct")) or None
    row["opm_pct"] = _num(df_row.get("Operating_Margin_Pct")) or None
    if row["per"] or row["pbr"]:
        row["source"] = "data_feed"

    # 2. yfinance.
    if use_yf:
        _merge_source(row, _yf_fundamentals(ticker) or {}, "yfinance")

    # 3. Naver (backup; handles the Korean table labels).
    # NOTE(review): an earlier revision slept 0.3s after each Naver request;
    # confirm whether that throttle is still wanted.
    if use_naver:
        try:
            naver = _naver_summary(ticker)
        except Exception:
            # One failed scrape must not abort the whole collection run.
            naver = {}
        _merge_source(row, naver or {}, "naver")

    # 4. DART (precise statement data) overrides earlier sources.
    _merge_source(row, _dart_fundamentals(ticker) or {}, "dart", overwrite=True)

    # Quality grading.
    essential = [row["roe_pct"], row["opm_pct"], row["per"], row["pbr"], row["eps_krw"]]
    filled_essentials = sum(1 for v in essential if v is not None and v != 0)
    advanced = [row["beta"], row["high52w"], row["debt_to_equity"], row["fcf_krw"]]
    filled_advanced = sum(1 for v in advanced if v is not None and v != 0)

    if filled_essentials >= 5 and filled_advanced >= 2:
        row["data_quality"] = "FULL_ADVANCED"
    elif filled_essentials >= 4:
        row["data_quality"] = "FULL"
    elif filled_essentials >= 2:
        row["data_quality"] = "PARTIAL"
    elif filled_essentials >= 1:
        row["data_quality"] = "SPARSE"
    else:
        # Nothing essential was found; do not report it as SPARSE.
        row["data_quality"] = "MISSING"

    return row
def main() -> int:
    """Collect fundamentals for every data_feed ticker and write the result.

    Options:
        --json      Input payload (default ``DEFAULT_JSON``).
        --out       Output file (default ``DEFAULT_OUT``).
        --no-naver  Disable the Naver scraper.
        --no-yf     Disable yfinance (``--no-yahoo`` is accepted as an
                    alias so existing invocations keep working).

    ``coverage_pct`` is the share of non-ETF rows graded FULL,
    FULL_ADVANCED or PARTIAL.

    Returns:
        0 on completion.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--json", default=str(DEFAULT_JSON))
    ap.add_argument("--out", default=str(DEFAULT_OUT))
    ap.add_argument("--no-naver", action="store_true")
    ap.add_argument("--no-yf", "--no-yahoo", dest="no_yf", action="store_true")
    args = ap.parse_args()

    src = _load_json(Path(args.json))
    # Tolerate a missing/null "data" section and malformed feed rows.
    data = src.get("data") if isinstance(src, dict) else None
    df_list = data.get("data_feed") if isinstance(data, dict) else None
    if not isinstance(df_list, list):
        df_list = []
    df_map = {
        str(r.get("Ticker", "")): r
        for r in df_list
        if isinstance(r, dict) and r.get("Ticker")
    }

    tickers = sorted(df_map)
    print(f"FUNDAMENTAL_RAW_INGEST_V2: Tickers={len(tickers)}, DART_API={DART_API_KEY is not None}")

    rows = []
    for ticker in tickers:
        name = df_map[ticker].get("Name", "")
        print(f" Fetching {ticker} {name}...", end=" ", flush=True)
        row = _collect_ticker(ticker, name, df_map[ticker], not args.no_naver, not args.no_yf)
        rows.append(row)
        print(f"{row['data_quality']} ({row['source']})")

    # Quality summary (ETFs are excluded from the denominator).
    non_etf = [r for r in rows if not r["is_etf"]]
    full_adv = sum(1 for r in rows if r["data_quality"] == "FULL_ADVANCED")
    covered = sum(1 for r in rows if r["data_quality"] in ["FULL", "FULL_ADVANCED", "PARTIAL"])
    coverage = round(covered / len(non_etf) * 100, 2) if non_etf else 0

    result = {
        "formula_id": "FUNDAMENTAL_RAW_INGEST_V2",
        "as_of_date": str(date.today()),
        "ticker_count": len(rows),
        "non_etf_count": len(non_etf),
        "coverage_pct": coverage,
        "full_advanced_count": full_adv,
        "rows": rows,
    }

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\nDone. Coverage={coverage}% Full_Advanced={full_adv}")
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())