Refactor unit tests to remove pytest dependency and use unittest.TestCase

This commit is contained in:
2026-06-22 11:18:57 +09:00
parent dcd73de05f
commit 84df6e1f7e
9 changed files with 446 additions and 386 deletions
+1
View File
@@ -1203,6 +1203,7 @@ python tools/update_sector_universe_from_naver.py --limit 10 --apply # 원본
[x] WBS-7.11: PostgreSQL 다형적 스토어 계약 레이어 구현 (2026-06-22 완료, sqlite/psycopg2 쿼리 플레이스홀더 분기 및 트랜잭션 동적 처리 반영) [x] WBS-7.11: PostgreSQL 다형적 스토어 계약 레이어 구현 (2026-06-22 완료, sqlite/psycopg2 쿼리 플레이스홀더 분기 및 트랜잭션 동적 처리 반영)
[x] WBS-7.12: 스톱로스 정책(stop_loss_gate) Parity 단위 테스트 구축 (2026-06-22 완료, ATR 변동성 배수 및 상대약세 트리거 동등성 실증 완료) [x] WBS-7.12: 스톱로스 정책(stop_loss_gate) Parity 단위 테스트 구축 (2026-06-22 완료, ATR 변동성 배수 및 상대약세 트리거 동등성 실증 완료)
[x] WBS-7.13: 추격매수 리스크(late_chase_risk_score) Parity 단위 테스트 구축 (2026-06-22 완료, 이평선 이격도 및 거래량 미확인 돌파 동등성 실증 완료) [x] WBS-7.13: 추격매수 리스크(late_chase_risk_score) Parity 단위 테스트 구축 (2026-06-22 완료, 이평선 이격도 및 거래량 미확인 돌파 동등성 실증 완료)
[x] WBS-7.14: 결정 라우팅(routing_decision_v1) Parity 단위 테스트 구축 (2026-06-22 완료, 장중 락 다운그레이드 및 MRG 이격 차단 동등성 실증 완료)
``` ```
--- ---
@@ -123,7 +123,11 @@ findings:
classification: decision_logic classification: decision_logic
migration_action: MIGRATE_DECISIONS_ROUTING migration_action: MIGRATE_DECISIONS_ROUTING
target_file: formulas/routing_decision_v1.py target_file: formulas/routing_decision_v1.py
status: TODO status: DONE
resolved_2026_06_22: >
tests/parity/test_routing_decision_parity.py를 작성하여, GAS runRouteFlow_의
STOP_BREACH, INTRADAY_LOCK, HEAT_GATE, MEAN_REVERSION_GATE, CASH_FLOOR 등
5개 핵심 의사결정 필터링 게이트의 Python 결정 라우팅 동등성을 검증 완료함.
- id: F11 - id: F11
file: src/gas_adapter_parts/gdf_03_portfolio_gates.gs file: src/gas_adapter_parts/gdf_03_portfolio_gates.gs
+68 -65
View File
@@ -1,14 +1,14 @@
from __future__ import annotations from __future__ import annotations
import sys import sys
import unittest
from pathlib import Path from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[2] ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path: if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT)) sys.path.insert(0, str(ROOT))
import pytest
from src.quant_engine.kis_api_client_v1 import ( from src.quant_engine.kis_api_client_v1 import (
KisCredentials, KisCredentials,
OrderEndpointBlockedError, OrderEndpointBlockedError,
@@ -30,69 +30,72 @@ FORBIDDEN_ORDER_TR_IDS = (
) )
@pytest.mark.parametrize("path", FORBIDDEN_ORDER_PATHS) class TestKisApiClientV1(unittest.TestCase):
def test_order_path_is_blocked(path: str):
with pytest.raises(OrderEndpointBlockedError): def test_order_path_is_blocked(self):
_assert_read_only(path, "FHKST01010100") for path in FORBIDDEN_ORDER_PATHS:
with self.assertRaises(OrderEndpointBlockedError):
_assert_read_only(path, "FHKST01010100")
def test_order_tr_id_is_blocked(self):
for tr_id in FORBIDDEN_ORDER_TR_IDS:
with self.assertRaises(OrderEndpointBlockedError):
_assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-price", tr_id)
def test_known_readonly_endpoints_pass(self):
_assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-price", "FHKST01010100")
_assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn", "FHKST01010200")
_assert_read_only("/uapi/domestic-stock/v1/quotations/daily-short-sale", "FHPST04830000")
def test_no_order_endpoint_substring_anywhere_in_kis_client_source(self):
"""정적 검증 — 누군가 향후 주문 함수를 추가하더라도 경로 문자열이 소스에 남으면 즉시 탐지.
TTTC8434R/VTTC8434R(주식잔고조회)는 FORBIDDEN_TR_ID_PREFIXES 차단목록 '데이터'
이 파일에 의도적으로 존재한다(prefix가 아닌 전체 TR_ID라 prefix-매칭으로는 막을 수
없어 명시적으로 등재) — 이 두 개는 검사에서 제외한다. 전체 코드베이스 차원의
"차단목록 외 파일에는 한 글자도 없어야 한다"는 보장은
tools/validate_no_direct_api_trading_v1.py(ALLOWLISTED_FILES 제외 전체 스캔)가 맡는다.
"""
source = (ROOT / "src" / "quant_engine" / "kis_api_client_v1.py").read_text(encoding="utf-8")
blocklist_data_exceptions = {"TTTC8434R", "VTTC8434R"}
for forbidden_path in FORBIDDEN_ORDER_PATHS:
self.assertNotIn(forbidden_path, source, f"주문 엔드포인트 경로가 소스에 존재함: {forbidden_path}")
for forbidden_tr_id in FORBIDDEN_ORDER_TR_IDS:
if forbidden_tr_id in blocklist_data_exceptions:
continue
self.assertNotIn(forbidden_tr_id, source, f"주문 TR_ID가 소스에 존재함: {forbidden_tr_id}")
def test_kis_client_module_defines_no_order_submission_function(self):
import src.quant_engine.kis_api_client_v1 as kis_module
public_names = [name for name in dir(kis_module) if not name.startswith("_")]
banned_keywords = (
"place_order", "submit_order", "cancel_order", "revise_order", "send_order",
"inquire_balance", "account_balance",
)
for name in public_names:
lowered = name.lower()
for banned in banned_keywords:
self.assertNotIn(banned, lowered, f"주문 제출/정정/취소로 의심되는 함수가 존재함: {name}")
def test_kis_credentials_load_uses_required_env_vars(self):
with patch.dict("os.environ", {
"KIS_APP_Key": "real-key",
"KIS_APP_Secret": "real-secret",
"KIS_APP_Key_TEST": "mock-key",
"KIS_APP_Secret_TEST": "mock-secret"
}):
real = KisCredentials.load("real")
mock = KisCredentials.load("mock")
self.assertEqual(real.app_key, "real-key")
self.assertEqual(real.app_secret, "real-secret")
self.assertEqual(real.account, "real")
self.assertEqual(mock.app_key, "mock-key")
self.assertEqual(mock.app_secret, "mock-secret")
self.assertEqual(mock.account, "mock")
@pytest.mark.parametrize("tr_id", FORBIDDEN_ORDER_TR_IDS) if __name__ == "__main__":
def test_order_tr_id_is_blocked(tr_id: str): unittest.main()
with pytest.raises(OrderEndpointBlockedError):
_assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-price", tr_id)
def test_known_readonly_endpoints_pass():
_assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-price", "FHKST01010100")
_assert_read_only("/uapi/domestic-stock/v1/quotations/inquire-asking-price-exp-ccn", "FHKST01010200")
_assert_read_only("/uapi/domestic-stock/v1/quotations/daily-short-sale", "FHPST04830000")
def test_no_order_endpoint_substring_anywhere_in_kis_client_source():
"""정적 검증 — 누군가 향후 주문 함수를 추가하더라도 경로 문자열이 소스에 남으면 즉시 탐지.
TTTC8434R/VTTC8434R(주식잔고조회)는 FORBIDDEN_TR_ID_PREFIXES 차단목록 '데이터'
이 파일에 의도적으로 존재한다(prefix가 아닌 전체 TR_ID라 prefix-매칭으로는 막을 수
없어 명시적으로 등재) — 이 두 개는 검사에서 제외한다. 전체 코드베이스 차원의
"차단목록 외 파일에는 한 글자도 없어야 한다"는 보장은
tools/validate_no_direct_api_trading_v1.py(ALLOWLISTED_FILES 제외 전체 스캔)가 맡는다.
"""
source = (ROOT / "src" / "quant_engine" / "kis_api_client_v1.py").read_text(encoding="utf-8")
blocklist_data_exceptions = {"TTTC8434R", "VTTC8434R"}
for forbidden_path in FORBIDDEN_ORDER_PATHS:
assert forbidden_path not in source, f"주문 엔드포인트 경로가 소스에 존재함: {forbidden_path}"
for forbidden_tr_id in FORBIDDEN_ORDER_TR_IDS:
if forbidden_tr_id in blocklist_data_exceptions:
continue
assert forbidden_tr_id not in source, f"주문 TR_ID가 소스에 존재함: {forbidden_tr_id}"
def test_kis_client_module_defines_no_order_submission_function():
import src.quant_engine.kis_api_client_v1 as kis_module
public_names = [name for name in dir(kis_module) if not name.startswith("_")]
banned_keywords = (
"place_order", "submit_order", "cancel_order", "revise_order", "send_order",
"inquire_balance", "account_balance",
)
for name in public_names:
lowered = name.lower()
for banned in banned_keywords:
assert banned not in lowered, f"주문 제출/정정/취소로 의심되는 함수가 존재함: {name}"
def test_kis_credentials_load_uses_required_env_vars(monkeypatch):
monkeypatch.setenv("KIS_APP_Key", "real-key")
monkeypatch.setenv("KIS_APP_Secret", "real-secret")
monkeypatch.setenv("KIS_APP_Key_TEST", "mock-key")
monkeypatch.setenv("KIS_APP_Secret_TEST", "mock-secret")
real = KisCredentials.load("real")
mock = KisCredentials.load("mock")
assert real.app_key == "real-key"
assert real.app_secret == "real-secret"
assert real.account == "real"
assert mock.app_key == "mock-key"
assert mock.app_secret == "mock-secret"
assert mock.account == "mock"
+135 -117
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import sys import sys
import unittest
from pathlib import Path from pathlib import Path
ROOT = Path(__file__).resolve().parents[2] ROOT = Path(__file__).resolve().parents[2]
@@ -20,125 +21,142 @@ from src.quant_engine.snapshot_admin_server_v1 import (
from src.quant_engine.snapshot_admin_store_v1 import import_seed_json from src.quant_engine.snapshot_admin_store_v1 import import_seed_json
def test_render_index_html_contains_spreadsheet_surface(): class TestSnapshotAdminWebV1(unittest.TestCase):
html = render_index_html()
assert "Snapshot Admin" in html def test_render_index_html_contains_spreadsheet_surface(self):
assert "contenteditable" in html html = render_index_html()
assert "/api/settings/save" in html self.assertIn("Snapshot Admin", html)
assert "/api/account_snapshot/save" in html self.assertIn("contenteditable", html)
assert "Lock target" in html self.assertIn("/api/settings/save", html)
assert "Lock row" in html self.assertIn("/api/account_snapshot/save", html)
assert "Approve pending" in html self.assertIn("Lock target", html)
assert "Refresh diff" in html self.assertIn("Lock row", html)
assert "Export approval packet" in html self.assertIn("Approve pending", html)
assert "Selection Inspector" in html self.assertIn("Refresh diff", html)
assert "Recent row history" in html self.assertIn("Export approval packet", html)
assert "Save view" in html self.assertIn("Selection Inspector", html)
assert "Apply TSV to selection" in html self.assertIn("Recent row history", html)
assert "Ctrl+S" in html self.assertIn("Save view", html)
assert "KIS Collection" in html self.assertIn("Apply TSV to selection", html)
assert "Recent collector snapshots" in html self.assertIn("Ctrl+S", html)
assert "Collection detail" in html self.assertIn("KIS Collection", html)
assert "Filter runs / snapshots / errors" in html self.assertIn("Recent collector snapshots", html)
assert "Filter change log" in html self.assertIn("Collection detail", html)
assert "Timeline" in html self.assertIn("Filter runs / snapshots / errors", html)
assert "/collection" in html self.assertIn("Filter change log", html)
assert "Open collection dashboard" in html self.assertIn("Timeline", html)
self.assertIn("/collection", html)
self.assertIn("Open collection dashboard", html)
def test_render_collection_html_contains_dashboard_surface(self):
html = render_collection_html()
self.assertIn("KIS Collection Dashboard", html)
self.assertIn("/api/state", html)
self.assertIn("Download raw JSON", html)
self.assertIn("Download CSV", html)
self.assertIn("Filter runs / snapshots / errors", html)
self.assertIn("Ticker quick search", html)
self.assertIn("Date quick search", html)
def test_build_ui_state_exposes_expected_columns(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
db_path = Path(tmp_dir) / "snapshot_admin.db"
seed_path = ROOT / "GatherTradingData.json"
import_seed_json(db_path, seed_path)
state = build_ui_state(db_path)
self.assertTrue(state["summary"]["settings_rows"] > 0)
self.assertTrue(state["summary"]["account_snapshot_rows"] > 0)
self.assertEqual(state["summary"]["topology"]["mode"], "single_workspace_sqlite")
self.assertTrue(state["summary"]["topology"]["settings_and_snapshot_share_db"])
self.assertTrue(state["summary"]["topology"]["collector_separate_db"])
self.assertEqual(state["account_snapshot_columns"][0], "captured_at")
self.assertIn("settings", state["validation"])
self.assertTrue(state["version"]["app"])
self.assertIn("fingerprint", state["version"]["source"])
self.assertIn("collection", state)
self.assertIn("counts", state["collection"])
self.assertIn("latest_report", state["collection"])
self.assertEqual(state["summary"]["topology"]["mode"], "single_workspace_sqlite")
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def test_snapshot_admin_workflow_and_script_exist(self):
workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml"
package = json.loads((ROOT / "package.json").read_text(encoding="utf-8"))
self.assertTrue(workflow.exists())
self.assertIn("--reload", package["scripts"]["ops:snapshot-web"])
self.assertIn("ops:snapshot-validate", package["scripts"])
self.assertIn("ops:snapshot-web-validate", package["scripts"])
def test_render_tables_html_contains_tabler_grid_surface(self):
html = render_tables_html()
self.assertIn("tabler", html.lower())
self.assertIn("tableSelect", html)
self.assertIn("/api/tables", html)
self.assertIn("/api/table_rows", html)
self.assertIn("gridTable", html)
def test_list_browsable_tables_covers_all_three_databases(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
db_path = Path(tmp_dir) / "snapshot_admin.db"
import_seed_json(db_path, ROOT / "GatherTradingData.json")
tables = list_browsable_tables(db_path)
names = {row["table"] for row in tables}
self.assertTrue({"settings", "account_snapshot", "workspace_change_log"} <= names)
self.assertTrue({"collection_runs", "collection_snapshots", "collection_source_errors"} <= names)
self.assertTrue({"sell_strategy_results", "satellite_recommendations"} <= names)
settings_row = next(row for row in tables if row["table"] == "settings")
self.assertTrue(settings_row["exists"])
self.assertTrue(settings_row["row_count"] > 0)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def test_fetch_table_rows_paginates_and_rejects_unknown_table(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
db_path = Path(tmp_dir) / "snapshot_admin.db"
import_seed_json(db_path, ROOT / "GatherTradingData.json")
page1 = fetch_table_rows("settings", db_path, limit=2, offset=0)
self.assertTrue(page1["columns"])
self.assertEqual(len(page1["rows"]), 2)
self.assertTrue(page1["total"] > 2)
page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
self.assertNotEqual(page1["rows"], page2["rows"])
with self.assertRaises(ValueError):
fetch_table_rows("settings; DROP TABLE settings;--", db_path)
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def test_render_collection_html_contains_dashboard_surface(): def test_snapshot_admin_web_validation_script_passes(self):
html = render_collection_html() out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
assert "KIS Collection Dashboard" in html if out.exists():
assert "/api/state" in html out.unlink()
assert "Download raw JSON" in html
assert "Download CSV" in html rc = validator.main()
assert "Filter runs / snapshots / errors" in html payload = json.loads(out.read_text(encoding="utf-8"))
assert "Ticker quick search" in html
assert "Date quick search" in html self.assertEqual(rc, 0)
self.assertEqual(payload["gate"], "PASS")
self.assertEqual(payload["formula_id"], "SNAPSHOT_ADMIN_WEB_VALIDATION_V1")
self.assertTrue(payload["settings_rows"] > 0)
self.assertTrue(payload["account_snapshot_rows"] > 0)
def test_build_ui_state_exposes_expected_columns(tmp_path): if __name__ == "__main__":
db_path = tmp_path / "snapshot_admin.db" unittest.main()
seed_path = ROOT / "GatherTradingData.json"
import_seed_json(db_path, seed_path)
state = build_ui_state(db_path)
assert state["summary"]["settings_rows"] > 0
assert state["summary"]["account_snapshot_rows"] > 0
assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
assert state["summary"]["topology"]["settings_and_snapshot_share_db"] is True
assert state["summary"]["topology"]["collector_separate_db"] is True
assert state["account_snapshot_columns"][0] == "captured_at"
assert "settings" in state["validation"]
assert state["version"]["app"]
assert "fingerprint" in state["version"]["source"]
assert "collection" in state
assert "counts" in state["collection"]
assert "latest_report" in state["collection"]
assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
def test_snapshot_admin_workflow_and_script_exist():
workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml"
package = json.loads((ROOT / "package.json").read_text(encoding="utf-8"))
assert workflow.exists()
assert "--reload" in package["scripts"]["ops:snapshot-web"]
assert "ops:snapshot-validate" in package["scripts"]
assert "ops:snapshot-web-validate" in package["scripts"]
def test_render_tables_html_contains_tabler_grid_surface():
html = render_tables_html()
assert "tabler" in html.lower()
assert "tableSelect" in html
assert "/api/tables" in html
assert "/api/table_rows" in html
assert "gridTable" in html
def test_list_browsable_tables_covers_all_three_databases(tmp_path):
db_path = tmp_path / "snapshot_admin.db"
import_seed_json(db_path, ROOT / "GatherTradingData.json")
tables = list_browsable_tables(db_path)
names = {row["table"] for row in tables}
assert {"settings", "account_snapshot", "workspace_change_log"} <= names
assert {"collection_runs", "collection_snapshots", "collection_source_errors"} <= names
assert {"sell_strategy_results", "satellite_recommendations"} <= names
settings_row = next(row for row in tables if row["table"] == "settings")
assert settings_row["exists"] is True
assert settings_row["row_count"] > 0
def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path):
db_path = tmp_path / "snapshot_admin.db"
import_seed_json(db_path, ROOT / "GatherTradingData.json")
page1 = fetch_table_rows("settings", db_path, limit=2, offset=0)
assert page1["columns"]
assert len(page1["rows"]) == 2
assert page1["total"] > 2
page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
assert page1["rows"] != page2["rows"]
import pytest
with pytest.raises(ValueError):
fetch_table_rows("settings; DROP TABLE settings;--", db_path)
def test_snapshot_admin_web_validation_script_passes():
out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
if out.exists():
out.unlink()
rc = validator.main()
payload = json.loads(out.read_text(encoding="utf-8"))
assert rc == 0
assert payload["gate"] == "PASS"
assert payload["formula_id"] == "SNAPSHOT_ADMIN_WEB_VALIDATION_V1"
assert payload["settings_rows"] > 0
assert payload["account_snapshot_rows"] > 0
@@ -2,6 +2,7 @@ from __future__ import annotations
import json import json
import sys import sys
import unittest
from pathlib import Path from pathlib import Path
ROOT = Path(__file__).resolve().parents[2] ROOT = Path(__file__).resolve().parents[2]
@@ -11,10 +12,17 @@ if str(ROOT) not in sys.path:
import tools.validate_gitea_secrets_contract_v1 as validator import tools.validate_gitea_secrets_contract_v1 as validator
def test_validate_gitea_secrets_contract_passes(): class TestValidateGiteaSecretsContract(unittest.TestCase):
rc = validator.main()
payload = json.loads((ROOT / "Temp" / "gitea_secrets_contract_v1.json").read_text(encoding="utf-8")) def test_validate_gitea_secrets_contract_passes(self):
rc = validator.main()
payload = json.loads((ROOT / "Temp" / "gitea_secrets_contract_v1.json").read_text(encoding="utf-8"))
self.assertEqual(rc, 0)
self.assertEqual(payload["gate"], "PASS")
self.assertTrue(payload["evidence"][".gitea/workflows/kis_data_collection.yml"]["vars.KIS_APP_KEY"])
if __name__ == "__main__":
unittest.main()
assert rc == 0
assert payload["gate"] == "PASS"
assert payload["evidence"][".gitea/workflows/kis_data_collection.yml"]["vars.KIS_APP_KEY"] is True
@@ -2,7 +2,9 @@ from __future__ import annotations
import json import json
import sys import sys
import unittest
from pathlib import Path from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[2] ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path: if str(ROOT) not in sys.path:
@@ -19,35 +21,54 @@ class _FakeCreds:
self.app_secret = f"{account}-secret" self.app_secret = f"{account}-secret"
def test_validate_kis_api_credentials_writes_pass_json(tmp_path, monkeypatch): class TestValidateKisApiCredentials(unittest.TestCase):
out = tmp_path / "kis_api_credentials_validation_v1.json"
monkeypatch.setenv("KIS_APP_Key_TEST", "mock-key") def test_validate_kis_api_credentials_writes_pass_json(self):
monkeypatch.setenv("KIS_APP_Secret_TEST", "mock-secret") import tempfile
monkeypatch.setattr(validator, "KisCredentials", type("CredFactory", (), {"load": staticmethod(lambda account: _FakeCreds(account))})) import shutil
monkeypatch.setattr(validator, "get_current_price", lambda creds, ticker: (_ for _ in ()).throw(RuntimeError("network should not be called in dry-run"))) tmp_dir = tempfile.mkdtemp()
monkeypatch.setattr(sys, "argv", ["validate_kis_api_credentials_v1.py", "--account", "mock", "--ticker", "005930", "--dry-run", "--output", str(out)]) try:
out = Path(tmp_dir) / "kis_api_credentials_validation_v1.json"
rc = validator.main() with patch.dict("os.environ", {"KIS_APP_Key_TEST": "mock-key", "KIS_APP_Secret_TEST": "mock-secret"}):
payload = json.loads(out.read_text(encoding="utf-8")) with patch.object(validator, "KisCredentials") as mock_creds:
mock_creds.load.side_effect = lambda account: _FakeCreds(account)
with patch.object(validator, "get_current_price") as mock_price:
mock_price.side_effect = RuntimeError("network should not be called in dry-run")
with patch.object(sys, "argv", ["validate_kis_api_credentials_v1.py", "--account", "mock", "--ticker", "005930", "--dry-run", "--output", str(out)]):
rc = validator.main()
payload = json.loads(out.read_text(encoding="utf-8"))
assert rc == 0 self.assertEqual(rc, 0)
assert payload["gate"] == "PASS" self.assertEqual(payload["gate"], "PASS")
assert payload["evidence"]["account"] == "mock" self.assertEqual(payload["evidence"]["account"], "mock")
assert payload["evidence"]["ticker"] == "005930" self.assertEqual(payload["evidence"]["ticker"], "005930")
assert payload["evidence"]["dry_run"] is True self.assertTrue(payload["evidence"]["dry_run"])
finally:
shutil.rmtree(tmp_dir)
def test_validate_kis_api_credentials_fails_when_api_call_errors(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
out = Path(tmp_dir) / "kis_api_credentials_validation_v1.json"
with patch.object(validator, "KisCredentials") as mock_creds:
mock_creds.load.side_effect = lambda account: _FakeCreds(account)
with patch.object(validator, "get_current_price") as mock_price:
mock_price.side_effect = RuntimeError("boom")
with patch.object(sys, "argv", ["validate_kis_api_credentials_v1.py", "--account", "mock", "--ticker", "005930", "--output", str(out)]):
rc = validator.main()
payload = json.loads(out.read_text(encoding="utf-8"))
self.assertEqual(rc, 1)
self.assertEqual(payload["gate"], "FAIL")
self.assertTrue(payload["errors"])
finally:
shutil.rmtree(tmp_dir)
def test_validate_kis_api_credentials_fails_when_api_call_errors(tmp_path, monkeypatch): if __name__ == "__main__":
out = tmp_path / "kis_api_credentials_validation_v1.json" unittest.main()
monkeypatch.setattr(validator, "KisCredentials", type("CredFactory", (), {"load": staticmethod(lambda account: _FakeCreds(account))}))
monkeypatch.setattr(validator, "get_current_price", lambda creds, ticker: (_ for _ in ()).throw(RuntimeError("boom")))
monkeypatch.setattr(sys, "argv", ["validate_kis_api_credentials_v1.py", "--account", "mock", "--ticker", "005930", "--output", str(out)])
rc = validator.main()
payload = json.loads(out.read_text(encoding="utf-8"))
assert rc == 1
assert payload["gate"] == "FAIL"
assert payload["errors"]
@@ -2,7 +2,9 @@ from __future__ import annotations
import json import json
import sys import sys
import unittest
from pathlib import Path from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[2] ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path: if str(ROOT) not in sys.path:
@@ -11,115 +13,93 @@ if str(ROOT) not in sys.path:
import tools.validate_platform_transition_wbs_v1 as validator import tools.validate_platform_transition_wbs_v1 as validator
def test_validate_platform_transition_wbs_reports_failure_notes(monkeypatch): class TestValidatePlatformTransitionWbs(unittest.TestCase):
spec = {
"phase_5_platform_transition": { def test_validate_platform_transition_wbs_reports_failure_notes(self):
"P1_kis_core_api_collector": { spec = {
"success_criteria": { "phase_5_platform_transition": {
"expected_success_value": {}, "P1_kis_core_api_collector": {
"evidence_artifacts": [], "success_criteria": {
"verification_commands": [], "expected_success_value": {},
} "evidence_artifacts": [],
}, "verification_commands": [],
"P2_sqlite_canonical_store": { }
"success_criteria": { },
"expected_success_value": {}, "P2_sqlite_canonical_store": {
"evidence_artifacts": [], "success_criteria": {
"verification_commands": [], "expected_success_value": {},
} "evidence_artifacts": [],
}, "verification_commands": [],
"P3_ci_scheduler_cutover": { }
"success_criteria": { },
"expected_success_value": {}, "P3_ci_scheduler_cutover": {
"evidence_artifacts": [], "success_criteria": {
"verification_commands": [], "expected_success_value": {},
} "evidence_artifacts": [],
}, "verification_commands": [],
"P4_gas_thin_adapter_minimize": { }
"success_criteria": { },
"expected_success_value": {}, "P4_gas_thin_adapter_minimize": {
"evidence_artifacts": [], "success_criteria": {
"verification_commands": [], "expected_success_value": {},
} "evidence_artifacts": [],
}, "verification_commands": [],
"P5_postgresql_upgrade_path": { }
"success_criteria": { },
"expected_success_value": {}, "P5_postgresql_upgrade_path": {
"evidence_artifacts": [], "success_criteria": {
"verification_commands": [], "expected_success_value": {},
} "evidence_artifacts": [],
}, "verification_commands": [],
}
},
}
} }
}
monkeypatch.setattr( with patch.object(validator, "_load_spec", return_value=spec):
validator, with patch.object(validator, "_read_text", return_value="Phase 5 데이터 플랫폼 전환 WBS 성공값 P1 KIS core collector P2 SQLite canonical store P3 CI scheduler cutover P4 GAS thin adapter minimize P5 PostgreSQL upgrade path"):
"_load_spec", with patch.object(validator, "_check_p1", return_value={
lambda: spec, "gate": "FAIL",
) "expected_success_value": {},
monkeypatch.setattr( "evidence": {"summary_path": "Temp/test_kis_data_collection.json", "db_path": "Temp/test_kis_data_collection.db"},
validator, "errors": ["summary_status=None"],
"_read_text", }):
lambda path: "Phase 5 데이터 플랫폼 전환 WBS 성공값 P1 KIS core collector P2 SQLite canonical store P3 CI scheduler cutover P4 GAS thin adapter minimize P5 PostgreSQL upgrade path", with patch.object(validator, "_check_p2", return_value={
) "gate": "FAIL",
monkeypatch.setattr( "expected_success_value": {},
validator, "evidence": {"db_path": "Temp/test_kis_data_collection.db"},
"_check_p1", "errors": ["sqlite_round_trip_missing"],
lambda: { }):
"gate": "FAIL", with patch.object(validator, "_check_p3", return_value={
"expected_success_value": {}, "gate": "PASS",
"evidence": {"summary_path": "Temp/test_kis_data_collection.json", "db_path": "Temp/test_kis_data_collection.db"}, "expected_success_value": {},
"errors": ["summary_status=None"], "evidence": {"workflow_path": ".gitea/workflows/kis_data_collection.yml"},
}, "errors": [],
) }):
monkeypatch.setattr( with patch.object(validator, "_check_p4", return_value={
validator, "gate": "FAIL",
"_check_p2", "expected_success_value": {},
lambda: { "evidence": {"validation_path": "Temp/gas_thin_adapter_validation_v1.json"},
"gate": "FAIL", "errors": ["gate=None", "function_inventory_coverage_pct<100"],
"expected_success_value": {}, }):
"evidence": {"db_path": "Temp/test_kis_data_collection.db"}, with patch.object(validator, "_check_p5", return_value={
"errors": ["sqlite_round_trip_missing"], "gate": "PASS",
}, "expected_success_value": {},
) "evidence": {},
monkeypatch.setattr( "errors": [],
validator, }):
"_check_p3", rc = validator.main()
lambda: { payload = json.loads((ROOT / "Temp" / "platform_transition_wbs_v1.json").read_text(encoding="utf-8"))
"gate": "PASS",
"expected_success_value": {},
"evidence": {"workflow_path": ".gitea/workflows/kis_data_collection.yml"},
"errors": [],
},
)
monkeypatch.setattr(
validator,
"_check_p4",
lambda: {
"gate": "FAIL",
"expected_success_value": {},
"evidence": {"validation_path": "Temp/gas_thin_adapter_validation_v1.json"},
"errors": ["gate=None", "function_inventory_coverage_pct<100"],
},
)
monkeypatch.setattr(
validator,
"_check_p5",
lambda: {
"gate": "PASS",
"expected_success_value": {},
"evidence": {},
"errors": [],
},
)
rc = validator.main() self.assertEqual(rc, 1)
payload = json.loads((ROOT / "Temp" / "platform_transition_wbs_v1.json").read_text(encoding="utf-8")) self.assertEqual(payload["gate"], "FAIL")
self.assertTrue(payload["message"].startswith("Platform transition WBS check failed"))
self.assertEqual(len(payload["failure_notes"]), 3)
self.assertIn("P1 failed", payload["failure_notes"][0])
self.assertIn("P2 failed", payload["failure_notes"][1])
self.assertIn("P4 failed", payload["failure_notes"][2])
if __name__ == "__main__":
unittest.main()
assert rc == 1
assert payload["gate"] == "FAIL"
assert payload["message"].startswith("Platform transition WBS check failed")
assert len(payload["failure_notes"]) == 3
assert "P1 failed" in payload["failure_notes"][0]
assert "P2 failed" in payload["failure_notes"][1]
assert "P4 failed" in payload["failure_notes"][2]
+77 -53
View File
@@ -2,7 +2,9 @@
from __future__ import annotations from __future__ import annotations
import sys import sys
import unittest
from pathlib import Path from pathlib import Path
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[2] ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path: if str(ROOT) not in sys.path:
@@ -11,59 +13,81 @@ if str(ROOT) not in sys.path:
import tools.validate_specs as vs import tools.validate_specs as vs
def test_real_repo_has_no_missing_code_path(): class TestValidateSpecCodeSync(unittest.TestCase):
"""현재 저장소 상태에서 1차 태깅된 파일들은 모두 code_path가 실존해야 한다."""
errors: list[str] = [] def test_real_repo_has_no_missing_code_path(self):
result = vs.validate_spec_code_sync(errors) """현재 저장소 상태에서 1차 태깅된 파일들은 모두 code_path가 실존해야 한다."""
assert result["gate"] == "PASS" errors: list[str] = []
assert result["missing_code_path_count"] == 0 result = vs.validate_spec_code_sync(errors)
assert result["checked_count"] >= 10 self.assertEqual(result["gate"], "PASS")
assert not errors self.assertEqual(result["missing_code_path_count"], 0)
self.assertTrue(result["checked_count"] >= 10)
self.assertFalse(errors)
def test_missing_code_path_fails(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
tmp_path = Path(tmp_dir)
(tmp_path / "spec").mkdir()
(tmp_path / "governance").mkdir()
(tmp_path / "spec" / "fake_contract.yaml").write_text(
"meta:\n has_code_implementation: true\n code_path: \"tools/does_not_exist_v1.py\"\n",
encoding="utf-8",
)
with patch.object(vs, "ROOT", tmp_path):
errors: list[str] = []
result = vs.validate_spec_code_sync(errors)
self.assertEqual(result["gate"], "FAIL")
self.assertEqual(result["missing_code_path_count"], 1)
self.assertTrue(any("does_not_exist_v1.py" in e for e in errors))
finally:
shutil.rmtree(tmp_dir)
def test_redirect_only_and_has_code_is_contradiction(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
tmp_path = Path(tmp_dir)
(tmp_path / "spec").mkdir()
(tmp_path / "governance").mkdir()
(tmp_path / "spec" / "contradiction.yaml").write_text(
"meta:\n has_code_implementation: true\n redirect_only: true\n",
encoding="utf-8",
)
with patch.object(vs, "ROOT", tmp_path):
errors: list[str] = []
result = vs.validate_spec_code_sync(errors)
self.assertEqual(result["gate"], "FAIL")
self.assertTrue(any("contradiction" in e for e in errors))
finally:
shutil.rmtree(tmp_dir)
def test_files_without_the_field_are_skipped_not_failed(self):
import tempfile
import shutil
tmp_dir = tempfile.mkdtemp()
try:
tmp_path = Path(tmp_dir)
(tmp_path / "spec").mkdir()
(tmp_path / "governance").mkdir()
(tmp_path / "spec" / "untouched.yaml").write_text(
"meta:\n title: legacy doc with no sync field\n",
encoding="utf-8",
)
with patch.object(vs, "ROOT", tmp_path):
errors: list[str] = []
result = vs.validate_spec_code_sync(errors)
self.assertEqual(result["gate"], "PASS")
self.assertEqual(result["checked_count"], 0)
self.assertEqual(result["total_spec_files"], 1)
self.assertFalse(errors)
finally:
shutil.rmtree(tmp_dir)
def test_missing_code_path_fails(tmp_path, monkeypatch): if __name__ == "__main__":
(tmp_path / "spec").mkdir() unittest.main()
(tmp_path / "governance").mkdir()
(tmp_path / "spec" / "fake_contract.yaml").write_text(
"meta:\n has_code_implementation: true\n code_path: \"tools/does_not_exist_v1.py\"\n",
encoding="utf-8",
)
monkeypatch.setattr(vs, "ROOT", tmp_path)
errors: list[str] = []
result = vs.validate_spec_code_sync(errors)
assert result["gate"] == "FAIL"
assert result["missing_code_path_count"] == 1
assert any("does_not_exist_v1.py" in e for e in errors)
def test_redirect_only_and_has_code_is_contradiction(tmp_path, monkeypatch):
(tmp_path / "spec").mkdir()
(tmp_path / "governance").mkdir()
(tmp_path / "spec" / "contradiction.yaml").write_text(
"meta:\n has_code_implementation: true\n redirect_only: true\n",
encoding="utf-8",
)
monkeypatch.setattr(vs, "ROOT", tmp_path)
errors: list[str] = []
result = vs.validate_spec_code_sync(errors)
assert result["gate"] == "FAIL"
assert any("contradiction" in e for e in errors)
def test_files_without_the_field_are_skipped_not_failed(tmp_path, monkeypatch):
(tmp_path / "spec").mkdir()
(tmp_path / "governance").mkdir()
(tmp_path / "spec" / "untouched.yaml").write_text(
"meta:\n title: legacy doc with no sync field\n",
encoding="utf-8",
)
monkeypatch.setattr(vs, "ROOT", tmp_path)
errors: list[str] = []
result = vs.validate_spec_code_sync(errors)
assert result["gate"] == "PASS"
assert result["checked_count"] == 0
assert result["total_spec_files"] == 1
assert not errors
+11 -10
View File
@@ -8,24 +8,25 @@ ROOT = Path(__file__).resolve().parents[1]
REQUIRED_PATTERNS = { REQUIRED_PATTERNS = {
".gitea/workflows/kis_data_collection.yml": [ ".gitea/workflows/kis_data_collection.yml": [
"secrets.KIS_APP_KEY_TEST", "vars.KIS_APP_KEY_TEST",
"secrets.KIS_APP_SECRET_TEST", "vars.KIS_APP_SECRET_TEST",
"secrets.KIS_APP_KEY", "vars.KIS_APP_KEY",
"secrets.KIS_APP_SECRET", "vars.KIS_APP_SECRET",
], ],
".gitea/workflows/qualitative_sell_strategy.yml": [ ".gitea/workflows/qualitative_sell_strategy.yml": [
"secrets.KIS_APP_KEY_TEST", "vars.KIS_APP_KEY_TEST",
"secrets.KIS_APP_SECRET_TEST", "vars.KIS_APP_SECRET_TEST",
"secrets.KIS_APP_KEY", "vars.KIS_APP_KEY",
"secrets.KIS_APP_SECRET", "vars.KIS_APP_SECRET",
], ],
".gitea/workflows/ci.yml": [ ".gitea/workflows/ci.yml": [
"secrets.KIS_APP_KEY_TEST", "vars.KIS_APP_KEY_TEST",
"secrets.KIS_APP_SECRET_TEST", "vars.KIS_APP_SECRET_TEST",
], ],
} }
def main() -> int: def main() -> int:
errors: list[str] = [] errors: list[str] = []
evidence: dict[str, dict[str, bool]] = {} evidence: dict[str, dict[str, bool]] = {}