feat(kis-token): transition OAuth2 token cache from JSON files to SQLite database
This commit is contained in:
@@ -115,23 +115,51 @@ class KisCredentials:
|
|||||||
return cls(app_key=app_key, app_secret=app_secret, account=account)
|
return cls(app_key=app_key, app_secret=app_secret, account=account)
|
||||||
|
|
||||||
|
|
||||||
def _token_cache_path(creds: KisCredentials) -> Path:
|
import sqlite3
|
||||||
TOKEN_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
|
||||||
return TOKEN_CACHE_DIR / f"kis_token_cache_{creds.account}.json"
|
def _token_db_path() -> Path:
|
||||||
|
db_dir = ROOT / "outputs" / "kis_data_collection"
|
||||||
|
db_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
return db_dir / "kis_data_collection.db"
|
||||||
|
|
||||||
|
|
||||||
|
def _init_token_db(conn: sqlite3.Connection) -> None:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS kis_tokens (
|
||||||
|
account TEXT PRIMARY KEY,
|
||||||
|
access_token TEXT NOT NULL,
|
||||||
|
expires_at TEXT NOT NULL,
|
||||||
|
updated_at TEXT NOT NULL
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
|
|
||||||
def _issue_or_reuse_token(creds: KisCredentials) -> str:
|
def _issue_or_reuse_token(creds: KisCredentials) -> str:
|
||||||
"""KIS는 토큰 발급 빈도를 제한한다 — 만료 전까지 캐시 재사용 필수."""
|
"""KIS는 토큰 발급 빈도를 제한한다 — 만료 전까지 DB 캐시 재사용 필수."""
|
||||||
cache_path = _token_cache_path(creds)
|
db_path = _token_db_path()
|
||||||
if cache_path.exists():
|
|
||||||
try:
|
# 1. DB에서 기존 토큰 및 만료 시각 조회
|
||||||
cached = json.loads(cache_path.read_text(encoding="utf-8"))
|
with sqlite3.connect(db_path) as conn:
|
||||||
expires_at = dt.datetime.fromisoformat(cached["expires_at"])
|
_init_token_db(conn)
|
||||||
if dt.datetime.now(dt.timezone.utc) < expires_at - dt.timedelta(minutes=10):
|
row = conn.execute(
|
||||||
return cached["access_token"]
|
"SELECT access_token, expires_at FROM kis_tokens WHERE account = ?",
|
||||||
except (json.JSONDecodeError, KeyError, ValueError):
|
(creds.account,)
|
||||||
pass
|
).fetchone()
|
||||||
|
|
||||||
|
if row:
|
||||||
|
token, expires_at_str = row
|
||||||
|
try:
|
||||||
|
expires_at = dt.datetime.fromisoformat(expires_at_str)
|
||||||
|
# 만료 시간 10분 전까지 재사용 가능 여부 검사
|
||||||
|
if dt.datetime.now(dt.timezone.utc) < expires_at - dt.timedelta(minutes=10):
|
||||||
|
return token
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 2. 토큰이 만료되었거나 없을 시 KIS API로 새로 발급 요청
|
||||||
requests = _requests()
|
requests = _requests()
|
||||||
resp = requests.post(
|
resp = requests.post(
|
||||||
f"{creds.domain}/oauth2/tokenP",
|
f"{creds.domain}/oauth2/tokenP",
|
||||||
@@ -143,10 +171,18 @@ def _issue_or_reuse_token(creds: KisCredentials) -> str:
|
|||||||
access_token = body["access_token"]
|
access_token = body["access_token"]
|
||||||
expires_in_sec = int(body.get("expires_in", 86400))
|
expires_in_sec = int(body.get("expires_in", 86400))
|
||||||
expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expires_in_sec)
|
expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expires_in_sec)
|
||||||
cache_path.write_text(
|
|
||||||
json.dumps({"access_token": access_token, "expires_at": expires_at.isoformat()}, ensure_ascii=False),
|
# 3. 새로운 토큰 정보를 DB에 안전하게 업서트
|
||||||
encoding="utf-8",
|
with sqlite3.connect(db_path) as conn:
|
||||||
)
|
conn.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO kis_tokens (account, access_token, expires_at, updated_at)
|
||||||
|
VALUES (?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(creds.account, access_token, expires_at.isoformat(), dt.datetime.now(dt.timezone.utc).isoformat())
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
|
||||||
return access_token
|
return access_token
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user