Files
QuantEngineByItz/tools/run_snapshot_admin_server_v1.py
kjh2064 05d9f8ed41 코드 참조 경로 업데이트 & WBS-8.1 모니터링 준비
경로 정규화 (outputs/ → src/quant_engine/):
✓ kis_api_client_v1.py: KIS 데이터 수집 경로
✓ kis_data_collection_v1.py: 기본 DB 인자
✓ snapshot_admin_server_v1.py: KIS_COLLECTION_DB
✓ snapshot_admin_store_v1.py: DEFAULT_DB + collector_db
✓ run_snapshot_admin_server_v1.py: --db 기본값

모니터링 도구 추가:
✓ verify_admin_db.py: 어드민 서버 & DB 검증
✓ setup_wbs81_monitoring.py: WBS-8.1 목표 추적 시스템
✓ update_db_paths.py: 자동화된 경로 업데이트

효과:
- 단일 소스 (src/quant_engine/)
- 배포 스크립트 단순화
- WBS-8.1: T+20 30건 모니터링 준비 완료
- 22일 남음 (목표: 2026-07-15)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 00:33:32 +09:00

174 lines
5.7 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
import subprocess
import sys
import time
from pathlib import Path
# Repository root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Dotted module path handed to ``python -m`` when launching the server.
SERVER_MODULE = "src.quant_engine.snapshot_admin_server_v1"

# Directories under ROOT that are scanned recursively in reload mode.
WATCH_DIRS = tuple(
    ROOT / dirname
    for dirname in ("src", "tools", "spec", "governance", "docs", ".gitea")
)

# Individual files directly under ROOT that are watched as well.
WATCH_FILES = tuple(
    ROOT / filename
    for filename in ("package.json", "AGENTS.md", "GatherTradingData.json")
)

# File suffixes that qualify a file found under WATCH_DIRS for watching.
WATCH_EXTENSIONS = {".md", ".json", ".yml", ".yaml", ".gs", ".py"}

# Directory names whose contents are excluded from watching.
IGNORED_DIR_NAMES = {".git", "__pycache__", ".pytest_cache", "outputs", "Temp"}
def _server_cmd(args: argparse.Namespace) -> list[str]:
    """Build the argv list that launches the admin server as a child process.

    The command runs ``SERVER_MODULE`` with the current interpreter
    (``sys.executable -m ...``) and forwards the parsed launcher options.

    Args:
        args: Parsed launcher options; reads ``host``, ``port``, ``db``,
            ``seed``, ``no_bootstrap``, ``allow_remote``, ``auth_user`` and
            ``auth_password``.

    Returns:
        The argument list, ready to pass to ``subprocess.Popen``.
    """
    # Boolean switches are forwarded only when set.
    switches = [
        flag
        for flag, enabled in (
            ("--no-bootstrap", args.no_bootstrap),
            ("--allow-remote", args.allow_remote),
        )
        if enabled
    ]

    # Credential options are forwarded only when they carry a non-empty value.
    credentials: list[str] = []
    for option, value in (
        ("--auth-user", args.auth_user),
        ("--auth-password", args.auth_password),
    ):
        if value:
            credentials += [option, value]

    return [
        sys.executable,
        "-m",
        SERVER_MODULE,
        "--host",
        args.host,
        "--port",
        str(args.port),
        "--db",
        args.db,
        "--seed",
        args.seed,
        *switches,
        *credentials,
    ]
def _iter_watch_files() -> list[Path]:
    """Collect every file the reload watcher should track.

    The explicit ``WATCH_FILES`` come first, followed by the files found
    recursively under each existing ``WATCH_DIRS`` entry whose lower-cased
    suffix is in ``WATCH_EXTENSIONS`` and that do not sit below a directory
    named in ``IGNORED_DIR_NAMES``.

    Returns:
        Resolved paths, de-duplicated, in discovery order.
    """
    seen: set[Path] = set()
    files: list[Path] = []

    def _remember(candidate: Path) -> None:
        # Resolve before de-duplicating so two spellings of one file collapse.
        resolved = candidate.resolve()
        if resolved not in seen:
            seen.add(resolved)
            files.append(resolved)

    for path in WATCH_FILES:
        # is_file() is already False for a missing path.
        if path.is_file():
            _remember(path)

    for root in WATCH_DIRS:
        if not root.exists():
            continue
        for path in root.rglob("*"):
            # Cheap name-based filters first; the stat in is_file() comes last.
            if path.suffix.lower() not in WATCH_EXTENSIONS:
                continue
            # Only look at components *below* the watch root. Checking the
            # absolute path would drop every file whenever the repository
            # itself is checked out under a directory called e.g. "Temp" or
            # "outputs".
            if not IGNORED_DIR_NAMES.isdisjoint(path.relative_to(root).parts):
                continue
            if not path.is_file():
                continue
            _remember(path)
    return files
def _snapshot_mtimes() -> dict[Path, float]:
    """Map each currently watched file to its modification time.

    Returns:
        ``{path: st_mtime}`` for every path from ``_iter_watch_files()``.
        A file that no longer exists when it is stat-ed is left out.
    """
    snapshot: dict[Path, float] = {}
    for watched in _iter_watch_files():
        try:
            stat_result = watched.stat()
        except FileNotFoundError:
            # The file vanished between discovery and stat; skip it.
            continue
        snapshot[watched] = stat_result.st_mtime
    return snapshot
