#!/usr/bin/env python3
"""Launch the snapshot admin web server, optionally with hot reload.

The server itself lives in ``SERVER_MODULE`` and is always started as a child
process (``python -m <module>``) with the repository root as its working
directory.  With ``--reload`` this script polls the modification times of a
fixed set of files and restarts the child whenever one of them changes.
"""
from __future__ import annotations

import argparse
import os
import subprocess
import sys
import time
from pathlib import Path

# Directory one level above the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
SERVER_MODULE = "src.quant_engine.snapshot_admin_server_v1"

# Directory trees scanned recursively by the reload watcher.
WATCH_DIRS = (
    ROOT / "src",
    ROOT / "tools",
    ROOT / "spec",
    ROOT / "governance",
    ROOT / "docs",
    ROOT / ".gitea",
)
# Individual files watched regardless of their extension.
WATCH_FILES = (
    ROOT / "package.json",
    ROOT / "AGENTS.md",
    ROOT / "GatherTradingData.json",
)
# Only files with one of these (lower-cased) suffixes are picked up in WATCH_DIRS.
WATCH_EXTENSIONS = {".py", ".yaml", ".yml", ".json", ".md", ".gs"}
# Sub-directories of a watched tree whose contents are never watched.
IGNORED_DIR_NAMES = {"Temp", "outputs", ".git", "__pycache__", ".pytest_cache"}


def _server_cmd(args: argparse.Namespace) -> list[str]:
    """Build the argv used to start the server module as a child process.

    Args:
        args: Parsed CLI options; reads ``host``, ``port``, ``db``, ``seed``
            and ``no_bootstrap``.

    Returns:
        The command line, starting with the current Python interpreter.
    """
    cmd = [
        sys.executable,
        "-m",
        SERVER_MODULE,
        "--host",
        args.host,
        "--port",
        str(args.port),
        "--db",
        args.db,
        "--seed",
        args.seed,
    ]
    if args.no_bootstrap:
        cmd.append("--no-bootstrap")
    return cmd


def _iter_watch_files(
    watch_dirs: tuple[Path, ...] | None = None,
    watch_files: tuple[Path, ...] | None = None,
) -> list[Path]:
    """Collect the resolved paths of every file the reload watcher tracks.

    Args:
        watch_dirs: Directory trees to scan recursively; defaults to
            ``WATCH_DIRS``.
        watch_files: Individual files to include; defaults to ``WATCH_FILES``.

    Returns:
        De-duplicated resolved paths: the explicit files first, followed by
        the files found under the watched directories.
    """
    dirs = WATCH_DIRS if watch_dirs is None else watch_dirs
    explicit = WATCH_FILES if watch_files is None else watch_files

    seen: set[Path] = set()
    files: list[Path] = []

    def _add(path: Path) -> None:
        resolved = path.resolve()
        if resolved not in seen:
            seen.add(resolved)
            files.append(resolved)

    for path in explicit:
        if path.is_file():
            _add(path)

    for root in dirs:
        if not root.exists():
            continue
        for path in root.rglob("*"):
            if not path.is_file():
                continue
            # Compare only directory names *below* the watched root.  Testing
            # the absolute path would ignore every file whenever the checkout
            # itself lives under a directory called e.g. "Temp" or "outputs".
            parent_names = path.relative_to(root).parts[:-1]
            if any(name in IGNORED_DIR_NAMES for name in parent_names):
                continue
            if path.suffix.lower() not in WATCH_EXTENSIONS:
                continue
            _add(path)
    return files


def _snapshot_mtimes() -> dict[Path, float]:
    """Return the current modification time of every watched file."""
    mtimes: dict[Path, float] = {}
    for path in _iter_watch_files():
        try:
            mtimes[path] = path.stat().st_mtime
        except OSError:
            # The file vanished or became unreadable between the scan and the
            # stat call; leaving it out makes it show up as a change instead
            # of crashing the watcher.
            continue
    return mtimes


