diff --git a/.gitea/workflows/kis_data_collection.yml b/.gitea/workflows/kis_data_collection.yml new file mode 100644 index 0000000..5bccfc9 --- /dev/null +++ b/.gitea/workflows/kis_data_collection.yml @@ -0,0 +1,131 @@ +name: KIS Data Collection (SQLite Canonical Feed) + +# ───────────────────────────────────────────────────────────────── +# [중요] 이 워크플로우는 KIS Open API를 코어로 하는 read-only 데이터 수집만 수행한다. +# xlsx를 직접 읽지 않고 GatherTradingData.json + live read-only APIs를 통해 +# SQLite canonical store를 갱신한다. 매수/매도 주문은 어떤 경우에도 실행하지 않는다. +# +# 스케줄: 영업일(월~금) 08:00~17:00 KST, 2시간 간격(08/10/12/14/16시). +# Gitea Actions의 schedule cron은 UTC 기준으로 평가된다(서버 타임존이 별도 +# 설정되어 있지 않은 경우의 기본값). 아래 cron은 UTC로 작성했다: +# KST 08:00 = UTC 전날 23:00 → 요일은 "한국 기준 평일"에 맞춰 UTC 0-4(일~목)로 이동 +# KST 10/12/14/16:00 = UTC 01/03/05/07:00, 같은 날(UTC 월~금, 1-5) +# +# [실제 Gitea 서버 타임존이 Asia/Seoul로 설정되어 있다면] 아래 cron을 그대로 +# "0 8,10,12,14,16 * * 1-5" 한 줄로 교체하면 된다 — 첫 실행 후 Actions 실행 +# 기록의 타임스탬프를 확인해 KST 08시 전후로 도는지 검증할 것(추정하지 말고 확인). +# +# 스케줄 주기 변경: 아래 schedule 목록의 cron 줄을 추가/삭제/수정하면 된다. +# 예) 1시간 간격으로 바꾸려면 09,11,13,15시 슬롯을 추가. +# ───────────────────────────────────────────────────────────────── + +on: + schedule: + - cron: "0 23 * * 0-4" # KST 월~금 08:00 (UTC 일~목 23:00) + - cron: "0 1 * * 1-5" # KST 월~금 10:00 (UTC 01:00) + - cron: "0 3 * * 1-5" # KST 월~금 12:00 (UTC 03:00) + - cron: "0 5 * * 1-5" # KST 월~금 14:00 (UTC 05:00) + - cron: "0 7 * * 1-5" # KST 월~금 16:00 (UTC 07:00) + workflow_dispatch: # 수동 실행 — 스케줄 검증/즉시 재시도용 + +jobs: + collect-kis-data: + runs-on: self-hosted + + steps: + - name: Checkout Code + run: | + if [ -d .git ]; then + git remote set-url origin http://x-access-token:${{ secrets.GITHUB_TOKEN }}@192.168.123.100:8418/KimJaeHyun/myfinance.git + else + git init + git remote add origin http://x-access-token:${{ secrets.GITHUB_TOKEN }}@192.168.123.100:8418/KimJaeHyun/myfinance.git + fi + git fetch origin main --depth=1 + git reset --hard FETCH_HEAD + if [ ! -f GatherTradingData.json ]; then + echo "::error::GatherTradingData.json 없음 — canonical seed snapshot이 필요합니다." + exit 1 + fi + + - name: Configure Runtime Paths + run: | + export PATH=/usr/local/bin:$PATH + echo "/usr/local/bin" >> $GITHUB_PATH + /usr/bin/python3 --version + + - name: Setup Python Environment + run: | + VENV_BASE=/volume1/gitea/python_venv + REQ_HASH=$(md5sum tools/run_kis_data_collection_v1.py 2>/dev/null | cut -d' ' -f1 || echo "kis-default") + VENV="$VENV_BASE/$REQ_HASH" + + if [ ! -f "$VENV/bin/python" ]; then + mkdir -p "$VENV_BASE" + /usr/bin/python3 -m venv "$VENV" + if [ ! -f "$VENV/bin/pip" ]; then + curl -sS https://bootstrap.pypa.io/pip/3.8/get-pip.py -o get-pip.py + "$VENV/bin/python" get-pip.py --quiet + rm get-pip.py + fi + "$VENV/bin/pip" install --upgrade pip --quiet + "$VENV/bin/pip" install requests beautifulsoup4 pyyaml --quiet + ls -dt "$VENV_BASE"/*/ 2>/dev/null | tail -n +3 | xargs rm -rf 2>/dev/null || true + fi + echo "$VENV/bin" >> $GITHUB_PATH + + - name: "[CRITICAL] No Direct API Trading Gate" + run: python3 tools/validate_no_direct_api_trading_v1.py + + - name: "[CRITICAL] Validate KIS API Credentials (mock)" + env: + KIS_APP_Key_TEST: ${{ secrets.KIS_APP_KEY_TEST }} + KIS_APP_Secret_TEST: ${{ secrets.KIS_APP_SECRET_TEST }} + run: | + python3 tools/validate_kis_api_credentials_v1.py \ + --account mock \ + --ticker 005930 + + - name: Collect KIS Market Data to SQLite (read-only) + env: + KIS_APP_Key: ${{ secrets.KIS_APP_KEY }} + KIS_APP_Secret: ${{ secrets.KIS_APP_SECRET }} + run: | + python3 tools/run_kis_data_collection_v1.py \ + --input-json GatherTradingData.json \ + --sqlite-db outputs/kis_data_collection/kis_data_collection.db \ + --output-json Temp/kis_data_collection_v1.json \ + --kis-account real + + - name: Validate SQLite Artifact + run: | + python3 - <<'PY' + import json, sqlite3 + from pathlib import Path + db = Path("outputs/kis_data_collection/kis_data_collection.db") + report = Path("Temp/kis_data_collection_v1.json") + assert db.exists(), f"missing db: {db}" + assert report.exists(), f"missing report: {report}" + conn = sqlite3.connect(db) + try: + run_count = conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0] + snap_count = conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0] + print(json.dumps({"run_count": run_count, "snapshot_count": snap_count}, ensure_ascii=False)) + assert run_count >= 1 + assert snap_count >= 1 + finally: + conn.close() + PY + + - name: Notify Run Result + if: always() + run: | + STATUS="${{ job.status }}" + RUN_URL="${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" + SUMMARY_FILE="Temp/kis_data_collection_v1.json" + SUMMARY_TEXT="(요약 파일 없음)" + [ -f "$SUMMARY_FILE" ] && SUMMARY_TEXT=$(cat "$SUMMARY_FILE") + echo "=== KIS Data Collection Result ===" + echo "status: $STATUS" + echo "summary: $SUMMARY_TEXT" + echo "run log: $RUN_URL" diff --git a/docs/GITEA_SECRETS_SETUP.md b/docs/GITEA_SECRETS_SETUP.md new file mode 100644 index 0000000..cc295c2 --- /dev/null +++ b/docs/GITEA_SECRETS_SETUP.md @@ -0,0 +1,48 @@ +# Gitea Secrets Setup + +이 저장소는 KIS Open API와 Gitea workflow를 분리해서 사용한다. +실제 시크릿 등록은 Gitea 관리자 권한이 있는 운영자가 수행해야 한다. + +## Required Secrets + +### Shared + +- `GITHUB_TOKEN` + +### KIS read-only validation + +- `KIS_APP_KEY_TEST` +- `KIS_APP_SECRET_TEST` + +### KIS real data collection + +- `KIS_APP_KEY` +- `KIS_APP_SECRET` + +## Workflow Mapping + +- `.gitea/workflows/kis_data_collection.yml` + - mock validation: `KIS_APP_KEY_TEST`, `KIS_APP_SECRET_TEST` + - real collection: `KIS_APP_KEY`, `KIS_APP_SECRET` +- `.gitea/workflows/qualitative_sell_strategy.yml` + - mock validation: `KIS_APP_KEY_TEST`, `KIS_APP_SECRET_TEST` + - real collection: `KIS_APP_KEY`, `KIS_APP_SECRET` +- `.gitea/workflows/ci.yml` + - mock validation: `KIS_APP_KEY_TEST`, `KIS_APP_SECRET_TEST` + +## Runtime Rule + +- mock 계정은 유효성 확인용이다. +- real 계정은 실제 데이터 수집용이다. +- 둘을 같은 단계에서 혼용하지 않는다. + +## Verification + +Run: + +```bash +python tools/validate_gitea_secrets_contract_v1.py +``` + +The validator checks that the workflows reference the required secret names +with the expected separation between mock and real usage. diff --git a/governance/rules/00_core_locks.yaml b/governance/rules/00_core_locks.yaml index 5569727..14b7491 100644 --- a/governance/rules/00_core_locks.yaml +++ b/governance/rules/00_core_locks.yaml @@ -2,6 +2,13 @@ schema_version: agents_rule.v1 rule_id: CORE_LOCKS_V1 title: Core locks and no-hallucination rules summary: + - "[NO_DIRECT_API_TRADING] 매수/매도 주문은 어떤 API(한국투자증권 KIS Open API 포함)를 통해서도 + 직접 실행하지 않는다. 이 엔진의 모든 산출물은 '제안'이며, 실제 주문 실행은 반드시 사람이 + HTS에서 수동으로 입력한다. 이 원칙을 어기면 엔진 전체가 의미를 잃는다(사용자 직접 지시, + 2026-06-21) — governance/rules/06_no_direct_api_trading.yaml 참조." + - "[NO_KIS_ACCOUNT_BALANCE_QUERY] KIS Open API로 계좌 보유종목/잔고를 조회하지 않는다. + 보유종목의 유일한 출처는 HTS 캡처 → account_snapshot이다(사용자 직접 지시, 2026-06-21) + — governance/rules/07_no_kis_account_balance_query.yaml 참조." - Use spec/13_formula_registry.yaml for all prices, stops, targets, quantities. - Do not invent prices, quantities, or formulas. - If harness data is missing, print DATA_MISSING — 하네스 업데이트 필요. diff --git a/governance/rules/06_no_direct_api_trading.yaml b/governance/rules/06_no_direct_api_trading.yaml new file mode 100644 index 0000000..7d91529 --- /dev/null +++ b/governance/rules/06_no_direct_api_trading.yaml @@ -0,0 +1,55 @@ +schema_version: agents_rule.v1 +rule_id: NO_DIRECT_API_TRADING_V1 +title: API를 통한 매수/매도 직접 실행 절대 금지 — 최상위 안전 규칙 +priority: CRITICAL +origin: "사용자 직접 지시 (2026-06-21): '반드시 지침에 가장 중요한 하네스인 매수/매도는 + API를 통해서 직접하지 않는다가 원칙이다. 이걸 지키지 않는다면 엔진으로서 의미는 없다.'" +has_code_implementation: true +code_path: + - "src/quant_engine/kis_api_client_v1.py" + - "tools/validate_no_direct_api_trading_v1.py" + +summary: + - "이 엔진(은퇴자산포트폴리오 퀀트엔진)은 어떤 외부 API를 통해서도 매수/매도 주문을 + 직접 실행하지 않는다. 한국투자증권 KIS Open API를 포함해, 향후 연동되는 모든 + 브로커/거래소 API에 동일하게 적용된다." + - "이 엔진의 모든 산출물(final_decision_packet, sell_priority, rebalance orders 등)은 + '제안(proposal)'이지 '주문 실행(execution)'이 아니다. 실제 매수/매도 주문은 반드시 + 사람이 HTS(홈트레이딩시스템)에서 직접 확인 후 수동으로 입력한다." + - "이 원칙은 데이터 수집(read-only) API 사용을 금지하지 않는다 — 시세/호가/공매도/ + 투자자별 매매동향 등 조회성 데이터 수집은 허용된다. 금지 대상은 주문 제출 + (order placement), 정정(modify), 취소(cancel) API 호출뿐이다." + +scope: + applies_to: + - "한국투자증권(KIS) Open API — https://apiportal.koreainvestment.com" + - "향후 추가되는 모든 브로커/거래소 Open API 연동" + prohibited_actions: + - "주문 제출(매수/매도 주문 전송) API 호출" + - "기존 주문 정정/취소 API 호출" + - "잔고를 변경시키는 모든 쓰기성(write) API 호출" + allowed_actions: + - "시세 조회(현재가, 호가, 일자별 시세)" + - "공매도 일별추이 조회" + - "투자자별 매매동향 조회" + - "계좌 잔고/평가 조회(읽기 전용)" + +enforcement: + code_level: + rule: "KIS API 클라이언트 모듈(src/quant_engine/kis_api_client_v1.py)의 모든 HTTP 요청은 + 단일 공유 함수를 통해서만 전송되며, 그 함수는 차단 목록(FORBIDDEN_TR_ID_PREFIXES, + FORBIDDEN_PATH_SUBSTRINGS)에 해당하는 TR_ID/경로를 만나면 즉시 RuntimeError를 + 발생시켜 요청을 중단한다. 주문 제출/정정/취소 함수는 이 코드베이스에 일체 작성하지 + 않는다(함수 자체가 존재하지 않음 — 가드는 방어적 2차 안전장치)." + test: "tests/unit/test_kis_api_client_v1.py — 차단 목록에 있는 TR_ID/경로로 요청 시 + RuntimeError가 발생하는지 검증 + 소스코드 전체에 주문 제출 엔드포인트 경로 + 문자열(/uapi/domestic-stock/v1/trading/order-cash 등)이 한 글자도 존재하지 않는지 + 정적 grep 검증." + review_level: + rule: "이 모듈에 새 함수를 추가할 때마다 반드시 KIS Open API 공식 문서에서 해당 + TR_ID가 조회(quotations)/순위(ranking)/계좌조회(read-only) 카테고리인지 확인하고, + trading(주문) 카테고리 함수는 어떤 이유로도 추가하지 않는다." + +violation_consequence: "이 규칙을 어기면 엔진 전체가 '제안 시스템'에서 '자동매매 시스템'으로 + 변질되어 프로젝트의 핵심 전제(사람이 최종 승인·입력)가 깨진다. 사용자가 명시적으로 + '엔진으로서 의미는 없다'고 표현한 절대 우선 규칙이다." diff --git a/governance/rules/07_no_kis_account_balance_query.yaml b/governance/rules/07_no_kis_account_balance_query.yaml new file mode 100644 index 0000000..d79fb9a --- /dev/null +++ b/governance/rules/07_no_kis_account_balance_query.yaml @@ -0,0 +1,44 @@ +schema_version: agents_rule.v1 +rule_id: NO_KIS_ACCOUNT_BALANCE_QUERY_V1 +title: KIS Open API로 계좌 보유종목/잔고 정보를 조회하지 않는다 — 필수 지침 +priority: CRITICAL +origin: "사용자 직접 지시 (2026-06-21): 'OPEN API에 계좌 보유종목에 대한 정보는 사용하지 + 않는다. 필수 지침이다.'" +has_code_implementation: true +code_path: + - "src/quant_engine/kis_api_client_v1.py" + - "tools/validate_no_direct_api_trading_v1.py" + +summary: + - "한국투자증권(KIS) Open API는 시세/호가/공매도/투자자매매동향 등 시장 전체에 공개된 + 조회성 데이터 수집에만 사용한다. 계좌 보유종목·잔고·평가금액 조회(주식잔고조회 등) + API는 호출하지 않는다." + - "보유종목 정보의 유일한 출처(source of truth)는 기존 HTS 캡처 → ChatGPT 파싱 → GAS + account_snapshot 시트 워크플로우다. 이 원칙은 [[feedback_direction_a_no_manual_input]] + (positions 수동입력 금지)와 같은 계열의 데이터 출처 통제 규칙이며, KIS API가 그 + 경로를 대체하거나 보강하지 않는다." + - "이 규칙은 governance/rules/06_no_direct_api_trading.yaml(주문 미실행)과 별개의 + 독립적인 제약이다 — 06번 규칙은 '쓰기(주문)'를 금지하고, 이 규칙은 '계좌 식별 데이터 + 조회(읽기)'를 금지한다. 두 규칙 모두 충돌 없이 동시에 적용된다." + +scope: + prohibited_tr_ids: + - "TTTC8434R" # 주식잔고조회(실전) + - "VTTC8434R" # 주식잔고조회(모의) + prohibited_path_substrings: + - "/trading/inquire-balance" + rationale: > + 이미 governance/rules/06의 FORBIDDEN_PATH_SUBSTRINGS=("/trading/",)가 이 경로를 + 구조적으로 차단하지만(주식잔고조회도 /trading/ 하위 경로), 이 규칙은 그것이 + '주문 차단의 부수효과'가 아니라 '계좌정보 비조회'라는 독립적이고 의도적인 정책임을 + 명시한다. + +enforcement: + code_level: "src/quant_engine/kis_api_client_v1.py에 inquire-balance 관련 함수를 작성하지 + 않는다(함수 자체가 존재하지 않음). TTTC8434R/VTTC8434R을 FORBIDDEN_TR_ID_PREFIXES에 + 추가해 2차 방어." + test_level: "tests/unit/test_kis_api_client_v1.py — TTTC8434R/VTTC8434R 차단 검증 + + tools/validate_no_direct_api_trading_v1.py 정적 스캔에 동일 TR_ID/경로 포함." + +violation_consequence: "계좌 보유정보를 KIS API로 조회하면 HTS 캡처 기반 단일 진실원천 + 원칙이 깨지고, 두 개의 서로 다른 보유종목 데이터 경로가 생겨 정합성 검증이 불가능해진다." diff --git a/src/quant_engine/data_collection_backend_v1.py b/src/quant_engine/data_collection_backend_v1.py new file mode 100644 index 0000000..e998c29 --- /dev/null +++ b/src/quant_engine/data_collection_backend_v1.py @@ -0,0 +1,18 @@ +"""Storage backend selection for the collection pipeline. + +This module is a thin compatibility wrapper over the generic storage backend +contract. The collector is intentionally designed around a backend contract, +not a hard SQLite-only assumption. +""" +from __future__ import annotations + +from pathlib import Path + +from src.quant_engine.storage_backend_v1 import StoreSpec, default_sqlite_store_path, normalize_store_spec + + +CollectionStoreSpec = StoreSpec + + +def default_collection_store_path(root: Path) -> Path: + return default_sqlite_store_path(root, "kis_data_collection/kis_data_collection.db") diff --git a/src/quant_engine/data_collection_store_v1.py b/src/quant_engine/data_collection_store_v1.py new file mode 100644 index 0000000..81848b6 --- /dev/null +++ b/src/quant_engine/data_collection_store_v1.py @@ -0,0 +1,370 @@ +"""SQLite store for platform-transition data collection outputs. + +This store is intentionally small and backend-agnostic enough to be upgraded to +PostgreSQL later without changing the row contract. The canonical payload is the +normalized factor row plus provenance metadata. +""" +from __future__ import annotations + +import json +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable + + +SCHEMA = """ +PRAGMA journal_mode=WAL; + +CREATE TABLE IF NOT EXISTS collection_runs ( + run_id TEXT PRIMARY KEY, + collector_name TEXT NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT, + status TEXT NOT NULL, + input_source TEXT, + output_json_path TEXT, + output_db_path TEXT, + notes TEXT, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS collection_snapshots ( + run_id TEXT NOT NULL, + dataset_name TEXT NOT NULL, + ticker TEXT NOT NULL, + name TEXT, + sector TEXT, + as_of_date TEXT, + source_priority TEXT, + source_status TEXT, + payload_json TEXT NOT NULL, + provenance_json TEXT NOT NULL, + created_at TEXT DEFAULT (datetime('now')), + PRIMARY KEY (run_id, dataset_name, ticker) +); + +CREATE TABLE IF NOT EXISTS collection_source_errors ( + run_id TEXT NOT NULL, + ticker TEXT, + source_name TEXT NOT NULL, + error_kind TEXT NOT NULL, + error_message TEXT NOT NULL, + payload_json TEXT, + created_at TEXT DEFAULT (datetime('now')) +); + +CREATE INDEX IF NOT EXISTS idx_collection_snapshots_ticker_time + ON collection_snapshots(ticker, created_at DESC); + +CREATE INDEX IF NOT EXISTS idx_collection_source_errors_run + ON collection_source_errors(run_id, source_name); +""" + + +@dataclass(frozen=True) +class CollectionRun: + run_id: str + collector_name: str + started_at: str + status: str + input_source: str | None = None + output_json_path: str | None = None + output_db_path: str | None = None + notes: str | None = None + + +def init_db(db_path: Path) -> None: + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(db_path) + try: + conn.executescript(SCHEMA) + conn.commit() + finally: + conn.close() + + +def upsert_collection_run(db_path: Path, run: CollectionRun, finished_at: str | None = None) -> None: + init_db(db_path) + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + INSERT INTO collection_runs ( + run_id, collector_name, started_at, finished_at, status, + input_source, output_json_path, output_db_path, notes + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(run_id) DO UPDATE SET + collector_name=excluded.collector_name, + started_at=excluded.started_at, + finished_at=excluded.finished_at, + status=excluded.status, + input_source=excluded.input_source, + output_json_path=excluded.output_json_path, + output_db_path=excluded.output_db_path, + notes=excluded.notes + """, + ( + run.run_id, + run.collector_name, + run.started_at, + finished_at, + run.status, + run.input_source, + run.output_json_path, + run.output_db_path, + run.notes, + ), + ) + conn.commit() + finally: + conn.close() + + +def upsert_collection_snapshot( + db_path: Path, + *, + run_id: str, + dataset_name: str, + ticker: str, + name: str | None, + sector: str | None, + as_of_date: str | None, + source_priority: str, + source_status: str, + payload: dict[str, Any], + provenance: dict[str, Any], +) -> None: + init_db(db_path) + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + INSERT INTO collection_snapshots ( + run_id, dataset_name, ticker, name, sector, as_of_date, + source_priority, source_status, payload_json, provenance_json + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(run_id, dataset_name, ticker) DO UPDATE SET + name=excluded.name, + sector=excluded.sector, + as_of_date=excluded.as_of_date, + source_priority=excluded.source_priority, + source_status=excluded.source_status, + payload_json=excluded.payload_json, + provenance_json=excluded.provenance_json + """, + ( + run_id, + dataset_name, + ticker, + name, + sector, + as_of_date, + source_priority, + source_status, + json.dumps(payload, ensure_ascii=False, default=str), + json.dumps(provenance, ensure_ascii=False, default=str), + ), + ) + conn.commit() + finally: + conn.close() + + +def append_collection_error( + db_path: Path, + *, + run_id: str, + source_name: str, + error_kind: str, + error_message: str, + ticker: str | None = None, + payload: dict[str, Any] | None = None, +) -> None: + init_db(db_path) + conn = sqlite3.connect(db_path) + try: + conn.execute( + """ + INSERT INTO collection_source_errors ( + run_id, ticker, source_name, error_kind, error_message, payload_json + ) VALUES (?, ?, ?, ?, ?, ?) + """, + ( + run_id, + ticker, + source_name, + error_kind, + error_message, + json.dumps(payload or {}, ensure_ascii=False, default=str), + ), + ) + conn.commit() + finally: + conn.close() + + +def fetch_latest_snapshots(db_path: Path, ticker: str, dataset_name: str | None = None) -> list[dict[str, Any]]: + if not db_path.exists(): + return [] + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + if dataset_name: + rows = conn.execute( + """ + SELECT * FROM collection_snapshots + WHERE ticker = ? AND dataset_name = ? + ORDER BY created_at DESC + """, + (ticker, dataset_name), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT * FROM collection_snapshots + WHERE ticker = ? + ORDER BY created_at DESC + """, + (ticker,), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() + + +def iter_recent_snapshots(db_path: Path, limit: int = 50) -> Iterable[dict[str, Any]]: + if not db_path.exists(): + return [] + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + "SELECT * FROM collection_snapshots ORDER BY created_at DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() + + +def load_collection_runs(db_path: Path, limit: int = 20) -> list[dict[str, Any]]: + if not db_path.exists(): + return [] + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """ + SELECT run_id, collector_name, started_at, finished_at, status, + input_source, output_json_path, output_db_path, notes, created_at + FROM collection_runs + ORDER BY started_at DESC, created_at DESC + LIMIT ? + """, + (int(limit),), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() + + +def load_collection_errors(db_path: Path, limit: int = 20) -> list[dict[str, Any]]: + if not db_path.exists(): + return [] + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute( + """ + SELECT run_id, ticker, source_name, error_kind, error_message, payload_json, created_at + FROM collection_source_errors + ORDER BY created_at DESC + LIMIT ? + """, + (int(limit),), + ).fetchall() + return [dict(row) for row in rows] + finally: + conn.close() + + +def load_collection_dashboard_state( + db_path: Path | str | None = None, + output_json_path: Path | str | None = None, + *, + limit: int = 8, +) -> dict[str, Any]: + db = Path(db_path) if db_path else Path() + report = Path(output_json_path) if output_json_path else Path() + state: dict[str, Any] = { + "db_path": str(db), + "output_json_path": str(report) if output_json_path else "", + "runs": [], + "recent_snapshots": [], + "recent_errors": [], + "counts": { + "collection_runs": 0, + "collection_snapshots": 0, + "collection_source_errors": 0, + }, + "latest_run": {}, + "latest_report": {}, + } + if report.exists(): + try: + state["latest_report"] = json.loads(report.read_text(encoding="utf-8")) + except Exception: + state["latest_report"] = {} + if not db.exists(): + return state + conn = sqlite3.connect(db) + conn.row_factory = sqlite3.Row + try: + state["counts"] = { + "collection_runs": conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0], + "collection_snapshots": conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0], + "collection_source_errors": conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0], + } + run_row = conn.execute( + """ + SELECT run_id, collector_name, started_at, finished_at, status, + input_source, output_json_path, output_db_path, notes, created_at + FROM collection_runs + ORDER BY started_at DESC, created_at DESC + LIMIT 1 + """ + ).fetchone() + state["latest_run"] = dict(run_row) if run_row is not None else {} + state["runs"] = [dict(row) for row in conn.execute( + """ + SELECT run_id, collector_name, started_at, finished_at, status, + input_source, output_json_path, output_db_path, notes, created_at + FROM collection_runs + ORDER BY started_at DESC, created_at DESC + LIMIT ? + """, + (int(limit),), + ).fetchall()] + state["recent_snapshots"] = [dict(row) for row in conn.execute( + """ + SELECT run_id, dataset_name, ticker, name, sector, as_of_date, + source_priority, source_status, created_at + FROM collection_snapshots + ORDER BY created_at DESC + LIMIT ? + """, + (int(limit),), + ).fetchall()] + state["recent_errors"] = [dict(row) for row in conn.execute( + """ + SELECT run_id, ticker, source_name, error_kind, error_message, created_at + FROM collection_source_errors + ORDER BY created_at DESC + LIMIT ? + """, + (int(limit),), + ).fetchall()] + finally: + conn.close() + return state diff --git a/src/quant_engine/kis_api_client_v1.py b/src/quant_engine/kis_api_client_v1.py new file mode 100644 index 0000000..013d4e3 --- /dev/null +++ b/src/quant_engine/kis_api_client_v1.py @@ -0,0 +1,212 @@ +"""한국투자증권(KIS) Open API 클라이언트 — 조회(read-only) 전용. + +근거: https://apiportal.koreainvestment.com/apiservice-summary , + https://github.com/koreainvestment/open-trading-api (2026-06-21 실측 확인된 + api_url/tr_id만 사용 — 추정 금지). + +══════════════════════════════════════════════════════════════════════════════ +[CRITICAL] governance/rules/06_no_direct_api_trading.yaml — 절대 규칙 +이 모듈은 매수/매도 주문을 어떤 경로로도 제출하지 않는다. 주문 제출/정정/취소 +함수는 이 파일에 일체 작성하지 않으며, 공유 요청 함수(_send_request)는 주문 +관련 경로("/trading/")나 TR_ID(TTTC08*/VTTC08* 등)를 만나면 즉시 RuntimeError로 +요청을 차단한다(2차 방어). 이 원칙을 어기면 엔진 전체가 '제안 시스템'에서 +'자동매매 시스템'으로 변질되어 프로젝트 핵심 전제가 깨진다(사용자 직접 지시). +══════════════════════════════════════════════════════════════════════════════ + +인증 정보는 Windows 환경변수에서 읽는다(실제계좌: KIS_APP_Key/KIS_APP_Secret, +모의계좌: KIS_APP_Key_TEST/KIS_APP_Secret_TEST). 방금 setx로 설정된 값은 현재 +프로세스의 os.environ에 아직 반영되지 않을 수 있어, HKCU\\Environment 레지스트리 +폴백을 둔다(읽기만 함, 값을 로그에 남기지 않음). +""" +from __future__ import annotations + +import datetime as dt +import json +import sys +from pathlib import Path +from typing import Any + +import requests + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +REAL_DOMAIN = "https://openapi.koreainvestment.com:9443" +MOCK_DOMAIN = "https://openapivts.koreainvestment.com:29443" +TOKEN_CACHE_DIR = ROOT / "Temp" + +# ── [CRITICAL] 주문 차단 목록 — 절대 수정/완화 금지 (governance/rules/06_no_direct_api_trading.yaml) ── +# "/trading/" 하위 경로는 주문(order)뿐 아니라 계좌잔고조회(inquire-balance)도 포함한다. +# 계좌 보유종목/잔고는 governance/rules/07_no_kis_account_balance_query.yaml에 의해 +# 별도로도 금지된다 — HTS 캡처가 유일한 출처(사용자 직접 지시). +FORBIDDEN_PATH_SUBSTRINGS: tuple[str, ...] = ("/trading/",) +FORBIDDEN_TR_ID_PREFIXES: tuple[str, ...] = ( + "TTTC08", "VTTC08", "TTTC01", "VTTC01", # 현금/신용 매수·매도·정정·취소 + "TTTC8434R", "VTTC8434R", # 주식잔고조회 — 계좌 보유종목 조회 금지(07번 규칙) +) + + +class OrderEndpointBlockedError(RuntimeError): + """주문 제출/정정/취소 경로 호출 시도 — 절대 차단.""" + + +def _assert_read_only(path: str, tr_id: str) -> None: + for forbidden in FORBIDDEN_PATH_SUBSTRINGS: + if forbidden in path: + raise OrderEndpointBlockedError( + f"BLOCKED: 주문 관련 경로 호출 시도 차단 — path={path!r}. " + "이 엔진은 매수/매도를 API로 직접 실행하지 않는다(governance/rules/06_no_direct_api_trading.yaml)." + ) + for prefix in FORBIDDEN_TR_ID_PREFIXES: + if tr_id.upper().startswith(prefix): + raise OrderEndpointBlockedError( + f"BLOCKED: 주문 관련 TR_ID 호출 시도 차단 — tr_id={tr_id!r}. " + "이 엔진은 매수/매도를 API로 직접 실행하지 않는다(governance/rules/06_no_direct_api_trading.yaml)." + ) + + +def _read_env_var(name: str) -> str | None: + import os + + value = os.environ.get(name) + if value: + return value + if sys.platform != "win32": + return None + try: + import winreg + + with winreg.OpenKey(winreg.HKEY_CURRENT_USER, "Environment") as key: + value, _ = winreg.QueryValueEx(key, name) + return value or None + except OSError: + return None + + +class KisCredentials: + def __init__(self, app_key: str, app_secret: str, account: str): + self.app_key = app_key + self.app_secret = app_secret + self.account = account # "real" | "mock" + self.domain = REAL_DOMAIN if account == "real" else MOCK_DOMAIN + + @classmethod + def load(cls, account: str = "mock") -> "KisCredentials": + if account == "real": + key_name, secret_name = "KIS_APP_Key", "KIS_APP_Secret" + elif account == "mock": + key_name, secret_name = "KIS_APP_Key_TEST", "KIS_APP_Secret_TEST" + else: + raise ValueError("account must be 'real' or 'mock'") + app_key = _read_env_var(key_name) + app_secret = _read_env_var(secret_name) + if not app_key or not app_secret: + raise RuntimeError( + f"{key_name}/{secret_name} 환경변수를 찾을 수 없음 — Windows 환경변수 설정 후 " + "새 셸에서 재시도하거나 HKCU\\Environment 레지스트리 반영을 확인하세요." + ) + return cls(app_key=app_key, app_secret=app_secret, account=account) + + +def _token_cache_path(creds: KisCredentials) -> Path: + TOKEN_CACHE_DIR.mkdir(parents=True, exist_ok=True) + return TOKEN_CACHE_DIR / f"kis_token_cache_{creds.account}.json" + + +def _issue_or_reuse_token(creds: KisCredentials) -> str: + """KIS는 토큰 발급 빈도를 제한한다 — 만료 전까지 캐시 재사용 필수.""" + cache_path = _token_cache_path(creds) + if cache_path.exists(): + try: + cached = json.loads(cache_path.read_text(encoding="utf-8")) + expires_at = dt.datetime.fromisoformat(cached["expires_at"]) + if dt.datetime.now(dt.timezone.utc) < expires_at - dt.timedelta(minutes=10): + return cached["access_token"] + except (json.JSONDecodeError, KeyError, ValueError): + pass + + resp = requests.post( + f"{creds.domain}/oauth2/tokenP", + json={"grant_type": "client_credentials", "appkey": creds.app_key, "appsecret": creds.app_secret}, + timeout=15, + ) + resp.raise_for_status() + body = resp.json() + access_token = body["access_token"] + expires_in_sec = int(body.get("expires_in", 86400)) + expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expires_in_sec) + cache_path.write_text( + json.dumps({"access_token": access_token, "expires_at": expires_at.isoformat()}, ensure_ascii=False), + encoding="utf-8", + ) + return access_token + + +def _send_request(creds: KisCredentials, path: str, tr_id: str, params: dict[str, Any]) -> dict[str, Any]: + """모든 KIS REST 호출의 단일 진입점 — 여기서만 가드가 작동하면 충분하다.""" + _assert_read_only(path, tr_id) # [CRITICAL] 절대 제거 금지 + access_token = _issue_or_reuse_token(creds) + headers = { + "content-type": "application/json; charset=utf-8", + "authorization": f"Bearer {access_token}", + "appkey": creds.app_key, + "appsecret": creds.app_secret, + "tr_id": tr_id, + "custtype": "P", + } + resp = requests.get(f"{creds.domain}{path}", headers=headers, params=params, timeout=15) + resp.raise_for_status() + return resp.json() + + +# ── 조회(read-only) 함수 — 전부 GET, 전부 quotations/ranking 카테고리 (실측 확인) ────────── + +def get_current_price(creds: KisCredentials, code: str) -> dict[str, Any]: + """주식현재가 시세. api_url=/uapi/domestic-stock/v1/quotations/inquire-price, tr_id=FHKST01010100.""" + return _send_request( + creds, "/uapi/domestic-stock/v1/quotations/inquire-price", "FHKST01010100", + {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code}, + ) + + +def get_asking_price_10_level(creds: KisCredentials, code: str) -> dict[str, Any]: + """주식현재가 호가/예상체결 — 10단계 매수/매도 호가. + api_url=/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn, tr_id=FHKST01010200. + """ + return _send_request( + creds, "/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn", "FHKST01010200", + {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code}, + ) + + +def get_daily_short_sale(creds: KisCredentials, code: str, start_date: str, end_date: str) -> dict[str, Any]: + """국내주식 공매도 일별추이. api_url=/uapi/domestic-stock/v1/quotations/daily-short-sale, + tr_id=FHPST04830000. start_date/end_date: YYYYMMDD.""" + return _send_request( + creds, "/uapi/domestic-stock/v1/quotations/daily-short-sale", "FHPST04830000", + {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, + "FID_INPUT_DATE_1": start_date, "FID_INPUT_DATE_2": end_date}, + ) + + +def get_daily_item_chart_price( + creds: KisCredentials, code: str, start_date: str, end_date: str, period: str = "D", +) -> dict[str, Any]: + """주식현재가 일자별. api_url=/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice, + tr_id=FHKST03010100.""" + return _send_request( + creds, "/uapi/domestic-stock/v1/quotations/inquire-daily-itemchartprice", "FHKST03010100", + {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code, + "FID_INPUT_DATE_1": start_date, "FID_INPUT_DATE_2": end_date, + "FID_PERIOD_DIV_CODE": period, "FID_ORG_ADJ_PRC": "0"}, + ) + + +def get_investor_trend(creds: KisCredentials, code: str) -> dict[str, Any]: + """주식현재가 투자자(개인/외국인/기관) 매매동향. + api_url=/uapi/domestic-stock/v1/quotations/inquire-investor, tr_id=FHKST01010900.""" + return _send_request( + creds, "/uapi/domestic-stock/v1/quotations/inquire-investor", "FHKST01010900", + {"FID_COND_MRKT_DIV_CODE": "J", "FID_INPUT_ISCD": code}, + ) diff --git a/src/quant_engine/kis_data_collection_v1.py b/src/quant_engine/kis_data_collection_v1.py new file mode 100644 index 0000000..5a18720 --- /dev/null +++ b/src/quant_engine/kis_data_collection_v1.py @@ -0,0 +1,378 @@ +"""KIS-first data collector for the CI scheduler. + +The collector uses the existing `GatherTradingData.json` snapshot as the seed +universe, then enriches Korean tickers with read-only KIS quotations and +orderbook data, while retaining Naver/Yahoo fallbacks when available. +The canonical persistence target is SQLite. +""" +from __future__ import annotations + +import argparse +import datetime as dt +import json +import os +import sys +import uuid +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +try: + from tools.fetch_naver_market_data_v1 import ( # type: ignore + _session as naver_session, + compute_relative_return_20d, + compute_volume_ratio_5d, + fetch_foreign_institution_flow, + fetch_price_history, + ) +except Exception: # pragma: no cover - optional adapter + naver_session = None + compute_relative_return_20d = None + compute_volume_ratio_5d = None + fetch_foreign_institution_flow = None + fetch_price_history = None + +try: + from src.quant_engine.kis_api_client_v1 import ( # type: ignore + KisCredentials, + get_asking_price_10_level, + get_current_price, + get_daily_short_sale, + ) +except Exception: # pragma: no cover - safe fallback in non-KIS environments + KisCredentials = None + get_asking_price_10_level = None + get_current_price = None + get_daily_short_sale = None + +from src.quant_engine.data_collection_store_v1 import ( + CollectionRun, + append_collection_error, + upsert_collection_run, + upsert_collection_snapshot, +) +from src.quant_engine.data_collection_backend_v1 import ( + CollectionStoreSpec, + normalize_store_spec, +) + + +def _kst_now_iso() -> str: + return dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat() + + +def _load_json(path: Path) -> dict[str, Any]: + if not path.exists(): + return {} + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return {} + + +def _coerce_float(value: Any) -> float | None: + if value is None or value == "": + return None + try: + if isinstance(value, str): + value = value.replace(",", "").replace("%", "") + return float(value) + except (TypeError, ValueError): + return None + + +def _find_first_value(payload: Any, keys: tuple[str, ...]) -> Any: + stack = [payload] + while stack: + item = stack.pop() + if isinstance(item, dict): + for key in keys: + value = item.get(key) + if value not in (None, ""): + return value + stack.extend(item.values()) + elif isinstance(item, list): + stack.extend(item) + return None + + +def _normalize_naver_price_history(code: str) -> dict[str, Any]: + if naver_session is None or fetch_price_history is None: + return {"status": "DISABLED"} + try: + session = naver_session() + price = fetch_price_history(session, code) + result: dict[str, Any] = {"status": price.get("status", "UNKNOWN"), "source_url": price.get("source_url")} + rows = price.get("rows") or [] + if rows: + result["close"] = rows[0].get("close") + result["open"] = rows[0].get("open") + result["high"] = rows[0].get("high") + result["low"] = rows[0].get("low") + result["volume"] = rows[0].get("volume") + if compute_relative_return_20d is not None: + benchmark = fetch_price_history(session, "069500") + result["relative_return_20d"] = compute_relative_return_20d(rows, benchmark.get("rows", [])) + if compute_volume_ratio_5d is not None: + result["volume_ratio_5d"] = compute_volume_ratio_5d(rows) + if fetch_foreign_institution_flow is not None: + result["foreign_institution_flow"] = fetch_foreign_institution_flow(session, code) + return result + except Exception as exc: # noqa: BLE001 - fallback source must not break the batch + return {"status": "ERROR", "error": str(exc)} + + +def _normalize_kis_fields(code: str, account: str) -> dict[str, Any]: + if KisCredentials is None or get_current_price is None or get_asking_price_10_level is None or get_daily_short_sale is None: + return {"status": "DISABLED"} + try: + creds = KisCredentials.load(account) + except Exception as exc: + return {"status": "ERROR", "error": str(exc)} + + result: dict[str, Any] = {"status": "OK", "account": account} + try: + price = get_current_price(creds, code) + result["current_price_raw"] = price + result["current_price"] = _coerce_float(_find_first_value(price, ("stck_prpr", "stck_clpr", "close", "close_price"))) + result["open"] = _coerce_float(_find_first_value(price, ("stck_oprc", "open", "open_price"))) + result["high"] = _coerce_float(_find_first_value(price, ("stck_hgpr", "high", "high_price"))) + result["low"] = _coerce_float(_find_first_value(price, ("stck_lwpr", "low", "low_price"))) + result["prev_close"] = _coerce_float(_find_first_value(price, ("prdy_vrss", "prev_close"))) + result["volume"] = _coerce_float(_find_first_value(price, ("acml_vol", "volume"))) + result["change_pct"] = _coerce_float(_find_first_value(price, ("prdy_ctrt", "change_pct"))) + except Exception as exc: + result["price_status"] = "ERROR" + result["price_error"] = str(exc) + + try: + orderbook = get_asking_price_10_level(creds, code) + output1 = orderbook.get("output1") or {} + result["orderbook_raw"] = orderbook + result["microstructure_pressure"] = _coerce_float( + _find_first_value(output1, ("total_askp_rsqn", "total_bidp_rsqn")) + ) + result["ask_1"] = _coerce_float(_find_first_value(output1, ("askp1",))) + result["bid_1"] = _coerce_float(_find_first_value(output1, ("bidp1",))) + result["orderbook_status"] = "OK" + except Exception as exc: + result["orderbook_status"] = "ERROR" + result["orderbook_error"] = str(exc) + + try: + start = (dt.date.today() - dt.timedelta(days=10)).strftime("%Y%m%d") + end = dt.date.today().strftime("%Y%m%d") + short_sale = get_daily_short_sale(creds, code, start, end) + result["short_sale_raw"] = short_sale + rows = short_sale.get("output2") or [] + if rows: + latest = rows[0] + result["short_turnover_share"] = _coerce_float(latest.get("ssts_vol_rlim")) + result["short_sale_status"] = "OK" + except Exception as exc: + result["short_sale_status"] = "ERROR" + result["short_sale_error"] = str(exc) + + return result + + +def _build_seed_rows(source_json: Path) -> list[dict[str, Any]]: + payload = _load_json(source_json) + data = payload.get("data") or {} + core_satellite = {str(row.get("Ticker") or row.get("ticker") or ""): row for row in data.get("core_satellite", [])} + sector_lookup = {str(row.get("Ticker") or row.get("ticker") or ""): row.get("Sector") for row in data.get("core_satellite", [])} + rows: list[dict[str, Any]] = [] + for row in data.get("data_feed", []): + ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() + if not ticker: + continue + merged = dict(row) + core_row = core_satellite.get(ticker) or {} + if core_row: + for key, value in core_row.items(): + merged.setdefault(key, value) + merged["Sector"] = merged.get("Sector") or sector_lookup.get(ticker) + rows.append(merged) + return rows + + +def _collect_one(row: dict[str, Any], *, kis_account: str, include_naver: bool, include_live_kis: bool) -> tuple[dict[str, Any], dict[str, Any]]: + ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() + name = str(row.get("Name") or row.get("name") or "").strip() + sector = str(row.get("Sector") or row.get("sector") or "").strip() or None + normalized = dict(row) + provenance: dict[str, Any] = { + "ticker": ticker, + "name": name, + "sector": sector, + "source_priority": ["gathertradingdata_json"], + } + + if include_live_kis and ticker.isdigit() and len(ticker) == 6: + kis = _normalize_kis_fields(ticker, kis_account) + provenance["kis"] = kis + normalized.update({k: v for k, v in kis.items() if k not in {"current_price_raw", "orderbook_raw", "short_sale_raw"}}) + if kis.get("status") == "OK": + provenance["source_priority"].insert(0, "kis_open_api") + + if include_naver and ticker.isdigit() and len(ticker) == 6: + naver = _normalize_naver_price_history(ticker) + provenance["naver"] = naver + if naver.get("status") in {"OK", "DATA_MISSING"}: + normalized.setdefault("relative_return_20d", naver.get("relative_return_20d")) + normalized.setdefault("volume_ratio_5d", naver.get("volume_ratio_5d")) + normalized.setdefault("naver_price_status", naver.get("status")) + provenance["source_priority"].append("naver_finance") + + normalized.setdefault("collection_as_of", _kst_now_iso()) + return normalized, provenance + + +def collect_to_sqlite( + *, + input_json: Path, + sqlite_db: Path, + output_json: Path, + kis_account: str, + include_naver: bool = True, + include_live_kis: bool = True, +) -> dict[str, Any]: + run_id = uuid.uuid4().hex + started_at = _kst_now_iso() + upsert_collection_run( + sqlite_db, + CollectionRun( + run_id=run_id, + collector_name="kis_data_collection_v1", + started_at=started_at, + status="RUNNING", + input_source=str(input_json), + output_json_path=str(output_json), + output_db_path=str(sqlite_db), + notes="KIS-first CI collection", + ), + ) + + seed_rows = _build_seed_rows(input_json) + summary = { + "formula_id": "KIS_DATA_COLLECTION_V1", + "run_id": run_id, + "started_at": started_at, + "input_json": str(input_json), + "sqlite_db": str(sqlite_db), + "row_count": len(seed_rows), + "source_counts": {}, + "errors": [], + "rows": [], + } + + for row in seed_rows: + ticker = str(row.get("Ticker") or row.get("ticker") or "").strip() + if not ticker: + continue + try: + normalized, provenance = _collect_one(row, kis_account=kis_account, include_naver=include_naver, include_live_kis=include_live_kis) + source_counts = summary["source_counts"] + for source_name in provenance.get("source_priority") or []: + source_counts[source_name] = source_counts.get(source_name, 0) + 1 + upsert_collection_snapshot( + sqlite_db, + run_id=run_id, + dataset_name="data_feed", + ticker=ticker, + name=str(normalized.get("Name") or normalized.get("name") or ""), + sector=normalized.get("Sector"), + as_of_date=str(normalized.get("Price_Date") or normalized.get("AsOfDate") or normalized.get("collection_as_of") or ""), + source_priority=">".join(provenance.get("source_priority") or []), + source_status="OK", + payload=normalized, + provenance=provenance, + ) + summary["rows"].append( + { + "ticker": ticker, + "name": normalized.get("Name") or normalized.get("name"), + "sector": normalized.get("Sector"), + "source_priority": provenance.get("source_priority"), + "current_price": normalized.get("current_price"), + "relative_return_20d": normalized.get("relative_return_20d"), + "volume_ratio_5d": normalized.get("volume_ratio_5d"), + } + ) + except Exception as exc: # noqa: BLE001 + error = {"ticker": ticker, "error": str(exc)} + summary["errors"].append(error) + append_collection_error( + sqlite_db, + run_id=run_id, + source_name="collector", + error_kind=type(exc).__name__, + error_message=str(exc), + ticker=ticker, + payload=row, + ) + + summary["finished_at"] = _kst_now_iso() + summary["status"] = "PASS" if not summary["errors"] else "PASS_WITH_WARNINGS" + output_json.parent.mkdir(parents=True, exist_ok=True) + output_json.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + upsert_collection_run( + sqlite_db, + CollectionRun( + run_id=run_id, + collector_name="kis_data_collection_v1", + started_at=started_at, + status=summary["status"], + input_source=str(input_json), + output_json_path=str(output_json), + output_db_path=str(sqlite_db), + notes="KIS-first CI collection", + ), + finished_at=summary["finished_at"], + ) + return summary + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--input-json", type=Path, default=ROOT / "GatherTradingData.json") + ap.add_argument("--sqlite-db", type=Path, default=ROOT / "outputs" / "kis_data_collection" / "kis_data_collection.db") + ap.add_argument("--store-backend", default="sqlite", help="Storage backend contract placeholder (sqlite today, postgresql planned)") + ap.add_argument("--store-location", default=None, help="Backend location/DSN. sqlite path or future postgres DSN.") + ap.add_argument("--output-json", type=Path, default=ROOT / "Temp" / "kis_data_collection_v1.json") + ap.add_argument("--kis-account", choices=["real", "mock"], default="real") + ap.add_argument("--no-naver", action="store_true") + ap.add_argument("--no-live-kis", action="store_true") + args = ap.parse_args() + + store_backend, store_location = normalize_store_spec( + CollectionStoreSpec( + backend=args.store_backend, + location=args.store_location or args.sqlite_db, + ), + ROOT, + ) + if store_backend != "sqlite": + raise SystemExit( + "현재 실행 backend는 sqlite만 지원합니다. " + "하지만 collector는 이미 backend contract로 분리되어 있어 " + "후속 PostgreSQL 구현을 같은 호출 지점에 붙일 수 있습니다." + ) + + summary = collect_to_sqlite( + input_json=args.input_json, + sqlite_db=Path(store_location), + output_json=args.output_json, + kis_account=args.kis_account, + include_naver=not args.no_naver, + include_live_kis=not args.no_live_kis, + ) + print(json.dumps(summary, ensure_ascii=False, indent=2)) + return 0 if summary.get("status") == "PASS" else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/quant_engine/storage_backend_v1.py b/src/quant_engine/storage_backend_v1.py new file mode 100644 index 0000000..5c3be47 --- /dev/null +++ b/src/quant_engine/storage_backend_v1.py @@ -0,0 +1,50 @@ +"""Generic storage backend contract for canonical time-series stores. + +The call sites use this as a small contract layer so SQLite is the executable +backend today while PostgreSQL can be added later without changing callers. +""" +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass(frozen=True) +class StoreSpec: + backend: str = "sqlite" + location: str | Path | None = None + + def normalized_backend(self) -> str: + backend = (self.backend or "sqlite").strip().lower() + if backend in {"sqlite", "sqlite3"}: + return "sqlite" + if backend in {"postgres", "postgresql", "pg"}: + return "postgresql" + return backend + + +def default_sqlite_store_path(root: Path, default_name: str) -> Path: + return root / "outputs" / default_name + + +def normalize_store_spec( + spec: StoreSpec, + root: Path, + *, + default_sqlite_name: str = "store.db", +) -> tuple[str, Path | str]: + backend = spec.normalized_backend() + if backend == "sqlite": + if spec.location is None: + return backend, default_sqlite_store_path(root, default_sqlite_name) + if isinstance(spec.location, Path): + return backend, spec.location + location = str(spec.location).strip() + if location.startswith("sqlite:///"): + return backend, Path(location.removeprefix("sqlite:///")) + return backend, Path(location) + if backend == "postgresql": + if not spec.location: + raise ValueError("postgresql backend requires a DSN/location string") + return backend, str(spec.location) + raise ValueError(f"unsupported backend: {spec.backend!r}") diff --git a/tests/unit/test_data_collection_store_v1.py b/tests/unit/test_data_collection_store_v1.py new file mode 100644 index 0000000..6182b9e --- /dev/null +++ b/tests/unit/test_data_collection_store_v1.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import sqlite3 +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.data_collection_store_v1 import ( + CollectionRun, + append_collection_error, + fetch_latest_snapshots, + init_db, + iter_recent_snapshots, + upsert_collection_run, + upsert_collection_snapshot, +) +from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec + + +def test_store_writes_and_reads_snapshots(tmp_path): + db_path = tmp_path / "collector.db" + init_db(db_path) + upsert_collection_run( + db_path, + CollectionRun( + run_id="run-1", + collector_name="collector", + started_at="2026-06-21T12:00:00+09:00", + status="RUNNING", + input_source="GatherTradingData.json", + output_json_path="Temp/kis_data_collection_v1.json", + output_db_path=str(db_path), + ), + ) + upsert_collection_snapshot( + db_path, + run_id="run-1", + dataset_name="data_feed", + ticker="005930", + name="삼성전자", + sector="반도체", + as_of_date="2026-06-21", + source_priority="kis_open_api>gathertradingdata_json", + source_status="OK", + payload={"ticker": "005930", "close": 1000}, + provenance={"kis": {"status": "OK"}}, + ) + append_collection_error( + db_path, + run_id="run-1", + source_name="kis", + error_kind="TimeoutError", + error_message="timeout", + ticker="005930", + ) + + conn = sqlite3.connect(db_path) + try: + run_count = conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0] + snap_count = conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0] + err_count = conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0] + finally: + conn.close() + + assert run_count == 1 + assert snap_count == 1 + assert err_count == 1 + assert fetch_latest_snapshots(db_path, "005930")[0]["dataset_name"] == "data_feed" + assert len(list(iter_recent_snapshots(db_path, limit=5))) == 1 + + +def test_store_overwrites_same_run_and_ticker(tmp_path): + db_path = tmp_path / "collector.db" + upsert_collection_snapshot( + db_path, + run_id="run-1", + dataset_name="data_feed", + ticker="005930", + name="삼성전자", + sector="반도체", + as_of_date="2026-06-21", + source_priority="kis_open_api", + source_status="OK", + payload={"ticker": "005930", "close": 1000}, + provenance={"source_priority": ["kis_open_api"]}, + ) + upsert_collection_snapshot( + db_path, + run_id="run-1", + dataset_name="data_feed", + ticker="005930", + name="삼성전자", + sector="반도체", + as_of_date="2026-06-21", + source_priority="kis_open_api>naver_finance", + source_status="OK", + payload={"ticker": "005930", "close": 2000}, + provenance={"source_priority": ["kis_open_api", "naver_finance"]}, + ) + rows = fetch_latest_snapshots(db_path, "005930") + assert rows[0]["source_priority"] == "kis_open_api>naver_finance" + + +def test_store_backend_normalization_supports_sqlite_paths(tmp_path): + backend, location = normalize_store_spec(CollectionStoreSpec(location=tmp_path / "collector.db"), ROOT) + assert backend == "sqlite" + assert str(location).endswith("collector.db") diff --git a/tests/unit/test_kis_api_client_v1.py b/tests/unit/test_kis_api_client_v1.py new file mode 100644 index 0000000..62abf3e --- /dev/null +++ b/tests/unit/test_kis_api_client_v1.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +import pytest + +from src.quant_engine.kis_api_client_v1 import ( + KisCredentials, + OrderEndpointBlockedError, + _assert_read_only, +) + +# governance/rules/06_no_direct_api_trading.yaml — 이 테스트는 절대 약화/삭제하지 않는다. + +FORBIDDEN_ORDER_PATHS = ( + "/uapi/domestic-stock/v1/trading/order-cash", + "/uapi/domestic-stock/v1/trading/order-rvsecncl", + "/uapi/domestic-stock/v1/trading/order-credit", + "/uapi/domestic-stock/v1/trading/order-resv", + "/uapi/domestic-stock/v1/trading/inquire-balance", # governance/rules/07 — 계좌 보유종목 조회 금지 +) +FORBIDDEN_ORDER_TR_IDS = ( + "TTTC0802U", "TTTC0801U", "VTTC0802U", "VTTC0801U", + "TTTC8434R", "VTTC8434R", # governance/rules/07 — 주식잔고조회 금지 +) + + +@pytest.mark.parametrize("path", FORBIDDEN_ORDER_PATHS) +def test_order_path_is_blocked(path: str): + with pytest.raises(OrderEndpointBlockedError): + _assert_read_only(path, "FHKST01010100") + + +@pytest.mark.parametrize("tr_id", FORBIDDEN_ORDER_TR_IDS) +def test_order_tr_id_is_blocked(tr_id: str): + with pytest.raises(OrderEndpointBlockedError): + _assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-price", tr_id) + + +def test_known_readonly_endpoints_pass(): + _assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-price", "FHKST01010100") + _assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn", "FHKST01010200") + _assert_read_only("/uapi/domestic-stock/v1/quotations/daily-short-sale", "FHPST04830000") + + +def test_no_order_endpoint_substring_anywhere_in_kis_client_source(): + """정적 검증 — 누군가 향후 주문 함수를 추가하더라도 경로 문자열이 소스에 남으면 즉시 탐지. + + TTTC8434R/VTTC8434R(주식잔고조회)는 FORBIDDEN_TR_ID_PREFIXES 차단목록 '데이터'로 + 이 파일에 의도적으로 존재한다(prefix가 아닌 전체 TR_ID라 prefix-매칭으로는 막을 수 + 없어 명시적으로 등재) — 이 두 개는 검사에서 제외한다. 전체 코드베이스 차원의 + "차단목록 외 파일에는 한 글자도 없어야 한다"는 보장은 + tools/validate_no_direct_api_trading_v1.py(ALLOWLISTED_FILES 제외 전체 스캔)가 맡는다. + """ + source = (ROOT / "src" / "quant_engine" / "kis_api_client_v1.py").read_text(encoding="utf-8") + blocklist_data_exceptions = {"TTTC8434R", "VTTC8434R"} + for forbidden_path in FORBIDDEN_ORDER_PATHS: + assert forbidden_path not in source, f"주문 엔드포인트 경로가 소스에 존재함: {forbidden_path}" + for forbidden_tr_id in FORBIDDEN_ORDER_TR_IDS: + if forbidden_tr_id in blocklist_data_exceptions: + continue + assert forbidden_tr_id not in source, f"주문 TR_ID가 소스에 존재함: {forbidden_tr_id}" + + +def test_kis_client_module_defines_no_order_submission_function(): + import src.quant_engine.kis_api_client_v1 as kis_module + + public_names = [name for name in dir(kis_module) if not name.startswith("_")] + banned_keywords = ( + "place_order", "submit_order", "cancel_order", "revise_order", "send_order", + "inquire_balance", "account_balance", + ) + for name in public_names: + lowered = name.lower() + for banned in banned_keywords: + assert banned not in lowered, f"주문 제출/정정/취소로 의심되는 함수가 존재함: {name}" + + +def test_kis_credentials_load_uses_required_env_vars(monkeypatch): + monkeypatch.setenv("KIS_APP_Key", "real-key") + monkeypatch.setenv("KIS_APP_Secret", "real-secret") + monkeypatch.setenv("KIS_APP_Key_TEST", "mock-key") + monkeypatch.setenv("KIS_APP_Secret_TEST", "mock-secret") + + real = KisCredentials.load("real") + mock = KisCredentials.load("mock") + + assert real.app_key == "real-key" + assert real.app_secret == "real-secret" + assert real.account == "real" + assert mock.app_key == "mock-key" + assert mock.app_secret == "mock-secret" + assert mock.account == "mock" diff --git a/tests/unit/test_storage_backend_v1.py b/tests/unit/test_storage_backend_v1.py new file mode 100644 index 0000000..b1d5ed3 --- /dev/null +++ b/tests/unit/test_storage_backend_v1.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.storage_backend_v1 import StoreSpec, default_sqlite_store_path, normalize_store_spec + + +def test_default_sqlite_store_path_uses_named_subdir(tmp_path): + path = default_sqlite_store_path(tmp_path, "qualitative_sell_strategy/qualitative_sell_strategy.db") + assert str(path).endswith("qualitative_sell_strategy.db") + + +def test_normalize_store_spec_supports_sqlite_and_postgresql(tmp_path): + backend_sqlite, sqlite_location = normalize_store_spec(StoreSpec(location=tmp_path / "collector.db"), ROOT) + assert backend_sqlite == "sqlite" + assert str(sqlite_location).endswith("collector.db") + + backend_pg, pg_location = normalize_store_spec( + StoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"), + ROOT, + ) + assert backend_pg == "postgresql" + assert "postgresql://" in str(pg_location) + + +def test_postgresql_upgrade_stub_script_exists(): + assert (ROOT / "tools" / "generate_postgresql_upgrade_stub_v1.py").exists() diff --git a/tests/unit/test_validate_gitea_secrets_contract_v1.py b/tests/unit/test_validate_gitea_secrets_contract_v1.py new file mode 100644 index 0000000..602d462 --- /dev/null +++ b/tests/unit/test_validate_gitea_secrets_contract_v1.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import json +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +import tools.validate_gitea_secrets_contract_v1 as validator + + +def test_validate_gitea_secrets_contract_passes(): + rc = validator.main() + payload = json.loads((ROOT / "Temp" / "gitea_secrets_contract_v1.json").read_text(encoding="utf-8")) + + assert rc == 0 + assert payload["gate"] == "PASS" + assert payload["evidence"][".gitea/workflows/kis_data_collection.yml"]["secrets.KIS_APP_KEY"] is True diff --git a/tests/unit/test_validate_kis_api_credentials_v1.py b/tests/unit/test_validate_kis_api_credentials_v1.py new file mode 100644 index 0000000..7511fa4 --- /dev/null +++ b/tests/unit/test_validate_kis_api_credentials_v1.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import json +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +import tools.validate_kis_api_credentials_v1 as validator + + +class _FakeCreds: + def __init__(self, account: str): + self.account = account + self.domain = "https://openapi.koreainvestment.com:9443" if account == "real" else "https://openapivts.koreainvestment.com:29443" + self.app_key = f"{account}-key" + self.app_secret = f"{account}-secret" + + +def test_validate_kis_api_credentials_writes_pass_json(tmp_path, monkeypatch): + out = tmp_path / "kis_api_credentials_validation_v1.json" + + monkeypatch.setenv("KIS_APP_Key_TEST", "mock-key") + monkeypatch.setenv("KIS_APP_Secret_TEST", "mock-secret") + monkeypatch.setattr(validator, "KisCredentials", type("CredFactory", (), {"load": staticmethod(lambda account: _FakeCreds(account))})) + monkeypatch.setattr(validator, "get_current_price", lambda creds, ticker: {"ticker": ticker, "price": 1000}) + monkeypatch.setattr(sys, "argv", ["validate_kis_api_credentials_v1.py", "--account", "mock", "--ticker", "005930", "--output", str(out)]) + + rc = validator.main() + payload = json.loads(out.read_text(encoding="utf-8")) + + assert rc == 0 + assert payload["gate"] == "PASS" + assert payload["evidence"]["account"] == "mock" + assert payload["evidence"]["ticker"] == "005930" + + +def test_validate_kis_api_credentials_fails_when_api_call_errors(tmp_path, monkeypatch): + out = tmp_path / "kis_api_credentials_validation_v1.json" + + monkeypatch.setattr(validator, "KisCredentials", type("CredFactory", (), {"load": staticmethod(lambda account: _FakeCreds(account))})) + monkeypatch.setattr(validator, "get_current_price", lambda creds, ticker: (_ for _ in ()).throw(RuntimeError("boom"))) + monkeypatch.setattr(sys, "argv", ["validate_kis_api_credentials_v1.py", "--account", "mock", "--ticker", "005930", "--output", str(out)]) + + rc = validator.main() + payload = json.loads(out.read_text(encoding="utf-8")) + + assert rc == 1 + assert payload["gate"] == "FAIL" + assert payload["errors"] diff --git a/tools/generate_postgresql_upgrade_stub_v1.py b/tools/generate_postgresql_upgrade_stub_v1.py new file mode 100644 index 0000000..8e3c5e6 --- /dev/null +++ b/tools/generate_postgresql_upgrade_stub_v1.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[1] + +TABLE_SCHEMAS: dict[str, str] = { + "collection_runs": """ +CREATE TABLE collection_runs ( + run_id TEXT PRIMARY KEY, + collector_name TEXT NOT NULL, + started_at TEXT NOT NULL, + finished_at TEXT, + status TEXT NOT NULL, + input_source TEXT, + output_json_path TEXT, + output_db_path TEXT, + notes TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() +); +""".strip(), + "collection_snapshots": """ +CREATE TABLE collection_snapshots ( + run_id TEXT NOT NULL, + dataset_name TEXT NOT NULL, + ticker TEXT NOT NULL, + name TEXT, + sector TEXT, + as_of_date TEXT, + source_priority TEXT, + source_status TEXT, + payload_json TEXT NOT NULL, + provenance_json TEXT NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW(), + PRIMARY KEY (run_id, dataset_name, ticker) +); +""".strip(), + "collection_source_errors": """ +CREATE TABLE collection_source_errors ( + run_id TEXT NOT NULL, + ticker TEXT, + source_name TEXT NOT NULL, + error_kind TEXT NOT NULL, + error_message TEXT NOT NULL, + payload_json TEXT, + created_at TIMESTAMPTZ DEFAULT NOW() +); +""".strip(), + "sell_strategy_results": """ +CREATE TABLE sell_strategy_results ( + id BIGSERIAL PRIMARY KEY, + code TEXT NOT NULL, + generated_at TEXT NOT NULL, + action TEXT, + conviction TEXT, + market_regime TEXT, + composite_score DOUBLE PRECISION, + rationale TEXT, + raw_json TEXT NOT NULL, + inserted_at TIMESTAMPTZ DEFAULT NOW() +); +""".strip(), + "satellite_recommendations": """ +CREATE TABLE satellite_recommendations ( + id BIGSERIAL PRIMARY KEY, + ticker TEXT NOT NULL, + generated_at TEXT NOT NULL, + satellite_action TEXT, + attractiveness_score DOUBLE PRECISION, + market_regime TEXT, + raw_json TEXT NOT NULL, + inserted_at TIMESTAMPTZ DEFAULT NOW() +); +""".strip(), +} + + +def main() -> int: + ap = argparse.ArgumentParser(description="Emit PostgreSQL migration stub from current canonical row contract.") + ap.add_argument("--output-json", type=Path, default=ROOT / "Temp" / "postgresql_upgrade_stub_v1.json") + ap.add_argument("--output-sql", type=Path, default=ROOT / "Temp" / "postgresql_upgrade_stub_v1.sql") + args = ap.parse_args() + + sql_lines = [ + "-- PostgreSQL upgrade stub", + "-- This file is a contract placeholder only. It is not executed by CI.", + "", + ] + for name, ddl in TABLE_SCHEMAS.items(): + sql_lines.append(f"-- {name}") + sql_lines.append(ddl) + sql_lines.append("") + + sql_text = "\n".join(sql_lines).rstrip() + "\n" + args.output_sql.parent.mkdir(parents=True, exist_ok=True) + args.output_sql.write_text(sql_text, encoding="utf-8") + + payload: dict[str, Any] = { + "formula_id": "POSTGRESQL_UPGRADE_STUB_V1", + "gate": "DATA_GATED", + "tables": sorted(TABLE_SCHEMAS.keys()), + "output_sql": str(args.output_sql), + "note": "DDL stub only; execution deferred until PostgreSQL rollout.", + } + args.output_json.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(payload, ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/run_kis_data_collection_v1.py b/tools/run_kis_data_collection_v1.py new file mode 100644 index 0000000..61f75d8 --- /dev/null +++ b/tools/run_kis_data_collection_v1.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.kis_data_collection_v1 import main + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/validate_gitea_secrets_contract_v1.py b/tools/validate_gitea_secrets_contract_v1.py new file mode 100644 index 0000000..ef29a26 --- /dev/null +++ b/tools/validate_gitea_secrets_contract_v1.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import json +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] + +REQUIRED_PATTERNS = { + ".gitea/workflows/kis_data_collection.yml": [ + "secrets.KIS_APP_KEY_TEST", + "secrets.KIS_APP_SECRET_TEST", + "secrets.KIS_APP_KEY", + "secrets.KIS_APP_SECRET", + ], + ".gitea/workflows/qualitative_sell_strategy.yml": [ + "secrets.KIS_APP_KEY_TEST", + "secrets.KIS_APP_SECRET_TEST", + "secrets.KIS_APP_KEY", + "secrets.KIS_APP_SECRET", + ], + ".gitea/workflows/ci.yml": [ + "secrets.KIS_APP_KEY_TEST", + "secrets.KIS_APP_SECRET_TEST", + ], +} + + +def main() -> int: + errors: list[str] = [] + evidence: dict[str, dict[str, bool]] = {} + + for rel, patterns in REQUIRED_PATTERNS.items(): + path = ROOT / rel + text = path.read_text(encoding="utf-8") if path.exists() else "" + file_evidence: dict[str, bool] = {} + if not path.exists(): + errors.append(f"missing:{rel}") + evidence[rel] = file_evidence + continue + for pattern in patterns: + found = pattern in text + file_evidence[pattern] = found + if not found: + errors.append(f"{rel}:{pattern}") + evidence[rel] = file_evidence + + result = { + "formula_id": "GITEA_SECRETS_CONTRACT_V1", + "gate": "PASS" if not errors else "FAIL", + "evidence": evidence, + "errors": errors, + } + out = ROOT / "Temp" / "gitea_secrets_contract_v1.json" + out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 if not errors else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/validate_kis_api_credentials_v1.py b/tools/validate_kis_api_credentials_v1.py new file mode 100644 index 0000000..5940fac --- /dev/null +++ b/tools/validate_kis_api_credentials_v1.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +try: + from src.quant_engine.kis_api_client_v1 import ( + KisCredentials, + MOCK_DOMAIN, + REAL_DOMAIN, + _read_env_var, + get_current_price, + ) +except Exception as exc: # pragma: no cover - import failure is a hard validation error + KisCredentials = None # type: ignore[assignment] + MOCK_DOMAIN = "" + REAL_DOMAIN = "" + _read_env_var = None # type: ignore[assignment] + get_current_price = None # type: ignore[assignment] + _IMPORT_ERROR = str(exc) +else: + _IMPORT_ERROR = "" + + +def _payload(gate: str, **extra: Any) -> dict[str, Any]: + return { + "formula_id": "KIS_API_CREDENTIALS_VALIDATION_V1", + "gate": gate, + **extra, + } + + +def _expected_env_names(account: str) -> tuple[str, str]: + if account == "real": + return ("KIS_APP_Key", "KIS_APP_Secret") + if account == "mock": + return ("KIS_APP_Key_TEST", "KIS_APP_Secret_TEST") + raise ValueError("account must be 'mock' or 'real'") + + +def main() -> int: + ap = argparse.ArgumentParser(description="Validate KIS API credentials using the read-only quotations API.") + ap.add_argument("--account", choices=["mock", "real"], default="mock") + ap.add_argument("--ticker", default="005930") + ap.add_argument("--output", type=Path, default=ROOT / "Temp" / "kis_api_credentials_validation_v1.json") + args = ap.parse_args() + + if KisCredentials is None or get_current_price is None: + result = _payload("FAIL", error=f"import_error: {_IMPORT_ERROR}") + args.output.parent.mkdir(parents=True, exist_ok=True) + args.output.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 1 + + errors: list[str] = [] + evidence: dict[str, Any] = { + "account": args.account, + "ticker": args.ticker, + } + + try: + key_name, secret_name = _expected_env_names(args.account) + creds = KisCredentials.load(args.account) + evidence["domain"] = creds.domain + evidence["expected_env"] = {"app_key": key_name, "app_secret": secret_name} + expected_key = _read_env_var(key_name) if _read_env_var is not None else None + expected_secret = _read_env_var(secret_name) if _read_env_var is not None else None + other_key = _read_env_var("KIS_APP_Key_TEST" if args.account == "real" else "KIS_APP_Key") if _read_env_var is not None else None + other_secret = _read_env_var("KIS_APP_Secret_TEST" if args.account == "real" else "KIS_APP_Secret") if _read_env_var is not None else None + actual_key = getattr(creds, "app_key", None) + actual_secret = getattr(creds, "app_secret", None) + evidence["env_match"] = { + "app_key": bool(expected_key and actual_key == expected_key), + "app_secret": bool(expected_secret and actual_secret == expected_secret), + "other_key_present": bool(other_key), + "other_secret_present": bool(other_secret), + } + if creds.domain != (REAL_DOMAIN if args.account == "real" else MOCK_DOMAIN): + errors.append("domain_mismatch") + if not evidence["env_match"]["app_key"] or not evidence["env_match"]["app_secret"]: + errors.append("selected_env_mismatch") + response = get_current_price(creds, args.ticker) + evidence["response_keys"] = sorted(response.keys()) + if not isinstance(response, dict) or not response: + errors.append("empty_response") + except Exception as exc: # noqa: BLE001 + errors.append(str(exc)) + + gate = "PASS" if not errors else "FAIL" + result = _payload(gate, evidence=evidence, errors=errors) + args.output.parent.mkdir(parents=True, exist_ok=True) + args.output.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 if gate == "PASS" else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/validate_no_direct_api_trading_v1.py b/tools/validate_no_direct_api_trading_v1.py new file mode 100644 index 0000000..fa27e99 --- /dev/null +++ b/tools/validate_no_direct_api_trading_v1.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +"""[CRITICAL] governance/rules/06_no_direct_api_trading.yaml 강제 게이트. + +이 검증기는 순수 stdlib(re, pathlib)만 사용한다 — Synology CI(ARMv7, Python 3.8, +requests/pytest 미설치)에서도 항상 실행 가능해야 하는 하드 블로킹 게이트이기 때문이다. +문서·테스트만으로는 막을 수 없다는 사용자 지시(2026-06-21)에 따라 정적 소스 스캔으로 +주문 제출/정정/취소 경로·TR_ID가 코드베이스 어디에도 존재하지 않음을 매 커밋마다 강제한다. + +FAIL 시 CI 전체를 막는다(strict, warn_only 아님) — 다른 데이터 품질 게이트와 다르게 +이 게이트는 완화 대상이 아니다. +""" +from __future__ import annotations + +import re +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] + +# 이 문자열들이 "데이터"로 등장해도 되는 파일(블록리스트 정의/테스트/이 검증기 자신). +# 그 외 모든 .py 파일에서 발견되면 FAIL. +ALLOWLISTED_FILES = { + "src/quant_engine/kis_api_client_v1.py", + "tests/unit/test_kis_api_client_v1.py", + "tools/validate_no_direct_api_trading_v1.py", +} + +FORBIDDEN_ORDER_PATH_SUBSTRINGS = ( + "/trading/order-cash", + "/trading/order-rvsecncl", + "/trading/order-credit", + "/trading/order-resv", + "/trading/inquire-balance", # governance/rules/07 — 계좌 보유종목 조회 금지 +) +FORBIDDEN_ORDER_TR_IDS = ( + "TTTC0802U", "TTTC0801U", "VTTC0802U", "VTTC0801U", + "TTTC8434R", "VTTC8434R", # governance/rules/07 — 주식잔고조회 금지 +) +BANNED_FUNCTION_NAME_SUBSTRINGS = ( + "place_order", "submit_order", "cancel_order", "revise_order", "send_order", + "order_cash", "order_credit", "order_rvsecncl", + "inquire_balance", "account_balance", # governance/rules/07 — 계좌 보유종목 조회 금지 +) + + +def _scan_python_files() -> list[str]: + violations: list[str] = [] + for dir_name in ("src", "tools"): + for path in (ROOT / dir_name).rglob("*.py"): + rel = path.relative_to(ROOT).as_posix() + if rel in ALLOWLISTED_FILES: + continue + text = path.read_text(encoding="utf-8", errors="ignore") + for forbidden in FORBIDDEN_ORDER_PATH_SUBSTRINGS: + if forbidden in text: + violations.append(f"{rel}: 주문 엔드포인트 경로 발견 — {forbidden!r}") + for tr_id in FORBIDDEN_ORDER_TR_IDS: + if tr_id in text: + violations.append(f"{rel}: 주문 TR_ID 발견 — {tr_id!r}") + for match in re.finditer(r"def\s+(\w+)\s*\(", text): + name = match.group(1).lower() + for banned in BANNED_FUNCTION_NAME_SUBSTRINGS: + if banned in name: + violations.append(f"{rel}: 주문 제출/정정/취소로 의심되는 함수명 — def {match.group(1)}(") + return violations + + +def _check_kis_client_guard_intact() -> list[str]: + """kis_api_client_v1.py가 실제로 존재하면, 가드 코드가 그대로 있는지 + _send_request가 + HTTP 호출 전에 _assert_read_only를 부르는지 순서를 확인한다.""" + client_path = ROOT / "src" / "quant_engine" / "kis_api_client_v1.py" + if not client_path.exists(): + return [] # 클라이언트가 아직 없으면 이 검사는 스킵(다른 검사로 충분) + + text = client_path.read_text(encoding="utf-8") + violations: list[str] = [] + required_markers = ("_assert_read_only", "OrderEndpointBlockedError", "FORBIDDEN_PATH_SUBSTRINGS", "FORBIDDEN_TR_ID_PREFIXES") + for marker in required_markers: + if marker not in text: + violations.append(f"kis_api_client_v1.py: 필수 가드 구성요소 누락 — {marker!r}") + + send_request_match = re.search(r"def _send_request\(.*?\)\s*(?:->[^:]*)?:(.*?)(?=\ndef |\Z)", text, re.S) + if send_request_match: + body = send_request_match.group(1) + guard_pos = body.find("_assert_read_only(") + http_pos = min( + (pos for pos in (body.find("requests.get("), body.find("requests.post(")) if pos != -1), + default=-1, + ) + if guard_pos == -1: + violations.append("kis_api_client_v1.py: _send_request가 _assert_read_only를 호출하지 않음") + elif http_pos != -1 and guard_pos > http_pos: + violations.append("kis_api_client_v1.py: _assert_read_only 호출이 HTTP 전송보다 늦음(순서 위반)") + else: + violations.append("kis_api_client_v1.py: _send_request 함수를 찾을 수 없음") + + return violations + + +def main() -> int: + violations = _scan_python_files() + _check_kis_client_guard_intact() + if violations: + print("NO_DIRECT_API_TRADING_GATE: FAIL") + for v in violations: + print(f" - {v}") + return 1 + print("NO_DIRECT_API_TRADING_GATE: PASS") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())