From 4266039d1c8ddfd7955230c2a9962b8a34341220 Mon Sep 17 00:00:00 2001 From: kjh2064 Date: Mon, 22 Jun 2026 02:43:58 +0900 Subject: [PATCH] snapshot admin workbook inventory --- spec/14_raw_workbook_mapping.yaml | 4 +- src/quant_engine/snapshot_admin_server_v1.py | 530 +++++++++++++++---- tests/unit/test_snapshot_admin_web_v1.py | 152 +++++- tools/validate_snapshot_admin_web_v1.py | 10 + 4 files changed, 576 insertions(+), 120 deletions(-) diff --git a/spec/14_raw_workbook_mapping.yaml b/spec/14_raw_workbook_mapping.yaml index eff0ae6..4cd290c 100644 --- a/spec/14_raw_workbook_mapping.yaml +++ b/spec/14_raw_workbook_mapping.yaml @@ -8,6 +8,8 @@ meta: purpose: > 제공 raw JSON의 data. 배열과 컬럼을 canonical field로 매핑한다. xlsx는 JSON 재생성 소스이며 일반 LLM 분석에서는 직접 파싱하지 않는다. + Snapshot Admin의 workbook inventory와 migration classification은 + GatherTradingData.xlsx를 직접 읽어서 계산한다. 이 파일은 시장/종목/섹터/매크로 데이터만 담당하며 계좌·보유·현금 데이터는 spec/15_account_snapshot_contract.yaml이 담당한다. @@ -438,7 +440,7 @@ raw_workbook: sheet_diet_policy: keep: canonical_required: ["data_feed", "sector_flow", "macro", "event_risk", "core_satellite"] - support: ["settings", "account_snapshot", "sector_universe", "sector_flow_history", "etf_nav_manual", "universe", "monthly_history", "performance", "backdata_feature_bank", "event_calendar"] + support: ["settings", "account_snapshot", "sector_universe", "sector_flow_history", "etf_nav_manual", "universe", "monthly_history", "performance", "backdata_feature_bank", "event_calendar", "daily_history", "pa1_feedback", "alpha_history", "evaluation_dashboard", "trade_quality_history", "rebalance"] deprecated: ["positions", "chat_input", "etf_raw", "core_satellite_status", "orbit_gap", "asset_history"] delete: transient_after_complete: ["cs_chunk_N"] diff --git a/src/quant_engine/snapshot_admin_server_v1.py b/src/quant_engine/snapshot_admin_server_v1.py index a9f2663..2e67697 100644 --- a/src/quant_engine/snapshot_admin_server_v1.py +++ b/src/quant_engine/snapshot_admin_server_v1.py @@ -1,7 +1,9 @@ from __future__ import annotations import argparse +import base64 import json +import os import sqlite3 import subprocess from http import HTTPStatus @@ -11,80 +13,176 @@ from hashlib import sha256 from typing import Any from urllib.parse import urlparse, parse_qs +import openpyxl + ROOT = Path(__file__).resolve().parents[2] -SNAPSHOT_ADMIN_VERSION = "snapshot-admin-web-v6" +SNAPSHOT_ADMIN_VERSION = "snapshot-admin-web-v7" +GATHER_TRADING_DATA_XLSX = ROOT / "GatherTradingData.xlsx" KIS_COLLECTION_DB = ROOT / "outputs" / "kis_data_collection" / "kis_data_collection.db" KIS_COLLECTION_REPORT = ROOT / "Temp" / "kis_data_collection_v1.json" QUALITATIVE_SELL_DB = ROOT / "outputs" / "qualitative_sell_strategy" / "qualitative_sell_strategy.db" +GATHER_TRADING_DATA_JSON = ROOT / "GatherTradingData.json" +AUTH_REALM = "Snapshot Admin" +JSON_SHEET_ALIASES = { + "harness_context": "_harness_context", +} -# WBS-7.9 부속 — 테이블별 그리드 조회(Tabler). 화이트리스트에 없는 테이블명은 -# SQL에 절대 보간되지 않는다(요청 테이블명을 그대로 SELECT 문에 넣지 않고 -# 아래 레지스트리 키와 정확히 일치할 때만 허용). -WORKSPACE_BROWSABLE_TABLES = ( - "settings", - "account_snapshot", - "workspace_change_log", - "workspace_approval_v2", - "workspace_lock", - "workspace_meta", -) -COLLECTION_BROWSABLE_TABLES = ( - "collection_runs", - "collection_snapshots", - "collection_source_errors", -) -QUALITATIVE_SELL_BROWSABLE_TABLES = ( - "sell_strategy_results", - "satellite_recommendations", -) +# WBS-7.9 부속, WBS-7.10 후속(2026-06-22) — 테이블별 그리드 조회(Tabler). +# 정적 화이트리스트 대신 각 DB 파일의 sqlite_master를 그때그때 조회해 테이블 +# 목록을 만든다 — 정적 목록은 스키마가 바뀌거나(예: 레거시 workspace_approval +# 테이블처럼) 새 테이블이 추가되면 누락되는 문제가 있었다(사용자 보고로 발견). +# 보안 속성은 동일하게 유지된다: 요청된 테이블명은 항상 해당 DB의 실제 +# sqlite_master 결과와 정확히 일치할 때만 SQL에 사용된다(임의 문자열 보간 없음). +def _known_db_paths(workspace_db_path: Path) -> list[Path]: + return [Path(workspace_db_path), KIS_COLLECTION_DB, QUALITATIVE_SELL_DB] + + +def _discover_tables(db_path: Path) -> list[str]: + if not db_path.exists(): + return [] + with sqlite3.connect(db_path) as conn: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name" + ).fetchall() + return [row[0] for row in rows] def _resolve_table_db(table: str, workspace_db_path: Path) -> Path | None: - if table in WORKSPACE_BROWSABLE_TABLES: - return Path(workspace_db_path) - if table in COLLECTION_BROWSABLE_TABLES: - return KIS_COLLECTION_DB - if table in QUALITATIVE_SELL_BROWSABLE_TABLES: - return QUALITATIVE_SELL_DB + for db_path in _known_db_paths(workspace_db_path): + if table in _discover_tables(db_path): + return db_path return None -def list_browsable_tables(workspace_db_path: Path) -> list[dict[str, Any]]: - tables: list[dict[str, Any]] = [] - for table in ( - *WORKSPACE_BROWSABLE_TABLES, - *COLLECTION_BROWSABLE_TABLES, - *QUALITATIVE_SELL_BROWSABLE_TABLES, - ): - db_path = _resolve_table_db(table, workspace_db_path) - exists = bool(db_path and db_path.exists()) - row_count = 0 - if exists: +# 2026-06-22 — 분석/판단 팩터로 쓰이는 GatherTradingData.json의 data.* 시트도 +# 같은 그리드로 조회 가능하게 한다(SQLite로 옮겨지지 않은 data_feed/sector_flow/ +# macro 등). dict 키 조회만 하므로 SQL 인젝션 표면 자체가 없다. +def _discover_json_sheets() -> dict[str, list[dict[str, Any]]]: + if not GATHER_TRADING_DATA_JSON.exists(): + return {} + try: + payload = json.loads(GATHER_TRADING_DATA_JSON.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {} + data = payload.get("data") + if not isinstance(data, dict): + return {} + return {key: value for key, value in data.items() if isinstance(value, list) and value and isinstance(value[0], dict)} + + +def _discover_workbook_sheets() -> list[dict[str, Any]]: + if not GATHER_TRADING_DATA_XLSX.exists(): + return [] + try: + workbook = openpyxl.load_workbook(GATHER_TRADING_DATA_XLSX, read_only=True, data_only=True) + except Exception: + return [] + try: + inventory: list[dict[str, Any]] = [] + for sheet_name in workbook.sheetnames: + worksheet = workbook[sheet_name] + inventory.append( + { + "sheet": sheet_name, + "row_count": int(worksheet.max_row or 0), + "column_count": int(worksheet.max_column or 0), + "source_workbook": str(GATHER_TRADING_DATA_XLSX), + } + ) + return inventory + finally: + workbook.close() + + +def build_table_catalog(workspace_db_path: Path) -> dict[str, list[dict[str, Any]]]: + sqlite_rows: list[dict[str, Any]] = [] + for db_path in _known_db_paths(workspace_db_path): + for table in _discover_tables(db_path): try: with sqlite3.connect(db_path) as conn: - row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - table is whitelist-checked above + row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - table name confirmed via sqlite_master of this exact db above except sqlite3.OperationalError: - exists = False - tables.append({"table": table, "db": str(db_path) if db_path else "", "exists": exists, "row_count": row_count}) - return tables + continue + sqlite_rows.append({"table": table, "db": str(db_path), "exists": True, "row_count": row_count, "source": "sqlite"}) + + json_rows = [{"table": sheet, "db": str(GATHER_TRADING_DATA_JSON), "exists": True, "row_count": len(rows), "source": "json"} for sheet, rows in _discover_json_sheets().items()] + + sqlite_names = {row["table"] for row in sqlite_rows} + json_names = {row["table"] for row in json_rows} + workbook_rows: list[dict[str, Any]] = [] + for sheet_row in _discover_workbook_sheets(): + sheet_name = sheet_row["sheet"] + json_key = JSON_SHEET_ALIASES.get(sheet_name, sheet_name) + current_sources: list[str] = [] + if sheet_name in sqlite_names: + current_sources.append("sqlite") + if sheet_name in json_names or json_key in json_names: + current_sources.append("json") + if not current_sources: + current_sources.append("xlsx") + workbook_rows.append( + { + **sheet_row, + "json_key": json_key, + "current_sources": current_sources, + "migration_candidate": "yes" if "sqlite" not in current_sources else "no", + } + ) + + return {"sqlite": sqlite_rows, "json": json_rows, "workbook": workbook_rows} + + +def list_browsable_tables(workspace_db_path: Path) -> list[dict[str, Any]]: + catalog = build_table_catalog(workspace_db_path) + return [*catalog["sqlite"], *catalog["json"]] def fetch_table_rows(table: str, workspace_db_path: Path, *, limit: int = 50, offset: int = 0) -> dict[str, Any]: db_path = _resolve_table_db(table, workspace_db_path) - if db_path is None: + if db_path is not None: + with sqlite3.connect(db_path) as conn: + conn.row_factory = sqlite3.Row + total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - whitelisted table name + cursor = conn.execute( + f"SELECT * FROM {table} ORDER BY rowid DESC LIMIT ? OFFSET ?", # noqa: S608 - whitelisted table name + (limit, offset), + ) + rows = [dict(row) for row in cursor.fetchall()] + columns = [description[0] for description in cursor.description] if cursor.description else [] + return {"table": table, "db": str(db_path), "columns": columns, "rows": rows, "total": total, "limit": limit, "offset": offset, "source": "sqlite"} + + json_sheets = _discover_json_sheets() + if table not in json_sheets: raise ValueError(f"unknown or non-browsable table: {table}") - if not db_path.exists(): - return {"table": table, "db": str(db_path), "columns": [], "rows": [], "total": 0, "limit": limit, "offset": offset} - with sqlite3.connect(db_path) as conn: - conn.row_factory = sqlite3.Row - total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - whitelisted table name - cursor = conn.execute( - f"SELECT * FROM {table} ORDER BY rowid DESC LIMIT ? OFFSET ?", # noqa: S608 - whitelisted table name - (limit, offset), - ) - rows = [dict(row) for row in cursor.fetchall()] - columns = [description[0] for description in cursor.description] if cursor.description else [] - return {"table": table, "db": str(db_path), "columns": columns, "rows": rows, "total": total, "limit": limit, "offset": offset} + sheet_rows = json_sheets[table] + total = len(sheet_rows) + page = sheet_rows[offset : offset + limit] + columns: list[str] = [] + for row in page: + for key in row.keys(): + if key not in columns: + columns.append(key) + return {"table": table, "db": str(GATHER_TRADING_DATA_JSON), "columns": columns, "rows": page, "total": total, "limit": limit, "offset": offset, "source": "json"} + + +def fetch_table_rows_for_source(source: str, table: str, workspace_db_path: Path, *, limit: int = 50, offset: int = 0) -> dict[str, Any]: + normalized_source = source.strip().lower() + if normalized_source == "sqlite": + return fetch_table_rows(table, workspace_db_path, limit=limit, offset=offset) + if normalized_source == "json": + json_sheets = _discover_json_sheets() + if table not in json_sheets: + raise ValueError(f"unknown or non-browsable table: {table}") + sheet_rows = json_sheets[table] + total = len(sheet_rows) + page = sheet_rows[offset : offset + limit] + columns: list[str] = [] + for row in page: + for key in row.keys(): + if key not in columns: + columns.append(key) + return {"table": table, "db": str(GATHER_TRADING_DATA_JSON), "columns": columns, "rows": page, "total": total, "limit": limit, "offset": offset, "source": "json"} + raise ValueError(f"unsupported source: {source}") SNAPSHOT_ADMIN_VERSION_FILES = ( ROOT / "src" / "quant_engine" / "snapshot_admin_server_v1.py", ROOT / "src" / "quant_engine" / "snapshot_admin_store_v1.py", @@ -324,6 +422,55 @@ def _text_response(handler: BaseHTTPRequestHandler, status: int, text: str, cont handler.wfile.write(body) +def _is_loopback_host(host: str) -> bool: + normalized = host.strip().lower() + return normalized in {"127.0.0.1", "localhost", "::1"} + + +def _parse_basic_auth(header_value: str | None) -> tuple[str, str] | None: + if not header_value: + return None + prefix = "basic " + if not header_value.lower().startswith(prefix): + return None + encoded = header_value[len(prefix) :].strip() + if not encoded: + return None + try: + decoded = base64.b64decode(encoded).decode("utf-8") + except (ValueError, UnicodeDecodeError): + return None + if ":" not in decoded: + return None + username, password = decoded.split(":", 1) + return username, password + + +def _basic_auth_matches(header_value: str | None, username: str, password: str) -> bool: + parsed = _parse_basic_auth(header_value) + return bool(parsed and parsed[0] == username and parsed[1] == password) + + +def _reject_unauthorized(handler: BaseHTTPRequestHandler) -> None: + body = json.dumps({"detail": "authentication required"}, ensure_ascii=False, indent=2).encode("utf-8") + handler.send_response(HTTPStatus.UNAUTHORIZED) + handler.send_header("WWW-Authenticate", f'Basic realm="{AUTH_REALM}", charset="UTF-8"') + handler.send_header("Content-Type", "application/json; charset=utf-8") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def _validate_remote_bind(host: str, allow_remote: bool, auth_user: str, auth_password: str) -> None: + has_auth = bool(auth_user and auth_password) + if bool(auth_user) != bool(auth_password): + raise ValueError("snapshot admin auth requires both --auth-user and --auth-password") + if not _is_loopback_host(host) and not allow_remote: + raise ValueError("refusing to bind snapshot admin outside loopback without --allow-remote") + if (allow_remote or not _is_loopback_host(host)) and not has_auth: + raise ValueError("remote snapshot admin access requires both --auth-user and --auth-password") + + def _read_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]: length = int(handler.headers.get("Content-Length") or "0") raw = handler.rfile.read(length).decode("utf-8") if length else "{}" @@ -2631,25 +2778,79 @@ def render_tables_html() -> str:
-
-
-
- - - -
-
- - - - +
+
+
+
+
+
Workbook migration inventory
+
Source-of-truth xlsx sheet list with current storage classification.
+
+ +
+
+ + + + + + + + + + + +
SheetRowsColsCurrent SourceMigration Candidate
+
-
- - - -
+
+
+
+
+ SQLite + + + +
+
+ + + + +
+
+
+ + + +
+
+
+
+
+
+
+
+ JSON + + + +
+
+ + + + +
+
+
+ + + +
+
+
@@ -2657,7 +2858,11 @@ def render_tables_html() -> str:
@@ -2732,6 +2979,8 @@ def render_tables_html() -> str: class SnapshotAdminHandler(BaseHTTPRequestHandler): db_path: Path = DEFAULT_DB seed_json_path: Path = DEFAULT_SEED_JSON + auth_user: str = "" + auth_password: str = "" def log_message(self, format: str, *args: Any) -> None: # noqa: A003 return @@ -2739,7 +2988,18 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): def _handle_exception(self, exc: Exception) -> None: _json_response(self, HTTPStatus.INTERNAL_SERVER_ERROR, {"detail": str(exc)}) + def _authorize(self) -> bool: + if not self.auth_user and not self.auth_password: + return True + header_value = self.headers.get("Authorization") + if _basic_auth_matches(header_value, self.auth_user, self.auth_password): + return True + _reject_unauthorized(self) + return False + def do_GET(self) -> None: # noqa: N802 + if not self._authorize(): + return parsed = urlparse(self.path) if parsed.path == "/": _text_response(self, HTTPStatus.OK, render_index_html(), "text/html; charset=utf-8") @@ -2751,11 +3011,22 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): _text_response(self, HTTPStatus.OK, render_tables_html(), "text/html; charset=utf-8") return if parsed.path == "/api/tables": - _json_response(self, HTTPStatus.OK, {"tables": list_browsable_tables(self.db_path)}) + catalog = build_table_catalog(self.db_path) + _json_response( + self, + HTTPStatus.OK, + { + "sqlite": catalog["sqlite"], + "json": catalog["json"], + "workbook": catalog["workbook"], + "tables": [*catalog["sqlite"], *catalog["json"]], + }, + ) return if parsed.path == "/api/table_rows": query = parse_qs(parsed.query) table = (query.get("table") or [""])[0] + source = (query.get("source") or [""])[0] try: limit = int((query.get("limit") or ["50"])[0]) offset = int((query.get("offset") or ["0"])[0]) @@ -2765,7 +3036,7 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): limit = min(max(limit, 1), 500) offset = max(offset, 0) try: - payload = fetch_table_rows(table, self.db_path, limit=limit, offset=offset) + payload = fetch_table_rows_for_source(source or "sqlite", table, self.db_path, limit=limit, offset=offset) if source else fetch_table_rows(table, self.db_path, limit=limit, offset=offset) except ValueError as exc: _json_response(self, HTTPStatus.BAD_REQUEST, {"detail": str(exc)}) return @@ -2799,6 +3070,8 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): _json_response(self, HTTPStatus.NOT_FOUND, {"detail": "not found"}) def do_POST(self) -> None: # noqa: N802 + if not self._authorize(): + return parsed = urlparse(self.path) try: if parsed.path == "/api/bootstrap": @@ -2967,9 +3240,20 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): self._handle_exception(exc) -def serve(host: str, port: int, db_path: Path | str | None = None, seed_json_path: Path | str | None = None, bootstrap: bool = True) -> None: +def serve( + host: str, + port: int, + db_path: Path | str | None = None, + seed_json_path: Path | str | None = None, + bootstrap: bool = True, + *, + auth_user: str = "", + auth_password: str = "", + allow_remote: bool = False, +) -> None: db = normalize_db_path(db_path) seed = Path(seed_json_path) if seed_json_path else DEFAULT_SEED_JSON + _validate_remote_bind(host, allow_remote, auth_user, auth_password) if bootstrap and seed.exists(): with open_connection(db) as conn: from .snapshot_admin_store_v1 import ensure_schema @@ -2979,8 +3263,12 @@ def serve(host: str, port: int, db_path: Path | str | None = None, seed_json_pat import_seed_json(db, seed) SnapshotAdminHandler.db_path = db SnapshotAdminHandler.seed_json_path = seed + SnapshotAdminHandler.auth_user = auth_user + SnapshotAdminHandler.auth_password = auth_password server = ThreadingHTTPServer((host, port), SnapshotAdminHandler) print(f"Snapshot Admin listening on http://{host}:{port}") + if auth_user and auth_password: + print("Snapshot Admin authentication: enabled (Basic Auth)") print(f"SQLite DB: {db}") print(f"Seed JSON: {seed}") try: @@ -2998,8 +3286,20 @@ def main() -> int: parser.add_argument("--db", type=Path, default=DEFAULT_DB) parser.add_argument("--seed", type=Path, default=DEFAULT_SEED_JSON) parser.add_argument("--no-bootstrap", action="store_true") + parser.add_argument("--allow-remote", action="store_true", help="Allow binding outside loopback when auth is configured.") + parser.add_argument("--auth-user", default=os.getenv("SNAPSHOT_ADMIN_AUTH_USER", "")) + parser.add_argument("--auth-password", default=os.getenv("SNAPSHOT_ADMIN_AUTH_PASSWORD", "")) args = parser.parse_args() - serve(args.host, args.port, args.db, args.seed, bootstrap=not args.no_bootstrap) + serve( + args.host, + args.port, + args.db, + args.seed, + bootstrap=not args.no_bootstrap, + auth_user=args.auth_user, + auth_password=args.auth_password, + allow_remote=args.allow_remote, + ) return 0 diff --git a/tests/unit/test_snapshot_admin_web_v1.py b/tests/unit/test_snapshot_admin_web_v1.py index 8f7b034..69f19ae 100644 --- a/tests/unit/test_snapshot_admin_web_v1.py +++ b/tests/unit/test_snapshot_admin_web_v1.py @@ -2,7 +2,14 @@ from __future__ import annotations import json import sys +import base64 +import subprocess +import time +import socket from pathlib import Path +from urllib import error, request + +import pytest ROOT = Path(__file__).resolve().parents[2] if str(ROOT) not in sys.path: @@ -11,11 +18,15 @@ if str(ROOT) not in sys.path: import tools.validate_snapshot_admin_web_v1 as validator from src.quant_engine.snapshot_admin_server_v1 import ( build_ui_state, + build_table_catalog, fetch_table_rows, + fetch_table_rows_for_source, list_browsable_tables, render_collection_html, render_index_html, render_tables_html, + _basic_auth_matches, + _validate_remote_bind, ) from src.quant_engine.snapshot_admin_store_v1 import import_seed_json @@ -90,10 +101,13 @@ def test_snapshot_admin_workflow_and_script_exist(): def test_render_tables_html_contains_tabler_grid_surface(): html = render_tables_html() assert "tabler" in html.lower() - assert "tableSelect" in html + assert "Workbook migration inventory" in html + assert "sqliteTableSelect" in html + assert "jsonTableSelect" in html assert "/api/tables" in html assert "/api/table_rows" in html - assert "gridTable" in html + assert "sqliteGridTable" in html + assert "jsonGridTable" in html def test_list_browsable_tables_covers_all_three_databases(tmp_path): @@ -111,6 +125,21 @@ def test_list_browsable_tables_covers_all_three_databases(tmp_path): assert settings_row["row_count"] > 0 +def test_build_table_catalog_uses_workbook_inventory(tmp_path): + db_path = tmp_path / "snapshot_admin.db" + import_seed_json(db_path, ROOT / "GatherTradingData.json") + + catalog = build_table_catalog(db_path) + assert {"sqlite", "json", "workbook"} <= set(catalog) + assert len(catalog["workbook"]) == 20 + + workbook = {row["sheet"]: row for row in catalog["workbook"]} + assert workbook["settings"]["current_sources"] == ["sqlite"] + assert workbook["account_snapshot"]["current_sources"] == ["sqlite", "json"] + assert workbook["harness_context"]["current_sources"] == ["xlsx"] + assert workbook["harness_context"]["migration_candidate"] == "yes" + + def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path): db_path = tmp_path / "snapshot_admin.db" import_seed_json(db_path, ROOT / "GatherTradingData.json") @@ -123,12 +152,127 @@ def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path): page2 = fetch_table_rows("settings", db_path, limit=2, offset=2) assert page1["rows"] != page2["rows"] - import pytest - with pytest.raises(ValueError): fetch_table_rows("settings; DROP TABLE settings;--", db_path) +def test_list_browsable_tables_includes_json_factor_sheets(tmp_path): + db_path = tmp_path / "snapshot_admin.db" + import_seed_json(db_path, ROOT / "GatherTradingData.json") + + tables = list_browsable_tables(db_path) + json_rows = {row["table"]: row for row in tables if row["source"] == "json"} + assert "data_feed" in json_rows + assert "sector_flow" in json_rows + assert json_rows["data_feed"]["row_count"] > 0 + + sqlite_rows = [row for row in tables if row["source"] == "sqlite"] + assert sqlite_rows, "sqlite tables must still be listed alongside json sheets" + + +def test_fetch_table_rows_reads_json_factor_sheet(tmp_path): + db_path = tmp_path / "snapshot_admin.db" + import_seed_json(db_path, ROOT / "GatherTradingData.json") + + result = fetch_table_rows_for_source("json", "data_feed", db_path, limit=5, offset=0) + assert result["source"] == "json" + assert "Ticker" in result["columns"] + assert len(result["rows"]) <= 5 + assert result["total"] > 0 + + +def test_fetch_table_rows_can_still_read_sqlite_tables(tmp_path): + db_path = tmp_path / "snapshot_admin.db" + import_seed_json(db_path, ROOT / "GatherTradingData.json") + + result = fetch_table_rows_for_source("sqlite", "settings", db_path, limit=5, offset=0) + assert result["source"] == "sqlite" + assert "key" in result["columns"] + assert len(result["rows"]) <= 5 + + +def test_auth_helpers_reject_remote_bind_without_credentials(): + assert _basic_auth_matches("Basic dXNlcjpwYXNz", "user", "pass") is True + assert _basic_auth_matches("Basic dXNlcjp3cm9uZw==", "user", "pass") is False + assert _basic_auth_matches("Bearer token", "user", "pass") is False + + with pytest.raises(ValueError): + _validate_remote_bind("0.0.0.0", False, "", "") + with pytest.raises(ValueError): + _validate_remote_bind("0.0.0.0", True, "", "") + _validate_remote_bind("0.0.0.0", True, "admin", "secret") + _validate_remote_bind("127.0.0.1", False, "", "") + + +def test_snapshot_admin_requires_basic_auth_when_configured(tmp_path): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.bind(("127.0.0.1", 0)) + port = int(sock.getsockname()[1]) + db_path = tmp_path / "snapshot_admin_auth.db" + seed_path = ROOT / "GatherTradingData.json" + server_cmd = [ + sys.executable, + "-u", + str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"), + "--host", + "127.0.0.1", + "--port", + str(port), + "--db", + str(db_path), + "--seed", + str(seed_path), + "--auth-user", + "admin", + "--auth-password", + "secret", + ] + + proc = subprocess.Popen( + server_cmd, + cwd=ROOT, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + ) + try: + deadline = time.time() + 15 + while time.time() < deadline: + try: + probe = request.urlopen(request.Request(f"http://127.0.0.1:{port}/api/state"), timeout=1) + except error.HTTPError as exc: + if exc.code == 401: + break + except Exception: + time.sleep(0.25) + else: + probe.close() + break + url = f"http://127.0.0.1:{port}/api/state" + + req = request.Request(url) + with pytest.raises(error.HTTPError) as unauthorized: + request.urlopen(req, timeout=5) + assert unauthorized.value.code == 401 + + token = base64.b64encode(b"admin:secret").decode("ascii") + req_auth = request.Request(url, headers={"Authorization": f"Basic {token}"}) + with request.urlopen(req_auth, timeout=5) as resp: + payload = json.loads(resp.read().decode("utf-8")) + assert payload["version"]["app"] + finally: + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=5) + except subprocess.TimeoutExpired: + proc.kill() + proc.wait(timeout=5) + if proc.stdout is not None: + proc.stdout.close() + + def test_snapshot_admin_web_validation_script_passes(): out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json" if out.exists(): diff --git a/tools/validate_snapshot_admin_web_v1.py b/tools/validate_snapshot_admin_web_v1.py index 5220f4d..d681502 100644 --- a/tools/validate_snapshot_admin_web_v1.py +++ b/tools/validate_snapshot_admin_web_v1.py @@ -96,6 +96,7 @@ def main() -> int: _wait_for_server(base_url) html = _read_text(f"{base_url}/") state = _read_json(f"{base_url}/api/state") + tables_payload = _read_json(f"{base_url}/api/tables") export_payload = _read_json(f"{base_url}/api/export") approval_packet = { "formula_id": "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1", @@ -138,6 +139,11 @@ def main() -> int: errors.append("collection_page_link_missing") if "Open collection dashboard" not in html: errors.append("collection_dashboard_link_missing") + tables_html = _read_text(f"{base_url}/tables") + if "Workbook migration inventory" not in tables_html or "sqliteTableSelect" not in tables_html or "jsonTableSelect" not in tables_html: + errors.append("table_browser_split_missing") + if "SQLite" not in tables_html or "JSON" not in tables_html: + errors.append("table_browser_source_labels_missing") collection_html = _read_text(f"{base_url}/collection") if "KIS Collection Dashboard" not in collection_html or "Download CSV" not in collection_html or "Ticker quick search" not in collection_html or "Date quick search" not in collection_html: errors.append("collection_dashboard_page_missing") @@ -159,6 +165,10 @@ def main() -> int: errors.append("version_metadata_missing") if not isinstance(state.get("collection"), dict): errors.append("collection_state_missing") + if not isinstance(tables_payload.get("sqlite"), list) or not isinstance(tables_payload.get("json"), list) or not isinstance(tables_payload.get("workbook"), list): + errors.append("table_catalog_grouping_missing") + if not tables_payload.get("tables"): + errors.append("table_catalog_flat_missing") collection = state.get("collection", {}) if not isinstance(collection.get("counts"), dict): errors.append("collection_counts_missing")