Files
QuantEngineByItz/tools/validate_snapshot_admin_web_v1.py
kjh2064 27730704ae
Snapshot Admin Web Validation / validate-snapshot-admin-smoke (push) Has been cancelled
Snapshot Admin Web Validation / validate-snapshot-admin-full (push) Has been cancelled
Quant Engine CI/CD Pipeline / validate-core (pull_request) Has been cancelled
Quant Engine CI/CD Pipeline / validate-ui-and-storage (pull_request) Has been cancelled
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (pull_request) Has been cancelled
test(validation): 토큰 위생 및 플랫폼 통합 검증 체계 고도화
2026-06-24 18:06:05 +09:00

279 lines
12 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import json
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
# ROOT is the directory that contains this script's own directory.
ROOT = Path(__file__).resolve().parents[1]

# Location of the JSON report written at the end of a validation run.
OUT = ROOT.joinpath("Temp", "snapshot_admin_web_validation_v1.json")

# Put ROOT at the front of the import path, once, so modules located under it
# can be imported when this file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
def _write_valid_seed(path: Path) -> None:
    """Write the fixed seed document used by the validation run to ``path``.

    The document has a single ``data`` object holding two ``settings`` rows and
    one ``account_snapshot`` row.  It is serialised as indented UTF-8 JSON with
    non-ASCII characters kept verbatim.

    Args:
        path: Destination file.  Missing parent directories are created first,
            so the call also works on a checkout where the output directory
            does not exist yet.
    """
    payload = {
        "data": {
            "settings": [
                {"ordinal": 1, "key": "total_asset_krw", "value": 500000000, "note": "seed"},
                {"ordinal": 2, "key": "settlement_cash_d2_krw", "value": 250000000, "note": "seed"},
            ],
            "account_snapshot": [
                {
                    "captured_at": "2026-06-22T11:15:47+09:00",
                    "account": "demo",
                    "account_type": "일반계좌",
                    "ticker": "005930",
                    "name": "삼성전자",
                    "holding_quantity": 10,
                    "average_cost": 70000,
                    "parse_status": "NOT_PROVIDED",
                    "position_type": "core",
                }
            ],
        }
    }
    # write_text() does not create directories; without this the very first
    # run on a clean tree fails with FileNotFoundError.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def _read_json(url: str) -> dict[str, Any]:
    """Fetch ``url`` (5 second timeout) and return its body as a JSON object.

    The body is decoded as UTF-8 and parsed as JSON.  When the top-level JSON
    value is anything other than an object, an empty dict is returned instead.
    """
    with urllib.request.urlopen(url, timeout=5) as response:
        raw_body = response.read()
    parsed = json.loads(raw_body.decode("utf-8"))
    if not isinstance(parsed, dict):
        return {}
    return parsed
def _read_text(url: str) -> str:
    """Fetch ``url`` (5 second timeout) and return the body decoded as UTF-8."""
    with urllib.request.urlopen(url, timeout=5) as response:
        raw_body = response.read()
    return raw_body.decode("utf-8")