def _changed_files(previous: dict[Path, float]) -> list[Path]:
    """Compare a fresh mtime snapshot against *previous*.

    Args:
        previous: An earlier result of ``_snapshot_mtimes()``.

    Returns:
        Paths that are new or whose mtime differs from *previous*, followed
        by paths that were in *previous* but are no longer present.
    """
    current = _snapshot_mtimes()
    # A path absent from ``previous`` yields None from .get(), which never
    # equals a float mtime, so newly created files count as changed.
    added_or_modified = [
        path for path, mtime in current.items() if previous.get(path) != mtime
    ]
    removed = [path for path in previous if path not in current]
    return added_or_modified + removed
def _run_once(args: argparse.Namespace) -> int:
    """Run the server as a child process and wait for it to finish.

    Args:
        args: Parsed launcher options, forwarded via ``_server_cmd``.

    Returns:
        The child's exit code. On Ctrl+C the child is terminated first and
        killed if it has not exited within 5 seconds.
    """
    server = subprocess.Popen(
        _server_cmd(args),
        cwd=str(ROOT),
        env=os.environ.copy(),
    )
    try:
        exit_code = server.wait()
    except KeyboardInterrupt:
        # Ask politely first; escalate only if the server ignores it.
        server.terminate()
        try:
            exit_code = server.wait(timeout=5)
        except subprocess.TimeoutExpired:
            server.kill()
            exit_code = server.wait()
    return exit_code
def _stop_child(child: subprocess.Popen[bytes], timeout: float) -> None:
    """Terminate *child* if it is still running; kill it after *timeout* seconds."""
    if child.poll() is not None:
        return
    child.terminate()
    try:
        child.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        child.kill()
        child.wait()


def _run_reload(args: argparse.Namespace, interval: float) -> int:
    """Run the server and restart it whenever a watched file changes.

    The loop (re)spawns the server when none is running, sleeps for
    *interval* seconds, then compares file modification times against the
    last snapshot. On a change the running server is stopped (10 s grace
    before kill) and a new one is started on the next iteration. A server
    that exits on its own is restarted as well.

    Args:
        args: Parsed launcher options, forwarded via ``_server_cmd``.
        interval: Seconds to sleep between file-system polls.

    Returns:
        0 when interrupted with Ctrl+C. Any other exception propagates to
        the caller after the child process has been stopped.
    """
    last_mtimes = _snapshot_mtimes()
    # No text mode is requested from Popen, so the handle is Popen[bytes].
    child: subprocess.Popen[bytes] | None = None
    try:
        while True:
            if child is None or child.poll() is not None:
                if child is not None:
                    # poll() returned a value, so returncode is populated.
                    print(
                        "[snapshot-admin] server exited with code "
                        f"{child.returncode}; restarting..."
                    )
                child = subprocess.Popen(
                    _server_cmd(args),
                    cwd=str(ROOT),
                    env=os.environ.copy(),
                )
                print("[snapshot-admin] hot reload watcher active")
                print("[snapshot-admin] watching:", ", ".join(str(path) for path in WATCH_DIRS))

            time.sleep(interval)

            changed = _changed_files(last_mtimes)
            if not changed:
                continue

            print("[snapshot-admin] changes detected:")
            for path in changed[:20]:
                print(f" - {path}")
            if len(changed) > 20:
                # Make the truncation visible instead of silently hiding it.
                print(f" ... and {len(changed) - 20} more")

            # Re-baseline before stopping the server so edits made while it
            # shuts down are picked up by the next poll.
            last_mtimes = _snapshot_mtimes()
            _stop_child(child, timeout=10)
            child = None  # forces a fresh spawn at the top of the loop
    except KeyboardInterrupt:
        return 0
    finally:
        # Runs for Ctrl+C *and* for unexpected errors (e.g. an OSError while
        # scanning), so the server is never left orphaned holding its port.
        if child is not None:
            _stop_child(child, timeout=5)
def main() -> int:
    """Parse command-line options and run the snapshot admin server.

    With ``--reload`` the server runs under the file watcher, polling at
    ``--reload-interval`` seconds (clamped to at least 0.25); otherwise it
    runs once in the foreground.

    Returns:
        The exit code produced by the selected run mode.
    """
    cli = argparse.ArgumentParser(description="Run the snapshot admin web server.")

    # (flag, add_argument keyword arguments), registered in this order.
    option_table = (
        ("--host", {"default": "127.0.0.1"}),
        ("--port", {"type": int, "default": 8787}),
        ("--db", {"default": str(ROOT / "src" / "quant_engine" / "snapshot_admin.db")}),
        ("--seed", {"default": str(ROOT / "GatherTradingData.json")}),
        ("--no-bootstrap", {"action": "store_true"}),
        (
            "--allow-remote",
            {
                "action": "store_true",
                "help": "Allow binding outside loopback when auth is configured.",
            },
        ),
        # Credentials default to environment variables, empty when unset.
        ("--auth-user", {"default": os.getenv("SNAPSHOT_ADMIN_AUTH_USER", "")}),
        ("--auth-password", {"default": os.getenv("SNAPSHOT_ADMIN_AUTH_PASSWORD", "")}),
        (
            "--reload",
            {
                "action": "store_true",
                "help": "Restart the server when watched files change.",
            },
        ),
        (
            "--reload-interval",
            {
                "type": float,
                "default": 1.0,
                "help": "Seconds between file-system polls.",
            },
        ),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    options = cli.parse_args()
    if not options.reload:
        return _run_once(options)

    # Never poll more often than every 0.25 seconds.
    poll_seconds = max(0.25, options.reload_interval)
    return _run_reload(options, poll_seconds)
# Script entry point: exit the interpreter with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())