Files
QuantEngineByItz/tools/validate_kis_api_credentials_v1.py
T

110 lines
4.4 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
# ROOT is the parent of the directory that contains this script (parents[0] is
# the script's own directory, parents[1] is one level above it).
ROOT = Path(__file__).resolve().parents[1]
# Put ROOT on sys.path (once) so the `src.quant_engine...` import below can be
# resolved when this file is run directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
# Guarded import: any failure while importing the KIS client is captured rather
# than raised, so that main() can still run and report it as a FAIL result.
try:
    from src.quant_engine.kis_api_client_v1 import (
        KisCredentials,
        MOCK_DOMAIN,
        REAL_DOMAIN,
        _read_env_var,
        get_current_price,
    )
except Exception as exc:  # pragma: no cover - import failure is a hard validation error
    # Bind placeholders for every imported name so later references do not
    # raise NameError; main() checks KisCredentials / get_current_price for None.
    KisCredentials = None  # type: ignore[assignment]
    MOCK_DOMAIN = ""
    REAL_DOMAIN = ""
    _read_env_var = None  # type: ignore[assignment]
    get_current_price = None  # type: ignore[assignment]
    # Keep only the exception message; main() embeds it in the "import_error" field.
    _IMPORT_ERROR = str(exc)
else:
    # Empty string means the import succeeded.
    _IMPORT_ERROR = ""
def _payload(gate: str, **extra: Any) -> dict[str, Any]:
return {
"formula_id": "KIS_API_CREDENTIALS_VALIDATION_V1",
"gate": gate,
**extra,
}
def _expected_env_names(account: str) -> tuple[str, str]:
if account == "real":
return ("KIS_APP_Key", "KIS_APP_Secret")
if account == "mock":
return ("KIS_APP_Key_TEST", "KIS_APP_Secret_TEST")
raise ValueError("account must be 'mock' or 'real'")
def _emit_result(result: dict[str, Any], output: Path) -> None:
    """Write *result* as indented UTF-8 JSON to *output* and echo it to stdout."""
    rendered = json.dumps(result, ensure_ascii=False, indent=2)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(rendered, encoding="utf-8")
    print(rendered)


def main() -> int:
    """Validate KIS API credentials and report the outcome as JSON.

    Parses the command line, loads the credentials for the selected account,
    checks that they match the expected environment variables and domain, and
    (unless ``--dry-run`` is given) requests the current price for ``--ticker``.
    The result record is written to ``--output`` and printed to stdout.

    Returns:
        ``0`` when the gate is ``"PASS"`` (no errors collected), ``1`` otherwise,
        including when the KIS client module could not be imported.
    """
    ap = argparse.ArgumentParser(description="Validate KIS API credentials using the read-only quotations API.")
    ap.add_argument("--account", choices=["mock", "real"], default="mock")
    ap.add_argument("--ticker", default="005930")
    ap.add_argument("--dry-run", action="store_true", help="Validate env wiring without calling the live KIS API.")
    ap.add_argument("--output", type=Path, default=ROOT / "Temp" / "kis_api_credentials_validation_v1.json")
    args = ap.parse_args()

    # The module-level guarded import leaves these as None when it failed.
    if KisCredentials is None or get_current_price is None:
        _emit_result(_payload("FAIL", error=f"import_error: {_IMPORT_ERROR}"), args.output)
        return 1

    def read_env(name: str) -> Any:
        """Look up *name* via the client's env reader, or None if it is unavailable."""
        return _read_env_var(name) if _read_env_var is not None else None

    errors: list[str] = []
    evidence: dict[str, Any] = {
        "account": args.account,
        "ticker": args.ticker,
    }
    try:
        key_name, secret_name = _expected_env_names(args.account)
        # Names belonging to the account that was NOT selected; only their
        # presence is recorded, never their values.
        other_key_name, other_secret_name = _expected_env_names(
            "mock" if args.account == "real" else "real"
        )
        creds = KisCredentials.load(args.account)
        evidence["domain"] = creds.domain
        evidence["expected_env"] = {"app_key": key_name, "app_secret": secret_name}

        expected_key = read_env(key_name)
        expected_secret = read_env(secret_name)
        actual_key = getattr(creds, "app_key", None)
        actual_secret = getattr(creds, "app_secret", None)
        # Only booleans go into the evidence so no secret material is written out.
        env_match = {
            "app_key": bool(expected_key and actual_key == expected_key),
            "app_secret": bool(expected_secret and actual_secret == expected_secret),
            "other_key_present": bool(read_env(other_key_name)),
            "other_secret_present": bool(read_env(other_secret_name)),
        }
        evidence["env_match"] = env_match

        expected_domain = REAL_DOMAIN if args.account == "real" else MOCK_DOMAIN
        if creds.domain != expected_domain:
            errors.append("domain_mismatch")
        if not (env_match["app_key"] and env_match["app_secret"]):
            errors.append("selected_env_mismatch")

        evidence["dry_run"] = bool(args.dry_run)
        if not args.dry_run:
            response = get_current_price(creds, args.ticker)
            # Inspect the type before touching .keys(): a None / non-dict
            # response must be reported as "empty_response", not surface as an
            # AttributeError message.
            if isinstance(response, dict):
                evidence["response_keys"] = sorted(response.keys())
            if not isinstance(response, dict) or not response:
                errors.append("empty_response")
    except Exception as exc:  # noqa: BLE001
        # Top-level boundary: any failure becomes a recorded error. Fall back to
        # the exception class name when the message itself is empty.
        errors.append(str(exc) or type(exc).__name__)

    gate = "PASS" if not errors else "FAIL"
    _emit_result(_payload(gate, evidence=evidence, errors=errors), args.output)
    return 0 if gate == "PASS" else 1
if __name__ == "__main__":
    # Script entry point: use main()'s return value as the process exit status.
    sys.exit(main())