diff --git a/docs/DAILY_SIGNAL_TRACKING.md b/docs/DAILY_SIGNAL_TRACKING.md index 5f6d14f..968199b 100644 --- a/docs/DAILY_SIGNAL_TRACKING.md +++ b/docs/DAILY_SIGNAL_TRACKING.md @@ -11,7 +11,7 @@ ### 1️⃣ 신호 발생 시 (거래 진입 시점) ```python -# Python 또는 GAS 콘솔에서 실행 +# Python 또는 DB 마이그레이션 도구에서 실행 signal = { "date": "2026-06-25", "ticker": "000660", # SK하이닉스 등 @@ -25,14 +25,13 @@ signal = { "notes": "MA20 돌파 + 스마트머니 매수" } -# GAS: addSignal_(signal) -# 또는 스프레드시트에 직접 입력 +# 운영 표준: PostgreSQL의 signal/factor history 테이블에 적재 ``` **✅ 체크리스트:** - [ ] signal_id 자동 생성됨 (YYYYMMDD_HHMM 형식) - [ ] validation_status = "UNVALIDATED" -- [ ] 스프레드시트 행 추가됨 +- [ ] PostgreSQL 이력 행 추가됨 --- @@ -47,7 +46,7 @@ signal = { **해야 할 일:** 1. T+5일의 종가 조회 2. `updatePriceT5_(signalId, priceT5)` 실행 -3. 또는 스프레드시트 "price_t5" 열에 직접 입력 +3. 또는 PostgreSQL `price_t5` 이력 열에 직접 입력 **예시:** ``` @@ -264,8 +263,9 @@ T+20 종가: 51,050원 ## 🔗 관련 문서 -- `spec/realtime/live_outcome_ledger_plan.yaml` — 마스터 계획 -- `src/google_apps_script/live_outcome_ledger.gs` — GAS 코드 +- `spec/realtime/live_outcome_ledger_plan.yaml` — 마스터 계획(역사적) +- `src/google_apps_script/live_outcome_ledger.gs` — 역사적 GAS 원장 어댑터 +- `spec/02_data_contract.yaml` — PostgreSQL history-first 운영 계약 - `V9_HARDENING_IMPLEMENTATION_ROADMAP.md` — 전체 로드맵 --- diff --git a/docs/POSTGRESQL_HISTORY_FIRST_OPERATING_MODEL.md b/docs/POSTGRESQL_HISTORY_FIRST_OPERATING_MODEL.md new file mode 100644 index 0000000..31c06fe --- /dev/null +++ b/docs/POSTGRESQL_HISTORY_FIRST_OPERATING_MODEL.md @@ -0,0 +1,32 @@ +# PostgreSQL History-First Operating Model + +## 목적 + +운영 이력, 원천 팩터, 파생 팩터, 최종 판단, 시장-엔진 괴리를 PostgreSQL에 영구 이력으로 적재한다. + +## 원칙 + +- PostgreSQL이 canonical operating history store다. +- Excel workbook과 Google Apps Script는 운영 소스가 아니다. +- 모든 파생 결과는 versioned snapshot과 provenance를 가져야 한다. +- 시장 raw와 엔진 결과의 괴리는 별도 gap history로 남긴다. + +## 이력 도메인 + +- `market_raw_history` +- `factor_version_history` +- `factor_output_history` +- `decision_result_history` +- `market_vs_engine_gap_history` + +## 운영 규칙 + +- Append-only를 기본으로 하고, 정정은 correction row로만 남긴다. +- 최종 팩터와 최종 판단은 항상 `source_version`을 포함한다. +- DB snapshot이 존재하면 리포트와 생성기는 이를 1차 진실원천으로 사용한다. + +## 폐기 대상 + +- 운영 경로의 Excel 시트 의존 +- 운영 경로의 GAS 의사결정/원장 갱신 + diff --git a/spec/02_data_contract.yaml b/spec/02_data_contract.yaml index d002a68..0b70e68 100644 --- a/spec/02_data_contract.yaml +++ b/spec/02_data_contract.yaml @@ -172,6 +172,26 @@ quant_feed_contract: normalization: "숫자로 읽힌 91160.0, 5930.0 등은 문자열화 후 6자리 zero-pad 적용." validation_commands: ["npm run validate-data-sample", "npm run validate-specs"] xlsx_refresh_rule: "xlsx 원본을 갱신했으면 먼저 DB에 반영한 뒤, 엔진이 DB를 읽어 JSON 파생 보고서를 재생성하고 다시 검증한다." + + database_first_operating_model: + purpose: "운영 이력, 원천 팩터, 파생 최종 팩터, 시장-결과 괴리를 PostgreSQL에 누적해 엔진을 고도화한다." + canonical_store: + primary: "PostgreSQL" + secondary: "SQLite transient cache only" + prohibited_operating_path: + - "Excel workbook as operational source" + - "Google Apps Script as operational source" + history_domains: + - "market_raw_history" + - "factor_version_history" + - "factor_output_history" + - "decision_result_history" + - "market_vs_engine_gap_history" + policy: + - "최종 팩터와 최종 판단은 DB 이력 테이블에 버전과 시각을 함께 남긴다." + - "시장 raw와 엔진 결과의 괴리는 별도 gap history로 적재한다." + - "엑셀/시트/Apps Script는 더 이상 운영 경로가 아니라, 역사적 import/export 또는 폐기 대상만 허용한다." + - "새 분석·리포트는 PostgreSQL snapshot을 1차 진실원천으로 사용한다." xlsx_analysis_protocol: purpose: "xlsx는 HTS 잔고·거래내역 판독 또는 DB 반영 이전의 보조 감사 소스다. 시장 raw 일반 분석과 최종 보고서 생성은 DB 추적 후의 파생 JSON을 우선한다." python_parsing_baseline: diff --git a/spec/postgresql_history_contract.yaml b/spec/postgresql_history_contract.yaml new file mode 100644 index 0000000..a52c5de --- /dev/null +++ b/spec/postgresql_history_contract.yaml @@ -0,0 +1,74 @@ +schema_version: "postgresql_history_contract_v1" +title: "PostgreSQL History-First Operating Contract" +purpose: "시장 원천, 팩터 버전, 최종 팩터 출력, 엔진 의사결정, 시장-엔진 괴리를 PostgreSQL에 누적한다." + +canonical_principles: + - "PostgreSQL is the canonical operating history store." + - "Excel workbooks and Google Apps Script are not operational sources of truth." + - "All derived analysis must be traceable to a versioned DB snapshot." + - "Factor outputs and decision outputs must carry provenance and source_version." + +domains: + market_raw_history: + description: "시장 원천 데이터 이력" + key_fields: + - source_id + - observed_at + - source_name + - instrument_id + - field_name + - field_value + - unit + factor_version_history: + description: "공식/임계값/팩터 버전 이력" + key_fields: + - factor_id + - factor_version + - effective_from + - effective_to + - formula_id + - source_version + factor_output_history: + description: "최종 팩터 산출 이력" + key_fields: + - factor_output_id + - observed_at + - factor_id + - factor_version + - output_value + - output_gate + - source_version + decision_result_history: + description: "엔진 최종 판단/실행 결과 이력" + key_fields: + - decision_id + - decided_at + - instrument_id + - action + - gate + - score + - source_version + market_vs_engine_gap_history: + description: "시장 실측과 엔진 결과 괴리 이력" + key_fields: + - gap_id + - observed_at + - instrument_id + - metric_name + - market_value + - engine_value + - gap_value + - gap_pct + - source_version + +operating_rules: + - "New history rows are append-only except for explicit correction rows." + - "Correction rows must reference corrected_row_id and correction_reason." + - "Factor recomputation must preserve previous outputs in history." + - "No report should read directly from Excel/GAS when PostgreSQL snapshot is available." + +implementation_targets: + - "src/quant_engine/postgresql_history_store_v1.py" + - "tools/build_postgresql_history_snapshot_v1.py" + - "tools/validate_postgresql_history_contract_v1.py" + - "docs/POSTGRESQL_HISTORY_FIRST_OPERATING_MODEL.md" diff --git a/spec/realtime/live_outcome_ledger_plan.yaml b/spec/realtime/live_outcome_ledger_plan.yaml index 4c6772a..97e4474 100644 --- a/spec/realtime/live_outcome_ledger_plan.yaml +++ b/spec/realtime/live_outcome_ledger_plan.yaml @@ -12,6 +12,12 @@ purpose: | UNVALIDATED → PROVISIONAL → CALIBRATED 상태 전환 honest_proof_score: 56.57 → 95.0 달성 +implementation_note: | + live_outcome_ledger.gs는 Google Sheets 원장 적재/갱신용 GAS thin adapter다. + 운영 리포트와 검증용 JSON 산출물은 Python 하네스가 Temp/ 경로에 생성한다. + GAS는 JSON 리포트를 직접 출력하지 않는다. + 이후 운영 표준은 PostgreSQL history store이며, 시트/GAS는 운영 경로에서 제외한다. + current_state: honest_proof_score: 56.57 target_score: 95.0 @@ -132,7 +138,8 @@ honest_proof_improvement_path: # ───────────────────────────────────────────────────────────────────────────── tracking_system: - spreadsheet: "live_outcome_ledger (GAS 연동 스프레드시트)" + datastore: "PostgreSQL history store" + deprecated_surface: "live_outcome_ledger (GAS 연동 스프레드시트)" daily_tasks: - "신규 신호 entry 작성 (시작할 때)" @@ -151,11 +158,12 @@ tracking_system: # ───────────────────────────────────────────────────────────────────────────── checklist: - - [ ] "live_outcome_ledger 스프레드시트 생성 (GAS 연동)" - - [ ] "신호 기록 템플릿 작성" - - [ ] "T+20 가격 수집 자동화 (GAS)" - - [ ] "daily commit: 신호 추가 시마다" - - [ ] "30개 신호 누적 (약 6주)" - - [ ] "win_rate >= 60% 달성" - - [ ] "CALIBRATED 전환" - - [ ] "honest_proof_score 95 달성" + - "[ ] live_outcome_ledger 스프레드시트 생성 (GAS 연동)" + - "[ ] 신호 기록 템플릿 작성" + - "[ ] T+20 가격 수집 자동화 (GAS)" + - "[ ] Temp/operational_t20_outcome_ledger_v1.json 생성 체인 유지 (Python)" + - "[ ] daily commit: 신호 추가 시마다" + - "[ ] 30개 신호 누적 (약 6주)" + - "[ ] win_rate >= 60% 달성" + - "[ ] CALIBRATED 전환" + - "[ ] honest_proof_score 95 달성" diff --git a/src/dotnet/QuantEngine.Application/Services/HistoryIngestionService.cs b/src/dotnet/QuantEngine.Application/Services/HistoryIngestionService.cs new file mode 100644 index 0000000..6a4d927 --- /dev/null +++ b/src/dotnet/QuantEngine.Application/Services/HistoryIngestionService.cs @@ -0,0 +1,92 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using QuantEngine.Core.Domain; +using QuantEngine.Core.Interfaces; + +namespace QuantEngine.Application.Services +{ + public class HistoryIngestionService + { + private readonly IPostgresqlHistoryStore _store; + + public HistoryIngestionService(IPostgresqlHistoryStore store) + { + _store = store; + } + + public Task AppendDecisionAsync(IDictionary payload) + => _store.AppendAsync("decision_result_history", payload); + + public Task AppendFactorOutputAsync(IDictionary payload) + => _store.AppendAsync("factor_output_history", payload); + + public Task AppendMarketRawAsync(IDictionary payload) + => _store.AppendAsync("market_raw_history", payload); + + public Task AppendGapAsync(IDictionary payload) + => _store.AppendAsync("market_vs_engine_gap_history", payload); + + public Task AppendDecisionAsync( + FinalDecisionResult decision, + SellDecisionResult? sellDecision = null, + TimingDecisionResult? timingDecision = null, + string? instrumentId = null, + string? sourceVersion = null, + string? gate = null) + { + var payload = new Dictionary + { + ["decision_id"] = Guid.NewGuid().ToString("N"), + ["decided_at"] = DateTimeOffset.UtcNow, + ["instrument_id"] = instrumentId ?? string.Empty, + ["action"] = decision.FinalAction, + ["gate"] = gate ?? (string.IsNullOrWhiteSpace(sellDecision?.Validation) ? "PASS" : sellDecision.Validation), + ["score"] = decision.PriorityScore, + ["source_version"] = sourceVersion ?? decision.DecisionSource, + ["provenance"] = new Dictionary + { + ["final_action"] = decision.FinalAction, + ["action_priority"] = decision.ActionPriority, + ["priority_score"] = decision.PriorityScore, + ["decision_source"] = decision.DecisionSource, + ["sell_action"] = sellDecision?.Action, + ["sell_validation"] = sellDecision?.Validation, + ["timing_action"] = timingDecision?.Action, + ["timing_reason"] = timingDecision?.Reason + } + }; + + return _store.AppendAsync("decision_result_history", payload); + } + + public Task AppendFactorOutputAsync( + string factorId, + string factorVersion, + double outputValue, + string outputGate, + string? sourceVersion = null, + DateTimeOffset? observedAt = null) + { + var payload = new Dictionary + { + ["factor_output_id"] = Guid.NewGuid().ToString("N"), + ["observed_at"] = observedAt ?? DateTimeOffset.UtcNow, + ["factor_id"] = factorId, + ["factor_version"] = factorVersion, + ["output_value"] = outputValue, + ["output_gate"] = outputGate, + ["source_version"] = sourceVersion ?? factorVersion, + ["provenance"] = new Dictionary + { + ["factor_id"] = factorId, + ["factor_version"] = factorVersion, + ["output_value"] = outputValue, + ["output_gate"] = outputGate, + ["source_version"] = sourceVersion ?? factorVersion + } + }; + + return _store.AppendAsync("factor_output_history", payload); + } + } +} diff --git a/src/dotnet/QuantEngine.Application/Services/PostgresqlHistorySnapshotReader.cs b/src/dotnet/QuantEngine.Application/Services/PostgresqlHistorySnapshotReader.cs new file mode 100644 index 0000000..248f928 --- /dev/null +++ b/src/dotnet/QuantEngine.Application/Services/PostgresqlHistorySnapshotReader.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using QuantEngine.Core.Interfaces; + +namespace QuantEngine.Application.Services +{ + public class PostgresqlHistorySnapshotReader : IPostgresqlHistorySnapshotReader + { + private readonly IPostgresqlHistoryStore _store; + + public PostgresqlHistorySnapshotReader(IPostgresqlHistoryStore store) + { + _store = store; + } + + public Task>> ReadAsync(string domain, int limit = 500) + => _store.SnapshotAsync(domain, limit); + } +} diff --git a/src/dotnet/QuantEngine.Application/Services/WorkspaceService.cs b/src/dotnet/QuantEngine.Application/Services/WorkspaceService.cs index effa7aa..6113c94 100644 --- a/src/dotnet/QuantEngine.Application/Services/WorkspaceService.cs +++ b/src/dotnet/QuantEngine.Application/Services/WorkspaceService.cs @@ -9,10 +9,12 @@ namespace QuantEngine.Application.Services public class WorkspaceService { private readonly IWorkspaceRepository _repository; + private readonly IPostgresqlHistoryStore _historyStore; - public WorkspaceService(IWorkspaceRepository repository) + public WorkspaceService(IWorkspaceRepository repository, IPostgresqlHistoryStore historyStore) { _repository = repository; + _historyStore = historyStore; } public Task> GetSettingsAsync() => _repository.GetSettingsAsync(); @@ -23,5 +25,8 @@ namespace QuantEngine.Application.Services public Task> GetAccountSnapshotsAsync() => _repository.GetAccountSnapshotsAsync(); public Task InsertAccountSnapshotsAsync(IEnumerable snapshots) => _repository.InsertAccountSnapshotsAsync(snapshots); public Task ClearAccountSnapshotsAsync() => _repository.ClearAccountSnapshotsAsync(); + + public Task AppendHistoryAsync(string domain, IDictionary payload) => _historyStore.AppendAsync(domain, payload); + public Task>> ReadHistorySnapshotAsync(string domain, int limit = 500) => _historyStore.SnapshotAsync(domain, limit); } } diff --git a/src/dotnet/QuantEngine.Core/Interfaces/IPostgresqlHistorySnapshotReader.cs b/src/dotnet/QuantEngine.Core/Interfaces/IPostgresqlHistorySnapshotReader.cs new file mode 100644 index 0000000..02f88c3 --- /dev/null +++ b/src/dotnet/QuantEngine.Core/Interfaces/IPostgresqlHistorySnapshotReader.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace QuantEngine.Core.Interfaces +{ + public interface IPostgresqlHistorySnapshotReader + { + Task>> ReadAsync(string domain, int limit = 500); + } +} diff --git a/src/dotnet/QuantEngine.Core/Interfaces/IPostgresqlHistoryStore.cs b/src/dotnet/QuantEngine.Core/Interfaces/IPostgresqlHistoryStore.cs new file mode 100644 index 0000000..997d996 --- /dev/null +++ b/src/dotnet/QuantEngine.Core/Interfaces/IPostgresqlHistoryStore.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace QuantEngine.Core.Interfaces +{ + public interface IPostgresqlHistoryStore + { + Task AppendAsync(string domain, IDictionary payload); + Task>> SnapshotAsync(string domain, int limit = 500); + } +} diff --git a/src/dotnet/QuantEngine.Core/Models/HistoryRow.cs b/src/dotnet/QuantEngine.Core/Models/HistoryRow.cs new file mode 100644 index 0000000..16ad970 --- /dev/null +++ b/src/dotnet/QuantEngine.Core/Models/HistoryRow.cs @@ -0,0 +1,10 @@ +using System.Collections.Generic; + +namespace QuantEngine.Core.Models +{ + public class HistoryRow + { + public string Domain { get; set; } = string.Empty; + public IDictionary Payload { get; set; } = new Dictionary(); + } +} diff --git a/src/dotnet/QuantEngine.Infrastructure/Data/DbMigrator.cs b/src/dotnet/QuantEngine.Infrastructure/Data/DbMigrator.cs index f058459..70c658e 100644 --- a/src/dotnet/QuantEngine.Infrastructure/Data/DbMigrator.cs +++ b/src/dotnet/QuantEngine.Infrastructure/Data/DbMigrator.cs @@ -156,6 +156,87 @@ namespace QuantEngine.Infrastructure.Data PRIMARY KEY (domain, target_ref) ); "); + + // 10. engine_history schema and tables + conn.Execute(@" + CREATE SCHEMA IF NOT EXISTS engine_history; + "); + conn.Execute(@" + CREATE TABLE IF NOT EXISTS engine_history.market_raw_history ( + id BIGSERIAL PRIMARY KEY, + source_id TEXT NOT NULL, + observed_at TEXT NOT NULL, + source_name TEXT NOT NULL, + instrument_id TEXT NOT NULL, + field_name TEXT NOT NULL, + field_value TEXT NOT NULL, + unit TEXT NOT NULL, + provenance JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_market_raw_history_created_at ON engine_history.market_raw_history (created_at DESC); + "); + conn.Execute(@" + CREATE TABLE IF NOT EXISTS engine_history.factor_version_history ( + id BIGSERIAL PRIMARY KEY, + factor_id TEXT NOT NULL, + factor_version TEXT NOT NULL, + effective_from TEXT NOT NULL, + effective_to TEXT NOT NULL, + formula_id TEXT NOT NULL, + source_version TEXT NOT NULL, + provenance JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_factor_version_history_created_at ON engine_history.factor_version_history (created_at DESC); + "); + conn.Execute(@" + CREATE TABLE IF NOT EXISTS engine_history.factor_output_history ( + id BIGSERIAL PRIMARY KEY, + factor_output_id TEXT NOT NULL, + observed_at TEXT NOT NULL, + factor_id TEXT NOT NULL, + factor_version TEXT NOT NULL, + output_value TEXT NOT NULL, + output_gate TEXT NOT NULL, + source_version TEXT NOT NULL, + provenance JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_factor_output_history_created_at ON engine_history.factor_output_history (created_at DESC); + "); + conn.Execute(@" + CREATE TABLE IF NOT EXISTS engine_history.decision_result_history ( + id BIGSERIAL PRIMARY KEY, + decision_id TEXT NOT NULL, + decided_at TEXT NOT NULL, + instrument_id TEXT NOT NULL, + action TEXT NOT NULL, + gate TEXT NOT NULL, + score TEXT NOT NULL, + source_version TEXT NOT NULL, + provenance JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_decision_result_history_created_at ON engine_history.decision_result_history (created_at DESC); + "); + conn.Execute(@" + CREATE TABLE IF NOT EXISTS engine_history.market_vs_engine_gap_history ( + id BIGSERIAL PRIMARY KEY, + gap_id TEXT NOT NULL, + observed_at TEXT NOT NULL, + instrument_id TEXT NOT NULL, + metric_name TEXT NOT NULL, + market_value TEXT NOT NULL, + engine_value TEXT NOT NULL, + gap_value TEXT NOT NULL, + gap_pct TEXT NOT NULL, + source_version TEXT NOT NULL, + provenance JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ); + CREATE INDEX IF NOT EXISTS idx_market_vs_engine_gap_history_created_at ON engine_history.market_vs_engine_gap_history (created_at DESC); + "); } } } diff --git a/src/dotnet/QuantEngine.Infrastructure/Repositories/PostgresqlHistoryStore.cs b/src/dotnet/QuantEngine.Infrastructure/Repositories/PostgresqlHistoryStore.cs new file mode 100644 index 0000000..155f299 --- /dev/null +++ b/src/dotnet/QuantEngine.Infrastructure/Repositories/PostgresqlHistoryStore.cs @@ -0,0 +1,67 @@ +using System.Data; +using System.Text.Json; +using Dapper; +using QuantEngine.Infrastructure.Data; +using QuantEngine.Core.Interfaces; + +namespace QuantEngine.Infrastructure.Repositories +{ + public class PostgresqlHistoryStore : IPostgresqlHistoryStore + { + private readonly IDbConnectionFactory _connectionFactory; + + private static readonly IReadOnlyDictionary DomainColumns = new Dictionary + { + ["market_raw_history"] = new[] { "source_id", "observed_at", "source_name", "instrument_id", "field_name", "field_value", "unit" }, + ["factor_version_history"] = new[] { "factor_id", "factor_version", "effective_from", "effective_to", "formula_id", "source_version" }, + ["factor_output_history"] = new[] { "factor_output_id", "observed_at", "factor_id", "factor_version", "output_value", "output_gate", "source_version" }, + ["decision_result_history"] = new[] { "decision_id", "decided_at", "instrument_id", "action", "gate", "score", "source_version" }, + ["market_vs_engine_gap_history"] = new[] { "gap_id", "observed_at", "instrument_id", "metric_name", "market_value", "engine_value", "gap_value", "gap_pct", "source_version" } + }; + + public PostgresqlHistoryStore(IDbConnectionFactory connectionFactory) + { + _connectionFactory = connectionFactory; + } + + public async Task AppendAsync(string domain, IDictionary payload) + { + if (!DomainColumns.TryGetValue(domain, out var columns)) + throw new ArgumentException($"Unsupported history domain: {domain}", nameof(domain)); + + using var conn = _connectionFactory.CreateConnection(); + conn.Open(); + + var values = new DynamicParameters(); + var insertColumns = new List(columns.Length + 1); + var placeholders = new List(columns.Length + 1); + foreach (var column in columns) + { + insertColumns.Add(column); + placeholders.Add($"@{column}"); + values.Add(column, payload.TryGetValue(column, out var value) ? value : null); + } + + insertColumns.Add("provenance"); + placeholders.Add("@provenance"); + var provenance = payload.TryGetValue("provenance", out var provenanceValue) ? provenanceValue : new Dictionary(); + values.Add("provenance", provenance is string s ? s : JsonSerializer.Serialize(provenance)); + + var sql = $@"INSERT INTO engine_history.{domain} ({string.Join(", ", insertColumns)}) VALUES ({string.Join(", ", placeholders)})"; + return await conn.ExecuteAsync(sql, values); + } + + public async Task>> SnapshotAsync(string domain, int limit = 500) + { + if (!DomainColumns.ContainsKey(domain)) + throw new ArgumentException($"Unsupported history domain: {domain}", nameof(domain)); + + using var conn = _connectionFactory.CreateConnection(); + conn.Open(); + + var sql = $@"SELECT * FROM engine_history.{domain} ORDER BY created_at DESC LIMIT @Limit"; + var rows = await conn.QueryAsync(sql, new { Limit = limit }); + return rows.Select(row => (IDictionary)row).ToList(); + } + } +} diff --git a/src/quant_engine/postgresql_history_store_v1.py b/src/quant_engine/postgresql_history_store_v1.py new file mode 100644 index 0000000..e4163c7 --- /dev/null +++ b/src/quant_engine/postgresql_history_store_v1.py @@ -0,0 +1,82 @@ +"""PostgreSQL history store for engine provenance tracking. + +This module is intentionally thin: it owns connection, table routing, append +operations, and snapshot reads for the history-first operating model. +""" +from __future__ import annotations + +import json +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Iterable + + +DOMAIN_TABLES = { + "market_raw_history": "engine_history.market_raw_history", + "factor_version_history": "engine_history.factor_version_history", + "factor_output_history": "engine_history.factor_output_history", + "decision_result_history": "engine_history.decision_result_history", + "market_vs_engine_gap_history": "engine_history.market_vs_engine_gap_history", +} + + +@dataclass(frozen=True) +class HistoryRow: + domain: str + payload: dict[str, Any] + + +def _is_pg_dsn(value: str) -> bool: + return value.startswith("postgresql://") or value.startswith("postgres://") + + +def connect(dsn_or_path: str | Path) -> Any: + value = str(dsn_or_path) + if _is_pg_dsn(value): + try: + import psycopg2 + except ImportError as exc: + raise ImportError("PostgreSQL DSN requires psycopg2") from exc + return psycopg2.connect(value) + raise ValueError("postgresql_history_store_v1 only accepts a PostgreSQL DSN") + + +def ensure_schema(conn: Any) -> None: + sql = """ + CREATE SCHEMA IF NOT EXISTS engine_history; + """ + cur = conn.cursor() + cur.execute(sql) + conn.commit() + + +def append_row(conn: Any, domain: str, payload: dict[str, Any]) -> None: + table = DOMAIN_TABLES.get(domain) + if not table: + raise KeyError(f"unknown domain: {domain}") + ensure_schema(conn) + keys = [k for k in payload.keys() if k != "id"] + cols = ", ".join(keys + ["provenance"]) + placeholders = ", ".join(["%s"] * (len(keys) + 1)) + values = [json.dumps(payload.get(k), ensure_ascii=False, default=str) if isinstance(payload.get(k), (dict, list)) else payload.get(k) for k in keys] + values.append(json.dumps(payload.get("provenance") or {}, ensure_ascii=False, default=str)) + sql = f"INSERT INTO {table} ({cols}) VALUES ({placeholders})" + cur = conn.cursor() + cur.execute(sql, values) + conn.commit() + + +def append_rows(conn: Any, rows: Iterable[HistoryRow]) -> None: + for row in rows: + append_row(conn, row.domain, row.payload) + + +def snapshot_table(conn: Any, domain: str, limit: int = 1000) -> list[dict[str, Any]]: + table = DOMAIN_TABLES.get(domain) + if not table: + raise KeyError(f"unknown domain: {domain}") + cur = conn.cursor() + cur.execute(f"SELECT * FROM {table} ORDER BY created_at DESC LIMIT %s", (limit,)) + columns = [col[0] for col in cur.description] + return [dict(zip(columns, row)) for row in cur.fetchall()] + diff --git a/tools/build_postgresql_history_snapshot_v1.py b/tools/build_postgresql_history_snapshot_v1.py new file mode 100644 index 0000000..282db5d --- /dev/null +++ b/tools/build_postgresql_history_snapshot_v1.py @@ -0,0 +1,45 @@ +from __future__ import annotations + +import argparse +import json +from pathlib import Path + +from src.quant_engine.postgresql_history_store_v1 import DOMAIN_TABLES + +ROOT = Path(__file__).resolve().parents[1] + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--dsn", required=True) + ap.add_argument("--out", default=str(ROOT / "Temp" / "postgresql_history_snapshot_v1.json")) + ap.add_argument("--limit", type=int, default=200) + args = ap.parse_args() + + try: + from src.quant_engine.postgresql_history_store_v1 import connect, snapshot_table + except Exception as exc: + raise SystemExit(f"import_failed: {exc}") + + conn = connect(args.dsn) + try: + payload = { + "formula_id": "POSTGRESQL_HISTORY_SNAPSHOT_V1", + "gate": "PASS", + "domains": { + domain: snapshot_table(conn, domain, limit=args.limit) + for domain in DOMAIN_TABLES + }, + } + finally: + conn.close() + + out = Path(args.out) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(payload, ensure_ascii=False, indent=2, default=str), encoding="utf-8") + print(f"POSTGRESQL_HISTORY_SNAPSHOT_V1 gate=PASS out={out}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/generate_postgresql_history_schema_v1.py b/tools/generate_postgresql_history_schema_v1.py new file mode 100644 index 0000000..c72af7e --- /dev/null +++ b/tools/generate_postgresql_history_schema_v1.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +import argparse +from pathlib import Path + +import yaml + +ROOT = Path(__file__).resolve().parents[1] +CONTRACT = ROOT / "spec" / "postgresql_history_contract.yaml" +DEFAULT_SQL = ROOT / "Temp" / "postgresql_history_schema_v1.sql" +DEFAULT_JSON = ROOT / "Temp" / "postgresql_history_schema_v1.json" + + +def _columns(domain: dict) -> list[str]: + cols = domain.get("key_fields") or [] + out: list[str] = [] + for col in cols: + name = str(col) + if name in {"provenance"}: + continue + out.append(name) + return out + + +def _table_name(domain_name: str) -> str: + return domain_name + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--contract", default=str(CONTRACT)) + ap.add_argument("--sql-out", default=str(DEFAULT_SQL)) + ap.add_argument("--json-out", default=str(DEFAULT_JSON)) + args = ap.parse_args() + + contract_path = Path(args.contract) + data = yaml.safe_load(contract_path.read_text(encoding="utf-8")) + domains = data.get("domains") or {} + + sql_lines = [ + "-- PostgreSQL history-first schema", + "-- generated from spec/postgresql_history_contract.yaml", + "", + "CREATE SCHEMA IF NOT EXISTS engine_history;", + "" + ] + table_defs: dict[str, dict[str, object]] = {} + + for domain_name, domain in domains.items(): + if not isinstance(domain, dict): + continue + cols = _columns(domain) + table_name = _table_name(domain_name) + sql_lines.append(f"CREATE TABLE IF NOT EXISTS engine_history.{table_name} (") + sql_lines.append(" id BIGSERIAL PRIMARY KEY,") + for col in cols: + sql_lines.append(f" {col} TEXT NOT NULL,") + sql_lines.append(" provenance JSONB NOT NULL DEFAULT '{}'::jsonb,") + sql_lines.append(" created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()") + sql_lines.append(");") + sql_lines.append("") + sql_lines.append(f"CREATE INDEX IF NOT EXISTS idx_{table_name}_created_at ON engine_history.{table_name} (created_at DESC);") + sql_lines.append("") + table_defs[table_name] = {"columns": cols, "description": domain.get("description", "")} + + sql_text = "\n".join(sql_lines).rstrip() + "\n" + sql_out = Path(args.sql_out) + json_out = Path(args.json_out) + sql_out.parent.mkdir(parents=True, exist_ok=True) + sql_out.write_text(sql_text, encoding="utf-8") + json_out.write_text( + yaml.safe_dump( + { + "formula_id": "POSTGRESQL_HISTORY_SCHEMA_V1", + "gate": "PASS", + "contract_path": str(contract_path.relative_to(ROOT)), + "tables": table_defs, + "sql_out": str(sql_out.relative_to(ROOT)), + }, + allow_unicode=True, + sort_keys=False, + ), + encoding="utf-8", + ) + print(f"POSTGRESQL_HISTORY_SCHEMA_V1 gate=PASS tables={len(table_defs)}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/validate_postgresql_history_contract_v1.py b/tools/validate_postgresql_history_contract_v1.py new file mode 100644 index 0000000..ed6c5c8 --- /dev/null +++ b/tools/validate_postgresql_history_contract_v1.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +import json +from pathlib import Path + +import yaml + +ROOT = Path(__file__).resolve().parents[1] +CONTRACT = ROOT / "spec" / "postgresql_history_contract.yaml" +OUT = ROOT / "Temp" / "postgresql_history_contract_v1.json" + + +def main() -> int: + errors: list[str] = [] + if not CONTRACT.exists(): + errors.append("contract_missing") + else: + try: + data = yaml.safe_load(CONTRACT.read_text(encoding="utf-8")) + except Exception as exc: + errors.append(f"yaml_parse_error:{exc}") + data = {} + if not isinstance(data, dict): + errors.append("contract_not_mapping") + else: + for key in ("market_raw_history", "factor_version_history", "factor_output_history", "decision_result_history", "market_vs_engine_gap_history"): + if key not in (data.get("domains") or {}): + errors.append(f"missing_domain:{key}") + if "PostgreSQL" not in json.dumps(data, ensure_ascii=False): + errors.append("postgresql_not_mentioned") + + result = { + "formula_id": "POSTGRESQL_HISTORY_CONTRACT_V1", + "gate": "PASS" if not errors else "FAIL", + "errors": errors, + "contract_path": str(CONTRACT.relative_to(ROOT)), + } + OUT.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(result, ensure_ascii=False, indent=2)) + return 0 if not errors else 1 + + +if __name__ == "__main__": + raise SystemExit(main())