snapshot admin workbook inventory
This commit is contained in:
@@ -2,7 +2,14 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
import base64
|
||||
import subprocess
|
||||
import time
|
||||
import socket
|
||||
from pathlib import Path
|
||||
from urllib import error, request
|
||||
|
||||
import pytest
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
@@ -11,11 +18,15 @@ if str(ROOT) not in sys.path:
|
||||
import tools.validate_snapshot_admin_web_v1 as validator
|
||||
from src.quant_engine.snapshot_admin_server_v1 import (
|
||||
build_ui_state,
|
||||
build_table_catalog,
|
||||
fetch_table_rows,
|
||||
fetch_table_rows_for_source,
|
||||
list_browsable_tables,
|
||||
render_collection_html,
|
||||
render_index_html,
|
||||
render_tables_html,
|
||||
_basic_auth_matches,
|
||||
_validate_remote_bind,
|
||||
)
|
||||
from src.quant_engine.snapshot_admin_store_v1 import import_seed_json
|
||||
|
||||
@@ -90,10 +101,13 @@ def test_snapshot_admin_workflow_and_script_exist():
|
||||
def test_render_tables_html_contains_tabler_grid_surface():
|
||||
html = render_tables_html()
|
||||
assert "tabler" in html.lower()
|
||||
assert "tableSelect" in html
|
||||
assert "Workbook migration inventory" in html
|
||||
assert "sqliteTableSelect" in html
|
||||
assert "jsonTableSelect" in html
|
||||
assert "/api/tables" in html
|
||||
assert "/api/table_rows" in html
|
||||
assert "gridTable" in html
|
||||
assert "sqliteGridTable" in html
|
||||
assert "jsonGridTable" in html
|
||||
|
||||
|
||||
def test_list_browsable_tables_covers_all_three_databases(tmp_path):
|
||||
@@ -111,6 +125,21 @@ def test_list_browsable_tables_covers_all_three_databases(tmp_path):
|
||||
assert settings_row["row_count"] > 0
|
||||
|
||||
|
||||
def test_build_table_catalog_uses_workbook_inventory(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
catalog = build_table_catalog(db_path)
|
||||
assert {"sqlite", "json", "workbook"} <= set(catalog)
|
||||
assert len(catalog["workbook"]) == 20
|
||||
|
||||
workbook = {row["sheet"]: row for row in catalog["workbook"]}
|
||||
assert workbook["settings"]["current_sources"] == ["sqlite"]
|
||||
assert workbook["account_snapshot"]["current_sources"] == ["sqlite", "json"]
|
||||
assert workbook["harness_context"]["current_sources"] == ["xlsx"]
|
||||
assert workbook["harness_context"]["migration_candidate"] == "yes"
|
||||
|
||||
|
||||
def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
@@ -123,12 +152,127 @@ def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path):
|
||||
page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
|
||||
assert page1["rows"] != page2["rows"]
|
||||
|
||||
import pytest
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
fetch_table_rows("settings; DROP TABLE settings;--", db_path)
|
||||
|
||||
|
||||
def test_list_browsable_tables_includes_json_factor_sheets(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
tables = list_browsable_tables(db_path)
|
||||
json_rows = {row["table"]: row for row in tables if row["source"] == "json"}
|
||||
assert "data_feed" in json_rows
|
||||
assert "sector_flow" in json_rows
|
||||
assert json_rows["data_feed"]["row_count"] > 0
|
||||
|
||||
sqlite_rows = [row for row in tables if row["source"] == "sqlite"]
|
||||
assert sqlite_rows, "sqlite tables must still be listed alongside json sheets"
|
||||
|
||||
|
||||
def test_fetch_table_rows_reads_json_factor_sheet(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
result = fetch_table_rows_for_source("json", "data_feed", db_path, limit=5, offset=0)
|
||||
assert result["source"] == "json"
|
||||
assert "Ticker" in result["columns"]
|
||||
assert len(result["rows"]) <= 5
|
||||
assert result["total"] > 0
|
||||
|
||||
|
||||
def test_fetch_table_rows_can_still_read_sqlite_tables(tmp_path):
|
||||
db_path = tmp_path / "snapshot_admin.db"
|
||||
import_seed_json(db_path, ROOT / "GatherTradingData.json")
|
||||
|
||||
result = fetch_table_rows_for_source("sqlite", "settings", db_path, limit=5, offset=0)
|
||||
assert result["source"] == "sqlite"
|
||||
assert "key" in result["columns"]
|
||||
assert len(result["rows"]) <= 5
|
||||
|
||||
|
||||
def test_auth_helpers_reject_remote_bind_without_credentials():
|
||||
assert _basic_auth_matches("Basic dXNlcjpwYXNz", "user", "pass") is True
|
||||
assert _basic_auth_matches("Basic dXNlcjp3cm9uZw==", "user", "pass") is False
|
||||
assert _basic_auth_matches("Bearer token", "user", "pass") is False
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
_validate_remote_bind("0.0.0.0", False, "", "")
|
||||
with pytest.raises(ValueError):
|
||||
_validate_remote_bind("0.0.0.0", True, "", "")
|
||||
_validate_remote_bind("0.0.0.0", True, "admin", "secret")
|
||||
_validate_remote_bind("127.0.0.1", False, "", "")
|
||||
|
||||
|
||||
def test_snapshot_admin_requires_basic_auth_when_configured(tmp_path):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.bind(("127.0.0.1", 0))
|
||||
port = int(sock.getsockname()[1])
|
||||
db_path = tmp_path / "snapshot_admin_auth.db"
|
||||
seed_path = ROOT / "GatherTradingData.json"
|
||||
server_cmd = [
|
||||
sys.executable,
|
||||
"-u",
|
||||
str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"),
|
||||
"--host",
|
||||
"127.0.0.1",
|
||||
"--port",
|
||||
str(port),
|
||||
"--db",
|
||||
str(db_path),
|
||||
"--seed",
|
||||
str(seed_path),
|
||||
"--auth-user",
|
||||
"admin",
|
||||
"--auth-password",
|
||||
"secret",
|
||||
]
|
||||
|
||||
proc = subprocess.Popen(
|
||||
server_cmd,
|
||||
cwd=ROOT,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
)
|
||||
try:
|
||||
deadline = time.time() + 15
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
probe = request.urlopen(request.Request(f"http://127.0.0.1:{port}/api/state"), timeout=1)
|
||||
except error.HTTPError as exc:
|
||||
if exc.code == 401:
|
||||
break
|
||||
except Exception:
|
||||
time.sleep(0.25)
|
||||
else:
|
||||
probe.close()
|
||||
break
|
||||
url = f"http://127.0.0.1:{port}/api/state"
|
||||
|
||||
req = request.Request(url)
|
||||
with pytest.raises(error.HTTPError) as unauthorized:
|
||||
request.urlopen(req, timeout=5)
|
||||
assert unauthorized.value.code == 401
|
||||
|
||||
token = base64.b64encode(b"admin:secret").decode("ascii")
|
||||
req_auth = request.Request(url, headers={"Authorization": f"Basic {token}"})
|
||||
with request.urlopen(req_auth, timeout=5) as resp:
|
||||
payload = json.loads(resp.read().decode("utf-8"))
|
||||
assert payload["version"]["app"]
|
||||
finally:
|
||||
if proc.poll() is None:
|
||||
proc.terminate()
|
||||
try:
|
||||
proc.wait(timeout=5)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.kill()
|
||||
proc.wait(timeout=5)
|
||||
if proc.stdout is not None:
|
||||
proc.stdout.close()
|
||||
|
||||
|
||||
def test_snapshot_admin_web_validation_script_passes():
|
||||
out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
|
||||
if out.exists():
|
||||
|
||||
Reference in New Issue
Block a user