27730704ae
Snapshot Admin Web Validation / validate-snapshot-admin-smoke (push) Has been cancelled
Snapshot Admin Web Validation / validate-snapshot-admin-full (push) Has been cancelled
Quant Engine CI/CD Pipeline / validate-core (pull_request) Has been cancelled
Quant Engine CI/CD Pipeline / validate-ui-and-storage (pull_request) Has been cancelled
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (pull_request) Has been cancelled
108 lines
3.6 KiB
Python
108 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Client implementation and its unit tests; main() checks both for expected
# marker strings.
SOURCE_FILE = ROOT.joinpath("src", "quant_engine", "kis_api_client_v1.py")
TEST_FILE = ROOT.joinpath("tests", "unit", "test_kis_api_client_v1.py")

# Operator documentation that main() checks for token-cache related mentions.
RUNBOOK_FILES = [
    ROOT.joinpath("docs", "GITEA_SECRETS_SETUP.md"),
    ROOT.joinpath("docs", "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md"),
]

# Every file searched for forbidden patterns, in reporting order.
SCAN_FILES = [
    # Library code.
    SOURCE_FILE,
    ROOT.joinpath("src", "quant_engine", "kis_data_collection_v1.py"),
    # Command-line tools.
    ROOT.joinpath("tools", "run_kis_data_collection_v1.py"),
    ROOT.joinpath("tools", "inspect_kis_token_cache_v1.py"),
    ROOT.joinpath("tools", "validate_gitea_secrets_contract_v1.py"),
    # Unit tests.
    TEST_FILE,
    ROOT.joinpath("tests", "unit", "test_validate_kis_api_credentials_v1.py"),
    # Runbooks (same entries, same order as RUNBOOK_FILES).
    *RUNBOOK_FILES,
    # CI workflows.
    ROOT.joinpath(".gitea", "workflows", "kis_data_collection.yml"),
    ROOT.joinpath(".gitea", "workflows", "qualitative_sell_strategy.yml"),
    ROOT.joinpath(".gitea", "workflows", "ci.yml"),
]
|
|
|
|
# Regular expressions that must not match anywhere in a scanned file.
FORBIDDEN_PATTERNS = [
    # A print( call followed by text mentioning a credential field.
    r"print\(\s*.*appsecret",
    r"print\(\s*.*appkey",
    # Any attribute access on a name ending in "logger".
    r"logger\.",
    # Access to the .text attribute of resp / response.
    r"resp\.text",
    r"response\.text",
    # json.dumps of a dict literal followed by text mentioning a credential field.
    r"json\.dumps\(\s*\{\s*.*appkey",
    r"json\.dumps\(\s*\{\s*.*appsecret",
]
|
|
|
|
def _scan_text(path: Path, text: str) -> list[str]:
    """Return one ``"<path>:<pattern>"`` entry per forbidden pattern found in *text*.

    Every regex in ``FORBIDDEN_PATTERNS`` is searched case-insensitively.  A
    pattern is reported at most once, however many times it matches, and the
    entries follow the order of ``FORBIDDEN_PATTERNS``.  The list is empty
    when nothing matches.

    Args:
        path: Used only to label the findings; the file is not read here.
        text: The content to search.
    """
    search_flags = re.IGNORECASE | re.MULTILINE
    return [
        f"{path}:{forbidden}"
        for forbidden in FORBIDDEN_PATTERNS
        if re.search(forbidden, text, search_flags)
    ]
|
|
|
|
|
|
def _scan_repository() -> list[str]:
    """Collect forbidden-pattern findings for every file in ``SCAN_FILES``.

    Files that do not exist on disk are skipped without being reported.
    Existing files are read as UTF-8 and passed to ``_scan_text``; the
    findings are returned concatenated in ``SCAN_FILES`` order.
    """
    return [
        finding
        for candidate in SCAN_FILES
        if candidate.exists()
        for finding in _scan_text(candidate, candidate.read_text(encoding="utf-8"))
    ]
|
|
|
|
|
|
def main() -> int:
    """Run the KIS token hygiene gate and emit the result as JSON.

    Steps:
      1. ``SOURCE_FILE``: report it when missing; otherwise scan it for
         forbidden patterns and record whether expected marker strings occur.
      2. ``TEST_FILE``: report it when missing; otherwise record whether the
         expected test names occur.
      3. ``RUNBOOK_FILES``: report each missing file; otherwise record whether
         the token-cache path and refresh-skew setting are mentioned.
      4. Scan every file in ``SCAN_FILES`` for forbidden patterns.

    The result is written to ``ROOT/Temp/kis_token_hygiene_v1.json`` (the
    directory is created when absent) and printed to stdout.

    Only ``errors`` decides the gate; the ``evidence`` booleans are reported
    but never turn the gate to ``FAIL``.

    Returns:
        0 when no errors were recorded (gate ``PASS``), otherwise 1.
    """
    errors: list[str] = []
    evidence: dict[str, dict[str, bool]] = {}

    if not SOURCE_FILE.exists():
        errors.append(f"missing:{SOURCE_FILE}")
    else:
        text = SOURCE_FILE.read_text(encoding="utf-8")
        errors.extend(_scan_text(SOURCE_FILE, text))
        evidence[str(SOURCE_FILE)] = {
            "sanitized_token_refresh_error": (
                "KIS token refresh failed; check credentials and API availability."
                in text
            ),
            "sanitized_readonly_error": "KIS read-only request failed for" in text,
            "token_cache_db": "kis_tokens.db" in text,
        }

    if not TEST_FILE.exists():
        errors.append(f"missing:{TEST_FILE}")
    else:
        text = TEST_FILE.read_text(encoding="utf-8")
        evidence[str(TEST_FILE)] = {
            "token_cache_tests": "test_issue_or_reuse_token_with_sqlite_db_cache" in text,
            "token_override_tests": "test_issue_or_reuse_token_honors_token_db_override" in text,
        }

    for path in RUNBOOK_FILES:
        if not path.exists():
            errors.append(f"missing:{path}")
            continue
        text = path.read_text(encoding="utf-8")
        evidence[str(path)] = {
            "mentions_token_cache": "Temp/kis_tokens.db" in text,
            "mentions_refresh_skew": "TOKEN_REFRESH_SKEW_MINUTES" in text,
        }

    errors.extend(_scan_repository())

    # SOURCE_FILE is scanned above and is also listed in SCAN_FILES, so each of
    # its findings arrives twice.  Drop repeats while keeping first-seen order.
    errors = list(dict.fromkeys(errors))

    result = {
        "formula_id": "KIS_TOKEN_HYGIENE_V1",
        "gate": "PASS" if not errors else "FAIL",
        "errors": errors,
        "evidence": evidence,
    }
    report = json.dumps(result, ensure_ascii=False, indent=2)

    out = ROOT / "Temp" / "kis_token_hygiene_v1.json"
    # write_text does not create missing directories; without this the gate
    # crashes with FileNotFoundError when "Temp" is absent (e.g. fresh checkout).
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(report, encoding="utf-8")
    print(report)

    return 0 if not errors else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)
|