#!/usr/bin/env python3
"""
Database archive helper (migration/archive only).

This tool exists to copy legacy or transient DB files into archive_db/ and
generate a manifest. It is not an operational source-of-truth manager.
"""

import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union


class DatabaseArchiver:
    """Legacy DB archive helper.

    Copies database files from three fixed locations under ``root``
    (``outputs/kis_data_collection/``, ``outputs/snapshot_admin/smoke*.db``
    and selected ``Temp/*.db`` files) into dated sub-directories of
    ``root / "archive_db"``. Source files are only copied, never removed.

    Outcomes are accumulated in ``self.results`` under the keys
    ``"timestamp"``, ``"archived"``, ``"skipped"`` and ``"errors"``.
    """

    # Glob patterns swept from Temp/. They can overlap (a name may match
    # both), so matches are de-duplicated before copying.
    TEMP_PATTERNS = ("*_collection.db", "*_admin*.db")

    # Canonical database file names that must never be swept out of Temp/.
    CANONICAL_DB_NAMES = ("kis_data_collection.db", "snapshot_admin.db")

    def __init__(self, root: Optional[Union[str, Path]] = None):
        """Initialise paths and the result accumulator.

        Args:
            root: Directory that contains ``outputs/`` and ``Temp/`` and that
                will receive ``archive_db/``. Defaults to the current working
                directory (``Path(".")``).
        """
        # Take a single instant so the date stamp and the ISO timestamp can
        # never disagree (e.g. when started right at midnight).
        now = datetime.now()
        self.root = Path(".") if root is None else Path(root)
        self.archive_root = self.root / "archive_db"
        self.timestamp = now.strftime("%Y-%m-%d")
        self.results = {
            "timestamp": now.isoformat(),
            "archived": [],
            "skipped": [],
            "errors": [],
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _rel(self, path: Path) -> str:
        """Return *path* relative to ``self.root`` as a string."""
        return str(path.relative_to(self.root))

    def _record_skip(self, path: Path, message: str) -> None:
        """Print a ``[SKIP]`` line and remember *path* as skipped."""
        print(f"[SKIP] {message}")
        self.results["skipped"].append(self._rel(path))

    def _record_error(self, src: Path, exc: Exception) -> None:
        """Print an ``[ERROR]`` line and remember which source failed."""
        print(f"[ERROR] Failed to archive {src}: {exc}")
        # Keep the source path with the message so the summary and manifest
        # identify the failing item, not just the OS error text.
        self.results["errors"].append(f"{src}: {exc}")

    def _archive_file(self, src_file: Path, dest_dir: Path) -> bool:
        """Copy one file into *dest_dir* and record the outcome.

        Returns:
            True if the file was copied and recorded, False if an OS-level
            error occurred (the error is recorded, not raised).
        """
        dest_file = dest_dir / src_file.name
        try:
            # Create the destination on demand so this also works when
            # create_archive_structure() has not been called first.
            dest_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src_file, dest_file)
            size_kb = src_file.stat().st_size / 1024
        except OSError as e:  # shutil.Error is a subclass of OSError
            self._record_error(src_file, e)
            return False

        print(f"[OK] Archived: {self._rel(src_file)}")
        self.results["archived"].append({
            "source": self._rel(src_file),
            "destination": self._rel(dest_file),
            "type": "file",
            "size_kb": size_kb,
            "timestamp": self.timestamp,
        })
        return True

    def _temp_candidates(self, temp_dir: Path) -> List[Path]:
        """Return the Temp/ files to archive, de-duplicated and sorted.

        Files whose name is in ``CANONICAL_DB_NAMES`` are excluded.
        """
        matches = {
            path
            for pattern in self.TEMP_PATTERNS
            for path in temp_dir.glob(pattern)
        }
        return sorted(
            path for path in matches
            if path.name not in self.CANONICAL_DB_NAMES
        )

    # ------------------------------------------------------------------
    # Public steps
    # ------------------------------------------------------------------
    def create_archive_structure(self) -> None:
        """Create the dated archive sub-directories under ``archive_root``."""
        dirs = [
            self.archive_root / f"{self.timestamp}_outputs_kis_data_collection",
            self.archive_root / f"{self.timestamp}_outputs_snapshot_admin",
            self.archive_root / f"{self.timestamp}_temp_test_files",
        ]

        for d in dirs:
            d.mkdir(parents=True, exist_ok=True)
            print(f"[OK] Created: {self._rel(d)}")

    def archive_outputs_kis_data_collection(self) -> None:
        """Archive legacy outputs/kis_data_collection/ contents.

        The whole directory tree is copied; an existing destination is
        merged into (``dirs_exist_ok=True``). A missing source is recorded
        as skipped, a copy failure as an error.
        """
        src = self.root / "outputs" / "kis_data_collection"
        if not src.exists():
            self._record_skip(src, f"{self._rel(src)} not found")
            return

        dest = (
            self.archive_root
            / f"{self.timestamp}_outputs_kis_data_collection"
            / "kis_data_collection"
        )

        try:
            shutil.copytree(src, dest, dirs_exist_ok=True)
        except OSError as e:  # includes shutil.Error raised by copytree
            self._record_error(src, e)
            return

        print(f"[OK] Archived: {self._rel(src)}")
        self.results["archived"].append({
            "source": self._rel(src),
            "destination": self._rel(dest),
            "type": "directory",
            "timestamp": self.timestamp,
        })

    def archive_outputs_snapshot_admin(self) -> None:
        """Archive legacy outputs/snapshot_admin/ smoke*.db files.

        A missing directory, or a directory without any ``smoke*.db`` file,
        is recorded as skipped.
        """
        src_dir = self.root / "outputs" / "snapshot_admin"
        if not src_dir.exists():
            self._record_skip(src_dir, f"{self._rel(src_dir)} not found")
            return

        dest_dir = self.archive_root / f"{self.timestamp}_outputs_snapshot_admin"

        # Find the smoke*.db files; sorted so the manifest order is stable.
        smoke_files = sorted(src_dir.glob("smoke*.db"))

        if not smoke_files:
            self._record_skip(
                src_dir, f"No smoke*.db files in {self._rel(src_dir)}"
            )
            return

        for src_file in smoke_files:
            self._archive_file(src_file, dest_dir)

    def archive_temp_files(self) -> None:
        """Archive transient Temp/ test DB files.

        Files matching ``TEMP_PATTERNS`` are copied, except the canonical
        databases listed in ``CANONICAL_DB_NAMES``. Each file is archived at
        most once even if it matches several patterns.
        """
        temp_dir = self.root / "Temp"
        if not temp_dir.exists():
            self._record_skip(temp_dir, f"{self._rel(temp_dir)} not found")
            return

        dest_dir = self.archive_root / f"{self.timestamp}_temp_test_files"

        candidates = self._temp_candidates(temp_dir)
        if not candidates:
            # Only report "nothing found" when there really were no
            # candidates; copy failures are reported as errors instead.
            self._record_skip(
                temp_dir,
                f"No test DB files found in {self._rel(temp_dir)}",
            )
            return

        for src_file in candidates:
            self._archive_file(src_file, dest_dir)

    def create_manifest(self) -> None:
        """Create archive manifest.json.

        Writes ``archive_root / "manifest.json"`` (overwriting any existing
        file) with counts, the archived entries, and the skipped/error
        details collected so far.
        """
        manifest = {
            "archive_date": self.timestamp,
            "created_at": datetime.now().isoformat(),
            "archived_count": len(self.results["archived"]),
            "skipped_count": len(self.results["skipped"]),
            "error_count": len(self.results["errors"]),
            "files": self.results["archived"],
            "skipped": self.results["skipped"],
            "errors": self.results["errors"],
            "notes": [
                "These files were archived due to database consolidation.",
                "Single source of truth is now: src/quant_engine/",
                "To restore: use archive_db/{date}_*/ directories",
                "Canonical files: kis_data_collection.db, snapshot_admin.db"
            ]
        }

        # Make sure the archive root exists even when this step is run alone.
        self.archive_root.mkdir(parents=True, exist_ok=True)
        manifest_file = self.archive_root / "manifest.json"
        with open(manifest_file, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2, ensure_ascii=False)

        print(f"\n[OK] Manifest created: {self._rel(manifest_file)}")

    def run(self) -> Dict:
        """Run the full archive process and return ``self.results``."""
        print("="*80)
        print("Database Archiving Process")
        print("="*80)
        print(f"Archive date: {self.timestamp}\n")

        # Create the archive directory structure.
        self.create_archive_structure()

        print("\n[Archiving files...]")

        # Archive each legacy location.
        self.archive_outputs_kis_data_collection()
        self.archive_outputs_snapshot_admin()
        self.archive_temp_files()

        # Write the manifest.
        self.create_manifest()

        # Summary.
        print("\n" + "="*80)
        print("Archive Summary")
        print("="*80)
        print(f"Archived: {len(self.results['archived'])} items")
        print(f"Skipped: {len(self.results['skipped'])} items")
        print(f"Errors: {len(self.results['errors'])} items")
        print(f"\nArchive location: {self._rel(self.archive_root)}")

        if self.results['errors']:
            print("\n[Errors encountered]")
            for error in self.results['errors']:
                print(f"  - {error}")

        return self.results


def main() -> Dict:
    """Run the archiver from the current directory and print next steps."""
    archiver = DatabaseArchiver()
    results = archiver.run()

    print("\n" + "="*80)
    print("[Next Steps]")
    print("="*80)
    print("1. Verify archive contents: git status")
    print("2. Add archive to git: git add archive_db/")
    print("3. Commit: git commit -m 'Archive legacy database files'")
    print("4. Delete legacy files (after verification)")
    print("5. Update code references to use src/quant_engine/")
    return results


if __name__ == "__main__":
    main()