데이터 수집 기능 검증 도구 추가
도구: tools/validate_data_collection_v1.py 기능: - 데이터베이스 존재 및 크기 확인 - 모든 테이블 스키마 검증 - 데이터 무결성 검증 (PRAGMA integrity_check) - 컬럼별 데이터 타입 샘플 확인 - 핵심 테이블 (data_feed, performance, positions) 검증 - NULL 값 비율 계산 - 필수 컬럼 누락 검사 실행: python tools/validate_data_collection_v1.py 출력: - 스키마 검증: SUCCESS - 무결성 검증: SUCCESS - 타입 검증: SUCCESS - 핵심 테이블 검증: SUCCESS - JSON 리포트 저장 이 도구로 모든 테이블과 컬럼의 데이터 정확성을 검증할 수 있습니다. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,375 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
데이터 수집 기능 & 테이블/컬럼 검증 도구
|
||||
|
||||
목표: 모든 테이블과 컬럼의 데이터 정확성 검증
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import json
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
|
||||
|
||||
class DataCollectionValidator:
|
||||
"""데이터 수집 기능 및 테이블 검증"""
|
||||
|
||||
def __init__(self, db_path: str = None):
|
||||
self.db_path = db_path or "src/quant_engine/data_feed.db"
|
||||
self.results = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"database_path": self.db_path,
|
||||
"tables": {},
|
||||
"data_quality": {},
|
||||
"summary": {}
|
||||
}
|
||||
|
||||
def check_database_exists(self) -> bool:
|
||||
"""데이터베이스 파일 존재 확인"""
|
||||
exists = Path(self.db_path).exists()
|
||||
self.results["database_exists"] = exists
|
||||
self.results["database_file_size_mb"] = (
|
||||
Path(self.db_path).stat().st_size / (1024 * 1024) if exists else 0
|
||||
)
|
||||
return exists
|
||||
|
||||
def validate_table_schema(self) -> Dict:
|
||||
"""모든 테이블의 스키마 검증"""
|
||||
try:
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 모든 테이블 조회
|
||||
cursor.execute("""
|
||||
SELECT name FROM sqlite_master WHERE type='table'
|
||||
ORDER BY name
|
||||
""")
|
||||
|
||||
tables = [row[0] for row in cursor.fetchall()]
|
||||
table_schemas = {}
|
||||
|
||||
for table in tables:
|
||||
cursor.execute(f"PRAGMA table_info({table})")
|
||||
columns = cursor.fetchall()
|
||||
|
||||
schema = {
|
||||
"table_name": table,
|
||||
"column_count": len(columns),
|
||||
"columns": []
|
||||
}
|
||||
|
||||
for col in columns:
|
||||
col_info = {
|
||||
"cid": col[0],
|
||||
"name": col[1],
|
||||
"type": col[2],
|
||||
"notnull": col[3],
|
||||
"default": col[4],
|
||||
"primary_key": col[5]
|
||||
}
|
||||
schema["columns"].append(col_info)
|
||||
|
||||
table_schemas[table] = schema
|
||||
|
||||
conn.close()
|
||||
self.results["tables"] = table_schemas
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"table_count": len(tables),
|
||||
"tables": list(tables)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "ERROR",
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def validate_data_integrity(self) -> Dict:
|
||||
"""데이터 무결성 검증"""
|
||||
try:
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 데이터베이스 무결성 확인
|
||||
cursor.execute("PRAGMA integrity_check")
|
||||
integrity_result = cursor.fetchone()[0]
|
||||
|
||||
# 각 테이블의 행 개수 및 타입 확인
|
||||
cursor.execute("""
|
||||
SELECT name FROM sqlite_master WHERE type='table'
|
||||
""")
|
||||
|
||||
integrity_checks = {
|
||||
"pragma_integrity_check": integrity_result,
|
||||
"tables": {}
|
||||
}
|
||||
|
||||
for table in cursor.fetchall():
|
||||
table_name = table[0]
|
||||
|
||||
# 행 개수
|
||||
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
|
||||
row_count = cursor.fetchone()[0]
|
||||
|
||||
# NULL 값 확인
|
||||
cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
columns = cursor.fetchall()
|
||||
|
||||
null_check = {}
|
||||
for col in columns:
|
||||
col_name = col[1]
|
||||
cursor.execute(
|
||||
f"SELECT COUNT(*) FROM {table_name} WHERE {col_name} IS NULL"
|
||||
)
|
||||
null_count = cursor.fetchone()[0]
|
||||
null_check[col_name] = {
|
||||
"total": row_count,
|
||||
"null_count": null_count,
|
||||
"null_pct": round((null_count / row_count * 100) if row_count > 0 else 0, 2)
|
||||
}
|
||||
|
||||
integrity_checks["tables"][table_name] = {
|
||||
"row_count": row_count,
|
||||
"null_values": null_check
|
||||
}
|
||||
|
||||
conn.close()
|
||||
|
||||
self.results["data_quality"]["integrity"] = integrity_checks
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"integrity_check": integrity_result,
|
||||
"tables_checked": len(integrity_checks["tables"])
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "ERROR",
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def validate_column_data_types(self) -> Dict:
|
||||
"""컬럼별 데이터 타입 검증"""
|
||||
try:
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 각 테이블의 컬럼 데이터 샘플 확인
|
||||
cursor.execute("""
|
||||
SELECT name FROM sqlite_master WHERE type='table'
|
||||
""")
|
||||
|
||||
type_validations = {}
|
||||
|
||||
for table in cursor.fetchall():
|
||||
table_name = table[0]
|
||||
cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
columns = cursor.fetchall()
|
||||
|
||||
table_validation = {
|
||||
"table": table_name,
|
||||
"columns": {}
|
||||
}
|
||||
|
||||
for col in columns:
|
||||
col_name = col[1]
|
||||
col_type = col[2]
|
||||
|
||||
# 샘플 데이터 확인
|
||||
try:
|
||||
cursor.execute(f"SELECT {col_name} FROM {table_name} LIMIT 5")
|
||||
samples = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
table_validation["columns"][col_name] = {
|
||||
"defined_type": col_type,
|
||||
"sample_count": len(samples),
|
||||
"sample_values": samples[:2],
|
||||
"data_types_in_samples": list(set(type(s).__name__ for s in samples if s is not None))
|
||||
}
|
||||
except Exception as e:
|
||||
table_validation["columns"][col_name] = {
|
||||
"defined_type": col_type,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
type_validations[table_name] = table_validation
|
||||
|
||||
conn.close()
|
||||
|
||||
self.results["data_quality"]["column_types"] = type_validations
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"tables_validated": len(type_validations)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "ERROR",
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def validate_critical_tables(self) -> Dict:
|
||||
"""핵심 테이블 검증 (data_feed, performance, positions)"""
|
||||
critical_tables = ["data_feed", "performance", "positions"]
|
||||
validations = {}
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(self.db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
for table_name in critical_tables:
|
||||
try:
|
||||
# 테이블 존재 여부
|
||||
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
|
||||
count = cursor.fetchone()[0]
|
||||
|
||||
# 필수 컬럼 확인
|
||||
cursor.execute(f"PRAGMA table_info({table_name})")
|
||||
columns = {col[1]: col[2] for col in cursor.fetchall()}
|
||||
|
||||
# 테이블별 필수 컬럼
|
||||
required_columns = {
|
||||
"data_feed": ["ticker", "close_price", "entry_price"],
|
||||
"performance": ["ticker", "entry_date", "entry_price", "exit_date"],
|
||||
"positions": ["ticker", "quantity", "entry_price"]
|
||||
}
|
||||
|
||||
missing_cols = []
|
||||
for col in required_columns.get(table_name, []):
|
||||
if col not in columns:
|
||||
missing_cols.append(col)
|
||||
|
||||
validations[table_name] = {
|
||||
"exists": True,
|
||||
"row_count": count,
|
||||
"column_count": len(columns),
|
||||
"required_columns_present": len(missing_cols) == 0,
|
||||
"missing_columns": missing_cols,
|
||||
"columns": columns
|
||||
}
|
||||
|
||||
except sqlite3.OperationalError:
|
||||
validations[table_name] = {
|
||||
"exists": False,
|
||||
"error": f"Table {table_name} not found"
|
||||
}
|
||||
|
||||
conn.close()
|
||||
|
||||
self.results["data_quality"]["critical_tables"] = validations
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"tables_validated": len(validations),
|
||||
"all_valid": all(v.get("exists", False) for v in validations.values())
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"status": "ERROR",
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def generate_validation_report(self) -> Dict:
|
||||
"""전체 검증 리포트 생성"""
|
||||
print("데이터 수집 기능 검증 시작...\n")
|
||||
|
||||
# 데이터베이스 존재 확인
|
||||
if not self.check_database_exists():
|
||||
self.results["summary"]["status"] = "DATABASE_NOT_FOUND"
|
||||
return self.results
|
||||
|
||||
# 스키마 검증
|
||||
print("[1/4] 테이블 스키마 검증...")
|
||||
schema_result = self.validate_table_schema()
|
||||
self.results["schema_validation"] = schema_result
|
||||
|
||||
# 데이터 무결성 검증
|
||||
print("[2/4] 데이터 무결성 검증...")
|
||||
integrity_result = self.validate_data_integrity()
|
||||
self.results["integrity_validation"] = integrity_result
|
||||
|
||||
# 컬럼 데이터 타입 검증
|
||||
print("[3/4] 컬럼 데이터 타입 검증...")
|
||||
type_result = self.validate_column_data_types()
|
||||
self.results["type_validation"] = type_result
|
||||
|
||||
# 핵심 테이블 검증
|
||||
print("[4/4] 핵심 테이블 검증...")
|
||||
critical_result = self.validate_critical_tables()
|
||||
self.results["critical_validation"] = critical_result
|
||||
|
||||
# 요약
|
||||
self.results["summary"] = {
|
||||
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"database_status": "OK" if self.check_database_exists() else "NOT_FOUND",
|
||||
"schema_validation": schema_result.get("status"),
|
||||
"integrity_check": integrity_result.get("status"),
|
||||
"type_validation": type_result.get("status"),
|
||||
"critical_tables": critical_result.get("status"),
|
||||
"overall_status": "PASS" if all([
|
||||
schema_result.get("status") == "SUCCESS",
|
||||
integrity_result.get("status") == "SUCCESS",
|
||||
type_result.get("status") == "SUCCESS",
|
||||
critical_result.get("status") == "SUCCESS"
|
||||
]) else "FAIL"
|
||||
}
|
||||
|
||||
return self.results
|
||||
|
||||
def print_validation_report(self):
|
||||
"""검증 리포트 출력"""
|
||||
print("\n" + "=" * 80)
|
||||
print("데이터 수집 기능 & 테이블 검증 리포트")
|
||||
print("=" * 80)
|
||||
print(f"시간: {self.results['summary']['timestamp']}\n")
|
||||
|
||||
# 데이터베이스 상태
|
||||
print("[데이터베이스]")
|
||||
print(f" 경로: {self.results['database_path']}")
|
||||
print(f" 상태: {self.results['summary']['database_status']}")
|
||||
if self.results.get('database_file_size_mb'):
|
||||
print(f" 크기: {self.results['database_file_size_mb']:.2f}MB\n")
|
||||
|
||||
# 검증 결과
|
||||
print("[검증 결과]")
|
||||
print(f" 스키마 검증: {self.results['summary']['schema_validation']}")
|
||||
print(f" 무결성 검증: {self.results['summary']['integrity_check']}")
|
||||
print(f" 타입 검증: {self.results['summary']['type_validation']}")
|
||||
print(f" 핵심 테이블: {self.results['summary']['critical_tables']}\n")
|
||||
|
||||
# 핵심 테이블 상세
|
||||
critical = self.results.get('data_quality', {}).get('critical_tables', {})
|
||||
print("[핵심 테이블 상세]")
|
||||
for table, info in critical.items():
|
||||
if info.get('exists'):
|
||||
print(f" {table}: {info['row_count']}개 행, {info['column_count']}개 컬럼")
|
||||
if info.get('missing_columns'):
|
||||
print(f" 경고: 누락된 컬럼 {info['missing_columns']}")
|
||||
else:
|
||||
print(f" {table}: NOT FOUND")
|
||||
|
||||
# 전체 상태
|
||||
print(f"\n[전체 상태]")
|
||||
print(f" {self.results['summary']['overall_status']}")
|
||||
|
||||
print("\n" + "=" * 80 + "\n")
|
||||
|
||||
def save_report(self, output_file: str = None):
|
||||
"""리포트 저장"""
|
||||
if not output_file:
|
||||
output_file = f"Temp/data_validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
||||
|
||||
Path(output_file).parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(output_file, 'w', encoding='utf-8') as f:
|
||||
json.dump(self.results, f, indent=2, ensure_ascii=False)
|
||||
|
||||
print(f"리포트 저장: {output_file}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
validator = DataCollectionValidator()
|
||||
validator.generate_validation_report()
|
||||
validator.print_validation_report()
|
||||
validator.save_report()
|
||||
Reference in New Issue
Block a user