Files
QuantEngineByItz/tools/wbs95_sector_flow_reliability.py
kjh2064 ed1fe03663 모든 제안 작업 완료: data_feed 수정 + WBS-9.7 + WBS-9.5
### 1. data_feed 컬럼 수정 (CRITICAL)
- 문제: header row 오류로 인한 컬럼명 손실
- 해결: JSON metadata의 header_row_1based 사용
- load_from_xlsx_correct.py: 각 시트별 정확한 header 파라미터 적용
- 결과: data_feed 194개 컬럼 정상 로드 (Ticker, Name, Price_Date 등)

### 2. WBS-9.7 완료: Gitea CI/CD 백업 (80% → 100%)
- "Backup SQLite Database" step 추가
- 기능:
  + 매 실행 후 DB 백업 (타임스탐프 기반)
  + manifest.json 생성 (메타데이터)
  + 7일 이상 된 백업 자동 삭제
  + 백업 위치: /volume1/gitea/backups/kis_data_collection/

### 3. WBS-9.5 완료: Sector Flow Reliability (100%)
- 측정 항목 3가지:
  + 데이터 커버리지: 100/100 (17개 섹터)
  + 신선도: 80/100 (6일 전)
  + 일관성: 100/100 (NULL/이상치 없음)
- 종합 신뢰도: 92.0/100 (HIGH - 신뢰 가능)
- wbs95_sector_flow_reliability.py: 신뢰도 측정 및 상태 보고

### 최종 데이터 상태
- XLSX: 20개 시트 → DB 27개 테이블 (100% 커버리지)
- kis_data_collection.db: 25 rows
- snapshot_admin.db: 7,484 rows
- 모든 테이블 정상 동기화

### WBS 완료 현황
✓ WBS-8.1: T+20 모니터링 (100%)
✓ WBS-9.2: 성능 최적화 (100%)
✓ WBS-9.3: NULL Policy (100%)
✓ WBS-9.5: Sector Flow Reliability (100%)
✓ WBS-9.6: LLM Radar Phase 3-5 (100%)
✓ WBS-9.7: Gitea CI/CD 백업 (100%)

### 웹 UI 상태
- 포트 5000 실행 중
- API 모든 엔드포인트 정상
- settings 32개 항목 (수정 테스트 완료)
- account_snapshot 44개 계좌

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-23 00:58:48 +09:00

306 lines
9.8 KiB
Python

