feat(kis): cache tokens in sqlite and add inspector

This commit is contained in:
2026-06-23 18:00:34 +09:00
parent a343db5812
commit 357d2507da
3 changed files with 207 additions and 27 deletions
+37 -27
View File
@@ -33,6 +33,8 @@ if str(ROOT) not in sys.path:
REAL_DOMAIN = "https://openapi.koreainvestment.com:9443"
MOCK_DOMAIN = "https://openapivts.koreainvestment.com:29443"
TOKEN_CACHE_DIR = ROOT / "Temp"
TOKEN_CACHE_DB_NAME = "kis_tokens.db"
TOKEN_REFRESH_SKEW_MINUTES = 10
def _requests():
@@ -118,9 +120,13 @@ class KisCredentials:
import sqlite3
def _token_db_path() -> Path:
db_dir = ROOT / "src" / "quant_engine"
db_dir.mkdir(parents=True, exist_ok=True)
return db_dir / "kis_data_collection.db"
import os
override = os.environ.get("KIS_TOKEN_DB_PATH", "").strip()
if override:
return Path(override)
TOKEN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return TOKEN_CACHE_DIR / TOKEN_CACHE_DB_NAME
def _init_token_db(conn: sqlite3.Connection) -> None:
@@ -140,50 +146,54 @@ def _init_token_db(conn: sqlite3.Connection) -> None:
def _issue_or_reuse_token(creds: KisCredentials) -> str:
"""KIS는 토큰 발급 빈도를 제한한다 — 만료 전까지 DB 캐시 재사용 필수."""
db_path = _token_db_path()
# 1. DB에서 기존 토큰 및 만료 시각 조회
with sqlite3.connect(db_path) as conn:
db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(db_path, timeout=30) as conn:
_init_token_db(conn)
conn.execute("BEGIN IMMEDIATE")
row = conn.execute(
"SELECT access_token, expires_at FROM kis_tokens WHERE account = ?",
(creds.account,)
(creds.account,),
).fetchone()
if row:
token, expires_at_str = row
try:
expires_at = dt.datetime.fromisoformat(expires_at_str)
# 만료 시간 10분 전까지 재사용 가능 여부 검사
if dt.datetime.now(dt.timezone.utc) < expires_at - dt.timedelta(minutes=10):
if dt.datetime.now(dt.timezone.utc) < expires_at - dt.timedelta(minutes=TOKEN_REFRESH_SKEW_MINUTES):
conn.commit()
return token
except ValueError:
pass
# 2. 토큰이 만료되었거나 없을 시 KIS API로 새로 발급 요청
requests = _requests()
resp = requests.post(
f"{creds.domain}/oauth2/tokenP",
json={"grant_type": "client_credentials", "appkey": creds.app_key, "appsecret": creds.app_secret},
timeout=15,
)
resp.raise_for_status()
body = resp.json()
access_token = body["access_token"]
expires_in_sec = int(body.get("expires_in", 86400))
expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expires_in_sec)
# 3. 새로운 토큰 정보를 DB에 안전하게 업서트
with sqlite3.connect(db_path) as conn:
# 2. 토큰이 만료되었거나 없을 시 KIS API로 새로 발급 요청
requests = _requests()
resp = requests.post(
f"{creds.domain}/oauth2/tokenP",
json={"grant_type": "client_credentials", "appkey": creds.app_key, "appsecret": creds.app_secret},
timeout=15,
)
resp.raise_for_status()
body = resp.json()
access_token = body["access_token"]
expires_in_sec = int(body.get("expires_in", 86400))
expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expires_in_sec)
# 3. 새로운 토큰 정보를 DB에 안전하게 업서트
conn.execute(
"""
INSERT OR REPLACE INTO kis_tokens (account, access_token, expires_at, updated_at)
VALUES (?, ?, ?, ?)
""",
(creds.account, access_token, expires_at.isoformat(), dt.datetime.now(dt.timezone.utc).isoformat())
(
creds.account,
access_token,
expires_at.isoformat(),
dt.datetime.now(dt.timezone.utc).isoformat(),
),
)
conn.commit()
return access_token
return access_token
def _send_request(creds: KisCredentials, path: str, tr_id: str, params: dict[str, Any]) -> dict[str, Any]: