from __future__ import annotations import argparse import json import sqlite3 import subprocess from http import HTTPStatus from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from hashlib import sha256 from typing import Any from urllib.parse import urlparse, parse_qs ROOT = Path(__file__).resolve().parents[2] SNAPSHOT_ADMIN_VERSION = "snapshot-admin-web-v6" KIS_COLLECTION_DB = ROOT / "src" / "quant_engine" / "kis_data_collection.db" KIS_COLLECTION_REPORT = ROOT / "Temp" / "kis_data_collection_v1.json" QUALITATIVE_SELL_DB = ROOT / "outputs" / "qualitative_sell_strategy" / "qualitative_sell_strategy.db" # WBS-7.9 부속 — 테이블별 그리드 조회(Tabler). 화이트리스트에 없는 테이블명은 # SQL에 절대 보간되지 않는다(요청 테이블명을 그대로 SELECT 문에 넣지 않고 # 아래 레지스트리 키와 정확히 일치할 때만 허용). WORKSPACE_BROWSABLE_TABLES = ( "settings", "account_snapshot", "workspace_change_log", "workspace_approval_v2", "workspace_lock", "workspace_meta", ) COLLECTION_BROWSABLE_TABLES = ( "collection_runs", "collection_snapshots", "collection_source_errors", ) QUALITATIVE_SELL_BROWSABLE_TABLES = ( "sell_strategy_results", "satellite_recommendations", ) # Editable tables configurations (WBS requirement 2) EDITABLE_TABLES = { "settings", "account_snapshot", } def _resolve_table_db(table: str, workspace_db_path: Path) -> Path | None: if table in WORKSPACE_BROWSABLE_TABLES: return Path(workspace_db_path) if table in COLLECTION_BROWSABLE_TABLES: return KIS_COLLECTION_DB if table in QUALITATIVE_SELL_BROWSABLE_TABLES: return QUALITATIVE_SELL_DB return None def list_browsable_tables(workspace_db_path: Path) -> list[dict[str, Any]]: tables: list[dict[str, Any]] = [] for table in ( *WORKSPACE_BROWSABLE_TABLES, *COLLECTION_BROWSABLE_TABLES, *QUALITATIVE_SELL_BROWSABLE_TABLES, ): db_path = _resolve_table_db(table, workspace_db_path) exists = bool(db_path and db_path.exists()) row_count = 0 if exists: try: with sqlite3.connect(db_path) as conn: row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - table is whitelist-checked above except sqlite3.OperationalError: exists = False tables.append({ "table": table, "db": str(db_path) if db_path else "", "exists": exists, "row_count": row_count, "editable": table in EDITABLE_TABLES, }) tables.sort(key=lambda item: ( 0 if item["table"] == "account_snapshot" else 1 if item["table"] == "settings" else 2, 0 if item["row_count"] else 1, item["table"], )) return tables def fetch_table_rows( table: str, workspace_db_path: Path, *, limit: int = 50, offset: int = 0, filter_text: str = "", column_filters: dict[str, str] | None = None, ) -> dict[str, Any]: db_path = _resolve_table_db(table, workspace_db_path) if db_path is None: raise ValueError(f"unknown or non-browsable table: {table}") if not db_path.exists(): return {"table": table, "db": str(db_path), "columns": [], "rows": [], "total": 0, "limit": limit, "offset": offset, "editable": table in EDITABLE_TABLES} with sqlite3.connect(db_path) as conn: conn.row_factory = sqlite3.Row cursor = conn.execute(f"SELECT rowid as _rowid, * FROM {table} ORDER BY rowid DESC",) # noqa: S608 - whitelisted table name all_rows = [dict(row) for row in cursor.fetchall()] columns = [description[0] for description in cursor.description] if cursor.description else [] cleaned_filter_text = str(filter_text or "").strip().lower() normalized_column_filters = {str(key): str(value).strip().lower() for key, value in (column_filters or {}).items() if str(value).strip()} def _match_row(row: dict[str, Any]) -> bool: display_row = {k: v for k, v in row.items() if not str(k).startswith("_")} haystack = json.dumps(display_row, ensure_ascii=False, default=str).lower() if cleaned_filter_text and cleaned_filter_text not in haystack: return False for key, needle in normalized_column_filters.items(): cell = str(display_row.get(key, "") or "").lower() if needle not in cell: return False return True filtered_rows = [row for row in all_rows if _match_row(row)] total = len(filtered_rows) rows = filtered_rows[offset: offset + limit] return {"table": table, "db": str(db_path), "columns": columns, "rows": rows, "total": total, "limit": limit, "offset": offset, "editable": table in EDITABLE_TABLES, "filter_text": cleaned_filter_text, "column_filters": normalized_column_filters} def fetch_domain_rows(domain: str, workspace_db_path: Path) -> dict[str, Any]: if domain == "settings": rows = load_settings_rows(workspace_db_path) return {"domain": domain, "db": str(workspace_db_path), "columns": ["ordinal", "key", "value", "note", "updated_at"], "rows": rows} if domain == "account_snapshot": rows = load_account_snapshot_rows(workspace_db_path) return { "domain": domain, "db": str(workspace_db_path), "columns": list(ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS), "rows": rows, } raise ValueError(f"unknown editable domain: {domain}") SNAPSHOT_ADMIN_VERSION_FILES = ( ROOT / "src" / "quant_engine" / "snapshot_admin_server_v1.py", ROOT / "src" / "quant_engine" / "snapshot_admin_store_v1.py", ROOT / "src" / "quant_engine" / "data_collection_store_v1.py", ROOT / "tools" / "run_snapshot_admin_server_v1.py", ROOT / "tools" / "validate_snapshot_admin_web_v1.py", ROOT / "tests" / "unit" / "test_snapshot_admin_web_v1.py", ROOT / "package.json", ) from .snapshot_admin_store_v1 import ( ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS, DEFAULT_DB, DEFAULT_SEED_JSON, export_payload, clear_lock, import_seed_json, is_locked, load_account_snapshot_rows, load_approval_for_domain, load_approval_rows, load_change_log_rows, load_locks, load_settings_rows, normalize_db_path, now_kst_iso, open_connection, parse_account_snapshot_tsv, parse_scalar, record_change_log, validate_account_snapshot_rows, validate_settings_rows, build_validation_suggestions, build_safe_autofix_actions, apply_safe_autofix_action, lock_conflicts_for_rows, set_approval, set_lock, replace_account_snapshot, replace_settings, undo_last_change, summarize_workspace, ) from .data_collection_store_v1 import load_collection_dashboard_state def _strip_internal_fields(row: dict[str, Any]) -> dict[str, Any]: return {key: value for key, value in row.items() if not key.startswith("_")} def _snapshot_columns_from_rows(rows: list[dict[str, Any]]) -> list[str]: columns = list(ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS) extras = sorted( { key for row in rows for key in row.keys() if not key.startswith("_") and key not in ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS } ) for key in extras: if key not in columns: columns.append(key) return columns def _write_json(path: Path, payload: dict[str, Any]) -> Path: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") return path def _render_approval_packet_md(packet: dict[str, Any]) -> str: pending = packet.get("pending_targets") if isinstance(packet.get("pending_targets"), list) else [] summary = packet.get("summary") if isinstance(packet.get("summary"), dict) else {} lines = [ "# Snapshot Admin Approval Packet", "", "## Summary", "", f"- settings_changed: {summary.get('settings_changed', 0)}", f"- account_snapshot_changed: {summary.get('account_snapshot_changed', 0)}", f"- pending_target_count: {summary.get('pending_target_count', 0)}", "", "## Pending Targets", "", ] if pending: for item in pending[:100]: if not isinstance(item, dict): continue lines.append(f"- {item.get('domain', '')}:{item.get('target_ref', '')} ({item.get('change_type', '')})") else: lines.append("_none_") return "\n".join(lines) def write_approval_packet_artifacts(packet: dict[str, Any]) -> dict[str, str]: json_path = ROOT / "Temp" / "snapshot_admin_approval_packet_v1.json" md_path = ROOT / "Temp" / "snapshot_admin_approval_packet_v1.md" _write_json(json_path, packet) md_path.parent.mkdir(parents=True, exist_ok=True) md_path.write_text(_render_approval_packet_md(packet), encoding="utf-8") return {"json_path": str(json_path), "md_path": str(md_path)} def _git_info() -> dict[str, Any]: try: commit = subprocess.check_output( ["git", "rev-parse", "--short", "HEAD"], cwd=str(ROOT), text=True, stderr=subprocess.DEVNULL, ).strip() status = subprocess.check_output( ["git", "status", "--porcelain"], cwd=str(ROOT), text=True, stderr=subprocess.DEVNULL, ) return { "commit": commit, "dirty": bool(status.strip()), "tree_state": "DIRTY" if status.strip() else "CLEAN", } except Exception: return { "commit": "", "dirty": False, "tree_state": "UNKNOWN", } def _source_fingerprint() -> dict[str, Any]: digest = sha256() latest_mtime = 0.0 for path in SNAPSHOT_ADMIN_VERSION_FILES: if not path.exists(): continue try: data = path.read_bytes() digest.update(path.as_posix().encode("utf-8")) digest.update(b"\0") digest.update(data) latest_mtime = max(latest_mtime, path.stat().st_mtime) except OSError: continue return { "fingerprint": digest.hexdigest()[:16], "latest_mtime": latest_mtime, } def _approval_entry_from_conn(conn, domain: str, target_ref: str = "*") -> dict[str, Any] | None: ensure_schema(conn) row = conn.execute( f""" SELECT domain, target_ref, status, approved_by, approved_at, note, updated_at FROM {APPROVAL_TABLE} WHERE domain = ? AND target_ref = ? LIMIT 1 """, (domain, target_ref or "*"), ).fetchone() return dict(row) if row is not None else None def _lock_entry_from_conn(conn, domain: str, target_ref: str = "*") -> dict[str, Any] | None: ensure_schema(conn) row = conn.execute( f""" SELECT domain, target_ref, locked_by, reason, locked_at FROM {LOCK_TABLE} WHERE domain = ? AND target_ref = ? LIMIT 1 """, (domain, target_ref or "*"), ).fetchone() return dict(row) if row is not None else None def build_ui_state(db_path: Path | str | None = None) -> dict[str, Any]: summary = summarize_workspace(db_path) settings_rows = load_settings_rows(db_path) account_rows = [_strip_internal_fields(row) for row in load_account_snapshot_rows(db_path)] settings_errors = validate_settings_rows(settings_rows) snapshot_errors = validate_account_snapshot_rows(account_rows) suggestions = build_validation_suggestions(settings_rows, account_rows) autofix_actions = build_safe_autofix_actions(settings_rows, account_rows) try: collection = load_collection_dashboard_state(KIS_COLLECTION_DB, KIS_COLLECTION_REPORT) except Exception: collection = {} return { "version": { "app": SNAPSHOT_ADMIN_VERSION, "git": _git_info(), "source": _source_fingerprint(), }, "summary": summary, "approval_rows": load_approval_rows(db_path), "approval_settings": load_approval_for_domain(db_path, "settings"), "approval_account_snapshot": load_approval_for_domain(db_path, "account_snapshot"), "locks": load_locks(db_path), "recent_changes": load_change_log_rows(db_path, limit=12), "history_counts": { "changes": len(load_change_log_rows(db_path, limit=200)), "approvals": len(load_approval_rows(db_path)), "locks": len(load_locks(db_path)), }, "settings_rows": settings_rows, "account_snapshot_rows": account_rows, "account_snapshot_columns": _snapshot_columns_from_rows(account_rows), "validation": { "settings": settings_errors, "account_snapshot": snapshot_errors, "suggestions": suggestions, }, "autofix_actions": autofix_actions, "collection": collection, "generated_at": now_kst_iso(), } def _json_response(handler: BaseHTTPRequestHandler, status: int, payload: Any) -> None: body = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8") handler.send_response(status) handler.send_header("Content-Type", "application/json; charset=utf-8") handler.send_header("Content-Length", str(len(body))) handler.end_headers() handler.wfile.write(body) def _text_response(handler: BaseHTTPRequestHandler, status: int, text: str, content_type: str = "text/plain; charset=utf-8") -> None: body = text.encode("utf-8") handler.send_response(status) handler.send_header("Content-Type", content_type) handler.send_header("Content-Length", str(len(body))) handler.end_headers() handler.wfile.write(body) def _read_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]: length = int(handler.headers.get("Content-Length") or "0") raw = handler.rfile.read(length).decode("utf-8") if length else "{}" payload = json.loads(raw or "{}") if not isinstance(payload, dict): raise ValueError("JSON body must be an object") return payload def render_index_html() -> str: return """ Snapshot Admin

