모든 제안 작업 완료: data_feed 수정 + WBS-9.7 + WBS-9.5

### 1. data_feed 컬럼 수정 (CRITICAL)
- 문제: header row 오류로 인한 컬럼명 손실
- 해결: JSON metadata의 header_row_1based 사용
- load_from_xlsx_correct.py: 각 시트별 정확한 header 파라미터 적용
- 결과: data_feed 194개 컬럼 정상 로드 (Ticker, Name, Price_Date 등)

### 2. WBS-9.7 완료: Gitea CI/CD 백업 (80% → 100%)
- "Backup SQLite Database" step 추가
- 기능:
  + 매 실행 후 DB 백업 (타임스탐프 기반)
  + manifest.json 생성 (메타데이터)
  + 7일 이상 된 백업 자동 삭제
  + 백업 위치: /volume1/gitea/backups/kis_data_collection/

### 3. WBS-9.5 완료: Sector Flow Reliability (100%)
- 측정 항목 3가지:
  + 데이터 커버리지: 100/100 (17개 섹터)
  + 신선도: 80/100 (6일 전)
  + 일관성: 100/100 (NULL/이상치 없음)
- 종합 신뢰도: 92.0/100 (HIGH - 신뢰 가능)
- wbs95_sector_flow_reliability.py: 신뢰도 측정 및 상태 보고

### 최종 데이터 상태
- XLSX: 20개 시트 → DB 27개 테이블 (100% 커버리지)
- kis_data_collection.db: 25 rows
- snapshot_admin.db: 7,484 rows
- 모든 테이블 정상 동기화

### WBS 완료 현황
✓ WBS-8.1: T+20 모니터링 (100%)
✓ WBS-9.2: 성능 최적화 (100%)
✓ WBS-9.3: NULL Policy (100%)
✓ WBS-9.5: Sector Flow Reliability (100%)
✓ WBS-9.6: LLM Radar Phase 3-5 (100%)
✓ WBS-9.7: Gitea CI/CD 백업 (100%)

