from __future__ import annotations

import os
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from math import fsum
from pathlib import Path
from typing import Any, Callable

# Directory two levels above the one containing this file; used as the default
# working directory for command steps.
ROOT = Path(__file__).resolve().parents[2]


def utf8_env() -> dict[str, str]:
    """Return a copy of the process environment with UTF-8 defaults applied.

    ``PYTHONIOENCODING`` and ``PYTHONUTF8`` are only set when absent, so values
    already present in the environment take precedence.
    """
    env = os.environ.copy()
    env.setdefault("PYTHONIOENCODING", "utf-8")
    env.setdefault("PYTHONUTF8", "1")
    return env


def _run_command(command: list[str], cwd: Path | None = None) -> dict[str, Any]:
    """Run *command* as a subprocess and return a timing record.

    Args:
        command: Argument vector to execute.
        cwd: Working directory; falls back to ``ROOT`` when falsy.

    Returns:
        A dict with ``kind="command"``, the command as passed in, UTC
        ISO-8601 ``started_at``/``finished_at``, ``elapsed_sec`` rounded to
        milliseconds and ``status="OK"``.

    Raises:
        subprocess.CalledProcessError: The process exited non-zero
            (``check=True``), so a returned record always means success.
    """
    started = datetime.now(timezone.utc)
    resolved = list(command)
    # NOTE(review): presumably needed because npm is installed as a ``.cmd``
    # shim on Windows, which cannot be launched by bare name without a shell.
    if os.name == "nt" and resolved and resolved[0].lower() == "npm":
        resolved[0] = "npm.cmd"
    subprocess.run(resolved, cwd=cwd or ROOT, check=True, env=utf8_env())
    finished = datetime.now(timezone.utc)
    return {
        "kind": "command",
        # Report the command as requested, not the platform-resolved form.
        "command": command,
        "started_at": started.isoformat(),
        "finished_at": finished.isoformat(),
        "elapsed_sec": round((finished - started).total_seconds(), 3),
        "status": "OK",
    }


def _run_callable(func: Callable[[], Any]) -> dict[str, Any]:
    """Call *func* in-process and return a timing record with its result.

    Returns:
        A dict with ``kind="callable"``, UTC ISO-8601
        ``started_at``/``finished_at``, ``elapsed_sec`` rounded to
        milliseconds, ``status="OK"`` and the return value under ``payload``.
        Exceptions raised by *func* propagate unchanged.
    """
    started = datetime.now(timezone.utc)
    payload = func()
    finished = datetime.now(timezone.utc)
    return {
        "kind": "callable",
        "started_at": started.isoformat(),
        "finished_at": finished.isoformat(),
        "elapsed_sec": round((finished - started).total_seconds(), 3),
        "status": "OK",
        "payload": payload,
    }


def _parse_iso8601(value: str) -> datetime:
    """Parse an ISO-8601 timestamp, accepting a trailing ``Z`` for UTC.

    Raises:
        ValueError: *value* is empty or not accepted by
            ``datetime.fromisoformat``.
    """
    text = str(value or "")
    # Rewrite the "Z" suffix into an explicit UTC offset before parsing.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    return datetime.fromisoformat(text)


