ee3e799de1
주요 변경: - tools/build_rebalance_engine_v1.py: REBALANCE_ENGINE_V1 신규 * account_snapshot 직접 합산(_build_snap_position_map) → 소수주 분리 행 병합 * 레짐 소스 macro.REGIME_PRELIM 최우선 (GAS 와 동일) - src/gas_adapter_parts/gdf_06_rebalance.gs: runRebalanceSheet_() 신규 * Logger.log / getSpreadsheet_() 로 run_all 연동 수정 - src/gas_adapter_parts/gdc_01_fetch_fundamentals.gs * _mergePositionRecord_(): 소수주 중복 행 합산 신규 * parseInt → parseFloat (qty, availQty) - src/gas_adapter_parts/gdf_01_price_metrics.gs * 미보유 종목 SELL_READY → WATCH_EXIT_SIGNAL - spec/41_release_dag.yaml: build_rebalance_sheet 노드 추가 (step_count 63) - spec/51_formula_lifecycle_registry.yaml: REBALANCE_ENGINE_V1 등록 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
177 lines
6.3 KiB
Python
177 lines
6.3 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path

import yaml
|
|
|
|
# Base directory: the parent of the directory that contains this file.
# The manifest path, the --scan targets and the Temp/ output file are all
# resolved relative to it.
ROOT = Path(__file__).resolve().parents[1]
|
|
|
|
# Base whitelist of Temp/ files that scanned code may read at runtime.
# Entries use forward slashes; main() lower-cases them before comparing.
BASE_WHITELIST = {
    "Temp/final_decision_packet_active.json",
    "Temp/operational_report.json",
    "Temp/operational_report.md",
    "Temp/number_provenance_ledger_v4.json",
    "Temp/final_context_for_llm_v5.yaml",
    "Temp/final_context_for_llm_v4.yaml",
    "Temp/live_replay_separation_v2.json",
    "Temp/live_replay_separation_v3.json",
    "Temp/shadow_ledger_v2.json",
    "Temp/late_chase_attribution_v2.json",
    "Temp/value_preservation_scorer_v2.json",
    "Temp/engine_health_card_v1.json",
    "Temp/operating_cadence_signal_v1.json",
    "Temp/change_request_audit_v1.json",
    "Temp/low_capability_llm_regression_v1.json",
    "Temp/report_numeric_consistency_guard_v2.json",
    "Temp/release_dag_run_v3.json",
    "Temp/release_dag_run_v2.json",
    "Temp/release_dag_run_v1.json",
    "Temp/runtime_source_whitelist_audit_v1.json",
}
|
|
|
|
# Files (paths relative to ROOT, forward slashes) that the audit never inspects.
EXEMPT_FILES = {
    "tools/clean_temp_artifacts_v1.py",
    "tools/lint_repo_hygiene.py",
    "src/quant_engine/refactor_master_helpers.py",
    "tools/audit_repository_entropy_v2.py",
    "tools/audit_repository_entropy_v1.py",
    "tools/sync_active_manifest_with_canonical_v1.py",
}
|
|
|
|
# Files located under a directory with one of these names are skipped.
_SKIPPED_DIR_NAMES = frozenset({"__pycache__", ".git", ".claude"})

# Only files with these suffixes are audited.
_SCANNED_SUFFIXES = frozenset({".py", ".gs", ".js"})

# Scripts under tools/ whose file name starts with one of these are skipped.
_SKIPPED_TOOL_PREFIXES = (
    "build_",
    "validate_",
    "run_",
    "render_",
    "emit_",
    "clean_",
    "lint_",
    "audit_",
)

# Substrings that mark a (lower-cased) line as writing a Temp file.
_WRITE_MARKERS = ("write", "save", "dump", "output")

# "glob" and the word "temp" on the same line, in either order.
_TEMP_GLOB_RE = re.compile(r"glob.*\btemp\b|\btemp\b.*glob", re.IGNORECASE)

# A quoted string literal that starts with Temp/ or Temp\.
_TEMP_LITERAL_RE = re.compile(r"['\"](temp[/\\][^'\"]+)['\"]", re.IGNORECASE)

# open(<something>, "w"/"a"/"x"...) or open(<something>, mode="w"...).
_OPEN_FOR_WRITE_RE = re.compile(
    r"open\s*\([^)]*,\s*(?:mode\s*=\s*)?['\"][wax][bt+]*['\"]"
)


def _load_whitelist(manifest_path: Path) -> set[str]:
    """Return the lower-cased whitelist: BASE_WHITELIST plus manifest aliases.

    The manifest is optional. Load problems and unexpected shapes produce a
    warning on stderr (stdout carries the JSON report) and the audit continues
    with whatever entries were collected.
    """
    allowed = set(BASE_WHITELIST)

    if manifest_path.exists():
        try:
            manifest = yaml.safe_load(manifest_path.read_text(encoding="utf-8"))
        except (OSError, ValueError, yaml.YAMLError) as exc:
            print(f"Warning: Failed to load manifest: {exc}", file=sys.stderr)
            manifest = None

        # An empty manifest parses to None and simply contributes nothing.
        if manifest is not None:
            aliases = manifest.get("active_aliases", {}) if isinstance(manifest, dict) else None
            if isinstance(aliases, dict):
                # Skip non-string values instead of aborting the whole load.
                allowed.update(
                    alias.replace("\\", "/")
                    for alias in aliases.values()
                    if isinstance(alias, str)
                )
            else:
                print(
                    "Warning: Failed to load manifest: 'active_aliases' mapping not found",
                    file=sys.stderr,
                )

    return {entry.lower() for entry in allowed}


