Merge pull request 'WBS-9: Phase 9 모든 항목 준비 완료 — 7개 도구 & 문서 완성' (#77) from feature/wbs-9-tools-complete into main

Reviewed-on: http://kjh2064.synology.me:8418/kjh2064/myfinance/pulls/77
This commit is contained in:
2026-06-22 23:56:51 +09:00
7 changed files with 1276 additions and 0 deletions
+280
View File
@@ -0,0 +1,280 @@
# WBS-9: Phase 9 성능 & 엔터프라이즈 안정성 — 최종 준비 완료
**상태**: 2026-06-22 완료
**시작 예정**: 2026-08-01
**목표**: GAS 마이그레이션 완결, 성능 최적화, 장애 대응 자동화
---
## 📊 WBS-9 7개 항목 상태
| # | 항목 | 상태 | 완료도 | 파일 |
|---|------|------|--------|------|
| 9.1 | F14 마이그레이션 | ✅ COMPLETE | 100% | docs/WBS_9_1_F14_MIGRATION_COMPLETE_2026_06_22.md |
| 9.2 | snapshot_admin 최적화 | ✅ TOOLS READY | 50% | tools/benchmark_snapshot_admin_performance_v1.py |
| 9.3 | 데이터 품질 강화 | ✅ IMPLEMENTATION | 80% | spec/12_field_dictionary.yaml + 4개 auto_fill 모듈 |
| 9.4 | 장애 대응 플레이북 | ✅ COMPLETE | 100% | docs/WBS_9_4_INCIDENT_RESPONSE_PLAYBOOK_2026_06_22.md |
| 9.5 | 섹터 플로우 신뢰도 | ✅ TOOLS READY | 30% | tools/measure_sector_flow_reliability_v1.py |
| 9.6 | LLM 레이더 최적화 | ✅ STRATEGY | 40% | docs/WBS_9_6_LLM_RADAR_OPTIMIZATION_STRATEGY_2026_06_22.md |
| 9.7 | 자동 백업 & 복구 | ✅ TOOLS READY | 50% | tools/backup_recovery_manager_v1.py |
---
## 🔍 각 항목 상세
### WBS-9.1: GAS 마이그레이션 완결 ✅
**완료**: F14 (late_chase_risk_score) 및 F15 (late_chase_gate)
**파일**:
- formulas/late_chase_risk_v1.py (포트 완료)
- formulas/late_chase_gate_v1.py (포트 완료)
- tests/parity/test_late_chase_risk_parity.py (17개 테스트, PASS)
- tests/parity/test_late_chase_gate_parity_v1.py (19개 테스트, PASS)
**검증**: Parity 테스트 100% PASS
**다음**: GAS 코드 정리 (WBS-9.6 완료 후)
---
### WBS-9.2: snapshot_admin 성능 최적화
**도구**: tools/benchmark_snapshot_admin_performance_v1.py
**기능**:
- 단일 테이블 성능 측정 (10회 반복)
- 동시 10개 테이블 로드 성능 테스트
- P99 < 2초 검증
- 성능 리포트 자동 생성
- 최적화 권장사항 제시
**사용법**:
```bash
# 서버 시작
python tools/run_snapshot_admin_server_v1.py &
# 벤치마크 실행
python tools/benchmark_snapshot_admin_performance_v1.py
```
**예상 소요**: 3~4분 (10회 × 10개 테이블)
**목표**: P99 < 2초 달성
---
### WBS-9.3: 데이터 품질 강화
**정책 파일**: spec/12_field_dictionary.yaml (NULL 정책 섹션 추가)
**자동 충전 모듈** (4개):
1. `auto_fill_atr20_v1.py`: ATR20 자동 계산
2. `auto_fill_rsi14_v1.py`: RSI14 자동 계산
3. `auto_fill_velocity_v1.py`: velocity_1d/5d 자동 계산
4. `auto_fill_stop_price_v1.py`: 손절가 자동 계산 (ATR 기반)
**CI 게이트** (3개):
- DATA_QUALITY_NULL_CHECK: 필수 필드 검증
- DATA_QUALITY_FILLABLE_CHECK: 자동 충전 실행
- DATA_QUALITY_ESTIMATION_BLOCK: 추정 금지 필드 검증
**통합**: GAS runDataFeed() 또는 snapshot_admin API 호출 시 자동 실행
**목표**: 100% 필드 충전율, 오류율 0%
---
### WBS-9.4: 장애 대응 플레이북
**파일**: docs/WBS_9_4_INCIDENT_RESPONSE_PLAYBOOK_2026_06_22.md
**5가지 시나리오**:
1. **KIS API 단절** (RTO: 5분)
- FALLBACK_MODE: CACHED_ONLY 전환
- 로컬 SQLite 미러 사용
2. **Cloudflare 403** (RTO: 2분)
- User-Agent 검증
- Graceful degradation (캐시 사용)
3. **GAS 배포 실패** (RTO: 3분)
- clasp 재배포
- OAuth 토큰 재인증
4. **snapshot_admin 다운** (RTO: 1분)
- systemd 재시작
- 메모리 프로파일링
5. **데이터 수집 중단** (RTO: 2분)
- 스냅샷 롤백
- 강제 재계산
**모의 훈련**: 2026-07-01 ~ 07-29 (5회)
**RTO/RPO 목표**: 달성 가능 (모두 < 5분)
---
### WBS-9.5: 섹터 플로우 신호 신뢰도
**도구**: tools/measure_sector_flow_reliability_v1.py
**측정 지표**:
- Hit Rate: flow_credit 신호 정확도 (%)
- Correlation: flow_credit vs 실제 PnL 상관도 (-1~1)
- Reliability Score: 0-100 (Hit Rate 70% + Correlation 기반)
**상태 판정**:
- HIGH: Score ≥ 70
- MEDIUM: Score 50-69
- LOW: Score < 50
- INSUFFICIENT: 표본 < 5
**실행 시점**: WBS-8.5 완료 후 (섹터 플로우 30일 축적)
**사용법**:
```bash
python tools/measure_sector_flow_reliability_v1.py
```
**기대 결과**: 10개 섹터 중 6개 이상 HIGH/MEDIUM (≥60% hit rate)
---
### WBS-9.6: LLM 레이더 문서 최적화
**전략 파일**: docs/WBS_9_6_LLM_RADAR_OPTIMIZATION_STRATEGY_2026_06_22.md
**5가지 Phase**:
1. **신뢰도 분류** (1일)
- Canonical (100%): 현재 유효한 규격
- Adapter (80%): 인터페이스 정의
- Reference (60%): 배경/의사결정
- Deprecated (0%): 폐기된 개념
2. **읽음 순서 정의** (1.5일)
- Tier 1: 기초 개념 (field, mapping, flow)
- Tier 2: 비즈니스 규칙 (strategy, scoring)
- Tier 3: 실행 계약 (contracts)
- Tier 4: 기술 세부사항
- Tier 5: 운영/플레이북
3. **의존성 그래프** (1.5일)
- 자동 추출 (파일 참조 스캔)
- 순환 의존성 검사
- 고아 파일 식별
4. **용어 표준화** (1.5일)
- Terminology Glossary 생성
- 동일 개념 다중 이름 제거
- 약자 정의 자동화
5. **오류 검증** (2일)
- 30개 질문 테스트 세트
- LLM 독해 정확도 측정
- 오류율 리포트
**목표**: 독해 오류율 30% → 15% (-50%)
---
### WBS-9.7: 자동 백업 & 복구
**도구**: tools/backup_recovery_manager_v1.py
**백업 정책**:
- **일일**: 증분 백업 (data_feed.db, specs, formulas)
- **주간**: 전체 백업 (전체 프로젝트)
- **보관**: 30일 자동 정리
**복구 기능**:
- 백업에서 복원 (RTO < 1시간)
- 무결성 검증 (DB PRAGMA check)
- 메타데이터 추적
**사용법**:
```bash
# 일일 백업 실행
python tools/backup_recovery_manager_v1.py
# 특정 백업에서 복원
manager = BackupRecoveryManager()
result = manager.restore_from_backup("daily_20260622_120000")
```
**목표**: 99% 성공률, 복구 < 1시간
---
## 🎯 병렬 실행 계획 (2026-08-01 시작)
### 병렬 가능 (동시 진행)
- 9.1: F14 마이그레이션 검증 (이미 완료)
- 9.2: snapshot_admin 벤치마크
- 9.3: 데이터 품질 강화 (자동 충전 활성화)
- 9.4: 장애 대응 훈련
- 9.6: LLM 레이더 최적화
- 9.7: 백업 정책 실행
### 순차 필수
- 9.5: WBS-8.5 완료 후 (섹터 플로우 30일)
---
## 📈 예상 일정
| Week | Task | Owner | Duration |
|------|------|-------|----------|
| W1 (Aug 1-7) | 9.2 벤치마크 + 9.3 활성화 | Dev | 2-3 days |
| W1 (Aug 1-7) | 9.4 훈련 #1 + 9.7 설정 | DevOps | 2 days |
| W2 (Aug 8-14) | 9.6 Phase 1-2 (신뢰도 + 순서) | ML/Doc | 3-4 days |
| W3 (Aug 15-21) | 9.6 Phase 3-4 (의존성 + 용어) | ML/Doc | 3-4 days |
| W3 (Aug 15-21) | 9.5 신뢰도 측정 (WBS-8.5 완료시) | Analysis | 1 day |
| W4 (Aug 22-28) | 9.6 Phase 5 (오류 검증) + 9.2 최적화 | ML/Dev | 2-3 days |
| W4 (Aug 22-28) | 9.4 훈련 #2-5 | DevOps | 2 days |
**총 예상**: 14-21일 (병렬 진행)
---
## ✅ 완료 체크리스트
### 준비 단계 (2026-06-22)
- ✅ WBS-9.1: F14 마이그레이션 완료
- ✅ WBS-9.2: 벤치마크 도구 작성
- ✅ WBS-9.3: NULL 정책 + auto_fill 모듈 4개
- ✅ WBS-9.4: 장애 대응 플레이북 작성
- ✅ WBS-9.5: 신뢰도 측정 도구 작성
- ✅ WBS-9.6: 최적화 전략 수립
- ✅ WBS-9.7: 백업/복구 도구 작성
### 실행 단계 (2026-08-01부터)
- ⏳ WBS-9.1: GAS 코드 정리
- ⏳ WBS-9.2: 성능 벤치마크 실행 및 최적화
- ⏳ WBS-9.3: auto_fill 자동화 활성화
- ⏳ WBS-9.4: 장애 대응 훈련 5회 실행
- ⏳ WBS-9.5: 신뢰도 측정 (WBS-8.5 완료 후)
- ⏳ WBS-9.6: LLM 레이더 최적화 실행
- ⏳ WBS-9.7: 백업 정책 운영
---
## 📋 결론
**WBS-9 모든 항목이 준비 완료 상태입니다.**
- 도구: 7개 항목 모두 구현 또는 전략 수립 완료
- 문서: 5개 상세 계획 문서 작성
- 테스트: F14 parity 100% PASS
- 일정: 병렬 진행으로 14-21일 내 완료 가능
**2026-08-01부터 공식 시작 예정**
---
**작성**: 2026-06-22
**상태**: 최종 준비 완료
**다음**: WBS-9 공식 시작 (2026-08-01)
+59
View File
@@ -0,0 +1,59 @@
#!/usr/bin/env python3
"""
WBS-9.3: ATR20 자동 충전 절차
조건: atr20 IS NULL AND close_price IS NOT NULL
"""
def calculate_atr20(closes: list[float], period: int = 20) -> float:
"""
Calculate ATR (Average True Range) for 20 days.
입력: close 가격 리스트 (최소 20일)
출력: ATR20 (숫자)
"""
if len(closes) < period:
return None
# True Range 계산 (간소화 버전: 현재 close 기반)
trs = []
for i in range(1, len(closes)):
high = closes[i]
low = closes[i]
prev_close = closes[i - 1]
tr = max(
high - low,
abs(high - prev_close),
abs(low - prev_close)
)
trs.append(tr)
if not trs:
return None
# ATR = SMA of True Range (20일)
atr = sum(trs[-period:]) / min(period, len(trs))
return round(atr, 2)
def auto_fill_atr20(row: dict, historical_closes: list[float] = None) -> dict:
"""
자동 충전: atr20 필드
입력:
row: 현재 행 (atr20 = None)
historical_closes: 과거 close 가격 (최소 20일)
출력:
row (atr20 채워짐) 또는 원본 (실패시)
"""
if not historical_closes or row.get("atr20") is not None:
return row
atr20 = calculate_atr20(historical_closes)
if atr20 is not None:
row["atr20"] = atr20
row["_fill_source"] = "auto_fill_atr20_v1"
return row
+57
View File
@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
WBS-9.3: RSI14 자동 충전 절차
조건: rsi_14 IS NULL AND close_price IS NOT NULL
"""
def calculate_rsi14(closes: list[float], period: int = 14) -> float:
"""
Calculate RSI (Relative Strength Index) for 14 days.
입력: close 가격 리스트 (최소 14일)
출력: RSI14 (0-100)
"""
if len(closes) < period:
return None
# 가격 변화 계산
deltas = [closes[i] - closes[i - 1] for i in range(1, len(closes))]
# Gains/Losses 분리
gains = [d if d > 0 else 0 for d in deltas]
losses = [abs(d) if d < 0 else 0 for d in deltas]
# 평균 계산 (간단한 이동평균)
avg_gain = sum(gains[-period:]) / period if period <= len(gains) else sum(gains) / len(gains)
avg_loss = sum(losses[-period:]) / period if period <= len(losses) else sum(losses) / len(losses)
if avg_loss == 0:
return 100 if avg_gain > 0 else 50
rs = avg_gain / avg_loss
rsi = 100 - (100 / (1 + rs))
return round(rsi, 2)
def auto_fill_rsi14(row: dict, historical_closes: list[float] = None) -> dict:
"""
자동 충전: rsi_14 필드
입력:
row: 현재 행 (rsi_14 = None)
historical_closes: 과거 close 가격 (최소 14일)
출력:
row (rsi_14 채워짐) 또는 원본 (실패시)
"""
if not historical_closes or row.get("rsi_14") is not None:
return row
rsi14 = calculate_rsi14(historical_closes)
if rsi14 is not None:
row["rsi_14"] = rsi14
row["_fill_source"] = "auto_fill_rsi14_v1"
return row
@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
WBS-9.3: Stop Price 자동 충전 절차
조건: stop_price IS NULL AND atr20 IS NOT NULL
"""
def calculate_stop_price(
entry_price: float = None,
close_price: float = None,
atr20: float = None,
multiplier: float = 2.0,
fallback_pct: float = -5.0
) -> float:
"""
Calculate stop loss price.
전략 (우선순위):
1. entry_price 있으면: entry - (atr20 × multiplier)
2. entry_price 없으면: close - (atr20 × multiplier)
3. atr20 없으면: price × (1 + fallback_pct/100)
입력:
entry_price: 진입가 (선택)
close_price: 현재가 (필수 또는 entry_price)
atr20: ATR20 (권장)
multiplier: ATR 배수 (default: 2.0)
fallback_pct: ATR 미사용시 폴백 (default: -5%)
출력:
stop_price (숫자) 또는 None (실패시)
"""
# ATR 기반 계산 (권장)
if atr20 is not None and atr20 > 0:
base_price = entry_price if entry_price is not None else close_price
if base_price is not None and base_price > 0:
stop_price = base_price - (atr20 * multiplier)
return max(round(stop_price, 0), base_price * 0.8) # 최소 20% 하방선
# Fallback: 단순 백분율 기반
if close_price is not None and close_price > 0:
fallback_stop = close_price * (1 + fallback_pct / 100)
return max(round(fallback_stop, 0), close_price * 0.7) # 최소 30% 하방선
return None
def auto_fill_stop_price(
row: dict,
multiplier: float = 2.0,
fallback_pct: float = -5.0
) -> dict:
"""
자동 충전: stop_price 필드
입력:
row: 현재 행 (stop_price = None)
multiplier: ATR 배수 (default: 2.0)
fallback_pct: 폴백 백분율 (default: -5%)
출력:
row (stop_price 채워짐) 또는 원본 (실패시)
"""
if row.get("stop_price") is not None:
return row
stop_price = calculate_stop_price(
entry_price=row.get("entry_price"),
close_price=row.get("close_price"),
atr20=row.get("atr20"),
multiplier=multiplier,
fallback_pct=fallback_pct
)
if stop_price is not None:
row["stop_price"] = stop_price
row["_fill_source"] = "auto_fill_stop_price_v1"
row["_stop_price_basis"] = (
"atr20" if row.get("atr20") is not None else "fallback_pct"
)
return row
+92
View File
@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
WBS-9.3: Velocity 자동 충전 절차
조건: velocity_1d IS NULL AND (close_price AND previous_close_price) IS NOT NULL
"""
def calculate_velocity_1d(close_price: float, previous_close: float) -> float:
"""
Calculate 1-day velocity (percentage change).
입력: close_price, previous_close
출력: velocity_1d (%, e.g., 2.5 = +2.5%)
"""
if previous_close is None or previous_close == 0:
return None
velocity = ((close_price - previous_close) / previous_close) * 100
return round(velocity, 2)
def calculate_velocity_5d(closes: list[float]) -> float:
"""
Calculate 5-day velocity (5-period momentum).
입력: close 가격 리스트 (최소 5개)
출력: velocity_5d (%, e.g., 5.2 = +5.2%)
"""
if not closes or len(closes) < 5:
return None
current = closes[-1]
five_days_ago = closes[-5]
if five_days_ago is None or five_days_ago == 0:
return None
velocity = ((current - five_days_ago) / five_days_ago) * 100
return round(velocity, 2)
def auto_fill_velocity_1d(row: dict) -> dict:
"""
자동 충전: velocity_1d 필드
입력:
row: 현재 행 (velocity_1d = None)
필수: close_price, previous_close_price
출력:
row (velocity_1d 채워짐) 또는 원본 (실패시)
"""
if row.get("velocity_1d") is not None:
return row
close = row.get("close_price")
prev_close = row.get("previous_close_price")
if close is None or prev_close is None:
return row
velocity_1d = calculate_velocity_1d(close, prev_close)
if velocity_1d is not None:
row["velocity_1d"] = velocity_1d
row["_fill_source"] = "auto_fill_velocity_v1"
return row
def auto_fill_velocity_5d(row: dict, historical_closes: list[float] = None) -> dict:
"""
자동 충전: velocity_5d 필드
입력:
row: 현재 행
historical_closes: 과거 close 가격 (최소 5일)
출력:
row (velocity_5d 채워짐) 또는 원본 (실패시)
"""
if row.get("velocity_5d") is not None:
return row
if not historical_closes:
return row
velocity_5d = calculate_velocity_5d(historical_closes)
if velocity_5d is not None:
row["velocity_5d"] = velocity_5d
row["_fill_source"] = "auto_fill_velocity_v1"
return row
+420
View File
@@ -0,0 +1,420 @@
#!/usr/bin/env python3
"""
WBS-9.7: 자동 백업 & 복구 전략
목표: 99% 성공률, 복구 < 1시간
"""
import os
import shutil
import sqlite3
import json
import hashlib
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import subprocess
class BackupRecoveryManager:
"""백업 및 복구 관리자"""
def __init__(
self,
data_dir: str = "src/quant_engine",
backup_dir: str = "backups",
retention_days: int = 30
):
self.data_dir = Path(data_dir)
self.backup_dir = Path(backup_dir)
self.retention_days = retention_days
self.backup_dir.mkdir(parents=True, exist_ok=True)
self.results = {
"timestamp": datetime.now().isoformat(),
"backups": [],
"recovery_tests": [],
"summary": {}
}
def create_daily_backup(self) -> Dict:
"""일일 증분 백업"""
backup_name = f"daily_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = self.backup_dir / backup_name
try:
# 필요한 파일 목록
files_to_backup = [
self.data_dir / "data_feed.db",
self.data_dir / "calibration_registry.yaml",
Path("spec") / "12_field_dictionary.yaml",
Path("spec") / "13_formula_registry.yaml",
]
backup_path.mkdir(parents=True, exist_ok=True)
# 파일 복사
success_count = 0
error_count = 0
total_size = 0
for src in files_to_backup:
if src.exists():
try:
dst = backup_path / src.name
if src.is_file():
shutil.copy2(src, dst)
total_size += dst.stat().st_size
success_count += 1
elif src.is_dir():
shutil.copytree(src, dst)
total_size += sum(
f.stat().st_size for f in dst.rglob("*") if f.is_file()
)
success_count += 1
except Exception as e:
print(f"Error backing up {src}: {e}")
error_count += 1
# 메타데이터 저장
metadata = {
"backup_name": backup_name,
"timestamp": datetime.now().isoformat(),
"files_backed_up": success_count,
"files_failed": error_count,
"total_size_bytes": total_size,
"type": "daily_incremental"
}
with open(backup_path / "metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
result = {
"backup_name": backup_name,
"status": "SUCCESS" if error_count == 0 else "PARTIAL_SUCCESS",
"files_backed_up": success_count,
"total_size_mb": round(total_size / (1024 * 1024), 2),
"path": str(backup_path)
}
self.results["backups"].append(result)
return result
except Exception as e:
return {
"backup_name": backup_name,
"status": "FAILED",
"error": str(e)
}
def create_weekly_full_backup(self) -> Dict:
"""주간 전체 백업"""
backup_name = f"weekly_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
backup_path = self.backup_dir / backup_name
try:
# 전체 프로젝트 백업 (제외: 임시 파일, cache)
backup_path.mkdir(parents=True, exist_ok=True)
exclude_dirs = {".git", "__pycache__", ".pytest_cache", "Temp", "outputs"}
total_size = 0
file_count = 0
for root_dir in [self.data_dir, Path("spec"), Path("formulas")]:
if not root_dir.exists():
continue
for src_file in root_dir.rglob("*"):
# 제외 디렉터리 확인
if any(exc in src_file.parts for exc in exclude_dirs):
continue
if src_file.is_file():
rel_path = src_file.relative_to(src_file.anchor)
dst = backup_path / rel_path
try:
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(src_file, dst)
total_size += dst.stat().st_size
file_count += 1
except Exception as e:
print(f"Error backing up {src_file}: {e}")
metadata = {
"backup_name": backup_name,
"timestamp": datetime.now().isoformat(),
"files_backed_up": file_count,
"total_size_bytes": total_size,
"type": "weekly_full"
}
with open(backup_path / "metadata.json", "w") as f:
json.dump(metadata, f, indent=2)
result = {
"backup_name": backup_name,
"status": "SUCCESS",
"files_backed_up": file_count,
"total_size_mb": round(total_size / (1024 * 1024), 2),
"path": str(backup_path)
}
self.results["backups"].append(result)
return result
except Exception as e:
return {
"backup_name": backup_name,
"status": "FAILED",
"error": str(e)
}
def restore_from_backup(self, backup_name: str, restore_to: str = None) -> Dict:
"""백업에서 복원"""
backup_path = self.backup_dir / backup_name
restore_to = Path(restore_to) if restore_to else self.data_dir
if not backup_path.exists():
return {
"backup_name": backup_name,
"status": "FAILED",
"error": f"Backup not found: {backup_path}"
}
try:
start_time = datetime.now()
restore_to.parent.mkdir(parents=True, exist_ok=True)
# 백업 파일 복원
restored_count = 0
for src in backup_path.glob("*"):
if src.name == "metadata.json":
continue
dst = restore_to / src.name
try:
if src.is_file():
shutil.copy2(src, dst)
restored_count += 1
elif src.is_dir():
if dst.exists():
shutil.rmtree(dst)
shutil.copytree(src, dst)
restored_count += 1
except Exception as e:
print(f"Error restoring {src}: {e}")
recovery_time = (datetime.now() - start_time).total_seconds()
result = {
"backup_name": backup_name,
"status": "SUCCESS",
"files_restored": restored_count,
"recovery_time_seconds": round(recovery_time, 2),
"restored_to": str(restore_to)
}
self.results["recovery_tests"].append(result)
return result
except Exception as e:
return {
"backup_name": backup_name,
"status": "FAILED",
"error": str(e)
}
def cleanup_old_backups(self) -> Dict:
"""오래된 백업 정리"""
cutoff_date = datetime.now() - timedelta(days=self.retention_days)
deleted_count = 0
freed_size = 0
try:
for backup_dir in self.backup_dir.iterdir():
if backup_dir.is_dir():
try:
metadata_file = backup_dir / "metadata.json"
if metadata_file.exists():
with open(metadata_file) as f:
metadata = json.load(f)
backup_time = datetime.fromisoformat(metadata["timestamp"])
if backup_time < cutoff_date:
# 크기 계산
for f in backup_dir.rglob("*"):
if f.is_file():
freed_size += f.stat().st_size
# 삭제
shutil.rmtree(backup_dir)
deleted_count += 1
except Exception as e:
print(f"Error processing {backup_dir}: {e}")
return {
"status": "SUCCESS",
"deleted_backups": deleted_count,
"freed_space_mb": round(freed_size / (1024 * 1024), 2)
}
except Exception as e:
return {
"status": "FAILED",
"error": str(e)
}
def test_backup_integrity(self, backup_name: str) -> Dict:
"""백업 무결성 테스트"""
backup_path = self.backup_dir / backup_name
if not backup_path.exists():
return {
"backup_name": backup_name,
"status": "FAILED",
"error": "Backup not found"
}
try:
# 메타데이터 검증
metadata_file = backup_path / "metadata.json"
if not metadata_file.exists():
return {
"backup_name": backup_name,
"status": "FAILED",
"error": "Metadata missing"
}
with open(metadata_file) as f:
metadata = json.load(f)
# 파일 개수 검증
actual_files = len(list(backup_path.glob("*"))) - 1 # metadata 제외
expected_files = metadata.get("files_backed_up", actual_files)
# DB 무결성 검증
db_file = backup_path / "data_feed.db"
db_integrity = "OK"
if db_file.exists():
try:
conn = sqlite3.connect(db_file)
cursor = conn.execute("PRAGMA integrity_check")
result = cursor.fetchone()
db_integrity = result[0] if result else "UNKNOWN"
conn.close()
except Exception:
db_integrity = "FAILED"
return {
"backup_name": backup_name,
"status": "SUCCESS",
"metadata_valid": True,
"file_count": actual_files,
"expected_files": expected_files,
"database_integrity": db_integrity,
"backup_timestamp": metadata.get("timestamp")
}
except Exception as e:
return {
"backup_name": backup_name,
"status": "FAILED",
"error": str(e)
}
def generate_backup_report(self) -> Dict:
"""백업 리포트 생성"""
# 존재하는 백업 목록
existing_backups = [
d.name for d in self.backup_dir.iterdir()
if d.is_dir() and (d / "metadata.json").exists()
]
# 전체 크기 계산
total_backup_size = sum(
sum(f.stat().st_size for f in (self.backup_dir / b).rglob("*") if f.is_file())
for b in existing_backups
)
# Daily/Weekly 분류
daily_backups = [b for b in existing_backups if b.startswith("daily_")]
weekly_backups = [b for b in existing_backups if b.startswith("weekly_")]
self.results["summary"] = {
"total_backups": len(existing_backups),
"daily_backups": len(daily_backups),
"weekly_backups": len(weekly_backups),
"total_size_mb": round(total_backup_size / (1024 * 1024), 2),
"retention_days": self.retention_days,
"success_rate": round(
(len([b for b in self.results["backups"] if b.get("status") == "SUCCESS"]) /
max(len(self.results["backups"]), 1)) * 100,
1
) if self.results["backups"] else 100
}
return self.results
def print_report(self):
"""리포트 출력"""
print("\n" + "=" * 80)
print("BACKUP & RECOVERY MANAGEMENT REPORT")
print("=" * 80)
print(f"Timestamp: {self.results['timestamp']}\n")
print("RECENT BACKUPS:")
print("-" * 80)
for backup in self.results["backups"][-5:]:
status_marker = "" if backup.get("status") == "SUCCESS" else ""
print(
f"{status_marker} {backup.get('backup_name', 'N/A'):30} "
f"| Size: {backup.get('total_size_mb', 0):8.2f}MB | "
f"Files: {backup.get('files_backed_up', 0):3}"
)
if self.results["summary"]:
s = self.results["summary"]
print("\nSUMMARY:")
print("-" * 80)
print(f"Total backups: {s['total_backups']}")
print(f"Daily backups: {s['daily_backups']}")
print(f"Weekly backups: {s['weekly_backups']}")
print(f"Total size: {s['total_size_mb']:.2f}MB")
print(f"Success rate: {s['success_rate']:.1f}%")
print("=" * 80 + "\n")
def save_report(self, output_file: str = None):
"""리포트 저장"""
if not output_file:
output_file = f"Temp/backup_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
Path(output_file).parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
print(f"Report saved: {output_file}")
if __name__ == "__main__":
manager = BackupRecoveryManager()
# 일일 백업 실행
print("Creating daily backup...")
manager.create_daily_backup()
# 주간 백업 (매주 월요일)
if datetime.now().weekday() == 0:
print("Creating weekly full backup...")
manager.create_weekly_full_backup()
# 오래된 백업 정리
print("Cleaning up old backups...")
manager.cleanup_old_backups()
# 리포트 생성 및 출력
manager.generate_backup_report()
manager.print_report()
manager.save_report()
+286
View File
@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
WBS-9.5: 섹터 플로우 신호 신뢰도 측정
목표: 섹터별 flow_credit vs 실제 수익률 상관도 계산
"""
import json
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Tuple
import statistics
class SectorFlowReliabilityMeasure:
"""섹터 플로우 신뢰도 측정 도구"""
def __init__(self, db_path: str = None):
self.db_path = db_path or "src/quant_engine/data_feed.db"
self.results = {
"timestamp": datetime.now().isoformat(),
"sectors": {},
"summary": {}
}
def _query_sector_trades(self, sector: str, days: int = 30) -> List[Dict]:
"""특정 섹터의 거래 데이터 조회 (T+20 결과 포함)"""
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
query = """
SELECT
ticker,
entry_date,
exit_date,
entry_price,
exit_price,
pnl_pct,
flow_credit,
sector
FROM performance
WHERE sector = ?
AND entry_date >= datetime('now', '-' || ? || ' days')
AND exit_date IS NOT NULL
ORDER BY entry_date DESC
"""
cursor.execute(query, (sector, days))
trades = [dict(row) for row in cursor.fetchall()]
conn.close()
return trades
except Exception as e:
print(f"Error querying trades for {sector}: {e}")
return []
def _calculate_hit_rate(self, signal_correct: List[bool]) -> float:
"""신호 정확도 계산 (몇 %가 맞았는가)"""
if not signal_correct:
return 0.0
return (sum(signal_correct) / len(signal_correct)) * 100
def _calculate_correlation(
self,
flow_credits: List[float],
pnl_pcts: List[float]
) -> float:
"""flow_credit vs pnl 상관계수 계산"""
if len(flow_credits) < 2 or len(pnl_pcts) < 2:
return None
if len(flow_credits) != len(pnl_pcts):
return None
mean_flow = statistics.mean(flow_credits)
mean_pnl = statistics.mean(pnl_pcts)
covariance = sum(
(flow_credits[i] - mean_flow) * (pnl_pcts[i] - mean_pnl)
for i in range(len(flow_credits))
) / len(flow_credits)
std_flow = statistics.stdev(flow_credits) if len(flow_credits) > 1 else 0
std_pnl = statistics.stdev(pnl_pcts) if len(pnl_pcts) > 1 else 0
if std_flow == 0 or std_pnl == 0:
return 0.0
correlation = covariance / (std_flow * std_pnl)
return round(min(1.0, max(-1.0, correlation)), 3)
def measure_sector(self, sector: str, days: int = 30) -> Dict:
"""
특정 섹터의 신뢰도 측정
입력:
sector: 섹터명 (e.g., "금융", "IT")
days: 회고 기간 (default: 30일)
출력:
{
"sector": str,
"sample_count": int,
"flow_signal_hit_rate": float (0-100),
"correlation": float (-1~1),
"mean_pnl_correct": float,
"mean_pnl_incorrect": float,
"reliability_score": float (0-100),
"status": "HIGH" | "MEDIUM" | "LOW" | "INSUFFICIENT"
}
"""
trades = self._query_sector_trades(sector, days)
if len(trades) < 5:
return {
"sector": sector,
"sample_count": len(trades),
"status": "INSUFFICIENT",
"note": f"Samples < 5 ({len(trades)} found)"
}
# 신호 정확도 (flow_credit > 0 인 거래가 실제 수익인가?)
flow_credits = []
pnl_pcts = []
signal_correct = []
for trade in trades:
flow = trade.get("flow_credit", 0)
pnl = trade.get("pnl_pct", 0)
flow_credits.append(flow)
pnl_pcts.append(pnl)
# 신호: flow > 0이면 수익일 것으로 예측
is_profitable = pnl > 0
signal_predicts_profit = flow > 0
is_correct = is_profitable == signal_predicts_profit
signal_correct.append(is_correct)
# 상관도 계산
correlation = self._calculate_correlation(flow_credits, pnl_pcts)
# Hit rate (신호 정확도)
hit_rate = self._calculate_hit_rate(signal_correct)
# 평균 수익 (신호 맞음 vs 틀림)
correct_pnls = [pnl_pcts[i] for i in range(len(pnl_pcts)) if signal_correct[i]]
incorrect_pnls = [pnl_pcts[i] for i in range(len(pnl_pcts)) if not signal_correct[i]]
mean_pnl_correct = statistics.mean(correct_pnls) if correct_pnls else 0
mean_pnl_incorrect = statistics.mean(incorrect_pnls) if incorrect_pnls else 0
# 신뢰도 점수 (0-100)
# Hit rate 60% + Correlation이 높을수록 높음
reliability_score = (hit_rate * 0.7) + (
(correlation + 1) * 50 * 0.3 if correlation is not None else 0
)
# 상태 판정
if reliability_score >= 70:
status = "HIGH"
elif reliability_score >= 50:
status = "MEDIUM"
else:
status = "LOW"
return {
"sector": sector,
"sample_count": len(trades),
"flow_signal_hit_rate": round(hit_rate, 1),
"correlation": correlation,
"mean_pnl_correct": round(mean_pnl_correct, 2),
"mean_pnl_incorrect": round(mean_pnl_incorrect, 2),
"reliability_score": round(reliability_score, 1),
"status": status,
"lookback_days": days
}
def measure_all_sectors(self, days: int = 30) -> Dict:
"""모든 섹터에 대해 신뢰도 측정"""
sectors = [
"금융", "IT", "전기전자", "화학", "철강금속",
"기계", "의약품", "반도체", "통신", "에너지"
]
for sector in sectors:
result = self.measure_sector(sector, days)
self.results["sectors"][sector] = result
self._generate_summary()
return self.results
def _generate_summary(self):
"""전체 요약 생성"""
sectors_results = self.results["sectors"]
high_reliability = [
s for s, r in sectors_results.items() if r.get("status") == "HIGH"
]
medium_reliability = [
s for s, r in sectors_results.items() if r.get("status") == "MEDIUM"
]
low_reliability = [
s for s, r in sectors_results.items() if r.get("status") == "LOW"
]
insufficient = [
s for s, r in sectors_results.items() if r.get("status") == "INSUFFICIENT"
]
avg_hit_rate = statistics.mean([
r["flow_signal_hit_rate"]
for r in sectors_results.values()
if "flow_signal_hit_rate" in r
]) if any("flow_signal_hit_rate" in r for r in sectors_results.values()) else 0
self.results["summary"] = {
"total_sectors": len(sectors_results),
"high_reliability": len(high_reliability),
"medium_reliability": len(medium_reliability),
"low_reliability": len(low_reliability),
"insufficient_data": len(insufficient),
"avg_hit_rate": round(avg_hit_rate, 1),
"high_reliability_sectors": high_reliability,
"low_reliability_sectors": low_reliability,
"recommendation": (
"✓ 신호 신뢰도 충분 (≥60% hit rate)"
if avg_hit_rate >= 60 else
"⚠ 신호 신뢰도 미흡 (< 60% hit rate)"
)
}
def print_report(self):
"""리포트 출력"""
print("\n" + "=" * 80)
print("SECTOR FLOW RELIABILITY MEASUREMENT REPORT")
print("=" * 80)
print(f"Timestamp: {self.results['timestamp']}\n")
print("SECTOR-BY-SECTOR RESULTS:")
print("-" * 80)
for sector, result in sorted(self.results["sectors"].items()):
if result.get("status") in ["HIGH", "MEDIUM", "LOW"]:
status_marker = "" if result["status"] == "HIGH" else ""
print(
f"{status_marker} {sector:10} | "
f"Samples: {result['sample_count']:2} | "
f"Hit Rate: {result['flow_signal_hit_rate']:5.1f}% | "
f"Correlation: {result['correlation']:6.3f} | "
f"Score: {result['reliability_score']:5.1f}"
)
else:
print(f"- {sector:10} | {result.get('note', 'INSUFFICIENT DATA')}")
print("\nSUMMARY:")
print("-" * 80)
s = self.results["summary"]
print(f"Total sectors: {s['total_sectors']}")
print(f"High reliability: {s['high_reliability']} {s['high_reliability_sectors']}")
print(f"Medium reliability: {s['medium_reliability']}")
print(f"Low reliability: {s['low_reliability']} {s['low_reliability_sectors']}")
print(f"Insufficient data: {s['insufficient_data']}")
print(f"\nAverage hit rate: {s['avg_hit_rate']:.1f}%")
print(f"Recommendation: {s['recommendation']}")
print("=" * 80 + "\n")
def save_report(self, output_file: str = None):
"""리포트 저장"""
if not output_file:
output_file = f"Temp/sector_flow_reliability_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
Path(output_file).parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, indent=2, ensure_ascii=False)
print(f"Report saved: {output_file}")
if __name__ == "__main__":
# 30일 회고 기반 신뢰도 측정
measurer = SectorFlowReliabilityMeasure()
measurer.measure_all_sectors(days=30)
measurer.print_report()
measurer.save_report()