from __future__ import annotations import argparse import json import zipfile from datetime import datetime, timezone from pathlib import Path from tools.orchestration_harness_v1 import run_plan from src.quant_engine.pipeline_runtime_anomaly_lib_v1 import finalize_runtime_profile, runtime_profile_from_steps ROOT = Path(__file__).resolve().parents[2] DEFAULT_OUTPUT = ROOT.parent / f"{ROOT.name}.zip" DEFAULT_PROFILE = ROOT / "Temp" / "pipeline_runtime_profile_v1.json" DEFAULT_GATE_JSON = ROOT / "Temp" / "engine_harness_gate_result.json" DEFAULT_HARDENING_V2 = ROOT / "Temp" / "strategy_hardening_harness_v2.json" DEFAULT_DQ_LOCK_V2 = ROOT / "Temp" / "data_integrity_100_lock_v2.json" RUNTIME_PROFILE = ROOT / "Temp" / "pipeline_runtime_profile_v1.json" UPLOAD_KEEP_FILES = { "AGENTS.md", "README.md", "package.json", "RetirementAssetPortfolio.yaml", "RetirementAssetPortfolioReportTemplate.yaml", "GatherTradingData.json", "gas_data_feed.gs", "gas_data_collect.gs", "gas_lib.gs", "gas_harness_rows.gs", "gas_report.gs", "gas_event_calendar.gs", "gas_apex_alpha_watch.gs", "gas_apex_runtime_core.gs" } UPLOAD_KEEP_DIRS = { "artifacts", "docs", "dist", "examples", "governance", "prompts", "runtime", "schemas", "spec", "src", "suggest", "tests", "tools", "Temp", } ALWAYS_EXCLUDE_DIRS = { ".git", ".claude", "node_modules", "__pycache__", ".pytest_cache", "tmp", } ALWAYS_EXCLUDE_SUFFIXES = { ".pyc", ".pyo", } LEGACY_SEED_FILES = { "sector_targets.json", } TEMP_EXCLUDED_FILES = { "build_bundle_cache_v1.json", } TEMP_KEEP_FILES = { "final_decision_packet_active.json", "final_decision_packet_v4.json", "operational_report.json", "operational_report.md", "release_dag_run_v1.json", "pipeline_runtime_profile_v1.json", "engine_harness_gate_result.json", "strategy_hardening_harness_v2.json", "data_integrity_100_lock_v2.json", "number_provenance_ledger_v4.json", "final_context_for_llm_v4.yaml", "live_replay_separation_v2.json", "formula_runtime_registry_v1.json", "canonical_artifact_resolver_v1.json", "final_execution_decision_v2.json", "prediction_accuracy_harness_v2.json", "single_truth_ledger_v2.json", "smart_cash_recovery_v7.json", "smart_cash_recovery_v9.json", # Data Analysis & Verification Reports "horizon_rebalance_plan_v1.json", "factor_lifecycle_completeness_v1.json", "factor_shadow_eligibility_v1.json", "algorithm_guidance_proof_v1.json", "strategy_routing_audit_v1.json", } UPLOAD_KEEP_DIRS_UPLOAD = { "artifacts", "docs", "governance", "runtime", "spec", "Temp", } def _load_json(path: Path) -> dict: if not path.exists(): return {} try: obj = json.loads(path.read_text(encoding="utf-8")) except Exception: return {} return obj if isinstance(obj, dict) else {} def _is_fresh(path: Path, max_minutes: int) -> bool: if not path.exists(): return False age_seconds = (datetime.now(timezone.utc).timestamp() - path.stat().st_mtime) return age_seconds <= max_minutes * 60 def _quick_gate_ready(max_minutes: int = 60) -> tuple[bool, list[str]]: missing: list[str] = [] required = [DEFAULT_GATE_JSON, DEFAULT_HARDENING_V2, DEFAULT_DQ_LOCK_V2] for p in required: if not p.exists(): missing.append(f"MISSING:{p}") elif not _is_fresh(p, max_minutes): missing.append(f"STALE:{p}") gate = _load_json(DEFAULT_GATE_JSON) if gate.get("status") != "OK": missing.append("ENGINE_GATE_NOT_OK") failed_checks = gate.get("failed_checks") if not isinstance(failed_checks, list) or len(failed_checks) != 0: missing.append("ENGINE_GATE_FAILED_CHECKS_NOT_EMPTY") return len(missing) == 0, missing def should_include(path: Path, mode: str, include_xlsx: bool, include_backups: bool) -> bool: rel = path.relative_to(ROOT) parts = rel.parts if any(part in ALWAYS_EXCLUDE_DIRS for part in parts): return False if path.suffix in ALWAYS_EXCLUDE_SUFFIXES: return False if path.name == DEFAULT_OUTPUT.name: return False if parts[0] == "Temp": if path.name in TEMP_EXCLUDED_FILES: return False if mode != "full" and path.name not in TEMP_KEEP_FILES: return False if path.name in LEGACY_SEED_FILES: return False if not include_backups and ".bak" in path.name: return False if not include_xlsx and path.suffix.lower() in {".xlsm", ".xls"}: return False if mode == "full": return True top = parts[0] if len(parts) == 1: return path.name in UPLOAD_KEEP_FILES # Strictly exclude code directories (src, tools, tests, dist) in upload mode to limit LLM context if top not in UPLOAD_KEEP_DIRS_UPLOAD: return False if top == "tools" and path.name.endswith(".bak"): return False return True def build_zip(output: Path, mode: str, include_xlsx: bool, include_backups: bool) -> int: output = output.resolve() output.parent.mkdir(parents=True, exist_ok=True) if output.exists(): output.unlink() files = [ path for path in ROOT.rglob("*") if path.is_file() and should_include(path, mode, include_xlsx, include_backups) ] files.sort(key=lambda p: str(p.relative_to(ROOT)).lower()) with zipfile.ZipFile(output, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zf: for path in files: arcname = Path(ROOT.name) / path.relative_to(ROOT) zf.write(path, arcname.as_posix()) size_kb = output.stat().st_size / 1024 print(f"ZIP OK: {output} files={len(files)} size={size_kb:.1f}KB mode={mode}") return len(files) def main() -> int: parser = argparse.ArgumentParser(description="Convert market xlsx to JSON and zip this folder for upload.") parser.add_argument("--mode", choices=["upload", "full"], default="upload", help="upload excludes heavy/noisy files; full includes the current folder except cache/temp.") parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT, help="Zip output path. Default is parent/.zip.") parser.add_argument("--include-xlsx", action="store_true", help="Include xlsx/xlsm/xls files in the zip.") parser.add_argument("--include-backups", action="store_true", help="Include .bak files in the zip.") parser.add_argument("--skip-convert", action="store_true", help="Do not regenerate GatherTradingData.json.") parser.add_argument("--skip-validate", action="store_true", help="Do not run validation before zipping.") parser.add_argument("--validation-mode", choices=["release", "quick", "package-only"], default="release", help="Validation depth for packaging.") parser.add_argument("--profile", action="store_true", help="Write runtime profile JSON.") args = parser.parse_args() prof_steps: list[dict[str, object]] = [] skipped_steps: list[str] = [] gate_status = "UNKNOWN" if args.skip_validate and args.validation_mode == "release": print(f"P0-01 ENFORCEMENT: skip_validate=True ignored because validation_mode=release") args.skip_validate = False runtime_context = { "harness_name": "prepare-upload-zip", "mode": args.mode, "validation_mode": args.validation_mode, "include_xlsx": args.include_xlsx, "include_backups": args.include_backups, "skip_validate": args.skip_validate, } if args.skip_validate: plan = [] if not args.skip_convert: plan.append({"name": "prepare", "command": ["npm", "run", "ops:prepare"]}) plan.append({ "name": "zip", "depends_on": ["prepare"] if not args.skip_convert else [], "callable": lambda: build_zip(args.output, args.mode, args.include_xlsx, args.include_backups), }) prof_steps = run_plan(plan, label="prepare-upload-zip:skip-validate") gate_status = "SKIPPED" else: if args.validation_mode == "release": plan = [] if not args.skip_convert: plan.append({"name": "prepare", "command": ["npm", "run", "ops:prepare"]}) plan.extend([ {"name": "release_full", "command": ["npm", "run", "ops:release"], "depends_on": ["prepare"] if not args.skip_convert else []}, { "name": "zip", "depends_on": ["release_full"], "callable": lambda: build_zip(args.output, args.mode, args.include_xlsx, args.include_backups), }, ]) prof_steps = run_plan(plan, label="prepare-upload-zip:release") gate_status = "OK" elif args.validation_mode == "quick": ready, reasons = _quick_gate_ready(max_minutes=60) if ready: skipped_steps.append("release-gate-reused-recent-artifacts") gate_status = "OK" plan = [] if not args.skip_convert: plan.append({"name": "prepare", "command": ["npm", "run", "ops:prepare"]}) plan.extend([ { "name": "build_bundle", "command": ["npm", "run", "ops:build"], }, { "name": "zip", "depends_on": ["build_bundle", "prepare"] if not args.skip_convert else ["build_bundle"], "callable": lambda: build_zip(args.output, args.mode, args.include_xlsx, args.include_backups), }, ]) prof_steps = run_plan(plan, label="prepare-upload-zip:quick-reused") else: print("QUICK_MODE_FALLBACK_RELEASE_GATE:", ";".join(reasons)) plan = [] if not args.skip_convert: plan.append({"name": "prepare", "command": ["npm", "run", "ops:prepare"]}) plan.extend([ {"name": "release_gate", "command": ["npm", "run", "ops:validate"], "depends_on": ["prepare"] if not args.skip_convert else []}, {"name": "build_bundle", "command": ["npm", "run", "ops:build"]}, { "name": "zip", "depends_on": ["release_gate", "build_bundle"], "callable": lambda: build_zip(args.output, args.mode, args.include_xlsx, args.include_backups), }, ]) prof_steps = run_plan(plan, label="prepare-upload-zip:quick-fallback") gate_status = "OK" else: # package-only ready, reasons = _quick_gate_ready(max_minutes=60 * 24) if not ready: raise SystemExit("PACKAGE_ONLY_BLOCKED: " + ";".join(reasons)) skipped_steps.append("all-validation-reused-existing-gate") gate_status = "OK" plan = [] if not args.skip_convert: plan.append({"name": "prepare", "command": ["npm", "run", "ops:prepare"]}) plan.extend([ { "name": "build_bundle", "command": ["npm", "run", "ops:build"], }, { "name": "zip", "depends_on": ["build_bundle", "prepare"] if not args.skip_convert else ["build_bundle"], "callable": lambda: build_zip(args.output, args.mode, args.include_xlsx, args.include_backups), }, ]) prof_steps = run_plan(plan, label="prepare-upload-zip:package-only") total = round(sum(float(s.get("elapsed_sec") or 0.0) for s in prof_steps), 3) payload = runtime_profile_from_steps( harness_name="prepare-upload-zip", mode=args.validation_mode, steps=prof_steps, runtime_context=runtime_context, file_count=len([ path for path in ROOT.rglob("*") if path.is_file() and should_include(path, args.mode, args.include_xlsx, args.include_backups) ]), elapsed_sec_total=total, skipped_duplicate_steps=skipped_steps, gate_status=gate_status, ) min_samples = 1 if args.validation_mode == "package-only" else 5 analysis = finalize_runtime_profile(profile_path=RUNTIME_PROFILE, payload=payload, min_samples=min_samples) if analysis.get("status") == "ALERT": print("RUNTIME_ANOMALY:", ";".join(analysis.get("anomaly_reason_codes") or [])) return 0 if __name__ == "__main__": raise SystemExit(main())