Files
QuantEngineByItz/src/quant_engine/macro_index_collection_v1.py
T
kjh2064 6d4ee39e04 WBS-7.3 F12/F13: distribution_risk 두 공식 역할 분리 확정(KEEP_BOTH)
GAS calcDistributionRiskRow_의 "THIN_ADAPTER: delegated to Python" 주석이
틀린 주석이었음을 발견 — GAS(DISTRIBUTION_RISK_SCORE_V1, 점수식 BUY 차단
게이트)와 Python calc_distribution_detector_per_ticker(DISTRIBUTION_SELL_DETECTOR_V1,
6신호 카운트, PRE_DISTRIBUTION_EARLY_WARNING 정밀도 보완)는 이미 spec에
서로 다른 고유 formula_id로 등록된 독립 공식이었다. "GAS가 Python의 중복"
이라는 ledger 전제가 거짓이었을 뿐, 코드는 원래부터 올바르게 분리돼 있었다.

사용자 결정(둘 다 유지, 역할 분리)에 따라:
- GAS 소스의 잘못된 주석 정정(gdf_03_portfolio_gates.gs) + 번들 재생성
- 양쪽 formula_registry에 상호 related_formula 참조 추가(향후 혼동 방지)
- governance/gas_logic_migration_ledger_v1.yaml: migration_action을
  DELETE_DISTRIBUTION_RISK_GAS → KEEP_BOTH_SEPARATE_ROLES로 변경, DONE
2026-06-22 02:29:50 +09:00

194 lines
7.4 KiB
Python

