#!/usr/bin/env python3 from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) try: from src.quant_engine.kis_api_client_v1 import ( KisCredentials, MOCK_DOMAIN, REAL_DOMAIN, _read_env_var, get_current_price, ) except Exception as exc: # pragma: no cover - import failure is a hard validation error KisCredentials = None # type: ignore[assignment] MOCK_DOMAIN = "" REAL_DOMAIN = "" _read_env_var = None # type: ignore[assignment] get_current_price = None # type: ignore[assignment] _IMPORT_ERROR = str(exc) else: _IMPORT_ERROR = "" def _payload(gate: str, **extra: Any) -> dict[str, Any]: return { "formula_id": "KIS_API_CREDENTIALS_VALIDATION_V1", "gate": gate, **extra, } def _expected_env_names(account: str) -> tuple[str, str]: if account == "real": return ("KIS_APP_Key", "KIS_APP_Secret") if account == "mock": return ("KIS_APP_Key_TEST", "KIS_APP_Secret_TEST") raise ValueError("account must be 'mock' or 'real'") def main() -> int: ap = argparse.ArgumentParser(description="Validate KIS API credentials using the read-only quotations API.") ap.add_argument("--account", choices=["mock", "real"], default="mock") ap.add_argument("--ticker", default="005930") ap.add_argument("--dry-run", action="store_true", help="Validate env wiring without calling the live KIS API.") ap.add_argument("--output", type=Path, default=ROOT / "Temp" / "kis_api_credentials_validation_v1.json") args = ap.parse_args() if KisCredentials is None or get_current_price is None: result = _payload("FAIL", error=f"import_error: {_IMPORT_ERROR}") args.output.parent.mkdir(parents=True, exist_ok=True) args.output.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(result, ensure_ascii=False, indent=2)) return 1 errors: list[str] = [] evidence: dict[str, Any] = { "account": args.account, "ticker": args.ticker, } try: key_name, secret_name = _expected_env_names(args.account) creds = KisCredentials.load(args.account) evidence["domain"] = creds.domain evidence["expected_env"] = {"app_key": key_name, "app_secret": secret_name} expected_key = _read_env_var(key_name) if _read_env_var is not None else None expected_secret = _read_env_var(secret_name) if _read_env_var is not None else None other_key = _read_env_var("KIS_APP_Key_TEST" if args.account == "real" else "KIS_APP_Key") if _read_env_var is not None else None other_secret = _read_env_var("KIS_APP_Secret_TEST" if args.account == "real" else "KIS_APP_Secret") if _read_env_var is not None else None actual_key = getattr(creds, "app_key", None) actual_secret = getattr(creds, "app_secret", None) evidence["env_match"] = { "app_key": bool(expected_key and actual_key == expected_key), "app_secret": bool(expected_secret and actual_secret == expected_secret), "other_key_present": bool(other_key), "other_secret_present": bool(other_secret), } if creds.domain != (REAL_DOMAIN if args.account == "real" else MOCK_DOMAIN): errors.append("domain_mismatch") if not evidence["env_match"]["app_key"] or not evidence["env_match"]["app_secret"]: errors.append("selected_env_mismatch") evidence["dry_run"] = bool(args.dry_run) if not args.dry_run: response = get_current_price(creds, args.ticker) evidence["response_keys"] = sorted(response.keys()) if not isinstance(response, dict) or not response: errors.append("empty_response") except Exception as exc: # noqa: BLE001 errors.append(str(exc)) gate = "PASS" if not errors else "FAIL" result = _payload(gate, evidence=evidence, errors=errors) args.output.parent.mkdir(parents=True, exist_ok=True) args.output.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(result, ensure_ascii=False, indent=2)) return 0 if gate == "PASS" else 1 if __name__ == "__main__": raise SystemExit(main())