def summarize_plan(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate timing statistics over step result records.

    Args:
        results: Records shaped like the ones returned by ``run_plan``; each
            must carry ``started_at`` and ``finished_at`` ISO-8601 strings and
            may carry ``name``, ``kind``, ``elapsed_sec`` and ``depends_on``.

    Returns:
        A dict with step counts, ``step_elapsed_sum_sec`` (sum of per-step
        durations), ``critical_path_sec`` (longest dependency chain by
        ``elapsed_sec`` among named steps, ``0.0`` if no step is named),
        ``wall_clock_sec`` (earliest start to latest finish),
        ``parallel_width_max`` (peak number of simultaneously running steps)
        and ``step_names``. The same keys are present for empty input.

    Raises:
        ValueError: A record has a missing or malformed timestamp.
    """
    if not results:
        return {
            "step_count": 0,
            "command_step_count": 0,
            "callable_step_count": 0,
            "step_elapsed_sum_sec": 0.0,
            "critical_path_sec": 0.0,
            "wall_clock_sec": 0.0,
            "parallel_width_max": 0,
            "step_names": [],
        }

    by_name = {str(item.get("name")): item for item in results if item.get("name")}
    memo: dict[str, float] = {}

    def critical_path_for(name: str) -> float:
        """Return the step's own duration plus its slowest dependency chain."""
        if name in memo:
            return memo[name]
        item = by_name[name]
        own = float(item.get("elapsed_sec") or 0.0)
        # Dependencies that are not part of ``results`` are ignored.
        deps = [
            str(dep)
            for dep in (item.get("depends_on") or [])
            if str(dep) in by_name
        ]
        if deps:
            own += max(critical_path_for(dep) for dep in deps)
        memo[name] = own
        return own

    starts: list[datetime] = []
    ends: list[datetime] = []
    points: list[tuple[datetime, int]] = []
    command_step_count = 0
    callable_step_count = 0
    for item in results:
        started = _parse_iso8601(str(item.get("started_at")))
        finished = _parse_iso8601(str(item.get("finished_at")))
        starts.append(started)
        ends.append(finished)
        points.append((started, 1))
        points.append((finished, -1))
        kind = item.get("kind")
        if kind == "command":
            command_step_count += 1
        elif kind == "callable":
            callable_step_count += 1

    # Sweep over start/finish events. At identical timestamps starts sort
    # before finishes, so back-to-back steps are counted as overlapping.
    points.sort(key=lambda pair: (pair[0], -pair[1]))
    active = 0
    parallel_width_max = 0
    for _, delta in points:
        active += delta
        if active > parallel_width_max:
            parallel_width_max = active

    step_elapsed_sum = fsum(float(item.get("elapsed_sec") or 0.0) for item in results)
    wall_clock_sec = round((max(ends) - min(starts)).total_seconds(), 3)
    # ``default`` covers results in which no record carries a name.
    critical_path_sec = round(
        max((critical_path_for(name) for name in by_name), default=0.0), 3
    )
    return {
        "step_count": len(results),
        "command_step_count": command_step_count,
        "callable_step_count": callable_step_count,
        "step_elapsed_sum_sec": round(step_elapsed_sum, 3),
        "critical_path_sec": critical_path_sec,
        "wall_clock_sec": wall_clock_sec,
        "parallel_width_max": parallel_width_max,
        "step_names": [str(item.get("name")) for item in results if item.get("name")],
    }


def _execution_waves(step_map: dict[str, dict[str, Any]], label: str) -> list[list[str]]:
    """Group step names into waves whose dependencies lie in earlier waves."""
    remaining = set(step_map)
    completed: set[str] = set()
    waves: list[list[str]] = []
    while remaining:
        ready = sorted(
            (name for name in remaining if step_map[name]["_depends_on"] <= completed),
            key=lambda n: step_map[n]["_index"],
        )
        if not ready:
            blocked = sorted(remaining)
            raise RuntimeError(f"{label}: cyclic or blocked dependencies: {blocked}")
        waves.append(ready)
        completed.update(ready)
        remaining.difference_update(ready)
    return waves


def run_plan(
    steps: list[dict[str, Any]],
    *,
    label: str = "orchestration",
    cwd: Path | None = None,
) -> list[dict[str, Any]]:
    """
    Execute a dependency plan.

    Each step accepts:
    - name: unique string
    - depends_on: list[str]
    - command: list[str] for subprocess execution, or
    - callable: zero-arg callable for in-process execution

    Steps run in waves: every step whose dependencies have all completed is
    started concurrently on its own thread, and the next wave begins once the
    whole wave has finished. ``command`` wins when a step supplies both
    ``command`` and ``callable``.

    The whole plan is validated before anything is executed, so an invalid
    plan never runs partially.

    Args:
        steps: Step definitions as described above.
        label: Prefix used in error messages.
        cwd: Working directory for command steps (``ROOT`` when omitted).

    Returns:
        One result record per step, in the order the steps were given. Each
        record is the runner's timing dict plus ``name`` and ``depends_on``
        (the step's declared dependencies, de-duplicated, in declared order).

    Raises:
        ValueError: A step has no name, a duplicate name, a non-list
            ``depends_on``, an unknown dependency, or neither ``command`` nor
            ``callable``.
        RuntimeError: The dependencies contain a cycle.
        Exception: Whatever a step's command or callable raises.
    """
    if not steps:
        return []

    step_map: dict[str, dict[str, Any]] = {}
    for index, step in enumerate(steps):
        name = str(step.get("name") or "").strip()
        if not name:
            raise ValueError(f"{label}: step missing name at index {index}")
        if name in step_map:
            raise ValueError(f"{label}: duplicate step name: {name}")
        # Work on a copy so the caller's step dicts are never mutated.
        step_map[name] = dict(step)
        step_map[name]["_index"] = index

    for name, step in step_map.items():
        deps = step.get("depends_on") or []
        if not isinstance(deps, list):
            raise ValueError(f"{label}: depends_on must be a list for {name}")
        # Keep the declared order for reporting and a set for readiness checks.
        declared = list(dict.fromkeys(str(dep) for dep in deps))
        for dep in declared:
            if dep not in step_map:
                raise ValueError(f"{label}: unknown dependency {dep} for {name}")
        step["_depends_list"] = declared
        step["_depends_on"] = frozenset(declared)

    for name, step in step_map.items():
        if step.get("command") is None and step.get("callable") is None:
            raise ValueError(f"{label}: step {name} missing command/callable")

    waves = _execution_waves(step_map, label)

    results: dict[str, dict[str, Any]] = {}
    for wave in waves:
        with ThreadPoolExecutor(max_workers=len(wave)) as executor:
            future_map = {}
            for name in wave:
                step = step_map[name]
                if step.get("command") is not None:
                    future = executor.submit(_run_command, list(step["command"]), cwd)
                else:
                    future = executor.submit(_run_callable, step["callable"])
                future_map[future] = name
            for future in as_completed(future_map):
                name = future_map[future]
                # Re-raises here if the step failed; leaving the ``with``
                # block then waits for the rest of the wave to finish.
                result = future.result()
                result["name"] = name
                result["depends_on"] = list(step_map[name]["_depends_list"])
                results[name] = result

    # ``step_map`` preserves insertion order, i.e. the order of ``steps``.
    return [results[name] for name in step_map]