Files
QuantEngineByItz/tools/validate_runtime_source_whitelist_v1.py
kjh2064 ee3e799de1 feat: 리밸런싱 엔진 V1 + GAS 버그 수정 (2026-06-13)
주요 변경:
- tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규
  * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합
  * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일)
- src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규
  * Logger.log / getSpreadsheet_() 로 run_all 연동 수정
- src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs
  * _mergePositionRecord_(): 소수주 중복 행 합산 신규
  * parseInt → parseFloat (qty, availQty)
- src/gas_adapter_parts/gdf_01_price_metrics.gs
  * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL
- spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63)
- spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-13 13:20:14 +09:00

177 lines
6.3 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import re
import yaml
from pathlib import Path
# Parent of the directory that contains this script; every relative path used
# below (manifest, scan roots, output file) is resolved against it.
ROOT = Path(__file__).resolve().parents[1]
# Temp/ files that runtime code is always allowed to read. Entries are kept in
# alphabetical order; main() extends this set with the manifest's active aliases.
BASE_WHITELIST = {
    "Temp/change_request_audit_v1.json",
    "Temp/engine_health_card_v1.json",
    "Temp/final_context_for_llm_v4.yaml",
    "Temp/final_context_for_llm_v5.yaml",
    "Temp/final_decision_packet_active.json",
    "Temp/late_chase_attribution_v2.json",
    "Temp/live_replay_separation_v2.json",
    "Temp/live_replay_separation_v3.json",
    "Temp/low_capability_llm_regression_v1.json",
    "Temp/number_provenance_ledger_v4.json",
    "Temp/operating_cadence_signal_v1.json",
    "Temp/operational_report.json",
    "Temp/operational_report.md",
    "Temp/release_dag_run_v1.json",
    "Temp/release_dag_run_v2.json",
    "Temp/release_dag_run_v3.json",
    "Temp/report_numeric_consistency_guard_v2.json",
    "Temp/runtime_source_whitelist_audit_v1.json",
    "Temp/shadow_ledger_v2.json",
    "Temp/value_preservation_scorer_v2.json",
}
# Files (paths relative to ROOT, forward slashes) that main() never scans.
EXEMPT_FILES = {
    "src/quant_engine/refactor_master_helpers.py",
    "tools/audit_repository_entropy_v1.py",
    "tools/audit_repository_entropy_v2.py",
    "tools/clean_temp_artifacts_v1.py",
    "tools/lint_repo_hygiene.py",
    "tools/sync_active_manifest_with_canonical_v1.py",
}
def main(argv: list[str] | None = None) -> int:
    """Audit source files for forbidden reads of Temp/ and archive/ paths.

    Scans the ``--scan`` roots (plus ``gas_*.gs`` files directly under ROOT),
    records every violation, writes the audit result to
    ``Temp/runtime_source_whitelist_audit_v1.json`` under ROOT and prints the
    same result as JSON on stdout.

    Args:
        argv: Command-line arguments to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is the original behaviour.

    Returns:
        0 when no violation was found, 1 otherwise.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--manifest", default="runtime/active_artifact_manifest.yaml")
    ap.add_argument("--scan", nargs="+", default=["src", "tools"])
    args = ap.parse_args(argv)

    whitelist = _load_whitelist(ROOT / args.manifest)
    # This script contains whitelisted Temp/ literals itself, so it is skipped.
    this_file = Path(__file__).resolve()

    violations = []
    deprecated_reads = 0
    archive_reads = 0
    for p in _collect_scan_paths(args.scan):
        rel_path = _scannable_rel_path(p, this_file)
        if rel_path is None:
            continue
        try:
            content = p.read_text(encoding="utf-8")
        except (OSError, UnicodeError):
            # Unreadable or non-UTF-8 files are skipped on purpose.
            continue
        found, archive_hits, deprecated_hits = _scan_content(rel_path, content, whitelist)
        violations.extend(found)
        archive_reads += archive_hits
        deprecated_reads += deprecated_hits

    result = {
        "formula_id": "RUNTIME_SOURCE_WHITELIST_AUDIT_V1",
        "deprecated_runtime_read_count": deprecated_reads,
        "archive_runtime_read_count": archive_reads,
        "active_alias_resolution_pct": 100.0 if deprecated_reads == 0 else 0.0,
        "violation_count": len(violations),
        # The report is capped at 100 entries; violation_count keeps the total.
        "violations": violations[:100],
        "gate": "PASS" if not violations else "FAIL",
    }
    out_path = ROOT / "Temp" / "runtime_source_whitelist_audit_v1.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps(result, ensure_ascii=True, indent=2))
    return 0 if not violations else 1


# File suffixes that are scanned at all.
_SCANNED_SUFFIXES = (".py", ".gs", ".js")
# Any path containing one of these directory names is ignored.
_SKIPPED_DIR_NAMES = ("__pycache__", ".git", ".claude")
# Scripts under tools/ whose file name starts with one of these are not scanned.
_SKIPPED_TOOL_PREFIXES = (
    "build_",
    "validate_",
    "run_",
    "render_",
    "emit_",
    "clean_",
    "lint_",
    "audit_",
)
# Substrings that mark a line as a write rather than a read.
_WRITE_HINTS = ("write", "save", "dump", "output")
# A quoted string literal that starts with Temp/ or Temp\ (case-insensitive).
_TEMP_LITERAL_RE = re.compile(r"['\"](temp[/\\][^'\"]+)['\"]", re.IGNORECASE)
# "glob" and the word "temp" on the same line, in either order.
_GLOB_TEMP_RE = re.compile(r"glob.*\btemp\b|\btemp\b.*glob", re.IGNORECASE)
# open(...) whose mode argument starts with w, a or x, e.g. open(p, "w").
_OPEN_FOR_WRITE_RE = re.compile(
    r"""\bopen\s*\((?:[^)]*,)?\s*(?:mode\s*=\s*)?['"][wax][bt+]*['"]"""
)


