Files
QuantEngineByItz/tools/backup_recovery_manager_v1.py

425 lines
15 KiB
Python

#!/usr/bin/env python3
"""
WBS-9.7: 자동 백업 & 복구 전략
목표: 99% 성공률, 복구 < 1시간
"""
import hashlib
import json
import os
import shutil
import sqlite3
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class BackupRecoveryManager:
    """Create, restore, verify and prune directory-based backups.

    Every backup is a directory under ``backup_dir`` that contains the copied
    files plus a ``metadata.json`` describing the run. Outcomes of backup and
    restore calls are accumulated in ``self.results`` so that a report can be
    generated, printed and saved afterwards.

    Attributes:
        data_dir: Directory holding the live data files.
        backup_dir: Directory under which backup directories are created.
        retention_days: Backups whose metadata timestamp is older than this
            many days are removed by :meth:`cleanup_old_backups`.
        results: Dict with the keys ``timestamp``, ``backups``,
            ``recovery_tests`` and ``summary``.
    """

    # SQLite databases that get an integrity check when present in a backup.
    _DATABASE_NAMES = ("kis_data_collection.db", "snapshot_admin.db")

    def __init__(
        self,
        data_dir: str = "src/quant_engine",
        backup_dir: str = "backups",
        retention_days: int = 30
    ):
        """Store the configuration and make sure ``backup_dir`` exists.

        Args:
            data_dir: Directory holding the live data files.
            backup_dir: Directory that receives the backups (created if
                missing).
            retention_days: Maximum age of a backup, in days.
        """
        self.data_dir = Path(data_dir)
        self.backup_dir = Path(backup_dir)
        self.retention_days = retention_days
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.results = {
            "timestamp": datetime.now().isoformat(),
            "backups": [],
            "recovery_tests": [],
            "summary": {}
        }

    @staticmethod
    def _tree_stats(root: Path) -> Tuple[int, int]:
        """Return ``(file_count, total_bytes)`` for all files below *root*."""
        count = 0
        size = 0
        for entry in root.rglob("*"):
            if entry.is_file():
                count += 1
                size += entry.stat().st_size
        return count, size

    @staticmethod
    def _write_metadata(backup_path: Path, metadata: Dict) -> None:
        """Write *metadata* as ``metadata.json`` inside *backup_path*."""
        with open(backup_path / "metadata.json", "w", encoding="utf-8") as f:
            json.dump(metadata, f, indent=2)

    @staticmethod
    def _check_sqlite(db_file: Path) -> str:
        """Run ``PRAGMA integrity_check`` on *db_file*.

        Returns:
            The first result row's value (``"ok"`` for a healthy database),
            ``"UNKNOWN"`` if no row came back, or ``"FAILED"`` if the file
            could not be opened or checked.
        """
        try:
            conn = sqlite3.connect(db_file)
        except (sqlite3.Error, OSError):
            return "FAILED"
        try:
            row = conn.execute("PRAGMA integrity_check").fetchone()
            return row[0] if row else "UNKNOWN"
        except (sqlite3.Error, OSError):
            return "FAILED"
        finally:
            # Always release the handle, also when the check itself raised.
            conn.close()

    def create_daily_backup(self) -> Dict:
        """Copy the fixed list of key data/spec files into a new backup.

        Files from the list that do not exist are skipped and reported as
        ``files_missing``; they do not count as errors.

        Returns:
            Result dict with ``backup_name`` and ``status``. ``status`` is
            ``SUCCESS``, ``PARTIAL_SUCCESS`` (at least one copy raised) or
            ``FAILED`` (the run itself raised; ``error`` holds the message).
            Every outcome, including ``FAILED``, is appended to
            ``self.results["backups"]``.
        """
        backup_name = f"daily_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        backup_path = self.backup_dir / backup_name
        try:
            # Files that make up the daily backup.
            files_to_backup = [
                self.data_dir / "kis_data_collection.db",
                self.data_dir / "snapshot_admin.db",
                self.data_dir / "calibration_registry.yaml",
                Path("spec") / "12_field_dictionary.yaml",
                Path("spec") / "13_formula_registry.yaml",
            ]
            backup_path.mkdir(parents=True, exist_ok=True)

            success_count = 0
            error_count = 0
            missing_count = 0
            total_size = 0
            for src in files_to_backup:
                if not src.exists():
                    missing_count += 1
                    continue
                dst = backup_path / src.name
                try:
                    if src.is_file():
                        shutil.copy2(src, dst)
                        total_size += dst.stat().st_size
                        success_count += 1
                    elif src.is_dir():
                        shutil.copytree(src, dst)
                        # Count the files actually copied so that
                        # "files_backed_up" matches what is on disk.
                        copied, copied_size = self._tree_stats(dst)
                        total_size += copied_size
                        success_count += copied
                except Exception as e:
                    print(f"Error backing up {src}: {e}")
                    error_count += 1

            self._write_metadata(backup_path, {
                "backup_name": backup_name,
                "timestamp": datetime.now().isoformat(),
                "files_backed_up": success_count,
                "files_failed": error_count,
                "files_missing": missing_count,
                "total_size_bytes": total_size,
                "type": "daily_incremental"
            })
            result = {
                "backup_name": backup_name,
                "status": "SUCCESS" if error_count == 0 else "PARTIAL_SUCCESS",
                "files_backed_up": success_count,
                "files_missing": missing_count,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "path": str(backup_path)
            }
        except Exception as e:
            result = {
                "backup_name": backup_name,
                "status": "FAILED",
                "error": str(e)
            }
        # Record failures too; otherwise the success rate can never drop
        # because of a hard failure.
        self.results["backups"].append(result)
        return result

    def create_weekly_full_backup(self) -> Dict:
        """Copy every file under the data, ``spec`` and ``formulas`` trees.

        Directories named in the exclusion set (VCS data, caches, temporary
        output) are skipped, as is anything located inside ``backup_dir``.
        Each file is stored under the backup directory at its source path
        with the path anchor removed.

        Returns:
            Result dict with ``backup_name`` and ``status``. ``status`` is
            ``SUCCESS``, ``PARTIAL_SUCCESS`` (at least one copy raised) or
            ``FAILED`` (the run itself raised; ``error`` holds the message).
            Every outcome is appended to ``self.results["backups"]``.
        """
        backup_name = f"weekly_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        backup_path = self.backup_dir / backup_name
        try:
            backup_path.mkdir(parents=True, exist_ok=True)
            exclude_dirs = {".git", "__pycache__", ".pytest_cache", "Temp", "outputs"}
            backup_root = self.backup_dir.resolve()

            total_size = 0
            file_count = 0
            error_count = 0
            for root_dir in [self.data_dir, Path("spec"), Path("formulas")]:
                if not root_dir.exists():
                    continue
                for src_file in root_dir.rglob("*"):
                    # Match exclusions against the path *inside* the scanned
                    # root so that ancestor directories of the project (for
                    # example a parent folder called "Temp") do not exclude
                    # every file.
                    if not exclude_dirs.isdisjoint(src_file.relative_to(root_dir).parts):
                        continue
                    if not src_file.is_file():
                        continue
                    # Never copy existing backups (or this one) into the backup.
                    if backup_root in src_file.resolve().parents:
                        continue
                    rel_path = src_file.relative_to(src_file.anchor)
                    dst = backup_path / rel_path
                    try:
                        dst.parent.mkdir(parents=True, exist_ok=True)
                        shutil.copy2(src_file, dst)
                        total_size += dst.stat().st_size
                        file_count += 1
                    except Exception as e:
                        print(f"Error backing up {src_file}: {e}")
                        error_count += 1

            self._write_metadata(backup_path, {
                "backup_name": backup_name,
                "timestamp": datetime.now().isoformat(),
                "files_backed_up": file_count,
                "files_failed": error_count,
                "total_size_bytes": total_size,
                "type": "weekly_full"
            })
            result = {
                "backup_name": backup_name,
                "status": "SUCCESS" if error_count == 0 else "PARTIAL_SUCCESS",
                "files_backed_up": file_count,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "path": str(backup_path)
            }
        except Exception as e:
            result = {
                "backup_name": backup_name,
                "status": "FAILED",
                "error": str(e)
            }
        self.results["backups"].append(result)
        return result

    def restore_from_backup(self, backup_name: str, restore_to: Optional[str] = None) -> Dict:
        """Copy the top-level entries of a backup into a target directory.

        ``metadata.json`` is not restored. A directory entry replaces an
        existing directory of the same name in the target.

        NOTE(review): weekly backups keep the source-relative layout, so
        restoring one into the default ``data_dir`` nests that layout below
        it; pass a suitable ``restore_to`` for weekly backups.

        Args:
            backup_name: Name of the backup directory under ``backup_dir``.
            restore_to: Target directory; defaults to ``data_dir``. It is
                created if it does not exist.

        Returns:
            Result dict with ``backup_name`` and ``status``: ``SUCCESS`` (no
            entry failed), ``PARTIAL_SUCCESS`` (some entries failed) or
            ``FAILED`` (backup missing, every entry failed, or the run
            raised). Attempted restores are appended to
            ``self.results["recovery_tests"]``.
        """
        backup_path = self.backup_dir / backup_name
        restore_to = Path(restore_to) if restore_to else self.data_dir
        # An empty name would resolve to backup_dir itself, so reject it.
        if not backup_name or not backup_path.is_dir():
            return {
                "backup_name": backup_name,
                "status": "FAILED",
                "error": f"Backup not found: {backup_path}"
            }
        try:
            start_time = datetime.now()
            # Create the target itself, not only its parent; otherwise every
            # copy into a new directory fails.
            restore_to.mkdir(parents=True, exist_ok=True)

            restored_count = 0
            failed_count = 0
            for src in backup_path.glob("*"):
                if src.name == "metadata.json":
                    continue
                dst = restore_to / src.name
                try:
                    if src.is_file():
                        shutil.copy2(src, dst)
                        restored_count += 1
                    elif src.is_dir():
                        if dst.exists():
                            shutil.rmtree(dst)
                        shutil.copytree(src, dst)
                        restored_count += 1
                except Exception as e:
                    print(f"Error restoring {src}: {e}")
                    failed_count += 1

            recovery_time = (datetime.now() - start_time).total_seconds()
            if failed_count == 0:
                status = "SUCCESS"
            elif restored_count:
                status = "PARTIAL_SUCCESS"
            else:
                status = "FAILED"
            result = {
                "backup_name": backup_name,
                "status": status,
                "files_restored": restored_count,
                "files_failed": failed_count,
                "recovery_time_seconds": round(recovery_time, 2),
                "restored_to": str(restore_to)
            }
        except Exception as e:
            result = {
                "backup_name": backup_name,
                "status": "FAILED",
                "error": str(e)
            }
        self.results["recovery_tests"].append(result)
        return result

    def cleanup_old_backups(self) -> Dict:
        """Delete backups whose metadata timestamp exceeds the retention.

        Directories without a ``metadata.json`` are left alone. A problem
        with a single backup is printed and does not stop the cleanup.

        Returns:
            Dict with ``status``; on success also ``deleted_backups`` and
            ``freed_space_mb``, on failure ``error``.
        """
        cutoff_date = datetime.now() - timedelta(days=self.retention_days)
        deleted_count = 0
        freed_size = 0
        try:
            # Snapshot the listing first: entries are removed while looping.
            for backup_dir in sorted(self.backup_dir.iterdir()):
                if not backup_dir.is_dir():
                    continue
                try:
                    metadata_file = backup_dir / "metadata.json"
                    if not metadata_file.exists():
                        continue
                    with open(metadata_file, encoding="utf-8") as f:
                        metadata = json.load(f)
                    backup_time = datetime.fromisoformat(metadata["timestamp"])
                    if backup_time < cutoff_date:
                        _, backup_size = self._tree_stats(backup_dir)
                        shutil.rmtree(backup_dir)
                        # Only count space once the delete really succeeded.
                        freed_size += backup_size
                        deleted_count += 1
                except Exception as e:
                    print(f"Error processing {backup_dir}: {e}")
            return {
                "status": "SUCCESS",
                "deleted_backups": deleted_count,
                "freed_space_mb": round(freed_size / (1024 * 1024), 2)
            }
        except Exception as e:
            return {
                "status": "FAILED",
                "error": str(e)
            }

    def test_backup_integrity(self, backup_name: str) -> Dict:
        """Verify a backup's metadata, file count and SQLite databases.

        The number of files found in the backup (recursively, excluding the
        top-level ``metadata.json``) is compared with ``files_backed_up``
        from the metadata, and each known database found in the backup is
        checked with ``PRAGMA integrity_check``.

        Args:
            backup_name: Name of the backup directory under ``backup_dir``.

        Returns:
            Result dict with ``backup_name`` and ``status``. ``status`` is
            ``SUCCESS`` only if the file count matches and every database
            that is present reports ``ok``; a database that is absent
            (``NOT_FOUND``) does not fail the check.
        """
        backup_path = self.backup_dir / backup_name
        if not backup_path.exists():
            return {
                "backup_name": backup_name,
                "status": "FAILED",
                "error": "Backup not found"
            }
        try:
            metadata_file = backup_path / "metadata.json"
            if not metadata_file.exists():
                return {
                    "backup_name": backup_name,
                    "status": "FAILED",
                    "error": "Metadata missing"
                }
            with open(metadata_file, encoding="utf-8") as f:
                metadata = json.load(f)

            # Count files recursively: weekly backups store a nested tree and
            # their metadata counts every file in it.
            actual_files = sum(
                1 for entry in backup_path.rglob("*")
                if entry.is_file() and entry != metadata_file
            )
            expected_files = metadata.get("files_backed_up", actual_files)

            db_integrity = {}
            for db_name in self._DATABASE_NAMES:
                db_file = backup_path / db_name
                if not db_file.is_file():
                    # Weekly backups keep the databases below the top level.
                    nested = sorted(p for p in backup_path.rglob(db_name) if p.is_file())
                    db_file = nested[0] if nested else None
                if db_file is None:
                    db_integrity[db_name] = "NOT_FOUND"
                else:
                    db_integrity[db_name] = self._check_sqlite(db_file)

            count_matches = actual_files == expected_files
            databases_ok = all(
                state in ("ok", "NOT_FOUND") for state in db_integrity.values()
            )
            return {
                "backup_name": backup_name,
                "status": "SUCCESS" if count_matches and databases_ok else "FAILED",
                "metadata_valid": True,
                "file_count": actual_files,
                "expected_files": expected_files,
                "file_count_matches": count_matches,
                "database_integrity": db_integrity,
                "backup_timestamp": metadata.get("timestamp")
            }
        except Exception as e:
            return {
                "backup_name": backup_name,
                "status": "FAILED",
                "error": str(e)
            }

    def generate_backup_report(self) -> Dict:
        """Fill ``self.results["summary"]`` and return ``self.results``.

        The summary covers the backups currently on disk (directories with a
        ``metadata.json``) and the success rate of the backup calls recorded
        on this instance; only ``SUCCESS`` counts as a success.
        """
        existing_backups = [
            d.name for d in self.backup_dir.iterdir()
            if d.is_dir() and (d / "metadata.json").exists()
        ]
        total_backup_size = sum(
            self._tree_stats(self.backup_dir / name)[1] for name in existing_backups
        )
        daily_backups = [b for b in existing_backups if b.startswith("daily_")]
        weekly_backups = [b for b in existing_backups if b.startswith("weekly_")]

        recorded = self.results["backups"]
        if recorded:
            succeeded = sum(1 for b in recorded if b.get("status") == "SUCCESS")
            success_rate = round(succeeded / len(recorded) * 100, 1)
        else:
            # No backup was attempted on this instance.
            success_rate = 100.0

        self.results["summary"] = {
            "total_backups": len(existing_backups),
            "daily_backups": len(daily_backups),
            "weekly_backups": len(weekly_backups),
            "total_size_mb": round(total_backup_size / (1024 * 1024), 2),
            "retention_days": self.retention_days,
            "success_rate": success_rate
        }
        return self.results

    def print_report(self):
        """Print the last five recorded backups and, if present, the summary."""
        print("\n" + "=" * 80)
        print("BACKUP & RECOVERY MANAGEMENT REPORT")
        print("=" * 80)
        print(f"Timestamp: {self.results['timestamp']}\n")
        print("RECENT BACKUPS:")
        print("-" * 80)
        for backup in self.results["backups"][-5:]:
            status_marker = "[OK]" if backup.get("status") == "SUCCESS" else "[FAIL]"
            print(
                f"{status_marker} {backup.get('backup_name', 'N/A'):30} "
                f"| Size: {backup.get('total_size_mb', 0):8.2f}MB | "
                f"Files: {backup.get('files_backed_up', 0):3}"
            )
        if self.results["summary"]:
            s = self.results["summary"]
            print("\nSUMMARY:")
            print("-" * 80)
            print(f"Total backups: {s['total_backups']}")
            print(f"Daily backups: {s['daily_backups']}")
            print(f"Weekly backups: {s['weekly_backups']}")
            print(f"Total size: {s['total_size_mb']:.2f}MB")
            print(f"Success rate: {s['success_rate']:.1f}%")
        print("=" * 80 + "\n")

    def save_report(self, output_file: Optional[str] = None):
        """Write ``self.results`` as JSON.

        Args:
            output_file: Destination path. Defaults to a timestamped file
                under ``Temp/``. Missing parent directories are created.
        """
        if not output_file:
            output_file = f"Temp/backup_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        Path(output_file).parent.mkdir(parents=True, exist_ok=True)
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
        print(f"Report saved: {output_file}")
def main() -> int:
    """Run one backup cycle and return a process exit code.

    Steps: daily backup, weekly full backup (Mondays only), cleanup of old
    backups, then report generation, printing and saving.

    Returns:
        ``0`` if no step reported ``FAILED``, otherwise ``1`` so that a
        scheduler running this script can detect the failure.
    """
    manager = BackupRecoveryManager()
    outcomes = []

    print("Creating daily backup...")
    outcomes.append(manager.create_daily_backup())

    # Weekly full backup runs on Mondays (weekday() == 0).
    if datetime.now().weekday() == 0:
        print("Creating weekly full backup...")
        outcomes.append(manager.create_weekly_full_backup())

    print("Cleaning up old backups...")
    outcomes.append(manager.cleanup_old_backups())

    # Always produce the report, even when a step failed.
    manager.generate_backup_report()
    manager.print_report()
    manager.save_report()

    failures = [outcome for outcome in outcomes if outcome.get("status") == "FAILED"]
    for failure in failures:
        print(f"FAILED: {failure.get('backup_name', 'cleanup')}: {failure.get('error')}")
    return 1 if failures else 0


if __name__ == "__main__":
    raise SystemExit(main())