#!/usr/bin/env python3 import argparse import sys import os import json import time import subprocess import hashlib from pathlib import Path import yaml ROOT = Path(__file__).resolve().parents[1] LINEAGE_LOG = ROOT / "runtime" / "lineage_events.jsonl" REPORT = ROOT / "Temp" / "release_dag_run_v3.json" def file_sha256(path: Path) -> str: if not path.exists(): return "" h = hashlib.sha256() try: with path.open("rb") as f: for chunk in iter(lambda: f.read(65536), b""): h.update(chunk) return h.hexdigest() except Exception: return "" def compute_combined_hash(paths: list[str]) -> str: hashes = [] for p in paths: path = ROOT / p h = file_sha256(path) if h: hashes.append(h) if not hashes: return "" return hashlib.sha256("".join(hashes).encode("utf-8")).hexdigest() def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--dag", default="spec/41_release_dag.yaml") parser.add_argument( "--mode", choices=["release", "quick", "package-only", "audit-only", "full"], default="release" ) parser.add_argument("--strict", action="store_true") args = parser.parse_args() dag_path = ROOT / args.dag if not dag_path.exists(): print(f"DAG file not found: {dag_path}") return 1 try: data = yaml.safe_load(dag_path.read_text(encoding="utf-8")) except Exception as e: print(f"Error parsing YAML: {e}") return 1 nodes = data["dag"]["nodes"] # 1. Identify target nodes based on mode target_nodes = [] if args.mode == "release": target_nodes = [nid for nid in nodes if nid.startswith("validate_")] elif args.mode == "quick": target_nodes = ["validate_specs", "validate_active_manifest"] elif args.mode == "package-only": target_nodes = [nid for nid in nodes if nid.startswith("build_")] + ["prepare_zip"] elif args.mode == "audit-only": target_nodes = [nid for nid in nodes if nid.startswith("audit_")] else: # full target_nodes = list(nodes.keys()) # 2. Compute closure closure = set() def add_to_closure(nid): if nid not in closure: closure.add(nid) for dep in nodes[nid].get("depends_on") or []: if dep in nodes: add_to_closure(dep) for nid in target_nodes: add_to_closure(nid) # 3. Topological sort of closure visited = set() temp = set() order = [] def visit(nid): if nid in temp: raise ValueError(f"Cycle detected involving {nid}") if nid not in visited: temp.add(nid) for dep in nodes[nid].get("depends_on") or []: if dep in closure: visit(dep) temp.remove(nid) visited.add(nid) order.append(nid) for nid in closure: if nid not in visited: try: visit(nid) except ValueError as e: print(e) return 1 steps_run = [] success = True LINEAGE_LOG.parent.mkdir(parents=True, exist_ok=True) print(f"Executing DAG mode: {args.mode} (closure size: {len(order)})") for nid in order: node = nodes[nid] # Optimization: skip build nodes if outputs exist and mode is "release" (optional, but keep it deterministic for now) # For now, run everything in the closure. cmd = list(node["command"]) if cmd and cmd[0] == "python": cmd[0] = sys.executable print(f"Running node: {nid} ...") start_time = time.time() input_hash = compute_combined_hash(node.get("inputs") or []) env = dict(os.environ) env["PYTHONPATH"] = str(ROOT) + os.pathsep + env.get("PYTHONPATH", "") proc = subprocess.run( cmd, cwd=ROOT, capture_output=True, text=True, encoding="utf-8", errors="replace", env=env ) elapsed = round(time.time() - start_time, 3) output_hash = compute_combined_hash(node.get("outputs") or []) gate = "PASS" if proc.returncode == 0 else "FAIL" # Log lineage event event = { "node_id": nid, "command": " ".join(cmd), "returncode": proc.returncode, "elapsed_sec": elapsed, "gate": gate, "input_hash": input_hash, "output_hash": output_hash, "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) } with LINEAGE_LOG.open("a", encoding="utf-8") as lf: lf.write(json.dumps(event, ensure_ascii=False) + "\n") steps_run.append({ "node_id": nid, "command": " ".join(cmd), "returncode": proc.returncode, "gate": gate, "executed_due_to_dependency": nid not in target_nodes }) if proc.returncode != 0: print(f"Node {nid} failed with returncode {proc.returncode}") print(proc.stderr) if node.get("warn_only", False): print(f"Node {nid} is warn_only - continuing") else: success = False if node.get("strict", True) or args.strict: break # Save report REPORT.parent.mkdir(parents=True, exist_ok=True) REPORT.write_text(json.dumps({ "formula_id": "RELEASE_DAG_RUN_V4", "mode": args.mode, "steps": steps_run, "gate": "PASS" if success else "FAIL" }, ensure_ascii=False, indent=2), encoding="utf-8") print(json.dumps({ "formula_id": "RELEASE_DAG_RUN_V4", "mode": args.mode, "step_count": len(steps_run), "gate": "PASS" if success else "FAIL" }, ensure_ascii=True, indent=2)) return 0 if success else 1 if __name__ == "__main__": sys.exit(main())