def _diff_mtimes(previous: dict[Path, float], current: dict[Path, float]) -> list[Path]:
    """Return the paths that differ between two mtime snapshots.

    Paths that are new or whose mtime changed come first (in ``current``
    order), followed by paths that disappeared (in ``previous`` order).
    """
    changed = [path for path, mtime in current.items() if previous.get(path) != mtime]
    changed.extend(path for path in previous if path not in current)
    return changed


def _changed_files(previous: dict[Path, float]) -> list[Path]:
    """Return the watched files that were added, modified or removed since ``previous``."""
    return _diff_mtimes(previous, _snapshot_mtimes())


def _spawn_server(args: argparse.Namespace) -> subprocess.Popen[bytes]:
    """Start the server as a child process running from the repository root."""
    return subprocess.Popen(_server_cmd(args), cwd=str(ROOT), env=os.environ.copy())


def _stop_process(proc: subprocess.Popen[bytes], timeout: float) -> int:
    """Terminate ``proc``, killing it if it does not exit within ``timeout`` seconds.

    Does nothing but report the exit code when the process has already exited.

    Returns:
        The exit code of the process.
    """
    if proc.poll() is None:
        proc.terminate()
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    return proc.returncode


def _run_once(args: argparse.Namespace) -> int:
    """Run the server until it exits and return its exit code.

    On Ctrl-C the child is terminated (killed after 5 seconds) and its exit
    code is returned.
    """
    proc = _spawn_server(args)
    try:
        return proc.wait()
    except KeyboardInterrupt:
        return _stop_process(proc, timeout=5)


def _run_reload(args: argparse.Namespace, interval: float) -> int:
    """Run the server and restart it whenever a watched file changes.

    The server is also restarted if it exits on its own.  The loop only ends
    on Ctrl-C (returning 0) or on an unexpected exception; in both cases the
    child process is stopped before this function returns.

    Args:
        args: Parsed CLI options forwarded to the server.
        interval: Seconds to sleep between file-system polls.
    """
    last_mtimes = _snapshot_mtimes()
    child: subprocess.Popen[bytes] | None = None
    try:
        while True:
            if child is None or child.poll() is not None:
                if child is not None:
                    code = child.returncode or 0
                    print(f"[snapshot-admin] server exited with code {code}; restarting...")
                child = _spawn_server(args)
                print("[snapshot-admin] hot reload watcher active")
                print("[snapshot-admin] watching:", ", ".join(str(path) for path in WATCH_DIRS))

            time.sleep(interval)

            # Take ONE snapshot per poll and reuse it as the next baseline, so
            # an edit landing right after the comparison is still detected on
            # the following poll instead of being absorbed by a second scan.
            current = _snapshot_mtimes()
            changed = _diff_mtimes(last_mtimes, current)
            if not changed:
                continue

            print("[snapshot-admin] changes detected:")
            for path in changed[:20]:
                print(f" - {path}")
            last_mtimes = current
            _stop_process(child, timeout=10)
            child = None  # respawned at the top of the next iteration
    except KeyboardInterrupt:
        return 0
    finally:
        # Runs for Ctrl-C and for unexpected errors alike, so the server is
        # never left running as an orphan.
        if child is not None:
            _stop_process(child, timeout=5)


def main() -> int:
    """Parse the command line and run the server, with or without hot reload."""
    parser = argparse.ArgumentParser(description="Run the snapshot admin web server.")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=8787)
    parser.add_argument("--db", default=str(ROOT / "outputs" / "snapshot_admin" / "snapshot_admin.db"))
    parser.add_argument("--seed", default=str(ROOT / "GatherTradingData.json"))
    parser.add_argument("--no-bootstrap", action="store_true")
    parser.add_argument("--reload", action="store_true", help="Restart the server when watched files change.")
    parser.add_argument("--reload-interval", type=float, default=1.0, help="Seconds between file-system polls.")
    args = parser.parse_args()

    if args.reload:
        # Clamp the poll interval so a tiny value cannot turn into a busy loop.
        return _run_reload(args, max(0.25, args.reload_interval))
    return _run_once(args)


if __name__ == "__main__":
    raise SystemExit(main())