Files
QuantEngineByItz/tools/validate_platform_transition_wbs_v1.py
T

316 lines
11 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import json
import re
import sqlite3
import sys
from pathlib import Path
from typing import Any
import yaml
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
SPEC_PATH = ROOT / "spec" / "16_data_gaps_roadmap.yaml"
ROADMAP_DOC_PATH = ROOT / "docs" / "ROADMAP_WBS.md"
def _read_json(path: Path) -> dict[str, Any]:
if not path.exists():
return {}
return json.loads(path.read_text(encoding="utf-8"))
def _read_text(path: Path) -> str:
if not path.exists():
return ""
return path.read_text(encoding="utf-8", errors="replace")
def _sqlite_counts(db_path: Path) -> dict[str, int]:
if not db_path.exists():
return {}
conn = sqlite3.connect(db_path)
try:
return {
"collection_runs": conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0],
"collection_snapshots": conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0],
"collection_source_errors": conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0],
}
finally:
conn.close()
def _load_spec() -> dict[str, Any]:
return yaml.safe_load(SPEC_PATH.read_text(encoding="utf-8"))
def _check_p1() -> dict[str, Any]:
summary_path = ROOT / "Temp" / "test_kis_data_collection.json"
db_path = ROOT / "Temp" / "test_kis_data_collection.db"
summary = _read_json(summary_path)
counts = _sqlite_counts(db_path)
errors: list[str] = []
if summary.get("status") != "PASS":
errors.append(f"summary_status={summary.get('status')!r}")
if int(summary.get("row_count") or 0) <= 0:
errors.append("summary_row_count<=0")
if int(counts.get("collection_runs") or 0) <= 0:
errors.append("collection_runs<=0")
if int(counts.get("collection_snapshots") or 0) <= 0:
errors.append("collection_snapshots<=0")
source_counts = summary.get("source_counts") if isinstance(summary.get("source_counts"), dict) else {}
source_count = len([k for k, v in source_counts.items() if int(v or 0) > 0])
if source_count < 1:
errors.append(f"provenance_source_count={source_count}")
return {
"gate": "PASS" if not errors else "FAIL",
"expected_success_value": {
"collector_gate": "PASS",
"output_json_gate": "PASS",
"collection_runs_min": 1,
"collection_snapshots_min": 1,
"provenance_source_count_min": 1,
},
"evidence": {
"summary_path": str(summary_path),
"db_path": str(db_path),
"sqlite_counts": counts,
},
"errors": errors,
}
def _check_p2() -> dict[str, Any]:
from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec
db_path = ROOT / "Temp" / "test_kis_data_collection.db"
counts = _sqlite_counts(db_path)
sqlite_backend, sqlite_location = normalize_store_spec(CollectionStoreSpec(location=db_path), ROOT)
pg_backend, pg_location = normalize_store_spec(
CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"),
ROOT,
)
errors: list[str] = []
if sqlite_backend != "sqlite":
errors.append(f"sqlite_backend={sqlite_backend!r}")
if pg_backend != "postgresql":
errors.append(f"postgres_backend={pg_backend!r}")
if not isinstance(pg_location, str) or "postgresql://" not in pg_location:
errors.append("postgres_location_invalid")
if int(counts.get("collection_runs") or 0) <= 0 or int(counts.get("collection_snapshots") or 0) <= 0:
errors.append("sqlite_round_trip_missing")
return {
"gate": "PASS" if not errors else "FAIL",
"expected_success_value": {
"sqlite_schema_tables_min": 3,
"round_trip_snapshot_lookup": "PASS",
"backend_contract_sqlite": "PASS",
"backend_contract_postgresql": "READY",
},
"evidence": {
"db_path": str(db_path),
"sqlite_location": str(sqlite_location),
"postgres_location": pg_location,
"sqlite_counts": counts,
},
"errors": errors,
}
def _check_p3() -> dict[str, Any]:
workflow = ROOT / ".gitea" / "workflows" / "kis_data_collection.yml"
text = _read_text(workflow)
errors: list[str] = []
if not text:
errors.append("workflow_missing")
if "tools/run_kis_data_collection_v1.py" not in text:
errors.append("collector_step_missing")
if "tools/validate_kis_api_credentials_v1.py" not in text:
errors.append("mock_validation_step_missing")
if "GatherTradingData.json" not in text:
errors.append("seed_json_missing")
if "Validate SQLite Artifact" not in text:
errors.append("sqlite_validation_step_missing")
if ("GatherTradingData.xlsx" in text or ".xlsx" in text) and "Prepare Raw Seed Snapshot" not in text:
errors.append("xlsx_dependency_present")
if "validate_no_direct_api_trading_v1.py" not in text:
errors.append("no_direct_trading_gate_missing")
mock_key_lines = re.findall(
r"^\s*KIS_APP_Key_TEST:\s*\$\{\{\s*vars\.KIS_APP_KEY_TEST\s*\}\}",
text,
flags=re.M,
)
mock_secret_lines = re.findall(
r"^\s*KIS_APP_Secret_TEST:\s*\$\{\{\s*vars\.KIS_APP_SECRET_TEST\s*\}\}",
text,
flags=re.M,
)
if len(mock_key_lines) != 1 or len(mock_secret_lines) != 1:
errors.append("mock_env_vars_not_isolated")
real_key_lines = re.findall(
r"^\s*KIS_APP_Key:\s*\$\{\{\s*vars\.KIS_APP_KEY\s*\}\}",
text,
flags=re.M,
)
real_secret_lines = re.findall(
r"^\s*KIS_APP_Secret:\s*\$\{\{\s*vars\.KIS_APP_SECRET\s*\}\}",
text,
flags=re.M,
)
if len(real_key_lines) != 1 or len(real_secret_lines) != 1:
errors.append("real_env_vars_not_isolated")
return {
"gate": "PASS" if not errors else "FAIL",
"expected_success_value": {
"xlsx_dependency_removed": True,
"json_seed_input": True,
"sqlite_output": True,
"mock_api_validation": "PASS",
"no_direct_trading_gate": "PASS",
},
"evidence": {
"workflow_path": str(workflow),
},
"errors": errors,
}
def _check_p4() -> dict[str, Any]:
validation_path = ROOT / "Temp" / "gas_thin_adapter_validation_v1.json"
payload = _read_json(validation_path)
errors: list[str] = []
if payload.get("gate") != "PASS":
errors.append(f"gate={payload.get('gate')!r}")
if float(payload.get("function_inventory_coverage_pct") or 0.0) < 100.0:
errors.append("function_inventory_coverage_pct<100")
if not (ROOT / "src" / "gas" / "core" / "gas_lib.gs").exists():
errors.append("gas_lib_missing")
return {
"gate": "PASS" if not errors else "FAIL",
"expected_success_value": {
"allowed_responsibilities_only": True,
"forbidden_responsibilities_present": False,
"thin_adapter_gate": "PASS",
},
"evidence": {
"validation_path": str(validation_path),
"payload": payload,
},
"errors": errors,
}
def _check_p5() -> dict[str, Any]:
from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec
backend_path = ROOT / "src" / "quant_engine" / "data_collection_backend_v1.py"
collector_path = ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py"
test_path = ROOT / "tests" / "unit" / "test_data_collection_store_v1.py"
wrapper_path = ROOT / "tools" / "run_kis_data_collection_v1.py"
migration_stub_path = ROOT / "tools" / "generate_postgresql_upgrade_stub_v1.py"
errors: list[str] = []
try:
backend, location = normalize_store_spec(
CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"),
ROOT,
)
if backend != "postgresql":
errors.append(f"backend={backend!r}")
if not isinstance(location, str) or "postgresql://" not in location:
errors.append("postgres_location_invalid")
except Exception as exc: # noqa: BLE001
errors.append(f"normalize_failed={exc}")
for path in (backend_path, collector_path, test_path, wrapper_path):
if not path.exists():
errors.append(f"missing={path.relative_to(ROOT)}")
if not migration_stub_path.exists():
errors.append(f"missing={migration_stub_path.relative_to(ROOT)}")
return {
"gate": "PASS" if not errors else "FAIL",
"expected_success_value": {
"sqlite_schema_parity": "PASS",
"backend_contract_present": True,
"postgres_execution": "DATA_GATED",
"caller_compatibility_preserved": True,
},
"evidence": {
"backend_path": str(backend_path),
"collector_path": str(collector_path),
"test_path": str(test_path),
"wrapper_path": str(wrapper_path),
"migration_stub_path": str(migration_stub_path),
},
"errors": errors,
}
def main() -> int:
spec = _load_spec()
phase = spec.get("phase_5_platform_transition") or {}
roadmap_text = _read_text(ROADMAP_DOC_PATH)
checks = {
"P1_kis_core_api_collector": _check_p1(),
"P2_sqlite_canonical_store": _check_p2(),
"P3_ci_scheduler_cutover": _check_p3(),
"P4_gas_thin_adapter_minimize": _check_p4(),
"P5_postgresql_upgrade_path": _check_p5(),
}
missing_criteria: list[str] = []
for key, result in checks.items():
spec_row = phase.get(key) or {}
criteria = spec_row.get("success_criteria") or {}
if not criteria:
missing_criteria.append(key)
if "expected_success_value" not in criteria:
missing_criteria.append(f"{key}.expected_success_value")
if "evidence_artifacts" not in criteria:
missing_criteria.append(f"{key}.evidence_artifacts")
if "verification_commands" not in criteria:
missing_criteria.append(f"{key}.verification_commands")
if result["gate"] != "PASS":
missing_criteria.append(f"{key}.evidence_gate")
roadmap_mentions = [
"Phase 5 데이터 플랫폼 전환 WBS 성공값",
"P1 KIS core collector",
"P2 SQLite canonical store",
"P3 CI scheduler cutover",
"P4 GAS thin adapter minimize",
"P5 PostgreSQL upgrade path",
]
roadmap_missing = [item for item in roadmap_mentions if item.lower() not in roadmap_text.lower()]
payload = {
"formula_id": "PLATFORM_TRANSITION_WBS_V1",
"gate": "PASS" if not missing_criteria and not roadmap_missing else "FAIL",
"spec_path": str(SPEC_PATH),
"roadmap_doc_path": str(ROADMAP_DOC_PATH),
"missing_criteria": missing_criteria,
"roadmap_missing": roadmap_missing,
"checks": checks,
}
out = ROOT / "Temp" / "platform_transition_wbs_v1.json"
out.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(payload, ensure_ascii=False, indent=2))
return 0 if payload["gate"] == "PASS" else 1
if __name__ == "__main__":
raise SystemExit(main())