316 lines
11 KiB
Python
316 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
SPEC_PATH = ROOT / "spec" / "16_data_gaps_roadmap.yaml"
|
|
ROADMAP_DOC_PATH = ROOT / "docs" / "ROADMAP_WBS.md"
|
|
|
|
|
|
def _read_json(path: Path) -> dict[str, Any]:
|
|
if not path.exists():
|
|
return {}
|
|
return json.loads(path.read_text(encoding="utf-8"))
|
|
|
|
|
|
def _read_text(path: Path) -> str:
|
|
if not path.exists():
|
|
return ""
|
|
return path.read_text(encoding="utf-8", errors="replace")
|
|
|
|
|
|
def _sqlite_counts(db_path: Path) -> dict[str, int]:
|
|
if not db_path.exists():
|
|
return {}
|
|
conn = sqlite3.connect(db_path)
|
|
try:
|
|
return {
|
|
"collection_runs": conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0],
|
|
"collection_snapshots": conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0],
|
|
"collection_source_errors": conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0],
|
|
}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _load_spec() -> dict[str, Any]:
|
|
return yaml.safe_load(SPEC_PATH.read_text(encoding="utf-8"))
|
|
|
|
|
|
def _check_p1() -> dict[str, Any]:
|
|
summary_path = ROOT / "Temp" / "test_kis_data_collection.json"
|
|
db_path = ROOT / "Temp" / "test_kis_data_collection.db"
|
|
summary = _read_json(summary_path)
|
|
counts = _sqlite_counts(db_path)
|
|
errors: list[str] = []
|
|
|
|
if summary.get("status") != "PASS":
|
|
errors.append(f"summary_status={summary.get('status')!r}")
|
|
if int(summary.get("row_count") or 0) <= 0:
|
|
errors.append("summary_row_count<=0")
|
|
if int(counts.get("collection_runs") or 0) <= 0:
|
|
errors.append("collection_runs<=0")
|
|
if int(counts.get("collection_snapshots") or 0) <= 0:
|
|
errors.append("collection_snapshots<=0")
|
|
|
|
source_counts = summary.get("source_counts") if isinstance(summary.get("source_counts"), dict) else {}
|
|
source_count = len([k for k, v in source_counts.items() if int(v or 0) > 0])
|
|
if source_count < 1:
|
|
errors.append(f"provenance_source_count={source_count}")
|
|
|
|
return {
|
|
"gate": "PASS" if not errors else "FAIL",
|
|
"expected_success_value": {
|
|
"collector_gate": "PASS",
|
|
"output_json_gate": "PASS",
|
|
"collection_runs_min": 1,
|
|
"collection_snapshots_min": 1,
|
|
"provenance_source_count_min": 1,
|
|
},
|
|
"evidence": {
|
|
"summary_path": str(summary_path),
|
|
"db_path": str(db_path),
|
|
"sqlite_counts": counts,
|
|
},
|
|
"errors": errors,
|
|
}
|
|
|
|
|
|
def _check_p2() -> dict[str, Any]:
|
|
from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec
|
|
|
|
db_path = ROOT / "Temp" / "test_kis_data_collection.db"
|
|
counts = _sqlite_counts(db_path)
|
|
sqlite_backend, sqlite_location = normalize_store_spec(CollectionStoreSpec(location=db_path), ROOT)
|
|
pg_backend, pg_location = normalize_store_spec(
|
|
CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"),
|
|
ROOT,
|
|
)
|
|
errors: list[str] = []
|
|
|
|
if sqlite_backend != "sqlite":
|
|
errors.append(f"sqlite_backend={sqlite_backend!r}")
|
|
if pg_backend != "postgresql":
|
|
errors.append(f"postgres_backend={pg_backend!r}")
|
|
if not isinstance(pg_location, str) or "postgresql://" not in pg_location:
|
|
errors.append("postgres_location_invalid")
|
|
if int(counts.get("collection_runs") or 0) <= 0 or int(counts.get("collection_snapshots") or 0) <= 0:
|
|
errors.append("sqlite_round_trip_missing")
|
|
|
|
return {
|
|
"gate": "PASS" if not errors else "FAIL",
|
|
"expected_success_value": {
|
|
"sqlite_schema_tables_min": 3,
|
|
"round_trip_snapshot_lookup": "PASS",
|
|
"backend_contract_sqlite": "PASS",
|
|
"backend_contract_postgresql": "READY",
|
|
},
|
|
"evidence": {
|
|
"db_path": str(db_path),
|
|
"sqlite_location": str(sqlite_location),
|
|
"postgres_location": pg_location,
|
|
"sqlite_counts": counts,
|
|
},
|
|
"errors": errors,
|
|
}
|
|
|
|
|
|
def _check_p3() -> dict[str, Any]:
|
|
workflow = ROOT / ".gitea" / "workflows" / "kis_data_collection.yml"
|
|
text = _read_text(workflow)
|
|
errors: list[str] = []
|
|
|
|
if not text:
|
|
errors.append("workflow_missing")
|
|
if "tools/run_kis_data_collection_v1.py" not in text:
|
|
errors.append("collector_step_missing")
|
|
if "tools/validate_kis_api_credentials_v1.py" not in text:
|
|
errors.append("mock_validation_step_missing")
|
|
if "GatherTradingData.json" not in text:
|
|
errors.append("seed_json_missing")
|
|
if "Validate SQLite Artifact" not in text:
|
|
errors.append("sqlite_validation_step_missing")
|
|
if ("GatherTradingData.xlsx" in text or ".xlsx" in text) and "Prepare Raw Seed Snapshot" not in text:
|
|
errors.append("xlsx_dependency_present")
|
|
if "validate_no_direct_api_trading_v1.py" not in text:
|
|
errors.append("no_direct_trading_gate_missing")
|
|
mock_key_lines = re.findall(
|
|
r"^\s*KIS_APP_Key_TEST:\s*\$\{\{\s*vars\.KIS_APP_KEY_TEST\s*\}\}",
|
|
text,
|
|
flags=re.M,
|
|
)
|
|
mock_secret_lines = re.findall(
|
|
r"^\s*KIS_APP_Secret_TEST:\s*\$\{\{\s*vars\.KIS_APP_SECRET_TEST\s*\}\}",
|
|
text,
|
|
flags=re.M,
|
|
)
|
|
if len(mock_key_lines) != 1 or len(mock_secret_lines) != 1:
|
|
errors.append("mock_env_vars_not_isolated")
|
|
real_key_lines = re.findall(
|
|
r"^\s*KIS_APP_Key:\s*\$\{\{\s*vars\.KIS_APP_KEY\s*\}\}",
|
|
text,
|
|
flags=re.M,
|
|
)
|
|
real_secret_lines = re.findall(
|
|
r"^\s*KIS_APP_Secret:\s*\$\{\{\s*vars\.KIS_APP_SECRET\s*\}\}",
|
|
text,
|
|
flags=re.M,
|
|
)
|
|
if len(real_key_lines) != 1 or len(real_secret_lines) != 1:
|
|
errors.append("real_env_vars_not_isolated")
|
|
|
|
return {
|
|
"gate": "PASS" if not errors else "FAIL",
|
|
"expected_success_value": {
|
|
"xlsx_dependency_removed": True,
|
|
"json_seed_input": True,
|
|
"sqlite_output": True,
|
|
"mock_api_validation": "PASS",
|
|
"no_direct_trading_gate": "PASS",
|
|
},
|
|
"evidence": {
|
|
"workflow_path": str(workflow),
|
|
},
|
|
"errors": errors,
|
|
}
|
|
|
|
|
|
def _check_p4() -> dict[str, Any]:
|
|
validation_path = ROOT / "Temp" / "gas_thin_adapter_validation_v1.json"
|
|
payload = _read_json(validation_path)
|
|
errors: list[str] = []
|
|
|
|
if payload.get("gate") != "PASS":
|
|
errors.append(f"gate={payload.get('gate')!r}")
|
|
if float(payload.get("function_inventory_coverage_pct") or 0.0) < 100.0:
|
|
errors.append("function_inventory_coverage_pct<100")
|
|
if not (ROOT / "src" / "gas" / "core" / "gas_lib.gs").exists():
|
|
errors.append("gas_lib_missing")
|
|
|
|
return {
|
|
"gate": "PASS" if not errors else "FAIL",
|
|
"expected_success_value": {
|
|
"allowed_responsibilities_only": True,
|
|
"forbidden_responsibilities_present": False,
|
|
"thin_adapter_gate": "PASS",
|
|
},
|
|
"evidence": {
|
|
"validation_path": str(validation_path),
|
|
"payload": payload,
|
|
},
|
|
"errors": errors,
|
|
}
|
|
|
|
|
|
def _check_p5() -> dict[str, Any]:
|
|
from src.quant_engine.data_collection_backend_v1 import CollectionStoreSpec, normalize_store_spec
|
|
|
|
backend_path = ROOT / "src" / "quant_engine" / "data_collection_backend_v1.py"
|
|
collector_path = ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py"
|
|
test_path = ROOT / "tests" / "unit" / "test_data_collection_store_v1.py"
|
|
wrapper_path = ROOT / "tools" / "run_kis_data_collection_v1.py"
|
|
migration_stub_path = ROOT / "tools" / "generate_postgresql_upgrade_stub_v1.py"
|
|
errors: list[str] = []
|
|
|
|
try:
|
|
backend, location = normalize_store_spec(
|
|
CollectionStoreSpec(backend="postgresql", location="postgresql://user:pass@localhost/db"),
|
|
ROOT,
|
|
)
|
|
if backend != "postgresql":
|
|
errors.append(f"backend={backend!r}")
|
|
if not isinstance(location, str) or "postgresql://" not in location:
|
|
errors.append("postgres_location_invalid")
|
|
except Exception as exc: # noqa: BLE001
|
|
errors.append(f"normalize_failed={exc}")
|
|
|
|
for path in (backend_path, collector_path, test_path, wrapper_path):
|
|
if not path.exists():
|
|
errors.append(f"missing={path.relative_to(ROOT)}")
|
|
if not migration_stub_path.exists():
|
|
errors.append(f"missing={migration_stub_path.relative_to(ROOT)}")
|
|
|
|
return {
|
|
"gate": "PASS" if not errors else "FAIL",
|
|
"expected_success_value": {
|
|
"sqlite_schema_parity": "PASS",
|
|
"backend_contract_present": True,
|
|
"postgres_execution": "DATA_GATED",
|
|
"caller_compatibility_preserved": True,
|
|
},
|
|
"evidence": {
|
|
"backend_path": str(backend_path),
|
|
"collector_path": str(collector_path),
|
|
"test_path": str(test_path),
|
|
"wrapper_path": str(wrapper_path),
|
|
"migration_stub_path": str(migration_stub_path),
|
|
},
|
|
"errors": errors,
|
|
}
|
|
|
|
|
|
def main() -> int:
|
|
spec = _load_spec()
|
|
phase = spec.get("phase_5_platform_transition") or {}
|
|
roadmap_text = _read_text(ROADMAP_DOC_PATH)
|
|
checks = {
|
|
"P1_kis_core_api_collector": _check_p1(),
|
|
"P2_sqlite_canonical_store": _check_p2(),
|
|
"P3_ci_scheduler_cutover": _check_p3(),
|
|
"P4_gas_thin_adapter_minimize": _check_p4(),
|
|
"P5_postgresql_upgrade_path": _check_p5(),
|
|
}
|
|
|
|
missing_criteria: list[str] = []
|
|
for key, result in checks.items():
|
|
spec_row = phase.get(key) or {}
|
|
criteria = spec_row.get("success_criteria") or {}
|
|
if not criteria:
|
|
missing_criteria.append(key)
|
|
if "expected_success_value" not in criteria:
|
|
missing_criteria.append(f"{key}.expected_success_value")
|
|
if "evidence_artifacts" not in criteria:
|
|
missing_criteria.append(f"{key}.evidence_artifacts")
|
|
if "verification_commands" not in criteria:
|
|
missing_criteria.append(f"{key}.verification_commands")
|
|
if result["gate"] != "PASS":
|
|
missing_criteria.append(f"{key}.evidence_gate")
|
|
|
|
roadmap_mentions = [
|
|
"Phase 5 데이터 플랫폼 전환 WBS 성공값",
|
|
"P1 KIS core collector",
|
|
"P2 SQLite canonical store",
|
|
"P3 CI scheduler cutover",
|
|
"P4 GAS thin adapter minimize",
|
|
"P5 PostgreSQL upgrade path",
|
|
]
|
|
roadmap_missing = [item for item in roadmap_mentions if item.lower() not in roadmap_text.lower()]
|
|
|
|
payload = {
|
|
"formula_id": "PLATFORM_TRANSITION_WBS_V1",
|
|
"gate": "PASS" if not missing_criteria and not roadmap_missing else "FAIL",
|
|
"spec_path": str(SPEC_PATH),
|
|
"roadmap_doc_path": str(ROADMAP_DOC_PATH),
|
|
"missing_criteria": missing_criteria,
|
|
"roadmap_missing": roadmap_missing,
|
|
"checks": checks,
|
|
}
|
|
out = ROOT / "Temp" / "platform_transition_wbs_v1.json"
|
|
out.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
|
|
print(json.dumps(payload, ensure_ascii=False, indent=2))
|
|
return 0 if payload["gate"] == "PASS" else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|