#!/usr/bin/env python3 from __future__ import annotations import json import socket import subprocess import sys import time import urllib.error import urllib.request from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) OUT = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json" def _read_json(url: str) -> dict[str, Any]: with urllib.request.urlopen(url, timeout=5) as response: payload = response.read().decode("utf-8") data = json.loads(payload) return data if isinstance(data, dict) else {} def _read_text(url: str) -> str: with urllib.request.urlopen(url, timeout=5) as response: return response.read().decode("utf-8") def _post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]: data = json.dumps(payload, ensure_ascii=False).encode("utf-8") request = urllib.request.Request( url, data=data, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(request, timeout=5) as response: return json.loads(response.read().decode("utf-8")) def _wait_for_server(url: str, timeout_s: float = 15.0) -> None: deadline = time.time() + timeout_s last_error: Exception | None = None while time.time() < deadline: try: _read_text(url) return except Exception as exc: # noqa: BLE001 last_error = exc time.sleep(0.25) raise RuntimeError(f"server did not start: {last_error}") def _pick_free_port() -> int: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: sock.bind(("127.0.0.1", 0)) return int(sock.getsockname()[1]) def main() -> int: port = _pick_free_port() db_path = ROOT / "Temp" / "snapshot_admin_web_validation.db" seed_path = ROOT / "GatherTradingData.json" server_cmd = [ sys.executable, str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"), "--host", "127.0.0.1", "--port", str(port), "--db", str(db_path), "--seed", str(seed_path), ] proc = subprocess.Popen( server_cmd, cwd=ROOT, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", ) base_url = f"http://127.0.0.1:{port}" errors: list[str] = [] html = "" state: dict[str, Any] = {} try: _wait_for_server(base_url) html = _read_text(f"{base_url}/") state = _read_json(f"{base_url}/api/state") tables_payload = _read_json(f"{base_url}/api/tables") export_payload = _read_json(f"{base_url}/api/export") approval_packet = { "formula_id": "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1", "generated_at": state.get("generated_at") or "", "summary": { "settings_changed": 0, "account_snapshot_changed": 0, "pending_target_count": 0, }, "pending_targets": [], "diff_preview": {"settings": {"added": [], "removed": [], "changed": []}, "account_snapshot": {"added": [], "removed": [], "changed": []}}, "approvals": state.get("approval_rows", []), "locks": state.get("locks", []), "workspace": state.get("summary", {}), } packet_response = _post_json(f"{base_url}/api/approval_packet", {"packet": approval_packet}) if "Snapshot Admin" not in html: errors.append("html_title_missing") if "contenteditable" not in html: errors.append("sheet_editor_missing") if "settings" not in html or "Account Snapshot" not in html: errors.append("section_missing") if "/api/settings/save" not in html or "/api/account_snapshot/save" not in html: errors.append("api_binding_missing") if "Approve pending" not in html or "Refresh diff" not in html: errors.append("diff_or_approval_ui_missing") if "Export approval packet" not in html: errors.append("approval_packet_ui_missing") if "Selection Inspector" not in html or "Apply TSV to selection" not in html or "Save view" not in html: errors.append("sheet_facade_ui_missing") if "Recent row history" not in html or "Ctrl+S" not in html: errors.append("sheet_shortcuts_ui_missing") if "KIS Collection" not in html or "collector:" not in html: errors.append("collection_dashboard_ui_missing") if "Recent collector snapshots" not in html or "Collection detail" not in html or "Filter runs / snapshots / errors" not in html: errors.append("collection_detail_ui_missing") if "Filter change log" not in html: errors.append("change_log_filter_ui_missing") if "Timeline" not in html or "/collection" not in html or "Open collection dashboard" not in html: errors.append("collection_page_link_missing") if "Open collection dashboard" not in html: errors.append("collection_dashboard_link_missing") tables_html = _read_text(f"{base_url}/tables") if "Workbook migration inventory" not in tables_html or "sqliteTableSelect" not in tables_html or "jsonTableSelect" not in tables_html: errors.append("table_browser_split_missing") if "SQLite" not in tables_html or "JSON" not in tables_html: errors.append("table_browser_source_labels_missing") collection_html = _read_text(f"{base_url}/collection") if "KIS Collection Dashboard" not in collection_html or "Download CSV" not in collection_html or "Ticker quick search" not in collection_html or "Date quick search" not in collection_html: errors.append("collection_dashboard_page_missing") if int(state.get("summary", {}).get("settings_rows") or 0) <= 0: errors.append("settings_rows_missing") if int(state.get("summary", {}).get("account_snapshot_rows") or 0) <= 0: errors.append("account_snapshot_rows_missing") topology = state.get("summary", {}).get("topology", {}) if not isinstance(topology, dict): errors.append("topology_missing") else: if topology.get("mode") != "single_workspace_sqlite": errors.append("topology_mode_invalid") if not topology.get("settings_and_snapshot_share_db"): errors.append("topology_workspace_split_invalid") if not topology.get("collector_separate_db"): errors.append("topology_collector_split_invalid") if not isinstance(state.get("version"), dict) or not state.get("version", {}).get("app"): errors.append("version_metadata_missing") if not isinstance(state.get("collection"), dict): errors.append("collection_state_missing") if not isinstance(tables_payload.get("sqlite"), list) or not isinstance(tables_payload.get("json"), list) or not isinstance(tables_payload.get("workbook"), list): errors.append("table_catalog_grouping_missing") if not tables_payload.get("tables"): errors.append("table_catalog_flat_missing") collection = state.get("collection", {}) if not isinstance(collection.get("counts"), dict): errors.append("collection_counts_missing") if "latest_report" not in collection: errors.append("collection_latest_report_missing") if "data" not in export_payload: errors.append("export_missing_data") if packet_response.get("gate") != "PASS": errors.append("approval_packet_export_failed") packet_path = Path(packet_response.get("packet_path") or "") md_path = Path(packet_response.get("md_path") or "") if not packet_path.exists(): errors.append("approval_packet_json_missing") if not md_path.exists(): errors.append("approval_packet_md_missing") payload = { "formula_id": "SNAPSHOT_ADMIN_WEB_VALIDATION_V1", "gate": "PASS" if not errors else "FAIL", "port": port, "db_path": str(db_path), "base_url": base_url, "errors": errors, "summary": state.get("summary", {}), "version": state.get("version", {}), "settings_rows": int(state.get("summary", {}).get("settings_rows") or 0), "account_snapshot_rows": int(state.get("summary", {}).get("account_snapshot_rows") or 0), "approval_packet_path": str(packet_path), } OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 if payload["gate"] == "PASS" else 1 except urllib.error.URLError as exc: errors.append(str(exc)) payload = { "formula_id": "SNAPSHOT_ADMIN_WEB_VALIDATION_V1", "gate": "FAIL", "port": port, "db_path": str(db_path), "base_url": base_url, "errors": errors, "summary": state.get("summary", {}), "version": state.get("version", {}), } OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(payload, ensure_ascii=False, indent=2)) return 1 finally: if proc.poll() is None: proc.terminate() try: proc.wait(timeout=5) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=5) if proc.stdout is not None: proc.stdout.close() if __name__ == "__main__": raise SystemExit(main())