#!/usr/bin/env python3 from __future__ import annotations import json import sqlite3 import sys from pathlib import Path from typing import Any import yaml ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) SPEC_PATH = ROOT / "spec" / "16_data_gaps_roadmap.yaml" ROADMAP_DOC_PATH = ROOT / "docs" / "ROADMAP_WBS.md" def _read_json(path: Path) -> dict[str, Any]: if not path.exists(): return {} return json.loads(path.read_text(encoding="utf-8")) def _read_text(path: Path) -> str: if not path.exists(): return "" return path.read_text(encoding="utf-8", errors="replace") def _sqlite_counts(db_path: Path) -> dict[str, int]: if not db_path.exists(): return {} conn = sqlite3.connect(db_path) try: return { "collection_runs": conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0], "collection_snapshots": conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0], "collection_source_errors": conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0], } finally: conn.close() def _load_spec() -> dict[str, Any]: return yaml.safe_load(SPEC_PATH.read_text(encoding="utf-8")) def _check_p1() -> dict[str, Any]: summary_path = ROOT / "Temp" / "test_kis_data_collection.json" db_path = ROOT / "Temp" / "test_kis_data_collection.db" summary = _read_json(summary_path) counts = _sqlite_counts(db_path) errors: list[str] = [] if summary.get("status") != "PASS": errors.append(f"summary_status={summary.get('status')!r}") if int(summary.get("row_count") or 0) <= 0: errors.append("summary_row_count<=0") if int(counts.get("collection_runs") or 0) <= 0: errors.append("collection_runs<=0") if int(counts.get("collection_snapshots") or 0) <= 0: errors.append("collection_snapshots<=0") source_counts = summary.get("source_counts") if isinstance(summary.get("source_counts"), dict) else {} source_count = len([k for k, v in source_counts.items() if int(v or 0) > 0]) if source_count < 1: errors.append(f"provenance_source_count={source_count}") return { "gate": "PASS" if not errors else "FAIL", "expected_success_value": { "collector_gate": "PASS", "output_json_gate": "PASS", "collection_runs_min": 1, "collection_snapshots_min": 1, "provenance_source_count_min": 1, }, "evidence": { "summary_path": str(summary_path), "db_path": str(db_path), "sqlite_counts": counts, }, "errors": errors, } def _check_p2() -> dict[str, Any]: from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec db_path = ROOT / "Temp" / "test_kis_data_collection.db" counts = _sqlite_counts(db_path) sqlite_backend, sqlite_location = normalize_store_spec(CollectionStoreSpec(location=db_path), ROOT) pg_backend, pg_location = normalize_store_spec( CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"), ROOT, ) errors: list[str] = [] if sqlite_backend != "sqlite": errors.append(f"sqlite_backend={sqlite_backend!r}") if pg_backend != "postgresql": errors.append(f"postgres_backend={pg_backend!r}") if not isinstance(pg_location, str) or "postgresql://" not in pg_location: errors.append("postgres_location_invalid") if int(counts.get("collection_runs") or 0) <= 0 or int(counts.get("collection_snapshots") or 0) <= 0: errors.append("sqlite_round_trip_missing") return { "gate": "PASS" if not errors else "FAIL", "expected_success_value": { "sqlite_schema_tables_min": 3, "round_trip_snapshot_lookup": "PASS", "backend_contract_sqlite": "PASS", "backend_contract_postgresql": "READY", }, "evidence": { "db_path": str(db_path), "sqlite_location": str(sqlite_location), "postgres_location": pg_location, "sqlite_counts": counts, }, "errors": errors, } def _check_p3() -> dict[str, Any]: workflow = ROOT / ".gitea" / "workflows" / "kis_data_collection.yml" text = _read_text(workflow) errors: list[str] = [] if not text: errors.append("workflow_missing") if "tools/run_kis_data_collection_v1.py" not in text: errors.append("collector_step_missing") if "tools/validate_kis_api_credentials_v1.py" not in text: errors.append("mock_validation_step_missing") if "GatherTradingData.json" not in text: errors.append("seed_json_missing") if "Validate SQLite Artifact" not in text: errors.append("sqlite_validation_step_missing") if ".xlsx" in text or "GatherTradingData.xlsx" in text: errors.append("xlsx_dependency_present") if "validate_no_direct_api_trading_v1.py" not in text: errors.append("no_direct_trading_gate_missing") if text.count("KIS_APP_Key_TEST") != 1 or text.count("KIS_APP_Secret_TEST") != 1: errors.append("mock_env_vars_not_isolated") if text.count("KIS_APP_Key:") != 1 or text.count("KIS_APP_Secret:") != 1: errors.append("real_env_vars_not_isolated") return { "gate": "PASS" if not errors else "FAIL", "expected_success_value": { "xlsx_dependency_removed": True, "json_seed_input": True, "sqlite_output": True, "mock_api_validation": "PASS", "no_direct_trading_gate": "PASS", }, "evidence": { "workflow_path": str(workflow), }, "errors": errors, } def _check_p4() -> dict[str, Any]: validation_path = ROOT / "Temp" / "gas_thin_adapter_validation_v1.json" payload = _read_json(validation_path) errors: list[str] = [] if payload.get("gate") != "PASS": errors.append(f"gate={payload.get('gate')!r}") if float(payload.get("function_inventory_coverage_pct") or 0.0) < 100.0: errors.append("function_inventory_coverage_pct<100") if not (ROOT / "src" / "gas" / "core" / "gas_lib.gs").exists(): errors.append("gas_lib_missing") return { "gate": "PASS" if not errors else "FAIL", "expected_success_value": { "allowed_responsibilities_only": True, "forbidden_responsibilities_present": False, "thin_adapter_gate": "PASS", }, "evidence": { "validation_path": str(validation_path), "payload": payload, }, "errors": errors, } def _check_p5() -> dict[str, Any]: from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec backend_path = ROOT / "src" / "quant_engine" / "data_collection_backend_v1.py" collector_path = ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py" test_path = ROOT / "tests" / "unit" / "test_data_collection_store_v1.py" wrapper_path = ROOT / "tools" / "run_kis_data_collection_v1.py" migration_stub_path = ROOT / "tools" / "generate_postgresql_upgrade_stub_v1.py" errors: list[str] = [] try: backend, location = normalize_store_spec( CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"), ROOT, ) if backend != "postgresql": errors.append(f"backend={backend!r}") if not isinstance(location, str) or "postgresql://" not in location: errors.append("postgres_location_invalid") except Exception as exc: # noqa: BLE001 errors.append(f"normalize_failed={exc}") for path in (backend_path, collector_path, test_path, wrapper_path): if not path.exists(): errors.append(f"missing={path.relative_to(ROOT)}") if not migration_stub_path.exists(): errors.append(f"missing={migration_stub_path.relative_to(ROOT)}") return { "gate": "PASS" if not errors else "FAIL", "expected_success_value": { "sqlite_schema_parity": "PASS", "backend_contract_present": True, "postgres_execution": "DATA_GATED", "caller_compatibility_preserved": True, }, "evidence": { "backend_path": str(backend_path), "collector_path": str(collector_path), "test_path": str(test_path), "wrapper_path": str(wrapper_path), "migration_stub_path": str(migration_stub_path), }, "errors": errors, } def main() -> int: spec = _load_spec() phase = spec.get("phase_5_platform_transition") or {} roadmap_text = _read_text(ROADMAP_DOC_PATH) checks = { "P1_kis_core_api_collector": _check_p1(), "P2_sqlite_canonical_store": _check_p2(), "P3_ci_scheduler_cutover": _check_p3(), "P4_gas_thin_adapter_minimize": _check_p4(), "P5_postgresql_upgrade_path": _check_p5(), } missing_criteria: list[str] = [] for key, result in checks.items(): spec_row = phase.get(key) or {} criteria = spec_row.get("success_criteria") or {} if not criteria: missing_criteria.append(key) if "expected_success_value" not in criteria: missing_criteria.append(f"{key}.expected_success_value") if "evidence_artifacts" not in criteria: missing_criteria.append(f"{key}.evidence_artifacts") if "verification_commands" not in criteria: missing_criteria.append(f"{key}.verification_commands") if result["gate"] != "PASS": missing_criteria.append(f"{key}.evidence_gate") roadmap_mentions = [ "Phase 5 데이터 플랫폼 전환 WBS 성공값", "P1 KIS core collector", "P2 SQLite canonical store", "P3 CI scheduler cutover", "P4 GAS thin adapter minimize", "P5 PostgreSQL upgrade path", ] roadmap_missing = [item for item in roadmap_mentions if item.lower() not in roadmap_text.lower()] payload = { "formula_id": "PLATFORM_TRANSITION_WBS_V1", "gate": "PASS" if not missing_criteria and not roadmap_missing else "FAIL", "spec_path": str(SPEC_PATH), "roadmap_doc_path": str(ROADMAP_DOC_PATH), "missing_criteria": missing_criteria, "roadmap_missing": roadmap_missing, "checks": checks, } out = ROOT / "Temp" / "platform_transition_wbs_v1.json" out.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps(payload, ensure_ascii=False, indent=2)) return 0 if payload["gate"] == "PASS" else 1 if __name__ == "__main__": raise SystemExit(main())