diff --git a/tools/validate_data_collection_v1.py b/tools/validate_data_collection_v1.py new file mode 100644 index 0000000..0d17d3d --- /dev/null +++ b/tools/validate_data_collection_v1.py @@ -0,0 +1,375 @@ +#!/usr/bin/env python3 +""" +데이터 수집 기능 & 테이블/컬럼 검증 도구 + +목표: 모든 테이블과 컬럼의 데이터 정확성 검증 +""" + +import sqlite3 +import json +from pathlib import Path +from datetime import datetime +from typing import Dict, List, Tuple, Optional + + +class DataCollectionValidator: + """데이터 수집 기능 및 테이블 검증""" + + def __init__(self, db_path: str = None): + self.db_path = db_path or "src/quant_engine/data_feed.db" + self.results = { + "timestamp": datetime.now().isoformat(), + "database_path": self.db_path, + "tables": {}, + "data_quality": {}, + "summary": {} + } + + def check_database_exists(self) -> bool: + """데이터베이스 파일 존재 확인""" + exists = Path(self.db_path).exists() + self.results["database_exists"] = exists + self.results["database_file_size_mb"] = ( + Path(self.db_path).stat().st_size / (1024 * 1024) if exists else 0 + ) + return exists + + def validate_table_schema(self) -> Dict: + """모든 테이블의 스키마 검증""" + try: + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # 모든 테이블 조회 + cursor.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + ORDER BY name + """) + + tables = [row[0] for row in cursor.fetchall()] + table_schemas = {} + + for table in tables: + cursor.execute(f"PRAGMA table_info({table})") + columns = cursor.fetchall() + + schema = { + "table_name": table, + "column_count": len(columns), + "columns": [] + } + + for col in columns: + col_info = { + "cid": col[0], + "name": col[1], + "type": col[2], + "notnull": col[3], + "default": col[4], + "primary_key": col[5] + } + schema["columns"].append(col_info) + + table_schemas[table] = schema + + conn.close() + self.results["tables"] = table_schemas + return { + "status": "SUCCESS", + "table_count": len(tables), + "tables": list(tables) + } + + except Exception as e: + return { + "status": "ERROR", + "error": str(e) + } + + def validate_data_integrity(self) -> Dict: + """데이터 무결성 검증""" + try: + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # 데이터베이스 무결성 확인 + cursor.execute("PRAGMA integrity_check") + integrity_result = cursor.fetchone()[0] + + # 각 테이블의 행 개수 및 타입 확인 + cursor.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + """) + + integrity_checks = { + "pragma_integrity_check": integrity_result, + "tables": {} + } + + for table in cursor.fetchall(): + table_name = table[0] + + # 행 개수 + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + row_count = cursor.fetchone()[0] + + # NULL 값 확인 + cursor.execute(f"PRAGMA table_info({table_name})") + columns = cursor.fetchall() + + null_check = {} + for col in columns: + col_name = col[1] + cursor.execute( + f"SELECT COUNT(*) FROM {table_name} WHERE {col_name} IS NULL" + ) + null_count = cursor.fetchone()[0] + null_check[col_name] = { + "total": row_count, + "null_count": null_count, + "null_pct": round((null_count / row_count * 100) if row_count > 0 else 0, 2) + } + + integrity_checks["tables"][table_name] = { + "row_count": row_count, + "null_values": null_check + } + + conn.close() + + self.results["data_quality"]["integrity"] = integrity_checks + return { + "status": "SUCCESS", + "integrity_check": integrity_result, + "tables_checked": len(integrity_checks["tables"]) + } + + except Exception as e: + return { + "status": "ERROR", + "error": str(e) + } + + def validate_column_data_types(self) -> Dict: + """컬럼별 데이터 타입 검증""" + try: + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + # 각 테이블의 컬럼 데이터 샘플 확인 + cursor.execute(""" + SELECT name FROM sqlite_master WHERE type='table' + """) + + type_validations = {} + + for table in cursor.fetchall(): + table_name = table[0] + cursor.execute(f"PRAGMA table_info({table_name})") + columns = cursor.fetchall() + + table_validation = { + "table": table_name, + "columns": {} + } + + for col in columns: + col_name = col[1] + col_type = col[2] + + # 샘플 데이터 확인 + try: + cursor.execute(f"SELECT {col_name} FROM {table_name} LIMIT 5") + samples = [row[0] for row in cursor.fetchall()] + + table_validation["columns"][col_name] = { + "defined_type": col_type, + "sample_count": len(samples), + "sample_values": samples[:2], + "data_types_in_samples": list(set(type(s).__name__ for s in samples if s is not None)) + } + except Exception as e: + table_validation["columns"][col_name] = { + "defined_type": col_type, + "error": str(e) + } + + type_validations[table_name] = table_validation + + conn.close() + + self.results["data_quality"]["column_types"] = type_validations + return { + "status": "SUCCESS", + "tables_validated": len(type_validations) + } + + except Exception as e: + return { + "status": "ERROR", + "error": str(e) + } + + def validate_critical_tables(self) -> Dict: + """핵심 테이블 검증 (data_feed, performance, positions)""" + critical_tables = ["data_feed", "performance", "positions"] + validations = {} + + try: + conn = sqlite3.connect(self.db_path) + cursor = conn.cursor() + + for table_name in critical_tables: + try: + # 테이블 존재 여부 + cursor.execute(f"SELECT COUNT(*) FROM {table_name}") + count = cursor.fetchone()[0] + + # 필수 컬럼 확인 + cursor.execute(f"PRAGMA table_info({table_name})") + columns = {col[1]: col[2] for col in cursor.fetchall()} + + # 테이블별 필수 컬럼 + required_columns = { + "data_feed": ["ticker", "close_price", "entry_price"], + "performance": ["ticker", "entry_date", "entry_price", "exit_date"], + "positions": ["ticker", "quantity", "entry_price"] + } + + missing_cols = [] + for col in required_columns.get(table_name, []): + if col not in columns: + missing_cols.append(col) + + validations[table_name] = { + "exists": True, + "row_count": count, + "column_count": len(columns), + "required_columns_present": len(missing_cols) == 0, + "missing_columns": missing_cols, + "columns": columns + } + + except sqlite3.OperationalError: + validations[table_name] = { + "exists": False, + "error": f"Table {table_name} not found" + } + + conn.close() + + self.results["data_quality"]["critical_tables"] = validations + return { + "status": "SUCCESS", + "tables_validated": len(validations), + "all_valid": all(v.get("exists", False) for v in validations.values()) + } + + except Exception as e: + return { + "status": "ERROR", + "error": str(e) + } + + def generate_validation_report(self) -> Dict: + """전체 검증 리포트 생성""" + print("데이터 수집 기능 검증 시작...\n") + + # 데이터베이스 존재 확인 + if not self.check_database_exists(): + self.results["summary"]["status"] = "DATABASE_NOT_FOUND" + return self.results + + # 스키마 검증 + print("[1/4] 테이블 스키마 검증...") + schema_result = self.validate_table_schema() + self.results["schema_validation"] = schema_result + + # 데이터 무결성 검증 + print("[2/4] 데이터 무결성 검증...") + integrity_result = self.validate_data_integrity() + self.results["integrity_validation"] = integrity_result + + # 컬럼 데이터 타입 검증 + print("[3/4] 컬럼 데이터 타입 검증...") + type_result = self.validate_column_data_types() + self.results["type_validation"] = type_result + + # 핵심 테이블 검증 + print("[4/4] 핵심 테이블 검증...") + critical_result = self.validate_critical_tables() + self.results["critical_validation"] = critical_result + + # 요약 + self.results["summary"] = { + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "database_status": "OK" if self.check_database_exists() else "NOT_FOUND", + "schema_validation": schema_result.get("status"), + "integrity_check": integrity_result.get("status"), + "type_validation": type_result.get("status"), + "critical_tables": critical_result.get("status"), + "overall_status": "PASS" if all([ + schema_result.get("status") == "SUCCESS", + integrity_result.get("status") == "SUCCESS", + type_result.get("status") == "SUCCESS", + critical_result.get("status") == "SUCCESS" + ]) else "FAIL" + } + + return self.results + + def print_validation_report(self): + """검증 리포트 출력""" + print("\n" + "=" * 80) + print("데이터 수집 기능 & 테이블 검증 리포트") + print("=" * 80) + print(f"시간: {self.results['summary']['timestamp']}\n") + + # 데이터베이스 상태 + print("[데이터베이스]") + print(f" 경로: {self.results['database_path']}") + print(f" 상태: {self.results['summary']['database_status']}") + if self.results.get('database_file_size_mb'): + print(f" 크기: {self.results['database_file_size_mb']:.2f}MB\n") + + # 검증 결과 + print("[검증 결과]") + print(f" 스키마 검증: {self.results['summary']['schema_validation']}") + print(f" 무결성 검증: {self.results['summary']['integrity_check']}") + print(f" 타입 검증: {self.results['summary']['type_validation']}") + print(f" 핵심 테이블: {self.results['summary']['critical_tables']}\n") + + # 핵심 테이블 상세 + critical = self.results.get('data_quality', {}).get('critical_tables', {}) + print("[핵심 테이블 상세]") + for table, info in critical.items(): + if info.get('exists'): + print(f" {table}: {info['row_count']}개 행, {info['column_count']}개 컬럼") + if info.get('missing_columns'): + print(f" 경고: 누락된 컬럼 {info['missing_columns']}") + else: + print(f" {table}: NOT FOUND") + + # 전체 상태 + print(f"\n[전체 상태]") + print(f" {self.results['summary']['overall_status']}") + + print("\n" + "=" * 80 + "\n") + + def save_report(self, output_file: str = None): + """리포트 저장""" + if not output_file: + output_file = f"Temp/data_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + Path(output_file).parent.mkdir(parents=True, exist_ok=True) + with open(output_file, 'w', encoding='utf-8') as f: + json.dump(self.results, f, indent=2, ensure_ascii=False) + + print(f"리포트 저장: {output_file}") + + +if __name__ == "__main__": + validator = DataCollectionValidator() + validator.generate_validation_report() + validator.print_validation_report() + validator.save_report()