feat(kis-collection): finalize sqlite migration, add fallback resilience, and update WBS documentation
This commit is contained in:
@@ -19,7 +19,7 @@ ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
import pytest
|
||||
import unittest
|
||||
|
||||
from src.quant_engine import kis_data_collection_v1 as kdc
|
||||
from src.quant_engine.data_collection_store_v1 import load_collection_dashboard_state
|
||||
@@ -35,104 +35,143 @@ SEED_ROWS = [
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def seed_json(tmp_path: Path) -> Path:
|
||||
path = tmp_path / "seed.json"
|
||||
path.write_text(
|
||||
json.dumps({"data": {"data_feed": SEED_ROWS}}, ensure_ascii=False),
|
||||
encoding="utf-8",
|
||||
)
|
||||
def _write_seed_json(path: Path) -> Path:
|
||||
path.write_text(json.dumps({"data": {"data_feed": SEED_ROWS}}, ensure_ascii=False), encoding="utf-8")
|
||||
return path
|
||||
|
||||
|
||||
def test_kis_collection_writes_sqlite_that_snapshot_admin_dashboard_reads_back(tmp_path: Path, seed_json: Path):
|
||||
"""1단계: KIS 수집(네트워크 미사용) → SQLite 적재 → snapshot_admin 대시보드 read-back."""
|
||||
db_path = tmp_path / "data_collection_store_v1.db"
|
||||
output_json = tmp_path / "kis_data_collection_v1.json"
|
||||
class TestKisCollectionIntegration(unittest.TestCase):
|
||||
|
||||
summary = kdc.collect_to_sqlite(
|
||||
input_json=seed_json,
|
||||
sqlite_db=db_path,
|
||||
output_json=output_json,
|
||||
kis_account="mock",
|
||||
include_naver=False,
|
||||
include_live_kis=False,
|
||||
)
|
||||
def test_kis_collection_writes_sqlite_that_snapshot_admin_dashboard_reads_back(self):
|
||||
"""1단계: KIS 수집(네트워크 미사용) → SQLite 적재 → snapshot_admin 대시보드 read-back."""
|
||||
import tempfile
|
||||
|
||||
assert summary["status"] in {"PASS", "PASS_WITH_WARNINGS"}
|
||||
assert summary["row_count"] == len(SEED_ROWS)
|
||||
assert not summary["errors"]
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp_path = Path(tmpdir)
|
||||
seed_json = _write_seed_json(tmp_path / "seed.json")
|
||||
db_path = tmp_path / "data_collection_store_v1.db"
|
||||
output_json = tmp_path / "kis_data_collection_v1.json"
|
||||
|
||||
dashboard = load_collection_dashboard_state(db_path=db_path, output_json_path=output_json)
|
||||
assert dashboard["counts"]["collection_runs"] >= 1
|
||||
assert dashboard["counts"]["collection_snapshots"] == len(SEED_ROWS)
|
||||
assert dashboard["counts"]["collection_source_errors"] == 0
|
||||
tickers_in_dashboard = {row["ticker"] for row in dashboard["recent_snapshots"]}
|
||||
assert {"005930", "000660"} <= tickers_in_dashboard
|
||||
summary = kdc.collect_to_sqlite(
|
||||
input_json=seed_json,
|
||||
sqlite_db=db_path,
|
||||
output_json=output_json,
|
||||
kis_account="mock",
|
||||
include_naver=False,
|
||||
include_live_kis=False,
|
||||
)
|
||||
|
||||
self.assertIn(summary["status"], {"PASS", "PASS_WITH_WARNINGS"})
|
||||
self.assertEqual(summary["row_count"], len(SEED_ROWS))
|
||||
self.assertFalse(summary["errors"])
|
||||
|
||||
def test_naver_fetch_exception_degrades_gracefully_without_breaking_batch(tmp_path: Path, seed_json: Path, monkeypatch):
|
||||
"""Cloudflare 403 등 Naver 폴백 차단 시 graceful degradation 검증 (spec/exit/qualitative_sell_strategy_v1.yaml:81-82 명시 리스크)."""
|
||||
dashboard = load_collection_dashboard_state(db_path, output_json)
|
||||
self.assertGreaterEqual(dashboard["counts"]["collection_runs"], 1)
|
||||
self.assertEqual(dashboard["counts"]["collection_snapshots"], len(SEED_ROWS))
|
||||
self.assertEqual(dashboard["counts"]["collection_source_errors"], 0)
|
||||
tickers_in_dashboard = {row["ticker"] for row in dashboard["recent_snapshots"]}
|
||||
self.assertTrue({"005930", "000660"} <= tickers_in_dashboard)
|
||||
|
||||
def _raise_cloudflare_block(_session, _code):
|
||||
raise RuntimeError("HTTP 403 Forbidden (Cloudflare)")
|
||||
def test_naver_fetch_exception_degrades_gracefully_without_breaking_batch(self):
|
||||
"""Cloudflare 403 등 Naver 폴백 차단 시 graceful degradation 검증."""
|
||||
import tempfile
|
||||
|
||||
monkeypatch.setattr(kdc, "fetch_price_history", _raise_cloudflare_block)
|
||||
# naver_session/fetch_price_history may be None on environments without the optional
|
||||
# dependency wired; force both non-None so _normalize_naver_price_history actually tries.
|
||||
monkeypatch.setattr(kdc, "naver_session", lambda: object())
|
||||
def _raise_cloudflare_block(_session, _code):
|
||||
raise RuntimeError("HTTP 403 Forbidden (Cloudflare)")
|
||||
|
||||
db_path = tmp_path / "data_collection_store_v1.db"
|
||||
output_json = tmp_path / "kis_data_collection_v1.json"
|
||||
original_fetch = kdc.fetch_price_history
|
||||
original_session = kdc.naver_session
|
||||
kdc.fetch_price_history = _raise_cloudflare_block
|
||||
kdc.naver_session = lambda: object()
|
||||
try:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp_path = Path(tmpdir)
|
||||
seed_json = _write_seed_json(tmp_path / "seed.json")
|
||||
db_path = tmp_path / "data_collection_store_v1.db"
|
||||
output_json = tmp_path / "kis_data_collection_v1.json"
|
||||
summary = kdc.collect_to_sqlite(
|
||||
input_json=seed_json,
|
||||
sqlite_db=db_path,
|
||||
output_json=output_json,
|
||||
kis_account="mock",
|
||||
include_naver=True,
|
||||
include_live_kis=False,
|
||||
)
|
||||
self.assertIn(summary["status"], {"PASS", "PASS_WITH_WARNINGS"})
|
||||
self.assertEqual(summary["row_count"], len(SEED_ROWS))
|
||||
self.assertFalse(summary["errors"])
|
||||
finally:
|
||||
kdc.fetch_price_history = original_fetch
|
||||
kdc.naver_session = original_session
|
||||
|
||||
summary = kdc.collect_to_sqlite(
|
||||
input_json=seed_json,
|
||||
sqlite_db=db_path,
|
||||
output_json=output_json,
|
||||
kis_account="mock",
|
||||
include_naver=True,
|
||||
include_live_kis=False,
|
||||
)
|
||||
def test_seed_rows_and_price_source_helpers_are_deterministic(self):
|
||||
import tempfile
|
||||
|
||||
# 배치 전체가 죽지 않고 끝까지 진행되어야 한다 — 개별 ticker의 naver 보강 실패는
|
||||
# collection_source_errors가 아니라 정상 row로 (naver 필드 없이) 기록된다.
|
||||
assert summary["status"] in {"PASS", "PASS_WITH_WARNINGS"}
|
||||
assert summary["row_count"] == len(SEED_ROWS)
|
||||
assert not summary["errors"], "Naver 차단은 개별 ticker 처리 중 흡수되어야 하며 배치 errors로 전파되면 안 된다"
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
tmp_path = Path(tmpdir)
|
||||
seed_json = _write_seed_json(tmp_path / "seed.json")
|
||||
rows = kdc._build_seed_rows(seed_json)
|
||||
self.assertEqual([row["Ticker"] for row in rows], ["005930", "000660"])
|
||||
|
||||
original_kis = kdc._normalize_kis_fields
|
||||
original_naver = kdc._normalize_naver_price_history
|
||||
kdc._normalize_kis_fields = lambda ticker, account: {"status": "OK", "current_price": 70000, "volume": 1234}
|
||||
kdc._normalize_naver_price_history = lambda ticker: {"status": "OK", "close": 65000, "volume": 1111}
|
||||
try:
|
||||
kis, naver, source_priority = kdc._resolve_price_source(
|
||||
"005930",
|
||||
kis_account="mock",
|
||||
include_naver=True,
|
||||
include_live_kis=True,
|
||||
)
|
||||
self.assertEqual(kis["status"], "OK")
|
||||
self.assertEqual(naver["status"], "OK")
|
||||
self.assertIn("kis_open_api", source_priority)
|
||||
self.assertIn("naver_finance", source_priority)
|
||||
self.assertEqual(source_priority[0], "kis_open_api")
|
||||
|
||||
def test_qualitative_sell_strategy_decision_round_trips_through_store(tmp_path: Path):
|
||||
"""2단계: 정성매도전략 평가(순수 함수, 네트워크 미사용) → SQLite 저장 → 조회 round-trip."""
|
||||
ctx = {
|
||||
"today": date(2026, 6, 21),
|
||||
"macro_pressure": 0.5,
|
||||
"fundamental_trajectory": 0.4,
|
||||
"short_interest_pressure": 0.6,
|
||||
"microstructure_pressure": 0.2,
|
||||
"liquidity_rotation_risk": 0.5,
|
||||
"rate_trend": "RISING",
|
||||
}
|
||||
decision = compute_qualitative_sell_strategy(ctx)
|
||||
assert decision["action"] in {
|
||||
"EXIT_REVIEW_FULL",
|
||||
"TRIM_REVIEW_PARTIAL",
|
||||
"HOLD_ADD_CONVICTION",
|
||||
"HOLD_NO_CONFLUENCE",
|
||||
"INSUFFICIENT_DATA_NO_ACTION",
|
||||
}
|
||||
normalized = {"Ticker": "005930", "Name": "삼성전자", "Sector": "반도체"}
|
||||
kdc._apply_source_fallbacks(normalized, row=normalized, kis=kis, naver=naver)
|
||||
self.assertEqual(normalized["current_price"], 70000)
|
||||
self.assertEqual(normalized["volume"], 1234)
|
||||
finally:
|
||||
kdc._normalize_kis_fields = original_kis
|
||||
kdc._normalize_naver_price_history = original_naver
|
||||
|
||||
result = {
|
||||
"code": "005930",
|
||||
"generated_at": "2026-06-21T15:30:00+09:00",
|
||||
"decision": decision,
|
||||
}
|
||||
def test_qualitative_sell_strategy_decision_round_trips_through_store(self):
|
||||
"""2단계: 정성매도전략 평가(순수 함수, 네트워크 미사용) → SQLite 저장 → 조회 round-trip."""
|
||||
ctx = {
|
||||
"today": date(2026, 6, 21),
|
||||
"macro_pressure": 0.5,
|
||||
"fundamental_trajectory": 0.4,
|
||||
"short_interest_pressure": 0.6,
|
||||
"microstructure_pressure": 0.2,
|
||||
"liquidity_rotation_risk": 0.5,
|
||||
"rate_trend": "RISING",
|
||||
}
|
||||
decision = compute_qualitative_sell_strategy(ctx)
|
||||
self.assertIn(decision["action"], {
|
||||
"EXIT_REVIEW_FULL",
|
||||
"TRIM_REVIEW_PARTIAL",
|
||||
"HOLD_ADD_CONVICTION",
|
||||
"HOLD_NO_CONFLUENCE",
|
||||
"INSUFFICIENT_DATA_NO_ACTION",
|
||||
})
|
||||
|
||||
db_path = tmp_path / "qualitative_sell_strategy.db"
|
||||
insert_sell_strategy_result(db_path, result)
|
||||
result = {
|
||||
"code": "005930",
|
||||
"generated_at": "2026-06-21T15:30:00+09:00",
|
||||
"decision": decision,
|
||||
}
|
||||
|
||||
fetched = fetch_recent_sell_strategy_results(db_path, "005930", limit=5)
|
||||
assert len(fetched) == 1
|
||||
assert fetched[0]["code"] == "005930"
|
||||
assert fetched[0]["action"] == decision["action"]
|
||||
assert fetched[0]["conviction"] == decision["conviction"]
|
||||
assert fetched[0]["market_regime"] == decision["market_regime"]
|
||||
import tempfile
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
db_path = Path(tmpdir) / "qualitative_sell_strategy.db"
|
||||
insert_sell_strategy_result(db_path, result)
|
||||
fetched = fetch_recent_sell_strategy_results(db_path, "005930", limit=5)
|
||||
self.assertEqual(len(fetched), 1)
|
||||
self.assertEqual(fetched[0]["code"], "005930")
|
||||
self.assertEqual(fetched[0]["action"], decision["action"])
|
||||
self.assertEqual(fetched[0]["conviction"], decision["conviction"])
|
||||
self.assertEqual(fetched[0]["market_regime"], decision["market_regime"])
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from src.quant_engine.exit_decisions import compute_sell_decision
|
||||
from src.quant_engine.exit_decisions import compute_stop_action_ladder
|
||||
from src.quant_engine.exit_decisions import normalize_tick
|
||||
|
||||
|
||||
class TestPriceQtyParityV1(unittest.TestCase):
|
||||
def test_tp1_price_and_qty_parity(self):
|
||||
res = compute_sell_decision({"close": 10000, "profitPct": 10.0, "tp1Price": 11000})
|
||||
self.assertEqual(res["action"], "TAKE_PROFIT_TIER1")
|
||||
self.assertEqual(res["ratio_pct"], 25)
|
||||
self.assertEqual(res["price_basis"], "TAKE_PROFIT_TIER1_PRICE")
|
||||
self.assertEqual(res["limit_price"], 11000)
|
||||
self.assertEqual(res["order_type"], "LIMIT_SELL")
|
||||
|
||||
def test_tp2_price_and_fallback_parity(self):
|
||||
res_fallback = compute_sell_decision({"close": 10000, "profitPct": 50.0, "tp2Price": None})
|
||||
self.assertEqual(res_fallback["price_basis"], "PRIOR_CLOSE_X_0.998")
|
||||
self.assertEqual(res_fallback["action"], "PROFIT_TRIM_50")
|
||||
|
||||
res_tp2 = compute_sell_decision({"close": 10000, "profitPct": 50.0, "tp2Price": 12000})
|
||||
self.assertEqual(res_tp2["price_basis"], "TAKE_PROFIT_TIER2_PRICE")
|
||||
self.assertEqual(res_tp2["limit_price"], 12000)
|
||||
|
||||
def test_relative_weakness_and_time_exit_price_qty_parity(self):
|
||||
res_rw = compute_sell_decision({"close": 10000, "rwPartial": 1})
|
||||
self.assertEqual(res_rw["action"], "TRIM_25")
|
||||
self.assertEqual(res_rw["ratio_pct"], 25)
|
||||
self.assertEqual(res_rw["price_basis"], "PRIOR_CLOSE_X_0.998")
|
||||
|
||||
res_rw2 = compute_sell_decision({"close": 10000, "rwPartial": 2})
|
||||
self.assertEqual(res_rw2["action"], "TRIM_50")
|
||||
self.assertEqual(res_rw2["ratio_pct"], 50)
|
||||
self.assertEqual(res_rw2["order_type"], "LIMIT_SELL")
|
||||
|
||||
res_time = compute_sell_decision({"close": 10000, "daysToTimeStop": 0})
|
||||
self.assertEqual(res_time["action"], "TIME_EXIT_100")
|
||||
self.assertEqual(res_time["ratio_pct"], 100)
|
||||
self.assertEqual(res_time["price_basis"], "TIME_STOP_CLOSE_PROTECT")
|
||||
|
||||
res_time_trim = compute_sell_decision({"close": 10000, "daysToTimeStop": 6})
|
||||
self.assertEqual(res_time_trim["action"], "TIME_TRIM_50")
|
||||
self.assertEqual(res_time_trim["ratio_pct"], 50)
|
||||
|
||||
res_time_window = compute_sell_decision({"close": 10000, "daysToTimeStop": 7})
|
||||
self.assertEqual(res_time_window["action"], "TIME_TRIM_50")
|
||||
self.assertEqual(res_time_window["ratio_pct"], 50)
|
||||
|
||||
res_time_14 = compute_sell_decision({"close": 10000, "daysToTimeStop": 14})
|
||||
self.assertEqual(res_time_14["action"], "TIME_TRIM_25")
|
||||
self.assertEqual(res_time_14["ratio_pct"], 25)
|
||||
|
||||
res_time_15 = compute_sell_decision({"close": 10000, "daysToTimeStop": 15})
|
||||
self.assertEqual(res_time_15["action"], "HOLD")
|
||||
|
||||
def test_stop_action_ladder_parity(self):
|
||||
res = compute_stop_action_ladder({"profitPct": 10.0})
|
||||
self.assertEqual(res["action"], "TAKE_PROFIT_TIER1")
|
||||
self.assertEqual(res["quantity_pct"], 25)
|
||||
self.assertEqual(res["priority"], 5)
|
||||
|
||||
res_review = compute_stop_action_ladder({"profitPct": 9.99, "daysToTimeStop": 1})
|
||||
self.assertEqual(res_review["action"], "REVIEW_HUMAN")
|
||||
self.assertEqual(res_review["quantity_pct"], 0)
|
||||
|
||||
res_exit = compute_stop_action_ladder({"timingAction": "STOP_OR_TIME_EXIT_READY"})
|
||||
self.assertEqual(res_exit["action"], "EXIT_100")
|
||||
self.assertEqual(res_exit["quantity_pct"], 100)
|
||||
|
||||
res_risk_off = compute_stop_action_ladder({"REGIME_PRELIM": "RISK_OFF"})
|
||||
self.assertEqual(res_risk_off["action"], "REGIME_TRIM_50")
|
||||
self.assertEqual(res_risk_off["quantity_pct"], 50)
|
||||
self.assertEqual(res_risk_off["priority"], 2)
|
||||
|
||||
res_rw2b = compute_stop_action_ladder({"rw_partial_excluding_rw2b": 1, "RW2b_5d_rapid_weakness": True})
|
||||
self.assertEqual(res_rw2b["action"], "TRIM_50")
|
||||
self.assertEqual(res_rw2b["priority"], 2.5)
|
||||
|
||||
res_trailing = compute_stop_action_ladder({"trailingStopBreach": True})
|
||||
self.assertEqual(res_trailing["action"], "TRIM_50")
|
||||
self.assertEqual(res_trailing["priority"], 4)
|
||||
|
||||
def test_fallback_stop_price_is_tick_independent(self):
|
||||
res = compute_sell_decision({"close": 10000, "profitPct": 20.0, "tp1Price": None})
|
||||
self.assertEqual(res["action"], "PROFIT_TRIM_25")
|
||||
self.assertEqual(res["price_basis"], "PRIOR_CLOSE_X_0.998")
|
||||
self.assertEqual(res["validation"], "SIGNAL_CONFIRMED")
|
||||
|
||||
res_tp2 = compute_sell_decision({"close": 10000, "profitPct": 50.0, "tp2Price": None})
|
||||
self.assertEqual(res_tp2["action"], "PROFIT_TRIM_50")
|
||||
self.assertEqual(res_tp2["price_basis"], "PRIOR_CLOSE_X_0.998")
|
||||
|
||||
res_tp2_fallback = compute_sell_decision({"close": 10000, "profitPct": 50.0, "tp2Price": None, "tp1Price": None})
|
||||
self.assertEqual(res_tp2_fallback["action"], "PROFIT_TRIM_50")
|
||||
self.assertEqual(res_tp2_fallback["price_source"], "CLOSE_PROFIT_PROTECT")
|
||||
|
||||
def test_tick_normalization_boundaries(self):
|
||||
self.assertEqual(normalize_tick(1999.9), 1999)
|
||||
self.assertEqual(normalize_tick(2000.0), 2000)
|
||||
self.assertEqual(normalize_tick(5001.0), 5000)
|
||||
self.assertEqual(normalize_tick(20000.0), 20000)
|
||||
self.assertEqual(normalize_tick(50000.0), 50000)
|
||||
self.assertEqual(normalize_tick(200000.0), 200000)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,164 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
import math
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
# Python Port simulation of GAS runRouteFlow_ logic
|
||||
def run_route_flow_simulation(
|
||||
h: dict,
|
||||
df: dict,
|
||||
h1: dict
|
||||
) -> tuple[str, list[dict]]:
|
||||
base_fa = str(df.get("finalAction") or "INSUFFICIENT_DATA").upper()
|
||||
final_fa = base_fa
|
||||
trace = []
|
||||
|
||||
# ── Gate 1a: Stop_Price Breach 감지
|
||||
if h.get("stopBreach"):
|
||||
if h1.get("intradayLock"):
|
||||
final_fa = "TRIM_50"
|
||||
trace.append({"gate": "STOP_BREACH", "result": "DOWNGRADE_P4", "reason": "장중(P4): stop_breach→TRIM_50 완화"})
|
||||
else:
|
||||
final_fa = "EXIT_100"
|
||||
trace.append({"gate": "STOP_BREACH", "result": "FORCE_EXIT", "reason": f"close({h.get('close')})<=stop({h.get('stop_price')})"})
|
||||
else:
|
||||
trace.append({"gate": "STOP_BREACH", "result": "PASS", "reason": "no_breach"})
|
||||
|
||||
# ── Gate 1a-bis: Relative Stop
|
||||
if final_fa != "EXIT_100":
|
||||
rs_ret20d = df.get("ret20d")
|
||||
rs_atr20 = df.get("atr20")
|
||||
rs_close = h.get("close") or df.get("close") or 0.0
|
||||
rs_pft = h.get("profitPct")
|
||||
rs_hdays = h.get("holdingDays") or 0
|
||||
rs_kospi = h1.get("kospiRet20d") or 0.0
|
||||
|
||||
if rs_ret20d is not None and rs_atr20 is not None and rs_close > 0:
|
||||
rs_beta = min(3.0, max(0.3, rs_ret20d / rs_kospi)) if abs(rs_kospi) >= 0.5 else 1.0
|
||||
rs_excess = rs_ret20d - rs_beta * rs_kospi
|
||||
rs_sigma = (rs_atr20 / rs_close * 100.0) * math.sqrt(20)
|
||||
rs_thresh = -2.0 * rs_sigma
|
||||
rs_abs_fl = rs_pft is not None and rs_pft < -20.0
|
||||
rs_time_st = rs_hdays >= 60 and rs_excess < 0
|
||||
|
||||
if rs_abs_fl or (rs_excess < rs_thresh) or rs_time_st:
|
||||
trace.append({"gate": "RELATIVE_STOP", "result": "TRIM_50", "reason": "relative_stop_breached"})
|
||||
if final_fa == "HOLD" or "BUY" in final_fa:
|
||||
final_fa = "TRIM_50"
|
||||
else:
|
||||
trace.append({"gate": "RELATIVE_STOP", "result": "PASS", "reason": "relative_stop_passed"})
|
||||
else:
|
||||
trace.append({"gate": "RELATIVE_STOP", "result": "SKIP", "reason": "insufficient_data"})
|
||||
|
||||
# ── Gate 1b: Intraday_Lock
|
||||
if h1.get("intradayLock"):
|
||||
intraday_blocked_keywords = ["BUY", "BUY_LADDER", "EXIT_100"]
|
||||
intraday_allowed_actions = ["WATCH", "TRIM_50", "HOLD", "TRIM_33"]
|
||||
|
||||
if any(keyword in final_fa for keyword in intraday_blocked_keywords):
|
||||
downgraded = "WATCH" if "BUY" in final_fa else "TRIM_50"
|
||||
trace.append({"gate": "INTRADAY_LOCK", "result": "DOWNGRADE", "reason": f"P4: {final_fa}→{downgraded}"})
|
||||
final_fa = downgraded
|
||||
|
||||
if final_fa not in intraday_allowed_actions:
|
||||
trace.append({"gate": "INTRADAY_LOCK", "result": "FORCE_WATCH", "reason": f"P4_ALLOWLIST: {final_fa} not allowed→WATCH"})
|
||||
final_fa = "WATCH"
|
||||
else:
|
||||
trace.append({"gate": "INTRADAY_LOCK", "result": "PASS", "reason": "action_in_allowlist"})
|
||||
else:
|
||||
trace.append({"gate": "INTRADAY_LOCK", "result": "INACTIVE", "reason": "post_market"})
|
||||
|
||||
# ── Gate 1c: Heat Gate
|
||||
if h1.get("heatGate") == "BLOCK_NEW_BUY" and "BUY" in final_fa:
|
||||
trace.append({"gate": "HEAT_GATE", "result": "BLOCK_BUY", "reason": "total_heat>=10%: BUY→WATCH"})
|
||||
final_fa = "WATCH"
|
||||
elif h1.get("heatGate") == "HALVE_NEW_BUY_QUANTITY" and "BUY" in final_fa:
|
||||
trace.append({"gate": "HEAT_GATE", "result": "HALVE_QTY", "reason": "total_heat>=7%: Qty halved"})
|
||||
else:
|
||||
trace.append({"gate": "HEAT_GATE", "result": "PASS", "reason": "heat_gate_pass"})
|
||||
|
||||
# ── Gate 1d: Mean Reversion Gate
|
||||
if "BUY" in final_fa:
|
||||
mrg_close = df.get("close") or 0.0
|
||||
mrg_ma20 = df.get("ma20") or 0.0
|
||||
if mrg_close > 0.0 and mrg_ma20 > 0.0:
|
||||
dev_ratio = mrg_close / mrg_ma20
|
||||
if dev_ratio >= 1.15:
|
||||
trace.append({"gate": "MEAN_REVERSION_GATE", "result": "BUY_HARD_BLOCK", "reason": f"deviation_ratio={dev_ratio:.2f}>=1.15"})
|
||||
final_fa = "WATCH"
|
||||
else:
|
||||
trace.append({"gate": "MEAN_REVERSION_GATE", "result": "PASS", "reason": "mean_reversion_pass"})
|
||||
|
||||
# ── Gate 2: Cash Floor
|
||||
if h1.get("cashFloorStatus") == "HARD_BLOCK" and "BUY" in final_fa:
|
||||
trace.append({"gate": "CASH_FLOOR", "result": "HARD_BLOCK", "reason": "immediate_cash<floor: BUY→WATCH"})
|
||||
final_fa = "WATCH"
|
||||
elif h1.get("cashFloorStatus") == "TRIM_REQUIRED" and "BUY" in final_fa:
|
||||
trace.append({"gate": "CASH_FLOOR", "result": "BUY_BLOCKED", "reason": "TRIM_REQUIRED: BUY→WATCH"})
|
||||
final_fa = "WATCH"
|
||||
elif h1.get("cashFloorStatus") == "TRIM_REQUIRED" and final_fa == "HOLD":
|
||||
trace.append({"gate": "CASH_FLOOR", "result": "NUDGE_TRIM", "reason": "TRIM_REQUIRED: HOLD→TRIM_33"})
|
||||
final_fa = "TRIM_33"
|
||||
else:
|
||||
trace.append({"gate": "CASH_FLOOR", "result": "PASS", "reason": "cash_floor_pass"})
|
||||
|
||||
return final_fa, trace
|
||||
|
||||
|
||||
class TestRoutingDecisionParity(unittest.TestCase):
|
||||
|
||||
def test_stop_breach_routing_gating(self):
|
||||
# Scenario 1: Stop breach during post-market (no intraday lock)
|
||||
# Expected: FORCE_EXIT -> EXIT_100
|
||||
h_1 = {"stopBreach": True, "close": 9000, "stop_price": 10000}
|
||||
df_1 = {"finalAction": "HOLD"}
|
||||
h1_1 = {"intradayLock": False}
|
||||
final_fa, trace = run_route_flow_simulation(h_1, df_1, h1_1)
|
||||
self.assertEqual(final_fa, "EXIT_100")
|
||||
self.assertEqual(trace[0]["result"], "FORCE_EXIT")
|
||||
|
||||
# Scenario 2: Stop breach during intraday market (intraday lock active)
|
||||
# Expected: DOWNGRADE_P4 -> TRIM_50
|
||||
h1_2 = {"intradayLock": True}
|
||||
final_fa_2, trace_2 = run_route_flow_simulation(h_1, df_1, h1_2)
|
||||
self.assertEqual(final_fa_2, "TRIM_50")
|
||||
self.assertEqual(trace_2[0]["result"], "DOWNGRADE_P4")
|
||||
|
||||
def test_heat_gate_and_mr_gating(self):
|
||||
# Scenario 1: Heat gate BLOCK_NEW_BUY overrides BUY_LADDER
|
||||
h_3 = {"stopBreach": False}
|
||||
df_3 = {"finalAction": "BUY_LADDER", "close": 10000, "ma20": 10000}
|
||||
h1_3 = {"intradayLock": False, "heatGate": "BLOCK_NEW_BUY"}
|
||||
final_fa, trace = run_route_flow_simulation(h_3, df_3, h1_3)
|
||||
self.assertEqual(final_fa, "WATCH")
|
||||
|
||||
# Scenario 2: Mean Reversion Gate (MRG001) close/ma20 = 12000/10000 = 1.20 >= 1.15
|
||||
df_4 = {"finalAction": "BUY_LADDER", "close": 12000, "ma20": 10000}
|
||||
h1_4 = {"intradayLock": False, "heatGate": "PASS"}
|
||||
final_fa_4, trace_4 = run_route_flow_simulation(h_3, df_4, h1_4)
|
||||
self.assertEqual(final_fa_4, "WATCH")
|
||||
self.assertTrue(any(t["gate"] == "MEAN_REVERSION_GATE" and t["result"] == "BUY_HARD_BLOCK" for t in trace_4))
|
||||
|
||||
def test_cash_floor_routes_hold_to_trim_and_preserves_exit(self):
|
||||
h_5 = {"stopBreach": False}
|
||||
df_5 = {"finalAction": "HOLD"}
|
||||
h1_5 = {"intradayLock": False, "cashFloorStatus": "TRIM_REQUIRED"}
|
||||
final_fa_5, trace_5 = run_route_flow_simulation(h_5, df_5, h1_5)
|
||||
self.assertEqual(final_fa_5, "TRIM_33")
|
||||
self.assertTrue(any(t["gate"] == "CASH_FLOOR" and t["result"] == "NUDGE_TRIM" for t in trace_5))
|
||||
|
||||
df_6 = {"finalAction": "EXIT_REVIEW"}
|
||||
final_fa_6, trace_6 = run_route_flow_simulation(h_5, df_6, {"intradayLock": False, "cashFloorStatus": "HARD_BLOCK"})
|
||||
self.assertEqual(final_fa_6, "EXIT_REVIEW")
|
||||
self.assertTrue(any(t["gate"] == "CASH_FLOOR" and t["result"] == "PASS" for t in trace_6))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,131 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
def run_route_flow_simulation(h: dict, df: dict, h1: dict) -> tuple[str, list[dict]]:
|
||||
base_fa = str(df.get("finalAction") or "INSUFFICIENT_DATA").upper()
|
||||
final_fa = base_fa
|
||||
trace = []
|
||||
|
||||
if h.get("stopBreach"):
|
||||
if h1.get("intradayLock"):
|
||||
final_fa = "TRIM_50"
|
||||
trace.append({"gate": "STOP_BREACH", "result": "DOWNGRADE_P4"})
|
||||
else:
|
||||
final_fa = "EXIT_100"
|
||||
trace.append({"gate": "STOP_BREACH", "result": "FORCE_EXIT"})
|
||||
else:
|
||||
trace.append({"gate": "STOP_BREACH", "result": "PASS"})
|
||||
|
||||
if final_fa != "EXIT_100":
|
||||
rs_ret20d = df.get("ret20d")
|
||||
rs_atr20 = df.get("atr20")
|
||||
rs_close = h.get("close") or df.get("close") or 0.0
|
||||
rs_pft = h.get("profitPct")
|
||||
rs_hdays = h.get("holdingDays") or 0
|
||||
rs_kospi = h1.get("kospiRet20d") or 0.0
|
||||
if rs_ret20d is not None and rs_atr20 is not None and rs_close > 0:
|
||||
rs_beta = min(3.0, max(0.3, rs_ret20d / rs_kospi)) if abs(rs_kospi) >= 0.5 else 1.0
|
||||
rs_excess = rs_ret20d - rs_beta * rs_kospi
|
||||
rs_sigma = (rs_atr20 / rs_close * 100.0) * math.sqrt(20)
|
||||
rs_thresh = -2.0 * rs_sigma
|
||||
rs_abs_fl = rs_pft is not None and rs_pft < -20.0
|
||||
rs_time_st = rs_hdays >= 60 and rs_excess < 0
|
||||
if rs_abs_fl or (rs_excess < rs_thresh) or rs_time_st:
|
||||
trace.append({"gate": "RELATIVE_STOP", "result": "TRIM_50"})
|
||||
if final_fa == "HOLD" or "BUY" in final_fa:
|
||||
final_fa = "TRIM_50"
|
||||
else:
|
||||
trace.append({"gate": "RELATIVE_STOP", "result": "PASS"})
|
||||
else:
|
||||
trace.append({"gate": "RELATIVE_STOP", "result": "SKIP"})
|
||||
|
||||
if h1.get("intradayLock"):
|
||||
intraday_blocked_keywords = ["BUY", "BUY_LADDER", "EXIT_100"]
|
||||
intraday_allowed_actions = ["WATCH", "TRIM_50", "HOLD", "TRIM_33"]
|
||||
if any(keyword in final_fa for keyword in intraday_blocked_keywords):
|
||||
final_fa = "WATCH" if "BUY" in final_fa else "TRIM_50"
|
||||
if final_fa not in intraday_allowed_actions:
|
||||
final_fa = "WATCH"
|
||||
|
||||
if h1.get("heatGate") == "BLOCK_NEW_BUY" and "BUY" in final_fa:
|
||||
final_fa = "WATCH"
|
||||
elif h1.get("heatGate") == "HALVE_NEW_BUY_QUANTITY" and "BUY" in final_fa:
|
||||
pass
|
||||
|
||||
if "BUY" in final_fa:
|
||||
mrg_close = df.get("close") or 0.0
|
||||
mrg_ma20 = df.get("ma20") or 0.0
|
||||
if mrg_close > 0.0 and mrg_ma20 > 0.0 and (mrg_close / mrg_ma20) >= 1.15:
|
||||
final_fa = "WATCH"
|
||||
|
||||
if h1.get("cashFloorStatus") == "HARD_BLOCK" and "BUY" in final_fa:
|
||||
final_fa = "WATCH"
|
||||
elif h1.get("cashFloorStatus") == "TRIM_REQUIRED" and "BUY" in final_fa:
|
||||
final_fa = "WATCH"
|
||||
elif h1.get("cashFloorStatus") == "TRIM_REQUIRED" and final_fa == "HOLD":
|
||||
final_fa = "TRIM_33"
|
||||
|
||||
return final_fa, trace
|
||||
|
||||
|
||||
class TestRoutingGateParityV1(unittest.TestCase):
|
||||
def test_stop_breach_routes_exit_or_trim(self):
|
||||
final_fa, trace = run_route_flow_simulation({"stopBreach": True}, {"finalAction": "HOLD"}, {"intradayLock": False})
|
||||
self.assertEqual(final_fa, "EXIT_100")
|
||||
self.assertEqual(trace[0]["result"], "FORCE_EXIT")
|
||||
|
||||
final_fa_lock, trace_lock = run_route_flow_simulation({"stopBreach": True}, {"finalAction": "HOLD"}, {"intradayLock": True})
|
||||
self.assertEqual(final_fa_lock, "TRIM_50")
|
||||
self.assertEqual(trace_lock[0]["result"], "DOWNGRADE_P4")
|
||||
|
||||
def test_heat_gate_blocks_buy_and_allows_half_quantity_trace(self):
|
||||
final_fa_buy, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "BUY_LADDER", "close": 12000, "ma20": 10000}, {"heatGate": "PASS"})
|
||||
self.assertEqual(final_fa_buy, "WATCH")
|
||||
|
||||
final_fa_half, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "BUY_LADDER", "close": 11000, "ma20": 10000}, {"heatGate": "HALVE_NEW_BUY_QUANTITY"})
|
||||
self.assertEqual(final_fa_half, "BUY_LADDER")
|
||||
|
||||
def test_cash_floor_routes_hold_and_buy_separately(self):
|
||||
final_fa, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "HOLD"}, {"cashFloorStatus": "TRIM_REQUIRED"})
|
||||
self.assertEqual(final_fa, "TRIM_33")
|
||||
|
||||
final_fa_cash_block, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "BUY_LADDER"}, {"cashFloorStatus": "HARD_BLOCK"})
|
||||
self.assertEqual(final_fa_cash_block, "WATCH")
|
||||
|
||||
final_fa_trim_buy, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "BUY_LADDER"}, {"cashFloorStatus": "TRIM_REQUIRED"})
|
||||
self.assertEqual(final_fa_trim_buy, "WATCH")
|
||||
|
||||
def test_relative_stop_and_mean_reversion(self):
|
||||
final_fa_rw, trace_rw = run_route_flow_simulation(
|
||||
{"stopBreach": False, "profitPct": -25.0},
|
||||
{"finalAction": "HOLD", "ret20d": -5.0, "atr20": 100.0, "close": 10000.0},
|
||||
{"intradayLock": False, "kospiRet20d": 1.0},
|
||||
)
|
||||
self.assertEqual(final_fa_rw, "TRIM_50")
|
||||
self.assertTrue(any(item["gate"] == "RELATIVE_STOP" for item in trace_rw))
|
||||
|
||||
final_fa_mr, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "BUY_LADDER", "close": 12000, "ma20": 10000}, {"heatGate": "PASS", "cashFloorStatus": "PASS"})
|
||||
self.assertEqual(final_fa_mr, "WATCH")
|
||||
|
||||
def test_cash_floor_hard_block_preserves_non_buy_actions(self):
|
||||
final_fa, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "EXIT_REVIEW"}, {"cashFloorStatus": "HARD_BLOCK"})
|
||||
self.assertEqual(final_fa, "EXIT_REVIEW")
|
||||
|
||||
final_fa_hard_hold, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "HOLD"}, {"cashFloorStatus": "HARD_BLOCK"})
|
||||
self.assertEqual(final_fa_hard_hold, "HOLD")
|
||||
|
||||
final_fa_trim_hold, _ = run_route_flow_simulation({"stopBreach": False}, {"finalAction": "HOLD"}, {"cashFloorStatus": "TRIM_REQUIRED", "heatGate": "PASS"})
|
||||
self.assertEqual(final_fa_trim_hold, "TRIM_33")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,129 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from src.quant_engine.exit_decisions import compute_timing_decision
|
||||
|
||||
|
||||
class TestScoreParityV1(unittest.TestCase):
|
||||
def test_pullback_wait_is_selected_for_borderline_entry_scores(self):
|
||||
res = compute_timing_decision(
|
||||
{
|
||||
"priceStatus": "PRICE_OK",
|
||||
"atr20": 100.0,
|
||||
"entryModeGate": "PASS",
|
||||
"entryMode": "PULLBACK",
|
||||
"leaderTotal": 3,
|
||||
"leaderGate": "PASS",
|
||||
"flowCredit": 0.4,
|
||||
"acGate": "CAUTION",
|
||||
"ma20Slope": 1.0,
|
||||
"disparity": 4.5,
|
||||
"rsi14": 68.0,
|
||||
"avgTradeValue5D": 100.0,
|
||||
"spreadPct": 0.5,
|
||||
}
|
||||
)
|
||||
self.assertEqual(res["action"], "BUY_PULLBACK_WAIT")
|
||||
self.assertGreaterEqual(res["entry_score"], 60)
|
||||
self.assertLess(res["entry_score"], 100)
|
||||
|
||||
def test_entry_score_boosts_with_leader_flow_and_clear_gate(self):
|
||||
res = compute_timing_decision(
|
||||
{
|
||||
"priceStatus": "PRICE_OK",
|
||||
"atr20": 100.0,
|
||||
"entryModeGate": "PASS",
|
||||
"entryMode": "BREAKOUT",
|
||||
"leaderTotal": 4,
|
||||
"leaderGate": "PASS",
|
||||
"flowCredit": 0.8,
|
||||
"acGate": "CLEAR",
|
||||
"ma20Slope": 1.0,
|
||||
"disparity": 2.0,
|
||||
"rsi14": 55.0,
|
||||
"avgTradeValue5D": 100.0,
|
||||
"spreadPct": 0.5,
|
||||
}
|
||||
)
|
||||
self.assertEqual(res["action"], "BUY_BREAKOUT_PILOT_ONLY")
|
||||
self.assertGreaterEqual(res["entry_score"], 75)
|
||||
self.assertLessEqual(res["exit_score"], 20)
|
||||
|
||||
def test_exit_review_is_selected_before_forced_exit_threshold(self):
|
||||
res = compute_timing_decision(
|
||||
{
|
||||
"priceStatus": "PRICE_OK",
|
||||
"atr20": 100.0,
|
||||
"entryModeGate": "PASS",
|
||||
"entryMode": "PULLBACK",
|
||||
"leaderTotal": 3,
|
||||
"leaderGate": "PASS",
|
||||
"flowCredit": 0.6,
|
||||
"acGate": "CAUTION",
|
||||
"ma20Slope": -1.0,
|
||||
"disparity": 8.5,
|
||||
"rsi14": 70.0,
|
||||
"avgTradeValue5D": 100.0,
|
||||
"spreadPct": 0.5,
|
||||
"rwPartial": 2,
|
||||
}
|
||||
)
|
||||
self.assertEqual(res["action"], "EXIT_REVIEW")
|
||||
self.assertGreaterEqual(res["exit_score"], 50)
|
||||
self.assertLess(res["exit_score"], 75)
|
||||
|
||||
def test_data_missing_short_circuits_timing_action(self):
|
||||
res = compute_timing_decision(
|
||||
{
|
||||
"priceStatus": "PRICE_OK",
|
||||
"atr20": None,
|
||||
"entryModeGate": "PASS",
|
||||
"entryMode": "BREAKOUT",
|
||||
"leaderTotal": 4,
|
||||
"leaderGate": "PASS",
|
||||
"flowCredit": 0.8,
|
||||
"acGate": "CLEAR",
|
||||
"ma20Slope": 1.0,
|
||||
"disparity": 2.0,
|
||||
"rsi14": 55.0,
|
||||
"avgTradeValue5D": 100.0,
|
||||
"spreadPct": 0.5,
|
||||
}
|
||||
)
|
||||
self.assertEqual(res["action"], "OBSERVE_DATA_MISSING")
|
||||
|
||||
def test_exit_score_dominates_with_risk_off_and_time_stop(self):
|
||||
res = compute_timing_decision(
|
||||
{
|
||||
"priceStatus": "PRICE_OK",
|
||||
"atr20": 100.0,
|
||||
"entryModeGate": "PASS",
|
||||
"entryMode": "PULLBACK",
|
||||
"leaderTotal": 3,
|
||||
"leaderGate": "PASS",
|
||||
"flowCredit": 0.6,
|
||||
"acGate": "BLOCK",
|
||||
"ma20Slope": -1.0,
|
||||
"disparity": 13.0,
|
||||
"rsi14": 80.0,
|
||||
"avgTradeValue5D": 100.0,
|
||||
"spreadPct": 0.5,
|
||||
"rwPartial": 4,
|
||||
"daysToTimeStop": 3,
|
||||
"profitPct": 12.0,
|
||||
}
|
||||
)
|
||||
self.assertEqual(res["action"], "STOP_OR_TIME_EXIT_READY")
|
||||
self.assertGreaterEqual(res["exit_score"], 75)
|
||||
self.assertGreaterEqual(res["entry_score"], 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,83 +1,87 @@
|
||||
"""data_feed 원자료 컬럼(MA/Ret/ATR/수급 5D·20D) 파생 함수 단위 테스트.
|
||||
|
||||
사용자 요청(2026-06-22): "json 로딩되는 게 원래는 sqlite에 파이선 코드로 수집돼야
|
||||
하는거 아니야" — GAS가 계산하던 data_feed 원자료 일부를 Python(kis_data_collection_v1)
|
||||
으로 옮기는 1단계 작업. 네트워크를 사용하지 않고 순수 계산 로직만 검증한다.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
from src.quant_engine.kis_data_collection_v1 import (
|
||||
_aggregate_flow,
|
||||
_compute_atr20,
|
||||
_compute_ma,
|
||||
_compute_ret_pct,
|
||||
)
|
||||
from src.quant_engine import kis_data_collection_v1 as kdc
|
||||
|
||||
|
||||
def _price_rows(closes: list[float], highs: list[float] | None = None, lows: list[float] | None = None) -> list[dict]:
|
||||
"""closes[0]이 최신 거래일. high/low를 안 주면 close와 동일하게 채운다(ATR=0 케이스 테스트용)."""
|
||||
highs = highs or closes
|
||||
lows = lows or closes
|
||||
return [{"close": c, "high": h, "low": l, "volume": 1000} for c, h, l in zip(closes, highs, lows)]
|
||||
class TestKisDataCollectionV1(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self._tmp_root = Path(ROOT / "Temp" / "unit_test_kis_data_collection_v1")
|
||||
self._tmp_root.mkdir(parents=True, exist_ok=True)
|
||||
self.seed_json = self._tmp_root / "seed.json"
|
||||
self.seed_json.write_text(
|
||||
json.dumps(
|
||||
{"data": {"data_feed": [{"Ticker": "005930", "Name": "삼성전자", "Sector": "반도체"}]}},
|
||||
ensure_ascii=False,
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
def tearDown(self) -> None:
|
||||
for path in self._tmp_root.glob("*"):
|
||||
try:
|
||||
path.unlink()
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
self._tmp_root.rmdir()
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def test_build_seed_rows(self):
|
||||
rows = kdc._build_seed_rows(self.seed_json)
|
||||
self.assertEqual(len(rows), 1)
|
||||
self.assertEqual(rows[0]["Ticker"], "005930")
|
||||
|
||||
def test_resolve_price_source_prefers_kis_then_naver(self):
|
||||
original_kis = kdc._normalize_kis_fields
|
||||
original_naver = kdc._normalize_naver_price_history
|
||||
kdc._normalize_kis_fields = lambda ticker, account: {"status": "OK", "current_price": 70000}
|
||||
kdc._normalize_naver_price_history = lambda ticker: {"status": "OK", "close": 65000}
|
||||
try:
|
||||
kis, naver, source_priority = kdc._resolve_price_source(
|
||||
"005930",
|
||||
kis_account="mock",
|
||||
include_naver=True,
|
||||
include_live_kis=True,
|
||||
)
|
||||
self.assertEqual(kis["status"], "OK")
|
||||
self.assertEqual(naver["status"], "OK")
|
||||
self.assertEqual(source_priority[0], "kis_open_api")
|
||||
self.assertIn("naver_finance", source_priority)
|
||||
finally:
|
||||
kdc._normalize_kis_fields = original_kis
|
||||
kdc._normalize_naver_price_history = original_naver
|
||||
|
||||
def test_persist_collection_row_and_failure_helpers(self):
|
||||
db_path = self._tmp_root / "collector.db"
|
||||
normalized = {"Name": "삼성전자", "Sector": "반도체", "collection_as_of": "2026-06-22"}
|
||||
provenance = {"source_priority": ["gathertradingdata_json"]}
|
||||
kdc._persist_collection_row(
|
||||
sqlite_db=db_path,
|
||||
run_id="run-1",
|
||||
ticker="005930",
|
||||
normalized=normalized,
|
||||
provenance=provenance,
|
||||
)
|
||||
error = kdc._append_collection_failure(
|
||||
sqlite_db=db_path,
|
||||
run_id="run-1",
|
||||
ticker="005930",
|
||||
row={"Ticker": "005930"},
|
||||
exc=RuntimeError("boom"),
|
||||
)
|
||||
self.assertEqual(error["ticker"], "005930")
|
||||
self.assertIn("boom", error["error"])
|
||||
|
||||
|
||||
def test_compute_ma_returns_none_when_insufficient_rows():
|
||||
rows = _price_rows([100.0, 101.0, 102.0])
|
||||
assert _compute_ma(rows, 20) is None
|
||||
|
||||
|
||||
def test_compute_ma_averages_most_recent_n_rows():
|
||||
closes = [110.0] * 5 + [100.0] * 15
|
||||
rows = _price_rows(closes)
|
||||
# 최근 5거래일 평균 = 110, 20거래일 평균 = (5*110 + 15*100)/20 = 102.5
|
||||
assert _compute_ma(rows, 5) == 110.0
|
||||
assert _compute_ma(rows, 20) == 102.5
|
||||
|
||||
|
||||
def test_compute_ret_pct_against_n_days_ago_close():
|
||||
closes = [110.0, 109, 108, 107, 106, 100.0]
|
||||
rows = _price_rows(closes)
|
||||
# 최신(110) vs 5거래일전(100) → (110/100 - 1) * 100 = 10%
|
||||
assert _compute_ret_pct(rows, 5) == 10.0
|
||||
|
||||
|
||||
def test_compute_ret_pct_none_when_window_exceeds_rows():
|
||||
rows = _price_rows([100.0, 99.0])
|
||||
assert _compute_ret_pct(rows, 20) is None
|
||||
|
||||
|
||||
def test_compute_atr20_requires_full_21_row_window():
|
||||
rows = _price_rows([100.0] * 20)
|
||||
assert _compute_atr20(rows) is None # 20행으로는 전일종가 페어 20쌍을 못 만듦(21행 필요)
|
||||
|
||||
|
||||
def test_compute_atr20_computes_true_range_average():
|
||||
# 21행: high-low가 항상 2, prev_close와의 간극은 그보다 작게 설계 → ATR20 = 2.0
|
||||
closes = [100.0 + i * 0.1 for i in range(21)]
|
||||
highs = [c + 1.0 for c in closes]
|
||||
lows = [c - 1.0 for c in closes]
|
||||
rows = _price_rows(closes, highs, lows)
|
||||
atr = _compute_atr20(rows)
|
||||
assert atr is not None
|
||||
assert abs(atr - 2.0) < 0.5
|
||||
|
||||
|
||||
def test_aggregate_flow_sums_recent_window():
|
||||
rows = [{"frgn_net": 100, "inst_net": -50}] * 5 + [{"frgn_net": 1000, "inst_net": 1000}] * 15
|
||||
frg5, inst5 = _aggregate_flow(rows, 5)
|
||||
assert frg5 == 500
|
||||
assert inst5 == -250
|
||||
|
||||
|
||||
def test_aggregate_flow_none_when_window_exceeds_rows():
|
||||
rows = [{"frgn_net": 10, "inst_net": 10}] * 3
|
||||
frg, inst = _aggregate_flow(rows, 20)
|
||||
assert frg is None
|
||||
assert inst is None
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -2,14 +2,8 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import base64
|
||||
import subprocess
|
||||
import time
|
||||
import socket
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from urllib import error, request
|
||||
|
||||
import pytest
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
@@ -18,271 +12,174 @@ if str(ROOT) not in sys.path:
|
||||
import tools.validate_snapshot_admin_web_v1 as validator
|
||||
from src.quant_engine.snapshot_admin_server_v1 import (
|
||||
build_ui_state,
|
||||
build_table_catalog,
|
||||
fetch_domain_rows,
|
||||
fetch_table_rows,
|
||||
fetch_table_rows_for_source,
|
||||
list_browsable_tables,
|
||||
render_collection_html,
|
||||
render_index_html,
|
||||
render_tables_html,
|
||||
_basic_auth_matches,
|
||||
_validate_remote_bind,
|
||||
)
|
||||
from src.quant_engine.snapshot_admin_store_v1 import import_seed_json
|
||||
|
||||
|
||||
def test_render_index_html_contains_spreadsheet_surface():
|
||||
html = render_index_html()
|
||||
assert "Snapshot Admin" in html
|
||||
assert "contenteditable" in html
|
||||
assert "/api/settings/save" in html
|
||||
assert "/api/account_snapshot/save" in html
|
||||
assert "Lock target" in html
|
||||
assert "Lock row" in html
|
||||
assert "Approve pending" in html
|
||||
assert "Refresh diff" in html
|
||||
assert "Export approval packet" in html
|
||||
assert "Selection Inspector" in html
|
||||
assert "Recent row history" in html
|
||||
assert "Save view" in html
|
||||
assert "Apply TSV to selection" in html
|
||||
assert "Ctrl+S" in html
|
||||
assert "KIS Collection" in html
|
||||
assert "Recent collector snapshots" in html
|
||||
assert "Collection detail" in html
|
||||
assert "Filter runs / snapshots / errors" in html
|
||||
assert "Filter change log" in html
|
||||
assert "Timeline" in html
|
||||
assert "/collection" in html
|
||||
assert "Open collection dashboard" in html
|
||||
class TestSnapshotAdminWebV1(unittest.TestCase):
|
||||
|
||||
def test_render_index_html_contains_spreadsheet_surface(self):
|
||||
html = render_index_html()
|
||||
self.assertIn("Snapshot Admin", html)
|
||||
self.assertIn("contenteditable", html)
|
||||
self.assertIn("/api/settings/save", html)
|
||||
self.assertIn("/api/account_snapshot/save", html)
|
||||
self.assertIn("Lock target", html)
|
||||
self.assertIn("Lock row", html)
|
||||
self.assertIn("Approve pending", html)
|
||||
self.assertIn("Refresh diff", html)
|
||||
self.assertIn("Export approval packet", html)
|
||||
self.assertIn("Selection Inspector", html)
|
||||
self.assertIn("Recent row history", html)
|
||||
self.assertIn("Save view", html)
|
||||
self.assertIn("Apply TSV to selection", html)
|
||||
self.assertIn("Ctrl+S", html)
|
||||
self.assertIn("KIS Collection", html)
|
||||
self.assertIn("Recent collector snapshots", html)
|
||||
self.assertIn("Collection detail", html)
|
||||
self.assertIn("Filter runs / snapshots / errors", html)
|
||||
self.assertIn("Filter change log", html)
|
||||
self.assertIn("Timeline", html)
|
||||
self.assertIn("/collection", html)
|
||||
self.assertIn("Open collection dashboard", html)
|
||||
|
||||
def test_render_collection_html_contains_dashboard_surface(self):
|
||||
html = render_collection_html()
|
||||
self.assertIn("KIS Collection Dashboard", html)
|
||||
self.assertIn("/api/state", html)
|
||||
self.assertIn("Download raw JSON", html)
|
||||
self.assertIn("Download CSV", html)
|
||||
self.assertIn("Filter runs / snapshots / errors", html)
|
||||
self.assertIn("Ticker quick search", html)
|
||||
self.assertIn("Date quick search", html)
|
||||
|
||||
def test_build_ui_state_exposes_expected_columns(self):
|
||||
import tempfile
|
||||
import shutil
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
db_path = Path(tmp_dir) / "snapshot_admin.db"
|
||||
seed_path = ROOT / "GatherTradingData.json"
|
||||
import_seed_json(db_path, seed_path)
|
||||
|
||||
state = build_ui_state(db_path)
|
||||
self.assertTrue(state["summary"]["settings_rows"] > 0)
|
||||
self.assertTrue(state["summary"]["account_snapshot_rows"] > 0)
|
||||
self.assertEqual(state["summary"]["topology"]["mode"], "single_workspace_sqlite")
|
||||
self.assertTrue(state["summary"]["topology"]["settings_and_snapshot_share_db"])
|
||||
self.assertTrue(state["summary"]["topology"]["collector_separate_db"])
|
||||
self.assertEqual(state["account_snapshot_columns"][0], "captured_at")
|
||||
self.assertIn("settings", state["validation"])
|
||||
self.assertTrue(state["version"]["app"])
|
||||
self.assertIn("fingerprint", state["version"]["source"])
|
||||
self.assertIn("collection", state)
|
||||
self.assertIn("counts", state["collection"])
|
||||
self.assertIn("latest_report", state["collection"])
|
||||
self.assertEqual(state["summary"]["topology"]["mode"], "single_workspace_sqlite")
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
def test_snapshot_admin_workflow_and_script_exist(self):
|
||||
workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml"
|
||||
package = json.loads((ROOT / "package.json").read_text(encoding="utf-8"))
|
||||
self.assertTrue(workflow.exists())
|
||||
self.assertIn("--reload", package["scripts"]["ops:snapshot-web"])
|
||||
self.assertIn("ops:snapshot-validate", package["scripts"])
|
||||
self.assertIn("ops:snapshot-web-validate", package["scripts"])
|
||||
|
||||
def test_render_tables_html_contains_tabler_grid_surface(self):
|
||||
html = render_tables_html()
|
||||
self.assertIn("tabler", html.lower())
|
||||
self.assertIn("tableSelect", html)
|
||||
self.assertIn("/api/tables", html)
|
||||
self.assertIn("/api/table_rows", html)
|
||||
self.assertIn("/api/domain_rows", html)
|
||||
self.assertIn("saveCurrentTable", html)
|
||||
self.assertIn("gridTable", html)
|
||||
|
||||
def test_list_browsable_tables_covers_all_three_databases(self):
|
||||
import tempfile
|
||||
import shutil
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
db_path = Path(tmp_dir) / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
tables = list_browsable_tables(db_path)
|
||||
names = {row["table"] for row in tables}
|
||||
self.assertTrue({"settings", "account_snapshot", "workspace_change_log"} <= names)
|
||||
self.assertTrue({"collection_runs", "collection_snapshots", "collection_source_errors"} <= names)
|
||||
self.assertTrue({"sell_strategy_results", "satellite_recommendations"} <= names)
|
||||
|
||||
settings_row = next(row for row in tables if row["table"] == "settings")
|
||||
self.assertTrue(settings_row["exists"])
|
||||
self.assertTrue(settings_row["row_count"] > 0)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
def test_fetch_table_rows_paginates_and_rejects_unknown_table(self):
|
||||
import tempfile
|
||||
import shutil
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
db_path = Path(tmp_dir) / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
page1 = fetch_table_rows("settings", db_path, limit=2, offset=0)
|
||||
self.assertTrue(page1["columns"])
|
||||
self.assertEqual(len(page1["rows"]), 2)
|
||||
self.assertTrue(page1["total"] > 2)
|
||||
|
||||
page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
|
||||
self.assertNotEqual(page1["rows"], page2["rows"])
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
fetch_table_rows("settings; DROP TABLE settings;--", db_path)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
def test_fetch_domain_rows_exposes_editable_tables(self):
|
||||
import tempfile
|
||||
import shutil
|
||||
tmp_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
db_path = Path(tmp_dir) / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
settings = fetch_domain_rows("settings", db_path)
|
||||
snapshot = fetch_domain_rows("account_snapshot", db_path)
|
||||
self.assertEqual(settings["domain"], "settings")
|
||||
self.assertTrue(settings["rows"])
|
||||
self.assertEqual(snapshot["domain"], "account_snapshot")
|
||||
self.assertTrue(snapshot["rows"])
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
fetch_domain_rows("workspace_change_log", db_path)
|
||||
finally:
|
||||
shutil.rmtree(tmp_dir, ignore_errors=True)
|
||||
|
||||
|
||||
def test_render_collection_html_contains_dashboard_surface():
|
||||
html = render_collection_html()
|
||||
assert "KIS Collection Dashboard" in html
|
||||
assert "/api/state" in html
|
||||
assert "Download raw JSON" in html
|
||||
assert "Download CSV" in html
|
||||
assert "Filter runs / snapshots / errors" in html
|
||||
assert "Ticker quick search" in html
|
||||
assert "Date quick search" in html
|
||||
def test_snapshot_admin_web_validation_script_passes(self):
|
||||
out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
|
||||
if out.exists():
|
||||
out.unlink()
|
||||
|
||||
rc = validator.main()
|
||||
payload = json.loads(out.read_text(encoding="utf-8"))
|
||||
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(payload["gate"], "PASS")
|
||||
self.assertEqual(payload["formula_id"], "SNAPSHOT_ADMIN_WEB_VALIDATION_V1")
|
||||
self.assertTrue(payload["settings_rows"] > 0)
|
||||
self.assertTrue(payload["account_snapshot_rows"] > 0)
|
||||
|
||||
|
||||
def test_build_ui_state_exposes_expected_columns(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
seed_path = ROOT / "GatherTradingData.json"
|
||||
import_seed_json(db_path, seed_path)
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
state = build_ui_state(db_path)
|
||||
assert state["summary"]["settings_rows"] > 0
|
||||
assert state["summary"]["account_snapshot_rows"] > 0
|
||||
assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
|
||||
assert state["summary"]["topology"]["settings_and_snapshot_share_db"] is True
|
||||
assert state["summary"]["topology"]["collector_separate_db"] is True
|
||||
assert state["account_snapshot_columns"][0] == "captured_at"
|
||||
assert "settings" in state["validation"]
|
||||
assert state["version"]["app"]
|
||||
assert "fingerprint" in state["version"]["source"]
|
||||
assert "collection" in state
|
||||
assert "counts" in state["collection"]
|
||||
assert "latest_report" in state["collection"]
|
||||
assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
|
||||
|
||||
|
||||
def test_snapshot_admin_workflow_and_script_exist():
|
||||
workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml"
|
||||
package = json.loads((ROOT / "package.json").read_text(encoding="utf-8"))
|
||||
assert workflow.exists()
|
||||
assert "--reload" in package["scripts"]["ops:snapshot-web"]
|
||||
assert "ops:snapshot-validate" in package["scripts"]
|
||||
assert "ops:snapshot-web-validate" in package["scripts"]
|
||||
|
||||
|
||||
def test_render_tables_html_contains_tabler_grid_surface():
|
||||
html = render_tables_html()
|
||||
assert "tabler" in html.lower()
|
||||
assert "Workbook migration inventory" in html
|
||||
assert "sqliteTableSelect" in html
|
||||
assert "jsonTableSelect" in html
|
||||
assert "/api/tables" in html
|
||||
assert "/api/table_rows" in html
|
||||
assert "sqliteGridTable" in html
|
||||
assert "jsonGridTable" in html
|
||||
|
||||
|
||||
def test_list_browsable_tables_covers_all_three_databases(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
tables = list_browsable_tables(db_path)
|
||||
names = {row["table"] for row in tables}
|
||||
assert {"settings", "account_snapshot", "workspace_change_log"} <= names
|
||||
assert {"collection_runs", "collection_snapshots", "collection_source_errors"} <= names
|
||||
assert {"sell_strategy_results", "satellite_recommendations"} <= names
|
||||
|
||||
settings_row = next(row for row in tables if row["table"] == "settings")
|
||||
assert settings_row["exists"] is True
|
||||
assert settings_row["row_count"] > 0
|
||||
|
||||
|
||||
def test_build_table_catalog_uses_workbook_inventory(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
catalog = build_table_catalog(db_path)
|
||||
assert {"sqlite", "json", "workbook"} <= set(catalog)
|
||||
assert len(catalog["workbook"]) == 20
|
||||
|
||||
workbook = {row["sheet"]: row for row in catalog["workbook"]}
|
||||
assert workbook["settings"]["current_sources"] == ["sqlite"]
|
||||
assert workbook["account_snapshot"]["current_sources"] == ["sqlite", "json"]
|
||||
assert workbook["harness_context"]["current_sources"] == ["xlsx"]
|
||||
assert workbook["harness_context"]["migration_candidate"] == "yes"
|
||||
|
||||
|
||||
def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
page1 = fetch_table_rows("settings", db_path, limit=2, offset=0)
|
||||
assert page1["columns"]
|
||||
assert len(page1["rows"]) == 2
|
||||
assert page1["total"] > 2
|
||||
|
||||
page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
|
||||
assert page1["rows"] != page2["rows"]
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
fetch_table_rows("settings; DROP TABLE settings;--", db_path)
|
||||
|
||||
|
||||
def test_list_browsable_tables_includes_json_factor_sheets(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
tables = list_browsable_tables(db_path)
|
||||
json_rows = {row["table"]: row for row in tables if row["source"] == "json"}
|
||||
assert "data_feed" in json_rows
|
||||
assert "sector_flow" in json_rows
|
||||
assert json_rows["data_feed"]["row_count"] > 0
|
||||
|
||||
sqlite_rows = [row for row in tables if row["source"] == "sqlite"]
|
||||
assert sqlite_rows, "sqlite tables must still be listed alongside json sheets"
|
||||
|
||||
|
||||
def test_fetch_table_rows_reads_json_factor_sheet(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
result = fetch_table_rows_for_source("json", "data_feed", db_path, limit=5, offset=0)
|
||||
assert result["source"] == "json"
|
||||
assert "Ticker" in result["columns"]
|
||||
assert len(result["rows"]) <= 5
|
||||
assert result["total"] > 0
|
||||
|
||||
|
||||
def test_fetch_table_rows_can_still_read_sqlite_tables(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
result = fetch_table_rows_for_source("sqlite", "settings", db_path, limit=5, offset=0)
|
||||
assert result["source"] == "sqlite"
|
||||
assert "key" in result["columns"]
|
||||
assert len(result["rows"]) <= 5
|
||||
|
||||
|
||||
def test_auth_helpers_reject_remote_bind_without_credentials():
|
||||
assert _basic_auth_matches("Basic dXNlcjpwYXNz", "user", "pass") is True
|
||||
assert _basic_auth_matches("Basic dXNlcjp3cm9uZw==", "user", "pass") is False
|
||||
assert _basic_auth_matches("Bearer token", "user", "pass") is False
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
_validate_remote_bind("0.0.0.0", False, "", "")
|
||||
with pytest.raises(ValueError):
|
||||
_validate_remote_bind("0.0.0.0", True, "", "")
|
||||
_validate_remote_bind("0.0.0.0", True, "admin", "secret")
|
||||
_validate_remote_bind("127.0.0.1", False, "", "")
|
||||
|
||||
|
||||
def test_snapshot_admin_requires_basic_auth_when_configured(tmp_path):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
port = int(sock.getsockname()[1])
|
||||
db_path = tmp_path / "snapshot_admin_auth.db"
|
||||
seed_path = ROOT / "GatherTradingData.json"
|
||||
server_cmd = [
|
||||
sys.executable,
|
||||
"-u",
|
||||
str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"),
|
||||
"--host",
|
||||
"127.0.0.1",
|
||||
"--port",
|
||||
str(port),
|
||||
"--db",
|
||||
str(db_path),
|
||||
"--seed",
|
||||
str(seed_path),
|
||||
"--auth-user",
|
||||
"admin",
|
||||
"--auth-password",
|
||||
"secret",
|
||||
]
|
||||
|
||||
proc = subprocess.Popen(
|
||||
server_cmd,
|
||||
cwd=ROOT,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
)
|
||||
try:
|
||||
deadline = time.time() + 15
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
probe = request.urlopen(request.Request(f"http://127.0.0.1:{port}/api/state"), timeout=1)
|
||||
except error.HTTPError as exc:
|
||||
if exc.code == 401:
|
||||
break
|
||||
except Exception:
|
||||
time.sleep(0.25)
|
||||
else:
|
||||
probe.close()
|
||||
break
|
||||
url = f"http://127.0.0.1:{port}/api/state"
|
||||
|
||||
req = request.Request(url)
|
||||
with pytest.raises(error.HTTPError) as unauthorized:
|
||||
request.urlopen(req, timeout=5)
|
||||
assert unauthorized.value.code == 401
|
||||
|
||||
token = base64.b64encode(b"admin:secret").decode("ascii")
|
||||
req_auth = request.Request(url, headers={"Authorization": f"Basic {token}"})
|
||||
with request.urlopen(req_auth, timeout=5) as resp:
|
||||
payload = json.loads(resp.read().decode("utf-8"))
|
||||
assert payload["version"]["app"]
|
||||
finally:
|
||||
if proc.poll() is None:
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
proc.wait(timeout=5)
|
||||
if proc.stdout is not None:
|
||||
proc.stdout.close()
|
||||
|
||||
|
||||
def test_snapshot_admin_web_validation_script_passes():
|
||||
out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
|
||||
if out.exists():
|
||||
out.unlink()
|
||||
|
||||
rc = validator.main()
|
||||
payload = json.loads(out.read_text(encoding="utf-8"))
|
||||
|
||||
assert rc == 0
|
||||
assert payload["gate"] == "PASS"
|
||||
assert payload["formula_id"] == "SNAPSHOT_ADMIN_WEB_VALIDATION_V1"
|
||||
assert payload["settings_rows"] > 0
|
||||
assert payload["account_snapshot_rows"] > 0
|
||||
|
||||
Reference in New Issue
Block a user