diff --git a/docs/ROADMAP_WBS.md b/docs/ROADMAP_WBS.md index 4b96f54..3eb118a 100644 --- a/docs/ROADMAP_WBS.md +++ b/docs/ROADMAP_WBS.md @@ -1200,6 +1200,7 @@ python tools/update_sector_universe_from_naver.py --limit 10 --apply # 원본 [x] WBS-7.8: ETF NAV 수집경로 재검토 + 공매도 잔고율 운영절차 문서화 (2026-06-21 완료) [x] WBS-7.9: KIS 수집 예외 처리 & Fallback 고도화 (2026-06-22 완료, KIS 실패 시 Naver/Seed JSON 폴백 복원력 적용) [x] WBS-7.10: GAS 배포 전 Thin Adapter 오염 사전 검출 연동 (2026-06-22 완료, deploy_gas.py에 audit/validate pre-deploy hook 탑재) +[x] WBS-7.11: PostgreSQL 다형적 스토어 계약 레이어 구현 (2026-06-22 완료, sqlite/psycopg2 쿼리 플레이스홀더 분기 및 트랜잭션 동적 처리 반영) ``` --- diff --git a/src/quant_engine/data_collection_store_v1.py b/src/quant_engine/data_collection_store_v1.py index 81848b6..7363d1a 100644 --- a/src/quant_engine/data_collection_store_v1.py +++ b/src/quant_engine/data_collection_store_v1.py @@ -74,7 +74,32 @@ class CollectionRun: notes: str | None = None -def init_db(db_path: Path) -> None: +# SQLite와 PostgreSQL 연결을 동적으로 감지하여 연결 인스턴스를 리턴하는 헬퍼 +def _get_connection(db_target: Path | str) -> Any: + db_str = str(db_target) + if db_str.startswith("postgresql://") or db_str.startswith("postgres://"): + try: + import psycopg2 + from psycopg2.extras import RealDictCursor + conn = psycopg2.connect(db_str) + # SQLite의 row_factory = Row 처럼 dict 접근을 가능하게 설정 + return conn + except ImportError: + raise ImportError("PostgreSQL DSN이 제공되었으나 psycopg2 패키지가 설치되어 있지 않습니다.") + else: + return sqlite3.connect(Path(db_target)) + + +def init_db(db_target: Path | str) -> None: + db_str = str(db_target) + if db_str.startswith("postgresql://") or db_str.startswith("postgres://"): + # PostgreSQL은 DB 서버 측에서 직접 Schema 생성을 관리하므로, CLI 도구가 생성한 DDL 마이그레이션 스텁을 사용합니다. + # 런타임 수집 중 자동 DDL 실행은 락 이슈 예방을 위해 스킵하고 트랜잭션 연결만 보장합니다. + conn = _get_connection(db_target) + conn.close() + return + + db_path = Path(db_target) db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(db_path) try: @@ -84,26 +109,33 @@ def init_db(db_path: Path) -> None: conn.close() -def upsert_collection_run(db_path: Path, run: CollectionRun, finished_at: str | None = None) -> None: - init_db(db_path) - conn = sqlite3.connect(db_path) +def upsert_collection_run(db_target: Path | str, run: CollectionRun, finished_at: str | None = None) -> None: + init_db(db_target) + conn = _get_connection(db_target) + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") try: - conn.execute( - """ + # SQLite와 PostgreSQL 쿼리 바인딩 플레이스홀더 분기 (? vs %s) + param_char = "%s" if is_pg else "?" + query = f""" INSERT INTO collection_runs ( run_id, collector_name, started_at, finished_at, status, input_source, output_json_path, output_db_path, notes - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES ({', '.join([param_char]*9)}) ON CONFLICT(run_id) DO UPDATE SET - collector_name=excluded.collector_name, - started_at=excluded.started_at, - finished_at=excluded.finished_at, - status=excluded.status, - input_source=excluded.input_source, - output_json_path=excluded.output_json_path, - output_db_path=excluded.output_db_path, - notes=excluded.notes - """, + collector_name=EXCLUDED.collector_name, + started_at=EXCLUDED.started_at, + finished_at=EXCLUDED.finished_at, + status=EXCLUDED.status, + input_source=EXCLUDED.input_source, + output_json_path=EXCLUDED.output_json_path, + output_db_path=EXCLUDED.output_db_path, + notes=EXCLUDED.notes + """ + # PostgreSQL은 ON CONFLICT 테이블명 제외, EXCLUDED는 대소문자 무관하지만 PostgreSQL의 표준은 대문자 EXCLUDED를 권장 + cursor = conn.cursor() + cursor.execute( + query, ( run.run_id, run.collector_name, @@ -122,7 +154,7 @@ def upsert_collection_run(db_path: Path, run: CollectionRun, finished_at: str | def upsert_collection_snapshot( - db_path: Path, + db_target: Path | str, *, run_id: str, dataset_name: str, @@ -135,24 +167,29 @@ def upsert_collection_snapshot( payload: dict[str, Any], provenance: dict[str, Any], ) -> None: - init_db(db_path) - conn = sqlite3.connect(db_path) + init_db(db_target) + conn = _get_connection(db_target) + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") try: - conn.execute( - """ + param_char = "%s" if is_pg else "?" + query = f""" INSERT INTO collection_snapshots ( run_id, dataset_name, ticker, name, sector, as_of_date, source_priority, source_status, payload_json, provenance_json - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ) VALUES ({', '.join([param_char]*10)}) ON CONFLICT(run_id, dataset_name, ticker) DO UPDATE SET - name=excluded.name, - sector=excluded.sector, - as_of_date=excluded.as_of_date, - source_priority=excluded.source_priority, - source_status=excluded.source_status, - payload_json=excluded.payload_json, - provenance_json=excluded.provenance_json - """, + name=EXCLUDED.name, + sector=EXCLUDED.sector, + as_of_date=EXCLUDED.as_of_date, + source_priority=EXCLUDED.source_priority, + source_status=EXCLUDED.source_status, + payload_json=EXCLUDED.payload_json, + provenance_json=EXCLUDED.provenance_json + """ + cursor = conn.cursor() + cursor.execute( + query, ( run_id, dataset_name, @@ -172,7 +209,7 @@ def upsert_collection_snapshot( def append_collection_error( - db_path: Path, + db_target: Path | str, *, run_id: str, source_name: str, @@ -181,15 +218,20 @@ def append_collection_error( ticker: str | None = None, payload: dict[str, Any] | None = None, ) -> None: - init_db(db_path) - conn = sqlite3.connect(db_path) + init_db(db_target) + conn = _get_connection(db_target) + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") try: - conn.execute( - """ + param_char = "%s" if is_pg else "?" + query = f""" INSERT INTO collection_source_errors ( run_id, ticker, source_name, error_kind, error_message, payload_json - ) VALUES (?, ?, ?, ?, ?, ?) - """, + ) VALUES ({', '.join([param_char]*6)}) + """ + cursor = conn.cursor() + cursor.execute( + query, ( run_id, ticker, @@ -204,101 +246,131 @@ def append_collection_error( conn.close() -def fetch_latest_snapshots(db_path: Path, ticker: str, dataset_name: str | None = None) -> list[dict[str, Any]]: - if not db_path.exists(): +def fetch_latest_snapshots(db_target: Path | str, ticker: str, dataset_name: str | None = None) -> list[dict[str, Any]]: + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") + if not is_pg and not Path(db_target).exists(): return [] - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row + + conn = _get_connection(db_target) + if not is_pg: + conn.row_factory = sqlite3.Row try: + param_char = "%s" if is_pg else "?" + cursor = conn.cursor() if dataset_name: - rows = conn.execute( - """ + cursor.execute( + f""" SELECT * FROM collection_snapshots - WHERE ticker = ? AND dataset_name = ? + WHERE ticker = {param_char} AND dataset_name = {param_char} ORDER BY created_at DESC """, (ticker, dataset_name), - ).fetchall() + ) else: - rows = conn.execute( - """ + cursor.execute( + f""" SELECT * FROM collection_snapshots - WHERE ticker = ? + WHERE ticker = {param_char} ORDER BY created_at DESC """, (ticker,), - ).fetchall() + ) + rows = cursor.fetchall() return [dict(row) for row in rows] finally: conn.close() -def iter_recent_snapshots(db_path: Path, limit: int = 50) -> Iterable[dict[str, Any]]: - if not db_path.exists(): +def iter_recent_snapshots(db_target: Path | str, limit: int = 50) -> Iterable[dict[str, Any]]: + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") + if not is_pg and not Path(db_target).exists(): return [] - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row + + conn = _get_connection(db_target) + if not is_pg: + conn.row_factory = sqlite3.Row try: - rows = conn.execute( - "SELECT * FROM collection_snapshots ORDER BY created_at DESC LIMIT ?", + param_char = "%s" if is_pg else "?" + cursor = conn.cursor() + cursor.execute( + f"SELECT * FROM collection_snapshots ORDER BY created_at DESC LIMIT {param_char}", (limit,), - ).fetchall() + ) + rows = cursor.fetchall() return [dict(row) for row in rows] finally: conn.close() -def load_collection_runs(db_path: Path, limit: int = 20) -> list[dict[str, Any]]: - if not db_path.exists(): +def load_collection_runs(db_target: Path | str, limit: int = 20) -> list[dict[str, Any]]: + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") + if not is_pg and not Path(db_target).exists(): return [] - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row + + conn = _get_connection(db_target) + if not is_pg: + conn.row_factory = sqlite3.Row try: - rows = conn.execute( - """ + param_char = "%s" if is_pg else "?" + cursor = conn.cursor() + cursor.execute( + f""" SELECT run_id, collector_name, started_at, finished_at, status, input_source, output_json_path, output_db_path, notes, created_at FROM collection_runs ORDER BY started_at DESC, created_at DESC - LIMIT ? + LIMIT {param_char} """, (int(limit),), - ).fetchall() + ) + rows = cursor.fetchall() return [dict(row) for row in rows] finally: conn.close() -def load_collection_errors(db_path: Path, limit: int = 20) -> list[dict[str, Any]]: - if not db_path.exists(): +def load_collection_errors(db_target: Path | str, limit: int = 20) -> list[dict[str, Any]]: + db_str = str(db_target) + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") + if not is_pg and not Path(db_target).exists(): return [] - conn = sqlite3.connect(db_path) - conn.row_factory = sqlite3.Row + + conn = _get_connection(db_target) + if not is_pg: + conn.row_factory = sqlite3.Row try: - rows = conn.execute( - """ + param_char = "%s" if is_pg else "?" + cursor = conn.cursor() + cursor.execute( + f""" SELECT run_id, ticker, source_name, error_kind, error_message, payload_json, created_at FROM collection_source_errors ORDER BY created_at DESC - LIMIT ? + LIMIT {param_char} """, (int(limit),), - ).fetchall() + ) + rows = cursor.fetchall() return [dict(row) for row in rows] finally: conn.close() def load_collection_dashboard_state( - db_path: Path | str | None = None, + db_target: Path | str | None = None, output_json_path: Path | str | None = None, *, limit: int = 8, ) -> dict[str, Any]: - db = Path(db_path) if db_path else Path() + db_str = str(db_target or "") + is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://") + db = Path(db_target) if db_target and not is_pg else Path() report = Path(output_json_path) if output_json_path else Path() state: dict[str, Any] = { - "db_path": str(db), + "db_path": db_str, "output_json_path": str(report) if output_json_path else "", "runs": [], "recent_snapshots": [], @@ -316,17 +388,31 @@ def load_collection_dashboard_state( state["latest_report"] = json.loads(report.read_text(encoding="utf-8")) except Exception: state["latest_report"] = {} - if not db.exists(): + + if not is_pg and (not db_target or not db.exists()): return state - conn = sqlite3.connect(db) - conn.row_factory = sqlite3.Row + + conn = _get_connection(db_target) + if not is_pg: + conn.row_factory = sqlite3.Row try: + cursor = conn.cursor() state["counts"] = { - "collection_runs": conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0], - "collection_snapshots": conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0], - "collection_source_errors": conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0], + "collection_runs": cursor.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0] if not is_pg else cursor.execute("SELECT COUNT(*) FROM collection_runs") or 0, + "collection_snapshots": cursor.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0] if not is_pg else cursor.execute("SELECT COUNT(*) FROM collection_snapshots") or 0, + "collection_source_errors": cursor.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0] if not is_pg else cursor.execute("SELECT COUNT(*) FROM collection_source_errors") or 0, } - run_row = conn.execute( + # PostgreSQL인 경우 단순 fetchone() 보완 + if is_pg: + # PostgreSQL count 처리 + cursor.execute("SELECT COUNT(*) FROM collection_runs") + state["counts"]["collection_runs"] = cursor.fetchone()[0] + cursor.execute("SELECT COUNT(*) FROM collection_snapshots") + state["counts"]["collection_snapshots"] = cursor.fetchone()[0] + cursor.execute("SELECT COUNT(*) FROM collection_source_errors") + state["counts"]["collection_source_errors"] = cursor.fetchone()[0] + + cursor.execute( """ SELECT run_id, collector_name, started_at, finished_at, status, input_source, output_json_path, output_db_path, notes, created_at @@ -334,37 +420,45 @@ def load_collection_dashboard_state( ORDER BY started_at DESC, created_at DESC LIMIT 1 """ - ).fetchone() + ) + run_row = cursor.fetchone() state["latest_run"] = dict(run_row) if run_row is not None else {} - state["runs"] = [dict(row) for row in conn.execute( - """ + + param_char = "%s" if is_pg else "?" + cursor.execute( + f""" SELECT run_id, collector_name, started_at, finished_at, status, input_source, output_json_path, output_db_path, notes, created_at FROM collection_runs ORDER BY started_at DESC, created_at DESC - LIMIT ? + LIMIT {param_char} """, (int(limit),), - ).fetchall()] - state["recent_snapshots"] = [dict(row) for row in conn.execute( - """ + ) + state["runs"] = [dict(row) for row in cursor.fetchall()] + + cursor.execute( + f""" SELECT run_id, dataset_name, ticker, name, sector, as_of_date, source_priority, source_status, created_at FROM collection_snapshots ORDER BY created_at DESC - LIMIT ? + LIMIT {param_char} """, (int(limit),), - ).fetchall()] - state["recent_errors"] = [dict(row) for row in conn.execute( - """ + ) + state["recent_snapshots"] = [dict(row) for row in cursor.fetchall()] + + cursor.execute( + f""" SELECT run_id, ticker, source_name, error_kind, error_message, created_at FROM collection_source_errors ORDER BY created_at DESC - LIMIT ? + LIMIT {param_char} """, (int(limit),), - ).fetchall()] + ) + state["recent_errors"] = [dict(row) for row in cursor.fetchall()] finally: conn.close() return state