def _warn(message: str) -> None:
    """Print a warning on stderr so stdout carries only the JSON result."""
    import sys

    print(f"Warning: {message}", file=sys.stderr)


def _load_whitelist(manifest_path: Path) -> set[str]:
    """Return the lower-cased whitelist: BASE_WHITELIST plus manifest aliases.

    A missing manifest is fine. An unreadable or malformed one only produces
    a warning, and the base whitelist is still returned.
    """
    whitelist = set(BASE_WHITELIST)
    if manifest_path.exists():
        try:
            manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8"))
        except (OSError, UnicodeError, yaml.YAMLError) as e:
            _warn(f"Failed to load manifest: {e}")
        else:
            if not isinstance(manifest, dict):
                _warn("Failed to load manifest: top-level value is not a mapping")
            else:
                aliases = manifest.get("active_aliases", {})
                if not isinstance(aliases, dict):
                    _warn("Failed to load manifest: 'active_aliases' is not a mapping")
                else:
                    for val in aliases.values():
                        # One malformed alias must not discard the others.
                        if isinstance(val, str):
                            whitelist.add(val.replace("\\", "/"))
    # Matching against scanned literals is case-insensitive.
    return {w.lower() for w in whitelist}


def _collect_scan_paths(scan_args: list[str]) -> list[Path]:
    """Expand the --scan arguments and ROOT's gas_*.gs files into candidate paths."""
    candidates = []
    for s in scan_args:
        path = ROOT / s
        if path.is_file():
            candidates.append(path)
        elif path.is_dir():
            candidates.extend(path.rglob("*"))
    candidates.extend(ROOT.glob("gas_*.gs"))
    # Overlapping scan roots would otherwise report every violation twice.
    return list(dict.fromkeys(candidates))


def _scannable_rel_path(p: Path, this_file: Path) -> str | None:
    """Return p's forward-slash path relative to ROOT, or None if p is skipped."""
    if p.suffix not in _SCANNED_SUFFIXES:
        return None
    if any(name in p.parts for name in _SKIPPED_DIR_NAMES):
        return None
    if not p.is_file() or p.resolve() == this_file:
        return None
    try:
        rel = p.relative_to(ROOT)
    except ValueError:
        # An absolute --scan path outside ROOT: report it as given.
        rel = p
    rel_path = rel.as_posix()
    if rel_path in EXEMPT_FILES:
        return None
    in_tools = bool(rel.parts) and rel.parts[0] == "tools"
    if in_tools and p.name.startswith(_SKIPPED_TOOL_PREFIXES):
        return None
    return rel_path


def _looks_like_write(line_lower: str) -> bool:
    """Heuristically decide whether a lower-cased source line writes a file."""
    if any(hint in line_lower for hint in _WRITE_HINTS):
        return True
    return _OPEN_FOR_WRITE_RE.search(line_lower) is not None


def _scan_content(rel_path: str, content: str, whitelist: set[str]) -> tuple[list[dict], int, int]:
    """Scan one file's text.

    Returns:
        ``(violations, archive_hits, deprecated_hits)``. Both counters are
        incremented only when the matching violation is recorded.
    """
    found = []
    archive_hits = 0
    deprecated_hits = 0

    # File-level check, reported once per file with line 0.
    if _GLOB_TEMP_RE.search(content):
        found.append({
            "file": rel_path,
            "line": 0,
            "reason": "Direct globbing of Temp/ directory is forbidden.",
        })

    for lineno, line in enumerate(content.splitlines(), start=1):
        line_lower = line.lower()

        mentions_archive = "archive/" in line_lower or "archive\\" in line_lower
        if mentions_archive and "read_for_audit_only" not in line:
            found.append({
                "file": rel_path,
                "line": lineno,
                "reason": "Access to archive/ directory is only allowed if annotated with '# read_for_audit_only'.",
            })
            archive_hits += 1

        literals = _TEMP_LITERAL_RE.findall(line)
        if not literals:
            continue
        # The write/read decision is per line, so compute it once.
        is_write = _looks_like_write(line_lower)
        for m in literals:
            if m.replace("\\", "/").lower() in whitelist or is_write:
                continue
            found.append({
                "file": rel_path,
                "line": lineno,
                "reason": f"Read access to non-whitelisted Temp file: {m}",
            })
            deprecated_hits += 1

    return found, archive_hits, deprecated_hits
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())