#!/usr/bin/env python3
"""Validate the phase-5 platform-transition WBS evidence gates.

The script runs five evidence checks (P1..P5), compares them against the
success criteria declared in the roadmap spec, verifies that the roadmap
document mentions every work package, and writes a JSON report.
"""
from __future__ import annotations

import json
import re
import sqlite3
import sys
from pathlib import Path
from typing import Any

import yaml

ROOT = Path(__file__).resolve().parents[1]
# Make the repository root importable so the checks can import ``src.*``.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

SPEC_PATH = ROOT / "spec" / "16_data_gaps_roadmap.yaml"
ROADMAP_DOC_PATH = ROOT / "docs" / "ROADMAP_WBS.md"


def _as_int(value: Any) -> int:
    """Coerce *value* to ``int``; missing or unparseable values count as 0."""
    try:
        return int(value or 0)
    except (TypeError, ValueError, OverflowError):
        return 0


def _read_json(path: Path) -> dict[str, Any]:
    """Return the JSON object stored at *path*.

    An absent file, malformed JSON, or a top-level value that is not an
    object all yield ``{}`` so that callers can report a failed gate instead
    of crashing on unusable evidence.
    """
    if not path.exists():
        return {}
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    return payload if isinstance(payload, dict) else {}


def _read_text(path: Path) -> str:
    """Return the text of *path*, or ``""`` when the file does not exist.

    Undecodable bytes are replaced rather than raising.
    """
    if not path.exists():
        return ""
    return path.read_text(encoding="utf-8", errors="replace")


def _sqlite_counts(db_path: Path) -> dict[str, int]:
    """Return row counts for the collection tables in the SQLite DB.

    Returns ``{}`` when *db_path* does not exist or cannot be opened. A table
    that cannot be queried (missing table, file is not a database) is left
    out of the result, so callers using ``counts.get(name) or 0`` see it as
    empty and report a failed gate rather than an unhandled exception.
    """
    if not db_path.exists():
        return {}
    tables = ("collection_runs", "collection_snapshots", "collection_source_errors")
    try:
        conn = sqlite3.connect(db_path)
    except sqlite3.Error:
        return {}
    counts: dict[str, int] = {}
    try:
        for table in tables:
            try:
                # Table names come from the fixed tuple above, never from input.
                row = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
            except sqlite3.Error:
                continue
            counts[table] = row[0]
    finally:
        conn.close()
    return counts


def _load_spec() -> dict[str, Any]:
    """Load the roadmap spec YAML from ``SPEC_PATH``.

    ``yaml.safe_load`` returns ``None`` for an empty document; that and any
    other non-mapping top-level value are normalised to ``{}``. A missing
    spec file still raises, since the validator cannot run without it.
    """
    loaded = yaml.safe_load(SPEC_PATH.read_text(encoding="utf-8"))
    return loaded if isinstance(loaded, dict) else {}


def _check_p1() -> dict[str, Any]:
    """Check P1: the collector summary JSON and SQLite DB hold evidence.

    Returns a result dict with ``gate`` (``"PASS"``/``"FAIL"``),
    ``expected_success_value``, ``evidence`` and the list of ``errors``.
    """
    summary_path = ROOT / "Temp" / "test_kis_data_collection.json"
    db_path = ROOT / "Temp" / "test_kis_data_collection.db"
    summary = _read_json(summary_path)
    counts = _sqlite_counts(db_path)

    errors: list[str] = []
    status = summary.get("status")
    if status != "PASS":
        errors.append(f"summary_status={status!r}")
    if _as_int(summary.get("row_count")) <= 0:
        errors.append("summary_row_count<=0")
    if _as_int(counts.get("collection_runs")) <= 0:
        errors.append("collection_runs<=0")
    if _as_int(counts.get("collection_snapshots")) <= 0:
        errors.append("collection_snapshots<=0")

    raw_source_counts = summary.get("source_counts")
    source_counts = raw_source_counts if isinstance(raw_source_counts, dict) else {}
    # Number of sources that contributed at least one row.
    source_count = sum(1 for value in source_counts.values() if _as_int(value) > 0)
    if source_count < 1:
        errors.append(f"provenance_source_count={source_count}")

    return {
        "gate": "PASS" if not errors else "FAIL",
        "expected_success_value": {
            "collector_gate": "PASS",
            "output_json_gate": "PASS",
            "collection_runs_min": 1,
            "collection_snapshots_min": 1,
            "provenance_source_count_min": 1,
        },
        "evidence": {
            "summary_path": str(summary_path),
            "db_path": str(db_path),
            "sqlite_counts": counts,
        },
        "errors": errors,
    }


