diff --git a/.gitea/workflows/snapshot_admin.yml b/.gitea/workflows/snapshot_admin.yml
new file mode 100644
index 0000000..dbbfc6f
--- /dev/null
+++ b/.gitea/workflows/snapshot_admin.yml
@@ -0,0 +1,44 @@
+name: Snapshot Admin Web Validation
+
+on:
+ workflow_dispatch:
+ push:
+ paths:
+ - "src/quant_engine/snapshot_admin_server_v1.py"
+ - "src/quant_engine/snapshot_admin_store_v1.py"
+ - "tools/run_snapshot_admin_server_v1.py"
+ - "tools/validate_snapshot_admin_workflow_v1.py"
+ - "tools/validate_snapshot_admin_web_v1.py"
+ - "spec/15_account_snapshot_contract.yaml"
+ - "spec/18_settings_contract.yaml"
+ - "GatherTradingData.json"
+
+jobs:
+ validate-snapshot-admin:
+ runs-on: self-hosted
+ steps:
+ - name: Checkout Code
+ run: |
+ if [ -d .git ]; then
+ git remote set-url origin http://x-access-token:${{ secrets.GITHUB_TOKEN }}@192.168.123.100:8418/KimJaeHyun/myfinance.git
+ else
+ git init
+ git remote add origin http://x-access-token:${{ secrets.GITHUB_TOKEN }}@192.168.123.100:8418/KimJaeHyun/myfinance.git
+ fi
+ git fetch origin main --depth=1
+ git reset --hard FETCH_HEAD
+
+ - name: Validate Snapshot Admin Workflow
+ run: python3 tools/validate_snapshot_admin_workflow_v1.py
+
+ - name: Validate Snapshot Admin Web UI
+ run: python3 tools/validate_snapshot_admin_web_v1.py
+
+ - name: Notify Run Result
+ if: always()
+ run: |
+ STATUS="${{ job.status }}"
+ echo "=== Snapshot Admin Web Validation ==="
+ echo "status: $STATUS"
+ echo "workflow validation: Temp/snapshot_admin_workflow_v1.json"
+ echo "web validation: Temp/snapshot_admin_web_validation_v1.json"
diff --git a/src/quant_engine/snapshot_admin_server_v1.py b/src/quant_engine/snapshot_admin_server_v1.py
new file mode 100644
index 0000000..a9f2663
--- /dev/null
+++ b/src/quant_engine/snapshot_admin_server_v1.py
@@ -0,0 +1,3007 @@
+from __future__ import annotations
+
+import argparse
+import json
+import sqlite3
+import subprocess
+from http import HTTPStatus
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from pathlib import Path
+from hashlib import sha256
+from typing import Any
+from urllib.parse import urlparse, parse_qs
+
+ROOT = Path(__file__).resolve().parents[2]
+SNAPSHOT_ADMIN_VERSION = "snapshot-admin-web-v6"
+KIS_COLLECTION_DB = ROOT / "outputs" / "kis_data_collection" / "kis_data_collection.db"
+KIS_COLLECTION_REPORT = ROOT / "Temp" / "kis_data_collection_v1.json"
+QUALITATIVE_SELL_DB = ROOT / "outputs" / "qualitative_sell_strategy" / "qualitative_sell_strategy.db"
+
+# WBS-7.9 부속 — 테이블별 그리드 조회(Tabler). 화이트리스트에 없는 테이블명은
+# SQL에 절대 보간되지 않는다(요청 테이블명을 그대로 SELECT 문에 넣지 않고
+# 아래 레지스트리 키와 정확히 일치할 때만 허용).
+WORKSPACE_BROWSABLE_TABLES = (
+ "settings",
+ "account_snapshot",
+ "workspace_change_log",
+ "workspace_approval_v2",
+ "workspace_lock",
+ "workspace_meta",
+)
+COLLECTION_BROWSABLE_TABLES = (
+ "collection_runs",
+ "collection_snapshots",
+ "collection_source_errors",
+)
+QUALITATIVE_SELL_BROWSABLE_TABLES = (
+ "sell_strategy_results",
+ "satellite_recommendations",
+)
+
+
+def _resolve_table_db(table: str, workspace_db_path: Path) -> Path | None:
+ if table in WORKSPACE_BROWSABLE_TABLES:
+ return Path(workspace_db_path)
+ if table in COLLECTION_BROWSABLE_TABLES:
+ return KIS_COLLECTION_DB
+ if table in QUALITATIVE_SELL_BROWSABLE_TABLES:
+ return QUALITATIVE_SELL_DB
+ return None
+
+
+def list_browsable_tables(workspace_db_path: Path) -> list[dict[str, Any]]:
+ tables: list[dict[str, Any]] = []
+ for table in (
+ *WORKSPACE_BROWSABLE_TABLES,
+ *COLLECTION_BROWSABLE_TABLES,
+ *QUALITATIVE_SELL_BROWSABLE_TABLES,
+ ):
+ db_path = _resolve_table_db(table, workspace_db_path)
+ exists = bool(db_path and db_path.exists())
+ row_count = 0
+ if exists:
+ try:
+ with sqlite3.connect(db_path) as conn:
+ row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - table is whitelist-checked above
+ except sqlite3.OperationalError:
+ exists = False
+ tables.append({"table": table, "db": str(db_path) if db_path else "", "exists": exists, "row_count": row_count})
+ return tables
+
+
+def fetch_table_rows(table: str, workspace_db_path: Path, *, limit: int = 50, offset: int = 0) -> dict[str, Any]:
+ db_path = _resolve_table_db(table, workspace_db_path)
+ if db_path is None:
+ raise ValueError(f"unknown or non-browsable table: {table}")
+ if not db_path.exists():
+ return {"table": table, "db": str(db_path), "columns": [], "rows": [], "total": 0, "limit": limit, "offset": offset}
+ with sqlite3.connect(db_path) as conn:
+ conn.row_factory = sqlite3.Row
+ total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - whitelisted table name
+ cursor = conn.execute(
+ f"SELECT * FROM {table} ORDER BY rowid DESC LIMIT ? OFFSET ?", # noqa: S608 - whitelisted table name
+ (limit, offset),
+ )
+ rows = [dict(row) for row in cursor.fetchall()]
+ columns = [description[0] for description in cursor.description] if cursor.description else []
+ return {"table": table, "db": str(db_path), "columns": columns, "rows": rows, "total": total, "limit": limit, "offset": offset}
+SNAPSHOT_ADMIN_VERSION_FILES = (
+ ROOT / "src" / "quant_engine" / "snapshot_admin_server_v1.py",
+ ROOT / "src" / "quant_engine" / "snapshot_admin_store_v1.py",
+ ROOT / "src" / "quant_engine" / "data_collection_store_v1.py",
+ ROOT / "tools" / "run_snapshot_admin_server_v1.py",
+ ROOT / "tools" / "validate_snapshot_admin_web_v1.py",
+ ROOT / "tests" / "unit" / "test_snapshot_admin_web_v1.py",
+ ROOT / "package.json",
+)
+
+from .snapshot_admin_store_v1 import (
+ ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS,
+ DEFAULT_DB,
+ DEFAULT_SEED_JSON,
+ export_payload,
+ clear_lock,
+ import_seed_json,
+ is_locked,
+ load_account_snapshot_rows,
+ load_approval_for_domain,
+ load_approval_rows,
+ load_change_log_rows,
+ load_locks,
+ load_settings_rows,
+ normalize_db_path,
+ now_kst_iso,
+ open_connection,
+ parse_account_snapshot_tsv,
+ parse_scalar,
+ record_change_log,
+ validate_account_snapshot_rows,
+ validate_settings_rows,
+ build_validation_suggestions,
+ build_safe_autofix_actions,
+ apply_safe_autofix_action,
+ lock_conflicts_for_rows,
+ set_approval,
+ set_lock,
+ replace_account_snapshot,
+ replace_settings,
+ undo_last_change,
+ summarize_workspace,
+)
+from .data_collection_store_v1 import load_collection_dashboard_state
+
+
+def _strip_internal_fields(row: dict[str, Any]) -> dict[str, Any]:
+ return {key: value for key, value in row.items() if not key.startswith("_")}
+
+
+def _snapshot_columns_from_rows(rows: list[dict[str, Any]]) -> list[str]:
+ columns = list(ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS)
+ extras = sorted(
+ {
+ key
+ for row in rows
+ for key in row.keys()
+ if not key.startswith("_") and key not in ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS
+ }
+ )
+ for key in extras:
+ if key not in columns:
+ columns.append(key)
+ return columns
+
+
+def _write_json(path: Path, payload: dict[str, Any]) -> Path:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+ return path
+
+
+def _render_approval_packet_md(packet: dict[str, Any]) -> str:
+ pending = packet.get("pending_targets") if isinstance(packet.get("pending_targets"), list) else []
+ summary = packet.get("summary") if isinstance(packet.get("summary"), dict) else {}
+ lines = [
+ "# Snapshot Admin Approval Packet",
+ "",
+ "## Summary",
+ "",
+ f"- settings_changed: {summary.get('settings_changed', 0)}",
+ f"- account_snapshot_changed: {summary.get('account_snapshot_changed', 0)}",
+ f"- pending_target_count: {summary.get('pending_target_count', 0)}",
+ "",
+ "## Pending Targets",
+ "",
+ ]
+ if pending:
+ for item in pending[:100]:
+ if not isinstance(item, dict):
+ continue
+ lines.append(f"- {item.get('domain', '')}:{item.get('target_ref', '')} ({item.get('change_type', '')})")
+ else:
+ lines.append("_none_")
+ return "\n".join(lines)
+
+
+def write_approval_packet_artifacts(packet: dict[str, Any]) -> dict[str, str]:
+ json_path = ROOT / "Temp" / "snapshot_admin_approval_packet_v1.json"
+ md_path = ROOT / "Temp" / "snapshot_admin_approval_packet_v1.md"
+ _write_json(json_path, packet)
+ md_path.parent.mkdir(parents=True, exist_ok=True)
+ md_path.write_text(_render_approval_packet_md(packet), encoding="utf-8")
+ return {"json_path": str(json_path), "md_path": str(md_path)}
+
+
+def _git_info() -> dict[str, Any]:
+ try:
+ commit = subprocess.check_output(
+ ["git", "rev-parse", "--short", "HEAD"],
+ cwd=str(ROOT),
+ text=True,
+ stderr=subprocess.DEVNULL,
+ ).strip()
+ status = subprocess.check_output(
+ ["git", "status", "--porcelain"],
+ cwd=str(ROOT),
+ text=True,
+ stderr=subprocess.DEVNULL,
+ )
+ return {
+ "commit": commit,
+ "dirty": bool(status.strip()),
+ "tree_state": "DIRTY" if status.strip() else "CLEAN",
+ }
+ except Exception:
+ return {
+ "commit": "",
+ "dirty": False,
+ "tree_state": "UNKNOWN",
+ }
+
+
+def _source_fingerprint() -> dict[str, Any]:
+ digest = sha256()
+ latest_mtime = 0.0
+ for path in SNAPSHOT_ADMIN_VERSION_FILES:
+ if not path.exists():
+ continue
+ try:
+ data = path.read_bytes()
+ digest.update(path.as_posix().encode("utf-8"))
+ digest.update(b"\0")
+ digest.update(data)
+ latest_mtime = max(latest_mtime, path.stat().st_mtime)
+ except OSError:
+ continue
+ return {
+ "fingerprint": digest.hexdigest()[:16],
+ "latest_mtime": latest_mtime,
+ }
+
+
+def _approval_entry_from_conn(conn, domain: str, target_ref: str = "*") -> dict[str, Any] | None:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"""
+ SELECT domain, target_ref, status, approved_by, approved_at, note, updated_at
+ FROM {APPROVAL_TABLE}
+ WHERE domain = ? AND target_ref = ?
+ LIMIT 1
+ """,
+ (domain, target_ref or "*"),
+ ).fetchone()
+ return dict(row) if row is not None else None
+
+
+def _lock_entry_from_conn(conn, domain: str, target_ref: str = "*") -> dict[str, Any] | None:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"""
+ SELECT domain, target_ref, locked_by, reason, locked_at
+ FROM {LOCK_TABLE}
+ WHERE domain = ? AND target_ref = ?
+ LIMIT 1
+ """,
+ (domain, target_ref or "*"),
+ ).fetchone()
+ return dict(row) if row is not None else None
+
+
+def build_ui_state(db_path: Path | str | None = None) -> dict[str, Any]:
+ summary = summarize_workspace(db_path)
+ settings_rows = load_settings_rows(db_path)
+ account_rows = [_strip_internal_fields(row) for row in load_account_snapshot_rows(db_path)]
+ settings_errors = validate_settings_rows(settings_rows)
+ snapshot_errors = validate_account_snapshot_rows(account_rows)
+ suggestions = build_validation_suggestions(settings_rows, account_rows)
+ autofix_actions = build_safe_autofix_actions(settings_rows, account_rows)
+ collection = load_collection_dashboard_state(KIS_COLLECTION_DB, KIS_COLLECTION_REPORT)
+ return {
+ "version": {
+ "app": SNAPSHOT_ADMIN_VERSION,
+ "git": _git_info(),
+ "source": _source_fingerprint(),
+ },
+ "summary": summary,
+ "approval_rows": load_approval_rows(db_path),
+ "approval_settings": load_approval_for_domain(db_path, "settings"),
+ "approval_account_snapshot": load_approval_for_domain(db_path, "account_snapshot"),
+ "locks": load_locks(db_path),
+ "recent_changes": load_change_log_rows(db_path, limit=12),
+ "history_counts": {
+ "changes": len(load_change_log_rows(db_path, limit=200)),
+ "approvals": len(load_approval_rows(db_path)),
+ "locks": len(load_locks(db_path)),
+ },
+ "settings_rows": settings_rows,
+ "account_snapshot_rows": account_rows,
+ "account_snapshot_columns": _snapshot_columns_from_rows(account_rows),
+ "validation": {
+ "settings": settings_errors,
+ "account_snapshot": snapshot_errors,
+ "suggestions": suggestions,
+ },
+ "autofix_actions": autofix_actions,
+ "collection": collection,
+ "generated_at": now_kst_iso(),
+ }
+
+
+def _json_response(handler: BaseHTTPRequestHandler, status: int, payload: Any) -> None:
+ body = json.dumps(payload, ensure_ascii=False, indent=2).encode("utf-8")
+ handler.send_response(status)
+ handler.send_header("Content-Type", "application/json; charset=utf-8")
+ handler.send_header("Content-Length", str(len(body)))
+ handler.end_headers()
+ handler.wfile.write(body)
+
+
+def _text_response(handler: BaseHTTPRequestHandler, status: int, text: str, content_type: str = "text/plain; charset=utf-8") -> None:
+ body = text.encode("utf-8")
+ handler.send_response(status)
+ handler.send_header("Content-Type", content_type)
+ handler.send_header("Content-Length", str(len(body)))
+ handler.end_headers()
+ handler.wfile.write(body)
+
+
+def _read_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]:
+ length = int(handler.headers.get("Content-Length") or "0")
+ raw = handler.rfile.read(length).decode("utf-8") if length else "{}"
+ payload = json.loads(raw or "{}")
+ if not isinstance(payload, dict):
+ raise ValueError("JSON body must be an object")
+ return payload
+
+
+def render_index_html() -> str:
+ return """
+
+
+
+
+ Snapshot Admin
+
+
+
+
+
+
+
+
+
Workspace
+
+
+
+
+
+
+
+
+
+ Loading...
+
+
+
+
Validation
+
+
Suggestions
+
+
+
+
+
+
+
+
+
+
Approval & Locks
+
+
+
+
+
+
+
+
+
+
+
+
+
settings approval
+
snapshot approval
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Recent change log
+
+
+
+
+
+
Timeline
+
+
+
+
+
+
+
+
+
KIS Collection
+
+
+
+
+
+
+
+
+
collection: loading...
+
+
+
+
+
+
+
+
+
Recent collector runs
+
+
Recent collector snapshots
+
+
Recent collector errors
+
+
Collection detail
+
+
+
+
+
+
+
+
+
Selection Inspector
+
+
+
+
+
+
+
+
+
+
+
+
No row selected.
+
+
Recent row history
+
+
+
+
Batch paste
+
+
+
+
+
+
Tip: clipboard paste still works directly in the grid. This panel is for multi-row batch edit against the selected row.
+
Shortcuts: `Ctrl+S` save current domain, `Ctrl+Enter` save current domain, `Delete` remove selected row.
+
+
+
+
+
+
+
+
Settings
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Account Snapshot
+
+
+
+
+
+
+ Paste TSV below and replace all rows
+ Canonical column order follows spec/15_account_snapshot_contract.yaml
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+def render_collection_html() -> str:
+ return """
+
+
+
+
+ KIS Collection Dashboard
+
+
+
+
+ KIS Collection Dashboard
+ Separate read-only view for KIS collection run, snapshots, errors, and raw JSON evidence.
+
+
+
+
+
+
collection: loading...
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Recent collector runs
+
+
Recent collector snapshots
+
+
Recent collector errors
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+def render_tables_html() -> str:
+ return """
+
+
+
+
+ Snapshot Admin — Table Browser
+
+
+
+
+
+
+
Snapshot Admin — Table Browser
+
+
+
+
+
+
+
+
+"""
+
+
+class SnapshotAdminHandler(BaseHTTPRequestHandler):
+ db_path: Path = DEFAULT_DB
+ seed_json_path: Path = DEFAULT_SEED_JSON
+
+ def log_message(self, format: str, *args: Any) -> None: # noqa: A003
+ return
+
+ def _handle_exception(self, exc: Exception) -> None:
+ _json_response(self, HTTPStatus.INTERNAL_SERVER_ERROR, {"detail": str(exc)})
+
+ def do_GET(self) -> None: # noqa: N802
+ parsed = urlparse(self.path)
+ if parsed.path == "/":
+ _text_response(self, HTTPStatus.OK, render_index_html(), "text/html; charset=utf-8")
+ return
+ if parsed.path == "/collection":
+ _text_response(self, HTTPStatus.OK, render_collection_html(), "text/html; charset=utf-8")
+ return
+ if parsed.path == "/tables":
+ _text_response(self, HTTPStatus.OK, render_tables_html(), "text/html; charset=utf-8")
+ return
+ if parsed.path == "/api/tables":
+ _json_response(self, HTTPStatus.OK, {"tables": list_browsable_tables(self.db_path)})
+ return
+ if parsed.path == "/api/table_rows":
+ query = parse_qs(parsed.query)
+ table = (query.get("table") or [""])[0]
+ try:
+ limit = int((query.get("limit") or ["50"])[0])
+ offset = int((query.get("offset") or ["0"])[0])
+ except ValueError:
+ _json_response(self, HTTPStatus.BAD_REQUEST, {"detail": "limit/offset must be integers"})
+ return
+ limit = min(max(limit, 1), 500)
+ offset = max(offset, 0)
+ try:
+ payload = fetch_table_rows(table, self.db_path, limit=limit, offset=offset)
+ except ValueError as exc:
+ _json_response(self, HTTPStatus.BAD_REQUEST, {"detail": str(exc)})
+ return
+ _json_response(self, HTTPStatus.OK, payload)
+ return
+ if parsed.path == "/api/state":
+ _json_response(self, HTTPStatus.OK, build_ui_state(self.db_path))
+ return
+ if parsed.path == "/api/history":
+ _json_response(
+ self,
+ HTTPStatus.OK,
+ {
+ "settings": load_change_log_rows(self.db_path, limit=25),
+ "approvals": load_approval_rows(self.db_path),
+ "locks": load_locks(self.db_path),
+ },
+ )
+ return
+ if parsed.path == "/api/export":
+ _text_response(
+ self,
+ HTTPStatus.OK,
+ json.dumps(export_payload(self.db_path), ensure_ascii=False, indent=2),
+ "application/json; charset=utf-8",
+ )
+ return
+ if parsed.path == "/favicon.ico":
+ _text_response(self, HTTPStatus.NO_CONTENT, "")
+ return
+ _json_response(self, HTTPStatus.NOT_FOUND, {"detail": "not found"})
+
+ def do_POST(self) -> None: # noqa: N802
+ parsed = urlparse(self.path)
+ try:
+ if parsed.path == "/api/bootstrap":
+ summary = import_seed_json(self.db_path, self.seed_json_path)
+ _json_response(self, HTTPStatus.OK, summary)
+ return
+ payload = _read_json_body(self)
+ if parsed.path == "/api/settings/save":
+ if is_locked(self.db_path, "settings"):
+ raise ValueError("settings are locked")
+ rows = payload.get("rows")
+ if not isinstance(rows, list):
+ raise ValueError("rows must be a list")
+ normalized_rows = []
+ for idx, row in enumerate(rows, start=1):
+ if not isinstance(row, dict):
+ continue
+ key = str(row.get("key") or "").strip()
+ if not key:
+ continue
+ normalized_rows.append(
+ {
+ "ordinal": idx,
+ "key": key,
+ "value": row.get("value", ""),
+ "note": str(row.get("note") or ""),
+ }
+ )
+ conflicts = lock_conflicts_for_rows(self.db_path, "settings", normalized_rows)
+ if conflicts:
+ refs = ", ".join(sorted({str(item.get("target_ref") or "") for item in conflicts if item.get("target_ref")}))
+ raise ValueError(f"settings lock conflict: {refs}")
+ with open_connection(self.db_path) as conn:
+ replace_settings(conn, normalized_rows)
+ _json_response(self, HTTPStatus.OK, summarize_workspace(self.db_path))
+ return
+ if parsed.path == "/api/account_snapshot/save":
+ if is_locked(self.db_path, "account_snapshot"):
+ raise ValueError("account_snapshot is locked")
+ rows = payload.get("rows")
+ if not isinstance(rows, list):
+ raise ValueError("rows must be a list")
+ normalized_rows: list[dict[str, Any]] = []
+ for idx, row in enumerate(rows, start=1):
+ if not isinstance(row, dict):
+ continue
+ candidate = {key: value for key, value in row.items() if not key.startswith("_")}
+ candidate["ordinal"] = idx
+ normalized_rows.append(candidate)
+ conflicts = lock_conflicts_for_rows(self.db_path, "account_snapshot", normalized_rows)
+ if conflicts:
+ refs = ", ".join(sorted({str(item.get("target_ref") or "") for item in conflicts if item.get("target_ref")}))
+ raise ValueError(f"account_snapshot lock conflict: {refs}")
+ with open_connection(self.db_path) as conn:
+ replace_account_snapshot(conn, normalized_rows)
+ _json_response(self, HTTPStatus.OK, summarize_workspace(self.db_path))
+ return
+ if parsed.path == "/api/account_snapshot/import_tsv":
+ if is_locked(self.db_path, "account_snapshot"):
+ raise ValueError("account_snapshot is locked")
+ tsv_text = str(payload.get("tsv") or "")
+ rows = parse_account_snapshot_tsv(tsv_text)
+ with open_connection(self.db_path) as conn:
+ replace_account_snapshot(conn, rows)
+ _json_response(self, HTTPStatus.OK, summarize_workspace(self.db_path))
+ return
+ if parsed.path == "/api/approval_packet":
+ packet = payload.get("packet")
+ if not isinstance(packet, dict):
+ raise ValueError("packet must be an object")
+ artifacts = write_approval_packet_artifacts(packet)
+ response = {
+ "gate": "PASS",
+ "packet_path": artifacts["json_path"],
+ "md_path": artifacts["md_path"],
+ "formula_id": packet.get("formula_id", "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1"),
+ }
+ _json_response(self, HTTPStatus.OK, response)
+ return
+ if parsed.path == "/api/approve":
+ domain = str(payload.get("domain") or "")
+ if domain not in {"settings", "account_snapshot"}:
+ raise ValueError("domain must be settings or account_snapshot")
+ target_ref = str(payload.get("target_ref") or "*")
+ with open_connection(self.db_path) as conn:
+ before = _approval_entry_from_conn(conn, domain, target_ref)
+ set_approval(conn, domain, "APPROVED", target_ref=target_ref, approved_by="ui", note="manual approval")
+ after = _approval_entry_from_conn(conn, domain, target_ref)
+ record_change_log(
+ conn,
+ domain=domain,
+ action="approve",
+ target_ref=target_ref,
+ before_json=before,
+ after_json=after,
+ actor="ui",
+ note="manual approval",
+ )
+ conn.commit()
+ _json_response(self, HTTPStatus.OK, {"domain": domain, "target_ref": target_ref, "status": "APPROVED"})
+ return
+ if parsed.path == "/api/lock":
+ domain = str(payload.get("domain") or "")
+ target_ref = str(payload.get("target_ref") or "*")
+ if domain not in {"settings", "account_snapshot"}:
+ raise ValueError("domain must be settings or account_snapshot")
+ with open_connection(self.db_path) as conn:
+ before = _lock_entry_from_conn(conn, domain, target_ref)
+ set_lock(conn, domain, target_ref, locked_by="ui", reason="manual lock")
+ after = _lock_entry_from_conn(conn, domain, target_ref)
+ record_change_log(
+ conn,
+ domain=domain,
+ action="lock",
+ target_ref=target_ref,
+ before_json=before,
+ after_json=after,
+ actor="ui",
+ note="manual lock",
+ )
+ conn.commit()
+ _json_response(self, HTTPStatus.OK, {"domain": domain, "target_ref": target_ref, "status": "LOCKED"})
+ return
+ if parsed.path == "/api/unlock":
+ domain = str(payload.get("domain") or "")
+ target_ref = str(payload.get("target_ref") or "*")
+ if domain not in {"settings", "account_snapshot"}:
+ raise ValueError("domain must be settings or account_snapshot")
+ with open_connection(self.db_path) as conn:
+ before = _lock_entry_from_conn(conn, domain, target_ref)
+ clear_lock(conn, domain, target_ref)
+ after = _lock_entry_from_conn(conn, domain, target_ref)
+ record_change_log(
+ conn,
+ domain=domain,
+ action="unlock",
+ target_ref=target_ref,
+ before_json=before,
+ after_json=after,
+ actor="ui",
+ note="manual unlock",
+ )
+ conn.commit()
+ _json_response(self, HTTPStatus.OK, {"domain": domain, "target_ref": target_ref, "status": "UNLOCKED"})
+ return
+ if parsed.path == "/api/undo":
+ domain = str(payload.get("domain") or "")
+ if domain not in {"settings", "account_snapshot"}:
+ raise ValueError("domain must be settings or account_snapshot")
+ if is_locked(self.db_path, domain):
+ raise ValueError(f"{domain} is locked")
+ with open_connection(self.db_path) as conn:
+ result = undo_last_change(conn, domain, actor="ui")
+ _json_response(self, HTTPStatus.OK, result if result else {"domain": domain, "status": "UNDONE"})
+ return
+ if parsed.path == "/api/autofix":
+ action_id = str(payload.get("action_id") or "")
+ if not action_id:
+ raise ValueError("action_id required")
+ with open_connection(self.db_path) as conn:
+ result = apply_safe_autofix_action(conn, action_id, actor="ui")
+ _json_response(self, HTTPStatus.OK, result)
+ return
+ _json_response(self, HTTPStatus.NOT_FOUND, {"detail": "not found"})
+ except Exception as exc: # noqa: BLE001
+ self._handle_exception(exc)
+
+
+def serve(host: str, port: int, db_path: Path | str | None = None, seed_json_path: Path | str | None = None, bootstrap: bool = True) -> None:
+ db = normalize_db_path(db_path)
+ seed = Path(seed_json_path) if seed_json_path else DEFAULT_SEED_JSON
+ if bootstrap and seed.exists():
+ with open_connection(db) as conn:
+ from .snapshot_admin_store_v1 import ensure_schema
+
+ ensure_schema(conn)
+ if summarize_workspace(db)["settings_rows"] == 0 and summarize_workspace(db)["account_snapshot_rows"] == 0:
+ import_seed_json(db, seed)
+ SnapshotAdminHandler.db_path = db
+ SnapshotAdminHandler.seed_json_path = seed
+ server = ThreadingHTTPServer((host, port), SnapshotAdminHandler)
+ print(f"Snapshot Admin listening on http://{host}:{port}")
+ print(f"SQLite DB: {db}")
+ print(f"Seed JSON: {seed}")
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ pass
+ finally:
+ server.server_close()
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Run the snapshot admin web server.")
+ parser.add_argument("--host", default="127.0.0.1")
+ parser.add_argument("--port", type=int, default=8787)
+ parser.add_argument("--db", type=Path, default=DEFAULT_DB)
+ parser.add_argument("--seed", type=Path, default=DEFAULT_SEED_JSON)
+ parser.add_argument("--no-bootstrap", action="store_true")
+ args = parser.parse_args()
+ serve(args.host, args.port, args.db, args.seed, bootstrap=not args.no_bootstrap)
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/src/quant_engine/snapshot_admin_store_v1.py b/src/quant_engine/snapshot_admin_store_v1.py
new file mode 100644
index 0000000..4b40b0f
--- /dev/null
+++ b/src/quant_engine/snapshot_admin_store_v1.py
@@ -0,0 +1,993 @@
+from __future__ import annotations
+
+import json
+import re
+import sqlite3
+from datetime import datetime
+from functools import lru_cache
+from pathlib import Path
+from typing import Any
+from zoneinfo import ZoneInfo
+
+import yaml
+
+
+ROOT = Path(__file__).resolve().parents[2]
+DEFAULT_DB = ROOT / "outputs" / "snapshot_admin" / "snapshot_admin.db"
+DEFAULT_SEED_JSON = ROOT / "GatherTradingData.json"
+KST = ZoneInfo("Asia/Seoul")
+
+SETTINGS_TABLE = "settings"
+SNAPSHOT_TABLE = "account_snapshot"
+CHANGE_LOG_TABLE = "workspace_change_log"
+APPROVAL_TABLE = "workspace_approval_v2"
+LOCK_TABLE = "workspace_lock"
+
+ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS = [
+ "captured_at",
+ "account",
+ "account_type",
+ "ticker",
+ "name",
+ "holding_quantity",
+ "available_quantity",
+ "average_cost",
+ "total_cost",
+ "current_price",
+ "market_value",
+ "profit_loss",
+ "return_pct",
+ "immediate_cash",
+ "settlement_cash_d2",
+ "available_cash",
+ "open_order_amount",
+ "monthly_contribution_limit",
+ "monthly_contribution_used",
+ "parse_status",
+ "user_confirmed",
+ "stop_price",
+ "highest_price_since_entry",
+ "entry_date",
+ "entry_stage",
+ "position_type",
+ "last_updated",
+]
+
+ALLOWED_PARSE_STATUS = {
+ "CAPTURE_READ_OK",
+ "CAPTURE_READ_FAILED",
+ "CAPTURE_PROVIDED_BUT_NOT_HOLDINGS",
+ "NOT_PROVIDED",
+}
+
+SETTINGS_SPEC_PATH = ROOT / "spec" / "18_settings_contract.yaml"
+ACCOUNT_SNAPSHOT_SPEC_PATH = ROOT / "spec" / "15_account_snapshot_contract.yaml"
+
+
+def now_kst_iso() -> str:
+ return datetime.now(tz=KST).isoformat(timespec="seconds")
+
+
+def parse_scalar(value: str) -> Any:
+ text = value.strip()
+ if text == "":
+ return ""
+ if text.lower() in {"null", "none"}:
+ return None
+ if text.lower() in {"true", "false"}:
+ return text.lower() == "true"
+ try:
+ return json.loads(text)
+ except Exception:
+ return text
+
+
+def _json_dump(value: Any) -> str:
+ return json.dumps(value, ensure_ascii=False)
+
+
+def _json_load(text: str) -> Any:
+ try:
+ return json.loads(text)
+ except Exception:
+ return text
+
+
+def normalize_db_path(db_path: Path | str | None = None) -> Path:
+ path = Path(db_path) if db_path else DEFAULT_DB
+ path.parent.mkdir(parents=True, exist_ok=True)
+ return path
+
+
+def open_connection(db_path: Path | str | None = None) -> sqlite3.Connection:
+ conn = sqlite3.connect(normalize_db_path(db_path))
+ conn.row_factory = sqlite3.Row
+ conn.execute("PRAGMA foreign_keys = ON")
+ conn.execute("PRAGMA journal_mode = WAL")
+ return conn
+
+
+def ensure_schema(conn: sqlite3.Connection) -> None:
+ conn.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {SETTINGS_TABLE} (
+ ordinal INTEGER NOT NULL,
+ key TEXT PRIMARY KEY,
+ value_json TEXT NOT NULL,
+ note TEXT NOT NULL DEFAULT '',
+ updated_at TEXT NOT NULL
+ )
+ """
+ )
+ conn.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {SNAPSHOT_TABLE} (
+ ordinal INTEGER NOT NULL,
+ row_json TEXT NOT NULL,
+ captured_at TEXT NOT NULL DEFAULT '',
+ account TEXT NOT NULL DEFAULT '',
+ account_type TEXT NOT NULL DEFAULT '',
+ ticker TEXT NOT NULL DEFAULT '',
+ name TEXT NOT NULL DEFAULT '',
+ parse_status TEXT NOT NULL DEFAULT '',
+ user_confirmed TEXT NOT NULL DEFAULT '',
+ updated_at TEXT NOT NULL
+ )
+ """
+ )
+ conn.execute(
+ f"CREATE INDEX IF NOT EXISTS idx_{SNAPSHOT_TABLE}_captured_at ON {SNAPSHOT_TABLE}(captured_at)"
+ )
+ conn.execute(
+ f"CREATE INDEX IF NOT EXISTS idx_{SNAPSHOT_TABLE}_ticker ON {SNAPSHOT_TABLE}(ticker)"
+ )
+ conn.execute(
+ "CREATE TABLE IF NOT EXISTS workspace_meta (key TEXT PRIMARY KEY, value_json TEXT NOT NULL)"
+ )
+ conn.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {CHANGE_LOG_TABLE} (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ domain TEXT NOT NULL,
+ action TEXT NOT NULL,
+ target_ref TEXT NOT NULL DEFAULT '',
+ actor TEXT NOT NULL DEFAULT 'system',
+ note TEXT NOT NULL DEFAULT '',
+ before_json TEXT NOT NULL DEFAULT 'null',
+ after_json TEXT NOT NULL DEFAULT 'null',
+ created_at TEXT NOT NULL
+ )
+ """
+ )
+ conn.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {APPROVAL_TABLE} (
+ domain TEXT NOT NULL,
+ target_ref TEXT NOT NULL DEFAULT '*',
+ status TEXT NOT NULL,
+ approved_by TEXT NOT NULL DEFAULT '',
+ approved_at TEXT NOT NULL DEFAULT '',
+ note TEXT NOT NULL DEFAULT '',
+ updated_at TEXT NOT NULL,
+ PRIMARY KEY (domain, target_ref)
+ )
+ """
+ )
+ conn.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {LOCK_TABLE} (
+ domain TEXT NOT NULL,
+ target_ref TEXT NOT NULL DEFAULT '',
+ locked_by TEXT NOT NULL DEFAULT '',
+ reason TEXT NOT NULL DEFAULT '',
+ locked_at TEXT NOT NULL,
+ PRIMARY KEY (domain, target_ref)
+ )
+ """
+ )
+ conn.commit()
+
+
+def _normalize_settings_rows(settings: Any) -> list[dict[str, Any]]:
+ if isinstance(settings, list):
+ rows: list[dict[str, Any]] = []
+ for idx, item in enumerate(settings, start=1):
+ if isinstance(item, dict) and "key" in item:
+ rows.append(
+ {
+ "ordinal": int(item.get("ordinal") or idx),
+ "key": str(item.get("key") or ""),
+ "value": item.get("value", ""),
+ "note": str(item.get("note") or ""),
+ }
+ )
+ return rows
+ if isinstance(settings, dict):
+ rows = []
+ for idx, (key, value) in enumerate(settings.items(), start=1):
+ rows.append({"ordinal": idx, "key": str(key), "value": value, "note": ""})
+ return rows
+ return []
+
+
+def _normalize_snapshot_rows(rows: Any) -> list[dict[str, Any]]:
+ if not isinstance(rows, list):
+ return []
+ normalized: list[dict[str, Any]] = []
+ for idx, item in enumerate(rows, start=1):
+ if isinstance(item, dict):
+ row = dict(item)
+ row.setdefault("ordinal", idx)
+ normalized.append(row)
+ return normalized
+
+
+def seed_payload_from_json(json_path: Path | str) -> dict[str, Any]:
+ payload = json.loads(Path(json_path).read_text(encoding="utf-8"))
+ data = payload.get("data") if isinstance(payload, dict) else None
+ if not isinstance(data, dict):
+ data = payload if isinstance(payload, dict) else {}
+ settings = _normalize_settings_rows(data.get("settings"))
+ account_snapshot = _normalize_snapshot_rows(data.get("account_snapshot"))
+ return {
+ "meta": payload.get("meta") if isinstance(payload, dict) else {},
+ "settings": settings,
+ "account_snapshot": account_snapshot,
+ }
+
+
+def replace_settings(conn: sqlite3.Connection, rows: list[dict[str, Any]]) -> None:
+ ensure_schema(conn)
+ errors = validate_settings_rows(rows)
+ if errors:
+ raise ValueError("; ".join(errors))
+ old_rows = load_settings_rows_from_conn(conn)
+ conn.execute(f"DELETE FROM {SETTINGS_TABLE}")
+ for idx, row in enumerate(rows, start=1):
+ key = str(row.get("key") or "").strip()
+ if not key:
+ continue
+ conn.execute(
+ f"""
+ INSERT INTO {SETTINGS_TABLE} (ordinal, key, value_json, note, updated_at)
+ VALUES (?, ?, ?, ?, ?)
+ """,
+ (
+ int(row.get("ordinal") or idx),
+ key,
+ _json_dump(row.get("value", "")),
+ str(row.get("note") or ""),
+ now_kst_iso(),
+ ),
+ )
+ record_change_log(
+ conn,
+ domain=SETTINGS_TABLE,
+ action="replace",
+ before_json=old_rows,
+ after_json=rows,
+ target_ref="*",
+ note="settings replace",
+ )
+ set_approval(conn, SETTINGS_TABLE, "PENDING", note="settings updated")
+ conn.commit()
+
+
+def replace_account_snapshot(conn: sqlite3.Connection, rows: list[dict[str, Any]]) -> None:
+ ensure_schema(conn)
+ errors = validate_account_snapshot_rows(rows)
+ if errors:
+ raise ValueError("; ".join(errors))
+ old_rows = load_account_snapshot_rows_from_conn(conn)
+ conn.execute(f"DELETE FROM {SNAPSHOT_TABLE}")
+ for idx, row in enumerate(rows, start=1):
+ normalized = dict(row)
+ ordinal = int(normalized.pop("ordinal", idx) or idx)
+ captured_at = str(normalized.get("captured_at") or "")
+ account = str(normalized.get("account") or "")
+ account_type = str(normalized.get("account_type") or "")
+ ticker = str(normalized.get("ticker") or "")
+ name = str(normalized.get("name") or "")
+ parse_status = str(normalized.get("parse_status") or "")
+ user_confirmed = str(normalized.get("user_confirmed") or "")
+ conn.execute(
+ f"""
+ INSERT INTO {SNAPSHOT_TABLE} (
+ ordinal, row_json, captured_at, account, account_type, ticker, name,
+ parse_status, user_confirmed, updated_at
+ )
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ ordinal,
+ _json_dump(normalized),
+ captured_at,
+ account,
+ account_type,
+ ticker,
+ name,
+ parse_status,
+ user_confirmed,
+ now_kst_iso(),
+ ),
+ )
+ record_change_log(
+ conn,
+ domain=SNAPSHOT_TABLE,
+ action="replace",
+ before_json=old_rows,
+ after_json=rows,
+ target_ref="*",
+ note="account_snapshot replace",
+ )
+ set_approval(conn, SNAPSHOT_TABLE, "PENDING", note="account_snapshot updated")
+ conn.commit()
+
+
+def import_seed_json(db_path: Path | str | None, json_path: Path | str) -> dict[str, Any]:
+ payload = seed_payload_from_json(json_path)
+ with open_connection(db_path) as conn:
+ replace_settings(conn, payload["settings"])
+ replace_account_snapshot(conn, payload["account_snapshot"])
+ conn.execute(
+ "INSERT OR REPLACE INTO workspace_meta(key, value_json) VALUES (?, ?)",
+ ("seed_json_path", _json_dump(str(Path(json_path).resolve()))),
+ )
+ conn.execute(
+ "INSERT OR REPLACE INTO workspace_meta(key, value_json) VALUES (?, ?)",
+ ("seeded_at", _json_dump(now_kst_iso())),
+ )
+ conn.commit()
+ return summarize_workspace(db_path)
+
+
+def load_settings_rows(db_path: Path | str | None = None) -> list[dict[str, Any]]:
+ with open_connection(db_path) as conn:
+ return load_settings_rows_from_conn(conn)
+
+
+def load_settings_rows_from_conn(conn: sqlite3.Connection) -> list[dict[str, Any]]:
+ ensure_schema(conn)
+ rows = conn.execute(
+ f"SELECT ordinal, key, value_json, note, updated_at FROM {SETTINGS_TABLE} ORDER BY ordinal ASC, key ASC"
+ ).fetchall()
+ return [
+ {
+ "ordinal": int(row["ordinal"]),
+ "key": row["key"],
+ "value": _json_load(row["value_json"]),
+ "note": row["note"],
+ "updated_at": row["updated_at"],
+ }
+ for row in rows
+ ]
+
+
+def load_account_snapshot_rows(db_path: Path | str | None = None) -> list[dict[str, Any]]:
+ with open_connection(db_path) as conn:
+ return load_account_snapshot_rows_from_conn(conn)
+
+
+def load_account_snapshot_rows_from_conn(conn: sqlite3.Connection) -> list[dict[str, Any]]:
+ ensure_schema(conn)
+ rows = conn.execute(
+ f"""
+ SELECT ordinal, row_json, captured_at, account, account_type, ticker, name,
+ parse_status, user_confirmed, updated_at
+ FROM {SNAPSHOT_TABLE}
+ ORDER BY ordinal ASC
+ """
+ ).fetchall()
+ loaded: list[dict[str, Any]] = []
+ for row in rows:
+ payload = _json_load(row["row_json"])
+ item = payload if isinstance(payload, dict) else {}
+ item.setdefault("captured_at", row["captured_at"])
+ item.setdefault("account", row["account"])
+ item.setdefault("account_type", row["account_type"])
+ item.setdefault("ticker", row["ticker"])
+ item.setdefault("name", row["name"])
+ item.setdefault("parse_status", row["parse_status"])
+ item.setdefault("user_confirmed", row["user_confirmed"])
+ item["_ordinal"] = int(row["ordinal"])
+ item["_updated_at"] = row["updated_at"]
+ loaded.append(item)
+ return loaded
+
+
+def export_payload(db_path: Path | str | None = None) -> dict[str, Any]:
+ settings_rows = load_settings_rows(db_path)
+ settings = {row["key"]: row["value"] for row in settings_rows}
+ account_snapshot = load_account_snapshot_rows(db_path)
+ return {
+ "meta": {
+ "generated_at": now_kst_iso(),
+ "source_db": str(normalize_db_path(db_path)),
+ },
+ "data": {
+ "settings": settings,
+ "account_snapshot": account_snapshot,
+ },
+ }
+
+
+def write_export_json(db_path: Path | str | None, output_path: Path | str) -> Path:
+ payload = export_payload(db_path)
+ output = Path(output_path)
+ output.parent.mkdir(parents=True, exist_ok=True)
+ output.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+ return output
+
+
+def load_meta(db_path: Path | str | None = None) -> dict[str, Any]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ rows = conn.execute("SELECT key, value_json FROM workspace_meta ORDER BY key ASC").fetchall()
+ return {row["key"]: _json_load(row["value_json"]) for row in rows}
+
+
+def record_change_log(
+ conn: sqlite3.Connection,
+ *,
+ domain: str,
+ action: str,
+ before_json: Any,
+ after_json: Any,
+ target_ref: str = "",
+ actor: str = "ui",
+ note: str = "",
+) -> None:
+ ensure_schema(conn)
+ conn.execute(
+ f"""
+ INSERT INTO {CHANGE_LOG_TABLE} (
+ domain, action, target_ref, actor, note, before_json, after_json, created_at
+ )
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ domain,
+ action,
+ target_ref,
+ actor,
+ note,
+ _json_dump(before_json),
+ _json_dump(after_json),
+ now_kst_iso(),
+ ),
+ )
+
+
+def set_approval(
+ conn: sqlite3.Connection,
+ domain: str,
+ status: str,
+ *,
+ target_ref: str = "*",
+ approved_by: str = "",
+ note: str = "",
+) -> None:
+ ensure_schema(conn)
+ conn.execute(
+ f"""
+ INSERT INTO {APPROVAL_TABLE} (domain, target_ref, status, approved_by, approved_at, note, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ ON CONFLICT(domain, target_ref) DO UPDATE SET
+ status=excluded.status,
+ approved_by=excluded.approved_by,
+ approved_at=excluded.approved_at,
+ note=excluded.note,
+ updated_at=excluded.updated_at
+ """,
+ (
+ domain,
+ target_ref or "*",
+ status,
+ approved_by,
+ now_kst_iso() if status == "APPROVED" else "",
+ note,
+ now_kst_iso(),
+ ),
+ )
+
+
+def load_approval_rows(db_path: Path | str | None = None) -> list[dict[str, Any]]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ rows = conn.execute(
+ f"SELECT domain, target_ref, status, approved_by, approved_at, note, updated_at FROM {APPROVAL_TABLE} ORDER BY domain ASC, target_ref ASC"
+ ).fetchall()
+ return [dict(row) for row in rows]
+
+
+def load_approval_entry(db_path: Path | str | None, domain: str, target_ref: str = "*") -> dict[str, Any] | None:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"""
+ SELECT domain, target_ref, status, approved_by, approved_at, note, updated_at
+ FROM {APPROVAL_TABLE}
+ WHERE domain = ? AND target_ref = ?
+ LIMIT 1
+ """,
+ (domain, target_ref or "*"),
+ ).fetchone()
+ return dict(row) if row is not None else None
+
+
+def load_change_log_rows(db_path: Path | str | None = None, limit: int = 20) -> list[dict[str, Any]]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ rows = conn.execute(
+ f"""
+ SELECT id, domain, action, target_ref, actor, note, before_json, after_json, created_at
+ FROM {CHANGE_LOG_TABLE}
+ ORDER BY id DESC
+ LIMIT ?
+ """,
+ (int(limit),),
+ ).fetchall()
+ items = []
+ for row in rows:
+ items.append(
+ {
+ "id": int(row["id"]),
+ "domain": row["domain"],
+ "action": row["action"],
+ "target_ref": row["target_ref"],
+ "actor": row["actor"],
+ "note": row["note"],
+ "before_json": _json_load(row["before_json"]),
+ "after_json": _json_load(row["after_json"]),
+ "created_at": row["created_at"],
+ }
+ )
+ return items
+
+
+def load_last_change_row(conn: sqlite3.Connection, domain: str) -> dict[str, Any] | None:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"""
+ SELECT id, domain, action, target_ref, actor, note, before_json, after_json, created_at
+ FROM {CHANGE_LOG_TABLE}
+ WHERE domain = ?
+ ORDER BY id DESC
+ LIMIT 1
+ """,
+ (domain,),
+ ).fetchone()
+ if row is None:
+ return None
+ return {
+ "id": int(row["id"]),
+ "domain": row["domain"],
+ "action": row["action"],
+ "target_ref": row["target_ref"],
+ "actor": row["actor"],
+ "note": row["note"],
+ "before_json": _json_load(row["before_json"]),
+ "after_json": _json_load(row["after_json"]),
+ "created_at": row["created_at"],
+ }
+
+
+def set_lock(conn: sqlite3.Connection, domain: str, target_ref: str, *, locked_by: str, reason: str) -> None:
+ ensure_schema(conn)
+ conn.execute(
+ f"""
+ INSERT INTO {LOCK_TABLE} (domain, target_ref, locked_by, reason, locked_at)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT(domain, target_ref) DO UPDATE SET
+ locked_by=excluded.locked_by,
+ reason=excluded.reason,
+ locked_at=excluded.locked_at
+ """,
+ (domain, target_ref, locked_by, reason, now_kst_iso()),
+ )
+
+
+def clear_lock(conn: sqlite3.Connection, domain: str, target_ref: str) -> None:
+ ensure_schema(conn)
+ conn.execute(
+ f"DELETE FROM {LOCK_TABLE} WHERE domain = ? AND target_ref = ?",
+ (domain, target_ref),
+ )
+
+
+def load_locks(db_path: Path | str | None = None) -> list[dict[str, Any]]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ rows = conn.execute(
+ f"SELECT domain, target_ref, locked_by, reason, locked_at FROM {LOCK_TABLE} ORDER BY domain ASC, target_ref ASC"
+ ).fetchall()
+ return [dict(row) for row in rows]
+
+
+def load_lock_entry(db_path: Path | str | None, domain: str, target_ref: str = "*") -> dict[str, Any] | None:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"""
+ SELECT domain, target_ref, locked_by, reason, locked_at
+ FROM {LOCK_TABLE}
+ WHERE domain = ? AND target_ref = ?
+ LIMIT 1
+ """,
+ (domain, target_ref or "*"),
+ ).fetchone()
+ return dict(row) if row is not None else None
+
+
+def is_locked(db_path: Path | str | None, domain: str, target_ref: str = "*") -> bool:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"SELECT 1 FROM {LOCK_TABLE} WHERE domain = ? AND target_ref IN (?, '*') LIMIT 1",
+ (domain, target_ref),
+ ).fetchone()
+ return row is not None
+
+
+def lock_conflicts_for_rows(
+ db_path: Path | str | None,
+ domain: str,
+ rows: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ locks = conn.execute(
+ f"SELECT domain, target_ref, locked_by, reason, locked_at FROM {LOCK_TABLE} WHERE domain = ? ORDER BY target_ref ASC",
+ (domain,),
+ ).fetchall()
+ if not locks:
+ return []
+ row_refs: list[str] = []
+ for idx, row in enumerate(rows, start=1):
+ if domain == SETTINGS_TABLE:
+ ref = str(row.get("_row_ref") or "").strip() or str(row.get("key") or "").strip()
+ elif domain == SNAPSHOT_TABLE:
+ ref = str(row.get("_row_ref") or "").strip()
+ if not ref:
+ ordinal = str(row.get("_ordinal") or row.get("ordinal") or idx).strip()
+ ref = f"row:{ordinal}"
+ else:
+ ref = str(row.get("target_ref") or "").strip()
+ if ref:
+ row_refs.append(ref)
+ if domain == SETTINGS_TABLE:
+ key = str(row.get("key") or "").strip()
+ if key:
+ row_refs.append(key)
+ if domain == SNAPSHOT_TABLE:
+ ticker = str(row.get("ticker") or "").strip()
+ if ticker:
+ row_refs.append(ticker)
+ conflicts: list[dict[str, Any]] = []
+ for lock in locks:
+ target_ref = str(lock["target_ref"] or "").strip()
+ if target_ref == "*" or target_ref in row_refs:
+ conflicts.append(dict(lock))
+ return conflicts
+
+
+def undo_last_change(conn: sqlite3.Connection, domain: str, *, actor: str = "ui") -> dict[str, Any]:
+ ensure_schema(conn)
+ last = load_last_change_row(conn, domain)
+ if not last:
+ raise ValueError(f"no change log for domain={domain}")
+ before_json = last.get("before_json")
+ if domain == SETTINGS_TABLE:
+ rows = before_json if isinstance(before_json, list) else []
+ replace_settings(conn, rows)
+ elif domain == SNAPSHOT_TABLE:
+ rows = before_json if isinstance(before_json, list) else []
+ replace_account_snapshot(conn, rows)
+ else:
+ raise ValueError(f"unsupported domain={domain}")
+ record_change_log(
+ conn,
+ domain=domain,
+ action="undo",
+ before_json=last.get("after_json"),
+ after_json=before_json,
+ target_ref=last.get("target_ref", "*"),
+ actor=actor,
+ note=f"undo change #{last['id']}",
+ )
+ conn.commit()
+ return load_last_change_row(conn, domain) or {}
+
+
+def load_approval_for_domain(db_path: Path | str | None, domain: str) -> dict[str, Any]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ row = conn.execute(
+ f"""
+ SELECT domain, target_ref, status, approved_by, approved_at, note, updated_at
+ FROM {APPROVAL_TABLE}
+ WHERE domain = ? AND target_ref = '*'
+ """,
+ (domain,),
+ ).fetchone()
+ return (
+ dict(row)
+ if row
+ else {"domain": domain, "target_ref": "*", "status": "MISSING", "approved_by": "", "approved_at": "", "note": "", "updated_at": ""}
+ )
+
+
+def summarize_workspace(db_path: Path | str | None = None) -> dict[str, Any]:
+ with open_connection(db_path) as conn:
+ ensure_schema(conn)
+ settings_count = conn.execute(f"SELECT COUNT(*) FROM {SETTINGS_TABLE}").fetchone()[0]
+ snapshot_count = conn.execute(f"SELECT COUNT(*) FROM {SNAPSHOT_TABLE}").fetchone()[0]
+ latest_update = conn.execute(
+ f"""
+ SELECT MAX(updated_at)
+ FROM (
+ SELECT updated_at FROM {SETTINGS_TABLE}
+ UNION ALL
+ SELECT updated_at FROM {SNAPSHOT_TABLE}
+ )
+ """
+ ).fetchone()[0]
+ table_rows = conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table' AND name IN (?, ?, ?, ?, ?)",
+ (SETTINGS_TABLE, SNAPSHOT_TABLE, CHANGE_LOG_TABLE, APPROVAL_TABLE, LOCK_TABLE),
+ ).fetchall()
+ tables = sorted(row[0] for row in table_rows)
+ workspace_db = str(normalize_db_path(db_path))
+ return {
+ "db_path": workspace_db,
+ "settings_rows": int(settings_count),
+ "account_snapshot_rows": int(snapshot_count),
+ "latest_update": latest_update or "",
+ "tables": tables,
+ "topology": {
+ "mode": "single_workspace_sqlite",
+ "workspace_db": workspace_db,
+ "collector_db": str(ROOT / "outputs" / "kis_data_collection" / "kis_data_collection.db"),
+ "settings_and_snapshot_share_db": True,
+ "collector_separate_db": True,
+ },
+ "meta": load_meta(db_path),
+ }
+
+
+def parse_account_snapshot_tsv(tsv_text: str) -> list[dict[str, Any]]:
+ lines = [line.rstrip("\r") for line in tsv_text.splitlines() if line.strip() != ""]
+ if not lines:
+ return []
+ rows: list[list[str]] = [line.split("\t") for line in lines]
+ first_row = rows[0]
+ if first_row == ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS:
+ data_rows = rows[1:]
+ elif set(first_row) >= {"captured_at", "account", "ticker"}:
+ header = first_row
+ data_rows = rows[1:]
+ converted: list[dict[str, Any]] = []
+ for idx, row in enumerate(data_rows, start=1):
+ item: dict[str, Any] = {"ordinal": idx}
+ for col_index, column in enumerate(header):
+ value = row[col_index] if col_index < len(row) else ""
+ item[column] = parse_scalar(value)
+ converted.append(item)
+ return converted
+ else:
+ data_rows = rows
+ converted = []
+ for idx, row in enumerate(data_rows, start=1):
+ item: dict[str, Any] = {"ordinal": idx}
+ for col_index, column in enumerate(ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS):
+ value = row[col_index] if col_index < len(row) else ""
+ item[column] = parse_scalar(value)
+ converted.append(item)
+ return converted
+
+
+def settings_rows_to_dict(rows: list[dict[str, Any]]) -> dict[str, Any]:
+ result: dict[str, Any] = {}
+ for row in rows:
+ key = str(row.get("key") or "").strip()
+ if key:
+ result[key] = row.get("value", "")
+ return result
+
+
+def _as_number(value: Any) -> float | None:
+ if value is None:
+ return None
+ if isinstance(value, bool):
+ return None
+ if isinstance(value, (int, float)):
+ return float(value)
+ text = str(value).strip()
+ if not text:
+ return None
+ try:
+ return float(text)
+ except Exception:
+ return None
+
+
+@lru_cache(maxsize=1)
+def _load_settings_spec() -> dict[str, Any]:
+ return yaml.safe_load(SETTINGS_SPEC_PATH.read_text(encoding="utf-8")) or {}
+
+
+@lru_cache(maxsize=1)
+def _load_account_snapshot_spec() -> dict[str, Any]:
+ return yaml.safe_load(ACCOUNT_SNAPSHOT_SPEC_PATH.read_text(encoding="utf-8")) or {}
+
+
+def validate_settings_rows(rows: list[dict[str, Any]]) -> list[str]:
+ errors: list[str] = []
+ spec = _load_settings_spec().get("required_keys") or {}
+ optional_spec = _load_settings_spec().get("optional_keys") or {}
+ seen: set[str] = set()
+ total_asset_found = False
+ for idx, row in enumerate(rows, start=1):
+ key = str(row.get("key") or "").strip()
+ if not key:
+ errors.append(f"settings row {idx}: missing key")
+ continue
+ if key in seen:
+ errors.append(f"settings row {idx}: duplicate key {key}")
+ seen.add(key)
+ value = row.get("value", "")
+ if key == "total_asset_krw":
+ total_asset_found = True
+ amount = _as_number(value)
+ if amount is None or amount <= 0:
+ errors.append("settings.total_asset_krw must be positive number")
+ if key in {"weekly_target_cash_pct", "fc_budget_pct_override"}:
+ pct = _as_number(value)
+ if pct is None or pct < 0:
+ errors.append(f"settings.{key} must be non-negative number")
+ if key in spec and spec[key].get("type") == "string":
+ if value is not None and not isinstance(value, str):
+ errors.append(f"settings.{key} must be string")
+ if key in optional_spec and optional_spec[key].get("format") == "YYYY-MM":
+ text = str(value).strip()
+ if text and not re.fullmatch(r"\d{4}-\d{2}(-.*)?", text):
+ errors.append(f"settings.{key} must use YYYY-MM")
+ if not total_asset_found:
+ errors.append("settings.total_asset_krw is required")
+ return errors
+
+
+def validate_account_snapshot_rows(rows: list[dict[str, Any]]) -> list[str]:
+ errors: list[str] = []
+ spec = _load_account_snapshot_spec().get("account_snapshot_contract") or {}
+ canonical = spec.get("canonical_fields") or {}
+ for idx, row in enumerate(rows, start=1):
+ captured_at = str(row.get("captured_at") or "").strip()
+ account = str(row.get("account") or "").strip()
+ ticker = str(row.get("ticker") or "").strip()
+ name = str(row.get("name") or "").strip()
+ account_type = str(row.get("account_type") or "").strip()
+ parse_status = str(row.get("parse_status") or "").strip()
+ holding_quantity = _as_number(row.get("holding_quantity"))
+ average_cost = _as_number(row.get("average_cost"))
+ stop_price = _as_number(row.get("stop_price"))
+ entry_stage = str(row.get("entry_stage") or "").strip()
+ position_type = str(row.get("position_type") or "").strip()
+ user_confirmed = str(row.get("user_confirmed") or "").strip().upper()
+ if not captured_at:
+ errors.append(f"account_snapshot row {idx}: captured_at required")
+ if not account:
+ errors.append(f"account_snapshot row {idx}: account required")
+ if not account_type:
+ errors.append(f"account_snapshot row {idx}: account_type required")
+ if account_type and canonical.get("account_type", {}).get("allowed") and account_type not in canonical["account_type"]["allowed"]:
+ errors.append(f"account_snapshot row {idx}: invalid account_type {account_type!r}")
+ if not ticker:
+ errors.append(f"account_snapshot row {idx}: ticker required")
+ if not name:
+ errors.append(f"account_snapshot row {idx}: name required")
+ if parse_status not in ALLOWED_PARSE_STATUS:
+ errors.append(f"account_snapshot row {idx}: invalid parse_status {parse_status!r}")
+ if holding_quantity is not None and holding_quantity < 0:
+ errors.append(f"account_snapshot row {idx}: holding_quantity must be >= 0")
+ if average_cost is not None and average_cost < 0:
+ errors.append(f"account_snapshot row {idx}: average_cost must be >= 0")
+ if user_confirmed and user_confirmed not in {"Y", "N"}:
+ errors.append(f"account_snapshot row {idx}: user_confirmed must be Y or N")
+ if parse_status == "CAPTURE_READ_OK" and user_confirmed != "Y":
+ errors.append(f"account_snapshot row {idx}: CAPTURE_READ_OK rows require user_confirmed=Y")
+ if entry_stage and canonical.get("entry_stage", {}).get("allowed") and entry_stage not in canonical["entry_stage"]["allowed"]:
+ errors.append(f"account_snapshot row {idx}: invalid entry_stage {entry_stage!r}")
+ if position_type and canonical.get("position_type", {}).get("allowed") and position_type not in canonical["position_type"]["allowed"]:
+ errors.append(f"account_snapshot row {idx}: invalid position_type {position_type!r}")
+ return errors
+
+
+def build_validation_suggestions(settings_rows: list[dict[str, Any]], snapshot_rows: list[dict[str, Any]]) -> list[str]:
+ suggestions: list[str] = []
+ settings_map = settings_rows_to_dict(settings_rows)
+ snapshot_count = len(snapshot_rows)
+ if "total_asset_krw" not in settings_map:
+ suggestions.append("settings: add total_asset_krw from current investable asset total")
+ if str(settings_map.get("weekly_target_cash_pct", "")).strip() == "":
+ suggestions.append("settings: weekly_target_cash_pct can stay blank unless weekly rebalance is active")
+ for row in snapshot_rows:
+ if str(row.get("parse_status") or "").strip() == "CAPTURE_READ_OK" and str(row.get("user_confirmed") or "").strip().upper() != "Y":
+ suggestions.append(
+ f"account_snapshot {row.get('ticker') or row.get('name') or 'row'}: set user_confirmed=Y for CAPTURE_READ_OK"
+ )
+ account_type = str(row.get("account_type") or "").strip()
+ if account_type and account_type not in {"일반계좌", "ISA", "연금저축"}:
+ suggestions.append(
+ f"account_snapshot {row.get('ticker') or row.get('name') or 'row'}: account_type should be one of 일반계좌/ISA/연금저축"
+ )
+ if str(row.get("entry_stage") or "").strip() and str(row.get("position_type") or "").strip() == "":
+ suggestions.append(
+ f"account_snapshot {row.get('ticker') or row.get('name') or 'row'}: consider setting position_type when entry_stage is present"
+ )
+ if not snapshot_rows:
+ suggestions.append("account_snapshot: import TSV from HTS capture before saving snapshot")
+ return suggestions[:20]
+
+
+def build_safe_autofix_actions(settings_rows: list[dict[str, Any]], snapshot_rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
+ actions: list[dict[str, Any]] = []
+ if any(str(row.get("parse_status") or "").strip() == "CAPTURE_READ_OK" and str(row.get("user_confirmed") or "").strip().upper() != "Y" for row in snapshot_rows):
+ actions.append(
+ {
+ "action_id": "confirm_captured_rows",
+ "domain": "account_snapshot",
+ "label": "Set user_confirmed=Y for CAPTURE_READ_OK rows",
+ "description": "Safe autofix using the contract default confirmation flag.",
+ }
+ )
+ if any(str(row.get("position_type") or "").strip() == "" and str(row.get("entry_stage") or "").strip() for row in snapshot_rows):
+ actions.append(
+ {
+ "action_id": "default_position_type_satellite",
+ "domain": "account_snapshot",
+ "label": "Default blank position_type to satellite",
+ "description": "Uses the contract default when position_type is missing.",
+ }
+ )
+ if not any(str(row.get("key") or "").strip() == "total_asset_krw" for row in settings_rows):
+ actions.append(
+ {
+ "action_id": "required_total_asset_missing",
+ "domain": "settings",
+ "label": "Settings total_asset_krw missing",
+ "description": "Manual input required. No safe autofix.",
+ }
+ )
+ return actions
+
+
+def apply_safe_autofix_action(
+ conn: sqlite3.Connection,
+ action_id: str,
+ *,
+ actor: str = "ui",
+) -> dict[str, Any]:
+ ensure_schema(conn)
+ snapshot_rows = load_account_snapshot_rows_from_conn(conn)
+ if action_id == "confirm_captured_rows":
+ updated = []
+ for row in snapshot_rows:
+ candidate = dict(row)
+ if str(candidate.get("parse_status") or "").strip() == "CAPTURE_READ_OK" and str(candidate.get("user_confirmed") or "").strip().upper() != "Y":
+ candidate["user_confirmed"] = "Y"
+ updated.append(candidate)
+ replace_account_snapshot(conn, updated)
+ return {"domain": SNAPSHOT_TABLE, "status": "AUTOFIXED", "action_id": action_id}
+ if action_id == "default_position_type_satellite":
+ updated = []
+ for row in snapshot_rows:
+ candidate = dict(row)
+ if str(candidate.get("entry_stage") or "").strip() and str(candidate.get("position_type") or "").strip() == "":
+ candidate["position_type"] = "satellite"
+ updated.append(candidate)
+ replace_account_snapshot(conn, updated)
+ return {"domain": SNAPSHOT_TABLE, "status": "AUTOFIXED", "action_id": action_id}
+ if action_id == "required_total_asset_missing":
+ return {"domain": SETTINGS_TABLE, "status": "MANUAL_REQUIRED", "action_id": action_id}
+ raise ValueError(f"unknown action_id={action_id}")
diff --git a/tests/unit/test_snapshot_admin_store_v1.py b/tests/unit/test_snapshot_admin_store_v1.py
new file mode 100644
index 0000000..52bc790
--- /dev/null
+++ b/tests/unit/test_snapshot_admin_store_v1.py
@@ -0,0 +1,249 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+from src.quant_engine.snapshot_admin_server_v1 import build_ui_state
+from src.quant_engine.snapshot_admin_store_v1 import (
+ ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS,
+ export_payload,
+ import_seed_json,
+ load_approval_for_domain,
+ load_change_log_rows,
+ load_locks,
+ load_account_snapshot_rows,
+ load_settings_rows,
+ parse_account_snapshot_tsv,
+ open_connection,
+ lock_conflicts_for_rows,
+ validate_account_snapshot_rows,
+ validate_settings_rows,
+ build_validation_suggestions,
+ build_safe_autofix_actions,
+ apply_safe_autofix_action,
+ set_lock,
+ undo_last_change,
+ write_export_json,
+)
+
+
+def _seed_json(path: Path) -> None:
+ payload = {
+ "data": {
+ "settings": {
+ "total_asset_krw": 150000000,
+ "weekly_target_cash_pct": 14,
+ "orbit_start_yyyymm": "2026-01",
+ },
+ "account_snapshot": [
+ {
+ "captured_at": "2026-06-21T09:00:00+09:00",
+ "account": "real",
+ "account_type": "일반계좌",
+ "ticker": "005930",
+ "name": "삼성전자",
+ "holding_quantity": 10,
+ "available_quantity": 10,
+ "average_cost": 70000,
+ "total_cost": 700000,
+ "current_price": 71000,
+ "market_value": 710000,
+ "profit_loss": 10000,
+ "return_pct": 1.43,
+ "immediate_cash": 1000000,
+ "settlement_cash_d2": 1000000,
+ "available_cash": 1000000,
+ "open_order_amount": 0,
+ "monthly_contribution_limit": "",
+ "monthly_contribution_used": "",
+ "parse_status": "CAPTURE_READ_OK",
+ "user_confirmed": "Y",
+ "stop_price": 65000,
+ "highest_price_since_entry": 72000,
+ "entry_date": "2026-06-01",
+ "entry_stage": "stage_1",
+ "position_type": "core",
+ "last_updated": "2026-06-21T09:05:00+09:00",
+ }
+ ],
+ }
+ }
+ path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+
+
+def test_seed_import_and_export_round_trip(tmp_path):
+ db_path = tmp_path / "snapshot.db"
+ seed_path = tmp_path / "seed.json"
+ _seed_json(seed_path)
+
+ summary = import_seed_json(db_path, seed_path)
+ assert summary["settings_rows"] == 3
+ assert summary["account_snapshot_rows"] == 1
+
+ settings_rows = load_settings_rows(db_path)
+ assert settings_rows[0]["key"] == "total_asset_krw"
+ assert settings_rows[0]["value"] == 150000000
+
+ snapshot_rows = load_account_snapshot_rows(db_path)
+ assert snapshot_rows[0]["ticker"] == "005930"
+ assert snapshot_rows[0]["parse_status"] == "CAPTURE_READ_OK"
+
+ exported = export_payload(db_path)
+ assert exported["data"]["settings"]["weekly_target_cash_pct"] == 14
+ assert exported["data"]["account_snapshot"][0]["name"] == "삼성전자"
+
+ out = write_export_json(db_path, tmp_path / "export.json")
+ assert out.exists()
+
+
+def test_parse_account_snapshot_tsv_supports_headerless_and_header_rows():
+ headerless = "\n".join(
+ [
+ "\t".join(ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS),
+ "\t".join(
+ [
+ "2026-06-21T09:00:00+09:00",
+ "real",
+ "일반계좌",
+ "005930",
+ "삼성전자",
+ "10",
+ "10",
+ "70000",
+ "700000",
+ "71000",
+ "710000",
+ "10000",
+ "1.43",
+ "1000000",
+ "1000000",
+ "1000000",
+ "0",
+ "",
+ "",
+ "CAPTURE_READ_OK",
+ "Y",
+ "65000",
+ "72000",
+ "2026-06-01",
+ "stage_1",
+ "core",
+ "2026-06-21T09:05:00+09:00",
+ ]
+ ),
+ ]
+ )
+ rows = parse_account_snapshot_tsv(headerless)
+ assert rows[0]["ticker"] == "005930"
+ assert rows[0]["holding_quantity"] == 10
+
+ with_header = "captured_at\taccount\tticker\n2026-06-21T09:00:00+09:00\treal\t005930"
+ rows2 = parse_account_snapshot_tsv(with_header)
+ assert rows2[0]["account"] == "real"
+ assert rows2[0]["ticker"] == "005930"
+
+
+def test_build_ui_state_reports_schema(tmp_path):
+ db_path = tmp_path / "snapshot.db"
+ seed_path = tmp_path / "seed.json"
+ _seed_json(seed_path)
+ import_seed_json(db_path, seed_path)
+
+ state = build_ui_state(db_path)
+ assert state["summary"]["settings_rows"] == 3
+ assert state["account_snapshot_columns"][: len(ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS)] == ACCOUNT_SNAPSHOT_CANONICAL_COLUMNS
+
+
+def test_change_log_approval_and_lock_workflow(tmp_path):
+ db_path = tmp_path / "snapshot.db"
+ seed_path = tmp_path / "seed.json"
+ _seed_json(seed_path)
+ import_seed_json(db_path, seed_path)
+
+ with open_connection(db_path) as conn:
+ set_lock(conn, "settings", "*", locked_by="tester", reason="review")
+ conn.commit()
+
+ locks = load_locks(db_path)
+ assert locks and locks[0]["domain"] == "settings"
+
+ approval = load_approval_for_domain(db_path, "settings")
+ assert approval["status"] == "PENDING"
+
+ changes = load_change_log_rows(db_path, limit=10)
+ assert changes
+
+
+def test_lock_conflicts_detect_row_targets(tmp_path):
+ db_path = tmp_path / "snapshot.db"
+ seed_path = tmp_path / "seed.json"
+ _seed_json(seed_path)
+ import_seed_json(db_path, seed_path)
+
+ with open_connection(db_path) as conn:
+ set_lock(conn, "settings", "total_asset_krw", locked_by="tester", reason="review")
+ set_lock(conn, "account_snapshot", "005930", locked_by="tester", reason="review")
+ conn.commit()
+
+ settings_conflicts = lock_conflicts_for_rows(
+ db_path,
+ "settings",
+ [{"key": "total_asset_krw", "value": 123, "note": ""}],
+ )
+ snapshot_conflicts = lock_conflicts_for_rows(
+ db_path,
+ "account_snapshot",
+ [{"ticker": "005930", "name": "삼성전자", "ordinal": 1}],
+ )
+
+ assert settings_conflicts and settings_conflicts[0]["target_ref"] == "total_asset_krw"
+ assert snapshot_conflicts and snapshot_conflicts[0]["target_ref"] == "005930"
+
+
+def test_undo_last_change_restores_previous_snapshot(tmp_path):
+ db_path = tmp_path / "snapshot.db"
+ seed_path = tmp_path / "seed.json"
+ _seed_json(seed_path)
+ import_seed_json(db_path, seed_path)
+
+ with open_connection(db_path) as conn:
+ from src.quant_engine.snapshot_admin_store_v1 import replace_settings
+
+ replace_settings(conn, [{"ordinal": 1, "key": "total_asset_krw", "value": 123, "note": "edited"}])
+
+ with open_connection(db_path) as conn:
+ undo_last_change(conn, "settings")
+
+ settings_rows = load_settings_rows(db_path)
+ assert settings_rows[0]["value"] == 150000000
+
+
+def test_validation_helpers_detect_invalid_rows():
+ assert "settings.total_asset_krw is required" in validate_settings_rows([{"key": "weekly_target_cash_pct", "value": 10}])
+ assert "account_snapshot row 1: ticker required" in validate_account_snapshot_rows(
+ [{"captured_at": "2026-06-21", "account": "real", "name": "삼성전자", "parse_status": "BAD"}]
+ )
+ suggestions = build_validation_suggestions(
+ [{"key": "weekly_target_cash_pct", "value": 10}],
+ [{"captured_at": "2026-06-21", "account": "real", "account_type": "일반계좌", "ticker": "005930", "name": "삼성전자", "parse_status": "CAPTURE_READ_OK", "user_confirmed": "N"}],
+ )
+ assert any("user_confirmed=Y" in item for item in suggestions)
+ actions = build_safe_autofix_actions(
+ [{"key": "total_asset_krw", "value": 150000000}],
+ [{"captured_at": "2026-06-21", "account": "real", "account_type": "일반계좌", "ticker": "005930", "name": "삼성전자", "parse_status": "CAPTURE_READ_OK", "user_confirmed": "N", "entry_stage": "stage_1", "position_type": ""}],
+ )
+ assert any(item["action_id"] == "confirm_captured_rows" for item in actions)
+
+
+def test_safe_autofix_updates_snapshot_defaults(tmp_path):
+ db_path = tmp_path / "snapshot.db"
+ seed_path = tmp_path / "seed.json"
+ _seed_json(seed_path)
+ import_seed_json(db_path, seed_path)
+
+ with open_connection(db_path) as conn:
+ result = apply_safe_autofix_action(conn, "confirm_captured_rows")
+ assert result["status"] == "AUTOFIXED"
+
+ snapshot_rows = load_account_snapshot_rows(db_path)
+ assert all(row.get("user_confirmed") == "Y" or str(row.get("parse_status")) != "CAPTURE_READ_OK" for row in snapshot_rows)
diff --git a/tests/unit/test_snapshot_admin_web_v1.py b/tests/unit/test_snapshot_admin_web_v1.py
new file mode 100644
index 0000000..8f7b034
--- /dev/null
+++ b/tests/unit/test_snapshot_admin_web_v1.py
@@ -0,0 +1,144 @@
+from __future__ import annotations
+
+import json
+import sys
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parents[2]
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+import tools.validate_snapshot_admin_web_v1 as validator
+from src.quant_engine.snapshot_admin_server_v1 import (
+ build_ui_state,
+ fetch_table_rows,
+ list_browsable_tables,
+ render_collection_html,
+ render_index_html,
+ render_tables_html,
+)
+from src.quant_engine.snapshot_admin_store_v1 import import_seed_json
+
+
+def test_render_index_html_contains_spreadsheet_surface():
+ html = render_index_html()
+ assert "Snapshot Admin" in html
+ assert "contenteditable" in html
+ assert "/api/settings/save" in html
+ assert "/api/account_snapshot/save" in html
+ assert "Lock target" in html
+ assert "Lock row" in html
+ assert "Approve pending" in html
+ assert "Refresh diff" in html
+ assert "Export approval packet" in html
+ assert "Selection Inspector" in html
+ assert "Recent row history" in html
+ assert "Save view" in html
+ assert "Apply TSV to selection" in html
+ assert "Ctrl+S" in html
+ assert "KIS Collection" in html
+ assert "Recent collector snapshots" in html
+ assert "Collection detail" in html
+ assert "Filter runs / snapshots / errors" in html
+ assert "Filter change log" in html
+ assert "Timeline" in html
+ assert "/collection" in html
+ assert "Open collection dashboard" in html
+
+
+def test_render_collection_html_contains_dashboard_surface():
+ html = render_collection_html()
+ assert "KIS Collection Dashboard" in html
+ assert "/api/state" in html
+ assert "Download raw JSON" in html
+ assert "Download CSV" in html
+ assert "Filter runs / snapshots / errors" in html
+ assert "Ticker quick search" in html
+ assert "Date quick search" in html
+
+
+def test_build_ui_state_exposes_expected_columns(tmp_path):
+ db_path = tmp_path / "snapshot_admin.db"
+ seed_path = ROOT / "GatherTradingData.json"
+ import_seed_json(db_path, seed_path)
+
+ state = build_ui_state(db_path)
+ assert state["summary"]["settings_rows"] > 0
+ assert state["summary"]["account_snapshot_rows"] > 0
+ assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
+ assert state["summary"]["topology"]["settings_and_snapshot_share_db"] is True
+ assert state["summary"]["topology"]["collector_separate_db"] is True
+ assert state["account_snapshot_columns"][0] == "captured_at"
+ assert "settings" in state["validation"]
+ assert state["version"]["app"]
+ assert "fingerprint" in state["version"]["source"]
+ assert "collection" in state
+ assert "counts" in state["collection"]
+ assert "latest_report" in state["collection"]
+ assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
+
+
+def test_snapshot_admin_workflow_and_script_exist():
+ workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml"
+ package = json.loads((ROOT / "package.json").read_text(encoding="utf-8"))
+ assert workflow.exists()
+ assert "--reload" in package["scripts"]["ops:snapshot-web"]
+ assert "ops:snapshot-validate" in package["scripts"]
+ assert "ops:snapshot-web-validate" in package["scripts"]
+
+
+def test_render_tables_html_contains_tabler_grid_surface():
+ html = render_tables_html()
+ assert "tabler" in html.lower()
+ assert "tableSelect" in html
+ assert "/api/tables" in html
+ assert "/api/table_rows" in html
+ assert "gridTable" in html
+
+
+def test_list_browsable_tables_covers_all_three_databases(tmp_path):
+ db_path = tmp_path / "snapshot_admin.db"
+ import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+ tables = list_browsable_tables(db_path)
+ names = {row["table"] for row in tables}
+ assert {"settings", "account_snapshot", "workspace_change_log"} <= names
+ assert {"collection_runs", "collection_snapshots", "collection_source_errors"} <= names
+ assert {"sell_strategy_results", "satellite_recommendations"} <= names
+
+ settings_row = next(row for row in tables if row["table"] == "settings")
+ assert settings_row["exists"] is True
+ assert settings_row["row_count"] > 0
+
+
+def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path):
+ db_path = tmp_path / "snapshot_admin.db"
+ import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+ page1 = fetch_table_rows("settings", db_path, limit=2, offset=0)
+ assert page1["columns"]
+ assert len(page1["rows"]) == 2
+ assert page1["total"] > 2
+
+ page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
+ assert page1["rows"] != page2["rows"]
+
+ import pytest
+
+ with pytest.raises(ValueError):
+ fetch_table_rows("settings; DROP TABLE settings;--", db_path)
+
+
+def test_snapshot_admin_web_validation_script_passes():
+ out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
+ if out.exists():
+ out.unlink()
+
+ rc = validator.main()
+ payload = json.loads(out.read_text(encoding="utf-8"))
+
+ assert rc == 0
+ assert payload["gate"] == "PASS"
+ assert payload["formula_id"] == "SNAPSHOT_ADMIN_WEB_VALIDATION_V1"
+ assert payload["settings_rows"] > 0
+ assert payload["account_snapshot_rows"] > 0
diff --git a/tools/run_snapshot_admin_server_v1.py b/tools/run_snapshot_admin_server_v1.py
new file mode 100644
index 0000000..7683ea4
--- /dev/null
+++ b/tools/run_snapshot_admin_server_v1.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import os
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parents[1]
+SERVER_MODULE = "src.quant_engine.snapshot_admin_server_v1"
+WATCH_DIRS = (
+ ROOT / "src",
+ ROOT / "tools",
+ ROOT / "spec",
+ ROOT / "governance",
+ ROOT / "docs",
+ ROOT / ".gitea",
+)
+WATCH_FILES = (
+ ROOT / "package.json",
+ ROOT / "AGENTS.md",
+ ROOT / "GatherTradingData.json",
+)
+WATCH_EXTENSIONS = {".py", ".yaml", ".yml", ".json", ".md", ".gs"}
+IGNORED_DIR_NAMES = {"Temp", "outputs", ".git", "__pycache__", ".pytest_cache"}
+
+
+def _server_cmd(args: argparse.Namespace) -> list[str]:
+ cmd = [
+ sys.executable,
+ "-m",
+ SERVER_MODULE,
+ "--host",
+ args.host,
+ "--port",
+ str(args.port),
+ "--db",
+ args.db,
+ "--seed",
+ args.seed,
+ ]
+ if args.no_bootstrap:
+ cmd.append("--no-bootstrap")
+ return cmd
+
+
+def _iter_watch_files() -> list[Path]:
+ seen: set[Path] = set()
+ files: list[Path] = []
+ for path in WATCH_FILES:
+ if path.exists() and path.is_file():
+ resolved = path.resolve()
+ if resolved not in seen:
+ seen.add(resolved)
+ files.append(resolved)
+ for root in WATCH_DIRS:
+ if not root.exists():
+ continue
+ for path in root.rglob("*"):
+ if not path.is_file():
+ continue
+ if any(part in IGNORED_DIR_NAMES for part in path.parts):
+ continue
+ if path.suffix.lower() not in WATCH_EXTENSIONS:
+ continue
+ resolved = path.resolve()
+ if resolved not in seen:
+ seen.add(resolved)
+ files.append(resolved)
+ return files
+
+
+def _snapshot_mtimes() -> dict[Path, float]:
+ mtimes: dict[Path, float] = {}
+ for path in _iter_watch_files():
+ try:
+ mtimes[path] = path.stat().st_mtime
+ except FileNotFoundError:
+ continue
+ return mtimes
+
+
+def _changed_files(previous: dict[Path, float]) -> list[Path]:
+ current = _snapshot_mtimes()
+ changed: list[Path] = []
+ for path, mtime in current.items():
+ if previous.get(path) != mtime:
+ changed.append(path)
+ for path in previous:
+ if path not in current:
+ changed.append(path)
+ return changed
+
+
+def _run_once(args: argparse.Namespace) -> int:
+ proc = subprocess.Popen(_server_cmd(args), cwd=str(ROOT), env=os.environ.copy())
+ try:
+ return proc.wait()
+ except KeyboardInterrupt:
+ proc.terminate()
+ try:
+ return proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ return proc.wait()
+
+
+def _run_reload(args: argparse.Namespace, interval: float) -> int:
+ last_mtimes = _snapshot_mtimes()
+ child: subprocess.Popen[str] | None = None
+ try:
+ while True:
+ if child is None or child.poll() is not None:
+ if child is not None:
+ code = child.returncode or 0
+ print(f"[snapshot-admin] server exited with code {code}; restarting...")
+ child = subprocess.Popen(_server_cmd(args), cwd=str(ROOT), env=os.environ.copy())
+ print("[snapshot-admin] hot reload watcher active")
+ print("[snapshot-admin] watching:", ", ".join(str(path) for path in WATCH_DIRS))
+ time.sleep(interval)
+ changed = _changed_files(last_mtimes)
+ if changed:
+ print("[snapshot-admin] changes detected:")
+ for path in changed[:20]:
+ print(f" - {path}")
+ last_mtimes = _snapshot_mtimes()
+ if child is not None and child.poll() is None:
+ child.terminate()
+ try:
+ child.wait(timeout=10)
+ except subprocess.TimeoutExpired:
+ child.kill()
+ child.wait()
+ child = None
+ except KeyboardInterrupt:
+ if child is not None and child.poll() is None:
+ child.terminate()
+ try:
+ child.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ child.kill()
+ child.wait()
+ return 0
+
+
+def main() -> int:
+ parser = argparse.ArgumentParser(description="Run the snapshot admin web server.")
+ parser.add_argument("--host", default="127.0.0.1")
+ parser.add_argument("--port", type=int, default=8787)
+ parser.add_argument("--db", default=str(ROOT / "outputs" / "snapshot_admin" / "snapshot_admin.db"))
+ parser.add_argument("--seed", default=str(ROOT / "GatherTradingData.json"))
+ parser.add_argument("--no-bootstrap", action="store_true")
+ parser.add_argument("--reload", action="store_true", help="Restart the server when watched files change.")
+ parser.add_argument("--reload-interval", type=float, default=1.0, help="Seconds between file-system polls.")
+ args = parser.parse_args()
+ if args.reload:
+ return _run_reload(args, max(0.25, args.reload_interval))
+ return _run_once(args)
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/validate_snapshot_admin_web_v1.py b/tools/validate_snapshot_admin_web_v1.py
new file mode 100644
index 0000000..5220f4d
--- /dev/null
+++ b/tools/validate_snapshot_admin_web_v1.py
@@ -0,0 +1,222 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import json
+import socket
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+from pathlib import Path
+from typing import Any
+
+
+ROOT = Path(__file__).resolve().parents[1]
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+OUT = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
+
+
+def _read_json(url: str) -> dict[str, Any]:
+ with urllib.request.urlopen(url, timeout=5) as response:
+ payload = response.read().decode("utf-8")
+ data = json.loads(payload)
+ return data if isinstance(data, dict) else {}
+
+
+def _read_text(url: str) -> str:
+ with urllib.request.urlopen(url, timeout=5) as response:
+ return response.read().decode("utf-8")
+
+
+def _post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
+ data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
+ request = urllib.request.Request(
+ url,
+ data=data,
+ headers={"Content-Type": "application/json"},
+ method="POST",
+ )
+ with urllib.request.urlopen(request, timeout=5) as response:
+ return json.loads(response.read().decode("utf-8"))
+
+
+def _wait_for_server(url: str, timeout_s: float = 15.0) -> None:
+ deadline = time.time() + timeout_s
+ last_error: Exception | None = None
+ while time.time() < deadline:
+ try:
+ _read_text(url)
+ return
+ except Exception as exc: # noqa: BLE001
+ last_error = exc
+ time.sleep(0.25)
+ raise RuntimeError(f"server did not start: {last_error}")
+
+
+def _pick_free_port() -> int:
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+ sock.bind(("127.0.0.1", 0))
+ return int(sock.getsockname()[1])
+
+
+def main() -> int:
+ port = _pick_free_port()
+ db_path = ROOT / "Temp" / "snapshot_admin_web_validation.db"
+ seed_path = ROOT / "GatherTradingData.json"
+ server_cmd = [
+ sys.executable,
+ str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"),
+ "--host",
+ "127.0.0.1",
+ "--port",
+ str(port),
+ "--db",
+ str(db_path),
+ "--seed",
+ str(seed_path),
+ ]
+
+ proc = subprocess.Popen(
+ server_cmd,
+ cwd=ROOT,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ text=True,
+ encoding="utf-8",
+ )
+ base_url = f"http://127.0.0.1:{port}"
+ errors: list[str] = []
+ html = ""
+ state: dict[str, Any] = {}
+
+ try:
+ _wait_for_server(base_url)
+ html = _read_text(f"{base_url}/")
+ state = _read_json(f"{base_url}/api/state")
+ export_payload = _read_json(f"{base_url}/api/export")
+ approval_packet = {
+ "formula_id": "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1",
+ "generated_at": state.get("generated_at") or "",
+ "summary": {
+ "settings_changed": 0,
+ "account_snapshot_changed": 0,
+ "pending_target_count": 0,
+ },
+ "pending_targets": [],
+ "diff_preview": {"settings": {"added": [], "removed": [], "changed": []}, "account_snapshot": {"added": [], "removed": [], "changed": []}},
+ "approvals": state.get("approval_rows", []),
+ "locks": state.get("locks", []),
+ "workspace": state.get("summary", {}),
+ }
+ packet_response = _post_json(f"{base_url}/api/approval_packet", {"packet": approval_packet})
+ if "Snapshot Admin" not in html:
+ errors.append("html_title_missing")
+ if "contenteditable" not in html:
+ errors.append("sheet_editor_missing")
+ if "settings" not in html or "Account Snapshot" not in html:
+ errors.append("section_missing")
+ if "/api/settings/save" not in html or "/api/account_snapshot/save" not in html:
+ errors.append("api_binding_missing")
+ if "Approve pending" not in html or "Refresh diff" not in html:
+ errors.append("diff_or_approval_ui_missing")
+ if "Export approval packet" not in html:
+ errors.append("approval_packet_ui_missing")
+ if "Selection Inspector" not in html or "Apply TSV to selection" not in html or "Save view" not in html:
+ errors.append("sheet_facade_ui_missing")
+ if "Recent row history" not in html or "Ctrl+S" not in html:
+ errors.append("sheet_shortcuts_ui_missing")
+ if "KIS Collection" not in html or "collector:" not in html:
+ errors.append("collection_dashboard_ui_missing")
+ if "Recent collector snapshots" not in html or "Collection detail" not in html or "Filter runs / snapshots / errors" not in html:
+ errors.append("collection_detail_ui_missing")
+ if "Filter change log" not in html:
+ errors.append("change_log_filter_ui_missing")
+ if "Timeline" not in html or "/collection" not in html or "Open collection dashboard" not in html:
+ errors.append("collection_page_link_missing")
+ if "Open collection dashboard" not in html:
+ errors.append("collection_dashboard_link_missing")
+ collection_html = _read_text(f"{base_url}/collection")
+ if "KIS Collection Dashboard" not in collection_html or "Download CSV" not in collection_html or "Ticker quick search" not in collection_html or "Date quick search" not in collection_html:
+ errors.append("collection_dashboard_page_missing")
+ if int(state.get("summary", {}).get("settings_rows") or 0) <= 0:
+ errors.append("settings_rows_missing")
+ if int(state.get("summary", {}).get("account_snapshot_rows") or 0) <= 0:
+ errors.append("account_snapshot_rows_missing")
+ topology = state.get("summary", {}).get("topology", {})
+ if not isinstance(topology, dict):
+ errors.append("topology_missing")
+ else:
+ if topology.get("mode") != "single_workspace_sqlite":
+ errors.append("topology_mode_invalid")
+ if not topology.get("settings_and_snapshot_share_db"):
+ errors.append("topology_workspace_split_invalid")
+ if not topology.get("collector_separate_db"):
+ errors.append("topology_collector_split_invalid")
+ if not isinstance(state.get("version"), dict) or not state.get("version", {}).get("app"):
+ errors.append("version_metadata_missing")
+ if not isinstance(state.get("collection"), dict):
+ errors.append("collection_state_missing")
+ collection = state.get("collection", {})
+ if not isinstance(collection.get("counts"), dict):
+ errors.append("collection_counts_missing")
+ if "latest_report" not in collection:
+ errors.append("collection_latest_report_missing")
+ if "data" not in export_payload:
+ errors.append("export_missing_data")
+ if packet_response.get("gate") != "PASS":
+ errors.append("approval_packet_export_failed")
+ packet_path = Path(packet_response.get("packet_path") or "")
+ md_path = Path(packet_response.get("md_path") or "")
+ if not packet_path.exists():
+ errors.append("approval_packet_json_missing")
+ if not md_path.exists():
+ errors.append("approval_packet_md_missing")
+
+ payload = {
+ "formula_id": "SNAPSHOT_ADMIN_WEB_VALIDATION_V1",
+ "gate": "PASS" if not errors else "FAIL",
+ "port": port,
+ "db_path": str(db_path),
+ "base_url": base_url,
+ "errors": errors,
+ "summary": state.get("summary", {}),
+ "version": state.get("version", {}),
+ "settings_rows": int(state.get("summary", {}).get("settings_rows") or 0),
+ "account_snapshot_rows": int(state.get("summary", {}).get("account_snapshot_rows") or 0),
+ "approval_packet_path": str(packet_path),
+ }
+ OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+ print(json.dumps(payload, ensure_ascii=False, indent=2))
+ return 0 if payload["gate"] == "PASS" else 1
+ except urllib.error.URLError as exc:
+ errors.append(str(exc))
+ payload = {
+ "formula_id": "SNAPSHOT_ADMIN_WEB_VALIDATION_V1",
+ "gate": "FAIL",
+ "port": port,
+ "db_path": str(db_path),
+ "base_url": base_url,
+ "errors": errors,
+ "summary": state.get("summary", {}),
+ "version": state.get("version", {}),
+ }
+ OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+ print(json.dumps(payload, ensure_ascii=False, indent=2))
+ return 1
+ finally:
+ if proc.poll() is None:
+ proc.terminate()
+ try:
+ proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.wait(timeout=5)
+ if proc.stdout is not None:
+ proc.stdout.close()
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/tools/validate_snapshot_admin_workflow_v1.py b/tools/validate_snapshot_admin_workflow_v1.py
new file mode 100644
index 0000000..cc7f989
--- /dev/null
+++ b/tools/validate_snapshot_admin_workflow_v1.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+import json
+import sys
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parents[1]
+if str(ROOT) not in sys.path:
+ sys.path.insert(0, str(ROOT))
+
+from src.quant_engine.snapshot_admin_store_v1 import (
+ DEFAULT_DB,
+ DEFAULT_SEED_JSON,
+ import_seed_json,
+ load_account_snapshot_rows,
+ load_settings_rows,
+ parse_account_snapshot_tsv,
+ validate_account_snapshot_rows,
+ validate_settings_rows,
+ write_export_json,
+)
+
+OUT = ROOT / "Temp" / "snapshot_admin_workflow_v1.json"
+
+
+def main() -> int:
+ db_path = DEFAULT_DB
+ seed_path = DEFAULT_SEED_JSON
+ summary = import_seed_json(db_path, seed_path)
+ settings_rows = load_settings_rows(db_path)
+ snapshot_rows = load_account_snapshot_rows(db_path)
+ settings_errors = validate_settings_rows(settings_rows)
+ snapshot_errors = validate_account_snapshot_rows(snapshot_rows)
+ exported = write_export_json(db_path, ROOT / "Temp" / "snapshot_admin_export_v1.json")
+ tsv_rows = parse_account_snapshot_tsv(
+ "\n".join(
+ [
+ "captured_at\taccount\taccount_type\tticker\tname\tholding_quantity\tavailable_quantity\taverage_cost\ttotal_cost\tcurrent_price\tmarket_value\tprofit_loss\treturn_pct\timmediate_cash\tsettlement_cash_d2\tavailable_cash\topen_order_amount\tmonthly_contribution_limit\tmonthly_contribution_used\tparse_status\tuser_confirmed\tstop_price\thighest_price_since_entry\tentry_date\tentry_stage\tposition_type\tlast_updated",
+ "2026-06-21T09:00:00+09:00\treal\t일반계좌\t005930\t삼성전자\t10\t10\t70000\t700000\t71000\t710000\t10000\t1.43\t1000000\t1000000\t1000000\t0\t\t\tCAPTURE_READ_OK\tY\t65000\t72000\t2026-06-01\tstage_1\tcore\t2026-06-21T09:05:00+09:00",
+ ]
+ )
+ )
+ payload = {
+ "status": "PASS",
+ "db_path": str(db_path),
+ "seed_path": str(seed_path),
+ "summary": summary,
+ "settings_rows": len(settings_rows),
+ "account_snapshot_rows": len(snapshot_rows),
+ "settings_errors": settings_errors,
+ "snapshot_errors": snapshot_errors,
+ "export_path": str(exported),
+ "tsv_parse_rows": len(tsv_rows),
+ }
+ OUT.parent.mkdir(parents=True, exist_ok=True)
+ OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+ print(json.dumps(payload, ensure_ascii=False, indent=2))
+ if settings_errors or snapshot_errors:
+ print("FAIL")
+ return 1
+ print("PASS")
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())