def _post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
    """POST ``payload`` to ``url`` as JSON and return the JSON object reply.

    The request body is UTF-8 encoded JSON sent with a
    ``Content-Type: application/json`` header and a 5 second timeout.

    Args:
        url: Endpoint to post to.
        payload: JSON-serialisable request body.

    Returns:
        The decoded reply when its top-level JSON value is an object; an empty
        dict for any other JSON value, so callers can always use ``.get``.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        raw_reply = response.read()
    parsed = json.loads(raw_reply.decode("utf-8"))
    # Honour the declared return type: a list/scalar reply would otherwise
    # surface later as an AttributeError on ``.get``.
    return parsed if isinstance(parsed, dict) else {}
def _wait_for_server(url: str, timeout_s: float = 15.0, poll_interval_s: float = 0.25) -> None:
    """Poll ``url`` until a GET succeeds, or raise once the deadline passes.

    Args:
        url: Address probed with ``_read_text``.
        timeout_s: Overall time budget in seconds.  At least one probe is always
            made, even when this is zero or negative.
        poll_interval_s: Pause between failed probes, in seconds.

    Raises:
        RuntimeError: No probe succeeded before the deadline.  The last probe
            failure is included in the message and attached as ``__cause__``.
    """
    # monotonic() cannot jump when the wall clock is adjusted, so the budget
    # is never shortened or stretched by NTP/DST/manual clock changes.
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            _read_text(url)
            return
        except Exception as exc:  # noqa: BLE001 - any failure means "not up yet"
            last_error = exc
        if time.monotonic() >= deadline:
            raise RuntimeError(f"server did not start: {last_error}") from last_error
        time.sleep(poll_interval_s)
def _pick_free_port() -> int:
    """Return a TCP port number that was free on 127.0.0.1 a moment ago.

    A throwaway socket is bound to port 0 so the operating system assigns a
    port; the socket is closed again before returning, so the port is only
    known to have been free at the time of the call.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        bound_address = probe.getsockname()
        return int(bound_address[1])
    finally:
        probe.close()
# Page checks.  Each entry is (error code, markers); the error code is recorded
# when ANY of its markers is absent from the page text.  Entry order fixes the
# order in which errors appear in the report.
_HOME_PAGE_CHECKS = (
    ("home_title_missing", ("Snapshot Admin Home",)),
    ("home_navigation_missing", ("Open workspace", "Open collection")),
)
_WORKSPACE_PAGE_CHECKS = (
    ("html_title_missing", ("Snapshot Admin",)),
    ("sheet_editor_missing", ("contenteditable",)),
    ("section_missing", ("settings", "Account Snapshot")),
    ("api_binding_missing", ("/api/settings/save", "/api/account_snapshot/save")),
    ("diff_or_approval_ui_missing", ("Approve pending", "Refresh diff")),
    ("approval_packet_ui_missing", ("Export approval packet",)),
    ("sheet_facade_ui_missing", ("Selection Inspector", "Apply TSV to selection", "Save view")),
    ("sheet_shortcuts_ui_missing", ("Recent row history", "Ctrl+S")),
    ("collection_dashboard_ui_missing", ("KIS Collection", "collector:")),
    (
        "collection_detail_ui_missing",
        ("Recent collector snapshots", "Collection detail", "Filter runs / snapshots / errors"),
    ),
    ("change_log_filter_ui_missing", ("Filter change log",)),
    ("collection_page_link_missing", ("Timeline", "/collection", "Open collection dashboard")),
    ("collection_dashboard_link_missing", ("Open collection dashboard",)),
)
_TABLES_PAGE_CHECKS = (
    ("table_browser_split_missing", ("tableSelect", "saveCurrentTable", "/api/domain_rows")),
    ("table_browser_source_labels_missing", ("Read only", "Save current table")),
)
_COLLECTION_PAGE_CHECKS = (
    (
        "collection_dashboard_page_missing",
        (
            "KIS Collection Dashboard",
            "Download CSV",
            "Ticker quick search",
            "Date quick search",
            "collectionLiveStatus",
            "live source: unknown",
        ),
    ),
)


def _missing_marker_errors(page: str, checks: Any) -> list[str]:
    """Return the error codes from ``checks`` whose markers are not all in ``page``."""
    return [code for code, markers in checks if any(marker not in page for marker in markers)]


def _as_dict(value: Any) -> dict[str, Any]:
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_int(value: Any) -> int:
    """Return ``int(value or 0)``, or 0 when the value cannot be converted."""
    try:
        return int(value or 0)
    except (TypeError, ValueError):
        return 0


