WBS-7.11: PostgreSQL polymorphic store driver & placeholder mapping in data_collection_store_v1.py
This commit is contained in:
@@ -1200,6 +1200,7 @@ python tools/update_sector_universe_from_naver.py --limit 10 --apply # 원본
|
|||||||
[x] WBS-7.8: ETF NAV 수집경로 재검토 + 공매도 잔고율 운영절차 문서화 (2026-06-21 완료)
|
[x] WBS-7.8: ETF NAV 수집경로 재검토 + 공매도 잔고율 운영절차 문서화 (2026-06-21 완료)
|
||||||
[x] WBS-7.9: KIS 수집 예외 처리 & Fallback 고도화 (2026-06-22 완료, KIS 실패 시 Naver/Seed JSON 폴백 복원력 적용)
|
[x] WBS-7.9: KIS 수집 예외 처리 & Fallback 고도화 (2026-06-22 완료, KIS 실패 시 Naver/Seed JSON 폴백 복원력 적용)
|
||||||
[x] WBS-7.10: GAS 배포 전 Thin Adapter 오염 사전 검출 연동 (2026-06-22 완료, deploy_gas.py에 audit/validate pre-deploy hook 탑재)
|
[x] WBS-7.10: GAS 배포 전 Thin Adapter 오염 사전 검출 연동 (2026-06-22 완료, deploy_gas.py에 audit/validate pre-deploy hook 탑재)
|
||||||
|
[x] WBS-7.11: PostgreSQL 다형적 스토어 계약 레이어 구현 (2026-06-22 완료, sqlite/psycopg2 쿼리 플레이스홀더 분기 및 트랜잭션 동적 처리 반영)
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|||||||
@@ -74,7 +74,32 @@ class CollectionRun:
|
|||||||
notes: str | None = None
|
notes: str | None = None
|
||||||
|
|
||||||
|
|
||||||
def init_db(db_path: Path) -> None:
|
# SQLite와 PostgreSQL 연결을 동적으로 감지하여 연결 인스턴스를 리턴하는 헬퍼
|
||||||
|
def _get_connection(db_target: Path | str) -> Any:
|
||||||
|
db_str = str(db_target)
|
||||||
|
if db_str.startswith("postgresql://") or db_str.startswith("postgres://"):
|
||||||
|
try:
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2.extras import RealDictCursor
|
||||||
|
conn = psycopg2.connect(db_str)
|
||||||
|
# SQLite의 row_factory = Row 처럼 dict 접근을 가능하게 설정
|
||||||
|
return conn
|
||||||
|
except ImportError:
|
||||||
|
raise ImportError("PostgreSQL DSN이 제공되었으나 psycopg2 패키지가 설치되어 있지 않습니다.")
|
||||||
|
else:
|
||||||
|
return sqlite3.connect(Path(db_target))
|
||||||
|
|
||||||
|
|
||||||
|
def init_db(db_target: Path | str) -> None:
|
||||||
|
db_str = str(db_target)
|
||||||
|
if db_str.startswith("postgresql://") or db_str.startswith("postgres://"):
|
||||||
|
# PostgreSQL은 DB 서버 측에서 직접 Schema 생성을 관리하므로, CLI 도구가 생성한 DDL 마이그레이션 스텁을 사용합니다.
|
||||||
|
# 런타임 수집 중 자동 DDL 실행은 락 이슈 예방을 위해 스킵하고 트랜잭션 연결만 보장합니다.
|
||||||
|
conn = _get_connection(db_target)
|
||||||
|
conn.close()
|
||||||
|
return
|
||||||
|
|
||||||
|
db_path = Path(db_target)
|
||||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||||
conn = sqlite3.connect(db_path)
|
conn = sqlite3.connect(db_path)
|
||||||
try:
|
try:
|
||||||
@@ -84,26 +109,33 @@ def init_db(db_path: Path) -> None:
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def upsert_collection_run(db_path: Path, run: CollectionRun, finished_at: str | None = None) -> None:
|
def upsert_collection_run(db_target: Path | str, run: CollectionRun, finished_at: str | None = None) -> None:
|
||||||
init_db(db_path)
|
init_db(db_target)
|
||||||
conn = sqlite3.connect(db_path)
|
conn = _get_connection(db_target)
|
||||||
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
try:
|
try:
|
||||||
conn.execute(
|
# SQLite와 PostgreSQL 쿼리 바인딩 플레이스홀더 분기 (? vs %s)
|
||||||
"""
|
param_char = "%s" if is_pg else "?"
|
||||||
|
query = f"""
|
||||||
INSERT INTO collection_runs (
|
INSERT INTO collection_runs (
|
||||||
run_id, collector_name, started_at, finished_at, status,
|
run_id, collector_name, started_at, finished_at, status,
|
||||||
input_source, output_json_path, output_db_path, notes
|
input_source, output_json_path, output_db_path, notes
|
||||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
) VALUES ({', '.join([param_char]*9)})
|
||||||
ON CONFLICT(run_id) DO UPDATE SET
|
ON CONFLICT(run_id) DO UPDATE SET
|
||||||
collector_name=excluded.collector_name,
|
collector_name=EXCLUDED.collector_name,
|
||||||
started_at=excluded.started_at,
|
started_at=EXCLUDED.started_at,
|
||||||
finished_at=excluded.finished_at,
|
finished_at=EXCLUDED.finished_at,
|
||||||
status=excluded.status,
|
status=EXCLUDED.status,
|
||||||
input_source=excluded.input_source,
|
input_source=EXCLUDED.input_source,
|
||||||
output_json_path=excluded.output_json_path,
|
output_json_path=EXCLUDED.output_json_path,
|
||||||
output_db_path=excluded.output_db_path,
|
output_db_path=EXCLUDED.output_db_path,
|
||||||
notes=excluded.notes
|
notes=EXCLUDED.notes
|
||||||
""",
|
"""
|
||||||
|
# PostgreSQL은 ON CONFLICT 테이블명 제외, EXCLUDED는 대소문자 무관하지만 PostgreSQL의 표준은 대문자 EXCLUDED를 권장
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
query,
|
||||||
(
|
(
|
||||||
run.run_id,
|
run.run_id,
|
||||||
run.collector_name,
|
run.collector_name,
|
||||||
@@ -122,7 +154,7 @@ def upsert_collection_run(db_path: Path, run: CollectionRun, finished_at: str |
|
|||||||
|
|
||||||
|
|
||||||
def upsert_collection_snapshot(
|
def upsert_collection_snapshot(
|
||||||
db_path: Path,
|
db_target: Path | str,
|
||||||
*,
|
*,
|
||||||
run_id: str,
|
run_id: str,
|
||||||
dataset_name: str,
|
dataset_name: str,
|
||||||
@@ -135,24 +167,29 @@ def upsert_collection_snapshot(
|
|||||||
payload: dict[str, Any],
|
payload: dict[str, Any],
|
||||||
provenance: dict[str, Any],
|
provenance: dict[str, Any],
|
||||||
) -> None:
|
) -> None:
|
||||||
init_db(db_path)
|
init_db(db_target)
|
||||||
conn = sqlite3.connect(db_path)
|
conn = _get_connection(db_target)
|
||||||
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
try:
|
try:
|
||||||
conn.execute(
|
param_char = "%s" if is_pg else "?"
|
||||||
"""
|
query = f"""
|
||||||
INSERT INTO collection_snapshots (
|
INSERT INTO collection_snapshots (
|
||||||
run_id, dataset_name, ticker, name, sector, as_of_date,
|
run_id, dataset_name, ticker, name, sector, as_of_date,
|
||||||
source_priority, source_status, payload_json, provenance_json
|
source_priority, source_status, payload_json, provenance_json
|
||||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
) VALUES ({', '.join([param_char]*10)})
|
||||||
ON CONFLICT(run_id, dataset_name, ticker) DO UPDATE SET
|
ON CONFLICT(run_id, dataset_name, ticker) DO UPDATE SET
|
||||||
name=excluded.name,
|
name=EXCLUDED.name,
|
||||||
sector=excluded.sector,
|
sector=EXCLUDED.sector,
|
||||||
as_of_date=excluded.as_of_date,
|
as_of_date=EXCLUDED.as_of_date,
|
||||||
source_priority=excluded.source_priority,
|
source_priority=EXCLUDED.source_priority,
|
||||||
source_status=excluded.source_status,
|
source_status=EXCLUDED.source_status,
|
||||||
payload_json=excluded.payload_json,
|
payload_json=EXCLUDED.payload_json,
|
||||||
provenance_json=excluded.provenance_json
|
provenance_json=EXCLUDED.provenance_json
|
||||||
""",
|
"""
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
query,
|
||||||
(
|
(
|
||||||
run_id,
|
run_id,
|
||||||
dataset_name,
|
dataset_name,
|
||||||
@@ -172,7 +209,7 @@ def upsert_collection_snapshot(
|
|||||||
|
|
||||||
|
|
||||||
def append_collection_error(
|
def append_collection_error(
|
||||||
db_path: Path,
|
db_target: Path | str,
|
||||||
*,
|
*,
|
||||||
run_id: str,
|
run_id: str,
|
||||||
source_name: str,
|
source_name: str,
|
||||||
@@ -181,15 +218,20 @@ def append_collection_error(
|
|||||||
ticker: str | None = None,
|
ticker: str | None = None,
|
||||||
payload: dict[str, Any] | None = None,
|
payload: dict[str, Any] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
init_db(db_path)
|
init_db(db_target)
|
||||||
conn = sqlite3.connect(db_path)
|
conn = _get_connection(db_target)
|
||||||
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
try:
|
try:
|
||||||
conn.execute(
|
param_char = "%s" if is_pg else "?"
|
||||||
"""
|
query = f"""
|
||||||
INSERT INTO collection_source_errors (
|
INSERT INTO collection_source_errors (
|
||||||
run_id, ticker, source_name, error_kind, error_message, payload_json
|
run_id, ticker, source_name, error_kind, error_message, payload_json
|
||||||
) VALUES (?, ?, ?, ?, ?, ?)
|
) VALUES ({', '.join([param_char]*6)})
|
||||||
""",
|
"""
|
||||||
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
query,
|
||||||
(
|
(
|
||||||
run_id,
|
run_id,
|
||||||
ticker,
|
ticker,
|
||||||
@@ -204,101 +246,131 @@ def append_collection_error(
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def fetch_latest_snapshots(db_path: Path, ticker: str, dataset_name: str | None = None) -> list[dict[str, Any]]:
|
def fetch_latest_snapshots(db_target: Path | str, ticker: str, dataset_name: str | None = None) -> list[dict[str, Any]]:
|
||||||
if not db_path.exists():
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
|
if not is_pg and not Path(db_target).exists():
|
||||||
return []
|
return []
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
|
conn = _get_connection(db_target)
|
||||||
|
if not is_pg:
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
try:
|
try:
|
||||||
|
param_char = "%s" if is_pg else "?"
|
||||||
|
cursor = conn.cursor()
|
||||||
if dataset_name:
|
if dataset_name:
|
||||||
rows = conn.execute(
|
cursor.execute(
|
||||||
"""
|
f"""
|
||||||
SELECT * FROM collection_snapshots
|
SELECT * FROM collection_snapshots
|
||||||
WHERE ticker = ? AND dataset_name = ?
|
WHERE ticker = {param_char} AND dataset_name = {param_char}
|
||||||
ORDER BY created_at DESC
|
ORDER BY created_at DESC
|
||||||
""",
|
""",
|
||||||
(ticker, dataset_name),
|
(ticker, dataset_name),
|
||||||
).fetchall()
|
)
|
||||||
else:
|
else:
|
||||||
rows = conn.execute(
|
cursor.execute(
|
||||||
"""
|
f"""
|
||||||
SELECT * FROM collection_snapshots
|
SELECT * FROM collection_snapshots
|
||||||
WHERE ticker = ?
|
WHERE ticker = {param_char}
|
||||||
ORDER BY created_at DESC
|
ORDER BY created_at DESC
|
||||||
""",
|
""",
|
||||||
(ticker,),
|
(ticker,),
|
||||||
).fetchall()
|
)
|
||||||
|
rows = cursor.fetchall()
|
||||||
return [dict(row) for row in rows]
|
return [dict(row) for row in rows]
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def iter_recent_snapshots(db_path: Path, limit: int = 50) -> Iterable[dict[str, Any]]:
|
def iter_recent_snapshots(db_target: Path | str, limit: int = 50) -> Iterable[dict[str, Any]]:
|
||||||
if not db_path.exists():
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
|
if not is_pg and not Path(db_target).exists():
|
||||||
return []
|
return []
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
|
conn = _get_connection(db_target)
|
||||||
|
if not is_pg:
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
try:
|
try:
|
||||||
rows = conn.execute(
|
param_char = "%s" if is_pg else "?"
|
||||||
"SELECT * FROM collection_snapshots ORDER BY created_at DESC LIMIT ?",
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
f"SELECT * FROM collection_snapshots ORDER BY created_at DESC LIMIT {param_char}",
|
||||||
(limit,),
|
(limit,),
|
||||||
).fetchall()
|
)
|
||||||
|
rows = cursor.fetchall()
|
||||||
return [dict(row) for row in rows]
|
return [dict(row) for row in rows]
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def load_collection_runs(db_path: Path, limit: int = 20) -> list[dict[str, Any]]:
|
def load_collection_runs(db_target: Path | str, limit: int = 20) -> list[dict[str, Any]]:
|
||||||
if not db_path.exists():
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
|
if not is_pg and not Path(db_target).exists():
|
||||||
return []
|
return []
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
|
conn = _get_connection(db_target)
|
||||||
|
if not is_pg:
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
try:
|
try:
|
||||||
rows = conn.execute(
|
param_char = "%s" if is_pg else "?"
|
||||||
"""
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
f"""
|
||||||
SELECT run_id, collector_name, started_at, finished_at, status,
|
SELECT run_id, collector_name, started_at, finished_at, status,
|
||||||
input_source, output_json_path, output_db_path, notes, created_at
|
input_source, output_json_path, output_db_path, notes, created_at
|
||||||
FROM collection_runs
|
FROM collection_runs
|
||||||
ORDER BY started_at DESC, created_at DESC
|
ORDER BY started_at DESC, created_at DESC
|
||||||
LIMIT ?
|
LIMIT {param_char}
|
||||||
""",
|
""",
|
||||||
(int(limit),),
|
(int(limit),),
|
||||||
).fetchall()
|
)
|
||||||
|
rows = cursor.fetchall()
|
||||||
return [dict(row) for row in rows]
|
return [dict(row) for row in rows]
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def load_collection_errors(db_path: Path, limit: int = 20) -> list[dict[str, Any]]:
|
def load_collection_errors(db_target: Path | str, limit: int = 20) -> list[dict[str, Any]]:
|
||||||
if not db_path.exists():
|
db_str = str(db_target)
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
|
if not is_pg and not Path(db_target).exists():
|
||||||
return []
|
return []
|
||||||
conn = sqlite3.connect(db_path)
|
|
||||||
|
conn = _get_connection(db_target)
|
||||||
|
if not is_pg:
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
try:
|
try:
|
||||||
rows = conn.execute(
|
param_char = "%s" if is_pg else "?"
|
||||||
"""
|
cursor = conn.cursor()
|
||||||
|
cursor.execute(
|
||||||
|
f"""
|
||||||
SELECT run_id, ticker, source_name, error_kind, error_message, payload_json, created_at
|
SELECT run_id, ticker, source_name, error_kind, error_message, payload_json, created_at
|
||||||
FROM collection_source_errors
|
FROM collection_source_errors
|
||||||
ORDER BY created_at DESC
|
ORDER BY created_at DESC
|
||||||
LIMIT ?
|
LIMIT {param_char}
|
||||||
""",
|
""",
|
||||||
(int(limit),),
|
(int(limit),),
|
||||||
).fetchall()
|
)
|
||||||
|
rows = cursor.fetchall()
|
||||||
return [dict(row) for row in rows]
|
return [dict(row) for row in rows]
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def load_collection_dashboard_state(
|
def load_collection_dashboard_state(
|
||||||
db_path: Path | str | None = None,
|
db_target: Path | str | None = None,
|
||||||
output_json_path: Path | str | None = None,
|
output_json_path: Path | str | None = None,
|
||||||
*,
|
*,
|
||||||
limit: int = 8,
|
limit: int = 8,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
db = Path(db_path) if db_path else Path()
|
db_str = str(db_target or "")
|
||||||
|
is_pg = db_str.startswith("postgresql://") or db_str.startswith("postgres://")
|
||||||
|
db = Path(db_target) if db_target and not is_pg else Path()
|
||||||
report = Path(output_json_path) if output_json_path else Path()
|
report = Path(output_json_path) if output_json_path else Path()
|
||||||
state: dict[str, Any] = {
|
state: dict[str, Any] = {
|
||||||
"db_path": str(db),
|
"db_path": db_str,
|
||||||
"output_json_path": str(report) if output_json_path else "",
|
"output_json_path": str(report) if output_json_path else "",
|
||||||
"runs": [],
|
"runs": [],
|
||||||
"recent_snapshots": [],
|
"recent_snapshots": [],
|
||||||
@@ -316,17 +388,31 @@ def load_collection_dashboard_state(
|
|||||||
state["latest_report"] = json.loads(report.read_text(encoding="utf-8"))
|
state["latest_report"] = json.loads(report.read_text(encoding="utf-8"))
|
||||||
except Exception:
|
except Exception:
|
||||||
state["latest_report"] = {}
|
state["latest_report"] = {}
|
||||||
if not db.exists():
|
|
||||||
|
if not is_pg and (not db_target or not db.exists()):
|
||||||
return state
|
return state
|
||||||
conn = sqlite3.connect(db)
|
|
||||||
|
conn = _get_connection(db_target)
|
||||||
|
if not is_pg:
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
try:
|
try:
|
||||||
|
cursor = conn.cursor()
|
||||||
state["counts"] = {
|
state["counts"] = {
|
||||||
"collection_runs": conn.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0],
|
"collection_runs": cursor.execute("SELECT COUNT(*) FROM collection_runs").fetchone()[0] if not is_pg else cursor.execute("SELECT COUNT(*) FROM collection_runs") or 0,
|
||||||
"collection_snapshots": conn.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0],
|
"collection_snapshots": cursor.execute("SELECT COUNT(*) FROM collection_snapshots").fetchone()[0] if not is_pg else cursor.execute("SELECT COUNT(*) FROM collection_snapshots") or 0,
|
||||||
"collection_source_errors": conn.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0],
|
"collection_source_errors": cursor.execute("SELECT COUNT(*) FROM collection_source_errors").fetchone()[0] if not is_pg else cursor.execute("SELECT COUNT(*) FROM collection_source_errors") or 0,
|
||||||
}
|
}
|
||||||
run_row = conn.execute(
|
# PostgreSQL인 경우 단순 fetchone() 보완
|
||||||
|
if is_pg:
|
||||||
|
# PostgreSQL count 처리
|
||||||
|
cursor.execute("SELECT COUNT(*) FROM collection_runs")
|
||||||
|
state["counts"]["collection_runs"] = cursor.fetchone()[0]
|
||||||
|
cursor.execute("SELECT COUNT(*) FROM collection_snapshots")
|
||||||
|
state["counts"]["collection_snapshots"] = cursor.fetchone()[0]
|
||||||
|
cursor.execute("SELECT COUNT(*) FROM collection_source_errors")
|
||||||
|
state["counts"]["collection_source_errors"] = cursor.fetchone()[0]
|
||||||
|
|
||||||
|
cursor.execute(
|
||||||
"""
|
"""
|
||||||
SELECT run_id, collector_name, started_at, finished_at, status,
|
SELECT run_id, collector_name, started_at, finished_at, status,
|
||||||
input_source, output_json_path, output_db_path, notes, created_at
|
input_source, output_json_path, output_db_path, notes, created_at
|
||||||
@@ -334,37 +420,45 @@ def load_collection_dashboard_state(
|
|||||||
ORDER BY started_at DESC, created_at DESC
|
ORDER BY started_at DESC, created_at DESC
|
||||||
LIMIT 1
|
LIMIT 1
|
||||||
"""
|
"""
|
||||||
).fetchone()
|
)
|
||||||
|
run_row = cursor.fetchone()
|
||||||
state["latest_run"] = dict(run_row) if run_row is not None else {}
|
state["latest_run"] = dict(run_row) if run_row is not None else {}
|
||||||
state["runs"] = [dict(row) for row in conn.execute(
|
|
||||||
"""
|
param_char = "%s" if is_pg else "?"
|
||||||
|
cursor.execute(
|
||||||
|
f"""
|
||||||
SELECT run_id, collector_name, started_at, finished_at, status,
|
SELECT run_id, collector_name, started_at, finished_at, status,
|
||||||
input_source, output_json_path, output_db_path, notes, created_at
|
input_source, output_json_path, output_db_path, notes, created_at
|
||||||
FROM collection_runs
|
FROM collection_runs
|
||||||
ORDER BY started_at DESC, created_at DESC
|
ORDER BY started_at DESC, created_at DESC
|
||||||
LIMIT ?
|
LIMIT {param_char}
|
||||||
""",
|
""",
|
||||||
(int(limit),),
|
(int(limit),),
|
||||||
).fetchall()]
|
)
|
||||||
state["recent_snapshots"] = [dict(row) for row in conn.execute(
|
state["runs"] = [dict(row) for row in cursor.fetchall()]
|
||||||
"""
|
|
||||||
|
cursor.execute(
|
||||||
|
f"""
|
||||||
SELECT run_id, dataset_name, ticker, name, sector, as_of_date,
|
SELECT run_id, dataset_name, ticker, name, sector, as_of_date,
|
||||||
source_priority, source_status, created_at
|
source_priority, source_status, created_at
|
||||||
FROM collection_snapshots
|
FROM collection_snapshots
|
||||||
ORDER BY created_at DESC
|
ORDER BY created_at DESC
|
||||||
LIMIT ?
|
LIMIT {param_char}
|
||||||
""",
|
""",
|
||||||
(int(limit),),
|
(int(limit),),
|
||||||
).fetchall()]
|
)
|
||||||
state["recent_errors"] = [dict(row) for row in conn.execute(
|
state["recent_snapshots"] = [dict(row) for row in cursor.fetchall()]
|
||||||
"""
|
|
||||||
|
cursor.execute(
|
||||||
|
f"""
|
||||||
SELECT run_id, ticker, source_name, error_kind, error_message, created_at
|
SELECT run_id, ticker, source_name, error_kind, error_message, created_at
|
||||||
FROM collection_source_errors
|
FROM collection_source_errors
|
||||||
ORDER BY created_at DESC
|
ORDER BY created_at DESC
|
||||||
LIMIT ?
|
LIMIT {param_char}
|
||||||
""",
|
""",
|
||||||
(int(limit),),
|
(int(limit),),
|
||||||
).fetchall()]
|
)
|
||||||
|
state["recent_errors"] = [dict(row) for row in cursor.fetchall()]
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
return state
|
return state
|
||||||
|
|||||||
Reference in New Issue
Block a user