Snapshot Admin

SQLite canonical editor for settings and account_snapshot. Save via API only; xlsx stays as export surface.
Open collection dashboard Open table browser
Approval Loading...
Lock Loading...
Selection No row selected.
Diff Pending diff loading...
Snapshot approval state and lock state are pinned here for immediate review.

Workspace

Loading...
Validation

              
Suggestions

            
Diff preview

            

Approval & Locks

settings approval
snapshot approval
Recent change log

              
Timeline

KIS Collection

collection: loading...
Recent collector runs
Recent collector snapshots
Recent collector errors
Collection detail

            

Selection Inspector

No row selected.

              
Recent row history

            
Batch paste
Tip: clipboard paste still works directly in the grid. This panel is for multi-row batch edit against the selected row.
Shortcuts: `Ctrl+S` save current domain, `Ctrl+Enter` save current domain, `Delete` remove selected row.

Settings 0 rows

Account Snapshot 0 rows

Paste TSV below and replace all rows Canonical column order follows spec/15_account_snapshot_contract.yaml
Account snapshot editing surface
This panel is intentionally separated from settings so row selection, field edits, and save approval stay visually dominant.
""" def render_collection_html() -> str: return """ KIS Collection Dashboard

KIS Collection Dashboard

Separate read-only view for KIS collection run, snapshots, errors, and raw JSON evidence.
collection: loading...
Back to workspace Open table browser
Recent collector runs
Recent collector snapshots
Recent collector errors
Collection detail

          
""" def render_tables_html() -> str: return """ Snapshot Admin — Table Browser
Workspace tables are editable only when the table is in the canonical workspace DB. Collection and strategy tables are read-only by design unless the backing store explicitly supports editing.
Table browser ready 0 rows filter=none page=0
If a table shows "no rows", the current filter or selected table has no visible records.
""" class SnapshotAdminHandler(BaseHTTPRequestHandler): db_path: Path = DEFAULT_DB seed_json_path: Path = DEFAULT_SEED_JSON def log_message(self, format: str, *args: Any) -> None: # noqa: A003 return def _handle_exception(self, exc: Exception) -> None: _json_response(self, HTTPStatus.INTERNAL_SERVER_ERROR, {"detail": str(exc)}) def do_GET(self) -> None: # noqa: N802 parsed = urlparse(self.path) if parsed.path == "/": _text_response(self, HTTPStatus.OK, render_index_html(), "text/html; charset=utf-8") return if parsed.path == "/collection": _text_response(self, HTTPStatus.OK, render_collection_html(), "text/html; charset=utf-8") return if parsed.path == "/tables": _text_response(self, HTTPStatus.OK, render_tables_html(), "text/html; charset=utf-8") return if parsed.path == "/api/tables": _json_response(self, HTTPStatus.OK, {"tables": list_browsable_tables(self.db_path)}) return if parsed.path == "/api/table_rows": query = parse_qs(parsed.query) table = (query.get("table") or [""])[0] try: limit = int((query.get("limit") or ["50"])[0]) offset = int((query.get("offset") or ["0"])[0]) except ValueError: _json_response(self, HTTPStatus.BAD_REQUEST, {"detail": "limit/offset must be integers"}) return limit = min(max(limit, 1), 500) offset = max(offset, 0) filter_text = (query.get("filter") or [""])[0] column_filters: dict[str, str] = {} for key, values in query.items(): if key.startswith("filter_") and values: column_filters[key.removeprefix("filter_")] = values[0] try: payload = fetch_table_rows( table, self.db_path, limit=limit, offset=offset, filter_text=filter_text, column_filters=column_filters, ) except ValueError as exc: _json_response(self, HTTPStatus.BAD_REQUEST, {"detail": str(exc)}) return _json_response(self, HTTPStatus.OK, payload) return if parsed.path == "/api/state": _json_response(self, HTTPStatus.OK, build_ui_state(self.db_path)) return if parsed.path == "/api/history": _json_response( self, HTTPStatus.OK, { "settings": load_change_log_rows(self.db_path, limit=25), "approvals": load_approval_rows(self.db_path), "locks": load_locks(self.db_path), }, ) return if parsed.path == "/api/export": _text_response( self, HTTPStatus.OK, json.dumps(export_payload(self.db_path), ensure_ascii=False, indent=2), "application/json; charset=utf-8", ) return if parsed.path == "/favicon.ico": _text_response(self, HTTPStatus.NO_CONTENT, "") return _json_response(self, HTTPStatus.NOT_FOUND, {"detail": "not found"}) def do_POST(self) -> None: # noqa: N802 parsed = urlparse(self.path) try: if parsed.path == "/api/bootstrap": summary = import_seed_json(self.db_path, self.seed_json_path) _json_response(self, HTTPStatus.OK, summary) return payload = _read_json_body(self) if parsed.path == "/api/settings/save": if is_locked(self.db_path, "settings"): raise ValueError("settings are locked") rows = payload.get("rows") if not isinstance(rows, list): raise ValueError("rows must be a list") normalized_rows = [] for idx, row in enumerate(rows, start=1): if not isinstance(row, dict): continue key = str(row.get("key") or "").strip() if not key: continue normalized_rows.append( { "ordinal": idx, "key": key, "value": row.get("value", ""), "note": str(row.get("note") or ""), } ) conflicts = lock_conflicts_for_rows(self.db_path, "settings", normalized_rows) if conflicts: refs = ", ".join(sorted({str(item.get("target_ref") or "") for item in conflicts if item.get("target_ref")})) raise ValueError(f"settings lock conflict: {refs}") with open_connection(self.db_path) as conn: replace_settings(conn, normalized_rows) _json_response(self, HTTPStatus.OK, summarize_workspace(self.db_path)) return if parsed.path == "/api/account_snapshot/save": if is_locked(self.db_path, "account_snapshot"): raise ValueError("account_snapshot is locked") rows = payload.get("rows") if not isinstance(rows, list): raise ValueError("rows must be a list") normalized_rows: list[dict[str, Any]] = [] for idx, row in enumerate(rows, start=1): if not isinstance(row, dict): continue candidate = {key: value for key, value in row.items() if not key.startswith("_")} candidate["ordinal"] = idx normalized_rows.append(candidate) conflicts = lock_conflicts_for_rows(self.db_path, "account_snapshot", normalized_rows) if conflicts: refs = ", ".join(sorted({str(item.get("target_ref") or "") for item in conflicts if item.get("target_ref")})) raise ValueError(f"account_snapshot lock conflict: {refs}") with open_connection(self.db_path) as conn: replace_account_snapshot(conn, normalized_rows) _json_response(self, HTTPStatus.OK, summarize_workspace(self.db_path)) return if parsed.path == "/api/account_snapshot/import_tsv": if is_locked(self.db_path, "account_snapshot"): raise ValueError("account_snapshot is locked") tsv_text = str(payload.get("tsv") or "") rows = parse_account_snapshot_tsv(tsv_text) with open_connection(self.db_path) as conn: replace_account_snapshot(conn, rows) _json_response(self, HTTPStatus.OK, summarize_workspace(self.db_path)) return if parsed.path == "/api/table/save": table = str(payload.get("table") or "").strip() rows = payload.get("rows") if table not in EDITABLE_TABLES: raise ValueError(f"table not editable: {table}") if not isinstance(rows, list): raise ValueError("rows must be a list") db_path = _resolve_table_db(table, self.db_path) if not db_path: raise ValueError(f"database not found for table: {table}") with open_connection(db_path) as conn: conn.execute("BEGIN TRANSACTION") try: conn.execute(f"DELETE FROM {table}") # noqa: S608 - Whitelisted table name if rows: first_row = rows[0] columns = [k for k in first_row.keys() if not k.startswith("_")] if "rowid" in columns: columns.remove("rowid") if "_rowid" in columns: columns.remove("_rowid") placeholders = ", ".join(["?"] * len(columns)) col_list = ", ".join(columns) insert_sql = f"INSERT INTO {table} ({col_list}) VALUES ({placeholders})" # noqa: S608 - Whitelisted table name for row in rows: values = [row.get(col) for col in columns] conn.execute(insert_sql, values) conn.commit() except Exception as e: conn.rollback() raise e _json_response(self, HTTPStatus.OK, {"status": "SUCCESS", "table": table, "row_count": len(rows)}) return if parsed.path == "/api/approval_packet": packet = payload.get("packet") if not isinstance(packet, dict): raise ValueError("packet must be an object") artifacts = write_approval_packet_artifacts(packet) response = { "gate": "PASS", "packet_path": artifacts["json_path"], "md_path": artifacts["md_path"], "formula_id": packet.get("formula_id", "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1"), } _json_response(self, HTTPStatus.OK, response) return if parsed.path == "/api/approve": domain = str(payload.get("domain") or "") if domain not in {"settings", "account_snapshot"}: raise ValueError("domain must be settings or account_snapshot") target_ref = str(payload.get("target_ref") or "*") with open_connection(self.db_path) as conn: before = _approval_entry_from_conn(conn, domain, target_ref) set_approval(conn, domain, "APPROVED", target_ref=target_ref, approved_by="ui", note="manual approval") after = _approval_entry_from_conn(conn, domain, target_ref) record_change_log( conn, domain=domain, action="approve", target_ref=target_ref, before_json=before, after_json=after, actor="ui", note="manual approval", ) conn.commit() _json_response(self, HTTPStatus.OK, {"domain": domain, "target_ref": target_ref, "status": "APPROVED"}) return if parsed.path == "/api/lock": domain = str(payload.get("domain") or "") target_ref = str(payload.get("target_ref") or "*") if domain not in {"settings", "account_snapshot"}: raise ValueError("domain must be settings or account_snapshot") with open_connection(self.db_path) as conn: before = _lock_entry_from_conn(conn, domain, target_ref) set_lock(conn, domain, target_ref, locked_by="ui", reason="manual lock") after = _lock_entry_from_conn(conn, domain, target_ref) record_change_log( conn, domain=domain, action="lock", target_ref=target_ref, before_json=before, after_json=after, actor="ui", note="manual lock", ) conn.commit() _json_response(self, HTTPStatus.OK, {"domain": domain, "target_ref": target_ref, "status": "LOCKED"}) return if parsed.path == "/api/unlock": domain = str(payload.get("domain") or "") target_ref = str(payload.get("target_ref") or "*") if domain not in {"settings", "account_snapshot"}: raise ValueError("domain must be settings or account_snapshot") with open_connection(self.db_path) as conn: before = _lock_entry_from_conn(conn, domain, target_ref) clear_lock(conn, domain, target_ref) after = _lock_entry_from_conn(conn, domain, target_ref) record_change_log( conn, domain=domain, action="unlock", target_ref=target_ref, before_json=before, after_json=after, actor="ui", note="manual unlock", ) conn.commit() _json_response(self, HTTPStatus.OK, {"domain": domain, "target_ref": target_ref, "status": "UNLOCKED"}) return if parsed.path == "/api/undo": domain = str(payload.get("domain") or "") if domain not in {"settings", "account_snapshot"}: raise ValueError("domain must be settings or account_snapshot") if is_locked(self.db_path, domain): raise ValueError(f"{domain} is locked") with open_connection(self.db_path) as conn: result = undo_last_change(conn, domain, actor="ui") _json_response(self, HTTPStatus.OK, result if result else {"domain": domain, "status": "UNDONE"}) return if parsed.path == "/api/autofix": action_id = str(payload.get("action_id") or "") if not action_id: raise ValueError("action_id required") with open_connection(self.db_path) as conn: result = apply_safe_autofix_action(conn, action_id, actor="ui") _json_response(self, HTTPStatus.OK, result) return _json_response(self, HTTPStatus.NOT_FOUND, {"detail": "not found"}) except Exception as exc: # noqa: BLE001 self._handle_exception(exc) def serve(host: str, port: int, db_path: Path | str | None = None, seed_json_path: Path | str | None = None, bootstrap: bool = True) -> None: db = normalize_db_path(db_path) seed = Path(seed_json_path) if seed_json_path else DEFAULT_SEED_JSON if bootstrap and seed.exists(): with open_connection(db) as conn: from .snapshot_admin_store_v1 import ensure_schema ensure_schema(conn) if summarize_workspace(db)["settings_rows"] == 0 and summarize_workspace(db)["account_snapshot_rows"] == 0: import_seed_json(db, seed) SnapshotAdminHandler.db_path = db SnapshotAdminHandler.seed_json_path = seed server = ThreadingHTTPServer((host, port), SnapshotAdminHandler) print(f"Snapshot Admin listening on http://{host}:{port}") print(f"SQLite DB: {db}") print(f"Seed JSON: {seed}") try: server.serve_forever() except KeyboardInterrupt: pass finally: server.server_close() def main() -> int: parser = argparse.ArgumentParser(description="Run the snapshot admin web server.") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=8787) parser.add_argument("--db", type=Path, default=DEFAULT_DB) parser.add_argument("--seed", type=Path, default=DEFAULT_SEED_JSON) parser.add_argument("--no-bootstrap", action="store_true") args = parser.parse_args() serve(args.host, args.port, args.db, args.seed, bootstrap=not args.no_bootstrap) return 0 if __name__ == "__main__": raise SystemExit(main())