def _state_errors(state: dict[str, Any], tables_payload: dict[str, Any]) -> list[str]:
    """Check the ``/api/state`` and ``/api/tables`` payloads; return error codes in order."""
    found: list[str] = []
    # A malformed (non-dict) summary is treated as empty so it yields error
    # codes instead of crashing the validator.
    summary = _as_dict(state.get("summary"))
    if _as_int(summary.get("settings_rows")) <= 0:
        found.append("settings_rows_missing")
    if _as_int(summary.get("account_snapshot_rows")) <= 0:
        found.append("account_snapshot_rows_missing")

    topology = summary.get("topology", {})
    if not isinstance(topology, dict):
        found.append("topology_missing")
    else:
        if topology.get("mode") != "single_workspace_sqlite":
            found.append("topology_mode_invalid")
        if not topology.get("settings_and_snapshot_share_db"):
            found.append("topology_workspace_split_invalid")
        if not topology.get("collector_separate_db"):
            found.append("topology_collector_split_invalid")

    version = state.get("version")
    if not isinstance(version, dict) or not version.get("app"):
        found.append("version_metadata_missing")
    if not isinstance(state.get("collection"), dict):
        found.append("collection_state_missing")

    tables = tables_payload.get("tables")
    if not isinstance(tables, list):
        found.append("table_catalog_flat_missing")
        tables = []  # nothing sensible to scan; the grouping check below then fails too
    if not any("settings" in str(row) for row in tables):
        found.append("table_catalog_grouping_missing")

    collection = _as_dict(state.get("collection"))
    if not isinstance(collection.get("counts"), dict):
        found.append("collection_counts_missing")
    if "latest_report" not in collection:
        found.append("collection_latest_report_missing")
    latest_report = collection.get("latest_report", {})
    if isinstance(latest_report, dict):
        input_json = latest_report.get("input_json")
        if input_json and "GatherTradingData.json" not in str(input_json):
            found.append("collection_latest_report_input_mismatch")
        if not latest_report.get("sqlite_db"):
            found.append("collection_latest_report_sqlite_missing")
    output_json_path = collection.get("output_json_path")
    if output_json_path and "kis_data_collection_v1.json" not in str(output_json_path):
        found.append("collection_output_json_path_mismatch")
    return found


def _emit_report(payload: dict[str, Any]) -> None:
    """Write ``payload`` as indented JSON to ``OUT`` and echo it on stdout."""
    rendered = json.dumps(payload, ensure_ascii=False, indent=2)
    OUT.parent.mkdir(parents=True, exist_ok=True)
    OUT.write_text(rendered, encoding="utf-8")
    print(rendered)


