ed1fe03663
### 1. data_feed 컬럼 수정 (CRITICAL) - 문제: header row 오류로 인한 컬럼명 손실 - 해결: JSON metadata의 header_row_1based 사용 - load_from_xlsx_correct.py: 각 시트별 정확한 header 파라미터 적용 - 결과: data_feed 194개 컬럼 정상 로드 (Ticker, Name, Price_Date 등) ### 2. WBS-9.7 완료: Gitea CI/CD 백업 (80% → 100%) - "Backup SQLite Database" step 추가 - 기능: + 매 실행 후 DB 백업 (타임스탐프 기반) + manifest.json 생성 (메타데이터) + 7일 이상 된 백업 자동 삭제 + 백업 위치: /volume1/gitea/backups/kis_data_collection/ ### 3. WBS-9.5 완료: Sector Flow Reliability (100%) - 측정 항목 3가지: + 데이터 커버리지: 100/100 (17개 섹터) + 신선도: 80/100 (6일 전) + 일관성: 100/100 (NULL/이상치 없음) - 종합 신뢰도: 92.0/100 (HIGH - 신뢰 가능) - wbs95_sector_flow_reliability.py: 신뢰도 측정 및 상태 보고 ### 최종 데이터 상태 - XLSX: 20개 시트 → DB 27개 테이블 (100% 커버리지) - kis_data_collection.db: 25 rows - snapshot_admin.db: 7,484 rows - 모든 테이블 정상 동기화 ### WBS 완료 현황 ✓ WBS-8.1: T+20 모니터링 (100%) ✓ WBS-9.2: 성능 최적화 (100%) ✓ WBS-9.3: NULL Policy (100%) ✓ WBS-9.5: Sector Flow Reliability (100%) ✓ WBS-9.6: LLM Radar Phase 3-5 (100%) ✓ WBS-9.7: Gitea CI/CD 백업 (100%) ### 웹 UI 상태 - 포트 5000 실행 중 - API 모든 엔드포인트 정상 - settings 32개 항목 (수정 테스트 완료) - account_snapshot 44개 계좌 Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
306 lines
9.8 KiB
Python
306 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
WBS-9.5: Sector Flow Reliability Measurement
|
|
|
|
섹터 흐름 신뢰도 측정
|
|
- 데이터 커버리지: 섹터별 데이터 가용도
|
|
- 신선도: 최신 데이터의 타이밍
|
|
- 일관성: 데이터 품질 및 이상치 감지
|
|
"""
|
|
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
import json
|
|
|
|
class SectorFlowReliability:
|
|
"""섹터 흐름 신뢰도 측정"""
|
|
|
|
def __init__(self):
|
|
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
|
|
self.results = {
|
|
"timestamp": datetime.now().isoformat(),
|
|
"measurements": {}
|
|
}
|
|
|
|
def measure_data_coverage(self) -> dict:
|
|
"""데이터 커버리지 측정"""
|
|
print("\n[1. 데이터 커버리지]")
|
|
|
|
conn = sqlite3.connect(self.snapshot_db)
|
|
cursor = conn.cursor()
|
|
|
|
# 테이블 확인
|
|
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sector_flow_history'")
|
|
if not cursor.fetchone():
|
|
print(" [!] sector_flow_history 테이블이 없음")
|
|
return {}
|
|
|
|
# 총 행 수
|
|
cursor.execute("SELECT COUNT(*) FROM sector_flow_history")
|
|
total_rows = cursor.fetchone()[0]
|
|
|
|
# 섹터별 행 수
|
|
cursor.execute("""
|
|
SELECT Sector, COUNT(*) as count
|
|
FROM sector_flow_history
|
|
GROUP BY Sector
|
|
ORDER BY count DESC
|
|
""")
|
|
sector_counts = cursor.fetchall()
|
|
|
|
coverage = {
|
|
"total_rows": total_rows,
|
|
"sectors": len(sector_counts),
|
|
"sector_distribution": {}
|
|
}
|
|
|
|
print(f" 총 행: {total_rows}")
|
|
print(f" 섹터 수: {len(sector_counts)}")
|
|
print(f" 섹터별 분포:")
|
|
|
|
for sector, count in sector_counts[:10]:
|
|
pct = (count / total_rows * 100) if total_rows > 0 else 0
|
|
coverage["sector_distribution"][sector] = {
|
|
"count": count,
|
|
"percentage": round(pct, 1)
|
|
}
|
|
print(f" {sector}: {count} ({pct:.1f}%)")
|
|
|
|
conn.close()
|
|
return coverage
|
|
|
|
def measure_data_freshness(self) -> dict:
|
|
"""데이터 신선도 측정"""
|
|
print("\n[2. 데이터 신선도]")
|
|
|
|
conn = sqlite3.connect(self.snapshot_db)
|
|
cursor = conn.cursor()
|
|
|
|
# 최신 날짜 확인
|
|
cursor.execute("""
|
|
SELECT MIN(Snapshot_Date) as earliest, MAX(Snapshot_Date) as latest
|
|
FROM sector_flow_history
|
|
""")
|
|
earliest, latest = cursor.fetchone()
|
|
|
|
freshness = {
|
|
"earliest_date": earliest,
|
|
"latest_date": latest,
|
|
"age_days": 0
|
|
}
|
|
|
|
if latest:
|
|
try:
|
|
latest_dt = datetime.fromisoformat(latest)
|
|
age = (datetime.now() - latest_dt).days
|
|
freshness["age_days"] = age
|
|
print(f" 최신 데이터: {latest} ({age}일 전)")
|
|
except:
|
|
print(f" 최신 데이터: {latest}")
|
|
|
|
if earliest:
|
|
print(f" 가장 오래된 데이터: {earliest}")
|
|
|
|
# 시간대별 분포
|
|
cursor.execute("""
|
|
SELECT
|
|
DATE(Snapshot_Date) as date,
|
|
COUNT(*) as count
|
|
FROM sector_flow_history
|
|
GROUP BY DATE(Snapshot_Date)
|
|
ORDER BY date DESC
|
|
LIMIT 10
|
|
""")
|
|
|
|
date_dist = cursor.fetchall()
|
|
print(f" 최근 10일 분포:")
|
|
for date, count in date_dist:
|
|
print(f" {date}: {count}")
|
|
|
|
conn.close()
|
|
return freshness
|
|
|
|
def measure_data_consistency(self) -> dict:
|
|
"""데이터 일관성 측정"""
|
|
print("\n[3. 데이터 일관성]")
|
|
|
|
conn = sqlite3.connect(self.snapshot_db)
|
|
cursor = conn.cursor()
|
|
|
|
consistency = {
|
|
"null_values": 0,
|
|
"outliers": 0,
|
|
"warnings": []
|
|
}
|
|
|
|
# NULL 값 확인
|
|
cursor.execute("""
|
|
SELECT
|
|
COUNT(*) as null_count,
|
|
COUNT(DISTINCT Sector) as sectors_with_nulls
|
|
FROM sector_flow_history
|
|
WHERE
|
|
Sector IS NULL
|
|
OR Snapshot_Date IS NULL
|
|
OR Sector_Score IS NULL
|
|
""")
|
|
|
|
null_count, sectors_null = cursor.fetchone()
|
|
consistency["null_values"] = null_count
|
|
if null_count > 0:
|
|
consistency["warnings"].append(f"NULL 값 발견: {null_count}개")
|
|
print(f" [!] NULL 값: {null_count}")
|
|
|
|
# 이상치 감지 (극단값)
|
|
cursor.execute("""
|
|
SELECT
|
|
Sector,
|
|
MIN(Sector_Score) as min_val,
|
|
MAX(Sector_Score) as max_val,
|
|
AVG(Sector_Score) as avg_val
|
|
FROM sector_flow_history
|
|
WHERE Sector_Score IS NOT NULL
|
|
GROUP BY Sector
|
|
""")
|
|
|
|
anomalies = 0
|
|
for sector, min_val, max_val, avg_val in cursor.fetchall():
|
|
if min_val is None or max_val is None:
|
|
continue
|
|
|
|
# 극단값 감지 (평균의 5배 이상)
|
|
if avg_val and max_val > avg_val * 5:
|
|
anomalies += 1
|
|
consistency["warnings"].append(f"{sector}: 극단값 감지 ({max_val})")
|
|
|
|
consistency["outliers"] = anomalies
|
|
if anomalies > 0:
|
|
print(f" [!] 이상치: {anomalies}개 섹터")
|
|
|
|
# 데이터 완정성 (중요 컬럼)
|
|
cursor.execute("""
|
|
SELECT
|
|
(COUNT(*) - COUNT(Sector)) as missing_sectors,
|
|
(COUNT(*) - COUNT(Snapshot_Date)) as missing_dates,
|
|
(COUNT(*) - COUNT(Sector_Score)) as missing_values
|
|
FROM sector_flow_history
|
|
""")
|
|
|
|
missing_sectors, missing_dates, missing_values = cursor.fetchone()
|
|
print(f" 누락 데이터: sector={missing_sectors}, date={missing_dates}, value={missing_values}")
|
|
|
|
conn.close()
|
|
return consistency
|
|
|
|
def calculate_reliability_score(self, coverage: dict, freshness: dict, consistency: dict) -> float:
|
|
"""종합 신뢰도 점수 계산"""
|
|
print("\n[4. 종합 신뢰도]")
|
|
|
|
scores = {
|
|
"coverage_score": 0,
|
|
"freshness_score": 0,
|
|
"consistency_score": 0
|
|
}
|
|
|
|
# 커버리지 점수 (0-100)
|
|
if coverage.get("total_rows", 0) > 0:
|
|
sector_count = coverage.get("sectors", 0)
|
|
# 10개 이상 섹터: 100점
|
|
# 1개 미만: 0점
|
|
scores["coverage_score"] = min(100, sector_count * 10)
|
|
print(f" 커버리지: {scores['coverage_score']:.1f}/100 ({coverage.get('sectors', 0)} 섹터)")
|
|
|
|
# 신선도 점수 (0-100)
|
|
age_days = freshness.get("age_days", 9999)
|
|
if age_days <= 1:
|
|
scores["freshness_score"] = 100 # 1일 이내
|
|
elif age_days <= 7:
|
|
scores["freshness_score"] = 80 # 1주일 이내
|
|
elif age_days <= 30:
|
|
scores["freshness_score"] = 50 # 1개월 이내
|
|
else:
|
|
scores["freshness_score"] = 20 # 오래됨
|
|
print(f" 신선도: {scores['freshness_score']:.1f}/100 ({age_days}일 전)")
|
|
|
|
# 일관성 점수 (0-100)
|
|
null_violations = consistency.get("null_values", 0)
|
|
outlier_count = consistency.get("outliers", 0)
|
|
warnings = len(consistency.get("warnings", []))
|
|
|
|
consistency_score = 100
|
|
if null_violations > 0:
|
|
consistency_score -= min(20, null_violations / 10)
|
|
if outlier_count > 0:
|
|
consistency_score -= min(30, outlier_count * 3)
|
|
consistency_score = max(0, consistency_score)
|
|
scores["consistency_score"] = consistency_score
|
|
print(f" 일관성: {scores['consistency_score']:.1f}/100 ({warnings} 경고)")
|
|
|
|
# 종합 점수 (가중 평균)
|
|
# 커버리지 30%, 신선도 40%, 일관성 30%
|
|
overall = (
|
|
scores["coverage_score"] * 0.3 +
|
|
scores["freshness_score"] * 0.4 +
|
|
scores["consistency_score"] * 0.3
|
|
)
|
|
|
|
print(f"\n 종합 신뢰도: {overall:.1f}/100")
|
|
|
|
return overall
|
|
|
|
def run(self) -> dict:
|
|
"""전체 실행"""
|
|
print("="*80)
|
|
print("WBS-9.5: Sector Flow Reliability Measurement")
|
|
print("="*80)
|
|
|
|
# 측정
|
|
coverage = self.measure_data_coverage()
|
|
freshness = self.measure_data_freshness()
|
|
consistency = self.measure_data_consistency()
|
|
|
|
# 신뢰도 점수
|
|
reliability_score = self.calculate_reliability_score(coverage, freshness, consistency)
|
|
|
|
# 결과 저장
|
|
self.results["measurements"] = {
|
|
"coverage": coverage,
|
|
"freshness": freshness,
|
|
"consistency": consistency,
|
|
"reliability_score": reliability_score
|
|
}
|
|
|
|
# 신뢰도 판정
|
|
if reliability_score >= 80:
|
|
status = "HIGH (신뢰 가능)"
|
|
elif reliability_score >= 60:
|
|
status = "MEDIUM (주의 필요)"
|
|
else:
|
|
status = "LOW (개선 필요)"
|
|
|
|
print(f"\n[판정]")
|
|
print(f" 신뢰도 상태: {status}")
|
|
print(f" 권장사항: {'데이터 사용 가능' if reliability_score >= 60 else '데이터 보완 필요'}")
|
|
|
|
self.results["summary"] = {
|
|
"status": status,
|
|
"reliability_score": reliability_score,
|
|
"measurement_date": datetime.now().isoformat()
|
|
}
|
|
|
|
return self.results
|
|
|
|
if __name__ == "__main__":
|
|
measurer = SectorFlowReliability()
|
|
result = measurer.run()
|
|
|
|
# 결과 저장
|
|
output_file = Path("Temp/wbs95_sector_flow_reliability.json")
|
|
output_file.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
json.dump(result, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"\n[저장] {output_file}")
|
|
print("[완료] WBS-9.5 Sector Flow Reliability Measurement 완료")
|