def _humanize_check_failure(key: str, result: dict[str, Any]) -> str:
    """Turn a failed check result into a readable note with a recovery hint.

    P1, P2 and P4 get a tailored message built from their evidence paths;
    any recorded error codes are appended as ``Details: ...`` so the note
    names the actual cause. Other keys fall back to listing the error codes,
    or to a generic sentence when none were recorded.
    """
    raw_evidence = result.get("evidence")
    evidence = raw_evidence if isinstance(raw_evidence, dict) else {}
    raw_errors = result.get("errors")
    errors = raw_errors if isinstance(raw_errors, list) else []
    detail = ", ".join(str(item) for item in errors)
    suffix = f" Details: {detail}" if detail else ""

    if key == "P1_kis_core_api_collector":
        summary = evidence.get("summary_path", "")
        db = evidence.get("db_path", "")
        return (
            "P1 failed: missing or empty KIS collector evidence. "
            f"Expected {summary} and {db} to exist with collection_runs>=1 and collection_snapshots>=1. "
            "Fix: run the KIS collection job first, or restore the collector artifacts before this validator."
        ) + suffix
    if key == "P2_sqlite_canonical_store":
        db = evidence.get("db_path", "")
        return (
            "P2 failed: SQLite round-trip evidence is missing. "
            f"Expected {db} to contain collection_runs and collection_snapshots. "
            "Fix: regenerate the collector DB or restore Temp/test_kis_data_collection.db before this validator."
        ) + suffix
    if key == "P4_gas_thin_adapter_minimize":
        validation_path = evidence.get("validation_path", "")
        return (
            "P4 failed: thin-adapter validation output is missing or incomplete. "
            f"Expected {validation_path} with gate=PASS and function_inventory_coverage_pct=100.0. "
            "Fix: run tools/validate_gas_thin_adapter_v1.py before the platform-transition gate."
        ) + suffix
    if detail:
        return f"{key} failed: " + detail
    return f"{key} failed: evidence gate did not pass."


def _check_p2() -> dict[str, Any]:
    """Check P2: the store-spec contract resolves and the SQLite DB has rows.

    Returns a result dict with ``gate``, ``expected_success_value``,
    ``evidence`` and ``errors``. A failure inside ``normalize_store_spec`` is
    recorded as ``normalize_failed=...`` instead of aborting the validator.
    """
    from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec

    db_path = ROOT / "Temp" / "test_kis_data_collection.db"
    counts = _sqlite_counts(db_path)

    errors: list[str] = []
    sqlite_location: Any = None
    pg_location: Any = None
    try:
        sqlite_backend, sqlite_location = normalize_store_spec(CollectionStoreSpec(location=db_path), ROOT)
        pg_backend, pg_location = normalize_store_spec(
            CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"),
            ROOT,
        )
    except Exception as exc:  # noqa: BLE001
        errors.append(f"normalize_failed={exc}")
    else:
        if sqlite_backend != "sqlite":
            errors.append(f"sqlite_backend={sqlite_backend!r}")
        if pg_backend != "postgresql":
            errors.append(f"postgres_backend={pg_backend!r}")
        if not isinstance(pg_location, str) or "postgresql://" not in pg_location:
            errors.append("postgres_location_invalid")

    has_runs = int(counts.get("collection_runs") or 0) > 0
    has_snapshots = int(counts.get("collection_snapshots") or 0) > 0
    if not (has_runs and has_snapshots):
        errors.append("sqlite_round_trip_missing")

    return {
        "gate": "PASS" if not errors else "FAIL",
        "expected_success_value": {
            "sqlite_schema_tables_min": 3,
            "round_trip_snapshot_lookup": "PASS",
            "backend_contract_sqlite": "PASS",
            "backend_contract_postgresql": "READY",
        },
        "evidence": {
            "db_path": str(db_path),
            "sqlite_location": str(sqlite_location),
            "postgres_location": pg_location,
            "sqlite_counts": counts,
        },
        "errors": errors,
    }


