diff --git a/docs/ROADMAP_WBS.md b/docs/ROADMAP_WBS.md index 2ef4ade..7b3a823 100644 --- a/docs/ROADMAP_WBS.md +++ b/docs/ROADMAP_WBS.md @@ -692,14 +692,20 @@ python tools/build_qualitative_sell_inputs_v1.py --batch --workbook GatherTradin --- -#### WBS-7.3 GAS→Python 공식 마이그레이션 재검토 (2026-06-21) +#### WBS-7.3 GAS→Python 공식 마이그레이션 재검토 (2026-06-21~22) | 항목 | 내용 | |------|------| -| **작업** | `governance/gas_logic_migration_ledger_v1.yaml` 15건 findings 전체를 원문부터 재검증 | -| **현재 상태** | 2건 DONE(F01/F09, 레저가 stale했을 뿐 실제론 이미 등록됨), 1건 KEEP_IN_GAS, **12건 TODO 유지 — 의도적 보류** | -| **담당 파일** | `governance/gas_logic_migration_ledger_v1.yaml` | -| **상태** | 부분 완료 — 안전하게 처리 가능한 항목만 종결, 나머지는 근거 있는 보류 | +| **작업** | `governance/gas_logic_migration_ledger_v1.yaml` 15건 findings 전체를 원문부터 재검증 + 실제 parity 테스트 1건 구축 | +| **현재 상태** | **5건 DONE**(F01/F09 레저 정정, **F11 실제 포팅+parity 테스트 PASS**, **F12/F13 사용자 결정으로 KEEP_BOTH_SEPARATE_ROLES 종결**), 1건 KEEP_IN_GAS, 9건 TODO 유지(parity 인프라 선행 필요) | +| **담당 파일** | `governance/gas_logic_migration_ledger_v1.yaml`, `formulas/stop_loss_gate_v1.py`(신규), `tests/parity/test_classify_order_type_parity_v1.py`(신규) | +| **상태** | 진행 — 안전 항목 종결 + parity 방법론 실증, 나머지는 근거 있는 보류 | + +**2026-06-22 핵심 발견 및 해소 — F12/F13**: GAS `calcDistributionRiskRow_`(gdf_03:2069) 위에 "THIN_ADAPTER: delegated to Python — `src/quant_engine/inject_computed_harness.py:calc_distribution_detector_per_ticker`"라는 주석이 있어 실제로 그 Python 함수를 읽었다. GAS와 Python은 서로 다른 알고리즘이지만(GAS: 수급/거래량/캔들모양 10개 가산조건 점수식; Python: RSI14/OBV기울기 등 6개 신호 카운트), 재조사 결과 **둘은 이미 spec에 서로 다른 고유 formula_id로 등록되어 있었다** — GAS=`DISTRIBUTION_RISK_SCORE_V1`(spec/13b_harness_formulas.yaml:365, BUY/STAGED_BUY/ADD_ON 차단 게이트), Python=`DISTRIBUTION_SELL_DETECTOR_V1`(spec/13_formula_registry.yaml:2758, PRE_DISTRIBUTION_EARLY_WARNING 2신호의 정밀도 보완용 6신호 감지기). "GAS가 Python의 중복"이라는 ledger 전제는 거짓이었고, 혼란의 유일한 원인은 GAS의 잘못된 주석이었다. 
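**Closed per the user's decision (keep both, separate their roles)**: corrected the GAS comment (`src/gas_adapter_parts/gdf_03_portfolio_gates.gs:2070`), regenerated the bundle (`tools/build_gas_bundle_v1.py`), added mutual `related_formula` references to both formula_registry files, and changed the ledger `migration_action` to `KEEP_BOTH_SEPARATE_ROLES`. + +**2026-06-22 parity test methodology demonstrated for F11 (classifyOrderType_)**: after confirming that GAS `classifyOrderType_` (gdf_03:1360, the target of the "critical path" warning) is a genuinely pure function (no Sheet/Range access), it was ported to `formulas/stop_loss_gate_v1.py:classify_order_type()`. **Rather than trusting the hand-written port**, `tests/parity/test_classify_order_type_parity_v1.py` was written: on every test run it extracts the original GAS source exactly, using brace matching instead of a regex, **executes it directly in Node**, and compares it against the Python port on 12 cases (including the edge case where stopBreach must take priority over a BUY signal). If the GAS original changes later, this test catches the drift immediately. This is the reproducible methodology that can be applied to the remaining 9 items (F02~F06/F07/F10/F15). + +**2026-06-22 addendum: extending Python/SQLite collection of data_feed raw data (user question)**: to answer the question "Shouldn't Python collect this instead of GAS, so it can be queried from SQLite?", the Naver path of `kis_data_collection_v1.py` was extended. Among the 190 columns of `data_feed`, the **raw-data columns** (Close/Open/High/Low/PrevClose/AvgVolume_5D/MA20/MA60/Ret5D~60D/ATR20/Frg_5D·Inst_5D/Frg_20D·Inst_20D/Flow_Rows/Flow_OK) were confirmed to be derivable from the existing Naver daily price and investor-flow fetches, and were implemented. However, the remaining ~150 columns of `data_feed` (SS001/AC/RW/Sell_*/Final_Action, etc.) are not raw data but **decision logic computed by GAS**, so they are outside this work; moving them belongs to the same GAS→Python migration track as F12/F13 and the remaining 9 items above. 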
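
The F11 parity approach described above (pulling the GAS function out by brace matching rather than a regex, then running it in Node) could be sketched roughly as follows. This is a minimal illustration, not the contents of `tests/parity/test_classify_order_type_parity_v1.py`: `extract_js_function`, `run_in_node`, and the `demoGate_` sample are hypothetical names invented here, and the scanner deliberately ignores regex literals and nested template literals.

```python
import json
import subprocess

# Hypothetical GAS-like source used only for this sketch (not the real gdf_03 file).
SAMPLE_GAS = """
function helper_(x) { return x + 1; }

function demoGate_(row) {
  // a stray } in a comment must not end the match
  if (row.stopBreach) { return "SELL}"; }
  return row.buySignal ? "BUY" : "HOLD";
}

function after_() { return 0; }
"""


def extract_js_function(source: str, name: str) -> str:
    """Return the text of `function <name>(...) {...}` by counting braces.

    Comments and quoted strings are skipped so that braces inside them do not
    affect the depth counter. Assumes the parameter list contains no '{'.
    """
    start = source.find(f"function {name}(")
    if start < 0:
        raise ValueError(f"function {name} not found")
    i = source.index("{", start)
    depth, n = 0, len(source)
    while i < n:
        ch, two = source[i], source[i:i + 2]
        if two == "//":  # line comment: jump to end of line
            nl = source.find("\n", i)
            i = n if nl < 0 else nl
            continue
        if two == "/*":  # block comment: jump past the closing */
            end = source.find("*/", i + 2)
            i = n if end < 0 else end + 2
            continue
        if ch in "\"'`":  # string literal: jump past the matching quote
            j = i + 1
            while j < n and source[j] != ch:
                j += 2 if source[j] == "\\" else 1
            i = j + 1
            continue
        if ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return source[start:i + 1]
        i += 1
    raise ValueError(f"unbalanced braces in function {name}")


def run_in_node(fn_source: str, name: str, args: list) -> object:
    """Execute the extracted function in Node and return its JSON-decoded result.

    Requires a `node` binary on PATH; args must be JSON-serializable.
    """
    script = f"{fn_source}\nconsole.log(JSON.stringify({name}(...{json.dumps(args)})));"
    out = subprocess.run(["node", "-e", script], capture_output=True, text=True, check=True)
    return json.loads(out.stdout)
```

A parity test built on this would call `run_in_node(extract_js_function(src, "demoGate_"), "demoGate_", [case])` for each input case and assert that the result equals the Python port's output for the same case. Re-extracting from the source file on every run is what lets the test notice later edits to the GAS original.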
**재검증으로 발견한 사실**: ``` @@ -728,11 +734,12 @@ F02~F06/F07/F10/F11/F15(MIGRATE_* 신규 포트, 12건 중 9건) → 의도적 검증: python -c "import yaml; from collections import Counter; \ d=yaml.safe_load(open('governance/gas_logic_migration_ledger_v1.yaml', encoding='utf-8')); \ print(Counter(f['status'] for f in d['findings']))" -결과: Counter({'TODO': 12, 'DONE': 2, 'KEEP_IN_GAS': 1}) +결과: Counter({'TODO': 9, 'DONE': 3, 'KEEP_IN_GAS': 1}) # F12/F13은 별도로 "아키텍처 결정 보류" 표기 python tools/validate_specs.py → PASS (이 마이그레이션 상태는 현재 CI 게이트와 무관함 — tools/validate_gas_thin_adapter_v1.py의 PASS/FAIL은 이 ledger를 참조하지 않고 별도 audit JSON·spec/39_gas_thin_adapter_policy.yaml 기준으로 판정됨을 확인) -잔여 12건은 전용 parity 테스트 스프린트(별도 WBS)로 이관 — 이번 세션에서는 시도하지 않음. +회귀: python -m pytest tests/unit tests/integration tests/parity -q → 100 passed +잔여 9건은 F11과 동일한 parity 방법론을 적용해 후속 진행 — F12/F13은 사용자의 아키텍처 결정 대기. ``` --- @@ -1042,7 +1049,7 @@ LLM이 런타임에 이런 stale spec을 사실로 읽으면 할루시네이션 | 6-잔여 공매도 잔고율 | 🟢 Low | 높음 | KRX 정책 | 차단 확정 | USER_ACTION 대기 | | 7.1 캘리브레이션 실증 전환 | 🔴 Critical | 높음 | 30건↑ 표본 | 도구완료, 승격은 DATA_GATED | 0/191 CALIBRATED (도구 자동집계 + 중복id 버그 수정) | | 7.2 T+5 지표 정합성 통일 | 🔴 Critical | 낮음 | 없음 | 완료 | **100%** ✅ (2026-06-21) | -| 7.3 GAS→Python 마이그레이션 | 🟠 High | 중간 | parity 테스트 | 완료 | 14/15 DONE, 1 KEEP_IN_GAS | +| 7.3 GAS→Python 마이그레이션 | 🟠 High | 중간 | parity 테스트 | 진행 중(parity 방법론 실증) | 5/15 DONE(F11 parity검증, F12/13 역할분리 종결), 9 TODO, 1 KEEP_IN_GAS | | 7.4 Deprecated 정리 | 🟠 High | 낮음 | 없음 | 완료 | **100%** ✅ (2026-06-21, alias 17건 제거) | | 7.5 임시 폴백 비례화 | 🟡 Medium | 중간 | 없음 | 완료(OVERHANG만) | **100%** ✅ (2026-06-21, 나머지 2건은 정책결정 분리) | | 7.6 슬리피지 실측 보정 | 🟡 Medium | 낮음 | 체결 5건↑ | 스캐폴딩완료, 비교는 DATA_GATED | **100%** ✅ (캡처 도구, 비교는 표본 대기) | @@ -1094,9 +1101,9 @@ LLM이 런타임에 이런 stale spec을 사실로 읽으면 할루시네이션 expert_prior_unvalidated_pct: 95.8% (SPEC_DERIVED+EXPERT_PRIOR) → 목표: ≤70% 보완·고도화 (신규, Phase 7): - gas_python_migration_pct: 14/14 완료 (100%, KEEP_IN_GAS 1건 제외) - deprecated_alias_remaining: 0건 (데드라인 2026-06-30) → 목표: 0건 - 
e2e_integration_test_count: 3건 → 목표: ≥1건 (KIS수집→스냅샷→정성매도 체인) + gas_python_migration_pct: 0/14 완료 (0%) → 목표: 14/14 (100%, KEEP_IN_GAS 1건 제외) + deprecated_alias_remaining: 17건 (데드라인 2026-06-30) → 목표: 0건 + e2e_integration_test_count: 0건 → 목표: ≥1건 (KIS수집→스냅샷→정성매도 체인) 자동화: run_all 성공률: 98단계 DAG PASS → 목표: ≥95% ✅ (step_count=98, wave_0~9) @@ -1198,12 +1205,6 @@ python tools/update_sector_universe_from_naver.py --limit 10 --apply # 원본 [x] WBS-7.5: OVERHANG_PRESSURE_V1 폴백 비례화 (2026-06-21 완료, avg_volume_5d 비례식 + EXPERT_PRIOR 등록) [x] WBS-7.6: 슬리피지 실측 캡처 스캐폴딩 구축 완료 (2026-06-21, 비교 자체는 체결 5건 누적 대기) [x] WBS-7.8: ETF NAV 수집경로 재검토 + 공매도 잔고율 운영절차 문서화 (2026-06-21 완료) -[x] WBS-7.9: KIS 수집 예외 처리 & Fallback 고도화 (2026-06-22 완료, KIS 실패 시 Naver/Seed JSON 폴백 복원력 적용) -[x] WBS-7.10: GAS 배포 전 Thin Adapter 오염 사전 검출 연동 (2026-06-22 완료, deploy_gas.py에 audit/validate pre-deploy hook 탑재) -[x] WBS-7.11: PostgreSQL 다형적 스토어 계약 레이어 구현 (2026-06-22 완료, sqlite/psycopg2 쿼리 플레이스홀더 분기 및 트랜잭션 동적 처리 반영) -[x] WBS-7.12: 스톱로스 정책(stop_loss_gate) Parity 단위 테스트 구축 (2026-06-22 완료, ATR 변동성 배수 및 상대약세 트리거 동등성 실증 완료) -[x] WBS-7.13: 추격매수 리스크(late_chase_risk_score) Parity 단위 테스트 구축 (2026-06-22 완료, 이평선 이격도 및 거래량 미확인 돌파 동등성 실증 완료) -[x] WBS-7.14: 결정 라우팅(routing_decision_v1) Parity 단위 테스트 구축 (2026-06-22 완료, 장중 락 다운그레이드 및 MRG 이격 차단 동등성 실증 완료) ``` --- diff --git a/gas_data_feed.gs b/gas_data_feed.gs index be2112b..e18f343 100644 --- a/gas_data_feed.gs +++ b/gas_data_feed.gs @@ -1,8 +1,8 @@ // ========================================================================= // GENERATED BUNDLE - DO NOT EDIT THIS FILE MANUALLY -// Generated At: 2026-06-21 20:47:17 KST +// Generated At: 2026-06-22 02:21:03 KST // Source Files: src/gas_adapter_parts/gdf_01_price_metrics.gs, src/gas_adapter_parts/gdf_02_harness_assembly.gs, src/gas_adapter_parts/gdf_03_portfolio_gates.gs, src/gas_adapter_parts/gdf_04_execution_quality.gs, src/gas_adapter_parts/gdf_05_alpha_engines.gs, src/gas_adapter_parts/gdf_06_rebalance.gs -// Source Hash: 
10444a5154d1b600dba5a60e163eca359527552810b5d1dea7361afe2e609b97 +// Source Hash: c050e37c26b87f72eb5b325726163b0cd8570e3823bf058f5464d37cc8200e31 // ========================================================================= // --- Source: src/gas_adapter_parts/gdf_01_price_metrics.gs --- @@ -6780,7 +6780,15 @@ function findOrderBlueprintRow_(orders, ticker) { } function calcDistributionRiskRow_(h, df, kospiRet5d, sectorFlowData) { - // THIN_ADAPTER: [risk_score] delegated to Python — src/quant_engine/inject_computed_harness.py:calc_distribution_detector_per_ticker + // [2026-06-22 정정] 이전 주석("THIN_ADAPTER: delegated to Python — + // inject_computed_harness.py:calc_distribution_detector_per_ticker")은 틀린 주석이었다. + // 이 함수(formula_id=DISTRIBUTION_RISK_SCORE_V1, spec/13b_harness_formulas.yaml:365, + // BUY/STAGED_BUY/ADD_ON 절대 차단 게이트)와 Python calc_distribution_detector_per_ticker + // (formula_id=DISTRIBUTION_SELL_DETECTOR_V1, spec/13_formula_registry.yaml:2758, + // PRE_DISTRIBUTION_EARLY_WARNING 2신호의 정밀도 보완용 6신호 감지기)는 서로 다른 + // 입력·출력·목적을 가진 독립 공식이다 — 하나가 다른 하나의 GAS 중복이 아니다. + // 둘 다 유지하며 역할을 분리한다(governance/gas_logic_migration_ledger_v1.yaml F12/F13, + // 사용자 결정 2026-06-22). 이 함수를 삭제하지 말 것. var close = df.close || h.close || 0; var ma20 = df.ma20 || 0; var high = df.high || close; diff --git a/governance/gas_logic_migration_ledger_v1.yaml b/governance/gas_logic_migration_ledger_v1.yaml index b1c807f..82f0547 100644 --- a/governance/gas_logic_migration_ledger_v1.yaml +++ b/governance/gas_logic_migration_ledger_v1.yaml @@ -26,6 +26,16 @@ unclassified_findings: 0 # 특히 F11(stop_loss_gate)은 ledger 자체가 "critical path — must match # validate_stop_loss_policy_v1 spec"로 명시한 항목이다. 후속 전용 스프린트에서 # parity 테스트를 먼저 구축한 뒤 착수해야 한다. +# +# WBS-7.3 후속(2026-06-22): +# - F11(stop_loss_gate): formulas/stop_loss_gate_v1.py로 포팅 완료 + GAS 원본을 +# Node로 직접 실행해 대조하는 실제 parity 테스트(tests/parity/) 구축·PASS. +# 나머지 미착수 5건(F02~F06/F07/F10/F15)에 동일 방법론 적용 가능. 
+# - F12/F13: 더 깊이 조사한 결과 GAS와 Python(calc_distribution_detector_per_ticker)이 +# 서로 다른 formula_id(DISTRIBUTION_RISK_SCORE_V1 vs DISTRIBUTION_SELL_DETECTOR_V1)로 +# spec에 이미 등록된 독립 공식이었음을 확인 — "삭제 가능한 중복"이라는 전제 자체가 +# 틀렸다. 사용자 결정: 둘 다 유지, 역할 분리. GAS의 잘못된 "delegated to Python" +# 주석을 정정하고 양쪽 formula_registry에 상호 참조를 추가해 종결(DONE). # Canonical classification of GAS thin-adapter findings identified by # validate_gas_thin_adapter_v1.py. Each finding is classified by what type @@ -48,11 +58,8 @@ findings: classification: price_qty_logic migration_action: MIGRATE_PRICEBASIS_TO_PYTHON target_file: formulas/price_basis_v1.py - status: DONE + status: TODO blocking_on: F03 F04 (same function, migrate together) - resolved_2026_06_22: > - tests/parity/test_stop_loss_policy_parity.py에 test_price_basis_f02_f06_parity 검증 코드를 추가하여 - 익절 조건에 따른 가격 기준(priceBasis) 및 가격 산출 로직에 대해 GAS와의 동등성을 입증 및 포팅 종결함. - id: F03 file: src/gas_adapter_parts/gdf_01_price_metrics.gs @@ -61,9 +68,8 @@ findings: classification: price_qty_logic migration_action: MIGRATE_PRICEBASIS_TO_PYTHON target_file: formulas/price_basis_v1.py - status: DONE + status: TODO blocking_on: F02 F04 - resolved_2026_06_22: "F02와 동일하게 parity 검증 및 DONE 완료." - id: F04 file: src/gas_adapter_parts/gdf_01_price_metrics.gs @@ -72,8 +78,7 @@ findings: classification: price_qty_logic migration_action: MIGRATE_PRICEBASIS_TO_PYTHON target_file: formulas/price_basis_v1.py - status: DONE - resolved_2026_06_22: "F02와 동일하게 parity 검증 및 DONE 완료." + status: TODO - id: F05 file: src/gas_adapter_parts/gdf_01_price_metrics.gs @@ -82,10 +87,7 @@ findings: classification: decision_logic migration_action: MIGRATE_DECISIONS_ROUTING target_file: formulas/execution_decision_v1.py - status: DONE - resolved_2026_06_22: > - tests/parity/test_stop_loss_policy_parity.py에 test_action_routing_f05_parity 검증 코드를 추가하여 - 익절 조건 충족 시 TAKE_PROFIT_TIER1 주문 신호 분기 및 의사결정 수량 비율(25%)에 대한 GAS-Python 동등성을 확인 및 포팅 종결함. 
+ status: TODO - id: F06 file: src/gas_adapter_parts/gdf_01_price_metrics.gs @@ -94,8 +96,7 @@ findings: classification: price_qty_logic migration_action: MIGRATE_PRICEBASIS_TO_PYTHON target_file: formulas/price_basis_v1.py - status: DONE - resolved_2026_06_22: "F02와 동일하게 parity 검증 및 DONE 완료." + status: TODO - id: F07 file: src/gas_adapter_parts/gdf_01_price_metrics.gs @@ -104,10 +105,7 @@ findings: classification: score_logic migration_action: MIGRATE_SCORE_CALCULATION target_file: formulas/score_thresholds_v1.py - status: DONE - resolved_2026_06_22: > - tests/parity/test_stop_loss_policy_parity.py에 test_score_calculation_f07_parity 검증 코드를 추가하여 - 익절 조건 만족 시 매도 순위 점수 가산 로직의 동등성을 입증 및 포팅 종결함. + status: TODO - id: F08 file: src/gas_adapter_parts/gdf_01_price_metrics.gs @@ -135,11 +133,7 @@ findings: classification: decision_logic migration_action: MIGRATE_DECISIONS_ROUTING target_file: formulas/routing_decision_v1.py - status: DONE - resolved_2026_06_22: > - tests/parity/test_routing_decision_parity.py를 작성하여, GAS runRouteFlow_의 - STOP_BREACH, INTRADAY_LOCK, HEAT_GATE, MEAN_REVERSION_GATE, CASH_FLOOR 등 - 5개 핵심 의사결정 필터링 게이트의 Python 결정 라우팅 동등성을 검증 완료함. + status: TODO - id: F11 file: src/gas_adapter_parts/gdf_03_portfolio_gates.gs @@ -150,32 +144,68 @@ findings: target_file: formulas/stop_loss_gate_v1.py status: DONE resolved_2026_06_22: > - tests/parity/test_stop_loss_policy_parity.py를 확장하여 F11 stop_loss_gate 의사결정의 - Python 동등성을 검증하고 Parity 테스트를 통과함. + formulas/stop_loss_gate_v1.py:classify_order_type()로 포팅 완료. ledger의 + "critical path — must match validate_stop_loss_policy_v1 spec" 경고에 따라 + transcription을 신뢰하지 않고 tests/parity/test_classify_order_type_parity_v1.py를 + 작성 — 매 테스트 실행마다 GAS 원본(gdf_03_portfolio_gates.gs)에서 함수 소스를 + 그대로 추출해 Node로 실행하고 Python 포트와 12개 케이스(stopBreach가 BUY보다 + 우선하는 엣지케이스 포함)로 대조한다. GAS 원본이 바뀌면 이 테스트가 즉시 잡아낸다. 
- id: F12 file: src/gas_adapter_parts/gdf_03_portfolio_gates.gs line: 2128 text: "[\"distribution_risk_score\"]: Math.min(100, Math.max(0, score))," classification: score_logic - migration_action: DELETE_DISTRIBUTION_RISK_GAS + migration_action: KEEP_BOTH_SEPARATE_ROLES target_file: formulas/distribution_risk_v1.py status: DONE - notes: Python canonical (build_distribution_risk_score_v2.py) already exists; GAS version is duplicate + notes: Python canonical (build_distribution_risk_v1.py) already exists; GAS version is duplicate + reviewed_2026_06_21: > + 원본 인용("build_distribution_risk_v1.py")은 존재하지 않는 파일이다 — 실제로는 + tools/build_distribution_risk_score_v2.py가 동일 필드명(distribution_risk_score, + formula_id=DISTRIBUTION_RISK_SCORE_V2)을 산출한다. 다만 GAS gdf_03 라인 2128과 + 이 Python 산출값을 같은 입력에서 직접 대조하는 parity 테스트가 tests/ 어디에도 + 없다(tests/parity, tests/regression 전수 검색 결과 0건). "verify parity before + delete" 조건이 충족되지 않아 GAS 삭제를 보류한다 — 전용 parity 테스트 작성이 + 선행되어야 한다(WBS-7.3 후속 스프린트). + reviewed_2026_06_22: > + 한 단계 더 깊이 확인한 결과 migration_action(DELETE) 전제 자체가 틀렸다. + calcDistributionRiskRow_(gdf_03:2069) 바로 위에 "THIN_ADAPTER: delegated to + Python — src/quant_engine/inject_computed_harness.py:calc_distribution_detector_per_ticker" + 주석이 있어 실제로 그 함수를 열어봤다. GAS는 수급/거래량/캔들모양/섹터상대약세 등 + 10개 가산조건(0~100점)으로 distribution_risk_score + anti_distribution_state + (BLOCK_BUY/TRIM_REVIEW/PASS)를 산출하고, Python(calc_distribution_detector_per_ticker)은 + RSI14/OBV20일기울기/전일급등갭하락 등 완전히 다른 6개 신호를 카운트해 + signals_count + distribution_verdict(DISTRIBUTION_CONFIRMED/PRE_WARNING/CLEAR)를 + 산출한다 — 입력도 출력 스키마도 다른 독립적인 두 로직이다. "GAS가 Python의 + 중복"이라는 전제가 거짓이므로 parity 테스트 자체가 성립하지 않는다(같은 것을 + 계산하려는 게 아니므로). 이건 "테스트를 만들면 풀리는 문제"가 아니라 + "두 판단 로직 중 무엇을 canonical로 할지" 또는 "둘 다 유지하되 역할을 분리할지"를 + 결정해야 하는 아키텍처 의사결정 사안 — 사용자 결정 없이 어느 쪽도 삭제하지 않는다. resolved_2026_06_22: > - tests/parity/test_distribution_risk_parity.py를 작성하여 GAS calcDistributionRiskRow_의 - 10가지 세부 팩터 조건과 Python build_distribution_risk_score_v2.py의 계산 일치를 검증 완료함. 
- parity가 완벽히 입증되었으므로 DONE 처리. + 사용자 결정: "둘 다 일단 유지하고 역할 분리". 실제로 두 공식은 이미 spec에 + 서로 다른 formula_id로 등록되어 있었다 — GAS=DISTRIBUTION_RISK_SCORE_V1 + (spec/13b_harness_formulas.yaml:365, BUY/STAGED_BUY/ADD_ON 차단 점수식), + Python calc_distribution_detector_per_ticker=DISTRIBUTION_SELL_DETECTOR_V1 + (spec/13_formula_registry.yaml:2758, PRE_DISTRIBUTION_EARLY_WARNING 2신호의 + 정밀도 보완용 6신호 감지기, _addTickerGates_ 내 FLOW_ACCELERATION_V1 직후 적용). + 혼란의 원인은 GAS 소스의 잘못된 "THIN_ADAPTER: delegated to Python" 주석뿐이었다 — + 이를 정정하고(gdf_03_portfolio_gates.gs:2070) 두 formula_registry 항목에 상호 + related_formula 참조를 추가해 향후 동일 오해를 방지했다. migration_action을 + DELETE에서 KEEP_BOTH_SEPARATE_ROLES로 변경, status DONE(추가 작업 불필요 — + 코드는 이미 올바르게 분리되어 있었고 문서만 정정). - id: F13 file: src/gas_adapter_parts/gdf_03_portfolio_gates.gs line: 2132 text: "formula_id: 'DISTRIBUTION_RISK_SCORE_V1'" classification: pure_mapping - migration_action: DELETE_DISTRIBUTION_RISK_GAS + migration_action: KEEP_BOTH_SEPARATE_ROLES status: DONE notes: formula_id tag stays with Python canonical; remove from GAS - resolved_2026_06_22: "F12와 동일하게 parity 검증 및 DONE 완료." + reviewed_2026_06_21: "F12와 동일 사유로 보류 — parity 테스트 선행 필요." + reviewed_2026_06_22: "F12와 동일 — migration_action 전제 자체가 틀렸음(divergent implementation, 삭제 대상 아님). 아키텍처 결정 보류." + resolved_2026_06_22: "F12와 동일 — 사용자 결정(둘 다 유지, 역할 분리)에 따라 KEEP_BOTH_SEPARATE_ROLES로 종결. formula_id='DISTRIBUTION_RISK_SCORE_V1' 태그는 그대로 유지(이미 올바른 고유 ID)." - id: F14 file: src/gas_adapter_parts/gdf_03_portfolio_gates.gs @@ -184,11 +214,17 @@ findings: classification: score_logic migration_action: DELETE_LATE_CHASE_RISK_GAS target_file: formulas/late_chase_risk_v1.py - status: DONE - notes: Python canonical late_chase_risk algorithm implemented and verified via parity test. - resolved_2026_06_22: > - tests/parity/test_late_chase_risk_parity.py를 신규 구축하여, 이평선 괴리도/DART 공시/분산 차단/ - 거래량 미확인 돌파 등 6가지 late chase 가산 규칙에 대한 Python 계산 정합성 검증 완료. 
+ status: TODO + notes: Python canonical (build_alpha_lead_table_v1.py) computes late_chase_risk; GAS version is duplicate + reviewed_2026_06_21: > + 원본 인용("build_alpha_lead_table_v1.py")은 존재하지 않는 파일이며, 이 ledger의 + claim 자체가 잘못되었다 — 재조사 결과 late_chase_risk_score를 "산출"하는 Python + 캐노니컬은 존재하지 않는다. tools/build_late_chase_attribution_v1.py는 이 필드를 + 입력에서 "소비"만 할 뿐(r.get("late_chase_risk_score")) 직접 계산하지 않으며, + build_anti_late_chase_v5/v6.py도 별도 산출 로직이다. 즉 GAS gdf_03이 현재 이 + 점수의 유일한 산출 경로일 가능성이 높다 — DELETE_LATE_CHASE_RISK_GAS는 + migration_action 자체가 전제(Python 중복)부터 재검증이 필요하며, 지금 삭제하면 + 이 점수의 유일한 산출처를 제거하는 사고로 이어질 수 있다. 삭제 금지, 후속 조사 필요. - id: F15 file: src/gas_adapter_parts/gdf_04_execution_quality.gs @@ -197,11 +233,7 @@ findings: classification: decision_logic migration_action: MIGRATE_LATE_CHASE_GATE target_file: formulas/late_chase_gate_v1.py - status: DONE - resolved_2026_06_22: > - tests/parity/test_stop_loss_policy_parity.py를 확장하여 F15 late_chase_gate - 의사결정의 Python 동등성을 검증하고 Parity 테스트를 통과함. - + status: TODO # Migration action summary (9 actions) migration_actions: diff --git a/spec/13_formula_registry.yaml b/spec/13_formula_registry.yaml index 18a5ad1..bbc362e 100644 --- a/spec/13_formula_registry.yaml +++ b/spec/13_formula_registry.yaml @@ -2762,6 +2762,11 @@ formula_registry: 설거지 구간을 6신호 합산으로 조기 감지. ' + related_formula: > + spec/13b_harness_formulas.yaml:DISTRIBUTION_RISK_SCORE_V1(GAS calcDistributionRiskRow_, + BUY/STAGED_BUY/ADD_ON 차단 점수식)과 별개의 독립 공식이다(2026-06-22 역할 분리 + 확정, governance/gas_logic_migration_ledger_v1.yaml F12/F13). 하나가 다른 하나를 + 대체하지 않으며 둘 다 유지한다. applicable: _addTickerGates_ 내 FLOW_ACCELERATION_V1 직후. inputs: - field: close diff --git a/spec/13b_harness_formulas.yaml b/spec/13b_harness_formulas.yaml index 47c61fa..c1639c3 100644 --- a/spec/13b_harness_formulas.yaml +++ b/spec/13b_harness_formulas.yaml @@ -366,6 +366,12 @@ formula_registry: purpose: > 가격 유지 또는 상승 중 스마트머니 이탈, 거래대금 둔화, 윗꼬리, 낮은 flow_credit, 섹터 대비 상대약세를 결합해 설거지·분산 위험을 0~100으로 산출한다. 
+ related_formula: > + spec/13_formula_registry.yaml:DISTRIBUTION_SELL_DETECTOR_V1과 별개의 독립 + 공식이다(2026-06-22 역할 분리 확정, governance/gas_logic_migration_ledger_v1.yaml + F12/F13). 이 공식(점수식, BUY/STAGED_BUY/ADD_ON 차단 게이트)과 SELL_DETECTOR(6신호 + 카운트, PRE_DISTRIBUTION_EARLY_WARNING 정밀도 보완)는 입력·출력·목적이 다르며 + 하나가 다른 하나의 중복이 아니다 — 둘 다 유지한다. inputs: [] input_groups: required: diff --git a/spec/14_raw_workbook_mapping.yaml b/spec/14_raw_workbook_mapping.yaml index eff0ae6..4cd290c 100644 --- a/spec/14_raw_workbook_mapping.yaml +++ b/spec/14_raw_workbook_mapping.yaml @@ -8,6 +8,8 @@ meta: purpose: > 제공 raw JSON의 data. 배열과 컬럼을 canonical field로 매핑한다. xlsx는 JSON 재생성 소스이며 일반 LLM 분석에서는 직접 파싱하지 않는다. + Snapshot Admin의 workbook inventory와 migration classification은 + GatherTradingData.xlsx를 직접 읽어서 계산한다. 이 파일은 시장/종목/섹터/매크로 데이터만 담당하며 계좌·보유·현금 데이터는 spec/15_account_snapshot_contract.yaml이 담당한다. @@ -438,7 +440,7 @@ raw_workbook: sheet_diet_policy: keep: canonical_required: ["data_feed", "sector_flow", "macro", "event_risk", "core_satellite"] - support: ["settings", "account_snapshot", "sector_universe", "sector_flow_history", "etf_nav_manual", "universe", "monthly_history", "performance", "backdata_feature_bank", "event_calendar"] + support: ["settings", "account_snapshot", "sector_universe", "sector_flow_history", "etf_nav_manual", "universe", "monthly_history", "performance", "backdata_feature_bank", "event_calendar", "daily_history", "pa1_feedback", "alpha_history", "evaluation_dashboard", "trade_quality_history", "rebalance"] deprecated: ["positions", "chat_input", "etf_raw", "core_satellite_status", "orbit_gap", "asset_history"] delete: transient_after_complete: ["cs_chunk_N"] diff --git a/src/gas_adapter_parts/gdf_03_portfolio_gates.gs b/src/gas_adapter_parts/gdf_03_portfolio_gates.gs index 630664c..9c436be 100644 --- a/src/gas_adapter_parts/gdf_03_portfolio_gates.gs +++ b/src/gas_adapter_parts/gdf_03_portfolio_gates.gs @@ -2067,7 +2067,15 @@ function findOrderBlueprintRow_(orders, 
ticker) { } function calcDistributionRiskRow_(h, df, kospiRet5d, sectorFlowData) { - // THIN_ADAPTER: [risk_score] delegated to Python — src/quant_engine/inject_computed_harness.py:calc_distribution_detector_per_ticker + // [2026-06-22 정정] 이전 주석("THIN_ADAPTER: delegated to Python — + // inject_computed_harness.py:calc_distribution_detector_per_ticker")은 틀린 주석이었다. + // 이 함수(formula_id=DISTRIBUTION_RISK_SCORE_V1, spec/13b_harness_formulas.yaml:365, + // BUY/STAGED_BUY/ADD_ON 절대 차단 게이트)와 Python calc_distribution_detector_per_ticker + // (formula_id=DISTRIBUTION_SELL_DETECTOR_V1, spec/13_formula_registry.yaml:2758, + // PRE_DISTRIBUTION_EARLY_WARNING 2신호의 정밀도 보완용 6신호 감지기)는 서로 다른 + // 입력·출력·목적을 가진 독립 공식이다 — 하나가 다른 하나의 GAS 중복이 아니다. + // 둘 다 유지하며 역할을 분리한다(governance/gas_logic_migration_ledger_v1.yaml F12/F13, + // 사용자 결정 2026-06-22). 이 함수를 삭제하지 말 것. var close = df.close || h.close || 0; var ma20 = df.ma20 || 0; var high = df.high || close; diff --git a/src/quant_engine/kis_data_collection_v1.py b/src/quant_engine/kis_data_collection_v1.py index 68f9aac..e38ce33 100644 --- a/src/quant_engine/kis_data_collection_v1.py +++ b/src/quant_engine/kis_data_collection_v1.py @@ -99,12 +99,59 @@ def _find_first_value(payload: Any, keys: tuple[str, ...]) -> Any: return None +def _avg(values: list[float]) -> float | None: + return round(sum(values) / len(values), 4) if values else None + + +def _compute_ma(rows: list[dict[str, Any]], n: int) -> float | None: + """rows[0]가 최신 거래일. 
최근 n거래일 종가 단순이동평균.""" + closes = [r["close"] for r in rows[:n] if r.get("close")] + return _avg(closes) if len(closes) == n else None + + +def _compute_ret_pct(rows: list[dict[str, Any]], n: int) -> float | None: + """최신 종가 대비 n거래일전 종가 수익률(%).""" + closes = [r["close"] for r in rows if r.get("close")] + if len(closes) <= n or not closes[n]: + return None + return round((closes[0] / closes[n] - 1.0) * 100.0, 4) + +def _compute_atr20(rows: list[dict[str, Any]]) -> float | None: + """True Range = max(high-low, |high-prevClose|, |low-prevClose|)의 20거래일 평균. + rows[0]가 최신이므로 rows[i]의 전일종가는 rows[i+1]['close'].""" + trs: list[float] = [] + for i in range(min(20, len(rows) - 1)): + cur, prev = rows[i], rows[i + 1] + high, low, prev_close = cur.get("high"), cur.get("low"), prev.get("close") + if high is None or low is None or prev_close is None: + continue + trs.append(max(high - low, abs(high - prev_close), abs(low - prev_close))) + return _avg(trs) if len(trs) == 20 else None + + +def _aggregate_flow(rows: list[dict[str, Any]], n: int) -> tuple[float | None, float | None]: + """frgn.naver rows(최신순)의 최근 n거래일 외국인/기관 순매수 합계(주식수).""" + window = rows[:n] + if len(window) < n: + return None, None + frg = sum(r.get("frgn_net") or 0 for r in window) + inst = sum(r.get("inst_net") or 0 for r in window) + return round(frg, 4), round(inst, 4) + + def _normalize_naver_price_history(code: str) -> dict[str, Any]: + """data_feed 원자료 컬럼과의 매핑(괄호 안 = data_feed 컬럼명): + close(Close)/open(Open)/high(High)/low(Low)/prev_close(PrevClose)/volume(Volume)/ + avg_volume_5d(AvgVolume_5D)/ma20(MA20)/ma60(MA60)/ret5d~ret60d(Ret5D~Ret60D)/ + atr20(ATR20)/frg_5d·inst_5d(Frg_5D·Inst_5D)/frg_20d·inst_20d(Frg_20D·Inst_20D)/ + flow_rows(Flow_Rows)/flow_ok(Flow_OK, P5 규칙: Flow_Rows>=20). 
+ """ if naver_session is None or fetch_price_history is None: return {"status": "DISABLED"} try: session = naver_session() - price = fetch_price_history(session, code) + # MA60/Ret60D 계산에 60거래일 종가가 필요 — 10행/페이지이므로 7페이지(70행) 수집. + price = fetch_price_history(session, code, pages=7) result: dict[str, Any] = {"status": price.get("status", "UNKNOWN"), "source_url": price.get("source_url")} rows = price.get("rows") or [] if rows: @@ -113,13 +160,29 @@ def _normalize_naver_price_history(code: str) -> dict[str, Any]: result["high"] = rows[0].get("high") result["low"] = rows[0].get("low") result["volume"] = rows[0].get("volume") + if len(rows) > 1: + result["prev_close"] = rows[1].get("close") + result["avg_volume_5d"] = _avg([r["volume"] for r in rows[:5] if r.get("volume")]) if len(rows) >= 5 else None + result["ma20"] = _compute_ma(rows, 20) + result["ma60"] = _compute_ma(rows, 60) + result["ret5d"] = _compute_ret_pct(rows, 5) + result["ret10d"] = _compute_ret_pct(rows, 10) + result["ret20d"] = _compute_ret_pct(rows, 20) + result["ret60d"] = _compute_ret_pct(rows, 60) + result["atr20"] = _compute_atr20(rows) if compute_relative_return_20d is not None: benchmark = fetch_price_history(session, "069500") result["relative_return_20d"] = compute_relative_return_20d(rows, benchmark.get("rows", [])) if compute_volume_ratio_5d is not None: result["volume_ratio_5d"] = compute_volume_ratio_5d(rows) if fetch_foreign_institution_flow is not None: - result["foreign_institution_flow"] = fetch_foreign_institution_flow(session, code) + flow = fetch_foreign_institution_flow(session, code) + result["foreign_institution_flow"] = flow + flow_rows = flow.get("rows") or [] + result["flow_rows"] = len(flow_rows) + result["flow_ok"] = len(flow_rows) >= 20 # P5: Flow_Rows < 20 → no A-grade/즉시매수 + result["frg_5d"], result["inst_5d"] = _aggregate_flow(flow_rows, 5) + result["frg_20d"], result["inst_20d"] = _aggregate_flow(flow_rows, 20) return result except Exception as exc: # noqa: BLE001 - 
fallback source must not break the batch return {"status": "ERROR", "error": str(exc)} @@ -222,8 +285,17 @@ def _collect_one(row: dict[str, Any], *, kis_account: str, include_naver: bool, naver = _normalize_naver_price_history(ticker) provenance["naver"] = naver if naver.get("status") in {"OK", "DATA_MISSING"}: - normalized.setdefault("relative_return_20d", naver.get("relative_return_20d")) - normalized.setdefault("volume_ratio_5d", naver.get("volume_ratio_5d")) + # KIS가 이미 채운 필드(close/open/high/low/volume 등)는 setdefault로 보존하고, + # Naver만 제공하는 파생 필드(이동평균/수익률/ATR/수급 5D·20D)는 그대로 채운다. + naver_promotable = ( + "close", "open", "high", "low", "volume", "prev_close", "avg_volume_5d", + "ma20", "ma60", "ret5d", "ret10d", "ret20d", "ret60d", "atr20", + "relative_return_20d", "volume_ratio_5d", + "frg_5d", "inst_5d", "frg_20d", "inst_20d", "flow_rows", "flow_ok", + ) + for key in naver_promotable: + if key in naver: + normalized.setdefault(key, naver.get(key)) normalized.setdefault("naver_price_status", naver.get("status")) # KIS API 누락 또는 실패 시 Naver 가격 정보를 가격 필드들의 Fallback으로 지정 normalized.setdefault("current_price", naver.get("close")) diff --git a/src/quant_engine/macro_index_collection_v1.py b/src/quant_engine/macro_index_collection_v1.py new file mode 100644 index 0000000..5a3af5b --- /dev/null +++ b/src/quant_engine/macro_index_collection_v1.py @@ -0,0 +1,193 @@ +"""yfinance 기반 macro 인덱스 수집기 — GAS fetchYahooOhlcMetrics 계열의 Python/SQLite 대체. + +사용자 요청(2026-06-22): "GAS 대신 Python이 수집해서 SQLite로 조회돼야 하는거 아니냐" +의 두 번째 트랙. data_feed(kis_data_collection_v1.py)에 이어, GatherTradingData.json +data.macro 시트의 원자료 13개 심볼(KOSPI/KOSDAQ/VIX/USD_KRW/USD_JPY/DXY/Gold/WTI_Oil/ +US10Y_Yield/US30Y_Yield/SP500/NASDAQ100/HYG_HY_Bond)을 수집한다. 
+ +macro 시트의 나머지 9개 행(MRS_COMPUTED/REGIME_PRELIM/BAYESIAN_COMPUTED/TOTAL_HEAT/ +FC_BUDGET/NET_RETURN_FEEDBACK/ORBIT_GAP/ORBIT_STATE/BUCKET_STATUS, category="Computed")은 +포트폴리오 결정 로직의 산출값이며 외부 수집 대상이 아니다 — 이 모듈의 범위 밖이다 +(data_feed의 SS001/AC/RW 계열과 같은 GAS 결정로직 이전 트랙, WBS-7.3 참조). +""" +from __future__ import annotations + +import datetime as dt +import sys +import uuid +from pathlib import Path +from typing import Any + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +try: + import yfinance as yf # type: ignore +except Exception: # pragma: no cover - optional dependency + yf = None + +from src.quant_engine.data_collection_store_v1 import ( + CollectionRun, + append_collection_error, + upsert_collection_run, + upsert_collection_snapshot, +) + +# GatherTradingData.json data.macro의 raw 수집 대상 13개 심볼(Symbol -> Name/Category). +# "Computed" category 9개 행(MRS_COMPUTED 등)은 의도적으로 제외한다. +MACRO_SYMBOLS: tuple[tuple[str, str, str], ...] = ( + ("^KS11", "KOSPI", "Index"), + ("^KQ11", "KOSDAQ", "Index"), + ("^VIX", "VIX", "Risk"), + ("KRW=X", "USD_KRW", "FX"), + ("JPY=X", "USD_JPY", "FX"), + ("DX-Y.NYB", "DXY", "FX"), + ("GC=F", "Gold", "Commodity"), + ("CL=F", "WTI_Oil", "Commodity"), + ("^TNX", "US10Y_Yield", "Bond"), + ("^TYX", "US30Y_Yield", "Bond"), + ("^GSPC", "SP500", "Index"), + ("^NDX", "NASDAQ100", "Index"), + ("HYG", "HYG_HY_Bond", "CreditProxy"), +) + + +def _kst_now_iso() -> str: + return dt.datetime.now(dt.timezone(dt.timedelta(hours=9))).isoformat() + + +def _avg(values: list[float]) -> float | None: + return round(sum(values) / len(values), 4) if values else None + + +def _ret_pct(closes: list[float], n: int) -> float | None: + """closes[0]이 최신. 
n거래일전 종가 대비 수익률(%).""" + if len(closes) <= n or not closes[n]: + return None + return round((closes[0] / closes[n] - 1.0) * 100.0, 4) + + +def fetch_macro_symbol(symbol: str, name: str, category: str) -> dict[str, Any]: + """yfinance에서 OHLC 히스토리를 받아 macro 시트 컬럼(Close/Ret1D~20D/MA20/MA60)을 산출.""" + if yf is None: + return {"status": "DISABLED", "symbol": symbol, "name": name, "category": category} + try: + ticker = yf.Ticker(symbol) + hist = ticker.history(period="4mo") # ~85 거래일 — MA60/Ret20D 계산에 충분 + if hist is None or hist.empty: + return {"status": "DATA_MISSING", "symbol": symbol, "name": name, "category": category} + closes = list(hist["Close"].iloc[::-1]) # 최신순으로 정렬(rows[0]=최신) + as_of = hist.index[-1] + result: dict[str, Any] = { + "status": "OK", + "symbol": symbol, + "name": name, + "category": category, + "close": round(float(closes[0]), 4), + "ret1d": _ret_pct(closes, 1), + "ret2d": _ret_pct(closes, 2), + "ret5d": _ret_pct(closes, 5), + "ret10d": _ret_pct(closes, 10), + "ret20d": _ret_pct(closes, 20), + "ma20": _avg(closes[:20]) if len(closes) >= 20 else None, + "ma60": _avg(closes[:60]) if len(closes) >= 60 else None, + "as_of_date": as_of.strftime("%Y-%m-%dT%H:%M:%S"), + } + return result + except Exception as exc: # noqa: BLE001 - per-symbol failure must not break the batch + return {"status": "ERROR", "symbol": symbol, "name": name, "category": category, "error": str(exc)} + + +def collect_macro_to_sqlite(*, sqlite_db: Path, symbols: tuple[tuple[str, str, str], ...] 
= MACRO_SYMBOLS) -> dict[str, Any]: + run_id = uuid.uuid4().hex + started_at = _kst_now_iso() + upsert_collection_run( + sqlite_db, + CollectionRun( + run_id=run_id, + collector_name="macro_index_collection_v1", + started_at=started_at, + status="RUNNING", + input_source="yfinance", + output_db_path=str(sqlite_db), + notes="macro 시트 raw 수집(GAS fetchYahooOhlcMetrics 대체)", + ), + ) + + summary: dict[str, Any] = { + "formula_id": "MACRO_INDEX_COLLECTION_V1", + "run_id": run_id, + "started_at": started_at, + "sqlite_db": str(sqlite_db), + "row_count": len(symbols), + "errors": [], + "rows": [], + } + + for symbol, name, category in symbols: + result = fetch_macro_symbol(symbol, name, category) + if result.get("status") in ("OK", "DATA_MISSING"): + upsert_collection_snapshot( + sqlite_db, + run_id=run_id, + dataset_name="macro", + ticker=symbol, + name=name, + sector=category, + as_of_date=result.get("as_of_date"), + source_priority="yfinance", + source_status=result.get("status", "UNKNOWN"), + payload=result, + provenance={"source": "yfinance", "symbol": symbol}, + ) + summary["rows"].append({"symbol": symbol, "name": name, "close": result.get("close"), "status": result.get("status")}) + else: + error = {"symbol": symbol, "error": result.get("error", "unknown")} + summary["errors"].append(error) + append_collection_error( + sqlite_db, + run_id=run_id, + source_name="yfinance", + error_kind=result.get("status", "ERROR"), + error_message=str(result.get("error", "")), + ticker=symbol, + payload=result, + ) + + summary["finished_at"] = _kst_now_iso() + summary["status"] = "PASS" if not summary["errors"] else "PASS_WITH_WARNINGS" + upsert_collection_run( + sqlite_db, + CollectionRun( + run_id=run_id, + collector_name="macro_index_collection_v1", + started_at=started_at, + status=summary["status"], + input_source="yfinance", + output_db_path=str(sqlite_db), + notes="macro 시트 raw 수집(GAS fetchYahooOhlcMetrics 대체)", + ), + finished_at=summary["finished_at"], + ) + return 
summary + + +def main() -> int: + import argparse + import json + + parser = argparse.ArgumentParser() + parser.add_argument("--sqlite-db", type=Path, default=ROOT / "outputs" / "macro_index_collection" / "macro_index_collection.db") + parser.add_argument("--output-json", type=Path, default=ROOT / "Temp" / "macro_index_collection_v1.json") + args = parser.parse_args() + + summary = collect_macro_to_sqlite(sqlite_db=args.sqlite_db) + args.output_json.parent.mkdir(parents=True, exist_ok=True) + args.output_json.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8") + print(json.dumps(summary, ensure_ascii=False, indent=2)) + return 0 if summary["status"] in ("PASS", "PASS_WITH_WARNINGS") else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/quant_engine/snapshot_admin_server_v1.py b/src/quant_engine/snapshot_admin_server_v1.py index a9f2663..2e67697 100644 --- a/src/quant_engine/snapshot_admin_server_v1.py +++ b/src/quant_engine/snapshot_admin_server_v1.py @@ -1,7 +1,9 @@ from __future__ import annotations import argparse +import base64 import json +import os import sqlite3 import subprocess from http import HTTPStatus @@ -11,80 +13,176 @@ from hashlib import sha256 from typing import Any from urllib.parse import urlparse, parse_qs +import openpyxl + ROOT = Path(__file__).resolve().parents[2] -SNAPSHOT_ADMIN_VERSION = "snapshot-admin-web-v6" +SNAPSHOT_ADMIN_VERSION = "snapshot-admin-web-v7" +GATHER_TRADING_DATA_XLSX = ROOT / "GatherTradingData.xlsx" KIS_COLLECTION_DB = ROOT / "outputs" / "kis_data_collection" / "kis_data_collection.db" KIS_COLLECTION_REPORT = ROOT / "Temp" / "kis_data_collection_v1.json" QUALITATIVE_SELL_DB = ROOT / "outputs" / "qualitative_sell_strategy" / "qualitative_sell_strategy.db" +GATHER_TRADING_DATA_JSON = ROOT / "GatherTradingData.json" +AUTH_REALM = "Snapshot Admin" +JSON_SHEET_ALIASES = { + "harness_context": "_harness_context", +} -# WBS-7.9 addendum: per-table grid view (Tabler). 
Table names not on the whitelist are -# never interpolated into SQL (the requested table name is not placed into the SELECT statement as-is; -# it is allowed only when it exactly matches a registry key below). -WORKSPACE_BROWSABLE_TABLES = ( - "settings", - "account_snapshot", - "workspace_change_log", - "workspace_approval_v2", - "workspace_lock", - "workspace_meta", -) -COLLECTION_BROWSABLE_TABLES = ( - "collection_runs", - "collection_snapshots", - "collection_source_errors", -) -QUALITATIVE_SELL_BROWSABLE_TABLES = ( - "sell_strategy_results", - "satellite_recommendations", -) +# WBS-7.9 addendum, WBS-7.10 follow-up (2026-06-22): per-table grid view (Tabler). +# Instead of a static whitelist, query each DB file's sqlite_master on demand to build the table +# list. The static list missed tables whenever the schema changed or a new table was added (for example the legacy workspace_approval +# table); this was found through a user report. +# The security property is unchanged: a requested table name is used in SQL only when it exactly matches +# an actual sqlite_master result for that DB (no arbitrary string interpolation). +def _known_db_paths(workspace_db_path: Path) -> list[Path]: + return [Path(workspace_db_path), KIS_COLLECTION_DB, QUALITATIVE_SELL_DB] + + +def _discover_tables(db_path: Path) -> list[str]: + if not db_path.exists(): + return [] + with sqlite3.connect(db_path) as conn: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name" + ).fetchall() + return [row[0] for row in rows] def _resolve_table_db(table: str, workspace_db_path: Path) -> Path | None: - if table in WORKSPACE_BROWSABLE_TABLES: - return Path(workspace_db_path) - if table in COLLECTION_BROWSABLE_TABLES: - return KIS_COLLECTION_DB - if table in QUALITATIVE_SELL_BROWSABLE_TABLES: - return QUALITATIVE_SELL_DB + for db_path in _known_db_paths(workspace_db_path): + if table in _discover_tables(db_path): + return db_path return None -def list_browsable_tables(workspace_db_path: Path) -> list[dict[str, Any]]: - tables: list[dict[str, Any]] = [] - for table in ( - *WORKSPACE_BROWSABLE_TABLES, - *COLLECTION_BROWSABLE_TABLES, - *QUALITATIVE_SELL_BROWSABLE_TABLES, - ): - db_path = _resolve_table_db(table, workspace_db_path) - exists = bool(db_path and db_path.exists()) - 
row_count = 0 - if exists: +# 2026-06-22: also make the data.* sheets of GatherTradingData.json, which serve as analysis/decision factors, +# browsable in the same grid (data_feed/sector_flow/macro and others not yet moved to SQLite). +# Only dict key lookups are performed, so there is no SQL injection surface at all. +def _discover_json_sheets() -> dict[str, list[dict[str, Any]]]: + if not GATHER_TRADING_DATA_JSON.exists(): + return {} + try: + payload = json.loads(GATHER_TRADING_DATA_JSON.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {} + data = payload.get("data") + if not isinstance(data, dict): + return {} + return {key: value for key, value in data.items() if isinstance(value, list) and value and isinstance(value[0], dict)} + + +def _discover_workbook_sheets() -> list[dict[str, Any]]: + if not GATHER_TRADING_DATA_XLSX.exists(): + return [] + try: + workbook = openpyxl.load_workbook(GATHER_TRADING_DATA_XLSX, read_only=True, data_only=True) + except Exception: + return [] + try: + inventory: list[dict[str, Any]] = [] + for sheet_name in workbook.sheetnames: + worksheet = workbook[sheet_name] + inventory.append( + { + "sheet": sheet_name, + "row_count": int(worksheet.max_row or 0), + "column_count": int(worksheet.max_column or 0), + "source_workbook": str(GATHER_TRADING_DATA_XLSX), + } + ) + return inventory + finally: + workbook.close() + + +def build_table_catalog(workspace_db_path: Path) -> dict[str, list[dict[str, Any]]]: + sqlite_rows: list[dict[str, Any]] = [] + for db_path in _known_db_paths(workspace_db_path): + for table in _discover_tables(db_path): try: with sqlite3.connect(db_path) as conn: - row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - table is whitelist-checked above + row_count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - table name confirmed via sqlite_master of this exact db above except sqlite3.OperationalError: - exists = False - tables.append({"table": table, "db": str(db_path) if db_path else "", "exists": exists, "row_count": row_count}) - return 
tables + continue + sqlite_rows.append({"table": table, "db": str(db_path), "exists": True, "row_count": row_count, "source": "sqlite"}) + + json_rows = [{"table": sheet, "db": str(GATHER_TRADING_DATA_JSON), "exists": True, "row_count": len(rows), "source": "json"} for sheet, rows in _discover_json_sheets().items()] + + sqlite_names = {row["table"] for row in sqlite_rows} + json_names = {row["table"] for row in json_rows} + workbook_rows: list[dict[str, Any]] = [] + for sheet_row in _discover_workbook_sheets(): + sheet_name = sheet_row["sheet"] + json_key = JSON_SHEET_ALIASES.get(sheet_name, sheet_name) + current_sources: list[str] = [] + if sheet_name in sqlite_names: + current_sources.append("sqlite") + if sheet_name in json_names or json_key in json_names: + current_sources.append("json") + if not current_sources: + current_sources.append("xlsx") + workbook_rows.append( + { + **sheet_row, + "json_key": json_key, + "current_sources": current_sources, + "migration_candidate": "yes" if "sqlite" not in current_sources else "no", + } + ) + + return {"sqlite": sqlite_rows, "json": json_rows, "workbook": workbook_rows} + + +def list_browsable_tables(workspace_db_path: Path) -> list[dict[str, Any]]: + catalog = build_table_catalog(workspace_db_path) + return [*catalog["sqlite"], *catalog["json"]] def fetch_table_rows(table: str, workspace_db_path: Path, *, limit: int = 50, offset: int = 0) -> dict[str, Any]: db_path = _resolve_table_db(table, workspace_db_path) - if db_path is None: + if db_path is not None: + with sqlite3.connect(db_path) as conn: + conn.row_factory = sqlite3.Row + total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - whitelisted table name + cursor = conn.execute( + f"SELECT * FROM {table} ORDER BY rowid DESC LIMIT ? 
OFFSET ?", # noqa: S608 - whitelisted table name + (limit, offset), + ) + rows = [dict(row) for row in cursor.fetchall()] + columns = [description[0] for description in cursor.description] if cursor.description else [] + return {"table": table, "db": str(db_path), "columns": columns, "rows": rows, "total": total, "limit": limit, "offset": offset, "source": "sqlite"} + + json_sheets = _discover_json_sheets() + if table not in json_sheets: raise ValueError(f"unknown or non-browsable table: {table}") - if not db_path.exists(): - return {"table": table, "db": str(db_path), "columns": [], "rows": [], "total": 0, "limit": limit, "offset": offset} - with sqlite3.connect(db_path) as conn: - conn.row_factory = sqlite3.Row - total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0] # noqa: S608 - whitelisted table name - cursor = conn.execute( - f"SELECT * FROM {table} ORDER BY rowid DESC LIMIT ? OFFSET ?", # noqa: S608 - whitelisted table name - (limit, offset), - ) - rows = [dict(row) for row in cursor.fetchall()] - columns = [description[0] for description in cursor.description] if cursor.description else [] - return {"table": table, "db": str(db_path), "columns": columns, "rows": rows, "total": total, "limit": limit, "offset": offset} + sheet_rows = json_sheets[table] + total = len(sheet_rows) + page = sheet_rows[offset : offset + limit] + columns: list[str] = [] + for row in page: + for key in row.keys(): + if key not in columns: + columns.append(key) + return {"table": table, "db": str(GATHER_TRADING_DATA_JSON), "columns": columns, "rows": page, "total": total, "limit": limit, "offset": offset, "source": "json"} + + +def fetch_table_rows_for_source(source: str, table: str, workspace_db_path: Path, *, limit: int = 50, offset: int = 0) -> dict[str, Any]: + normalized_source = source.strip().lower() + if normalized_source == "sqlite": + return fetch_table_rows(table, workspace_db_path, limit=limit, offset=offset) + if normalized_source == "json": + json_sheets = 
_discover_json_sheets() + if table not in json_sheets: + raise ValueError(f"unknown or non-browsable table: {table}") + sheet_rows = json_sheets[table] + total = len(sheet_rows) + page = sheet_rows[offset : offset + limit] + columns: list[str] = [] + for row in page: + for key in row.keys(): + if key not in columns: + columns.append(key) + return {"table": table, "db": str(GATHER_TRADING_DATA_JSON), "columns": columns, "rows": page, "total": total, "limit": limit, "offset": offset, "source": "json"} + raise ValueError(f"unsupported source: {source}") SNAPSHOT_ADMIN_VERSION_FILES = ( ROOT / "src" / "quant_engine" / "snapshot_admin_server_v1.py", ROOT / "src" / "quant_engine" / "snapshot_admin_store_v1.py", @@ -324,6 +422,55 @@ def _text_response(handler: BaseHTTPRequestHandler, status: int, text: str, cont handler.wfile.write(body) +def _is_loopback_host(host: str) -> bool: + normalized = host.strip().lower() + return normalized in {"127.0.0.1", "localhost", "::1"} + + +def _parse_basic_auth(header_value: str | None) -> tuple[str, str] | None: + if not header_value: + return None + prefix = "basic " + if not header_value.lower().startswith(prefix): + return None + encoded = header_value[len(prefix) :].strip() + if not encoded: + return None + try: + decoded = base64.b64decode(encoded).decode("utf-8") + except (ValueError, UnicodeDecodeError): + return None + if ":" not in decoded: + return None + username, password = decoded.split(":", 1) + return username, password + + +def _basic_auth_matches(header_value: str | None, username: str, password: str) -> bool: + parsed = _parse_basic_auth(header_value) + return bool(parsed and parsed[0] == username and parsed[1] == password) + + +def _reject_unauthorized(handler: BaseHTTPRequestHandler) -> None: + body = json.dumps({"detail": "authentication required"}, ensure_ascii=False, indent=2).encode("utf-8") + handler.send_response(HTTPStatus.UNAUTHORIZED) + handler.send_header("WWW-Authenticate", f'Basic realm="{AUTH_REALM}", 
charset="UTF-8"') + handler.send_header("Content-Type", "application/json; charset=utf-8") + handler.send_header("Content-Length", str(len(body))) + handler.end_headers() + handler.wfile.write(body) + + +def _validate_remote_bind(host: str, allow_remote: bool, auth_user: str, auth_password: str) -> None: + has_auth = bool(auth_user and auth_password) + if bool(auth_user) != bool(auth_password): + raise ValueError("snapshot admin auth requires both --auth-user and --auth-password") + if not _is_loopback_host(host) and not allow_remote: + raise ValueError("refusing to bind snapshot admin outside loopback without --allow-remote") + if (allow_remote or not _is_loopback_host(host)) and not has_auth: + raise ValueError("remote snapshot admin access requires both --auth-user and --auth-password") + + def _read_json_body(handler: BaseHTTPRequestHandler) -> dict[str, Any]: length = int(handler.headers.get("Content-Length") or "0") raw = handler.rfile.read(length).decode("utf-8") if length else "{}" @@ -2631,25 +2778,79 @@ def render_tables_html() -> str:
[HTML template hunk for render_tables_html(); the markup was lost in extraction. Recoverable text from the added lines: a "Workbook migration inventory" card, described as "Source-of-truth xlsx sheet list with current storage classification.", with the table columns Sheet, Rows, Cols, Current Source, Migration Candidate, followed by separate "SQLite" and "JSON" grid sections in place of the removed single grid.]
@@ -2657,7 +2858,11 @@ def render_tables_html() -> str:
@@ -2732,6 +2979,8 @@ def render_tables_html() -> str: class SnapshotAdminHandler(BaseHTTPRequestHandler): db_path: Path = DEFAULT_DB seed_json_path: Path = DEFAULT_SEED_JSON + auth_user: str = "" + auth_password: str = "" def log_message(self, format: str, *args: Any) -> None: # noqa: A003 return @@ -2739,7 +2988,18 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): def _handle_exception(self, exc: Exception) -> None: _json_response(self, HTTPStatus.INTERNAL_SERVER_ERROR, {"detail": str(exc)}) + def _authorize(self) -> bool: + if not self.auth_user and not self.auth_password: + return True + header_value = self.headers.get("Authorization") + if _basic_auth_matches(header_value, self.auth_user, self.auth_password): + return True + _reject_unauthorized(self) + return False + def do_GET(self) -> None: # noqa: N802 + if not self._authorize(): + return parsed = urlparse(self.path) if parsed.path == "/": _text_response(self, HTTPStatus.OK, render_index_html(), "text/html; charset=utf-8") @@ -2751,11 +3011,22 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): _text_response(self, HTTPStatus.OK, render_tables_html(), "text/html; charset=utf-8") return if parsed.path == "/api/tables": - _json_response(self, HTTPStatus.OK, {"tables": list_browsable_tables(self.db_path)}) + catalog = build_table_catalog(self.db_path) + _json_response( + self, + HTTPStatus.OK, + { + "sqlite": catalog["sqlite"], + "json": catalog["json"], + "workbook": catalog["workbook"], + "tables": [*catalog["sqlite"], *catalog["json"]], + }, + ) return if parsed.path == "/api/table_rows": query = parse_qs(parsed.query) table = (query.get("table") or [""])[0] + source = (query.get("source") or [""])[0] try: limit = int((query.get("limit") or ["50"])[0]) offset = int((query.get("offset") or ["0"])[0]) @@ -2765,7 +3036,7 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): limit = min(max(limit, 1), 500) offset = max(offset, 0) try: - payload = fetch_table_rows(table, self.db_path, limit=limit, 
offset=offset) + payload = fetch_table_rows_for_source(source or "sqlite", table, self.db_path, limit=limit, offset=offset) if source else fetch_table_rows(table, self.db_path, limit=limit, offset=offset) except ValueError as exc: _json_response(self, HTTPStatus.BAD_REQUEST, {"detail": str(exc)}) return @@ -2799,6 +3070,8 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): _json_response(self, HTTPStatus.NOT_FOUND, {"detail": "not found"}) def do_POST(self) -> None: # noqa: N802 + if not self._authorize(): + return parsed = urlparse(self.path) try: if parsed.path == "/api/bootstrap": @@ -2967,9 +3240,20 @@ class SnapshotAdminHandler(BaseHTTPRequestHandler): self._handle_exception(exc) -def serve(host: str, port: int, db_path: Path | str | None = None, seed_json_path: Path | str | None = None, bootstrap: bool = True) -> None: +def serve( + host: str, + port: int, + db_path: Path | str | None = None, + seed_json_path: Path | str | None = None, + bootstrap: bool = True, + *, + auth_user: str = "", + auth_password: str = "", + allow_remote: bool = False, +) -> None: db = normalize_db_path(db_path) seed = Path(seed_json_path) if seed_json_path else DEFAULT_SEED_JSON + _validate_remote_bind(host, allow_remote, auth_user, auth_password) if bootstrap and seed.exists(): with open_connection(db) as conn: from .snapshot_admin_store_v1 import ensure_schema @@ -2979,8 +3263,12 @@ def serve(host: str, port: int, db_path: Path | str | None = None, seed_json_pat import_seed_json(db, seed) SnapshotAdminHandler.db_path = db SnapshotAdminHandler.seed_json_path = seed + SnapshotAdminHandler.auth_user = auth_user + SnapshotAdminHandler.auth_password = auth_password server = ThreadingHTTPServer((host, port), SnapshotAdminHandler) print(f"Snapshot Admin listening on http://{host}:{port}") + if auth_user and auth_password: + print("Snapshot Admin authentication: enabled (Basic Auth)") print(f"SQLite DB: {db}") print(f"Seed JSON: {seed}") try: @@ -2998,8 +3286,20 @@ def main() -> int: 
parser.add_argument("--db", type=Path, default=DEFAULT_DB) parser.add_argument("--seed", type=Path, default=DEFAULT_SEED_JSON) parser.add_argument("--no-bootstrap", action="store_true") + parser.add_argument("--allow-remote", action="store_true", help="Allow binding outside loopback when auth is configured.") + parser.add_argument("--auth-user", default=os.getenv("SNAPSHOT_ADMIN_AUTH_USER", "")) + parser.add_argument("--auth-password", default=os.getenv("SNAPSHOT_ADMIN_AUTH_PASSWORD", "")) args = parser.parse_args() - serve(args.host, args.port, args.db, args.seed, bootstrap=not args.no_bootstrap) + serve( + args.host, + args.port, + args.db, + args.seed, + bootstrap=not args.no_bootstrap, + auth_user=args.auth_user, + auth_password=args.auth_password, + allow_remote=args.allow_remote, + ) return 0 diff --git a/tests/unit/test_kis_data_collection_v1.py b/tests/unit/test_kis_data_collection_v1.py new file mode 100644 index 0000000..f2e2366 --- /dev/null +++ b/tests/unit/test_kis_data_collection_v1.py @@ -0,0 +1,83 @@ +"""Unit tests for the functions that derive the data_feed raw-data columns (MA/Ret/ATR/5D and 20D investor flows). + +User request (2026-06-22): "Shouldn't what gets loaded from JSON really be collected into +SQLite by Python code?" This is phase 1 of moving part of the data_feed raw data that GAS used to compute into Python (kis_data_collection_v1). +Only the pure calculation logic is verified; no network access is used. +""" +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.kis_data_collection_v1 import ( + _aggregate_flow, + _compute_atr20, + _compute_ma, + _compute_ret_pct, +) + + +def _price_rows(closes: list[float], highs: list[float] | None = None, lows: list[float] | None = None) -> list[dict]: + """closes[0] is the latest trading day. 
If high/low are omitted they are filled with the close values (for testing the ATR=0 case).""" + highs = highs or closes + lows = lows or closes + return [{"close": c, "high": h, "low": l, "volume": 1000} for c, h, l in zip(closes, highs, lows)] + + +def test_compute_ma_returns_none_when_insufficient_rows(): + rows = _price_rows([100.0, 101.0, 102.0]) + assert _compute_ma(rows, 20) is None + + +def test_compute_ma_averages_most_recent_n_rows(): + closes = [110.0] * 5 + [100.0] * 15 + rows = _price_rows(closes) + # Mean of the latest 5 trading days = 110; 20-day mean = (5*110 + 15*100)/20 = 102.5 + assert _compute_ma(rows, 5) == 110.0 + assert _compute_ma(rows, 20) == 102.5 + + +def test_compute_ret_pct_against_n_days_ago_close(): + closes = [110.0, 109, 108, 107, 106, 100.0] + rows = _price_rows(closes) + # Latest (110) vs 5 trading days ago (100) → (110/100 - 1) * 100 = 10% + assert _compute_ret_pct(rows, 5) == 10.0 + + +def test_compute_ret_pct_none_when_window_exceeds_rows(): + rows = _price_rows([100.0, 99.0]) + assert _compute_ret_pct(rows, 20) is None + + +def test_compute_atr20_requires_full_21_row_window(): + rows = _price_rows([100.0] * 20) + assert _compute_atr20(rows) is None # 20 rows cannot form 20 previous-close pairs (21 rows are required) + + +def test_compute_atr20_computes_true_range_average(): + # 21 rows: high-low is always 2 and the gap to prev_close is designed to be smaller, so ATR20 = 2.0 + closes = [100.0 + i * 0.1 for i in range(21)] + highs = [c + 1.0 for c in closes] + lows = [c - 1.0 for c in closes] + rows = _price_rows(closes, highs, lows) + atr = _compute_atr20(rows) + assert atr is not None + assert abs(atr - 2.0) < 0.5 + + +def test_aggregate_flow_sums_recent_window(): + rows = [{"frgn_net": 100, "inst_net": -50}] * 5 + [{"frgn_net": 1000, "inst_net": 1000}] * 15 + frg5, inst5 = _aggregate_flow(rows, 5) + assert frg5 == 500 + assert inst5 == -250 + + +def test_aggregate_flow_none_when_window_exceeds_rows(): + rows = [{"frgn_net": 10, "inst_net": 10}] * 3 + frg, inst = _aggregate_flow(rows, 20) + assert frg is None + assert inst is None diff --git 
a/tests/unit/test_macro_index_collection_v1.py b/tests/unit/test_macro_index_collection_v1.py new file mode 100644 index 0000000..34e1fad --- /dev/null +++ b/tests/unit/test_macro_index_collection_v1.py @@ -0,0 +1,37 @@ +"""Unit tests for the macro index derived calculations (ret_pct/avg); no network access.""" +from __future__ import annotations + +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from src.quant_engine.macro_index_collection_v1 import MACRO_SYMBOLS, _avg, _ret_pct + + +def test_macro_symbols_cover_thirteen_raw_instruments(): + assert len(MACRO_SYMBOLS) == 13 + symbols = {s for s, _, _ in MACRO_SYMBOLS} + assert "^KS11" in symbols # KOSPI + assert "HYG" in symbols + # The "Computed" category (MRS_COMPUTED and similar) is intentionally excluded. + assert "MRS_COMPUTED" not in symbols + + +def test_ret_pct_against_n_days_ago(): + closes = [110.0, 108, 107, 106, 105, 100.0] + assert _ret_pct(closes, 5) == 10.0 + + +def test_ret_pct_none_when_window_exceeds_length(): + assert _ret_pct([100.0, 99.0], 20) is None + + +def test_avg_returns_none_for_empty_list(): + assert _avg([]) is None + + +def test_avg_computes_mean(): + assert _avg([10.0, 20.0, 30.0]) == 20.0 diff --git a/tests/unit/test_snapshot_admin_web_v1.py b/tests/unit/test_snapshot_admin_web_v1.py index 99f1f43..69f19ae 100644 --- a/tests/unit/test_snapshot_admin_web_v1.py +++ b/tests/unit/test_snapshot_admin_web_v1.py @@ -2,8 +2,14 @@ from __future__ import annotations import json import sys -import unittest +import base64 +import subprocess +import time +import socket from pathlib import Path +from urllib import error, request + +import pytest ROOT = Path(__file__).resolve().parents[2] if str(ROOT) not in sys.path: @@ -12,151 +18,271 @@ if str(ROOT) not in sys.path: import tools.validate_snapshot_admin_web_v1 as validator from src.quant_engine.snapshot_admin_server_v1 import ( build_ui_state, + build_table_catalog, fetch_table_rows, + 
fetch_table_rows_for_source, list_browsable_tables, render_collection_html, render_index_html, render_tables_html, + _basic_auth_matches, + _validate_remote_bind, ) from src.quant_engine.snapshot_admin_store_v1 import import_seed_json -class TestSnapshotAdminWebV1(unittest.TestCase): - - def test_render_index_html_contains_spreadsheet_surface(self): - html = render_index_html() - self.assertIn("Snapshot Admin", html) - self.assertIn("contenteditable", html) - self.assertIn("/api/settings/save", html) - self.assertIn("/api/account_snapshot/save", html) - self.assertIn("Lock target", html) - self.assertIn("Lock row", html) - self.assertIn("Approve pending", html) - self.assertIn("Refresh diff", html) - self.assertIn("Export approval packet", html) - self.assertIn("Selection Inspector", html) - self.assertIn("Recent row history", html) - self.assertIn("Save view", html) - self.assertIn("Apply TSV to selection", html) - self.assertIn("Ctrl+S", html) - self.assertIn("KIS Collection", html) - self.assertIn("Recent collector snapshots", html) - self.assertIn("Collection detail", html) - self.assertIn("Filter runs / snapshots / errors", html) - self.assertIn("Filter change log", html) - self.assertIn("Timeline", html) - self.assertIn("/collection", html) - self.assertIn("Open collection dashboard", html) - - def test_render_collection_html_contains_dashboard_surface(self): - html = render_collection_html() - self.assertIn("KIS Collection Dashboard", html) - self.assertIn("/api/state", html) - self.assertIn("Download raw JSON", html) - self.assertIn("Download CSV", html) - self.assertIn("Filter runs / snapshots / errors", html) - self.assertIn("Ticker quick search", html) - self.assertIn("Date quick search", html) - - def test_build_ui_state_exposes_expected_columns(self): - import tempfile - import shutil - tmp_dir = tempfile.mkdtemp() - try: - db_path = Path(tmp_dir) / "snapshot_admin.db" - seed_path = ROOT / "GatherTradingData.json" - import_seed_json(db_path, seed_path) 
- - state = build_ui_state(db_path) - self.assertTrue(state["summary"]["settings_rows"] > 0) - self.assertTrue(state["summary"]["account_snapshot_rows"] > 0) - self.assertEqual(state["summary"]["topology"]["mode"], "single_workspace_sqlite") - self.assertTrue(state["summary"]["topology"]["settings_and_snapshot_share_db"]) - self.assertTrue(state["summary"]["topology"]["collector_separate_db"]) - self.assertEqual(state["account_snapshot_columns"][0], "captured_at") - self.assertIn("settings", state["validation"]) - self.assertTrue(state["version"]["app"]) - self.assertIn("fingerprint", state["version"]["source"]) - self.assertIn("collection", state) - self.assertIn("counts", state["collection"]) - self.assertIn("latest_report", state["collection"]) - self.assertEqual(state["summary"]["topology"]["mode"], "single_workspace_sqlite") - finally: - shutil.rmtree(tmp_dir, ignore_errors=True) - - def test_snapshot_admin_workflow_and_script_exist(self): - workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml" - package = json.loads((ROOT / "package.json").read_text(encoding="utf-8")) - self.assertTrue(workflow.exists()) - self.assertIn("--reload", package["scripts"]["ops:snapshot-web"]) - self.assertIn("ops:snapshot-validate", package["scripts"]) - self.assertIn("ops:snapshot-web-validate", package["scripts"]) - - def test_render_tables_html_contains_tabler_grid_surface(self): - html = render_tables_html() - self.assertIn("tabler", html.lower()) - self.assertIn("tableSelect", html) - self.assertIn("/api/tables", html) - self.assertIn("/api/table_rows", html) - self.assertIn("gridTable", html) - - def test_list_browsable_tables_covers_all_three_databases(self): - import tempfile - import shutil - tmp_dir = tempfile.mkdtemp() - try: - db_path = Path(tmp_dir) / "snapshot_admin.db" - import_seed_json(db_path, ROOT / "GatherTradingData.json") - - tables = list_browsable_tables(db_path) - names = {row["table"] for row in tables} - self.assertTrue({"settings", 
"account_snapshot", "workspace_change_log"} <= names) - self.assertTrue({"collection_runs", "collection_snapshots", "collection_source_errors"} <= names) - self.assertTrue({"sell_strategy_results", "satellite_recommendations"} <= names) - - settings_row = next(row for row in tables if row["table"] == "settings") - self.assertTrue(settings_row["exists"]) - self.assertTrue(settings_row["row_count"] > 0) - finally: - shutil.rmtree(tmp_dir, ignore_errors=True) - - def test_fetch_table_rows_paginates_and_rejects_unknown_table(self): - import tempfile - import shutil - tmp_dir = tempfile.mkdtemp() - try: - db_path = Path(tmp_dir) / "snapshot_admin.db" - import_seed_json(db_path, ROOT / "GatherTradingData.json") - - page1 = fetch_table_rows("settings", db_path, limit=2, offset=0) - self.assertTrue(page1["columns"]) - self.assertEqual(len(page1["rows"]), 2) - self.assertTrue(page1["total"] > 2) - - page2 = fetch_table_rows("settings", db_path, limit=2, offset=2) - self.assertNotEqual(page1["rows"], page2["rows"]) - - with self.assertRaises(ValueError): - fetch_table_rows("settings; DROP TABLE settings;--", db_path) - finally: - shutil.rmtree(tmp_dir, ignore_errors=True) +def test_render_index_html_contains_spreadsheet_surface(): + html = render_index_html() + assert "Snapshot Admin" in html + assert "contenteditable" in html + assert "/api/settings/save" in html + assert "/api/account_snapshot/save" in html + assert "Lock target" in html + assert "Lock row" in html + assert "Approve pending" in html + assert "Refresh diff" in html + assert "Export approval packet" in html + assert "Selection Inspector" in html + assert "Recent row history" in html + assert "Save view" in html + assert "Apply TSV to selection" in html + assert "Ctrl+S" in html + assert "KIS Collection" in html + assert "Recent collector snapshots" in html + assert "Collection detail" in html + assert "Filter runs / snapshots / errors" in html + assert "Filter change log" in html + assert "Timeline" in html 
+    assert "/collection" in html
+    assert "Open collection dashboard" in html
-    def test_snapshot_admin_web_validation_script_passes(self):
-        out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
-        if out.exists():
-            out.unlink()
-
-        rc = validator.main()
-        payload = json.loads(out.read_text(encoding="utf-8"))
-
-        self.assertEqual(rc, 0)
-        self.assertEqual(payload["gate"], "PASS")
-        self.assertEqual(payload["formula_id"], "SNAPSHOT_ADMIN_WEB_VALIDATION_V1")
-        self.assertTrue(payload["settings_rows"] > 0)
-        self.assertTrue(payload["account_snapshot_rows"] > 0)
+def test_render_collection_html_contains_dashboard_surface():
+    html = render_collection_html()
+    assert "KIS Collection Dashboard" in html
+    assert "/api/state" in html
+    assert "Download raw JSON" in html
+    assert "Download CSV" in html
+    assert "Filter runs / snapshots / errors" in html
+    assert "Ticker quick search" in html
+    assert "Date quick search" in html
-if __name__ == "__main__":
-    unittest.main()
+def test_build_ui_state_exposes_expected_columns(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    seed_path = ROOT / "GatherTradingData.json"
+    import_seed_json(db_path, seed_path)
+    state = build_ui_state(db_path)
+    assert state["summary"]["settings_rows"] > 0
+    assert state["summary"]["account_snapshot_rows"] > 0
+    assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
+    assert state["summary"]["topology"]["settings_and_snapshot_share_db"] is True
+    assert state["summary"]["topology"]["collector_separate_db"] is True
+    assert state["account_snapshot_columns"][0] == "captured_at"
+    assert "settings" in state["validation"]
+    assert state["version"]["app"]
+    assert "fingerprint" in state["version"]["source"]
+    assert "collection" in state
+    assert "counts" in state["collection"]
+    assert "latest_report" in state["collection"]
+    assert state["summary"]["topology"]["mode"] == "single_workspace_sqlite"
+
+
+def test_snapshot_admin_workflow_and_script_exist():
+    workflow = ROOT / ".gitea" / "workflows" / "snapshot_admin.yml"
+    package = json.loads((ROOT / "package.json").read_text(encoding="utf-8"))
+    assert workflow.exists()
+    assert "--reload" in package["scripts"]["ops:snapshot-web"]
+    assert "ops:snapshot-validate" in package["scripts"]
+    assert "ops:snapshot-web-validate" in package["scripts"]
+
+
+def test_render_tables_html_contains_tabler_grid_surface():
+    html = render_tables_html()
+    assert "tabler" in html.lower()
+    assert "Workbook migration inventory" in html
+    assert "sqliteTableSelect" in html
+    assert "jsonTableSelect" in html
+    assert "/api/tables" in html
+    assert "/api/table_rows" in html
+    assert "sqliteGridTable" in html
+    assert "jsonGridTable" in html
+
+
+def test_list_browsable_tables_covers_all_three_databases(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+    tables = list_browsable_tables(db_path)
+    names = {row["table"] for row in tables}
+    assert {"settings", "account_snapshot", "workspace_change_log"} <= names
+    assert {"collection_runs", "collection_snapshots", "collection_source_errors"} <= names
+    assert {"sell_strategy_results", "satellite_recommendations"} <= names
+
+    settings_row = next(row for row in tables if row["table"] == "settings")
+    assert settings_row["exists"] is True
+    assert settings_row["row_count"] > 0
+
+
+def test_build_table_catalog_uses_workbook_inventory(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+    catalog = build_table_catalog(db_path)
+    assert {"sqlite", "json", "workbook"} <= set(catalog)
+    assert len(catalog["workbook"]) == 20
+
+    workbook = {row["sheet"]: row for row in catalog["workbook"]}
+    assert workbook["settings"]["current_sources"] == ["sqlite"]
+    assert workbook["account_snapshot"]["current_sources"] == ["sqlite", "json"]
+    assert workbook["harness_context"]["current_sources"] == ["xlsx"]
+    assert workbook["harness_context"]["migration_candidate"] == "yes"
+
+
+def test_fetch_table_rows_paginates_and_rejects_unknown_table(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+    page1 = fetch_table_rows("settings", db_path, limit=2, offset=0)
+    assert page1["columns"]
+    assert len(page1["rows"]) == 2
+    assert page1["total"] > 2
+
+    page2 = fetch_table_rows("settings", db_path, limit=2, offset=2)
+    assert page1["rows"] != page2["rows"]
+
+    with pytest.raises(ValueError):
+        fetch_table_rows("settings; DROP TABLE settings;--", db_path)
+
+
+def test_list_browsable_tables_includes_json_factor_sheets(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+    tables = list_browsable_tables(db_path)
+    json_rows = {row["table"]: row for row in tables if row["source"] == "json"}
+    assert "data_feed" in json_rows
+    assert "sector_flow" in json_rows
+    assert json_rows["data_feed"]["row_count"] > 0
+
+    sqlite_rows = [row for row in tables if row["source"] == "sqlite"]
+    assert sqlite_rows, "sqlite tables must still be listed alongside json sheets"
+
+
+def test_fetch_table_rows_reads_json_factor_sheet(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+    result = fetch_table_rows_for_source("json", "data_feed", db_path, limit=5, offset=0)
+    assert result["source"] == "json"
+    assert "Ticker" in result["columns"]
+    assert len(result["rows"]) <= 5
+    assert result["total"] > 0
+
+
+def test_fetch_table_rows_can_still_read_sqlite_tables(tmp_path):
+    db_path = tmp_path / "snapshot_admin.db"
+    import_seed_json(db_path, ROOT / "GatherTradingData.json")
+
+    result = fetch_table_rows_for_source("sqlite", "settings", db_path, limit=5, offset=0)
+    assert result["source"] == "sqlite"
+    assert "key" in result["columns"]
+    assert len(result["rows"]) <= 5
+
+
+def test_auth_helpers_reject_remote_bind_without_credentials():
+    assert _basic_auth_matches("Basic dXNlcjpwYXNz", "user", "pass") is True
+    assert _basic_auth_matches("Basic dXNlcjp3cm9uZw==", "user", "pass") is False
+    assert _basic_auth_matches("Bearer token", "user", "pass") is False
+
+    with pytest.raises(ValueError):
+        _validate_remote_bind("0.0.0.0", False, "", "")
+    with pytest.raises(ValueError):
+        _validate_remote_bind("0.0.0.0", True, "", "")
+    _validate_remote_bind("0.0.0.0", True, "admin", "secret")
+    _validate_remote_bind("127.0.0.1", False, "", "")
+
+
+def test_snapshot_admin_requires_basic_auth_when_configured(tmp_path):
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        sock.bind(("127.0.0.1", 0))
+        port = int(sock.getsockname()[1])
+    db_path = tmp_path / "snapshot_admin_auth.db"
+    seed_path = ROOT / "GatherTradingData.json"
+    server_cmd = [
+        sys.executable,
+        "-u",
+        str(ROOT / "tools" / "run_snapshot_admin_server_v1.py"),
+        "--host",
+        "127.0.0.1",
+        "--port",
+        str(port),
+        "--db",
+        str(db_path),
+        "--seed",
+        str(seed_path),
+        "--auth-user",
+        "admin",
+        "--auth-password",
+        "secret",
+    ]
+
+    proc = subprocess.Popen(
+        server_cmd,
+        cwd=ROOT,
+        stdout=subprocess.PIPE,
+        stderr=subprocess.STDOUT,
+        text=True,
+        encoding="utf-8",
+    )
+    try:
+        deadline = time.time() + 15
+        while time.time() < deadline:
+            try:
+                probe = request.urlopen(request.Request(f"http://127.0.0.1:{port}/api/state"), timeout=1)
+            except error.HTTPError as exc:
+                if exc.code == 401:
+                    break
+            except Exception:
+                time.sleep(0.25)
+            else:
+                probe.close()
+                break
+        url = f"http://127.0.0.1:{port}/api/state"
+
+        req = request.Request(url)
+        with pytest.raises(error.HTTPError) as unauthorized:
+            request.urlopen(req, timeout=5)
+        assert unauthorized.value.code == 401
+
+        token = base64.b64encode(b"admin:secret").decode("ascii")
+        req_auth = request.Request(url, headers={"Authorization": f"Basic {token}"})
+        with request.urlopen(req_auth, timeout=5) as resp:
+            payload = json.loads(resp.read().decode("utf-8"))
+        assert payload["version"]["app"]
+    finally:
+        if proc.poll() is None:
+            proc.terminate()
+            try:
+                proc.wait(timeout=5)
+            except subprocess.TimeoutExpired:
+                proc.kill()
+                proc.wait(timeout=5)
+        if proc.stdout is not None:
+            proc.stdout.close()
+
+
+def test_snapshot_admin_web_validation_script_passes():
+    out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
+    if out.exists():
+        out.unlink()
+
+    rc = validator.main()
+    payload = json.loads(out.read_text(encoding="utf-8"))
+
+    assert rc == 0
+    assert payload["gate"] == "PASS"
+    assert payload["formula_id"] == "SNAPSHOT_ADMIN_WEB_VALIDATION_V1"
+    assert payload["settings_rows"] > 0
+    assert payload["account_snapshot_rows"] > 0
diff --git a/tools/validate_snapshot_admin_web_v1.py b/tools/validate_snapshot_admin_web_v1.py
index 5220f4d..d681502 100644
--- a/tools/validate_snapshot_admin_web_v1.py
+++ b/tools/validate_snapshot_admin_web_v1.py
@@ -96,6 +96,7 @@ def main() -> int:
         _wait_for_server(base_url)
         html = _read_text(f"{base_url}/")
         state = _read_json(f"{base_url}/api/state")
+        tables_payload = _read_json(f"{base_url}/api/tables")
         export_payload = _read_json(f"{base_url}/api/export")
         approval_packet = {
             "formula_id": "SNAPSHOT_ADMIN_APPROVAL_PACKET_V1",
@@ -138,6 +139,11 @@ def main() -> int:
             errors.append("collection_page_link_missing")
         if "Open collection dashboard" not in html:
             errors.append("collection_dashboard_link_missing")
+        tables_html = _read_text(f"{base_url}/tables")
+        if "Workbook migration inventory" not in tables_html or "sqliteTableSelect" not in tables_html or "jsonTableSelect" not in tables_html:
+            errors.append("table_browser_split_missing")
+        if "SQLite" not in tables_html or "JSON" not in tables_html:
+            errors.append("table_browser_source_labels_missing")
         collection_html = _read_text(f"{base_url}/collection")
         if "KIS Collection Dashboard" not in collection_html or "Download CSV" not in collection_html or "Ticker quick search" not in collection_html or "Date quick search" not in collection_html:
             errors.append("collection_dashboard_page_missing")
@@ -159,6 +165,10 @@ def main() -> int:
             errors.append("version_metadata_missing")
         if not isinstance(state.get("collection"), dict):
             errors.append("collection_state_missing")
+        if not isinstance(tables_payload.get("sqlite"), list) or not isinstance(tables_payload.get("json"), list) or not isinstance(tables_payload.get("workbook"), list):
+            errors.append("table_catalog_grouping_missing")
+        if not tables_payload.get("tables"):
+            errors.append("table_catalog_flat_missing")
         collection = state.get("collection", {})
         if not isinstance(collection.get("counts"), dict):
             errors.append("collection_counts_missing")