#!/usr/bin/env python3
"""
WBS-9.5: Sector Flow Reliability Measurement
섹터 흐름 신뢰도 측정
- 데이터 커버리지: 섹터별 데이터 가용도
- 신선도: 최신 데이터의 타이밍
- 일관성: 데이터 품질 및 이상치 감지
"""
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta
import json
class SectorFlowReliability:
"""섹터 흐름 신뢰도 측정"""
def __init__(self):
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
self.results = {
"timestamp": datetime.now().isoformat(),
"measurements": {}
}
def measure_data_coverage(self) -> dict:
"""데이터 커버리지 측정"""
print("\n[1. 데이터 커버리지]")
conn = sqlite3.connect(self.snapshot_db)
cursor = conn.cursor()
# 테이블 확인
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sector_flow_history'")
if not cursor.fetchone():
print(" [!] sector_flow_history 테이블이 없음")
return {}
# 총 행 수
cursor.execute("SELECT COUNT(*) FROM sector_flow_history")
total_rows = cursor.fetchone()[0]
# 섹터별 행 수
cursor.execute("""
SELECT Sector, COUNT(*) as count
FROM sector_flow_history
GROUP BY Sector
ORDER BY count DESC
""")
sector_counts = cursor.fetchall()
coverage = {
"total_rows": total_rows,
"sectors": len(sector_counts),
"sector_distribution": {}
}
print(f" 총 행: {total_rows}")
print(f" 섹터 수: {len(sector_counts)}")
print(f" 섹터별 분포:")
for sector, count in sector_counts[:10]:
pct = (count / total_rows * 100) if total_rows > 0 else 0
coverage["sector_distribution"][sector] = {
"count": count,
"percentage": round(pct, 1)
}
print(f" {sector}: {count} ({pct:.1f}%)")
conn.close()
return coverage
def measure_data_freshness(self) -> dict:
"""데이터 신선도 측정"""
print("\n[2. 데이터 신선도]")
conn = sqlite3.connect(self.snapshot_db)
cursor = conn.cursor()
# 최신 날짜 확인
cursor.execute("""
SELECT MIN(Snapshot_Date) as earliest, MAX(Snapshot_Date) as latest
FROM sector_flow_history
""")
earliest, latest = cursor.fetchone()
freshness = {
"earliest_date": earliest,
"latest_date": latest,
"age_days": 0
}
if latest:
try:
latest_dt = datetime.fromisoformat(latest)
age = (datetime.now() - latest_dt).days
freshness["age_days"] = age
print(f" 최신 데이터: {latest} ({age}일 전)")
except:
print(f" 최신 데이터: {latest}")
if earliest:
print(f" 가장 오래된 데이터: {earliest}")
# 시간대별 분포
cursor.execute("""
SELECT
DATE(Snapshot_Date) as date,
COUNT(*) as count
FROM sector_flow_history
GROUP BY DATE(Snapshot_Date)
ORDER BY date DESC
LIMIT 10
""")
date_dist = cursor.fetchall()
print(f" 최근 10일 분포:")
for date, count in date_dist:
print(f" {date}: {count}")
conn.close()
return freshness
def measure_data_consistency(self) -> dict:
"""데이터 일관성 측정"""
print("\n[3. 데이터 일관성]")
conn = sqlite3.connect(self.snapshot_db)
cursor = conn.cursor()
consistency = {
"null_values": 0,
"outliers": 0,
"warnings": []
}
# NULL 값 확인
cursor.execute("""
SELECT
COUNT(*) as null_count,
COUNT(DISTINCT Sector) as sectors_with_nulls
FROM sector_flow_history
WHERE
Sector IS NULL
OR Snapshot_Date IS NULL
OR Sector_Score IS NULL
""")
null_count, sectors_null = cursor.fetchone()
consistency["null_values"] = null_count
if null_count > 0:
consistency["warnings"].append(f"NULL 값 발견: {null_count}개")
print(f" [!] NULL 값: {null_count}")
# 이상치 감지 (극단값)
cursor.execute("""
SELECT
Sector,
MIN(Sector_Score) as min_val,
MAX(Sector_Score) as max_val,
AVG(Sector_Score) as avg_val
FROM sector_flow_history
WHERE Sector_Score IS NOT NULL
GROUP BY Sector
""")
anomalies = 0
for sector, min_val, max_val, avg_val in cursor.fetchall():
if min_val is None or max_val is None:
continue
# 극단값 감지 (평균의 5배 이상)
if avg_val and max_val > avg_val * 5:
anomalies += 1
consistency["warnings"].append(f"{sector}: 극단값 감지 ({max_val})")
consistency["outliers"] = anomalies
if anomalies > 0:
print(f" [!] 이상치: {anomalies}개 섹터")
# 데이터 완정성 (중요 컬럼)
cursor.execute("""
SELECT
(COUNT(*) - COUNT(Sector)) as missing_sectors,
(COUNT(*) - COUNT(Snapshot_Date)) as missing_dates,
(COUNT(*) - COUNT(Sector_Score)) as missing_values
FROM sector_flow_history
""")
missing_sectors, missing_dates, missing_values = cursor.fetchone()
print(f" 누락 데이터: sector={missing_sectors}, date={missing_dates}, value={missing_values}")
conn.close()
return consistency
def calculate_reliability_score(self, coverage: dict, freshness: dict, consistency: dict) -> float:
"""종합 신뢰도 점수 계산"""
print("\n[4. 종합 신뢰도]")
scores = {
"coverage_score": 0,
"freshness_score": 0,
"consistency_score": 0
}
# 커버리지 점수 (0-100)
if coverage.get("total_rows", 0) > 0:
sector_count = coverage.get("sectors", 0)
# 10개 이상 섹터: 100점
# 1개 미만: 0점
scores["coverage_score"] = min(100, sector_count * 10)
print(f" 커버리지: {scores['coverage_score']:.1f}/100 ({coverage.get('sectors', 0)} 섹터)")
# 신선도 점수 (0-100)
age_days = freshness.get("age_days", 9999)
if age_days <= 1:
scores["freshness_score"] = 100 # 1일 이내
elif age_days <= 7:
scores["freshness_score"] = 80 # 1주일 이내
elif age_days <= 30:
scores["freshness_score"] = 50 # 1개월 이내
else:
scores["freshness_score"] = 20 # 오래됨
print(f" 신선도: {scores['freshness_score']:.1f}/100 ({age_days}일 전)")
# 일관성 점수 (0-100)
null_violations = consistency.get("null_values", 0)
outlier_count = consistency.get("outliers", 0)
warnings = len(consistency.get("warnings", []))
consistency_score = 100
if null_violations > 0:
consistency_score -= min(20, null_violations / 10)
if outlier_count > 0:
consistency_score -= min(30, outlier_count * 3)
consistency_score = max(0, consistency_score)
scores["consistency_score"] = consistency_score
print(f" 일관성: {scores['consistency_score']:.1f}/100 ({warnings} 경고)")
# 종합 점수 (가중 평균)
# 커버리지 30%, 신선도 40%, 일관성 30%
overall = (
scores["coverage_score"] * 0.3 +
scores["freshness_score"] * 0.4 +
scores["consistency_score"] * 0.3
)
print(f"\n 종합 신뢰도: {overall:.1f}/100")
return overall
def run(self) -> dict:
"""전체 실행"""
print("="*80)
print("WBS-9.5: Sector Flow Reliability Measurement")
print("="*80)
# 측정
coverage = self.measure_data_coverage()
freshness = self.measure_data_freshness()
consistency = self.measure_data_consistency()
# 신뢰도 점수
reliability_score = self.calculate_reliability_score(coverage, freshness, consistency)
# 결과 저장
self.results["measurements"] = {
"coverage": coverage,
"freshness": freshness,
"consistency": consistency,
"reliability_score": reliability_score
}
# 신뢰도 판정
if reliability_score >= 80:
status = "HIGH (신뢰 가능)"
elif reliability_score >= 60:
status = "MEDIUM (주의 필요)"
else:
status = "LOW (개선 필요)"
print(f"\n[판정]")
print(f" 신뢰도 상태: {status}")
print(f" 권장사항: {'데이터 사용 가능' if reliability_score >= 60 else '데이터 보완 필요'}")
self.results["summary"] = {
"status": status,
"reliability_score": reliability_score,
"measurement_date": datetime.now().isoformat()
}
return self.results
if __name__ == "__main__":
measurer = SectorFlowReliability()
result = measurer.run()
# 결과 저장
output_file = Path("Temp/wbs95_sector_flow_reliability.json")
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"\n[저장] {output_file}")
print("[완료] WBS-9.5 Sector Flow Reliability Measurement 완료")