Files
QuantEngineByItz/tools/run_release_dag_v3.py

222 lines
6.5 KiB
Python

#!/usr/bin/env python3
import argparse
import sys
import os
import json
import time
import subprocess
import hashlib
from pathlib import Path
import yaml
# Parent of the directory that contains this script; DAG paths, node inputs/outputs
# and the subprocess working directory are all resolved against it.
ROOT = Path(__file__).resolve().parents[1]
# JSON-lines file to which main() appends one lineage event per executed node.
LINEAGE_LOG = ROOT / "runtime" / "lineage_events.jsonl"
# JSON report of the whole run, overwritten by main() at the end of every run.
REPORT = ROOT / "Temp" / "release_dag_run_v3.json"
def file_sha256(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    An empty string is returned when the path does not exist, or when the
    file cannot be opened or read (hashing here is best-effort).
    """
    if not path.exists():
        return ""
    digest = hashlib.sha256()
    try:
        with path.open("rb") as stream:
            # Fixed-size reads keep memory use flat regardless of file size.
            while block := stream.read(65536):
                digest.update(block)
    except Exception:
        return ""
    return digest.hexdigest()
def compute_combined_hash(paths: list[str]) -> str:
    """Return a single SHA-256 hex digest summarising the files in *paths*.

    Each entry is resolved against ROOT and hashed with file_sha256. Entries
    that produce an empty digest are left out; the remaining hex digests are
    concatenated in the given order and hashed once more. An empty string is
    returned when no entry produced a digest.
    """
    digests = [d for d in (file_sha256(ROOT / rel) for rel in paths) if d]
    if digests:
        return hashlib.sha256("".join(digests).encode("utf-8")).hexdigest()
    return ""
def _select_targets(nodes: dict, mode: str) -> list[str]:
    """Return the node ids that *mode* requests directly, before dependencies."""
    if mode == "release":
        return [nid for nid in nodes if nid.startswith("validate_")]
    if mode == "quick":
        return ["validate_specs", "validate_active_manifest"]
    if mode == "package-only":
        return [nid for nid in nodes if nid.startswith("build_")] + ["prepare_zip"]
    if mode == "audit-only":
        return [nid for nid in nodes if nid.startswith("audit_")]
    return list(nodes)  # "full"


def _declared_deps(nodes: dict, nid: str) -> list:
    """Return the sorted ``depends_on`` entries of *nid* that are DAG nodes."""
    node = nodes[nid]
    deps = node.get("depends_on") if isinstance(node, dict) else None
    # Dependencies naming ids that are absent from the DAG are ignored.
    return [dep for dep in sorted(deps or []) if dep in nodes]


def _dependency_closure(nodes: dict, targets: list[str]) -> set:
    """Return *targets* together with all of their transitive dependencies."""
    closure: set = set()
    pending = list(targets)
    while pending:
        nid = pending.pop()
        if nid in closure:
            continue
        closure.add(nid)
        pending.extend(_declared_deps(nodes, nid))
    return closure


def _topological_order(nodes: dict, closure: set) -> list:
    """Return *closure* ordered so that every node follows its dependencies.

    Roots and dependencies are both walked in sorted order, which makes the
    resulting order deterministic.

    Raises:
        ValueError: if the dependency graph contains a cycle.
    """
    done: set = set()
    in_progress: set = set()
    order: list = []

    def visit(nid) -> None:
        if nid in in_progress:
            raise ValueError(f"Cycle detected involving {nid}")
        if nid in done:
            return
        in_progress.add(nid)
        for dep in _declared_deps(nodes, nid):
            visit(dep)
        in_progress.remove(nid)
        done.add(nid)
        order.append(nid)

    for nid in sorted(closure):
        visit(nid)
    return order


def _has_runnable_command(node: object) -> bool:
    """Return True when *node* is a mapping whose ``command`` is a non-empty list."""
    if not isinstance(node, dict):
        return False
    command = node.get("command")
    # A plain string is rejected: list("python x.py") would split it into characters.
    return isinstance(command, (list, tuple)) and len(command) > 0


def main() -> int:
    """Run the release DAG selected on the command line.

    Loads the DAG YAML (``--dag``, relative to ROOT), picks the target nodes
    for ``--mode``, adds their transitive dependencies, and runs each node's
    command in dependency order with ROOT as working directory. One lineage
    event per node is appended to LINEAGE_LOG and a JSON report is written to
    REPORT; a short JSON summary is printed at the end.

    Returns:
        0 when every node passed (or only ``warn_only`` nodes failed and
        ``--strict`` was not given); 1 when the DAG could not be loaded or is
        invalid, a cycle was found, a non-``warn_only`` node failed, or
        ``--strict`` was given and a ``warn_only`` node failed.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dag", default="spec/41_release_dag.yaml")
    parser.add_argument(
        "--mode",
        choices=["release", "quick", "package-only", "audit-only", "full"],
        default="release",
    )
    parser.add_argument("--strict", action="store_true")
    args = parser.parse_args()

    dag_path = ROOT / args.dag
    if not dag_path.exists():
        print(f"DAG file not found: {dag_path}")
        return 1
    try:
        data = yaml.safe_load(dag_path.read_text(encoding="utf-8"))
    except (OSError, UnicodeError, yaml.YAMLError) as e:
        print(f"Error parsing YAML: {e}")
        return 1

    # An empty file parses to None and a malformed one may lack dag/nodes;
    # report that instead of failing with a TypeError/KeyError traceback.
    dag = data.get("dag") if isinstance(data, dict) else None
    nodes = dag.get("nodes") if isinstance(dag, dict) else None
    if not isinstance(nodes, dict) or not all(isinstance(nid, str) for nid in nodes):
        print(f"Invalid DAG file (expected a dag.nodes mapping keyed by node id): {dag_path}")
        return 1

    # 1. Identify target nodes based on mode
    target_nodes = _select_targets(nodes, args.mode)
    # Some modes name fixed node ids; make sure the DAG actually declares them.
    unknown = [nid for nid in target_nodes if nid not in nodes]
    if unknown:
        print(f"Mode {args.mode} requires nodes missing from the DAG: {', '.join(unknown)}")
        return 1

    # 2. Compute closure
    closure = _dependency_closure(nodes, target_nodes)
    # Validate up front so a bad node cannot abort the run half-way through.
    invalid = sorted(nid for nid in closure if not _has_runnable_command(nodes[nid]))
    if invalid:
        print(f"Nodes without a valid command list: {', '.join(invalid)}")
        return 1

    # 3. Topological sort of closure
    try:
        order = _topological_order(nodes, closure)
    except ValueError as e:
        print(e)
        return 1

    requested = set(target_nodes)
    steps_run = []
    success = True
    warning_failures: list[dict[str, object]] = []
    LINEAGE_LOG.parent.mkdir(parents=True, exist_ok=True)

    # The child environment is the same for every node: ROOT is prepended to
    # PYTHONPATH, without leaving an empty entry when the variable was unset.
    env = dict(os.environ)
    env["PYTHONPATH"] = os.pathsep.join(
        part for part in (str(ROOT), env.get("PYTHONPATH", "")) if part
    )

    print(f"Executing DAG mode: {args.mode} (closure size: {len(order)})")
    for nid in order:
        node = nodes[nid]
        # YAML scalars such as numbers are accepted as arguments.
        cmd = [str(part) for part in node["command"]]
        if cmd[0] == "python":
            # Run node scripts with the interpreter that runs this tool.
            cmd[0] = sys.executable
        print(f"Running node: {nid} ...")
        start_time = time.monotonic()
        input_hash = compute_combined_hash(node.get("inputs") or [])
        try:
            proc = subprocess.run(
                cmd,
                cwd=ROOT,
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                env=env,
            )
            returncode = proc.returncode
            stderr_text = proc.stderr
        except (OSError, ValueError) as exc:
            # The command could not be started at all (e.g. executable not
            # found). Record it as a failed node so the lineage event and the
            # report are still written; 127 mirrors the shell's "not found".
            returncode = 127
            stderr_text = f"Could not launch {cmd[0]}: {exc}"
        elapsed = round(time.monotonic() - start_time, 3)
        output_hash = compute_combined_hash(node.get("outputs") or [])
        gate = "PASS" if returncode == 0 else "FAIL"

        # Log lineage event
        event = {
            "node_id": nid,
            "command": " ".join(cmd),
            "returncode": returncode,
            "elapsed_sec": elapsed,
            "gate": gate,
            "input_hash": input_hash,
            "output_hash": output_hash,
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }
        with LINEAGE_LOG.open("a", encoding="utf-8") as lf:
            lf.write(json.dumps(event, ensure_ascii=False) + "\n")

        steps_run.append({
            "node_id": nid,
            "command": " ".join(cmd),
            "returncode": returncode,
            "gate": gate,
            "executed_due_to_dependency": nid not in requested,
        })

        if returncode == 0:
            continue
        print(f"Node {nid} failed with returncode {returncode}")
        print(stderr_text)
        if node.get("warn_only", False):
            print(f"Node {nid} is warn_only - continuing")
            warning_failures.append({
                "node_id": nid,
                "returncode": returncode,
            })
            continue
        success = False
        # Nodes are strict unless they opt out; --strict overrides the opt-out.
        if node.get("strict", True) or args.strict:
            break

    if not success:
        overall_gate = "FAIL"
    elif warning_failures:
        overall_gate = "PASS_WITH_WARNINGS"
    else:
        overall_gate = "PASS"

    # Save report
    # NOTE(review): formula_id says V4 while the report file is named v3 - confirm intended.
    REPORT.parent.mkdir(parents=True, exist_ok=True)
    REPORT.write_text(json.dumps({
        "formula_id": "RELEASE_DAG_RUN_V4",
        "mode": args.mode,
        "steps": steps_run,
        "warning_failures": warning_failures,
        "gate": overall_gate,
    }, ensure_ascii=False, indent=2), encoding="utf-8")
    print(json.dumps({
        "formula_id": "RELEASE_DAG_RUN_V4",
        "mode": args.mode,
        "step_count": len(steps_run),
        "warning_failure_count": len(warning_failures),
        "gate": overall_gate,
    }, ensure_ascii=True, indent=2))

    if not success:
        return 1
    if args.strict and warning_failures:
        return 1
    return 0
# Script entry point: the integer returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())