def _collect_candidates(scan_args: list[str]) -> list[Path]:
    """Expand the --scan entries (resolved against ROOT) plus ROOT/gas_*.gs."""
    candidates: list[Path] = []
    for entry in scan_args:
        target = ROOT / entry
        if target.is_file():
            candidates.append(target)
        elif target.is_dir():
            candidates.extend(target.rglob("*"))
    candidates.extend(ROOT.glob("gas_*.gs"))
    return candidates


def _relative_name(path: Path) -> str:
    """Return *path* relative to ROOT with forward slashes.

    Falls back to the path's own POSIX form when it does not live under ROOT
    (e.g. an absolute --scan argument) instead of raising ValueError.
    """
    try:
        return str(path.relative_to(ROOT)).replace("\\", "/")
    except ValueError:
        return path.as_posix()


def _is_write_line(line_lower: str) -> bool:
    """Return True when the lower-cased line looks like a write, not a read."""
    if any(marker in line_lower for marker in _WRITE_MARKERS):
        return True
    return _OPEN_FOR_WRITE_RE.search(line_lower) is not None


def _scan_content(rel_path: str, content: str, whitelist: set[str]) -> tuple[list[dict], int, int]:
    """Audit one file's text.

    Returns ``(violations, archive_read_count, deprecated_read_count)``.
    """
    found: list[dict] = []
    archive_hits = 0
    deprecated_hits = 0

    # File-level check; reported with line 0 because it is not tied to a line.
    if _TEMP_GLOB_RE.search(content):
        found.append({
            "file": rel_path,
            "line": 0,
            "reason": "Direct globbing of Temp/ directory is forbidden.",
        })

    for lineno, line in enumerate(content.splitlines(), start=1):
        line_lower = line.lower()

        # Archive paths are tolerated only on lines carrying the audit marker.
        # NOTE(review): the source was flattened; the counters are assumed to be
        # incremented together with their violation entries -- confirm.
        mentions_archive = "archive/" in line_lower or "archive\\" in line_lower
        if mentions_archive and "read_for_audit_only" not in line:
            found.append({
                "file": rel_path,
                "line": lineno,
                "reason": "Access to archive/ directory is only allowed if annotated with '# read_for_audit_only'.",
            })
            archive_hits += 1

        # Quoted Temp/ literals must be whitelisted unless the line is a write.
        for literal in _TEMP_LITERAL_RE.findall(line):
            if literal.replace("\\", "/").lower() in whitelist:
                continue
            if _is_write_line(line_lower):
                continue
            found.append({
                "file": rel_path,
                "line": lineno,
                "reason": f"Read access to non-whitelisted Temp file: {literal}",
            })
            deprecated_hits += 1

    return found, archive_hits, deprecated_hits


def main() -> int:
    """Audit source files for forbidden Temp/ and archive/ access.

    Command line:
        --manifest  YAML file (relative to ROOT) whose ``active_aliases`` values
                    are added to the whitelist.
        --scan      files or directories (relative to ROOT) to inspect.

    The report is written to ROOT/Temp/runtime_source_whitelist_audit_v1.json
    and printed to stdout.

    Returns:
        0 when no violations were found, 1 otherwise.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--manifest", default="runtime/active_artifact_manifest.yaml")
    ap.add_argument("--scan", nargs="+", default=["src", "tools"])
    args = ap.parse_args()

    whitelist = _load_whitelist(ROOT / args.manifest)
    this_file = Path(__file__).resolve()

    violations: list[dict] = []
    deprecated_reads = 0
    archive_reads = 0

    for path in _collect_candidates(args.scan):
        if not path.is_file():
            continue
        # The audit script contains the very patterns it searches for.
        if path.resolve() == this_file:
            continue
        if not _SKIPPED_DIR_NAMES.isdisjoint(path.parts):
            continue
        if path.suffix not in _SCANNED_SUFFIXES:
            continue

        rel_path = _relative_name(path)
        if rel_path in EXEMPT_FILES:
            continue

        # Skip build/validate/run/render/emit/clean/lint/audit scripts in tools/.
        rel_parts = Path(rel_path).parts
        if rel_parts and rel_parts[0] == "tools" and path.name.startswith(_SKIPPED_TOOL_PREFIXES):
            continue

        try:
            content = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            # Unreadable or non-UTF-8 files are skipped.
            continue

        found, archive_hits, deprecated_hits = _scan_content(rel_path, content, whitelist)
        violations.extend(found)
        archive_reads += archive_hits
        deprecated_reads += deprecated_hits

    result = {
        "formula_id": "RUNTIME_SOURCE_WHITELIST_AUDIT_V1",
        "deprecated_runtime_read_count": deprecated_reads,
        "archive_runtime_read_count": archive_reads,
        "active_alias_resolution_pct": 100.0 if deprecated_reads == 0 else 0.0,
        "violation_count": len(violations),
        # The count above is complete; the listing is capped at 100 entries.
        "violations": violations[:100],
        "gate": "PASS" if not violations else "FAIL",
    }

    out_path = ROOT / "Temp" / "runtime_source_whitelist_audit_v1.json"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding="utf-8")

    print(json.dumps(result, ensure_ascii=True, indent=2))

    return 0 if not violations else 1
|
|
|
|
# Script entry point: the process exit status is main()'s return value.
if __name__ == "__main__":
    sys.exit(main())
|