#!/usr/bin/env python3 """ WBS-9.2: snapshot_admin 성능 벤치마크 도구 목표: 테이블 로드 시간 측정 및 최적화 기준 제시 - P99 < 2초 달성 - 동시 10개 테이블 PASS """ import time import json import requests import statistics from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path from typing import Dict, List, Tuple import sys # Config ADMIN_URL = "http://localhost:5000/api" TABLES = [ "positions", "data_feed", "macro", "performance", "orders", "cash_positions", "portfolio_summary", "risk_metrics", "sector_allocation", "sector_flows" ] NUM_RUNS = 10 CONCURRENT_LIMIT = 10 P99_TARGET_MS = 2000 # 2초 class PerformanceBenchmark: def __init__(self, admin_url: str, tables: List[str]): self.admin_url = admin_url self.tables = tables self.results = { "timestamp": datetime.now().isoformat(), "tables": {}, "concurrent": {}, "summary": {} } def _call_table(self, table_name: str) -> Tuple[str, float, int]: """Call a single table API endpoint and return timing.""" url = f"{self.admin_url}/{table_name}" try: start = time.time() response = requests.get(url, timeout=5) elapsed_ms = (time.time() - start) * 1000 status = response.status_code return table_name, elapsed_ms, status except Exception as e: return table_name, None, 0 def benchmark_single_table(self, table_name: str, num_runs: int = NUM_RUNS): """Benchmark a single table with multiple runs.""" times = [] errors = 0 for i in range(num_runs): table, elapsed_ms, status = self._call_table(table_name) if status == 200 and elapsed_ms is not None: times.append(elapsed_ms) else: errors += 1 if not times: self.results["tables"][table_name] = { "status": "FAILED", "errors": errors, "runs": num_runs } return sorted_times = sorted(times) p99_idx = max(0, int(len(sorted_times) * 0.99) - 1) self.results["tables"][table_name] = { "status": "PASS" if sorted_times[-1] <= P99_TARGET_MS else "SLOW", "runs": num_runs, "min_ms": round(min(times), 2), "max_ms": round(max(times), 2), "mean_ms": round(statistics.mean(times), 2), "median_ms": round(statistics.median(times), 2), "stdev_ms": round(statistics.stdev(times) if len(times) > 1 else 0, 2), "p99_ms": round(sorted_times[p99_idx], 2), "errors": errors } def benchmark_concurrent(self, num_concurrent: int = CONCURRENT_LIMIT): """Benchmark concurrent table loads.""" concurrent_times = [] with ThreadPoolExecutor(max_workers=num_concurrent) as executor: futures = { executor.submit(self._call_table, table): table for table in self.tables } start = time.time() results_map = {} for future in as_completed(futures): table_name, elapsed_ms, status = future.result() if status == 200 and elapsed_ms is not None: results_map[table_name] = elapsed_ms concurrent_times.append(elapsed_ms) total_elapsed = (time.time() - start) * 1000 if concurrent_times: sorted_concurrent = sorted(concurrent_times) p99_idx = max(0, int(len(sorted_concurrent) * 0.99) - 1) self.results["concurrent"]["parallel_load"] = { "num_concurrent": num_concurrent, "num_tables": len(results_map), "total_wall_time_ms": round(total_elapsed, 2), "min_table_ms": round(min(concurrent_times), 2), "max_table_ms": round(max(concurrent_times), 2), "p99_table_ms": round(sorted_concurrent[p99_idx], 2), "per_table_times": {k: round(v, 2) for k, v in results_map.items()}, "status": "PASS" if sorted_concurrent[p99_idx] <= P99_TARGET_MS else "SLOW" } def generate_summary(self): """Generate summary statistics.""" table_results = self.results["tables"] passed = sum(1 for r in table_results.values() if r.get("status") == "PASS") failed = sum(1 for r in table_results.values() if r.get("status") in ["SLOW", "FAILED"]) all_p99_times = [ r["p99_ms"] for r in table_results.values() if "p99_ms" in r ] self.results["summary"] = { "total_tables": len(table_results), "passed": passed, "failed": failed, "overall_status": "PASS" if failed == 0 else "NEEDS_OPTIMIZATION", "max_p99_ms": max(all_p99_times) if all_p99_times else None, "p99_target_ms": P99_TARGET_MS, "target_met": max(all_p99_times or [0]) <= P99_TARGET_MS } def print_report(self): """Print formatted report.""" print("\n" + "=" * 70) print("SNAPSHOT_ADMIN PERFORMANCE BENCHMARK REPORT") print("=" * 70) print(f"Timestamp: {self.results['timestamp']}") print(f"Target P99: {P99_TARGET_MS}ms (< 2 seconds)\n") # Individual table results print("TABLE PERFORMANCE:") print("-" * 70) for table_name in sorted(self.results["tables"].keys()): r = self.results["tables"][table_name] if r["status"] in ["PASS", "SLOW"]: status_marker = "✓" if r["status"] == "PASS" else "⚠" print(f"{status_marker} {table_name:25} P99: {r['p99_ms']:7.2f}ms " f"(mean: {r['mean_ms']:7.2f}ms, max: {r['max_ms']:7.2f}ms)") else: print(f"✗ {table_name:25} FAILED ({r['errors']} errors)") # Concurrent performance if "parallel_load" in self.results["concurrent"]: c = self.results["concurrent"]["parallel_load"] print("\nCONCURRENT LOAD ({} tables):".format(c["num_concurrent"])) print("-" * 70) print(f"Wall time: {c['total_wall_time_ms']:.2f}ms") print(f"Table P99: {c['p99_table_ms']:.2f}ms (max single: {c['max_table_ms']:.2f}ms)") print(f"Status: {'PASS' if c['status'] == 'PASS' else 'NEEDS_OPTIMIZATION'}") # Summary s = self.results["summary"] print("\nSUMMARY:") print("-" * 70) print(f"Status: {s['overall_status']}") print(f"Passed: {s['passed']}/{s['total_tables']} tables") print(f"Max P99: {s['max_p99_ms']:.2f}ms (target: {s['p99_target_ms']}ms)") print(f"Target Met: {'YES ✓' if s['target_met'] else 'NO ✗ (optimization needed)'}") print("=" * 70 + "\n") def save_report(self, output_file: str = None): """Save report to JSON file.""" if not output_file: output_file = f"Temp/benchmark_snapshot_admin_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" Path(output_file).parent.mkdir(parents=True, exist_ok=True) with open(output_file, 'w') as f: json.dump(self.results, f, indent=2) print(f"Report saved: {output_file}") def run_full_benchmark(self): """Run complete benchmark suite.""" print("Starting snapshot_admin performance benchmark...") print(f"Target: P99 < {P99_TARGET_MS}ms") print(f"Tables: {', '.join(self.tables)}") print(f"Runs per table: {NUM_RUNS}\n") # Single table benchmark print("Phase 1: Single table performance...") for table in self.tables: self.benchmark_single_table(table, NUM_RUNS) print(f" ✓ {table}") # Concurrent benchmark print(f"\nPhase 2: Concurrent load ({CONCURRENT_LIMIT} tables)...") self.benchmark_concurrent(CONCURRENT_LIMIT) # Generate summary self.generate_summary() # Output self.print_report() self.save_report() return self.results def optimize_recommendations(results: Dict) -> List[str]: """Generate optimization recommendations.""" recommendations = [] summary = results.get("summary", {}) if not summary.get("target_met"): max_p99 = summary.get("max_p99_ms") if max_p99 and max_p99 > P99_TARGET_MS: ratio = max_p99 / P99_TARGET_MS if ratio > 2: recommendations.append( f"Critical: P99 is {ratio:.1f}x target. " "Consider table indexing, materialized views, or caching." ) elif ratio > 1.2: recommendations.append( f"P99 exceeds target by {(ratio-1)*100:.0f}%. " "Optimize database queries or add caching layer." ) # Check specific slow tables for table, result in results.get("tables", {}).items(): if result.get("status") == "SLOW" and "p99_ms" in result: p99 = result["p99_ms"] if p99 > 3000: recommendations.append( f"Table '{table}': {p99:.0f}ms. Consider reducing data volume or adding indexes." ) if not recommendations: recommendations.append("✓ Performance meets targets. Continue monitoring.") return recommendations if __name__ == "__main__": try: # Check server availability response = requests.head(f"{ADMIN_URL}/health", timeout=2) if response.status_code not in [200, 404]: print(f"Error: snapshot_admin not available at {ADMIN_URL}") print("Start server: python tools/run_snapshot_admin_server_v1.py") sys.exit(1) except Exception as e: print(f"Error connecting to {ADMIN_URL}: {e}") print("Start server: python tools/run_snapshot_admin_server_v1.py") sys.exit(1) # Run benchmark benchmark = PerformanceBenchmark(ADMIN_URL, TABLES) results = benchmark.run_full_benchmark() # Print recommendations print("OPTIMIZATION RECOMMENDATIONS:") print("-" * 70) for rec in optimize_recommendations(results): print(f"• {rec}") print() # Exit code based on target met sys.exit(0 if results["summary"]["target_met"] else 1)