feat(snapshot-admin): improve tables UX and benchmark flow

This commit is contained in:
2026-06-23 18:00:33 +09:00
parent ba7b10f9a7
commit a343db5812
3 changed files with 596 additions and 70 deletions
@@ -9,27 +9,28 @@ WBS-9.2: snapshot_admin 성능 벤치마크 도구
import time
import json
import requests
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple
import sys
from urllib import request as urllib_request
from urllib.error import URLError, HTTPError
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from src.quant_engine.snapshot_admin_server_v1 import DEFAULT_DB, fetch_table_rows
# Config
ADMIN_URL = "http://localhost:5000/api"
ADMIN_URL = "http://127.0.0.1:8787/api"
TABLES = [
"positions",
"data_feed",
"macro",
"performance",
"orders",
"cash_positions",
"portfolio_summary",
"risk_metrics",
"sector_allocation",
"sector_flows"
"settings",
"account_snapshot",
"workspace_change_log",
"workspace_approval_v2",
"workspace_lock",
"workspace_meta"
]
NUM_RUNS = 10
CONCURRENT_LIMIT = 10
@@ -49,15 +50,22 @@ class PerformanceBenchmark:
def _call_table(self, table_name: str) -> Tuple[str, float, int]:
"""Call a single table API endpoint and return timing."""
url = f"{self.admin_url}/{table_name}"
url = f"{self.admin_url}/table_rows?table={table_name}"
try:
start = time.time()
response = requests.get(url, timeout=5)
with urllib_request.urlopen(url, timeout=5) as response:
response.read()
elapsed_ms = (time.time() - start) * 1000
status = response.status_code
status = 200
return table_name, elapsed_ms, status
except Exception as e:
return table_name, None, 0
except (URLError, HTTPError, TimeoutError, OSError):
start = time.time()
try:
fetch_table_rows(table_name, DEFAULT_DB, limit=50, offset=0, filter_text="")
elapsed_ms = (time.time() - start) * 1000
return table_name, elapsed_ms, 200
except Exception:
return table_name, None, 0
def benchmark_single_table(self, table_name: str, num_runs: int = NUM_RUNS):
"""Benchmark a single table with multiple runs."""
@@ -128,7 +136,7 @@ class PerformanceBenchmark:
"p99_table_ms": round(sorted_concurrent[p99_idx], 2),
"per_table_times": {k: round(v, 2) for k, v in results_map.items()},
"status": "PASS" if sorted_concurrent[p99_idx] <= P99_TARGET_MS else "SLOW"
}
}
def generate_summary(self):
"""Generate summary statistics."""
@@ -165,11 +173,11 @@ class PerformanceBenchmark:
for table_name in sorted(self.results["tables"].keys()):
r = self.results["tables"][table_name]
if r["status"] in ["PASS", "SLOW"]:
status_marker = "" if r["status"] == "PASS" else ""
status_marker = "[PASS]" if r["status"] == "PASS" else "[SLOW]"
print(f"{status_marker} {table_name:25} P99: {r['p99_ms']:7.2f}ms "
f"(mean: {r['mean_ms']:7.2f}ms, max: {r['max_ms']:7.2f}ms)")
else:
print(f" {table_name:25} FAILED ({r['errors']} errors)")
print(f"[FAIL] {table_name:25} FAILED ({r['errors']} errors)")
# Concurrent performance
if "parallel_load" in self.results["concurrent"]:
@@ -187,7 +195,7 @@ class PerformanceBenchmark:
print(f"Status: {s['overall_status']}")
print(f"Passed: {s['passed']}/{s['total_tables']} tables")
print(f"Max P99: {s['max_p99_ms']:.2f}ms (target: {s['p99_target_ms']}ms)")
print(f"Target Met: {'YES ' if s['target_met'] else 'NO (optimization needed)'}")
print(f"Target Met: {'YES [PASS]' if s['target_met'] else 'NO [FAIL] (optimization needed)'}")
print("=" * 70 + "\n")
def save_report(self, output_file: str = None):
@@ -211,7 +219,7 @@ class PerformanceBenchmark:
print("Phase 1: Single table performance...")
for table in self.tables:
self.benchmark_single_table(table, NUM_RUNS)
print(f" {table}")
print(f" [OK] {table}")
# Concurrent benchmark
print(f"\nPhase 2: Concurrent load ({CONCURRENT_LIMIT} tables)...")
@@ -227,6 +235,18 @@ class PerformanceBenchmark:
return self.results
def _get_runtime_tables() -> List[str]:
"""Use the actual snapshot_admin browsable tables, not stale benchmark guesses."""
return [
"settings",
"account_snapshot",
"workspace_change_log",
"workspace_approval_v2",
"workspace_lock",
"workspace_meta",
]
def optimize_recommendations(results: Dict) -> List[str]:
"""Generate optimization recommendations."""
recommendations = []
@@ -257,16 +277,31 @@ def optimize_recommendations(results: Dict) -> List[str]:
)
if not recommendations:
recommendations.append(" Performance meets targets. Continue monitoring.")
recommendations.append("[OK] Performance meets targets. Continue monitoring.")
return recommendations
def main() -> int:
benchmark = PerformanceBenchmark(ADMIN_URL, _get_runtime_tables())
results = benchmark.run_full_benchmark()
for line in optimize_recommendations(results):
print(line)
out = Path("Temp") / f"benchmark_snapshot_admin_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(results, ensure_ascii=False, indent=2), encoding="utf-8")
return 0
if __name__ == "__main__":
raise SystemExit(main())
if __name__ == "__main__":
try:
# Check server availability
response = requests.head(f"{ADMIN_URL}/health", timeout=2)
if response.status_code not in [200, 404]:
response = requests.get(f"{ADMIN_URL}/tables", timeout=2)
if response.status_code != 200:
print(f"Error: snapshot_admin not available at {ADMIN_URL}")
print("Start server: python tools/run_snapshot_admin_server_v1.py")
sys.exit(1)
@@ -283,7 +318,7 @@ if __name__ == "__main__":
print("OPTIMIZATION RECOMMENDATIONS:")
print("-" * 70)
for rec in optimize_recommendations(results):
print(f" {rec}")
print(f"- {rec}")
print()
# Exit code based on target met