#!/usr/bin/env python3
"""Validate a DAG specification stored as YAML.

The file is looked up relative to ``ROOT`` (the parent of the directory that
contains this script) and defaults to ``spec/41_release_dag.yaml``.  Three
checks are run in order, stopping at the first failure:

1. every node is a mapping that carries all required fields;
2. the ``depends_on`` edges contain no cycle;
3. no output is listed by more than one node.

Prints ``PASS`` and exits 0 on success; prints a diagnostic and exits 1
otherwise.
"""
import argparse
import sys
from pathlib import Path
from typing import Optional

ROOT = Path(__file__).resolve().parents[1]

REQUIRED_FIELDS = ["id", "command", "inputs", "outputs", "depends_on", "timeout_sec"]

# Fields that later checks iterate over; a scalar here (e.g. a bare string)
# would be walked character by character and yield bogus results.
LIST_FIELDS = ("depends_on", "outputs")


def find_cycle(node, adj, visited, stack):
    """Return True if a cycle is reachable from *node* by following *adj*.

    Args:
        node: Node id to start the depth-first search from.
        adj: Mapping of node id -> iterable of neighbor ids.  A missing key or
            a ``None`` value is treated as "no neighbors".
        visited: Set of node ids already explored; updated in place so that
            repeated calls share work.
        stack: Set of node ids on the current DFS path; updated in place.  It
            is emptied again when no cycle is found and left populated with
            the offending path when one is.

    Returns:
        True when a back edge (an edge to a node on the current path) is
        found, False otherwise.
    """
    visited.add(node)
    stack.add(node)
    # Explicit stack of (node, neighbor iterator) instead of recursion, so a
    # long dependency chain cannot hit Python's recursion limit.
    path = [(node, iter(adj.get(node) or ()))]
    while path:
        current, neighbors = path[-1]
        for neighbor in neighbors:
            if neighbor not in visited:
                visited.add(neighbor)
                stack.add(neighbor)
                path.append((neighbor, iter(adj.get(neighbor) or ())))
                break  # descend; the parent's iterator resumes afterwards
            if neighbor in stack:
                return True
        else:
            # All neighbors of `current` explored: it leaves the DFS path.
            stack.discard(current)
            path.pop()
    return False


def _check_fields(nodes) -> Optional[str]:
    """Return an error message for the first malformed node, else None."""
    for nid, node in nodes.items():
        if not isinstance(node, dict):
            return f"Node {nid} is not a dictionary"
        for field in REQUIRED_FIELDS:
            if field not in node:
                return f"Node {nid} is missing required field: {field}"
        for field in LIST_FIELDS:
            value = node[field]
            if value is not None and not isinstance(value, list):
                return f"Node {nid} field '{field}' must be a list"
    return None


def _check_cycles(nodes) -> Optional[str]:
    """Return an error message if the depends_on graph has a cycle, else None."""
    # `or []` normalizes an empty/null depends_on entry to "no dependencies".
    adj = {nid: node["depends_on"] or [] for nid, node in nodes.items()}
    visited = set()
    stack = set()
    for nid in nodes:
        if nid not in visited and find_cycle(nid, adj, visited, stack):
            return "Cycle detected in DAG dependencies!"
    return None


def _check_outputs(nodes) -> Optional[str]:
    """Return an error message if two nodes list the same output, else None."""
    owners = {}
    for nid, node in nodes.items():
        for out in node.get("outputs") or []:
            if out in owners:
                return (
                    f"Duplicate output owner detected! "
                    f"Both {nid} and {owners[out]} output {out}"
                )
            owners[out] = nid
    return None


def main() -> int:
    """Parse CLI arguments, load the DAG file and run all checks.

    Returns:
        0 when every check passes, 1 on any load or validation failure.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dag", default="spec/41_release_dag.yaml")
    # NOTE(review): --strict is accepted but no check reads it; kept so that
    # existing invocations passing the flag keep working.
    parser.add_argument("--strict", action="store_true")
    args = parser.parse_args()

    # Imported here rather than at module level so a missing PyYAML yields a
    # readable message, and so find_cycle can be imported without it.
    try:
        import yaml
    except ImportError as e:
        print(f"PyYAML is required to read the DAG file: {e}")
        return 1

    dag_path = ROOT / args.dag
    if not dag_path.exists():
        print(f"DAG file not found: {dag_path}")
        return 1

    try:
        data = yaml.safe_load(dag_path.read_text(encoding="utf-8"))
    except (OSError, UnicodeError, yaml.YAMLError) as e:
        print(f"Error parsing YAML: {e}")
        return 1

    dag = data.get("dag") if isinstance(data, dict) else None
    if not isinstance(dag, dict) or "nodes" not in dag:
        print("Invalid DAG format: missing 'dag.nodes'")
        return 1

    nodes = dag["nodes"]
    if not isinstance(nodes, dict):
        print("Invalid DAG format: 'dag.nodes' must be a mapping of node id to node")
        return 1

    # Checks run in a fixed order; the first failure is reported and ends the run.
    for check in (_check_fields, _check_cycles, _check_outputs):
        error = check(nodes)
        if error is not None:
            print(error)
            return 1

    print("PASS")
    return 0


if __name__ == "__main__":
    sys.exit(main())