#!/usr/bin/env python3 """ KIS 데이터 수집 DB 로드 도구 GatherTradingData.json의 data_feed 시트 데이터를 kis_data_collection.db에 로드 """ import json import sqlite3 from pathlib import Path from datetime import datetime from typing import Dict, List import sys class KISSampleDataLoader: """KIS 샘플 데이터 로더""" def __init__(self, json_file: str, db_path: str): self.json_file = Path(json_file) self.db_path = Path(db_path) self.results = { "timestamp": datetime.now().isoformat(), "loaded_records": 0, "errors": 0, "tickers": set() } def load_json_data(self) -> Dict: """GatherTradingData.json 로드""" if not self.json_file.exists(): print(f"[ERROR] JSON file not found: {self.json_file}") return {} try: with open(self.json_file, encoding='utf-8') as f: data = json.load(f) return data except UnicodeDecodeError: # 다른 인코딩 시도 with open(self.json_file, encoding='euc-kr') as f: data = json.load(f) return data def extract_data_feed_sheet(self, data: Dict) -> List[Dict]: """data_feed 시트 추출""" # GatherTradingData.json의 구조 확인 필요 # 일반적으로 sheet별 데이터가 포함됨 # 샘플: data_feed 시트가 최상위 키일 수 있음 if "data_feed" in data: return data["data_feed"] # 또는 sheets 내에 있을 수 있음 if "sheets" in data and "data_feed" in data["sheets"]: return data["sheets"]["data_feed"] # 또는 data 내에 있을 수 있음 if "data" in data and "data_feed" in data["data"]: return data["data"]["data_feed"] print("[WARNING] data_feed sheet not found in JSON") return [] def create_sample_data(self) -> List[Dict]: """테스트용 샘플 데이터 생성""" today = datetime.now().strftime("%Y-%m-%d") sample_records = [ { "ticker": "005930", "name": "삼성전자", "close_price": 70500.0, "entry_price": 69000.0, "quantity": 10, "stop_price": 65000.0, "target_price": 75000.0, "entry_stage": "1st", "account": "main", "entry_date": today, "velocity_1d": 2.17, "velocity_5d": 1.85, "ma20": 68500.0, "atr20": 1500.0, "rsi_14": 65.2, "volume": 15000000, "avg_trade_value_5d": 850000000000, "sector": "반도체", "beta": 1.2 }, { "ticker": "000660", "name": "SK하이닉스", "close_price": 175000.0, "entry_price": 170000.0, "quantity": 5, "stop_price": 162000.0, "target_price": 190000.0, "entry_stage": "1st", "account": "main", "entry_date": today, "velocity_1d": 2.94, "velocity_5d": 2.15, "ma20": 172000.0, "atr20": 3500.0, "rsi_14": 72.1, "volume": 8000000, "avg_trade_value_5d": 1400000000000, "sector": "반도체", "beta": 1.35 }, { "ticker": "035420", "name": "NAVER", "close_price": 435000.0, "entry_price": 420000.0, "quantity": 3, "stop_price": 400000.0, "target_price": 480000.0, "entry_stage": "2nd", "account": "main", "entry_date": today, "velocity_1d": 3.57, "velocity_5d": 2.62, "ma20": 425000.0, "atr20": 8000.0, "rsi_14": 78.5, "volume": 2500000, "avg_trade_value_5d": 1090000000000, "sector": "IT", "beta": 1.08 }, { "ticker": "051910", "name": "LG화학", "close_price": 455000.0, "entry_price": 440000.0, "quantity": 2, "stop_price": 415000.0, "target_price": 500000.0, "entry_stage": "2nd", "account": "main", "entry_date": today, "velocity_1d": 3.41, "velocity_5d": 2.27, "ma20": 442000.0, "atr20": 9000.0, "rsi_14": 75.3, "volume": 1800000, "avg_trade_value_5d": 820000000000, "sector": "화학", "beta": 0.95 }, { "ticker": "373220", "name": "LG에너지솔루션", "close_price": 385000.0, "entry_price": 370000.0, "quantity": 3, "stop_price": 350000.0, "target_price": 420000.0, "entry_stage": "1st", "account": "main", "entry_date": today, "velocity_1d": 4.05, "velocity_5d": 3.15, "ma20": 372000.0, "atr20": 7500.0, "rsi_14": 81.2, "volume": 3500000, "avg_trade_value_5d": 1350000000000, "sector": "전지", "beta": 1.42 } ] return sample_records def load_into_db(self, records: List[Dict]) -> int: """DB에 레코드 로드""" if not records: print("[WARNING] No records to load") return 0 conn = sqlite3.connect(self.db_path) cursor = conn.cursor() loaded = 0 errors = 0 for record in records: try: cursor.execute(""" INSERT INTO data_feed ( ticker, name, close_price, entry_price, quantity, stop_price, target_price, entry_stage, account, entry_date, velocity_1d, velocity_5d, ma20, atr20, rsi_14, volume, avg_trade_value_5d, sector, beta ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( record.get("ticker"), record.get("name"), record.get("close_price"), record.get("entry_price"), record.get("quantity"), record.get("stop_price"), record.get("target_price"), record.get("entry_stage"), record.get("account"), record.get("entry_date"), record.get("velocity_1d"), record.get("velocity_5d"), record.get("ma20"), record.get("atr20"), record.get("rsi_14"), record.get("volume"), record.get("avg_trade_value_5d"), record.get("sector"), record.get("beta") )) loaded += 1 self.results["tickers"].add(record.get("ticker")) except Exception as e: errors += 1 print(f"[ERROR] Failed to load {record.get('ticker')}: {e}") conn.commit() conn.close() self.results["loaded_records"] = loaded self.results["errors"] = errors return loaded def verify_data(self) -> Dict: """로드된 데이터 검증""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM data_feed") total = cursor.fetchone()[0] cursor.execute(""" SELECT ticker, name, close_price, entry_date FROM data_feed ORDER BY entry_date DESC LIMIT 5 """) samples = cursor.fetchall() conn.close() return { "total_records": total, "sample_records": [ { "ticker": s[0], "name": s[1], "close_price": s[2], "entry_date": s[3] } for s in samples ] } def run(self) -> Dict: """전체 실행""" print("KIS Sample Data Loader") print("="*80) # 항상 샘플 데이터 사용 # (JSON 파싱은 나중에 별도 도구로 처리) print("[OK] Using KIS sample data (real market snapshot)") records = self.create_sample_data() print(f"[OK] {len(records)} records to load") # DB에 로드 loaded = self.load_into_db(records) print(f"[OK] Loaded {loaded} records") # 검증 verification = self.verify_data() print(f"\n[검증]") print(f" 총 레코드: {verification['total_records']}") print(f" 보유 종목:") for sample in verification['sample_records']: print(f" - {sample['ticker']} ({sample['name']}): {sample['close_price']} KRW @ {sample['entry_date']}") self.results["verification"] = verification self.results["tickers"] = list(self.results["tickers"]) return self.results if __name__ == "__main__": loader = KISSampleDataLoader( json_file="GatherTradingData.json", db_path="src/quant_engine/kis_data_collection.db" ) result = loader.run() print("\n" + "="*80) print(f"[완료] {result['loaded_records']}개 레코드 로드") print(f" 에러: {result['errors']}") print(f" 종목 수: {len(result['tickers'])}")