"""KIS-first data collector for the CI scheduler. The collector uses the existing `GatherTradingData.json` snapshot as the seed universe, then enriches Korean tickers with read-only KIS quotations and orderbook data, while retaining Naver/Yahoo fallbacks when available. The canonical persistence target is SQLite. """ from __future__ import annotations import argparse import datetime as dt import json import os import sys import uuid from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[2] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) try: from tools.fetch_naver_market_data_v1 import ( # type: ignore _session as naver_session, compute_relative_return_20d, compute_volume_ratio_5d, fetch_foreign_institution_flow, fetch_price_history, ) except Exception: # pragma: no cover - optional adapter naver_session = None compute_relative_return_20d = None compute_volume_ratio_5d = None fetch_foreign_institution_flow = None fetch_price_history = None try: from src.quant_engine.kis_api_client_v1 import ( # type: ignore KisCredentials, get_asking_price_10_level, get_current_price, get_daily_short_sale, ) except Exception: # pragma: no cover - safe fallback in non-KIS environments KisCredentials = None get_asking_price_10_level = None get_current_price = None get_daily_short_sale = None from src.quant_engine.data_collection_store_v1 import ( CollectionRun, append_collection_error, upsert_collection_run, upsert_collection_snapshot, ) from src.quant_engine.data_collection_backend_v1 import ( CollectionStoreSpec, normalize_store_spec, ) def _kst_now_iso() -> str: return dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat() def _load_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} try: return json.loads(path.read_text(encoding="utf-8")) except Exception: return {} def _coerce_float(value: Any) -> float | None: if value is None or value == "": return None try: if isinstance(value, str): value = value.replace(",", "").replace("%", "") return float(value) except (TypeError, ValueError): return None def _find_first_value(payload: Any, keys: tuple[str, ...]) -> Any: stack = [payload] while stack: item = stack.pop() if isinstance(item, dict): for key in keys: value = item.get(key) if value not in (None, ""): return value stack.extend(item.values()) elif isinstance(item, list): stack.extend(item) return None def _normalize_naver_price_history(code: str) -> dict[str, Any]: if naver_session is None or fetch_price_history is None: return {"status": "DISABLED"} try: session = naver_session() price = fetch_price_history(session, code) result: dict[str, Any] = {"status": price.get("status", "UNKNOWN"), "source_url": price.get("source_url")} rows = price.get("rows") or [] if rows: result["close"] = rows[0].get("close") result["open"] = rows[0].get("open") result["high"] = rows[0].get("high") result["low"] = rows[0].get("low") result["volume"] = rows[0].get("volume") if compute_relative_return_20d is not None: benchmark = fetch_price_history(session, "069500") result["relative_return_20d"] = compute_relative_return_20d(rows, benchmark.get("rows", [])) if compute_volume_ratio_5d is not None: result["volume_ratio_5d"] = compute_volume_ratio_5d(rows) if fetch_foreign_institution_flow is not None: result["foreign_institution_flow"] = fetch_foreign_institution_flow(session, code) return result except Exception as exc: # noqa: BLE001 - fallback source must not break the batch return {"status": "ERROR", "error": str(exc)} def _normalize_kis_fields(code: str, account: str) -> dict[str, Any]: if KisCredentials is None or get_current_price is None or get_asking_price_10_level is None or get_daily_short_sale is None: return {"status": "DISABLED"} try: creds = KisCredentials.load(account) except Exception as exc: return {"status": "ERROR", "error": str(exc)} result: dict[str, Any] = {"status": "OK", "account": account} try: price = get_current_price(creds, code) result["current_price_raw"] = price result["current_price"] = _coerce_float(_find_first_value(price, ("stck_prpr", "stck_clpr", "close", "close_price"))) result["open"] = _coerce_float(_find_first_value(price, ("stck_oprc", "open", "open_price"))) result["high"] = _coerce_float(_find_first_value(price, ("stck_hgpr", "high", "high_price"))) result["low"] = _coerce_float(_find_first_value(price, ("stck_lwpr", "low", "low_price"))) result["prev_close"] = _coerce_float(_find_first_value(price, ("prdy_vrss", "prev_close"))) result["volume"] = _coerce_float(_find_first_value(price, ("acml_vol", "volume"))) result["change_pct"] = _coerce_float(_find_first_value(price, ("prdy_ctrt", "change_pct"))) except Exception as exc: result["price_status"] = "ERROR" result["price_error"] = str(exc) try: orderbook = get_asking_price_10_level(creds, code) output1 = orderbook.get("output1") or {} result["orderbook_raw"] = orderbook result["microstructure_pressure"] = _coerce_float( _find_first_value(output1, ("total_askp_rsqn", "total_bidp_rsqn")) ) result["ask_1"] = _coerce_float(_find_first_value(output1, ("askp1",))) result["bid_1"] = _coerce_float(_find_first_value(output1, ("bidp1",))) result["orderbook_status"] = "OK" except Exception as exc: result["orderbook_status"] = "ERROR" result["orderbook_error"] = str(exc) try: start = (dt.date.today() - dt.timedelta(days=10)).strftime("%Y%m%d") end = dt.date.today().strftime("%Y%m%d") short_sale = get_daily_short_sale(creds, code, start, end) result["short_sale_raw"] = short_sale rows = short_sale.get("output2") or [] if rows: latest = rows[0] result["short_turnover_share"] = _coerce_float(latest.get("ssts_vol_rlim")) result["short_sale_status"] = "OK" except Exception as exc: result["short_sale_status"] = "ERROR" result["short_sale_error"] = str(exc) return result def _build_seed_rows(source_json: Path) -> list[dict[str, Any]]: payload = _load_json(source_json) data = payload.get("data") or {} core_satellite = {str(row.get("Ticker") or row.get("ticker") or ""): row for row in data.get("core_satellite", [])} sector_lookup = {str(row.get("Ticker") or row.get("ticker") or ""): row.get("Sector") for row in data.get("core_satellite", [])} rows: list[dict[str, Any]] = [] for row in data.get("data_feed", []): ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() if not ticker: continue merged = dict(row) core_row = core_satellite.get(ticker) or {} if core_row: for key, value in core_row.items(): merged.setdefault(key, value) merged["Sector"] = merged.get("Sector") or sector_lookup.get(ticker) rows.append(merged) return rows def _merge_source_fields(target: dict[str, Any], source: dict[str, Any], keys: tuple[str, ...]) -> None: for key in keys: if key in source and source.get(key) not in (None, ""): target[key] = source[key] def _resolve_price_source( ticker: str, *, kis_account: str, include_naver: bool, include_live_kis: bool, ) -> tuple[dict[str, Any] | None, dict[str, Any] | None, list[str]]: source_priority: list[str] = ["gathertradingdata_json"] kis: dict[str, Any] | None = None naver: dict[str, Any] | None = None if include_live_kis and ticker.isdigit() and len(ticker) == 6: kis = _normalize_kis_fields(ticker, kis_account) if kis.get("status") == "OK": source_priority.insert(0, "kis_open_api") if include_naver and ticker.isdigit() and len(ticker) == 6: naver = _normalize_naver_price_history(ticker) if naver.get("status") in {"OK", "DATA_MISSING"}: source_priority.append("naver_finance") return kis, naver, source_priority def _apply_source_fallbacks( normalized: dict[str, Any], *, row: dict[str, Any], kis: dict[str, Any] | None, naver: dict[str, Any] | None, ) -> None: if kis and kis.get("status") == "OK": _merge_source_fields(normalized, kis, ("current_price", "open", "high", "low", "volume")) _merge_source_fields(normalized, kis, ("relative_return_20d", "volume_ratio_5d", "microstructure_pressure", "short_turnover_share")) if naver and naver.get("status") in {"OK", "DATA_MISSING"}: normalized.setdefault("relative_return_20d", naver.get("relative_return_20d")) normalized.setdefault("volume_ratio_5d", naver.get("volume_ratio_5d")) normalized.setdefault("naver_price_status", naver.get("status")) normalized.setdefault("current_price", naver.get("close")) normalized.setdefault("open", naver.get("open")) normalized.setdefault("high", naver.get("high")) normalized.setdefault("low", naver.get("low")) normalized.setdefault("volume", naver.get("volume")) normalized.setdefault("current_price", _coerce_float(row.get("current_price") or row.get("Current_Price") or row.get("close"))) normalized.setdefault("open", _coerce_float(row.get("open") or row.get("Open"))) normalized.setdefault("high", _coerce_float(row.get("high") or row.get("High"))) normalized.setdefault("low", _coerce_float(row.get("low") or row.get("Low"))) normalized.setdefault("volume", _coerce_float(row.get("volume") or row.get("Volume"))) def _persist_collection_row( *, sqlite_db: Path, run_id: str, ticker: str, normalized: dict[str, Any], provenance: dict[str, Any], ) -> None: upsert_collection_snapshot( sqlite_db, run_id=run_id, dataset_name="data_feed", ticker=ticker, name=str(normalized.get("Name") or normalized.get("name") or ""), sector=normalized.get("Sector"), as_of_date=str(normalized.get("Price_Date") or normalized.get("AsOfDate") or normalized.get("collection_as_of") or ""), source_priority=">".join(provenance.get("source_priority") or []), source_status="OK", payload=normalized, provenance=provenance, ) def _append_collection_failure( *, sqlite_db: Path, run_id: str, ticker: str, row: dict[str, Any], exc: Exception, ) -> dict[str, Any]: error = {"ticker": ticker, "error": str(exc)} append_collection_error( sqlite_db, run_id=run_id, source_name="collector", error_kind=type(exc).__name__, error_message=str(exc), ticker=ticker, payload=row, ) return error def _finalize_collection_summary( *, summary: dict[str, Any], output_json: Path, sqlite_db: Path, ) -> dict[str, Any]: summary["finished_at"] = _kst_now_iso() summary["status"] = "PASS" if not summary["errors"] else "PASS_WITH_WARNINGS" output_json.parent.mkdir(parents=True, exist_ok=True) output_json.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") upsert_collection_run( sqlite_db, CollectionRun( run_id=summary["run_id"], collector_name="kis_data_collection_v1", started_at=summary["started_at"], status=summary["status"], input_source=str(summary["input_json"]), output_json_path=str(output_json), output_db_path=str(sqlite_db), notes="KIS-first CI collection", ), finished_at=summary["finished_at"], ) return summary def _collect_one(row: dict[str, Any], *, kis_account: str, include_naver: bool, include_live_kis: bool) -> tuple[dict[str, Any], dict[str, Any]]: ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() name = str(row.get("Name") or row.get("name") or "").strip() sector = str(row.get("Sector") or row.get("sector") or "").strip() or None normalized = dict(row) provenance: dict[str, Any] = { "ticker": ticker, "name": name, "sector": sector, "source_priority": ["gathertradingdata_json"], } kis, naver, source_priority = _resolve_price_source( ticker, kis_account=kis_account, include_naver=include_naver, include_live_kis=include_live_kis, ) provenance["source_priority"] = source_priority if kis is not None: provenance["kis"] = kis normalized.update({k: v for k, v in kis.items() if k not in {"current_price_raw", "orderbook_raw", "short_sale_raw"}}) if naver is not None: provenance["naver"] = naver _apply_source_fallbacks(normalized, row=row, kis=kis, naver=naver) normalized.setdefault("collection_as_of", _kst_now_iso()) return normalized, provenance def collect_to_sqlite( *, input_json: Path, sqlite_db: Path, output_json: Path, kis_account: str, include_naver: bool = False, include_live_kis: bool = True, ) -> dict[str, Any]: run_id = uuid.uuid4().hex started_at = _kst_now_iso() upsert_collection_run( sqlite_db, CollectionRun( run_id=run_id, collector_name="kis_data_collection_v1", started_at=started_at, status="RUNNING", input_source=str(input_json), output_json_path=str(output_json), output_db_path=str(sqlite_db), notes="KIS-first CI collection", ), ) seed_rows = _build_seed_rows(input_json) summary = { "formula_id": "KIS_DATA_COLLECTION_V1", "run_id": run_id, "started_at": started_at, "input_json": str(input_json), "sqlite_db": str(sqlite_db), "row_count": len(seed_rows), "source_counts": {}, "errors": [], "rows": [], } for row in seed_rows: ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() if not ticker: continue try: normalized, provenance = _collect_one(row, kis_account=kis_account, include_naver=include_naver, include_live_kis=include_live_kis) source_counts = summary["source_counts"] for source_name in provenance.get("source_priority") or []: source_counts[source_name] = source_counts.get(source_name, 0) + 1 _persist_collection_row( sqlite_db=sqlite_db, run_id=run_id, ticker=ticker, normalized=normalized, provenance=provenance, ) summary["rows"].append( { "ticker": ticker, "name": normalized.get("Name") or normalized.get("name"), "sector": normalized.get("Sector"), "source_priority": provenance.get("source_priority"), "current_price": normalized.get("current_price"), "relative_return_20d": normalized.get("relative_return_20d"), "volume_ratio_5d": normalized.get("volume_ratio_5d"), } ) except Exception as exc: # noqa: BLE001 error = _append_collection_failure( sqlite_db=sqlite_db, run_id=run_id, ticker=ticker, row=row, exc=exc, ) summary["errors"].append(error) return _finalize_collection_summary(summary=summary, output_json=output_json, sqlite_db=sqlite_db) def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--input-json", type=Path, default=ROOT / "GatherTradingData.json") ap.add_argument("--sqlite-db", type=Path, default=ROOT / "src" / "quant_engine" / "kis_data_collection.db") ap.add_argument("--store-backend", default="sqlite", help="Storage backend contract placeholder (sqlite today, postgresql planned)") ap.add_argument("--store-location", default=None, help="Backend location/DSN. sqlite path or future postgres DSN.") ap.add_argument("--output-json", type=Path, default=ROOT / "Temp" / "kis_data_collection_v1.json") ap.add_argument("--kis-account", choices=["real", "mock"], default="real") ap.add_argument("--allow-naver-fallback", action="store_true") ap.add_argument("--no-live-kis", action="store_true") args = ap.parse_args() store_backend, store_location = normalize_store_spec( CollectionStoreSpec( backend=args.store_backend, location=args.store_location or args.sqlite_db, ), ROOT, ) if store_backend != "sqlite": raise SystemExit( "현재 실행 backend는 sqlite만 지원합니다. " "하지만 collector는 이미 backend contract로 분리되어 있어 " "후속 PostgreSQL 구현을 같은 호출 지점에 붙일 수 있습니다." ) summary = collect_to_sqlite( input_json=args.input_json, sqlite_db=Path(store_location), output_json=args.output_json, kis_account=args.kis_account, include_naver=args.allow_naver_fallback, include_live_kis=not args.no_live_kis, ) print(json.dumps(summary, ensure_ascii=False, indent=2)) return 0 if summary.get("status") == "PASS" else 1 if __name__ == "__main__": raise SystemExit(main())