"""yfinance 기반 macro 인덱스 수집기 — GAS fetchYahooOhlcMetrics 계열의 Python/SQLite 대체.
사용자 요청(2026-06-22): "GAS 대신 Python이 수집해서 SQLite로 조회돼야 하는거 아니냐"
의 두 번째 트랙. data_feed(kis_data_collection_v1.py)에 이어, GatherTradingData.json
data.macro 시트의 원자료 13개 심볼(KOSPI/KOSDAQ/VIX/USD_KRW/USD_JPY/DXY/Gold/WTI_Oil/
US10Y_Yield/US30Y_Yield/SP500/NASDAQ100/HYG_HY_Bond)을 수집한다.
macro 시트의 나머지 9개 행(MRS_COMPUTED/REGIME_PRELIM/BAYESIAN_COMPUTED/TOTAL_HEAT/
FC_BUDGET/NET_RETURN_FEEDBACK/ORBIT_GAP/ORBIT_STATE/BUCKET_STATUS, category="Computed")은
포트폴리오 결정 로직의 산출값이며 외부 수집 대상이 아니다 — 이 모듈의 범위 밖이다
(data_feed의 SS001/AC/RW 계열과 같은 GAS 결정로직 이전 트랙, WBS-7.3 참조).
"""
from __future__ import annotations
import datetime as dt
import sys
import uuid
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
try:
import yfinance as yf # type: ignore
except Exception: # pragma: no cover - optional dependency
yf = None
from src.quant_engine.data_collection_store_v1 import (
CollectionRun,
append_collection_error,
upsert_collection_run,
upsert_collection_snapshot,
)
# GatherTradingData.json data.macro의 raw 수집 대상 13개 심볼(Symbol -> Name/Category).
# "Computed" category 9개 행(MRS_COMPUTED 등)은 의도적으로 제외한다.
MACRO_SYMBOLS: tuple[tuple[str, str, str], ...] = (
("^KS11", "KOSPI", "Index"),
("^KQ11", "KOSDAQ", "Index"),
("^VIX", "VIX", "Risk"),
("KRW=X", "USD_KRW", "FX"),
("JPY=X", "USD_JPY", "FX"),
("DX-Y.NYB", "DXY", "FX"),
("GC=F", "Gold", "Commodity"),
("CL=F", "WTI_Oil", "Commodity"),
("^TNX", "US10Y_Yield", "Bond"),
("^TYX", "US30Y_Yield", "Bond"),
("^GSPC", "SP500", "Index"),
("^NDX", "NASDAQ100", "Index"),
("HYG", "HYG_HY_Bond", "CreditProxy"),
)
def _kst_now_iso() -> str:
return dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat()
def _avg(values: list[float]) -> float | None:
return round(sum(values) / len(values), 4) if values else None
def _ret_pct(closes: list[float], n: int) -> float | None:
"""closes[0]이 최신. n거래일전 종가 대비 수익률(%)."""
if len(closes) <= n or not closes[n]:
return None
return round((closes[0] / closes[n] - 1.0) * 100.0, 4)
def fetch_macro_symbol(symbol: str, name: str, category: str) -> dict[str, Any]:
"""yfinance에서 OHLC 히스토리를 받아 macro 시트 컬럼(Close/Ret1D~20D/MA20/MA60)을 산출."""
if yf is None:
return {"status": "DISABLED", "symbol": symbol, "name": name, "category": category}
try:
ticker = yf.Ticker(symbol)
hist = ticker.history(period="4mo") # ~85 거래일 — MA60/Ret20D 계산에 충분
if hist is None or hist.empty:
return {"status": "DATA_MISSING", "symbol": symbol, "name": name, "category": category}
closes = list(hist["Close"].iloc[::-1]) # 최신순으로 정렬(rows[0]=최신)
as_of = hist.index[-1]
result: dict[str, Any] = {
"status": "OK",
"symbol": symbol,
"name": name,
"category": category,
"close": round(float(closes[0]), 4),
"ret1d": _ret_pct(closes, 1),
"ret2d": _ret_pct(closes, 2),
"ret5d": _ret_pct(closes, 5),
"ret10d": _ret_pct(closes, 10),
"ret20d": _ret_pct(closes, 20),
"ma20": _avg(closes[:20]) if len(closes) >= 20 else None,
"ma60": _avg(closes[:60]) if len(closes) >= 60 else None,
"as_of_date": as_of.strftime("%Y-%m-%dT%H:%M:%S"),
}
return result
except Exception as exc: # noqa: BLE001 - per-symbol failure must not break the batch
return {"status": "ERROR", "symbol": symbol, "name": name, "category": category, "error": str(exc)}
def collect_macro_to_sqlite(*, sqlite_db: Path, symbols: tuple[tuple[str, str, str], ...] = MACRO_SYMBOLS) -> dict[str, Any]:
run_id = uuid.uuid4().hex
started_at = _kst_now_iso()
upsert_collection_run(
sqlite_db,
CollectionRun(
run_id=run_id,
collector_name="macro_index_collection_v1",
started_at=started_at,
status="RUNNING",
input_source="yfinance",
output_db_path=str(sqlite_db),
notes="macro 시트 raw 수집(GAS fetchYahooOhlcMetrics 대체)",
),
)
summary: dict[str, Any] = {
"formula_id": "MACRO_INDEX_COLLECTION_V1",
"run_id": run_id,
"started_at": started_at,
"sqlite_db": str(sqlite_db),
"row_count": len(symbols),
"errors": [],
"rows": [],
}
for symbol, name, category in symbols:
result = fetch_macro_symbol(symbol, name, category)
if result.get("status") in ("OK", "DATA_MISSING"):
upsert_collection_snapshot(
sqlite_db,
run_id=run_id,
dataset_name="macro",
ticker=symbol,
name=name,
sector=category,
as_of_date=result.get("as_of_date"),
source_priority="yfinance",
source_status=result.get("status", "UNKNOWN"),
payload=result,
provenance={"source": "yfinance", "symbol": symbol},
)
summary["rows"].append({"symbol": symbol, "name": name, "close": result.get("close"), "status": result.get("status")})
else:
error = {"symbol": symbol, "error": result.get("error", "unknown")}
summary["errors"].append(error)
append_collection_error(
sqlite_db,
run_id=run_id,
source_name="yfinance",
error_kind=result.get("status", "ERROR"),
error_message=str(result.get("error", "")),
ticker=symbol,
payload=result,
)
summary["finished_at"] = _kst_now_iso()
summary["status"] = "PASS" if not summary["errors"] else "PASS_WITH_WARNINGS"
upsert_collection_run(
sqlite_db,
CollectionRun(
run_id=run_id,
collector_name="macro_index_collection_v1",
started_at=started_at,
status=summary["status"],
input_source="yfinance",
output_db_path=str(sqlite_db),
notes="macro 시트 raw 수집(GAS fetchYahooOhlcMetrics 대체)",
),
finished_at=summary["finished_at"],
)
return summary
def main() -> int:
import argparse
import json
parser = argparse.ArgumentParser()
parser.add_argument("--sqlite-db", type=Path, default=ROOT / "outputs" / "macro_index_collection" / "macro_index_collection.db")
parser.add_argument("--output-json", type=Path, default=ROOT / "Temp" / "macro_index_collection_v1.json")
args = parser.parse_args()
summary = collect_macro_to_sqlite(sqlite_db=args.sqlite_db)
args.output_json.parent.mkdir(parents=True, exist_ok=True)
args.output_json.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(summary, ensure_ascii=False, indent=2))
return 0 if summary["status"] in ("PASS", "PASS_WITH_WARNINGS") else 1
if __name__ == "__main__":
raise SystemExit(main())