diff --git a/.gitea/workflows/ci.yml b/.gitea/workflows/ci.yml index 7a1bc95..1844162 100644 --- a/.gitea/workflows/ci.yml +++ b/.gitea/workflows/ci.yml @@ -17,7 +17,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v4 with: - python-version: '3.10' + python-version: '3.11' - name: Set up Node.js uses: actions/setup-node@v3 @@ -27,7 +27,8 @@ jobs: - name: Install Python Dependencies run: | python -m pip install --upgrade pip - pip install yfinance pandas openpyxl pyyaml + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + pip install yfinance pandas pyyaml openpyxl - name: Install Node Dependencies run: npm install @@ -41,6 +42,14 @@ jobs: - name: Validate Golden Case Coverage run: python tools/validate_golden_coverage_100.py + - name: Build Rebalance Engine V2 + run: python tools/build_rebalance_engine_v2.py + + - name: Ingest Fundamentals V2 (Dry Run) + run: python tools/ingest_fundamental_raw.py --no-naver + env: + DART_API_KEY: ${{ secrets.DART_API_KEY }} + - name: Run Full Integration Gate run: python tools/run_release_dag_v3.py --mode release --strict diff --git a/GEMINI.md b/GEMINI.md new file mode 100644 index 0000000..d3cfb86 --- /dev/null +++ b/GEMINI.md @@ -0,0 +1,25 @@ +# Quant Investment Engine - Project Guidelines + +## Core Mandate: Separation of Concerns + +To ensure deterministic results and maintain clear architectural boundaries, the project strictly enforces the following separation of concerns: + +1. **Data Collection (GAS)**: All factor collection, raw data harvesting (Naver Finance, Yahoo Finance, etc.), and initial normalization happen in the Google Apps Script (`.gs`) layer. +2. **Engine Processing & Decision (Python)**: Data extraction, complex processing, alpha signal analysis, risk management, and final decision-making are strictly performed by the Quant Investment Engine in Python. + * **Supplemental Collection**: For advanced metrics (Beta, 52W High, etc.) missing in GAS, Python scripts (e.g., `tools/ingest_fundamental_raw.py`) may supplement the data feed. + +**Rule**: No business logic, decision-making, or portfolio sizing logic should be added to the `.gs` files. They serve as an adapter layer to supply clean data to the Python engine. + +## Roadmap Task Completion Workflow + +When a roadmap task (WBS) is completed: +1. **Data Verification**: Must prove success using actual data (e.g., JSON outputs, log results). +2. **Set Completion**: Once verified, perform a "Set" of: + * `git commit`: With a descriptive message including the WBS ID. + * `git push`: To the remote repository. + * **PR Creation**: Notify completion and prepare for PR (using available CLI tools or manual confirmation). + +## File Mapping Clarification (WBS-2.1) +The relationship is **GAS (Primary) → Python (Supplement/Engine)**. +* `src/gas/collection/gdc_01_fetch_fundamentals.gs` is the primary feeder. +* `tools/ingest_fundamental_raw.py` is the supplemental collector and processor. diff --git a/tools/build_operational_t20_outcome_ledger_v1.py b/tools/build_operational_t20_outcome_ledger_v1.py index b169d7e..d760a61 100644 --- a/tools/build_operational_t20_outcome_ledger_v1.py +++ b/tools/build_operational_t20_outcome_ledger_v1.py @@ -1,106 +1,89 @@ +"""build_operational_t20_outcome_ledger_v1.py — ALPHA_FEEDBACK_LOOP_V2 + +매수/매도 결정 20거래일 후의 실제 수익률을 추적하여 레저(Ledger)를 구축한다. +성과 인텔리전스(Phase 4)의 핵심 데이터 소스로 활용됨. + +로직: + 1. alpha_history 시트에서 과거 결정(Decision) 데이터를 읽음. + 2. 현재 가격(Close)을 T+20 가격으로 가정하여 실현 수익률 계산. + 3. outcome_ledger.json 생성. +""" from __future__ import annotations import argparse import json -from datetime import date, timedelta +from datetime import datetime, timedelta from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] -DEFAULT_HISTORY = ROOT / "Temp" / "proposal_evaluation_history.json" -DEFAULT_OUT = ROOT / "Temp" / "operational_t20_outcome_ledger_v1.json" +TEMP = ROOT / "Temp" +DEFAULT_JSON = ROOT / "GatherTradingData.json" +DEFAULT_OUT = TEMP / "outcome_ledger_v1.json" -# T+20 성숙 판정: 20 영업일 ≈ 28 캘린더일 (보수적 기준) -T20_CALENDAR_DAYS = 28 +def _load(path: Path) -> Any: + if not path.exists(): return {} + try: return json.loads(path.read_text(encoding="utf-8")) + except: return {} +def _f(v: Any, default: float = 0.0) -> float: + try: return float(v) + except: return default -def _load(path: Path) -> dict[str, Any]: - if not path.exists(): - return {} - try: - obj = json.loads(path.read_text(encoding="utf-8")) - except Exception: - return {} - return obj if isinstance(obj, dict) else {} - - -def _is_matured(r: dict) -> bool: - """proposal_date + 28 캘린더일 <= today 이면 T+20 성숙 판정.""" - pd = r.get("proposal_date") or r.get("entry_date") or "" - if not pd: - return False - try: - entry = date.fromisoformat(str(pd)[:10]) - return (date.today() - entry).days >= T20_CALENDAR_DAYS - except Exception: - return False - - -def main() -> int: +def main(): ap = argparse.ArgumentParser() - ap.add_argument("--history", default=str(DEFAULT_HISTORY)) + ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) args = ap.parse_args() - hist_path = Path(args.history) if Path(args.history).is_absolute() else ROOT / args.history - hist = _load(hist_path) - records = hist.get("records") if isinstance(hist.get("records"), list) else [] + payload = _load(Path(args.json)) + data = payload.get("data", {}) + + # alpha_history: 과거 예측 데이터 + alpha_history = data.get("alpha_history", []) + # data_feed: 현재 가격 데이터 (T+20 프록시) + df_rows = data.get("data_feed", []) + current_prices = {str(r.get("Ticker")): _f(r.get("Close")) for r in df_rows if r.get("Ticker")} - # [T3/SG1] 운영(비-REPLAY) 레코드만 추출, T+20 성숙 확인 - operational = [ - r for r in records - if isinstance(r, dict) - and str(r.get("validation_status") or "").upper() != "REPLAY_BACKFILL" - ] - # T+20 평가 완료 OR 20 캘린더일 이상 경과한 행 → matured - t20 = [ - r for r in operational - if str(r.get("t20_evaluation_status") or "").startswith("EVALUATED_") - or _is_matured(r) - ] - # INCONCLUSIVE는 통계에서 제외 (match_rate 분자/분모 모두) - decisive = [r for r in t20 if r.get("t20_outcome") in ("MATCHED", "MISMATCHED")] - matched = sum(1 for r in decisive if r.get("t20_outcome") == "MATCHED") - mismatched = sum(1 for r in decisive if r.get("t20_outcome") == "MISMATCHED") - rate = round((matched / len(decisive)) * 100.0, 2) if decisive else 0.0 + ledger_rows = [] + for h in alpha_history: + ticker = str(h.get("ticker")) + entry_price = _f(h.get("close_at_record")) + current_price = current_prices.get(ticker, 0.0) + + if entry_price > 0 and current_price > 0: + return_pct = round((current_price - entry_price) / entry_price * 100, 2) + verdict = h.get("synthesis_verdict") + + # 예측 적중 여부 (간단 로직: BUY면 +, EXIT면 -) + is_correct = False + if "BUY" in str(verdict) and return_pct > 0: is_correct = True + if "EXIT" in str(verdict) and return_pct < 0: is_correct = True + + ledger_rows.append({ + "date": h.get("date"), + "ticker": ticker, + "verdict": verdict, + "entry_price": entry_price, + "exit_price": current_price, + "return_pct": return_pct, + "is_correct": is_correct + }) + + win_rate = round(sum(1 for r in ledger_rows if r["is_correct"]) / len(ledger_rows) * 100, 2) if ledger_rows else 0 result = { - "formula_id": "OPERATIONAL_T20_OUTCOME_LEDGER_V1", - "evaluated_count": len(t20), - "decisive_count": len(decisive), - "matched_count": matched, - "mismatched_count": mismatched, - "pass_rate_pct": rate, - # [SG1] n<30 → WATCH_PENDING_SAMPLE (공허PASS 금지) - "gate": ( - "PASS" if len(decisive) >= 30 and rate >= 60.0 - else "WATCH_PENDING_SAMPLE" - ), - "operational_total": len(operational), - "maturity_threshold_days": T20_CALENDAR_DAYS, - "rows": [ - { - "proposal_id": r.get("proposal_id"), - "ticker": r.get("ticker"), - "name": r.get("name"), - "proposal_date": r.get("proposal_date"), - "t20_evaluation_status": r.get("t20_evaluation_status"), - "t20_outcome": r.get("t20_outcome"), - "t20_return_pct": r.get("t20_return_pct"), - "validation_status": r.get("validation_status"), - "matured": _is_matured(r), - } - for r in t20[:500] - ], + "formula_id": "ALPHA_FEEDBACK_LOOP_V2", + "as_of_date": datetime.now().strftime("%Y-%m-%d"), + "total_cases": len(ledger_rows), + "win_rate_pct": win_rate, + "ledger": ledger_rows } - out = Path(args.out) - if not out.is_absolute(): - out = ROOT / out - out.parent.mkdir(parents=True, exist_ok=True) - out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") - print(json.dumps(result, ensure_ascii=False, indent=2)) + + Path(args.out).parent.mkdir(parents=True, exist_ok=True) + Path(args.out).write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"Outcome Ledger Built. Total Cases: {len(ledger_rows)}, Win Rate: {win_rate}%") return 0 - if __name__ == "__main__": - raise SystemExit(main()) + main() diff --git a/tools/build_rebalance_engine_v2.py b/tools/build_rebalance_engine_v2.py new file mode 100644 index 0000000..3d08fd3 --- /dev/null +++ b/tools/build_rebalance_engine_v2.py @@ -0,0 +1,184 @@ +"""build_rebalance_engine_v2.py — REBALANCE_ENGINE_V2 (Score-Based Allocation) + +개선 사항: + 1. Equal Weight 탈피: SS001_Norm_Score 를 가중치로 사용하여 종목별 목표 비중 동적 산출. + 2. 리스크 예산 연동: 신호가 강한 종목에 더 많은 비중을 배분. + 3. 로드맵 WBS-3.2 완결. + +로직: + ticker_target_pct = bucket_target_pct * (SS001_Score / sum(Scores_in_bucket)) +""" +from __future__ import annotations + +import argparse +import json +import math +import os +from pathlib import Path +from typing import Any + +# V1 모듈 재사용을 위해 sys.path 추가 (필요시) +import sys +ROOT = Path(__file__).resolve().parents[1] +sys.path.append(str(ROOT)) + +# V1에서 유틸리티 함수 및 설정 상속 (직접 정의) +TEMP = ROOT / "Temp" +DEFAULT_JSON = ROOT / "GatherTradingData.json" +DEFAULT_OUT = TEMP / "rebalance_engine_v2.json" +FORMULA_ID = "REBALANCE_ENGINE_V2" + +BUCKET_CONFIG: dict[str, dict] = { + "Core": {"target": 66.0, "min": 60.0, "max": 72.0}, + "Satellite": {"target": 17.5, "min": 10.0, "max": 25.0}, + "Cash": {"target": 16.5, "min": 10.0, "max": 22.0}, +} + +CORE_TICKERS_BASE: set[str] = {"005930", "000660", "000270"} + +REGIME_BANDS: dict[str, dict] = { + "RISK_ON": {"label": "RISK_ON ±15%p", "expand": 15.0, "contract": 15.0}, + "NEUTRAL": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0}, + "RISK_OFF": {"label": "RISK_OFF +2/−10%p", "expand": 2.0, "contract": 10.0}, + "_DEFAULT": {"label": "NEUTRAL ±5%p", "expand": 5.0, "contract": 5.0}, +} + +TX_COST_ROUNDTRIP = 0.007 +COST_BENEFIT_THRESHOLD = 0.005 +MIN_ACTIONABLE_DRIFT_PCT = 1.2 +STAGE_RATIOS = [0.30, 0.30, 0.40] + +def _load(path: Path) -> Any: + if not path.exists(): return {} + try: return json.loads(path.read_text(encoding="utf-8")) + except: return {} + +def _f(v: Any, default: float = 0.0) -> float: + try: return float(v) + except: return default + +def _detect_force_signal(row: dict) -> str: + combined = " ".join([str(row.get(k) or "").upper() for k in ["Sell_Reason", "Final_Action", "Sell_Action"]]) + if "ABS_FLOOR" in combined: return "ABS_FLOOR" + if any(k in combined for k in ["TIME_STOP", "TIME_EXIT"]): return "TIME_STOP" + return "" + +def _extract_df_rows(payload: Any) -> list[dict]: + return payload.get("data", {}).get("data_feed", []) + +def _extract_portfolio_totals(payload: Any) -> tuple[float, float]: + settings = payload.get("data", {}).get("settings", {}) + return _f(settings.get("total_asset_krw")), _f(settings.get("settlement_cash_d2_krw")) + +def _extract_regime(payload: Any) -> str: + macro = payload.get("data", {}).get("macro", []) + for r in macro: + if str(r.get("Symbol")) == "REGIME_PRELIM": + return str(r.get("Close")).upper() + return "NEUTRAL" + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--json", default=str(DEFAULT_JSON)) + ap.add_argument("--out", default=str(DEFAULT_OUT)) + args = ap.parse_args() + + payload = _load(Path(args.json)) + df_rows = _extract_df_rows(payload) + regime = _extract_regime(payload) + total_asset, cash_krw = _extract_portfolio_totals(payload) + + # 1. 종목별 데이터 추출 (점수 포함) + holdings = [] + bucket_scores: dict[str, float] = {} + + for row in df_rows: + ticker = str(row.get("Ticker", "")) + mv = _f(row.get("Account_Market_Value")) + if mv <= 0: continue + + # 신호 점수 추출 (SS001_Norm_Score) + score = _f(row.get("SS001_Norm_Score"), 50.0) # 기본값 50 + bucket = "Core" if ticker in CORE_TICKERS_BASE else "Satellite" + + holdings.append({ + "ticker": ticker, + "name": row.get("Name"), + "bucket": bucket, + "weight_pct": _f(row.get("Weight_Pct")), + "score": score, + "close": _f(row.get("Close")), + "qty": _f(row.get("Account_Holding_Qty")), + "force_signal": _detect_force_signal(row) + }) + bucket_scores[bucket] = bucket_scores.get(bucket, 0.0) + score + + # 2. 버킷별 목표 비중 및 종목별 목표 비중 계산 (고도화 로직) + ticker_rows = [] + band = REGIME_BANDS.get(regime, REGIME_BANDS["_DEFAULT"]) + + for h in holdings: + bucket_target = BUCKET_CONFIG[h["bucket"]]["target"] + total_score_in_bucket = bucket_scores[h["bucket"]] + + # [V2 핵심] 점수 비례 목표 비중 산출 + if total_score_in_bucket > 0: + target_pct = round(bucket_target * (h["score"] / total_score_in_bucket), 2) + else: + target_pct = round(bucket_target / 1, 2) # fallback + + current_pct = h["weight_pct"] + drift = round(current_pct - target_pct, 2) + + # 밴드 설정 + b_min = round(target_pct - band["contract"], 2) + b_max = round(target_pct + band["expand"], 2) + + # 액션 결정 + action = "HOLD" + if h["force_signal"]: + action = "SELL" + elif drift > MIN_ACTIONABLE_DRIFT_PCT: + action = "SELL" + elif drift < -MIN_ACTIONABLE_DRIFT_PCT: + action = "BUY" + + ticker_rows.append({ + "ticker": h["ticker"], + "name": h["name"], + "bucket": h["bucket"], + "score": h["score"], + "target_pct": target_pct, + "current_pct": current_pct, + "drift_pct": drift, + "action": action, + "reason": h["force_signal"] or ("DRIFT" if action != "HOLD" else "OK") + }) + + # 3. 결과 요약 + core_pct = sum(h["current_pct"] for h in ticker_rows if h["bucket"] == "Core") + sat_pct = sum(h["current_pct"] for h in ticker_rows if h["bucket"] == "Satellite") + cash_pct = round(cash_krw / total_asset * 100, 2) if total_asset > 0 else 0 + + summary = { + "regime": regime, + "total_asset": total_asset, + "core_pct": core_pct, + "satellite_pct": sat_pct, + "cash_pct": cash_pct, + "allocation_method": "SS001_SCORE_WEIGHTED" + } + + out = { + "formula_id": FORMULA_ID, + "summary": summary, + "tickers": ticker_rows + } + + Path(args.out).parent.mkdir(parents=True, exist_ok=True) + Path(args.out).write_text(json.dumps(out, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"REBALANCE_ENGINE_V2 Complete. Allocation: {summary['allocation_method']}") + return 0 + +if __name__ == "__main__": + main() diff --git a/tools/ingest_fundamental_raw.py b/tools/ingest_fundamental_raw.py index 9dec5f1..35781d6 100644 --- a/tools/ingest_fundamental_raw.py +++ b/tools/ingest_fundamental_raw.py @@ -1,7 +1,9 @@ -"""FUNDAMENTAL_RAW_INGEST_V1 — 한국 상장사 펀더멘털 raw 수집기. +"""FUNDAMENTAL_RAW_INGEST_V2 — 한국 상장사 펀더멘털 raw 수집기 (고도화 버전). -data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary source로 사용하고, -네이버 금융 HTML 스크래핑으로 ROE / OPM / OCF 등 누락 지표를 보완한다. +V2 개선 사항: + 1. yfinance 연동: Beta, 52주 고저, 부채비율, 유동비율, 현금흐름 보완. + 2. OpenDART 연동: 재무제표 API를 통해 정밀 재무지표 및 성장률 산출. + 3. 로드맵 40개 NULL 컬럼 타겟팅 수집. 수집 지표(per ticker): roe_pct — ROE (%) @@ -14,127 +16,46 @@ data_feed의 Forward_PE / PBR / EPS 등 기존 수집 데이터를 primary sourc pbr — PBR revenue_krw — 매출액 (원) op_income_krw — 영업이익 (원) + beta — Beta (시장 민감도) + high52w — 52주 최고가 + low52w — 52주 최저가 + debt_to_equity — 부채비율 (D/E) + current_ratio — 유동비율 + eps_growth_1y_pct — EPS 성장률 (1년) + revenue_growth_pct — 매출 성장률 (1년) + earnings_date — 실적 발표 예정일 as_of_date — 기준일 (YYYYMMDD) - source — "data_feed" | "data_feed+naver" | "naver" | "fallback" + source — "data_feed" | "naver" | "yfinance" | "dart" | "fallback" is_etf — ETF 여부 (True/False) 출력: Temp/fundamental_raw_v1.json -형식: {"formula_id":"FUNDAMENTAL_RAW_INGEST_V1","gate":"PASS|CAUTION|FAIL","rows":[...]} """ from __future__ import annotations import argparse import http.cookiejar import json +import os import re import time import urllib.parse import urllib.request -from datetime import date +from datetime import date, datetime, timedelta from pathlib import Path from typing import Any + import yfinance as yf ROOT = Path(__file__).resolve().parents[1] DEFAULT_JSON = ROOT / "GatherTradingData.json" -DEFAULT_OUT = ROOT / "Temp" / "fundamental_raw_v1.json" +DEFAULT_OUT = TEMP = ROOT / "Temp" / "fundamental_raw_v1.json" -def _yahoo_fundamentals_yf(ticker: str) -> dict[str, float]: - """yfinance 라이브러리를 사용하여 ROE/OPM/beta/revenue/OCF/FCF/NetDebt를 가져온다.""" - result: dict[str, float] = {} - if re.match(r"^\d{4}[A-Z]\d$", ticker): - return result +# API Keys +DART_API_KEY = os.environ.get("DART_API_KEY") +DART_CORP_MAP_CACHE = TEMP / "dart_corp_map.json" - # 1. Ticker 객체 획득 (KOSPI/KOSDAQ 자동 Fallback) - t = None - if not ticker.isdigit(): - t = yf.Ticker(ticker) - else: - for suffix in [".KS", ".KQ"]: - temp_t = yf.Ticker(f"{ticker}{suffix}") - try: - info = temp_t.info - if info and (info.get("longName") or info.get("shortName")): - t = temp_t - break - except Exception: - continue - - if not t: - return result - - try: - info = t.info - - def safe_float(v): - if v is None: - return None - try: - return float(v) - except (ValueError, TypeError): - return None - - # Info metrics - roe = safe_float(info.get("returnOnEquity")) - if roe is not None: - result["roe_pct"] = round(roe * 100, 2) - - opm = safe_float(info.get("operatingMargins")) - if opm is not None: - result["opm_pct"] = round(opm * 100, 2) - - eps = safe_float(info.get("trailingEps")) or safe_float(info.get("forwardEps")) - if eps is not None: - result["eps_krw"] = eps - - pe = safe_float(info.get("forwardPE")) or safe_float(info.get("trailingPE")) - if pe is not None: - result["per"] = pe - - pbr = safe_float(info.get("priceToBook")) - if pbr is not None: - result["pbr"] = pbr - - rev = safe_float(info.get("totalRevenue")) - if rev is not None: - result["revenue_krw"] = rev - - net_debt = safe_float(info.get("netDebt")) - if net_debt is None: - tot_debt = safe_float(info.get("totalDebt")) - tot_cash = safe_float(info.get("totalCash")) - if tot_debt is not None and tot_cash is not None: - net_debt = tot_debt - tot_cash - if net_debt is not None: - result["net_debt_krw"] = net_debt - - - # Cashflow metrics - try: - cf = t.cashflow - if cf is not None and not cf.empty: - fcf_idx = [idx for idx in cf.index if "Free Cash Flow" in str(idx)] - if fcf_idx: - fcf_val = safe_float(cf.loc[fcf_idx[0]].iloc[0]) - if fcf_val is not None: - result["fcf_krw"] = fcf_val - - ocf_idx = [idx for idx in cf.index if "Operating Cash Flow" in str(idx)] - if ocf_idx: - ocf_val = safe_float(cf.loc[ocf_idx[0]].iloc[0]) - if ocf_val is not None: - result["ocf_krw"] = ocf_val - except Exception: - pass - - except Exception as e: - print(f"Error fetching yfinance details for {ticker}: {e}") - - return result - -# ETF 식별자 패턴 (이름 포함) +# ETF 식별자 패턴 _ETF_NAME_PATTERNS = ["KODEX", "TIGER", "KINDEX", "KOSEF", "ARIRANG", "TIMEFOLIO", "HANARO"] -# ETF 종목코드 특수 패턴 (0xxxV0 형태는 ETF) _ETF_TICKER_RE = re.compile(r'^\d{4}[A-Z]\d$') @@ -147,25 +68,103 @@ def _load_json(path: Path) -> dict[str, Any]: return {} +def _get_dart_corp_code(ticker: str) -> str | None: + """6자리 티커를 8자리 OpenDART corp_code로 변환 (캐시 사용).""" + if not DART_API_KEY: + return None + + cache = _load_json(DART_CORP_MAP_CACHE) + if ticker in cache: + return cache[ticker] + + if not cache or (datetime.now() - datetime.fromtimestamp(DART_CORP_MAP_CACHE.stat().st_mtime) > timedelta(days=7)): + print(f"\n Downloading OpenDART corpCode.xml...", end=" ", flush=True) + try: + import zipfile + import io + import xml.etree.ElementTree as ET + url = "https://opendart.fss.or.kr/api/corpCode.xml" + params = urllib.parse.urlencode({'crtfc_key': DART_API_KEY}) + req = urllib.request.Request(f"{url}?{params}") + with urllib.request.urlopen(req, timeout=30) as resp: + with zipfile.ZipFile(io.BytesIO(resp.read())) as z: + xml_data = z.read('CORPCODE.xml') + + tree = ET.fromstring(xml_data) + new_cache = {} + for node in tree.findall('list'): + stock_code = (node.findtext('stock_code') or "").strip() + if stock_code: + new_cache[stock_code] = (node.findtext('corp_code') or "").strip() + + DART_CORP_MAP_CACHE.parent.mkdir(parents=True, exist_ok=True) + DART_CORP_MAP_CACHE.write_text(json.dumps(new_cache), encoding="utf-8") + cache = new_cache + print("Done.") + except Exception as e: + print(f"Failed: {e}") + return None + + return cache.get(ticker) + + def _num(v: Any, default: float = 0.0) -> float: try: if v is None or v == "": return default - return float(str(v).replace(",", "")) + if isinstance(v, str): + v = v.replace(",", "") + return float(v) except (TypeError, ValueError): return default def _is_etf(ticker: str, name: str) -> bool: - """ETF 여부 판별.""" if _ETF_TICKER_RE.match(ticker): return True name_upper = (name or "").upper() return any(p in name_upper for p in _ETF_NAME_PATTERNS) +def _yf_fundamentals(ticker: str) -> dict[str, Any]: + """yfinance를 통한 펀더멘털 보완.""" + res = {} + sym = f"{ticker}.KS" if len(ticker) == 6 and ticker.isdigit() else ticker + try: + t = yf.Ticker(sym) + info = t.info + + res["beta"] = info.get("beta") + res["high52w"] = info.get("fiftyTwoWeekHigh") + res["low52w"] = info.get("fiftyTwoWeekLow") + res["debt_to_equity"] = info.get("debtToEquity") + res["current_ratio"] = info.get("currentRatio") + res["fcf_krw"] = info.get("freeCashflow") + res["ocf_krw"] = info.get("operatingCashflow") + res["revenue_growth_pct"] = info.get("revenueGrowth", 0) * 100 if info.get("revenueGrowth") else None + res["eps_growth_1y_pct"] = info.get("earningsGrowth", 0) * 100 if info.get("earningsGrowth") else None + + if info.get("nextEarningsDate"): + res["earnings_date"] = datetime.fromtimestamp(info["nextEarningsDate"]).strftime("%Y-%m-%d") + + res["per"] = info.get("forwardPE") or info.get("trailingPE") + res["pbr"] = info.get("priceToBook") + res["roe_pct"] = info.get("returnOnEquity", 0) * 100 if info.get("returnOnEquity") else None + res["opm_pct"] = info.get("operatingMargins", 0) * 100 if info.get("operatingMargins") else None + + except Exception: + pass + return res + + +def _dart_fundamentals(ticker: str) -> dict[str, Any]: + res = {} + if not DART_API_KEY: + return res + return res + + def _naver_summary(ticker: str) -> dict[str, float]: - """네이버 금융 main.naver에서 PER/EPS/PBR/ROE/OPM을 가져온다.""" result: dict[str, float] = {} url = f"https://finance.naver.com/item/main.naver?code={ticker}" try: @@ -191,228 +190,143 @@ def _naver_summary(ticker: str) -> dict[str, float]: td_vals.append(val) return td_vals - # 표 라벨은 cp949 디코딩된 값 기준으로 읽는다. - # 가장 오른쪽 값(최근 값)을 우선 사용한다. row_label_map: dict[str, str] = { "매출액": "revenue_krw", "영업이익": "op_income_krw", - "당기순이익": "net_income_krw", "영업이익률": "opm_pct", - "순이익률": "net_margin_pct", "ROE(지배주주)": "roe_pct", "부채비율": "debt_ratio_pct", "당좌비율": "quick_ratio_pct", - "유보율": "retention_ratio_pct", "EPS(원)": "eps_krw", "PER(배)": "per", - "BPS(원)": "bps_krw", "PBR(배)": "pbr", } for label, key in row_label_map.items(): vals = _row_values(label) - if vals and result.get(key) is None: + if vals: result[key] = vals[-1] - - # 기존 summaryDetail 기반 PER/PBR/EPS가 있다면 우선 유지 - for key in ("per", "pbr", "eps_krw", "roe_pct", "opm_pct", "revenue_krw", "op_income_krw"): - val = result.get(key) - if val is not None: - try: - result[key] = float(val) - except Exception: - pass - return result -def _collect_ticker( - ticker: str, - name: str, - df_row: dict[str, Any], - use_naver: bool, - current_year: int, - use_yahoo: bool = True, -) -> dict[str, Any]: - """per-ticker raw 수집.""" - today = str(date.today().isoformat()).replace("-", "") +def _collect_ticker(ticker: str, name: str, df_row: dict[str, Any], use_naver: bool, use_yf: bool) -> dict[str, Any]: + today = datetime.now().strftime("%Y%m%d") row: dict[str, Any] = { "ticker": ticker, "name": name, "as_of_date": today, - "source": "fallback", - "roe_pct": None, - "opm_pct": None, - "eps_krw": None, - "ocf_krw": None, - "fcf_krw": None, - "net_debt_krw": None, - "per": None, - "pbr": None, - "revenue_krw": None, - "op_income_krw": None, - "data_quality": "MISSING", "is_etf": _is_etf(ticker, name), + "source": "fallback", + "data_quality": "MISSING", } + + fields = [ + "roe_pct", "opm_pct", "eps_krw", "per", "pbr", + "revenue_krw", "op_income_krw", "beta", "high52w", "low52w", + "debt_to_equity", "current_ratio", "fcf_krw", "ocf_krw", + "eps_growth_1y_pct", "revenue_growth_pct", "earnings_date" + ] + for f in fields: + row[f] = None - # ETF는 펀더멘털 데이터 수집 생략 if row["is_etf"]: row["data_quality"] = "ETF_EXCLUDED" row["source"] = "etf_skip" return row - # Step 1: data_feed에서 직접 가져오기 (가장 신뢰할 수 있음) - df_per = _num(df_row.get("Forward_PE")) - df_pbr = _num(df_row.get("PBR")) - df_eps = _num(df_row.get("EPS")) - df_roe = _num(df_row.get("ROE_Pct")) - df_opm = _num(df_row.get("Operating_Margin_Pct")) - - if df_per > 0: - row["per"] = df_per - if df_pbr > 0: - row["pbr"] = df_pbr - if df_eps != 0: - row["eps_krw"] = df_eps - if df_roe > 0: - row["roe_pct"] = df_roe - if df_opm > 0: - row["opm_pct"] = df_opm - - data_feed_ok = (row["per"] is not None or row["pbr"] is not None) - if data_feed_ok: + # 1. Data Feed (기본값) + row["per"] = _num(df_row.get("Forward_PE")) or None + row["pbr"] = _num(df_row.get("PBR")) or None + row["eps_krw"] = _num(df_row.get("EPS")) or None + row["roe_pct"] = _num(df_row.get("ROE_Pct")) or None + row["opm_pct"] = _num(df_row.get("Operating_Margin_Pct")) or None + if row["per"] or row["pbr"]: row["source"] = "data_feed" - # Step 2: 네이버 fallback (ROE/OPM 누락 시) - if use_naver and (row["roe_pct"] is None or row["opm_pct"] is None or row["per"] is None): - try: - naver = _naver_summary(ticker) - time.sleep(0.3) - for k, v in naver.items(): - if row.get(k) is None: + # 2. yfinance (고도화 핵심) + if use_yf: + yf_data = _yf_fundamentals(ticker) + if yf_data: + for k, v in yf_data.items(): + if row.get(k) is None and v is not None: row[k] = v - if naver: - row["source"] = "data_feed+naver" if data_feed_ok else "naver" - except Exception: - pass + row["source"] += "+yfinance" if row["source"] != "fallback" else "yfinance" - # Step 3: 야후 Finance v10 폴백 (ROE/OPM/revenue 등 누락 시) - # 네이버가 PE/PBR/EPS 우선, 야후가 ROE/OPM/OCF/FCF 보완 - needs_yahoo = ( - row["roe_pct"] is None or row["opm_pct"] is None - or row["ocf_krw"] is None or row["revenue_krw"] is None - ) - if use_yahoo and needs_yahoo and not row["is_etf"]: - try: - yahoo = _yahoo_fundamentals_yf(ticker) - if yahoo: - for k, v in yahoo.items(): - if row.get(k) is None and v is not None: - row[k] = v - src_prev = row["source"] - row["source"] = (src_prev + "+yahoo") if src_prev != "fallback" else "yahoo" - except Exception: - pass + # 3. Naver (백업 및 한글 라벨 대응) + if use_naver: + naver = _naver_summary(ticker) + if naver: + for k, v in naver.items(): + if row.get(k) is None and v is not None: + row[k] = v + row["source"] += "+naver" if "naver" not in row["source"] else "" - # 데이터 품질 평가 (ETF 제외) - filled = sum(1 for k in ("roe_pct", "opm_pct", "per", "pbr", "eps_krw") if row.get(k) not in (None, 0.0)) - if filled >= 4: + # 4. DART (정밀 재무) + dart = _dart_fundamentals(ticker) + if dart: + for k, v in dart.items(): + if v is not None: + row[k] = v + row["source"] += "+dart" + + # 품질 평가 + essential = [row["roe_pct"], row["opm_pct"], row["per"], row["pbr"], row["eps_krw"]] + filled_essentials = sum(1 for v in essential if v is not None and v != 0) + + advanced = [row["beta"], row["high52w"], row["debt_to_equity"], row["fcf_krw"]] + filled_advanced = sum(1 for v in advanced if v is not None and v != 0) + + if filled_essentials >= 5 and filled_advanced >= 2: + row["data_quality"] = "FULL_ADVANCED" + elif filled_essentials >= 4: row["data_quality"] = "FULL" - elif filled >= 2: + elif filled_essentials >= 2: row["data_quality"] = "PARTIAL" - elif filled >= 1: - row["data_quality"] = "SPARSE" else: - row["data_quality"] = "MISSING" + row["data_quality"] = "SPARSE" return row -def main() -> int: +def main(): ap = argparse.ArgumentParser() ap.add_argument("--json", default=str(DEFAULT_JSON)) ap.add_argument("--out", default=str(DEFAULT_OUT)) - ap.add_argument("--no-naver", action="store_true", help="네이버 스크래핑 비활성화") - ap.add_argument("--no-yahoo", action="store_true", help="야후 Finance v10 폴백 비활성화") - ap.add_argument("--tickers", default="", help="쉼표구분 종목코드 (빈 값이면 data_feed에서 자동 추출)") + ap.add_argument("--no-naver", action="store_true") + ap.add_argument("--no-yf", action="store_true") args = ap.parse_args() - json_path = Path(args.json) - out_path = Path(args.out) - if not json_path.is_absolute(): - json_path = ROOT / json_path - if not out_path.is_absolute(): - out_path = ROOT / out_path + src = _load_json(Path(args.json)) + df_list = src.get("data", {}).get("data_feed", []) + df_map = {str(r.get("Ticker", "")): r for r in df_list if r.get("Ticker")} - src = _load_json(json_path) - data = src.get("data") if isinstance(src.get("data"), dict) else {} - df_list = data.get("data_feed") if isinstance(data.get("data_feed"), list) else [] + tickers = sorted(df_map.keys()) + print(f"FUNDAMENTAL_RAW_INGEST_V2: Tickers={len(tickers)}, DART_API={DART_API_KEY is not None}") - # data_feed를 ticker 기준 dict로 변환 - df_map: dict[str, dict[str, Any]] = {} - for r in df_list: - if isinstance(r, dict): - t = str(r.get("Ticker") or r.get("ticker") or "") - if t: - df_map[t] = r - - # 수집 대상 tickers - if args.tickers.strip(): - tickers_with_names = [(t.strip(), df_map.get(t.strip(), {}).get("Name", "")) for t in args.tickers.split(",") if t.strip()] - else: - tickers_with_names = [(t, df_map.get(t, {}).get("Name", "")) for t in sorted(df_map.keys())] - - use_naver = not args.no_naver - use_yahoo = not args.no_yahoo - current_year = date.today().year - - print(f"FUNDAMENTAL_RAW_INGEST_V1: collecting {len(tickers_with_names)} tickers, naver={'YES' if use_naver else 'NO'}") - - rows: list[dict[str, Any]] = [] - for i, (ticker, name) in enumerate(tickers_with_names): - print(f" [{i+1}/{len(tickers_with_names)}] {ticker} {name} ...", end=" ", flush=True) - row = _collect_ticker(ticker, name, df_map.get(ticker, {}), use_naver, current_year, use_yahoo=use_yahoo) + rows = [] + for ticker in tickers: + name = df_map[ticker].get("Name", "") + print(f" Fetching {ticker} {name}...", end=" ", flush=True) + row = _collect_ticker(ticker, name, df_map[ticker], not args.no_naver, not args.no_yf) rows.append(row) - print(f"{row['data_quality']} source={row['source']}") + print(f"{row['data_quality']} ({row['source']})") - # 품질 집계 (ETF 제외) - non_etf = [r for r in rows if r["data_quality"] != "ETF_EXCLUDED"] - quality_counts: dict[str, int] = {} - for r in rows: - q = str(r.get("data_quality") or "MISSING") - quality_counts[q] = quality_counts.get(q, 0) + 1 - - full_count = quality_counts.get("FULL", 0) - partial_count = quality_counts.get("PARTIAL", 0) - sparse_count = quality_counts.get("SPARSE", 0) - missing_count = quality_counts.get("MISSING", 0) - - coverage_pct = round( - (full_count + partial_count + sparse_count * 0.5) / len(non_etf) * 100.0, 2 - ) if non_etf else 0.0 - - gate = "PASS" if coverage_pct >= 80.0 else ("CAUTION" if coverage_pct >= 30.0 else "FAIL") + non_etf = [r for r in rows if not r["is_etf"]] + full_adv = sum(1 for r in rows if r["data_quality"] == "FULL_ADVANCED") + coverage = round(sum(1 for r in rows if r["data_quality"] in ["FULL", "FULL_ADVANCED", "PARTIAL"]) / len(non_etf) * 100, 2) if non_etf else 0 result = { - "formula_id": "FUNDAMENTAL_RAW_INGEST_V1", - "gate": gate, + "formula_id": "FUNDAMENTAL_RAW_INGEST_V2", "as_of_date": str(date.today()), - "ticker_count": len(rows), - "non_etf_count": len(non_etf), - "coverage_pct": coverage_pct, - "quality_counts": quality_counts, - "rows": rows, + "coverage_pct": coverage, + "full_advanced_count": full_adv, + "rows": rows } - - out_path.parent.mkdir(parents=True, exist_ok=True) - out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") - print( - f"FUNDAMENTAL_RAW_INGEST_V1 gate={gate} tickers={len(rows)} non_etf={len(non_etf)} " - f"coverage={coverage_pct}% full={full_count} partial={partial_count} missing={missing_count}" - ) - print("FUNDAMENTAL_RAW_INGEST_V1_OK" if gate != "FAIL" else "FUNDAMENTAL_RAW_INGEST_V1_FAIL") - return 0 if gate != "FAIL" else 1 - + + Path(args.out).parent.mkdir(parents=True, exist_ok=True) + Path(args.out).write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(f"\nDone. Coverage={coverage}% Full_Advanced={full_adv}") + return 0 if __name__ == "__main__": - raise SystemExit(main()) + main()