def _check_p3() -> dict[str, Any]:
    """Check P3: the CI workflow file contains the expected steps and env wiring.

    The workflow text is scanned for required script names and step titles,
    for an ``.xlsx`` reference without the raw-seed preparation step, and for
    exactly one binding line for each mock and real KIS credential variable.
    """
    workflow = ROOT / ".gitea" / "workflows" / "kis_data_collection.yml"
    text = _read_text(workflow)

    errors: list[str] = []
    if not text:
        errors.append("workflow_missing")
    if "tools/run_kis_data_collection_v1.py" not in text:
        errors.append("collector_step_missing")
    if "tools/validate_kis_api_credentials_v1.py" not in text:
        errors.append("mock_validation_step_missing")
    if "GatherTradingData.json" not in text:
        errors.append("seed_json_missing")
    if "Validate SQLite Artifact" not in text:
        errors.append("sqlite_validation_step_missing")
    # Any ".xlsx" mention (which includes "GatherTradingData.xlsx") is only
    # tolerated when the raw-seed snapshot step is present.
    if ".xlsx" in text and "Prepare Raw Seed Snapshot" not in text:
        errors.append("xlsx_dependency_present")
    if "validate_no_direct_api_trading_v1.py" not in text:
        errors.append("no_direct_trading_gate_missing")

    mock_key_lines = re.findall(
        r"^\s*KIS_APP_Key_TEST:\s*\$\{\{\s*vars\.KIS_APP_KEY_TEST\s*\}\}",
        text,
        flags=re.M,
    )
    mock_secret_lines = re.findall(
        r"^\s*KIS_APP_Secret_TEST:\s*\$\{\{\s*vars\.KIS_APP_SECRET_TEST\s*\}\}",
        text,
        flags=re.M,
    )
    if len(mock_key_lines) != 1 or len(mock_secret_lines) != 1:
        errors.append("mock_env_vars_not_isolated")

    real_key_lines = re.findall(
        r"^\s*KIS_APP_Key:\s*\$\{\{\s*vars\.KIS_APP_KEY\s*\}\}",
        text,
        flags=re.M,
    )
    real_secret_lines = re.findall(
        r"^\s*KIS_APP_Secret:\s*\$\{\{\s*vars\.KIS_APP_SECRET\s*\}\}",
        text,
        flags=re.M,
    )
    if len(real_key_lines) != 1 or len(real_secret_lines) != 1:
        errors.append("real_env_vars_not_isolated")

    return {
        "gate": "PASS" if not errors else "FAIL",
        "expected_success_value": {
            "xlsx_dependency_removed": True,
            "json_seed_input": True,
            "sqlite_output": True,
            "mock_api_validation": "PASS",
            "no_direct_trading_gate": "PASS",
        },
        "evidence": {
            "workflow_path": str(workflow),
        },
        "errors": errors,
    }


def _check_p4() -> dict[str, Any]:
    """Check P4: the thin-adapter validation report passed with full coverage.

    Requires ``gate == "PASS"`` and ``function_inventory_coverage_pct`` of at
    least 100.0 in the validation JSON, plus the presence of ``gas_lib.gs``.
    A coverage value that cannot be parsed as a number counts as 0.0.
    """
    validation_path = ROOT / "Temp" / "gas_thin_adapter_validation_v1.json"
    payload = _read_json(validation_path)

    errors: list[str] = []
    if payload.get("gate") != "PASS":
        errors.append(f"gate={payload.get('gate')!r}")
    try:
        coverage = float(payload.get("function_inventory_coverage_pct") or 0.0)
    except (TypeError, ValueError):
        coverage = 0.0
    if coverage < 100.0:
        errors.append("function_inventory_coverage_pct<100")
    if not (ROOT / "src" / "gas" / "core" / "gas_lib.gs").exists():
        errors.append("gas_lib_missing")

    return {
        "gate": "PASS" if not errors else "FAIL",
        "expected_success_value": {
            "allowed_responsibilities_only": True,
            "forbidden_responsibilities_present": False,
            "thin_adapter_gate": "PASS",
        },
        "evidence": {
            "validation_path": str(validation_path),
            "payload": payload,
        },
        "errors": errors,
    }


