#!/usr/bin/env python3
"""KIS token hygiene gate (formula id ``KIS_TOKEN_HYGIENE_V1``).

The script does three things:

1. Scans a fixed list of repository files for the regular expressions in
   ``FORBIDDEN_PATTERNS`` (printing or JSON-dumping ``appkey``/``appsecret``,
   any ``logger.`` use, and ``resp.text`` / ``response.text``).
2. Records "evidence" booleans: whether certain marker strings are present in
   the KIS client source, its unit test file, and the runbook documents.
3. Writes the result as JSON to ``Temp/kis_token_hygiene_v1.json`` under the
   repository root, prints the same JSON, and exits 0 on PASS or 1 on FAIL.

Only entries in ``errors`` (missing required files or forbidden-pattern hits)
influence the gate; the evidence booleans are informational.
"""
from __future__ import annotations

import json
import re
from pathlib import Path

# Repository root: this file is expected to live one directory below it.
ROOT = Path(__file__).resolve().parents[1]

# Files whose absence is reported as an error and which supply evidence flags.
SOURCE_FILE = ROOT / "src" / "quant_engine" / "kis_api_client_v1.py"
TEST_FILE = ROOT / "tests" / "unit" / "test_kis_api_client_v1.py"
RUNBOOK_FILES = [
    ROOT / "docs" / "GITEA_SECRETS_SETUP.md",
    ROOT / "docs" / "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md",
]

# Files scanned for forbidden patterns; entries that do not exist are skipped.
SCAN_FILES = [
    ROOT / "src" / "quant_engine" / "kis_api_client_v1.py",
    ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py",
    ROOT / "tools" / "run_kis_data_collection_v1.py",
    ROOT / "tools" / "inspect_kis_token_cache_v1.py",
    ROOT / "tools" / "validate_gitea_secrets_contract_v1.py",
    ROOT / "tests" / "unit" / "test_kis_api_client_v1.py",
    ROOT / "tests" / "unit" / "test_validate_kis_api_credentials_v1.py",
    ROOT / "docs" / "GITEA_SECRETS_SETUP.md",
    ROOT / "docs" / "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md",
    ROOT / ".gitea" / "workflows" / "kis_data_collection.yml",
    ROOT / ".gitea" / "workflows" / "qualitative_sell_strategy.yml",
    ROOT / ".gitea" / "workflows" / "ci.yml",
]

# Regexes searched case-insensitively; ``.`` does not cross newlines, so the
# ``print(``/``json.dumps(`` patterns only match within a single line.
FORBIDDEN_PATTERNS = [
    r"print\(\s*.*appsecret",
    r"print\(\s*.*appkey",
    r"logger\.",
    r"resp\.text",
    r"response\.text",
    r"json\.dumps\(\s*\{\s*.*appkey",
    r"json\.dumps\(\s*\{\s*.*appsecret",
]


def _scan_text(path: Path, text: str) -> list[str]:
    """Return one ``"<path>:<pattern>"`` entry per forbidden pattern found.

    Args:
        path: Path used only to label the returned entries.
        text: Content to search.

    Returns:
        Entries in ``FORBIDDEN_PATTERNS`` order; each pattern is reported at
        most once per call no matter how many times it matches.
    """
    return [
        f"{path}:{pattern}"
        for pattern in FORBIDDEN_PATTERNS
        if re.search(pattern, text, re.IGNORECASE | re.MULTILINE)
    ]


def _scan_repository() -> list[str]:
    """Scan every existing file in ``SCAN_FILES`` for forbidden patterns.

    Files that do not exist are skipped without reporting an error.

    Returns:
        The concatenated ``_scan_text`` results, in ``SCAN_FILES`` order.
    """
    errors: list[str] = []
    for path in SCAN_FILES:
        if not path.exists():
            continue
        errors.extend(_scan_text(path, path.read_text(encoding="utf-8")))
    return errors


def main() -> int:
    """Run the hygiene checks, write and print the JSON report.

    Returns:
        ``0`` when no errors were collected (gate ``PASS``), otherwise ``1``.
    """
    errors: list[str] = []
    evidence: dict[str, dict[str, bool]] = {}

    if not SOURCE_FILE.exists():
        errors.append(f"missing:{SOURCE_FILE}")
    else:
        text = SOURCE_FILE.read_text(encoding="utf-8")
        errors.extend(_scan_text(SOURCE_FILE, text))
        evidence[str(SOURCE_FILE)] = {
            "sanitized_token_refresh_error": "KIS token refresh failed; check credentials and API availability." in text,
            "sanitized_readonly_error": "KIS read-only request failed for" in text,
            "token_cache_db": "kis_tokens.db" in text,
        }

    if not TEST_FILE.exists():
        errors.append(f"missing:{TEST_FILE}")
    else:
        text = TEST_FILE.read_text(encoding="utf-8")
        evidence[str(TEST_FILE)] = {
            "token_cache_tests": "test_issue_or_reuse_token_with_sqlite_db_cache" in text,
            "token_override_tests": "test_issue_or_reuse_token_honors_token_db_override" in text,
        }

    for path in RUNBOOK_FILES:
        if not path.exists():
            errors.append(f"missing:{path}")
            continue
        text = path.read_text(encoding="utf-8")
        evidence[str(path)] = {
            "mentions_token_cache": "Temp/kis_tokens.db" in text,
            "mentions_refresh_skew": "TOKEN_REFRESH_SKEW_MINUTES" in text,
        }

    errors.extend(_scan_repository())
    # SOURCE_FILE is scanned above and again via SCAN_FILES; drop the repeated
    # entries while keeping first-seen order so each finding is listed once.
    errors = list(dict.fromkeys(errors))

    result = {
        "formula_id": "KIS_TOKEN_HYGIENE_V1",
        "gate": "PASS" if not errors else "FAIL",
        "errors": errors,
        "evidence": evidence,
    }
    rendered = json.dumps(result, ensure_ascii=False, indent=2)

    out = ROOT / "Temp" / "kis_token_hygiene_v1.json"
    # The Temp directory may not exist yet (e.g. on a fresh checkout).
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(rendered, encoding="utf-8")
    print(rendered)
    return 0 if not errors else 1


if __name__ == "__main__":
    raise SystemExit(main())