1 Commits

Author SHA1 Message Date
kjh2064 4bf7e97934 refactor(kis): 한국투자증권(KIS) 데이터 수집 엔진 효율화 및 DB 인프라 최적화
Quant Engine CI/CD Pipeline / validate-core (pull_request) Has been cancelled
Quant Engine CI/CD Pipeline / validate-ui-and-storage (pull_request) Has been cancelled
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (pull_request) Has been cancelled
2026-06-24 18:04:16 +09:00
25 changed files with 74 additions and 864 deletions
-3
View File
@@ -153,9 +153,6 @@ jobs:
- name: Validate Snapshot Admin Workflow
run: python3 tools/validate_snapshot_admin_workflow_v1.py
- name: Validate DB First Pipeline
run: python3 tools/validate_db_first_pipeline_v1.py
validate-ui-and-storage:
runs-on: self-hosted
needs: validate-core
-10
View File
@@ -50,11 +50,6 @@ jobs:
echo "[smoke] validate workflow only (no web UI, no deploy)"
python3 tools/validate_snapshot_admin_workflow_v1.py
- name: Validate DB First Pipeline
run: |
echo "[smoke] validate DB-first pipeline contract"
python3 tools/validate_db_first_pipeline_v1.py
# Manual dispatch gate: full workflow + web UI validation only.
validate-snapshot-admin-full:
if: github.event_name == 'workflow_dispatch'
@@ -91,11 +86,6 @@ jobs:
echo "[full] validate workflow"
python3 tools/validate_snapshot_admin_workflow_v1.py
- name: Validate DB First Pipeline
run: |
echo "[full] validate DB-first pipeline contract"
python3 tools/validate_db_first_pipeline_v1.py
- name: Validate Snapshot Admin Web UI
run: |
echo "[full] validate web ui"
-19
View File
@@ -16,22 +16,6 @@
- 위 4가지 중 하나라도 빠지면 작업은 미완료다. 요약이나 설명만으로 완료 처리하지 않는다.
- 완료 보고에는 반드시 변경된 YAML, 코드, 데이터 파일 경로와 검증 명령을 함께 적는다.
## 0c. 작업 수행 절차 강제
- 모든 작업은 아래 순서를 반드시 따른다.
1. `로드맵/현황 확인`
2. `WBS 작성`
3. `목표 설정`
4. `성공판단 데이터 정의`
5. `구현`
6. `사후 검증`
7. `증빙 기록`
- 작업 시작 전에는 반드시 해당 작업의 WBS 항목과 성공판단 데이터를 문장 또는 표로 먼저 확정한다.
- 성공판단 데이터가 없으면 구현을 시작하지 않는다.
- “한 줄 추가”, “작아 보이는 수정”도 예외가 아니다. 모든 변경은 WBS와 성공판단 데이터에 매핑되어야 한다.
- 작업 도중 범위가 바뀌면 WBS를 먼저 갱신하고 난 뒤에만 구현을 계속한다.
- 작업 완료 판정은 구현 완료가 아니라 검증 통과와 증빙 기록까지 확인된 경우에만 가능하다.
- 사후 검증 없이 “대충 괜찮다” 식의 진행은 금지한다.
## 1. 읽는 순서
1. `runtime/active_artifact_manifest.yaml`
2. `Temp/final_decision_packet_active.json` (manifest alias)
@@ -117,14 +101,11 @@
## 5. 개발 규칙
- 새 기능은 contract, schema, golden case, owner ledger를 먼저 만든다.
- 그 다음에 WBS와 성공판단 데이터(테스트/검증 입력과 기대값)를 먼저 만든다.
- 구현은 Python canonical first, GAS adapter second다.
- `tools/*.py`는 CLI wrapper에 가깝게 유지한다.
- `gas_*.gs`는 thin adapter 방향으로 유지한다.
- `src/quant_engine`는 canonical package로 유지한다.
- `schemas/generated``src/quant_engine/models/generated`는 schema/model parity를 유지한다.
- 코드 변경은 WBS 항목 번호와 성공판단 데이터 파일/명령을 함께 남겨야 한다.
- 검증 결과가 없으면 완료 보고를 하지 않는다.
- 경로가 새로 생기면 `AGENTS.md`의 Directory Routing / Serving 섹션과 zip 화이트리스트를 함께 갱신한다.
- **Python 인터프리터**: Windows 로컬 환경에서는 반드시 `python`을 사용한다 (`python3` 금지).
- `python` → Python 3.13.5 (`Python313/`) — yaml/openpyxl/yfinance 등 프로젝트 패키지 설치됨
-9
View File
@@ -19,14 +19,6 @@
- `KIS_APP_KEY`
- `KIS_APP_SECRET`
## Token Cache Policy
- KIS access token은 `Temp/kis_tokens.db`에 저장한다.
- 토큰은 `TOKEN_REFRESH_SKEW_MINUTES=10` 기준으로만 재사용/갱신한다.
- 토큰 캐시는 수집 DB와 분리한다.
- 토큰 캐시 상태는 `python tools/inspect_kis_token_cache_v1.py --json`로 점검한다.
- 토큰 갱신 실패 시 appkey/appsecret 또는 API 가용성 문제로만 판단하고, 시크릿 값을 로그나 알림에 그대로 노출하지 않는다.
## Workflow Mapping
- `.gitea/workflows/kis_data_collection.yml`
@@ -43,7 +35,6 @@
- mock 계정은 유효성 확인용이다.
- real 계정은 실제 데이터 수집용이다.
- 둘을 같은 단계에서 혼용하지 않는다.
- 토큰 발급은 1일 1회 원칙을 따르며, 만료 전에는 캐시를 재사용한다.
## Verification
-30
View File
@@ -5,20 +5,6 @@
---
## 0a. 현재 실행 우선순위
> 2026-06-24 기준, v8.9 채택안(P0~P3)은 검증 완료 상태이며 새 구현 백로그의 최우선 순위는 아래 순서로 고정한다.
1. `WBS-7.1` 캘리브레이션 임계값 실증 전환
2. `WBS-7.7` 신규 시스템 E2E 통합 테스트 및 snapshot_admin 스모크 테스트
3. `WBS-7.8` ETF NAV/괴리율/추적오차/AUM 수집 경로 확정
4. `WBS-7.5` 임시 하드코딩 폴백 비례화의 실증 보정
5. `WBS-7.6` 슬리피지 실측 보정
`WBS-7.2`, `WBS-7.3`, `WBS-7.4`, `WBS-7.10`~`WBS-7.14`는 현재 문서상 완료 또는 정리 완료로 유지한다.
---
## 0b. 완료 조건
모든 작업은 아래 4가지 증빙이 함께 있을 때만 완료로 본다.
@@ -30,22 +16,6 @@
하나라도 빠지면 완료로 보지 않는다.
## 0c. 작업 절차 강제
모든 변경은 아래 순서를 지켜야 한다.
1. 로드맵/현황 확인
2. WBS 작성
3. 목표 설정
4. 성공판단 데이터 정의
5. 구현
6. 사후 검증
7. 증빙 기록
작업 시작 전에 WBS와 성공판단 데이터를 먼저 확정해야 하며, 작은 수정도 예외가 아니다.
작업 도중 범위가 바뀌면 먼저 WBS를 갱신한 뒤 구현을 계속한다.
검증 증빙이 없으면 완료로 볼 수 없다.
---
## 0c. 비판적 리뷰 (2026-06-21)
-13
View File
@@ -58,19 +58,6 @@ Use this prompt when producing an investment analysis or HTS-ready playbook.
| GOAL_RETIREMENT_V1 | goal_current_asset_krw, goal_achievement_pct | {N}% 달성 / 잔여 {M}만원 / ETA {YYYY-MM} | IN_PROGRESS / ACHIEVED |
**상황별 선택 추가 공식 (해당 시 반드시 포함):**
---
## WORKFLOW DISCIPLINE
작업 또는 수정 제안 전에 반드시 아래 4가지를 먼저 확정한다.
1. WBS 항목
2. 목표
3. 성공판단 데이터
4. 검증 명령
이 4가지가 명시되지 않으면 구현, 수정, 렌더링을 시작하지 않는다.
- 매수 검토 시: `MEAN_REVERSION_GATE_V1` (이격도 체크 선행), `POSITION_SIZE_V1`, `RISK_BUDGET_CASCADE_V1`, `EXPECTED_EDGE_V1`
- 매도 후보 시: `RS_RATIO_V1` (rs_laggard 판정), `SELL_PRIORITY_V1`
- 가격 산출 시: `STOP_PRICE_CORE_V1`, `TAKE_PROFIT_LADDER_V2`, `TICK_NORMALIZER_V1`
-13
View File
@@ -15,19 +15,6 @@ HTS 캡처 이미지가 제공되면 이 프롬프트를 **분석보다 먼저**
---
## WORKFLOW DISCIPLINE
캡처 파싱 전에 반드시 아래 4가지를 먼저 확정한다.
1. WBS 항목
2. 목표
3. 성공판단 데이터
4. 검증 명령
이 4가지가 없으면 파싱을 시작하지 않는다.
---
## STEP 1 — 화면 종류 판별
| 화면 | 판별 기준 | 사용 가능 여부 |
-13
View File
@@ -43,16 +43,3 @@ Do not approve:
- PASS order without `execution_quality_table`
- WATCH ledger using HTS order columns such as `지정가`, `손절가`, `익절가`, `주문수량`, or `주문금액`
- prose headers such as `이번 주 결론`, `현재 포트폴리오 핵심 진단`, `보유 종목별 운용 지침`, `종합 의견` replacing required tables
---
## WORKFLOW DISCIPLINE
리뷰 전에 반드시 아래 4가지를 요구한다.
1. WBS 항목
2. 목표
3. 성공판단 데이터
4. 검증 명령
이 4가지가 없으면 리뷰 대상은 완료가 아니라 미완료로 판단한다.
+4 -4
View File
@@ -160,10 +160,10 @@ quant_feed_contract:
- "data_integrity_score=100이어도 pending_critical_category_count>0이면 PASS_100 문구를 쓰지 않는다."
json_analysis_protocol:
purpose: "GatherTradingData.json은 DB 기반 수집 결과를 바탕으로 생성된 파생 보고서 증빙이다. 최종 보고서 렌더링과 data_completeness_matrix 참고용으로 사용한다."
purpose: "GatherTradingData.json에서 시장 raw 분석 데이터를 빠르게 파싱해 data_completeness_matrix와 판단 입력으로 사용."
python_parsing_baseline:
shell_rule: "PowerShell에서는 Bash heredoc 금지. '@ ... @ | python -' 형식으로 실행."
json_load_rule: "json.loads(Path('GatherTradingData.json').read_text(encoding='utf-8'))를 기본값으로 사용하되, 원천 추적은 SQLite DB의 history와 snapshot tables를 우선 확인한다."
json_load_rule: "json.loads(Path('GatherTradingData.json').read_text(encoding='utf-8'))를 기본값으로 사용."
required_top_level: ["metadata", "data"]
required_schema_version: "2026-05-18-json-raw-data-v1"
required_paths: ["data.data_feed", "data.sector_flow", "data.macro", "data.event_risk", "data.core_satellite"]
@@ -171,9 +171,9 @@ quant_feed_contract:
text_columns: ["Ticker", "ETF_Code", "Proxy_Ticker", "Base_Ticker", "Constituent_Code", "ETF_Ticker", "Symbol", "ticker"]
normalization: "숫자로 읽힌 91160.0, 5930.0 등은 문자열화 후 6자리 zero-pad 적용."
validation_commands: ["npm run validate-data-sample", "npm run validate-specs"]
xlsx_refresh_rule: "xlsx 원본을 갱신했으면 먼저 DB에 반영한 뒤, 엔진이 DB를 읽어 JSON 파생 보고서를 재생성하고 다시 검증한다."
xlsx_refresh_rule: "xlsx 원본을 갱신했으면 npm run convert-data-json 실행 후 JSON을 다시 검증한다."
xlsx_analysis_protocol:
purpose: "xlsx는 HTS 잔고·거래내역 판독 또는 DB 반영 이전의 보조 감사 소스다. 시장 raw 일반 분석과 최종 보고서 생성은 DB 추적 후의 파생 JSON을 우선한다."
purpose: "xlsx는 HTS 잔고·거래내역 판독 또는 raw JSON 재생성 감사를 위한 보조 프로토콜이다. 시장 raw 일반 분석은 json_analysis_protocol을 우선한다."
python_parsing_baseline:
shell_rule: "PowerShell에서는 Bash heredoc 금지. '@ ... @ | python -' 형식으로 실행."
openpyxl_read_rule: "값 점검은 openpyxl.load_workbook(path, data_only=True, read_only=True)를 기본값으로 사용."
-14
View File
@@ -15,20 +15,6 @@ meta:
engine_audit_ref: Temp/engine_audit_v1.json
pass_100_ref: Temp/pass_100_criteria_v1.json
workflow_disciplines:
required_preimplementation_order:
- "로드맵/현황 확인"
- "WBS 작성"
- "목표 설정"
- "성공판단 데이터 정의"
- "구현"
- "사후 검증"
- "증빙 기록"
completion_gate_rule: "작업 시작 전 WBS와 성공판단 데이터가 명시되지 않으면 진행 금지"
small_change_rule: "한 줄 추가, 두 줄 추가 같은 소규모 변경도 동일하게 적용"
scope_change_rule: "작업 도중 범위가 바뀌면 먼저 WBS를 갱신한 뒤 계속 진행"
evidence_rule: "검증 증빙 없이는 완료로 간주하지 않음"
# ── §7 프롬프트 완료 조건 ────────────────────────────────────────────────────
criteria:
-22
View File
@@ -598,28 +598,6 @@ thresholds:
sunset_date: '2026-09-30'
unit: rsi
value: 70.0
- gs_location: gas_data_feed.gs:8780
id: DSD_V1_ANALYST_PEG_BLOCK_PCT
last_calibrated: null
live_sample_requirement: 30
notes: analystScore 보조 임계 — pegScore>=8이면 가점. 실증 전 EXPERT_PRIOR로 유지.
owner_formula: DISTRIBUTION_SELL_DETECTOR_V1
sample_n: 0
source: EXPERT_PRIOR
sunset_date: '2026-09-30'
unit: score_condition
value: 8.0
- gs_location: gas_data_feed.gs:8780
id: DSD_V1_ANALYST_UPSIDE_BLOCK_PCT
last_calibrated: null
live_sample_requirement: 30
notes: analystScore 보조 임계 — upsidePct>15이면 가점. 실증 전 EXPERT_PRIOR로 유지.
owner_formula: DISTRIBUTION_SELL_DETECTOR_V1
sample_n: 0
source: EXPERT_PRIOR
sunset_date: '2026-09-30'
unit: pct
value: 15.0
- gs_location: gas_data_feed.gs:2098
id: HEAT_GATE_EVENT_SHOCK_HARD_BLOCK
last_calibrated: null
+16 -10
View File
@@ -168,13 +168,16 @@ def _issue_or_reuse_token(creds: KisCredentials) -> str:
# 2. 토큰이 만료되었거나 없을 시 KIS API로 새로 발급 요청
requests = _requests()
resp = requests.post(
f"{creds.domain}/oauth2/tokenP",
json={"grant_type": "client_credentials", "appkey": creds.app_key, "appsecret": creds.app_secret},
timeout=15,
)
resp.raise_for_status()
body = resp.json()
try:
resp = requests.post(
f"{creds.domain}/oauth2/tokenP",
json={"grant_type": "client_credentials", "appkey": creds.app_key, "appsecret": creds.app_secret},
timeout=15,
)
resp.raise_for_status()
body = resp.json()
except Exception as exc:
raise RuntimeError("KIS token refresh failed; check credentials and API availability.") from exc
access_token = body["access_token"]
expires_in_sec = int(body.get("expires_in", 86400))
expires_at = dt.datetime.now(dt.timezone.utc) + dt.timedelta(seconds=expires_in_sec)
@@ -209,9 +212,12 @@ def _send_request(creds: KisCredentials, path: str, tr_id: str, params: dict[str
"custtype": "P",
}
requests = _requests()
resp = requests.get(f"{creds.domain}{path}", headers=headers, params=params, timeout=15)
resp.raise_for_status()
return resp.json()
try:
resp = requests.get(f"{creds.domain}{path}", headers=headers, params=params, timeout=15)
resp.raise_for_status()
return resp.json()
except Exception as exc:
raise RuntimeError(f"KIS read-only request failed for {path} / {tr_id}.") from exc
# ── 조회(read-only) 함수 — 전부 GET, 전부 quotations/ranking 카테고리 (실측 확인) ──────────
+4 -4
View File
@@ -1,9 +1,9 @@
"""KIS-first data collector for the CI scheduler.
The collector uses the existing `GatherTradingData.json` snapshot as the seed
universe, then enriches Korean tickers with read-only KIS quotations and
orderbook data, while retaining Naver/Yahoo fallbacks when available.
The canonical persistence target is SQLite.
The collector treats SQLite as the canonical persistence layer.
`GatherTradingData.json` is emitted only after the DB-backed collection run
finishes, so the JSON acts as a derived reporting artifact rather than the
source of truth.
"""
from __future__ import annotations
@@ -12,8 +12,6 @@ from __future__ import annotations
import json
import sys
import urllib.request
import socket
from datetime import date
from pathlib import Path
@@ -23,8 +21,6 @@ if str(ROOT) not in sys.path:
import unittest
import tools.validate_platform_transition_wbs_v1 as platform_transition_validator
import tools.validate_snapshot_admin_web_v1 as snapshot_admin_validator
from src.quant_engine import kis_data_collection_v1 as kdc
from src.quant_engine.data_collection_store_v1 import load_collection_dashboard_state
from src.quant_engine.qualitative_sell_strategy_v1 import compute_qualitative_sell_strategy
@@ -179,166 +175,3 @@ class TestKisCollectionIntegration(unittest.TestCase):
self.assertEqual(fetched[0]["action"], decision["action"])
self.assertEqual(fetched[0]["conviction"], decision["conviction"])
self.assertEqual(fetched[0]["market_regime"], decision["market_regime"])
def test_snapshot_admin_and_platform_transition_validators_remain_passable(self):
"""E2E 체인 결과물이 snapshot_admin 및 platform-transition 검증에 그대로 연결되는지 확인."""
snapshot_out = ROOT / "Temp" / "snapshot_admin_web_validation_v1.json"
transition_out = ROOT / "Temp" / "platform_transition_wbs_v1.json"
if snapshot_out.exists():
snapshot_out.unlink()
if transition_out.exists():
transition_out.unlink()
snapshot_rc = snapshot_admin_validator.main()
transition_rc = platform_transition_validator.main()
snapshot_payload = json.loads(snapshot_out.read_text(encoding="utf-8"))
transition_payload = json.loads(transition_out.read_text(encoding="utf-8"))
self.assertEqual(snapshot_rc, 0)
self.assertEqual(snapshot_payload["gate"], "PASS")
self.assertGreater(snapshot_payload["settings_rows"], 0)
self.assertGreater(snapshot_payload["account_snapshot_rows"], 0)
self.assertEqual(transition_rc, 0)
self.assertEqual(transition_payload["gate"], "PASS")
self.assertFalse(transition_payload["missing_criteria"])
self.assertFalse(transition_payload["roadmap_missing"])
self.assertTrue(transition_payload["checks"]["P1_kis_core_api_collector"]["gate"] == "PASS")
self.assertTrue(transition_payload["checks"]["P2_sqlite_canonical_store"]["gate"] == "PASS")
self.assertTrue(transition_payload["checks"]["P3_ci_scheduler_cutover"]["gate"] == "PASS")
self.assertTrue(transition_payload["checks"]["P4_gas_thin_adapter_minimize"]["gate"] == "PASS")
self.assertTrue(transition_payload["checks"]["P5_postgresql_upgrade_path"]["gate"] == "PASS")
def test_snapshot_admin_collection_run_updates_state_with_live_price(self):
"""실제 수집 후 snapshot_admin state가 최신 run 및 live price를 반영하는지 확인."""
import tempfile
import subprocess
import time
with tempfile.TemporaryDirectory() as tmpdir:
tmp_path = Path(tmpdir)
db_path = tmp_path / "snapshot_admin.db"
seed_path = tmp_path / "seed.json"
seed_payload = {
"data": {
"settings": [
{"ordinal": 1, "key": "total_asset_krw", "value": 500000000, "note": "seed"},
{"ordinal": 2, "key": "settlement_cash_d2_krw", "value": 250000000, "note": "seed"},
],
"data_feed": [
{"Ticker": "005930", "Name": "삼성전자", "Sector": "반도체"},
{"Ticker": "000660", "Name": "SK하이닉스", "Sector": "반도체"},
],
"account_snapshot": [
{
"captured_at": "2026-06-24T11:15:47+09:00",
"account": "real",
"account_type": "일반계좌",
"ticker": "005930",
"name": "삼성전자",
"holding_quantity": 10,
"available_quantity": 10,
"average_cost": 70000,
"total_cost": 700000,
"current_price": 71000,
"market_value": 710000,
"profit_loss": 10000,
"return_pct": 1.43,
"immediate_cash": 0,
"settlement_cash_d2": 0,
"available_cash": 0,
"open_order_amount": 0,
"monthly_contribution_limit": 0,
"monthly_contribution_used": 0,
"parse_status": "CAPTURE_PROVIDED_BUT_NOT_HOLDINGS",
"user_confirmed": "N",
"stop_price": 65000,
"highest_price_since_entry": 72000,
"entry_date": "2026-06-01",
"entry_stage": "stage_1",
"position_type": "core",
"last_updated": "2026-06-24T11:15:47+09:00",
}
],
}
}
seed_path.write_text(json.dumps(seed_payload, ensure_ascii=False, indent=2), encoding="utf-8")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
port = int(sock.getsockname()[1])
proc = subprocess.Popen(
[
sys.executable,
"-m",
"src.quant_engine.snapshot_admin_server_v1",
"--host",
"127.0.0.1",
"--port",
str(port),
"--db",
str(db_path),
"--seed",
str(seed_path),
],
cwd=ROOT,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
encoding="utf-8",
)
try:
base_url = f"http://127.0.0.1:{port}"
deadline = time.time() + 20
while time.time() < deadline:
if proc.poll() is not None:
output = proc.stdout.read() if proc.stdout else ""
self.fail(f"snapshot_admin server exited early with code {proc.returncode}:\n{output}")
try:
with urllib.request.urlopen(f"{base_url}/api/state", timeout=2) as response:
json.loads(response.read().decode("utf-8"))
break
except Exception:
time.sleep(0.25)
else:
output = proc.stdout.read() if proc.stdout else ""
self.fail(f"snapshot_admin server did not become ready:\n{output}")
request = urllib.request.Request(
f"{base_url}/api/collection/run",
data=json.dumps(
{
"mode": "real",
"kis_account": "real",
"include_live_kis": True,
"allow_naver_fallback": False,
},
ensure_ascii=False,
).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=120) as response:
run_payload = json.loads(response.read().decode("utf-8"))
with urllib.request.urlopen(f"{base_url}/api/state", timeout=10) as response:
state_payload = json.loads(response.read().decode("utf-8"))
collection = state_payload.get("collection", {})
latest_report = collection.get("latest_report", {})
latest_run = collection.get("latest_run", {})
recent_runs = collection.get("runs", [])
self.assertEqual(run_payload["status"], "PASS")
self.assertTrue(any(run.get("run_id") == run_payload["summary"]["run_id"] for run in recent_runs))
self.assertIn(latest_report.get("run_id"), {run_payload["summary"]["run_id"], latest_run.get("run_id")})
self.assertGreaterEqual(latest_report["source_counts"]["kis_open_api"], 1)
self.assertGreaterEqual(len(latest_report.get("rows", [])), 1)
self.assertIsNotNone(latest_report["rows"][0].get("current_price"))
finally:
proc.terminate()
try:
proc.wait(timeout=10)
except Exception:
proc.kill()
@@ -1,7 +1,6 @@
from __future__ import annotations
import sys
import subprocess
from pathlib import Path
ROOT = Path(__file__).resolve().parents[2]
@@ -89,29 +88,3 @@ def test_intended_price_must_be_positive(tmp_path):
actual_fill_price=100,
recorded_at="2026-06-21",
)
def test_cli_report_is_data_gated_below_minimum_sample(tmp_path):
db_path = tmp_path / "execution_slippage.db"
report_path = ROOT / "Temp" / "execution_slippage_report_v1.json"
if report_path.exists():
report_path.unlink()
result = subprocess.run(
[
sys.executable,
"tools/evaluate_execution_slippage_v1.py",
"--db",
str(db_path),
"report",
],
cwd=ROOT,
check=True,
capture_output=True,
text=True,
encoding="utf-8",
)
assert "DATA_GATED" in result.stdout
payload = report_path.read_text(encoding="utf-8")
assert '"status": "DATA_GATED"' in payload
-25
View File
@@ -1,25 +0,0 @@
from __future__ import annotations
import json
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
import tools.validate_kis_token_hygiene_v1 as validator
class TestKisTokenHygieneV1(unittest.TestCase):
def test_validator_reports_pass(self):
rc = validator.main()
self.assertEqual(rc, 0)
payload = json.loads((ROOT / "Temp" / "kis_token_hygiene_v1.json").read_text(encoding="utf-8"))
self.assertEqual(payload["gate"], "PASS")
self.assertIn("sanitized_token_refresh_error", payload["evidence"][str(ROOT / "src" / "quant_engine" / "kis_api_client_v1.py")])
if __name__ == "__main__":
unittest.main()
-222
View File
@@ -4,7 +4,6 @@ import json
import sys
import unittest
from pathlib import Path
from unittest.mock import Mock, patch
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
@@ -58,14 +57,6 @@ class TestSnapshotAdminWebV1(unittest.TestCase):
self.assertIn("/api/account_snapshot/save", html)
self.assertIn("opsBanner", html)
self.assertIn("bannerApprovalSummary", html)
self.assertIn("heroNextAction", html)
self.assertIn("heroPrimaryAction", html)
self.assertIn("heroCollection", html)
self.assertIn("approvalPanel", html)
self.assertIn("collectionPanel", html)
self.assertIn("selectionPanel", html)
self.assertIn("open when you need row-level operations", html)
self.assertIn("open", html)
self.assertIn("snapshot-panel", html)
self.assertIn("selected-field", html)
self.assertIn("settingsCountChip", html)
@@ -77,7 +68,6 @@ class TestSnapshotAdminWebV1(unittest.TestCase):
self.assertIn("Export approval packet", html)
self.assertIn("Selection Inspector", html)
self.assertIn("Recent row history", html)
self.assertIn("Recent change summary", html)
self.assertIn("Save view", html)
self.assertIn("Apply TSV to selection", html)
self.assertIn("Ctrl+S", html)
@@ -90,238 +80,26 @@ class TestSnapshotAdminWebV1(unittest.TestCase):
self.assertIn("/collection", html)
self.assertIn("Open collection dashboard", html)
def test_render_home_html_contains_role_based_entrances(self):
from src.quant_engine.snapshot_admin_server_v1 import render_home_html
html = render_home_html()
self.assertIn("Snapshot Admin Home", html)
self.assertIn("1. Workspace", html)
self.assertIn("2. Collection", html)
self.assertIn("3. Tables", html)
self.assertIn("Open workspace", html)
self.assertIn("Open collection", html)
self.assertIn("Open tables", html)
self.assertIn("/workspace", html)
self.assertIn("/tables", html)
def test_render_tables_html_contains_table_group_summary(self):
html = render_tables_html()
self.assertIn("Snapshot Admin — Table Browser", html)
self.assertIn("DB별 수정, JSON별 검토, 수집 증빙 확인", html)
self.assertIn("DB 먼저 / JSON은 증빙", html)
self.assertIn("tablePurposeNavigator", html)
self.assertIn("DB 먼저", html)
self.assertIn("JSON은 증빙", html)
self.assertIn("Edit only canonical rows", html)
self.assertIn("DB tables", html)
self.assertIn("Editable source of truth.", html)
self.assertIn("Workbook sheets", html)
self.assertIn("Derived report evidence.", html)
self.assertIn("tablesVersionTop", html)
self.assertIn("tablesVersionBottom", html)
self.assertIn("final_updated_at=", html)
self.assertIn("tablesCoverageWarning", html)
self.assertIn("workbookSheetSelect", html)
self.assertIn("Workbook sheets only", html)
self.assertIn("workbookSheetMeta", html)
self.assertIn("workbookSheetSurfaceMeta", html)
self.assertIn("workbookSheetDetail", html)
self.assertIn("workbookSheetPreview", html)
self.assertIn("tableSelect", html)
self.assertIn("DB Table", html)
self.assertIn("tableGroupSummary", html)
self.assertIn("tableSourceSummary", html)
self.assertIn("Registry details", html)
self.assertIn("History details", html)
self.assertIn("Workspace", html)
self.assertIn("Collection", html)
self.assertIn("Strategy", html)
self.assertIn("JSON", html)
self.assertIn("tableWorkspaceSection", html)
self.assertIn("tableJsonSection", html)
self.assertIn("tableEditState", html)
self.assertIn("bg-danger-lt", html)
self.assertIn("Save applies only to the canonical workspace DB.", html)
self.assertIn("Derived JSON Evidence Preview", html)
self.assertIn("jsonReportStatus", html)
self.assertIn("jsonReportFocus", html)
self.assertIn("jsonReportStats", html)
self.assertIn("jsonReportDetail", html)
self.assertIn("Purpose: edit canonical workspace rows only.", html)
self.assertIn("Collection purpose: monitor collector runs and snapshots.", html)
self.assertIn("Latest run summary loads from `/api/state`.", html)
self.assertIn("Purpose: inspect DB-backed JSON evidence and row payloads.", html)
self.assertIn("Workspace tables are editable only when the table is in the canonical workspace DB.", html)
self.assertIn("DB별 / JSON별 조회 기준", html)
self.assertIn("This page is intentionally a triage surface, not a generic table dump.", html)
self.assertIn("read-only because this table belongs to collector or strategy storage", html)
self.assertIn("Table Browser", html)
self.assertIn("Save changes", html)
self.assertIn("Clear filters", html)
self.assertIn("• current", html)
self.assertIn("workbookRegistrySummary", html)
self.assertIn("dbRegistryBody", html)
self.assertIn("Derived report registry", html)
self.assertIn("DB tables", html)
self.assertIn("workbookPurposeFilters", html)
self.assertIn("Recorded surface", html)
self.assertIn("History details", html)
self.assertIn("historyChangeBody", html)
self.assertIn("historyRunBody", html)
self.assertIn("Derived JSON evidence view", html)
self.assertIn("Derived JSON Evidence Preview", html)
self.assertIn("JSON evidence", html)
def test_render_tables_html_exposes_workbook_registry_surface(self):
html = render_tables_html()
self.assertIn("Workbook sheets", html)
self.assertIn("XLSX:", html)
self.assertIn("JSON evidence:", html)
self.assertIn("JSON role:", html)
self.assertIn("Unmapped:", html)
self.assertIn("Purpose filter:", html)
self.assertIn("destination:", html)
self.assertIn("recorded surface:", html)
self.assertIn("Selected sheet:", html)
self.assertIn("surface:", html)
self.assertIn("version=", html)
self.assertIn("No workbook sheet selected.", html)
self.assertIn("tableSelect", html)
self.assertIn("DB tables only", html)
self.assertIn("Registry details", html)
def test_workbook_registry_maps_xlsx_sheets_to_recording_surfaces(self):
from src.quant_engine.snapshot_admin_server_v1 import load_workbook_sheet_registry
registry = load_workbook_sheet_registry()
self.assertEqual(registry["sheet_count"], 19)
entries = {row["sheet"]: row for row in registry["entries"]}
self.assertEqual(entries["settings"]["kind"], "workspace_db")
self.assertEqual(entries["account_snapshot"]["kind"], "workspace_db")
self.assertEqual(entries["data_feed"]["kind"], "collector_db")
self.assertEqual(entries["settings"]["purpose"], "workspace_edit")
self.assertEqual(entries["data_feed"]["purpose"], "collector_run")
self.assertEqual(registry["json_role"], "derived_report_evidence")
for sheet in [
"sector_universe_refresh_audit",
"daily_history",
"event_calendar",
"pa1_feedback",
"alpha_history",
"backdata_feature_bank",
"sell_priority",
"harness_context",
"monthly_history",
"sector_flow_history",
"sector_universe",
"core_satellite",
"universe",
"event_risk",
"macro",
"sector_flow",
]:
self.assertEqual(entries[sheet]["kind"], "json_payload")
self.assertEqual(entries[sheet]["destination"], "GatherTradingData.json")
self.assertEqual(entries[sheet]["source_role"], "derived_report_evidence")
self.assertEqual(entries["sector_universe_refresh_audit"]["kind"], "json_payload")
self.assertEqual(entries["daily_history"]["kind"], "json_payload")
self.assertEqual(entries["sector_universe_refresh_audit"]["purpose"], "refresh_audit")
self.assertEqual(entries["daily_history"]["purpose"], "history_ledger")
self.assertEqual(entries["event_calendar"]["purpose"], "audit_history")
self.assertEqual(entries["alpha_history"]["purpose"], "analysis_report")
self.assertEqual(entries["harness_context"]["purpose"], "execution_context")
self.assertEqual(entries["monthly_history"]["purpose"], "history_ledger")
self.assertEqual(entries["sector_universe"]["purpose"], "universe_registry")
self.assertEqual(entries["event_risk"]["purpose"], "macro_risk_context")
self.assertEqual(entries["sector_flow"]["purpose"], "flow_leadership")
self.assertNotIn("sector_universe_refresh_audit", registry["unmapped_sheets"])
self.assertNotIn("daily_history", registry["unmapped_sheets"])
def test_render_collection_html_contains_dashboard_surface(self):
html = render_collection_html()
self.assertIn("KIS Collection Dashboard", html)
self.assertIn("/api/state", html)
self.assertIn("/api/collection/run", html)
self.assertIn("Collect now", html)
self.assertIn("collectionModeInput", html)
self.assertIn("collectionAccountInput", html)
self.assertIn("collectionRunLog", html)
self.assertIn("collectionRunBanner", html)
self.assertIn("collectionModeBadge", html)
self.assertIn("collectionLiveStatus", html)
self.assertIn("collectionProgressChip", html)
self.assertIn("collectionStageChip", html)
self.assertIn("collectionResultChip", html)
self.assertIn("collectionTrendSummary", html)
self.assertIn("collectionTrendChart", html)
self.assertIn("Auto refreshes every 15 seconds", html)
self.assertIn("older → newer", html)
self.assertIn("Snapshots / run", html)
self.assertIn("Errors / run", html)
self.assertIn("[RUN]", html)
self.assertIn("[SNAPSHOT]", html)
self.assertIn("[ERROR]", html)
self.assertIn("live source: active | KIS rows=", html)
self.assertIn("collectionDetailAnchor", html)
self.assertIn("collectionRunLogAnchor", html)
self.assertIn("Live KIS on", html)
self.assertIn("live source: unknown", html)
self.assertNotIn('value="mock"', html)
self.assertIn("Download raw JSON", html)
self.assertIn("Download CSV", html)
self.assertIn("Filter runs / snapshots / errors", html)
self.assertIn("Unified activity timeline", html)
self.assertIn("date", html)
self.assertIn("Ticker quick search", html)
self.assertIn("Date quick search", html)
def test_run_collection_job_returns_progress_payload(self):
import tempfile
import shutil
from src.quant_engine.snapshot_admin_server_v1 import KIS_COLLECTION_DB, KIS_COLLECTION_REPORT, run_collection_job
tmp_dir = tempfile.mkdtemp()
try:
sqlite_db = Path(tmp_dir) / "kis_data_collection.db"
output_json = Path(tmp_dir) / "kis_data_collection_v1.json"
output_json.write_text(
json.dumps(
{
"generated_at": "2026-06-24T10:00:00+09:00",
"row_count": 1,
"source_counts": {"kis_open_api": 1},
},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
fake_proc = Mock(returncode=0, stdout="ok", stderr="")
with patch("src.quant_engine.snapshot_admin_server_v1.subprocess.run", return_value=fake_proc) as run_mock:
payload = run_collection_job(
sqlite_db=sqlite_db,
input_json=Path(tmp_dir) / "GatherTradingData.json",
output_json=output_json,
kis_account="real",
include_live_kis=True,
allow_naver_fallback=False,
)
self.assertEqual(payload["status"], "PASS")
self.assertEqual(payload["returncode"], 0)
self.assertEqual(payload["stdout"], "ok")
self.assertIn("started_at", payload)
self.assertIn("finished_at", payload)
self.assertIn("elapsed_ms", payload)
self.assertEqual(payload["summary"]["row_count"], 1)
self.assertEqual(payload["summary"]["source_counts"]["kis_open_api"], 1)
self.assertEqual(payload["state"]["db_path"], str(sqlite_db))
run_mock.assert_called_once()
self.assertTrue(str(KIS_COLLECTION_DB).endswith("kis_data_collection.db"))
self.assertTrue(str(KIS_COLLECTION_REPORT).endswith("kis_data_collection_v1.json"))
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
def test_build_ui_state_exposes_expected_columns(self):
import tempfile
import shutil
-21
View File
@@ -315,27 +315,6 @@ def main() -> int:
"immediate_actions": immediate,
"medium_term_actions": medium_term,
"criteria": criteria,
"workflow_disciplines": {
"required_preimplementation_order": [
"로드맵/현황 확인",
"WBS 작성",
"목표 설정",
"성공판단 데이터 정의",
"구현",
"사후 검증",
"증빙 기록",
],
"completion_gate_rule": (
"작업 시작 전 WBS와 성공판단 데이터가 명시되지 않으면 진행 금지"
),
"small_change_rule": (
"한 줄 추가, 두 줄 추가 같은 소규모 변경도 동일하게 적용"
),
"scope_change_rule": (
"작업 도중 범위가 바뀌면 먼저 WBS를 갱신한 뒤 계속 진행"
),
"evidence_rule": "검증 증빙 없이는 완료로 간주하지 않음",
},
"priority_roadmap": {
"P1_immediately": [
"GAS 새 JSON 내보내기 → schema_presence SLA 해소 + fundamentals 로드",
+1 -20
View File
@@ -40,7 +40,7 @@ def main() -> int:
gap_path = Path(args.gap) if Path(args.gap).is_absolute() else ROOT / args.gap
if not gap_path.exists():
print(f"FAIL: {gap_path} not found - run build-completion-gap-v1 first")
print(f"FAIL: {gap_path} not found run build-completion-gap-v1 first")
return 1
d = _load(gap_path)
@@ -52,8 +52,6 @@ def main() -> int:
for f in required:
if f not in d:
failures.append(f"missing field: {f}")
if "workflow_disciplines" not in d:
failures.append("missing field: workflow_disciplines")
if failures:
for f in failures:
@@ -102,23 +100,6 @@ def main() -> int:
if str(llm_field.get("current")) != "0" and llm_field.get("current") != 0:
failures.append("CRITICAL: llm_generated_decision_field_count != 0 — LLM 판단 개입")
workflow = d.get("workflow_disciplines") if isinstance(d.get("workflow_disciplines"), dict) else {}
required_order = workflow.get("required_preimplementation_order") if isinstance(workflow.get("required_preimplementation_order"), list) else []
expected_order = [
"로드맵/현황 확인",
"WBS 작성",
"목표 설정",
"성공판단 데이터 정의",
"구현",
"사후 검증",
"증빙 기록",
]
if required_order != expected_order:
failures.append(f"workflow_disciplines.required_preimplementation_order mismatch: {required_order}")
for key in ("completion_gate_rule", "small_change_rule", "scope_change_rule", "evidence_rule"):
if not str(workflow.get(key) or "").strip():
failures.append(f"workflow_disciplines missing or empty: {key}")
if failures:
for f in failures:
print("FAIL:", f)
@@ -45,8 +45,6 @@ def main() -> int:
["코드"],
["데이터 실체"],
["검증 증빙"],
["wbs 작성"],
["성공판단 데이터"],
],
"REPORT_GUIDE.md": [
["completion harness"],
@@ -68,8 +66,6 @@ def main() -> int:
["코드"],
["데이터 실체"],
["검증 증빙"],
["wbs"],
["성공판단"],
],
"prompts/review_prompt.md": [
["default completion harness"],
@@ -77,8 +73,6 @@ def main() -> int:
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
["wbs"],
["success criteria", "성공판단"],
],
"prompts/capture_parse_prompt.md": [
["기본 완료 조건"],
@@ -86,8 +80,46 @@ def main() -> int:
["코드"],
["데이터 실체"],
["검증 증빙"],
["wbs"],
["성공판단"],
],
"prompts/engine_audit_master_prompt_v2.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/engine_audit_master_prompt_v3.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/engine_audit_prompt.md": [
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/low_capability_report_renderer.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/report_renderer_prompt.md": [
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/weekly_operational_report_master_prompt_v1.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
}
-52
View File
@@ -1,52 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
def main() -> int:
errors: list[str] = []
spec_path = ROOT / "spec" / "02_data_contract.yaml"
server_path = ROOT / "src" / "quant_engine" / "snapshot_admin_server_v1.py"
collector_path = ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py"
spec_text = spec_path.read_text(encoding="utf-8")
server_text = server_path.read_text(encoding="utf-8")
collector_text = collector_path.read_text(encoding="utf-8")
required_markers = [
("spec/db-first", "DB 기반 수집 결과를 바탕으로 생성된 파생 보고서 증빙"),
("spec/db-first-xlsx", "xlsx는 HTS 잔고·거래내역 판독 또는 DB 반영 이전의 보조 감사 소스"),
("server/json-role", "derived_report_evidence"),
("server/json-evidence", "Derived JSON Evidence Preview"),
("server/collection-trend", "collectionTrendChart"),
("collector/db-canonical", "SQLite as the canonical persistence layer"),
]
for name, marker in required_markers:
haystack = {
"spec/db-first": spec_text,
"spec/db-first-xlsx": spec_text,
"server/json-role": server_text,
"server/json-evidence": server_text,
"server/collection-trend": server_text,
"collector/db-canonical": collector_text,
}[name]
if marker not in haystack:
errors.append(f"missing marker: {name}")
if errors:
print(json.dumps({"gate": "FAIL", "errors": errors}, ensure_ascii=False, indent=2))
return 1
print(json.dumps({"gate": "PASS", "errors": []}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -23,15 +23,6 @@ REQUIRED_PATTERNS = {
"vars.KIS_APP_KEY_TEST",
"vars.KIS_APP_SECRET_TEST",
],
"docs/GITEA_SECRETS_SETUP.md": [
"Temp/kis_tokens.db",
"TOKEN_REFRESH_SKEW_MINUTES=10",
"python tools/inspect_kis_token_cache_v1.py --json",
],
"docs/GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md": [
"Temp/kis_tokens.db",
"TOKEN_REFRESH_SKEW_MINUTES",
],
}
-107
View File
@@ -1,107 +0,0 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SOURCE_FILE = ROOT / "src" / "quant_engine" / "kis_api_client_v1.py"
TEST_FILE = ROOT / "tests" / "unit" / "test_kis_api_client_v1.py"
RUNBOOK_FILES = [
ROOT / "docs" / "GITEA_SECRETS_SETUP.md",
ROOT / "docs" / "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md",
]
SCAN_FILES = [
ROOT / "src" / "quant_engine" / "kis_api_client_v1.py",
ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py",
ROOT / "tools" / "run_kis_data_collection_v1.py",
ROOT / "tools" / "inspect_kis_token_cache_v1.py",
ROOT / "tools" / "validate_gitea_secrets_contract_v1.py",
ROOT / "tests" / "unit" / "test_kis_api_client_v1.py",
ROOT / "tests" / "unit" / "test_validate_kis_api_credentials_v1.py",
ROOT / "docs" / "GITEA_SECRETS_SETUP.md",
ROOT / "docs" / "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md",
ROOT / ".gitea" / "workflows" / "kis_data_collection.yml",
ROOT / ".gitea" / "workflows" / "qualitative_sell_strategy.yml",
ROOT / ".gitea" / "workflows" / "ci.yml",
]
FORBIDDEN_PATTERNS = [
r"print\(\s*.*appsecret",
r"print\(\s*.*appkey",
r"logger\.",
r"resp\.text",
r"response\.text",
r"json\.dumps\(\s*\{\s*.*appkey",
r"json\.dumps\(\s*\{\s*.*appsecret",
]
def _scan_text(path: Path, text: str) -> list[str]:
errors: list[str] = []
for pattern in FORBIDDEN_PATTERNS:
if re.search(pattern, text, re.IGNORECASE | re.MULTILINE):
errors.append(f"{path}:{pattern}")
return errors
def _scan_repository() -> list[str]:
errors: list[str] = []
for path in SCAN_FILES:
if not path.exists():
continue
text = path.read_text(encoding="utf-8")
errors.extend(_scan_text(path, text))
return errors
def main() -> int:
errors: list[str] = []
evidence: dict[str, dict[str, bool]] = {}
if not SOURCE_FILE.exists():
errors.append(f"missing:{SOURCE_FILE}")
else:
text = SOURCE_FILE.read_text(encoding="utf-8")
errors.extend(_scan_text(SOURCE_FILE, text))
evidence[str(SOURCE_FILE)] = {
"sanitized_token_refresh_error": "KIS token refresh failed; check credentials and API availability." in text,
"sanitized_readonly_error": "KIS read-only request failed for" in text,
"token_cache_db": "kis_tokens.db" in text,
}
if not TEST_FILE.exists():
errors.append(f"missing:{TEST_FILE}")
else:
text = TEST_FILE.read_text(encoding="utf-8")
evidence[str(TEST_FILE)] = {
"token_cache_tests": "test_issue_or_reuse_token_with_sqlite_db_cache" in text,
"token_override_tests": "test_issue_or_reuse_token_honors_token_db_override" in text,
}
for path in RUNBOOK_FILES:
if not path.exists():
errors.append(f"missing:{path}")
continue
text = path.read_text(encoding="utf-8")
evidence[str(path)] = {
"mentions_token_cache": "Temp/kis_tokens.db" in text,
"mentions_refresh_skew": "TOKEN_REFRESH_SKEW_MINUTES" in text,
}
errors.extend(_scan_repository())
result = {
"formula_id": "KIS_TOKEN_HYGIENE_V1",
"gate": "PASS" if not errors else "FAIL",
"errors": errors,
"evidence": evidence,
}
out = ROOT / "Temp" / "kis_token_hygiene_v1.json"
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if not errors else 1
if __name__ == "__main__":
raise SystemExit(main())
+2 -22
View File
@@ -120,8 +120,7 @@ def main() -> int:
try:
_wait_for_server(base_url)
home_html = _read_text(f"{base_url}/")
html = _read_text(f"{base_url}/workspace")
html = _read_text(f"{base_url}/")
state = _read_json(f"{base_url}/api/state")
tables_payload = _read_json(f"{base_url}/api/tables")
export_payload = _read_json(f"{base_url}/api/export")
@@ -140,10 +139,6 @@ def main() -> int:
"workspace": state.get("summary", {}),
}
packet_response = _post_json(f"{base_url}/api/approval_packet", {"packet": approval_packet})
if "Snapshot Admin Home" not in home_html:
errors.append("home_title_missing")
if "Open workspace" not in home_html or "Open collection" not in home_html:
errors.append("home_navigation_missing")
if "Snapshot Admin" not in html:
errors.append("html_title_missing")
if "contenteditable" not in html:
@@ -176,14 +171,7 @@ def main() -> int:
if "Read only" not in tables_html or "Save current table" not in tables_html:
errors.append("table_browser_source_labels_missing")
collection_html = _read_text(f"{base_url}/collection")
if (
"KIS Collection Dashboard" not in collection_html
or "Download CSV" not in collection_html
or "Ticker quick search" not in collection_html
or "Date quick search" not in collection_html
or "collectionLiveStatus" not in collection_html
or "live source: unknown" not in collection_html
):
if "KIS Collection Dashboard" not in collection_html or "Download CSV" not in collection_html or "Ticker quick search" not in collection_html or "Date quick search" not in collection_html:
errors.append("collection_dashboard_page_missing")
if int(state.get("summary", {}).get("settings_rows") or 0) <= 0:
errors.append("settings_rows_missing")
@@ -212,14 +200,6 @@ def main() -> int:
errors.append("collection_counts_missing")
if "latest_report" not in collection:
errors.append("collection_latest_report_missing")
latest_report = collection.get("latest_report", {})
if isinstance(latest_report, dict):
if latest_report.get("input_json") and "GatherTradingData.json" not in str(latest_report.get("input_json")):
errors.append("collection_latest_report_input_mismatch")
if not latest_report.get("sqlite_db"):
errors.append("collection_latest_report_sqlite_missing")
if collection.get("output_json_path") and "kis_data_collection_v1.json" not in str(collection.get("output_json_path")):
errors.append("collection_output_json_path_mismatch")
if "data" not in export_payload:
errors.append("export_missing_data")
if packet_response.get("gate") != "PASS":
+7 -20
View File
@@ -827,7 +827,6 @@ def main() -> int:
validate_json_schema_minimal(schema, sample, errors)
validate_formula_registry(errors)
validate_kis_token_hygiene(errors)
validate_output_rendering_contract(schema, errors)
validate_harness_contract_consistency(errors)
validate_spec_code_sync(errors)
@@ -903,25 +902,13 @@ def main() -> int:
if bundle.exists():
load_yaml(bundle, errors)
def validate_kis_token_hygiene(errors: list[str]) -> None:
import importlib.util
module_path = ROOT / "tools" / "validate_kis_token_hygiene_v1.py"
spec = importlib.util.spec_from_file_location("validate_kis_token_hygiene_v1", module_path)
if spec is None or spec.loader is None:
fail(errors, f"unable to load KIS token hygiene validator: {module_path}")
return
kis_hygiene = importlib.util.module_from_spec(spec)
spec.loader.exec_module(kis_hygiene)
try:
rc = kis_hygiene.main()
except Exception as exc: # pragma: no cover - validator integration gate
fail(errors, f"KIS token hygiene validator raised: {type(exc).__name__}: {exc}")
return
if rc != 0:
fail(errors, "KIS token hygiene validator failed")
if errors:
print("VALIDATION FAIL")
for err in errors:
print(f"- {err}")
return 1
print("VALIDATION OK")
return 0
if __name__ == "__main__":