def _check_p5() -> dict[str, Any]:
    """Check P5: the PostgreSQL store spec normalises and required files exist.

    Verifies that a ``postgresql`` store spec resolves to the expected
    backend and URL-style location, and that the backend, collector, unit
    test, wrapper and migration-stub files are present under ``ROOT``.
    """
    from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec

    backend_path = ROOT / "src" / "quant_engine" / "data_collection_backend_v1.py"
    collector_path = ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py"
    test_path = ROOT / "tests" / "unit" / "test_data_collection_store_v1.py"
    wrapper_path = ROOT / "tools" / "run_kis_data_collection_v1.py"
    migration_stub_path = ROOT / "tools" / "generate_postgresql_upgrade_stub_v1.py"

    errors: list[str] = []
    try:
        backend, location = normalize_store_spec(
            CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"),
            ROOT,
        )
        if backend != "postgresql":
            errors.append(f"backend={backend!r}")
        if not isinstance(location, str) or "postgresql://" not in location:
            errors.append("postgres_location_invalid")
    except Exception as exc:  # noqa: BLE001
        errors.append(f"normalize_failed={exc}")

    required_paths = (backend_path, collector_path, test_path, wrapper_path, migration_stub_path)
    errors.extend(f"missing={path.relative_to(ROOT)}" for path in required_paths if not path.exists())

    return {
        "gate": "PASS" if not errors else "FAIL",
        "expected_success_value": {
            "sqlite_schema_parity": "PASS",
            "backend_contract_present": True,
            "postgres_execution": "DATA_GATED",
            "caller_compatibility_preserved": True,
        },
        "evidence": {
            "backend_path": str(backend_path),
            "collector_path": str(collector_path),
            "test_path": str(test_path),
            "wrapper_path": str(wrapper_path),
            "migration_stub_path": str(migration_stub_path),
        },
        "errors": errors,
    }


def main() -> int:
    """Run all checks, write the JSON report, and return the exit code.

    The overall gate passes only when every work package declares its
    success criteria in the spec, every evidence check passes, and the
    roadmap document mentions every expected heading. The report is written
    to ``Temp/platform_transition_wbs_v1.json`` and echoed to stdout.

    Returns:
        0 when the overall gate is ``PASS``, otherwise 1.
    """
    # An empty spec document loads as None; treat it as "nothing declared".
    spec = _load_spec() or {}
    phase = spec.get("phase_5_platform_transition") or {}
    roadmap_text = _read_text(ROADMAP_DOC_PATH)

    checks = {
        "P1_kis_core_api_collector": _check_p1(),
        "P2_sqlite_canonical_store": _check_p2(),
        "P3_ci_scheduler_cutover": _check_p3(),
        "P4_gas_thin_adapter_minimize": _check_p4(),
        "P5_postgresql_upgrade_path": _check_p5(),
    }

    missing_criteria: list[str] = []
    failure_notes: list[str] = []
    required_fields = ("expected_success_value", "evidence_artifacts", "verification_commands")
    for key, result in checks.items():
        spec_row = phase.get(key) or {}
        criteria = spec_row.get("success_criteria") or {}
        if not criteria:
            missing_criteria.append(key)
        missing_criteria.extend(f"{key}.{field}" for field in required_fields if field not in criteria)
        if result["gate"] != "PASS":
            missing_criteria.append(f"{key}.evidence_gate")
            failure_notes.append(_humanize_check_failure(key, result))

    roadmap_mentions = [
        "Phase 5 데이터 플랫폼 전환 WBS 성공값",
        "P1 KIS core collector",
        "P2 SQLite canonical store",
        "P3 CI scheduler cutover",
        "P4 GAS thin adapter minimize",
        "P5 PostgreSQL upgrade path",
    ]
    # Case-insensitive containment; lower-case the document once.
    roadmap_lower = roadmap_text.lower()
    roadmap_missing = [item for item in roadmap_mentions if item.lower() not in roadmap_lower]

    passed = not missing_criteria and not roadmap_missing
    payload = {
        "formula_id": "PLATFORM_TRANSITION_WBS_V1",
        "gate": "PASS" if passed else "FAIL",
        "message": (
            "Platform transition WBS check passed."
            if passed
            else "Platform transition WBS check failed. See failure_notes for the exact missing evidence and recovery step."
        ),
        "spec_path": str(SPEC_PATH),
        "roadmap_doc_path": str(ROADMAP_DOC_PATH),
        "missing_criteria": missing_criteria,
        "failure_notes": failure_notes,
        "roadmap_missing": roadmap_missing,
        "checks": checks,
    }

    report = json.dumps(payload, ensure_ascii=False, indent=2)
    out = ROOT / "Temp" / "platform_transition_wbs_v1.json"
    # Temp/ may not exist on a fresh checkout, which is exactly when the
    # failure report is most needed.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(report, encoding="utf-8")
    print(payload["message"])
    print(report)
    return 0 if passed else 1


if __name__ == "__main__":
    raise SystemExit(main())