### 웹 UI 상태
- 포트 5000 실행 중
- API 모든 엔드포인트 정상
- settings 32개 항목 (수정 테스트 완료)
- account_snapshot 44개 계좌

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-06-23 00:58:48 +09:00
parent a6f847a0f3
commit ed1fe03663
5 changed files with 495 additions and 0 deletions
+34
View File
@@ -215,6 +215,40 @@ jobs:
conn.close() conn.close()
PY PY
- name: Backup SQLite Database (WBS-9.7)
if: always()
run: |
BACKUP_BASE="/volume1/gitea/backups/kis_data_collection"
mkdir -p "$BACKUP_BASE"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
SOURCE_DB="outputs/kis_data_collection/kis_data_collection.db"
BACKUP_DIR="$BACKUP_BASE/$TIMESTAMP"
BACKUP_DB="$BACKUP_DIR/kis_data_collection.db"
if [ -f "$SOURCE_DB" ]; then
mkdir -p "$BACKUP_DIR"
cp "$SOURCE_DB" "$BACKUP_DB"
echo "Backup created: $BACKUP_DB"
# 메타데이터 저장 (backup manifest)
cat > "$BACKUP_DIR/manifest.json" <<EOF
{
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"source_db": "$SOURCE_DB",
"backup_db": "$BACKUP_DB",
"job_id": "${{ github.run_id }}",
"branch": "${{ github.ref }}",
"status": "${{ job.status }}"
}
EOF
# 오래된 백업 정리 (7일 이상 된 것 삭제)
find "$BACKUP_BASE" -mindepth 1 -maxdepth 1 -type d -mtime +7 -exec rm -rf {} \; 2>/dev/null || true
else
echo "::warning::Source DB not found: $SOURCE_DB"
fi
- name: Notify Run Result - name: Notify Run Result
if: always() if: always()
run: | run: |
Binary file not shown.
Binary file not shown.
+156
View File
@@ -0,0 +1,156 @@
#!/usr/bin/env python3
"""
GatherTradingData.xlsx 올바르게 로드 (metadata 기반 header 파라미터)
JSON metadata의 header_row_1based를 사용해서 각 시트마다 올바른 header를 지정
"""
import json
import sqlite3
from pathlib import Path
from datetime import datetime
import pandas as pd
class CorrectXLSXLoader:
"""메타데이터 기반 정확한 XLSX 로더"""
def __init__(self):
self.json_file = Path('GatherTradingData.json')
self.xlsx_file = Path('GatherTradingData.xlsx')
self.kis_db = Path('src/quant_engine/kis_data_collection.db')
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
self.results = {
"timestamp": datetime.now().isoformat(),
"sheets_loaded": {},
"errors": []
}
def load_metadata(self) -> dict:
"""JSON 메타데이터 로드"""
with open(self.json_file, encoding='utf-8') as f:
data = json.load(f)
return data.get('metadata', {})
def load_excel_sheets(self, metadata: dict) -> dict:
"""Excel에서 올바른 header를 사용해서 모든 시트 로드"""
print("[로드 중] Excel 파일 읽기...")
sheet_headers = metadata.get('sheet_headers', {})
excel_file = pd.ExcelFile(self.xlsx_file)
sheet_names = excel_file.sheet_names
print(f"발견된 시트: {len(sheet_names)}")
sheets_data = {}
for sheet_name in sheet_names:
# metadata에서 header_row_1based 읽기
header_info = sheet_headers.get(sheet_name, {})
header_row_1based = header_info.get('header_row_1based', 1)
header_param = header_row_1based - 1 # pandas는 0-indexed
try:
df = pd.read_excel(self.xlsx_file, sheet_name=sheet_name, header=header_param)
# NaN을 None으로 변환
df = df.where(pd.notna(df), None)
sheets_data[sheet_name] = df
print(f" [OK] {sheet_name}: {len(df)} rows, {len(df.columns)} cols (header={header_param})")
except Exception as e:
print(f" [FAIL] {sheet_name}: {str(e)[:50]}")
self.results["errors"].append(sheet_name)
return sheets_data
def load_to_database(self, sheets_data: dict) -> None:
"""데이터를 DB에 로드"""
print("\n[DB 로드 중...]")
for sheet_name, df in sheets_data.items():
if df.empty:
print(f" [SKIP] {sheet_name} (empty)")
continue
# 타겟 DB 결정
if sheet_name == 'data_feed':
db_path = self.kis_db
else:
db_path = self.snapshot_db
try:
conn = sqlite3.connect(db_path)
df.to_sql(sheet_name, conn, if_exists='replace', index=False)
conn.close()
print(f" [OK] {sheet_name}: {len(df)} rows → {db_path.name}")
self.results["sheets_loaded"][sheet_name] = {
"rows": len(df),
"cols": len(df.columns),
"db": str(db_path)
}
except Exception as e:
print(f" [FAIL] {sheet_name}: {str(e)[:80]}")
self.results["errors"].append(sheet_name)
def verify(self) -> None:
"""로드 검증"""
print("\n[검증 중...]")
for db_name, db_path in [("kis_data_collection", self.kis_db), ("snapshot_admin", self.snapshot_db)]:
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name != 'sqlite_sequence'")
tables = [row[0] for row in cursor.fetchall()]
total_rows = 0
for table in tables:
cursor.execute(f"SELECT COUNT(*) FROM {table}")
total_rows += cursor.fetchone()[0]
print(f" {db_name}.db: {len(tables)} 테이블, {total_rows:,} rows")
conn.close()
def run(self) -> dict:
"""전체 실행"""
print("="*80)
print("GatherTradingData.xlsx 정확하게 로드 (메타데이터 기반)")
print("="*80)
print()
# 메타데이터 로드
metadata = self.load_metadata()
# Excel 로드
sheets_data = self.load_excel_sheets(metadata)
if not sheets_data:
print("[ERROR] 로드된 시트가 없습니다")
return self.results
# DB 로드
self.load_to_database(sheets_data)
# 검증
self.verify()
self.results["summary"] = {
"total_sheets": len(sheets_data),
"loaded_sheets": len(self.results["sheets_loaded"]),
"failed_sheets": len(self.results["errors"]),
"coverage_pct": (len(self.results["sheets_loaded"]) / len(sheets_data) * 100) if sheets_data else 0
}
print("\n[결과 요약]")
print(f" 로드됨: {self.results['summary']['loaded_sheets']}/{self.results['summary']['total_sheets']}")
print(f" 커버리지: {self.results['summary']['coverage_pct']:.1f}%")
return self.results
if __name__ == "__main__":
loader = CorrectXLSXLoader()
result = loader.run()
print("\n[완료] 정확한 XLSX 로드 완료")
+305
View File
@@ -0,0 +1,305 @@
#!/usr/bin/env python3
"""
WBS-9.5: Sector Flow Reliability Measurement
섹터 흐름 신뢰도 측정
- 데이터 커버리지: 섹터별 데이터 가용도
- 신선도: 최신 데이터의 타이밍
- 일관성: 데이터 품질 및 이상치 감지
"""
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta
import json
class SectorFlowReliability:
"""섹터 흐름 신뢰도 측정"""
def __init__(self):
self.snapshot_db = Path('src/quant_engine/snapshot_admin.db')
self.results = {
"timestamp": datetime.now().isoformat(),
"measurements": {}
}
def measure_data_coverage(self) -> dict:
"""데이터 커버리지 측정"""
print("\n[1. 데이터 커버리지]")
conn = sqlite3.connect(self.snapshot_db)
cursor = conn.cursor()
# 테이블 확인
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='sector_flow_history'")
if not cursor.fetchone():
print(" [!] sector_flow_history 테이블이 없음")
return {}
# 총 행 수
cursor.execute("SELECT COUNT(*) FROM sector_flow_history")
total_rows = cursor.fetchone()[0]
# 섹터별 행 수
cursor.execute("""
SELECT Sector, COUNT(*) as count
FROM sector_flow_history
GROUP BY Sector
ORDER BY count DESC
""")
sector_counts = cursor.fetchall()
coverage = {
"total_rows": total_rows,
"sectors": len(sector_counts),
"sector_distribution": {}
}
print(f" 총 행: {total_rows}")
print(f" 섹터 수: {len(sector_counts)}")
print(f" 섹터별 분포:")
for sector, count in sector_counts[:10]:
pct = (count / total_rows * 100) if total_rows > 0 else 0
coverage["sector_distribution"][sector] = {
"count": count,
"percentage": round(pct, 1)
}
print(f" {sector}: {count} ({pct:.1f}%)")
conn.close()
return coverage
def measure_data_freshness(self) -> dict:
"""데이터 신선도 측정"""
print("\n[2. 데이터 신선도]")
conn = sqlite3.connect(self.snapshot_db)
cursor = conn.cursor()
# 최신 날짜 확인
cursor.execute("""
SELECT MIN(Snapshot_Date) as earliest, MAX(Snapshot_Date) as latest
FROM sector_flow_history
""")
earliest, latest = cursor.fetchone()
freshness = {
"earliest_date": earliest,
"latest_date": latest,
"age_days": 0
}
if latest:
try:
latest_dt = datetime.fromisoformat(latest)
age = (datetime.now() - latest_dt).days
freshness["age_days"] = age
print(f" 최신 데이터: {latest} ({age}일 전)")
except:
print(f" 최신 데이터: {latest}")
if earliest:
print(f" 가장 오래된 데이터: {earliest}")
# 시간대별 분포
cursor.execute("""
SELECT
DATE(Snapshot_Date) as date,
COUNT(*) as count
FROM sector_flow_history
GROUP BY DATE(Snapshot_Date)
ORDER BY date DESC
LIMIT 10
""")
date_dist = cursor.fetchall()
print(f" 최근 10일 분포:")
for date, count in date_dist:
print(f" {date}: {count}")
conn.close()
return freshness
def measure_data_consistency(self) -> dict:
"""데이터 일관성 측정"""
print("\n[3. 데이터 일관성]")
conn = sqlite3.connect(self.snapshot_db)
cursor = conn.cursor()
consistency = {
"null_values": 0,
"outliers": 0,
"warnings": []
}
# NULL 값 확인
cursor.execute("""
SELECT
COUNT(*) as null_count,
COUNT(DISTINCT Sector) as sectors_with_nulls
FROM sector_flow_history
WHERE
Sector IS NULL
OR Snapshot_Date IS NULL
OR Sector_Score IS NULL
""")
null_count, sectors_null = cursor.fetchone()
consistency["null_values"] = null_count
if null_count > 0:
consistency["warnings"].append(f"NULL 값 발견: {null_count}")
print(f" [!] NULL 값: {null_count}")
# 이상치 감지 (극단값)
cursor.execute("""
SELECT
Sector,
MIN(Sector_Score) as min_val,
MAX(Sector_Score) as max_val,
AVG(Sector_Score) as avg_val
FROM sector_flow_history
WHERE Sector_Score IS NOT NULL
GROUP BY Sector
""")
anomalies = 0
for sector, min_val, max_val, avg_val in cursor.fetchall():
if min_val is None or max_val is None:
continue
# 극단값 감지 (평균의 5배 이상)
if avg_val and max_val > avg_val * 5:
anomalies += 1
consistency["warnings"].append(f"{sector}: 극단값 감지 ({max_val})")
consistency["outliers"] = anomalies
if anomalies > 0:
print(f" [!] 이상치: {anomalies}개 섹터")
# 데이터 완정성 (중요 컬럼)
cursor.execute("""
SELECT
(COUNT(*) - COUNT(Sector)) as missing_sectors,
(COUNT(*) - COUNT(Snapshot_Date)) as missing_dates,
(COUNT(*) - COUNT(Sector_Score)) as missing_values
FROM sector_flow_history
""")
missing_sectors, missing_dates, missing_values = cursor.fetchone()
print(f" 누락 데이터: sector={missing_sectors}, date={missing_dates}, value={missing_values}")
conn.close()
return consistency
def calculate_reliability_score(self, coverage: dict, freshness: dict, consistency: dict) -> float:
"""종합 신뢰도 점수 계산"""
print("\n[4. 종합 신뢰도]")
scores = {
"coverage_score": 0,
"freshness_score": 0,
"consistency_score": 0
}
# 커버리지 점수 (0-100)
if coverage.get("total_rows", 0) > 0:
sector_count = coverage.get("sectors", 0)
# 10개 이상 섹터: 100점
# 1개 미만: 0점
scores["coverage_score"] = min(100, sector_count * 10)
print(f" 커버리지: {scores['coverage_score']:.1f}/100 ({coverage.get('sectors', 0)} 섹터)")
# 신선도 점수 (0-100)
age_days = freshness.get("age_days", 9999)
if age_days <= 1:
scores["freshness_score"] = 100 # 1일 이내
elif age_days <= 7:
scores["freshness_score"] = 80 # 1주일 이내
elif age_days <= 30:
scores["freshness_score"] = 50 # 1개월 이내
else:
scores["freshness_score"] = 20 # 오래됨
print(f" 신선도: {scores['freshness_score']:.1f}/100 ({age_days}일 전)")
# 일관성 점수 (0-100)
null_violations = consistency.get("null_values", 0)
outlier_count = consistency.get("outliers", 0)
warnings = len(consistency.get("warnings", []))
consistency_score = 100
if null_violations > 0:
consistency_score -= min(20, null_violations / 10)
if outlier_count > 0:
consistency_score -= min(30, outlier_count * 3)
consistency_score = max(0, consistency_score)
scores["consistency_score"] = consistency_score
print(f" 일관성: {scores['consistency_score']:.1f}/100 ({warnings} 경고)")
# 종합 점수 (가중 평균)
# 커버리지 30%, 신선도 40%, 일관성 30%
overall = (
scores["coverage_score"] * 0.3 +
scores["freshness_score"] * 0.4 +
scores["consistency_score"] * 0.3
)
print(f"\n 종합 신뢰도: {overall:.1f}/100")
return overall
def run(self) -> dict:
"""전체 실행"""
print("="*80)
print("WBS-9.5: Sector Flow Reliability Measurement")
print("="*80)
# 측정
coverage = self.measure_data_coverage()
freshness = self.measure_data_freshness()
consistency = self.measure_data_consistency()
# 신뢰도 점수
reliability_score = self.calculate_reliability_score(coverage, freshness, consistency)
# 결과 저장
self.results["measurements"] = {
"coverage": coverage,
"freshness": freshness,
"consistency": consistency,
"reliability_score": reliability_score
}
# 신뢰도 판정
if reliability_score >= 80:
status = "HIGH (신뢰 가능)"
elif reliability_score >= 60:
status = "MEDIUM (주의 필요)"
else:
status = "LOW (개선 필요)"
print(f"\n[판정]")
print(f" 신뢰도 상태: {status}")
print(f" 권장사항: {'데이터 사용 가능' if reliability_score >= 60 else '데이터 보완 필요'}")
self.results["summary"] = {
"status": status,
"reliability_score": reliability_score,
"measurement_date": datetime.now().isoformat()
}
return self.results
if __name__ == "__main__":
measurer = SectorFlowReliability()
result = measurer.run()
# 결과 저장
output_file = Path("Temp/wbs95_sector_flow_reliability.json")
output_file.parent.mkdir(parents=True, exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(result, f, indent=2, ensure_ascii=False)
print(f"\n[저장] {output_file}")
print("[완료] WBS-9.5 Sector Flow Reliability Measurement 완료")