def main() -> int:
    """Start the snapshot admin server, probe its pages and APIs, write a report.

    The server script is launched as a child process on a free loopback port
    with a freshly written seed file.  The HTML pages and JSON endpoints are
    then fetched and checked; every failed check adds an error code.  The
    result is written to ``OUT`` and printed, and the child process is always
    stopped before returning.

    Returns:
        0 when every check passed (gate ``PASS``), 1 otherwise, including when
        the server could not be reached or returned an unreadable reply.
    """
    import http.client  # local import: only needed for the exception type below

    port = _pick_free_port()
    temp_dir = ROOT / "Temp"
    # Both the seed file and the report live here; create it on clean checkouts.
    temp_dir.mkdir(parents=True, exist_ok=True)
    db_path = temp_dir / "snapshot_admin_web_validation.db"
    seed_path = temp_dir / "snapshot_admin_web_validation_seed.json"
    _write_valid_seed(seed_path)
    server_cmd = [
        sys.executable,
        str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"),
        "--host",
        "127.0.0.1",
        "--port",
        str(port),
        "--db",
        str(db_path),
        "--seed",
        str(seed_path),
    ]
    # The child's output is never consumed by this script.  Sending it to a
    # pipe nobody reads can block the server once the pipe buffer fills, so it
    # is discarded instead.
    # NOTE(review): redirect to a log file here if server output is wanted
    # for diagnosing failures.
    proc = subprocess.Popen(
        server_cmd,
        cwd=ROOT,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )
    base_url = f"http://127.0.0.1:{port}"
    errors: list[str] = []
    state: dict[str, Any] = {}
    try:
        _wait_for_server(base_url)
        home_html = _read_text(f"{base_url}/")
        html = _read_text(f"{base_url}/workspace")
        state = _read_json(f"{base_url}/api/state")
        tables_payload = _read_json(f"{base_url}/api/tables")
        export_payload = _read_json(f"{base_url}/api/export")
        approval_packet = {
            "formula_id": "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1",
            "generated_at": state.get("generated_at") or "",
            "summary": {
                "settings_changed": 0,
                "account_snapshot_changed": 0,
                "pending_target_count": 0,
            },
            "pending_targets": [],
            "diff_preview": {
                "settings": {"added": [], "removed": [], "changed": []},
                "account_snapshot": {"added": [], "removed": [], "changed": []},
            },
            "approvals": state.get("approval_rows", []),
            "locks": state.get("locks", []),
            "workspace": state.get("summary", {}),
        }
        # Guard against a non-object reply so the ``.get`` calls below are safe.
        packet_response = _as_dict(
            _post_json(f"{base_url}/api/approval_packet", {"packet": approval_packet})
        )

        errors.extend(_missing_marker_errors(home_html, _HOME_PAGE_CHECKS))
        errors.extend(_missing_marker_errors(html, _WORKSPACE_PAGE_CHECKS))
        tables_html = _read_text(f"{base_url}/tables")
        errors.extend(_missing_marker_errors(tables_html, _TABLES_PAGE_CHECKS))
        collection_html = _read_text(f"{base_url}/collection")
        errors.extend(_missing_marker_errors(collection_html, _COLLECTION_PAGE_CHECKS))

        errors.extend(_state_errors(state, tables_payload))
        if "data" not in export_payload:
            errors.append("export_missing_data")

        if packet_response.get("gate") != "PASS":
            errors.append("approval_packet_export_failed")
        packet_raw = str(packet_response.get("packet_path") or "")
        md_raw = str(packet_response.get("md_path") or "")
        # An empty path must count as missing: Path("") is the current
        # directory, which always exists and would hide the failure.
        if not (packet_raw and Path(packet_raw).is_file()):
            errors.append("approval_packet_json_missing")
        if not (md_raw and Path(md_raw).is_file()):
            errors.append("approval_packet_md_missing")

        summary = _as_dict(state.get("summary"))
        payload = {
            "formula_id": "SNAPSHOT_ADMIN_WEB_VALIDATION_V1",
            "gate": "PASS" if not errors else "FAIL",
            "port": port,
            "db_path": str(db_path),
            "base_url": base_url,
            "errors": errors,
            "summary": state.get("summary", {}),
            "version": state.get("version", {}),
            "settings_rows": _as_int(summary.get("settings_rows")),
            "account_snapshot_rows": _as_int(summary.get("account_snapshot_rows")),
            "approval_packet_path": str(Path(packet_raw)) if packet_raw else "",
        }
        _emit_report(payload)
        return 0 if payload["gate"] == "PASS" else 1
    except (OSError, RuntimeError, ValueError, http.client.HTTPException) as exc:
        # OSError covers URLError/HTTPError, refused connections and socket
        # timeouts; RuntimeError is the server start-up timeout; ValueError
        # covers undecodable or non-JSON replies.  All of them become a FAIL
        # report rather than an unhandled traceback.
        errors.append(str(exc))
        payload = {
            "formula_id": "SNAPSHOT_ADMIN_WEB_VALIDATION_V1",
            "gate": "FAIL",
            "port": port,
            "db_path": str(db_path),
            "base_url": base_url,
            "errors": errors,
            "summary": state.get("summary", {}),
            "version": state.get("version", {}),
        }
        _emit_report(payload)
        return 1
    finally:
        # Ask the server to stop; escalate to kill if it ignores the request.
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait(timeout=5)
# Script entry point: the integer returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())