test(validation): 토큰 위생 및 플랫폼 통합 검증 체계 고도화
Snapshot Admin Web Validation / validate-snapshot-admin-smoke (push) Has been cancelled
Snapshot Admin Web Validation / validate-snapshot-admin-full (push) Has been cancelled
Quant Engine CI/CD Pipeline / validate-core (pull_request) Has been cancelled
Quant Engine CI/CD Pipeline / validate-ui-and-storage (pull_request) Has been cancelled
WBS-9.3 - NULL Policy CI Gate / NULL Policy Validation (pull_request) Has been cancelled

This commit is contained in:
2026-06-24 18:06:05 +09:00
parent 9abb8d3bc3
commit 27730704ae
24 changed files with 850 additions and 54 deletions
+21
View File
@@ -315,6 +315,27 @@ def main() -> int:
"immediate_actions": immediate,
"medium_term_actions": medium_term,
"criteria": criteria,
"workflow_disciplines": {
"required_preimplementation_order": [
"로드맵/현황 확인",
"WBS 작성",
"목표 설정",
"성공판단 데이터 정의",
"구현",
"사후 검증",
"증빙 기록",
],
"completion_gate_rule": (
"작업 시작 전 WBS와 성공판단 데이터가 명시되지 않으면 진행 금지"
),
"small_change_rule": (
"한 줄 추가, 두 줄 추가 같은 소규모 변경도 동일하게 적용"
),
"scope_change_rule": (
"작업 도중 범위가 바뀌면 먼저 WBS를 갱신한 뒤 계속 진행"
),
"evidence_rule": "검증 증빙 없이는 완료로 간주하지 않음",
},
"priority_roadmap": {
"P1_immediately": [
"GAS 새 JSON 내보내기 → schema_presence SLA 해소 + fundamentals 로드",
+20 -1
View File
@@ -40,7 +40,7 @@ def main() -> int:
gap_path = Path(args.gap) if Path(args.gap).is_absolute() else ROOT / args.gap
if not gap_path.exists():
print(f"FAIL: {gap_path} not found run build-completion-gap-v1 first")
print(f"FAIL: {gap_path} not found - run build-completion-gap-v1 first")
return 1
d = _load(gap_path)
@@ -52,6 +52,8 @@ def main() -> int:
for f in required:
if f not in d:
failures.append(f"missing field: {f}")
if "workflow_disciplines" not in d:
failures.append("missing field: workflow_disciplines")
if failures:
for f in failures:
@@ -100,6 +102,23 @@ def main() -> int:
if str(llm_field.get("current")) != "0" and llm_field.get("current") != 0:
failures.append("CRITICAL: llm_generated_decision_field_count != 0 — LLM 판단 개입")
workflow = d.get("workflow_disciplines") if isinstance(d.get("workflow_disciplines"), dict) else {}
required_order = workflow.get("required_preimplementation_order") if isinstance(workflow.get("required_preimplementation_order"), list) else []
expected_order = [
"로드맵/현황 확인",
"WBS 작성",
"목표 설정",
"성공판단 데이터 정의",
"구현",
"사후 검증",
"증빙 기록",
]
if required_order != expected_order:
failures.append(f"workflow_disciplines.required_preimplementation_order mismatch: {required_order}")
for key in ("completion_gate_rule", "small_change_rule", "scope_change_rule", "evidence_rule"):
if not str(workflow.get(key) or "").strip():
failures.append(f"workflow_disciplines missing or empty: {key}")
if failures:
for f in failures:
print("FAIL:", f)
@@ -45,6 +45,8 @@ def main() -> int:
["코드"],
["데이터 실체"],
["검증 증빙"],
["wbs 작성"],
["성공판단 데이터"],
],
"REPORT_GUIDE.md": [
["completion harness"],
@@ -66,6 +68,8 @@ def main() -> int:
["코드"],
["데이터 실체"],
["검증 증빙"],
["wbs"],
["성공판단"],
],
"prompts/review_prompt.md": [
["default completion harness"],
@@ -73,6 +77,8 @@ def main() -> int:
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
["wbs"],
["success criteria", "성공판단"],
],
"prompts/capture_parse_prompt.md": [
["기본 완료 조건"],
@@ -80,46 +86,8 @@ def main() -> int:
["코드"],
["데이터 실체"],
["검증 증빙"],
],
"prompts/engine_audit_master_prompt_v2.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/engine_audit_master_prompt_v3.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/engine_audit_prompt.md": [
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/low_capability_report_renderer.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/report_renderer_prompt.md": [
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
],
"prompts/weekly_operational_report_master_prompt_v1.md": [
["default completion harness"],
["yaml"],
["code"],
["data artifact", "data/artifact"],
["validation evidence", "검증 증빙"],
["wbs"],
["성공판단"],
],
}
+52
View File
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
def main() -> int:
errors: list[str] = []
spec_path = ROOT / "spec" / "02_data_contract.yaml"
server_path = ROOT / "src" / "quant_engine" / "snapshot_admin_server_v1.py"
collector_path = ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py"
spec_text = spec_path.read_text(encoding="utf-8")
server_text = server_path.read_text(encoding="utf-8")
collector_text = collector_path.read_text(encoding="utf-8")
required_markers = [
("spec/db-first", "DB 기반 수집 결과를 바탕으로 생성된 파생 보고서 증빙"),
("spec/db-first-xlsx", "xlsx는 HTS 잔고·거래내역 판독 또는 DB 반영 이전의 보조 감사 소스"),
("server/json-role", "derived_report_evidence"),
("server/json-evidence", "Derived JSON Evidence Preview"),
("server/collection-trend", "collectionTrendChart"),
("collector/db-canonical", "SQLite as the canonical persistence layer"),
]
for name, marker in required_markers:
haystack = {
"spec/db-first": spec_text,
"spec/db-first-xlsx": spec_text,
"server/json-role": server_text,
"server/json-evidence": server_text,
"server/collection-trend": server_text,
"collector/db-canonical": collector_text,
}[name]
if marker not in haystack:
errors.append(f"missing marker: {name}")
if errors:
print(json.dumps({"gate": "FAIL", "errors": errors}, ensure_ascii=False, indent=2))
return 1
print(json.dumps({"gate": "PASS", "errors": []}, ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())
@@ -23,6 +23,15 @@ REQUIRED_PATTERNS = {
"vars.KIS_APP_KEY_TEST",
"vars.KIS_APP_SECRET_TEST",
],
"docs/GITEA_SECRETS_SETUP.md": [
"Temp/kis_tokens.db",
"TOKEN_REFRESH_SKEW_MINUTES=10",
"python tools/inspect_kis_token_cache_v1.py --json",
],
"docs/GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md": [
"Temp/kis_tokens.db",
"TOKEN_REFRESH_SKEW_MINUTES",
],
}
+107
View File
@@ -0,0 +1,107 @@
#!/usr/bin/env python3
from __future__ import annotations
import json
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SOURCE_FILE = ROOT / "src" / "quant_engine" / "kis_api_client_v1.py"
TEST_FILE = ROOT / "tests" / "unit" / "test_kis_api_client_v1.py"
RUNBOOK_FILES = [
ROOT / "docs" / "GITEA_SECRETS_SETUP.md",
ROOT / "docs" / "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md",
]
SCAN_FILES = [
ROOT / "src" / "quant_engine" / "kis_api_client_v1.py",
ROOT / "src" / "quant_engine" / "kis_data_collection_v1.py",
ROOT / "tools" / "run_kis_data_collection_v1.py",
ROOT / "tools" / "inspect_kis_token_cache_v1.py",
ROOT / "tools" / "validate_gitea_secrets_contract_v1.py",
ROOT / "tests" / "unit" / "test_kis_api_client_v1.py",
ROOT / "tests" / "unit" / "test_validate_kis_api_credentials_v1.py",
ROOT / "docs" / "GITEA_SECRETS_SETUP.md",
ROOT / "docs" / "GATHERTRADINGDATA_XLSX_OPERATING_RUNBOOK.md",
ROOT / ".gitea" / "workflows" / "kis_data_collection.yml",
ROOT / ".gitea" / "workflows" / "qualitative_sell_strategy.yml",
ROOT / ".gitea" / "workflows" / "ci.yml",
]
FORBIDDEN_PATTERNS = [
r"print\(\s*.*appsecret",
r"print\(\s*.*appkey",
r"logger\.",
r"resp\.text",
r"response\.text",
r"json\.dumps\(\s*\{\s*.*appkey",
r"json\.dumps\(\s*\{\s*.*appsecret",
]
def _scan_text(path: Path, text: str) -> list[str]:
errors: list[str] = []
for pattern in FORBIDDEN_PATTERNS:
if re.search(pattern, text, re.IGNORECASE | re.MULTILINE):
errors.append(f"{path}:{pattern}")
return errors
def _scan_repository() -> list[str]:
errors: list[str] = []
for path in SCAN_FILES:
if not path.exists():
continue
text = path.read_text(encoding="utf-8")
errors.extend(_scan_text(path, text))
return errors
def main() -> int:
errors: list[str] = []
evidence: dict[str, dict[str, bool]] = {}
if not SOURCE_FILE.exists():
errors.append(f"missing:{SOURCE_FILE}")
else:
text = SOURCE_FILE.read_text(encoding="utf-8")
errors.extend(_scan_text(SOURCE_FILE, text))
evidence[str(SOURCE_FILE)] = {
"sanitized_token_refresh_error": "KIS token refresh failed; check credentials and API availability." in text,
"sanitized_readonly_error": "KIS read-only request failed for" in text,
"token_cache_db": "kis_tokens.db" in text,
}
if not TEST_FILE.exists():
errors.append(f"missing:{TEST_FILE}")
else:
text = TEST_FILE.read_text(encoding="utf-8")
evidence[str(TEST_FILE)] = {
"token_cache_tests": "test_issue_or_reuse_token_with_sqlite_db_cache" in text,
"token_override_tests": "test_issue_or_reuse_token_honors_token_db_override" in text,
}
for path in RUNBOOK_FILES:
if not path.exists():
errors.append(f"missing:{path}")
continue
text = path.read_text(encoding="utf-8")
evidence[str(path)] = {
"mentions_token_cache": "Temp/kis_tokens.db" in text,
"mentions_refresh_skew": "TOKEN_REFRESH_SKEW_MINUTES" in text,
}
errors.extend(_scan_repository())
result = {
"formula_id": "KIS_TOKEN_HYGIENE_V1",
"gate": "PASS" if not errors else "FAIL",
"errors": errors,
"evidence": evidence,
}
out = ROOT / "Temp" / "kis_token_hygiene_v1.json"
out.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
print(json.dumps(result, ensure_ascii=False, indent=2))
return 0 if not errors else 1
if __name__ == "__main__":
raise SystemExit(main())
+22 -2
View File
@@ -120,7 +120,8 @@ def main() -> int:
try:
_wait_for_server(base_url)
html = _read_text(f"{base_url}/")
home_html = _read_text(f"{base_url}/")
html = _read_text(f"{base_url}/workspace")
state = _read_json(f"{base_url}/api/state")
tables_payload = _read_json(f"{base_url}/api/tables")
export_payload = _read_json(f"{base_url}/api/export")
@@ -139,6 +140,10 @@ def main() -> int:
"workspace": state.get("summary", {}),
}
packet_response = _post_json(f"{base_url}/api/approval_packet", {"packet": approval_packet})
if "Snapshot Admin Home" not in home_html:
errors.append("home_title_missing")
if "Open workspace" not in home_html or "Open collection" not in home_html:
errors.append("home_navigation_missing")
if "Snapshot Admin" not in html:
errors.append("html_title_missing")
if "contenteditable" not in html:
@@ -171,7 +176,14 @@ def main() -> int:
if "Read only" not in tables_html or "Save current table" not in tables_html:
errors.append("table_browser_source_labels_missing")
collection_html = _read_text(f"{base_url}/collection")
if "KIS Collection Dashboard" not in collection_html or "Download CSV" not in collection_html or "Ticker quick search" not in collection_html or "Date quick search" not in collection_html:
if (
"KIS Collection Dashboard" not in collection_html
or "Download CSV" not in collection_html
or "Ticker quick search" not in collection_html
or "Date quick search" not in collection_html
or "collectionLiveStatus" not in collection_html
or "live source: unknown" not in collection_html
):
errors.append("collection_dashboard_page_missing")
if int(state.get("summary", {}).get("settings_rows") or 0) <= 0:
errors.append("settings_rows_missing")
@@ -200,6 +212,14 @@ def main() -> int:
errors.append("collection_counts_missing")
if "latest_report" not in collection:
errors.append("collection_latest_report_missing")
latest_report = collection.get("latest_report", {})
if isinstance(latest_report, dict):
if latest_report.get("input_json") and "GatherTradingData.json" not in str(latest_report.get("input_json")):
errors.append("collection_latest_report_input_mismatch")
if not latest_report.get("sqlite_db"):
errors.append("collection_latest_report_sqlite_missing")
if collection.get("output_json_path") and "kis_data_collection_v1.json" not in str(collection.get("output_json_path")):
errors.append("collection_output_json_path_mismatch")
if "data" not in export_payload:
errors.append("export_missing_data")
if packet_response.get("gate") != "PASS":
+20 -7
View File
@@ -827,6 +827,7 @@ def main() -> int:
validate_json_schema_minimal(schema, sample, errors)
validate_formula_registry(errors)
validate_kis_token_hygiene(errors)
validate_output_rendering_contract(schema, errors)
validate_harness_contract_consistency(errors)
validate_spec_code_sync(errors)
@@ -902,13 +903,25 @@ def main() -> int:
if bundle.exists():
load_yaml(bundle, errors)
if errors:
print("VALIDATION FAIL")
for err in errors:
print(f"- {err}")
return 1
print("VALIDATION OK")
return 0
def validate_kis_token_hygiene(errors: list[str]) -> None:
import importlib.util
module_path = ROOT / "tools" / "validate_kis_token_hygiene_v1.py"
spec = importlib.util.spec_from_file_location("validate_kis_token_hygiene_v1", module_path)
if spec is None or spec.loader is None:
fail(errors, f"unable to load KIS token hygiene validator: {module_path}")
return
kis_hygiene = importlib.util.module_from_spec(spec)
spec.loader.exec_module(kis_hygiene)
try:
rc = kis_hygiene.main()
except Exception as exc: # pragma: no cover - validator integration gate
fail(errors, f"KIS token hygiene validator raised: {type(exc).__name__}: {exc}")
return
if rc != 0:
fail(errors, "KIS token hygiene validator failed")
if __name__ == "__main__":