KIS Open API 조회전용 연동 + 직접매매 절대금지 안전게이트

매수/매도 주문 및 계좌 잔고조회를 API로 직접 실행하지 않는다는 원칙을
코드 레벨에서 강제하는 안전게이트(governance/rules/06, 07)와 함께,
시세/호가/공매도거래비중 등 조회전용 KIS Open API 연동 및 SQLite
수집 파이프라인을 추가한다.

- kis_api_client_v1: 모든 요청이 _assert_read_only를 통과해야 하며
  /trading/ 경로·주문 TR_ID는 RuntimeError로 즉시 차단
- kis_data_collection_v1: KIS 우선 + Naver 폴백, 네트워크 실패는
  개별 ticker 단위로 흡수(배치 전체 중단 없음)
- data_collection_store_v1 / storage_backend_v1: SQLite 캐노니컬
  저장소, PostgreSQL 전환 대비 백엔드 추상화
- Gitea 영업일 스케줄(2시간 간격) + CI 강제 게이트
  (validate_no_direct_api_trading_v1, validate_kis_api_credentials_v1)
This commit is contained in:
2026-06-21 20:04:44 +09:00
parent 34f6eebba6
commit 4cb206a269
20 changed files with 2034 additions and 0 deletions
@@ -0,0 +1,115 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
TABLE_SCHEMAS: dict[str, str] = {
"collection_runs": """
CREATE TABLE collection_runs (
run_id TEXT PRIMARY KEY,
collector_name TEXT NOT NULL,
started_at TEXT NOT NULL,
finished_at TEXT,
status TEXT NOT NULL,
input_source TEXT,
output_json_path TEXT,
output_db_path TEXT,
notes TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
""".strip(),
"collection_snapshots": """
CREATE TABLE collection_snapshots (
run_id TEXT NOT NULL,
dataset_name TEXT NOT NULL,
ticker TEXT NOT NULL,
name TEXT,
sector TEXT,
as_of_date TEXT,
source_priority TEXT,
source_status TEXT,
payload_json TEXT NOT NULL,
provenance_json TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (run_id, dataset_name, ticker)
);
""".strip(),
"collection_source_errors": """
CREATE TABLE collection_source_errors (
run_id TEXT NOT NULL,
ticker TEXT,
source_name TEXT NOT NULL,
error_kind TEXT NOT NULL,
error_message TEXT NOT NULL,
payload_json TEXT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
""".strip(),
"sell_strategy_results": """
CREATE TABLE sell_strategy_results (
id BIGSERIAL PRIMARY KEY,
code TEXT NOT NULL,
generated_at TEXT NOT NULL,
action TEXT,
conviction TEXT,
market_regime TEXT,
composite_score DOUBLE PRECISION,
rationale TEXT,
raw_json TEXT NOT NULL,
inserted_at TIMESTAMPTZ DEFAULT NOW()
);
""".strip(),
"satellite_recommendations": """
CREATE TABLE satellite_recommendations (
id BIGSERIAL PRIMARY KEY,
ticker TEXT NOT NULL,
generated_at TEXT NOT NULL,
satellite_action TEXT,
attractiveness_score DOUBLE PRECISION,
market_regime TEXT,
raw_json TEXT NOT NULL,
inserted_at TIMESTAMPTZ DEFAULT NOW()
);
""".strip(),
}
def main() -> int:
ap = argparse.ArgumentParser(description="Emit PostgreSQL migration stub from current canonical row contract.")
ap.add_argument("--output-json", type=Path, default=ROOT / "Temp" / "postgresql_upgrade_stub_v1.json")
ap.add_argument("--output-sql", type=Path, default=ROOT / "Temp" / "postgresql_upgrade_stub_v1.sql")
args = ap.parse_args()
sql_lines = [
"-- PostgreSQL upgrade stub",
"-- This file is a contract placeholder only. It is not executed by CI.",
"",
]
for name, ddl in TABLE_SCHEMAS.items():
sql_lines.append(f"-- {name}")
sql_lines.append(ddl)
sql_lines.append("")
sql_text = "\n".join(sql_lines).rstrip() + "\n"
args.output_sql.parent.mkdir(parents=True, exist_ok=True)
args.output_sql.write_text(sql_text, encoding="utf-8")
payload: dict[str, Any] = {
"formula_id": "POSTGRESQL_UPGRADE_STUB_V1",
"gate": "DATA_GATED",
"tables": sorted(TABLE_SCHEMAS.keys()),
"output_sql": str(args.output_sql),
"note": "DDL stub only; execution deferred until PostgreSQL rollout.",
}
args.output_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
+15
View File
@@ -0,0 +1,15 @@
#!/usr/bin/env python3
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from src.quant_engine.kis_data_collection_v1 import main
if __name__ == "__main__":
raise SystemExit(main())
@@ -0,0 +1,61 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
REQUIRED_PATTERNS = {
".gitea/workflows/kis_data_collection.yml": [
"secrets.KIS_APP_KEY_TEST",
"secrets.KIS_APP_SECRET_TEST",
"secrets.KIS_APP_KEY",
"secrets.KIS_APP_SECRET",
],
".gitea/workflows/qualitative_sell_strategy.yml": [
"secrets.KIS_APP_KEY_TEST",
"secrets.KIS_APP_SECRET_TEST",
"secrets.KIS_APP_KEY",
"secrets.KIS_APP_SECRET",
],
".gitea/workflows/ci.yml": [
"secrets.KIS_APP_KEY_TEST",
"secrets.KIS_APP_SECRET_TEST",
],
}
def main() -> int:
errors: list[str] = []
evidence: dict[str, dict[str, bool]] = {}
for rel, patterns in REQUIRED_PATTERNS.items():
path = ROOT / rel
text = path.read_text(encoding="utf-8") if path.exists() else ""
file_evidence: dict[str, bool] = {}
if not path.exists():
errors.append(f"missing:{rel}")
evidence[rel] = file_evidence
continue
for pattern in patterns:
found = pattern in text
file_evidence[pattern] = found
if not found:
errors.append(f"{rel}:{pattern}")
evidence[rel] = file_evidence
result = {
"formula_id": "GITEA_SECRETS_CONTRACT_V1",
"gate": "PASS" if not errors else "FAIL",
"evidence": evidence,
"errors": errors,
}
out = ROOT / "Temp" / "gitea_secrets_contract_v1.json"
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if not errors else 1
if __name__ == "__main__":
raise SystemExit(main())
+106
View File
@@ -0,0 +1,106 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
try:
from src.quant_engine.kis_api_client_v1 import (
KisCredentials,
MOCK_DOMAIN,
REAL_DOMAIN,
_read_env_var,
get_current_price,
)
except Exception as exc: # pragma: no cover - import failure is a hard validation error
KisCredentials = None # type: ignore[assignment]
MOCK_DOMAIN = ""
REAL_DOMAIN = ""
_read_env_var = None # type: ignore[assignment]
get_current_price = None # type: ignore[assignment]
_IMPORT_ERROR = str(exc)
else:
_IMPORT_ERROR = ""
def _payload(gate: str, **extra: Any) -> dict[str, Any]:
return {
"formula_id": "KIS_API_CREDENTIALS_VALIDATION_V1",
"gate": gate,
**extra,
}
def _expected_env_names(account: str) -> tuple[str, str]:
if account == "real":
return ("KIS_APP_Key", "KIS_APP_Secret")
if account == "mock":
return ("KIS_APP_Key_TEST", "KIS_APP_Secret_TEST")
raise ValueError("account must be 'mock' or 'real'")
def main() -> int:
ap = argparse.ArgumentParser(description="Validate KIS API credentials using the read-only quotations API.")
ap.add_argument("--account", choices=["mock", "real"], default="mock")
ap.add_argument("--ticker", default="005930")
ap.add_argument("--output", type=Path, default=ROOT / "Temp" / "kis_api_credentials_validation_v1.json")
args = ap.parse_args()
if KisCredentials is None or get_current_price is None:
result = _payload("FAIL", error=f"import_error: {_IMPORT_ERROR}")
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 1
errors: list[str] = []
evidence: dict[str, Any] = {
"account": args.account,
"ticker": args.ticker,
}
try:
key_name, secret_name = _expected_env_names(args.account)
creds = KisCredentials.load(args.account)
evidence["domain"] = creds.domain
evidence["expected_env"] = {"app_key": key_name, "app_secret": secret_name}
expected_key = _read_env_var(key_name) if _read_env_var is not None else None
expected_secret = _read_env_var(secret_name) if _read_env_var is not None else None
other_key = _read_env_var("KIS_APP_Key_TEST" if args.account == "real" else "KIS_APP_Key") if _read_env_var is not None else None
other_secret = _read_env_var("KIS_APP_Secret_TEST" if args.account == "real" else "KIS_APP_Secret") if _read_env_var is not None else None
actual_key = getattr(creds, "app_key", None)
actual_secret = getattr(creds, "app_secret", None)
evidence["env_match"] = {
"app_key": bool(expected_key and actual_key == expected_key),
"app_secret": bool(expected_secret and actual_secret == expected_secret),
"other_key_present": bool(other_key),
"other_secret_present": bool(other_secret),
}
if creds.domain != (REAL_DOMAIN if args.account == "real" else MOCK_DOMAIN):
errors.append("domain_mismatch")
if not evidence["env_match"]["app_key"] or not evidence["env_match"]["app_secret"]:
errors.append("selected_env_mismatch")
response = get_current_price(creds, args.ticker)
evidence["response_keys"] = sorted(response.keys())
if not isinstance(response, dict) or not response:
errors.append("empty_response")
except Exception as exc: # noqa: BLE001
errors.append(str(exc))
gate = "PASS" if not errors else "FAIL"
result = _payload(gate, evidence=evidence, errors=errors)
args.output.parent.mkdir(parents=True, exist_ok=True)
args.output.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if gate == "PASS" else 1
if __name__ == "__main__":
raise SystemExit(main())
+112
View File
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""[CRITICAL] governance/rules/06_no_direct_api_trading.yaml 강제 게이트.
이 검증기는 순수 stdlib(re, pathlib)만 사용한다 — Synology CI(ARMv7, Python 3.8,
requests/pytest 미설치)에서도 항상 실행 가능해야 하는 하드 블로킹 게이트이기 때문이다.
문서·테스트만으로는 막을 수 없다는 사용자 지시(2026-06-21)에 따라 정적 소스 스캔으로
주문 제출/정정/취소 경로·TR_ID가 코드베이스 어디에도 존재하지 않음을 매 커밋마다 강제한다.
FAIL 시 CI 전체를 막는다(strict, warn_only 아님) — 다른 데이터 품질 게이트와 다르게
이 게이트는 완화 대상이 아니다.
"""
from __future__ import annotations
import re
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
# 이 문자열들이 "데이터"로 등장해도 되는 파일(블록리스트 정의/테스트/이 검증기 자신).
# 그 외 모든 .py 파일에서 발견되면 FAIL.
ALLOWLISTED_FILES = {
"src/quant_engine/kis_api_client_v1.py",
"tests/unit/test_kis_api_client_v1.py",
"tools/validate_no_direct_api_trading_v1.py",
}
FORBIDDEN_ORDER_PATH_SUBSTRINGS = (
"/trading/order-cash",
"/trading/order-rvsecncl",
"/trading/order-credit",
"/trading/order-resv",
"/trading/inquire-balance", # governance/rules/07 — 계좌 보유종목 조회 금지
)
FORBIDDEN_ORDER_TR_IDS = (
"TTTC0802U", "TTTC0801U", "VTTC0802U", "VTTC0801U",
"TTTC8434R", "VTTC8434R", # governance/rules/07 — 주식잔고조회 금지
)
BANNED_FUNCTION_NAME_SUBSTRINGS = (
"place_order", "submit_order", "cancel_order", "revise_order", "send_order",
"order_cash", "order_credit", "order_rvsecncl",
"inquire_balance", "account_balance", # governance/rules/07 — 계좌 보유종목 조회 금지
)
def _scan_python_files() -> list[str]:
violations: list[str] = []
for dir_name in ("src", "tools"):
for path in (ROOT / dir_name).rglob("*.py"):
rel = path.relative_to(ROOT).as_posix()
if rel in ALLOWLISTED_FILES:
continue
text = path.read_text(encoding="utf-8", errors="ignore")
for forbidden in FORBIDDEN_ORDER_PATH_SUBSTRINGS:
if forbidden in text:
violations.append(f"{rel}: 주문 엔드포인트 경로 발견 — {forbidden!r}")
for tr_id in FORBIDDEN_ORDER_TR_IDS:
if tr_id in text:
violations.append(f"{rel}: 주문 TR_ID 발견 — {tr_id!r}")
for match in re.finditer(r"def\s+(\w+)\s*\(", text):
name = match.group(1).lower()
for banned in BANNED_FUNCTION_NAME_SUBSTRINGS:
if banned in name:
violations.append(f"{rel}: 주문 제출/정정/취소로 의심되는 함수명 — def {match.group(1)}(")
return violations
def _check_kis_client_guard_intact() -> list[str]:
"""kis_api_client_v1.py가 실제로 존재하면, 가드 코드가 그대로 있는지 + _send_request가
HTTP 호출 전에 _assert_read_only를 부르는지 순서를 확인한다."""
client_path = ROOT / "src" / "quant_engine" / "kis_api_client_v1.py"
if not client_path.exists():
return [] # 클라이언트가 아직 없으면 이 검사는 스킵(다른 검사로 충분)
text = client_path.read_text(encoding="utf-8")
violations: list[str] = []
required_markers = ("_assert_read_only", "OrderEndpointBlockedError", "FORBIDDEN_PATH_SUBSTRINGS", "FORBIDDEN_TR_ID_PREFIXES")
for marker in required_markers:
if marker not in text:
violations.append(f"kis_api_client_v1.py: 필수 가드 구성요소 누락 — {marker!r}")
send_request_match = re.search(r"def _send_request\(.*?\)\s*(?:->[^:]*)?:(.*?)(?=\ndef |\Z)", text, re.S)
if send_request_match:
body = send_request_match.group(1)
guard_pos = body.find("_assert_read_only(")
http_pos = min(
(pos for pos in (body.find("requests.get("), body.find("requests.post(")) if pos != -1),
default=-1,
)
if guard_pos == -1:
violations.append("kis_api_client_v1.py: _send_request가 _assert_read_only를 호출하지 않음")
elif http_pos != -1 and guard_pos > http_pos:
violations.append("kis_api_client_v1.py: _assert_read_only 호출이 HTTP 전송보다 늦음(순서 위반)")
else:
violations.append("kis_api_client_v1.py: _send_request 함수를 찾을 수 없음")
return violations
def main() -> int:
violations = _scan_python_files() + _check_kis_client_guard_intact()
if violations:
print("NO_DIRECT_API_TRADING_GATE: FAIL")
for v in violations:
print(f" - {v}")
return 1
print("NO_DIRECT_API_TRADING_GATE: PASS")
return 0
if __name__ == "__main__":
raise SystemExit(main())