#!/usr/bin/env python3 from __future__ import annotations import argparse import json import sqlite3 import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from src.quant_engine.kis_api_client_v1 import TOKEN_REFRESH_SKEW_MINUTES, _token_db_path def main() -> int: parser = argparse.ArgumentParser(description="Inspect the KIS Open API token cache.") parser.add_argument("--account", default=None, help="Optional account filter (real|mock)") parser.add_argument("--json", action="store_true", help="Print JSON output") args = parser.parse_args() db_path = _token_db_path() if not db_path.exists(): payload = {"gate": "DATA_MISSING", "db_path": str(db_path), "rows": []} print(json.dumps(payload, ensure_ascii=False, indent=2) if args.json else f"DATA_MISSING: {db_path}") return 0 conn = sqlite3.connect(db_path) try: rows = conn.execute( "SELECT account, access_token, expires_at, updated_at FROM kis_tokens ORDER BY account" ).fetchall() finally: conn.close() data = [] for account, access_token, expires_at, updated_at in rows: if args.account and account != args.account: continue data.append( { "account": account, "access_token_prefix": f"{access_token[:6]}..." if access_token else "", "expires_at": expires_at, "updated_at": updated_at, } ) payload = { "gate": "PASS" if data else "DATA_MISSING", "db_path": str(db_path), "refresh_skew_minutes": TOKEN_REFRESH_SKEW_MINUTES, "row_count": len(data), "rows": data, } if args.json: print(json.dumps(payload, ensure_ascii=False, indent=2)) else: print(f"db_path: {payload['db_path']}") print(f"refresh_skew_minutes: {payload['refresh_skew_minutes']}") print(f"row_count: {payload['row_count']}") for row in data: print( f"- account={row['account']} token={row['access_token_prefix']} " f"expires_at={row['expires_at']} updated_at={row['updated_at']}" ) return 0 if __name__ == "__main__": raise SystemExit(main())