Runners

Runners execute graphs. They handle the execution loop, node scheduling, and concurrency.

  • SyncRunner - Sequential execution for synchronous nodes

  • DaftRunner - Columnar execution via Daft for DAG-only graphs (batch, distributed)

  • AsyncRunner - Concurrent execution with async support and max_concurrency

  • RunResult - Output values, status, and error information

  • map() - Batch processing with zip or cartesian product modes

Overview

| Runner | Async Nodes | Cycles | Distributed | Returns |
| --- | --- | --- | --- | --- |
| SyncRunner | No | Yes | No | RunResult |
| DaftRunner | Yes (Daft-native) | No | Yes | RunResult / MapResult |
| AsyncRunner | Yes | Yes | No | Coroutine[RunResult] |

SyncRunner

Sequential execution for synchronous graphs.

SyncRunner is not built for interrupts or human-in-the-loop (HITL) flows. If a graph contains InterruptNodes, SyncRunner raises IncompatibleRunnerError; use AsyncRunner instead.

from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()
result = runner.run(graph, {"x": 5})

print(result["doubled"])  # 10

Constructor

Args:

  • cache - Optional cache backend for node result caching. Nodes opt in with @node(..., cache=True). Supports InMemoryCache, DiskCache, or any CacheBackend implementation.

  • checkpointer - Optional checkpointer for persistent run history. For run(), enables strict lineage semantics (resume/fork) and auto-generates workflow_id when omitted. For map(), persistence is enabled when workflow_id is provided. Requires SqliteCheckpointer or any SyncCheckpointerProtocol implementation.

run()

Execute a graph once.

Args:

  • graph - The graph to execute

  • values - Optional input values as {param_name: value}

  • select - Runtime select overrides are not supported. Configure output scope on the graph with graph.select(...) before execution.

  • on_missing - How to handle missing selected outputs:

    • "ignore" (default): silently omit missing outputs

    • "warn": warn about missing outputs, return what's available

    • "error": raise error if any selected output is missing

  • on_internal_override - Reserved for compatibility. Internal produced values are rejected deterministically.

  • max_iterations - Maximum local iterations per cyclic execution region (SCC); default: 1000

  • error_handling - How to handle node execution errors:

    • "raise" (default): Re-raise the original exception (e.g., ValueError). Clean traceback, no wrapper.

    • "continue": Return RunResult with status=FAILED and partial values instead of raising.

  • event_processors - Optional list of event processors to observe execution

  • checkpoint - Optional low-level checkpoint snapshot (values + steps) for explicit fork restores.

  • workflow_id - Optional workflow identifier for lineage tracking. With a checkpointer:

    • omitted: auto-generated for run()

    • existing: strict resume only (no runtime values; same graph structure). Existing persisted workflows may be active, paused, or failed; completed workflows are terminal.

    • new + checkpoint: explicit fork

  • override_workflow - Convenience shortcut for existing workflow_ids. When True and the workflow_id already exists, run() auto-forks from that workflow (generates a new workflow ID and uses its checkpoint) instead of raising strict resume errors.

  • fork_from - Workflow ID to fork from directly (no manual checkpoint plumbing). Requires a checkpointer.

  • retry_from - Workflow ID to retry from directly (records retry lineage metadata). Requires a checkpointer.

  • Lineage hashing: checkpoint compatibility uses a structural hash; a separate code hash is recorded for observability/caching workflows.

  • **input_values - Input shorthand (merged with values)

Returns: RunResult with outputs and status

Raises:

  • MissingInputError - Required input not provided

  • IncompatibleRunnerError - Graph contains async nodes or InterruptNodes

  • GraphConfigError - If graph is cyclic and has no configured entrypoint

  • ValueError - If runtime select or entrypoint overrides are passed

  • Node execution errors (e.g., ValueError, TypeError) when error_handling="raise" (the default)

Example:

map()

Execute a graph multiple times with different inputs.

Args:

  • graph - The graph to execute

  • values - Optional input values. Parameters in map_over should be lists

  • map_over - Parameter name(s) to iterate over

  • map_mode - "zip" for parallel iteration, "product" for cartesian product

  • clone - Deep-copy mutable values for each iteration. True clones all non-map_over values; pass a list of names to clone selectively. Prevents cross-iteration mutation.

  • select - Runtime select overrides are not supported. Configure output scope on the graph with graph.select(...) before execution.

  • on_missing - How to handle missing selected outputs ("ignore", "warn", or "error")

  • error_handling - How to handle failures:

    • "raise" (default): Stop on first failure and raise the exception

    • "continue": Collect all results, including failures as RunResult with status=FAILED

  • event_processors - Optional list of event processors to observe execution

  • workflow_id - Optional workflow identifier for checkpoint persistence and resume. Creates a parent batch run with per-item child runs ({workflow_id}/0, {workflow_id}/1, ...). On re-run, completed items are skipped. See Resuming Batches.

  • **input_values - Input shorthand (merged with values)

Returns: MapResult wrapping per-iteration RunResults with batch metadata

Example:
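The difference between the two map_mode values can be sketched with itertools. This is an illustrative stand-in for how map() expands map_over inputs into per-run input dicts, not the hypergraph implementation (the expand helper and its signature are assumptions):

```python
from itertools import product

def expand(values, map_over, map_mode="zip"):
    """Model of how map() turns map_over lists into per-run input dicts."""
    broadcast = {k: v for k, v in values.items() if k not in map_over}
    lists = [values[name] for name in map_over]
    if map_mode == "zip":
        combos = zip(*lists)      # parallel iteration: i-th item of each list
    elif map_mode == "product":
        combos = product(*lists)  # cartesian product of all lists
    else:
        raise ValueError(map_mode)
    return [dict(broadcast, **dict(zip(map_over, combo))) for combo in combos]

runs = expand({"x": [1, 2], "y": [10, 20], "k": 5}, ["x", "y"], "zip")
# zip mode pairs items index-wise: (x=1, y=10), then (x=2, y=20); k=5 is broadcast
```

With map_mode="product" the same inputs would produce 4 runs (2 × 2) instead of 2.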

capabilities

Returns capabilities for compatibility checking:


DaftRunner

Translation runner: converts DAGs into chained Daft df.with_column() UDF calls.

DaftRunner translates each node into a Daft UDF and chains them via df.with_column(). The entire graph becomes a single Daft query plan executed columnar-style. This is a good fit when:

  • you want columnar batch execution over a dataset

  • you need distributed fan-out via Daft

  • your graph is a DAG (no cycles, no gates, no interrupts)

It supports FunctionNode and GraphNode (including nested map_over). Async nodes are handled natively by Daft's async UDF support.

Constructor

Args:

  • cache - Optional cache backend for node-level caching.

Raises:

  • ImportError - If the daft dependency is not installed. Install with pip install 'hypergraph[daft]'.

run()

Execute a graph once via a 1-row Daft plan.

Args:

  • graph - The graph to execute (must be a DAG)

  • values - Optional input values as {param_name: value}

  • select - Runtime select overrides are not supported. Configure output scope on the graph with graph.select(...).

  • on_missing - How to handle missing selected outputs ("ignore", "warn", or "error")

  • entrypoint - Runtime entrypoint overrides are not supported

  • max_iterations - Accepted for API compatibility but not used (DaftRunner does not support cycles)

  • error_handling - "raise" re-raises the original exception; "continue" returns a failed RunResult

  • event_processors - Accepted but ignored with a warning (DaftRunner does not support events)

  • **input_values - Input shorthand (merged with values)

Example:

map()

Execute a graph for each item via Daft columnar execution.

All items are packed into a single Daft DataFrame, and the entire graph executes as chained df.with_column() UDF calls. This is the primary batch entrypoint when your data is a Python collection.

Args:

  • graph - The graph to execute (must be a DAG)

  • values - Optional input values. Parameters in map_over should be lists

  • map_over - Parameter name(s) to iterate over

  • map_mode - "zip" for parallel iteration or "product" for cartesian product

  • clone - Deep-copy mutable broadcast values for each row. True clones all non-map_over values; pass a list of names to clone selectively

  • select - Runtime select overrides are not supported

  • on_missing - How to handle missing selected outputs ("ignore", "warn", or "error")

  • error_handling - "raise" re-raises the first failed item's original exception; "continue" falls back to per-item execution and preserves failures inside MapResult

  • event_processors - Accepted but ignored with a warning

  • **input_values - Input shorthand (merged with values)

Example:

map_dataframe()

Execute one graph run per Daft DataFrame row.

Use this when your dataset already lives in a Daft DataFrame. Each row becomes one graph execution.

Args:

  • graph - The graph to execute (must be a DAG)

  • dataframe - Daft DataFrame supplying row-wise inputs

  • columns - Optional subset of DataFrame columns to use as graph inputs. Defaults to all columns.

  • values / **input_values - Additional broadcast inputs merged into every row. Must not overlap with DataFrame column names.

  • select - Runtime select overrides are not supported

  • on_missing - How to handle missing selected outputs ("ignore", "warn", or "error")

  • error_handling - Same contract as map(): "raise" re-raises the first failed row, "continue" falls back to per-item execution

  • clone - Deep-copy mutable broadcast values for each row

Example:

Broadcast values (shared across all rows) are passed as keyword arguments and captured in UDF closures.

capabilities

@stateful

Decorator to mark a class for once-per-worker initialization. DaftRunner wraps stateful objects with @daft.cls instead of @daft.func, so heavy resources (ML models, DB connections) are created once per worker process rather than once per row.

The class must support zero-argument construction (__init__() with no required args) so Daft can re-create it on each worker.

batch=True

Use batch=True on the @node decorator for vectorized @daft.func.batch execution. Batch UDFs receive daft.Series instead of scalar values, useful for NumPy/Arrow operations.


AsyncRunner

Concurrent execution with async support.

Constructor

Args:

  • cache - Optional cache backend for node result caching. Nodes opt in with @node(..., cache=True).

  • checkpointer - Optional checkpointer for persistent run history. For run(), enables strict lineage semantics (resume/fork) and auto-generates workflow_id when omitted. For map(), persistence is enabled when workflow_id is provided. Requires SqliteCheckpointer or any Checkpointer implementation.

run()

Execute a graph asynchronously.

Args:

  • graph - The graph to execute

  • values - Optional input values

  • select - Runtime select overrides are not supported. Configure output scope on the graph with graph.select(...) before execution.

  • on_missing - How to handle missing selected outputs ("ignore", "warn", or "error")

  • entrypoint - Runtime entrypoint overrides are not supported. Configure entrypoints on the graph via Graph(..., entrypoint=...) or graph.with_entrypoint(...).

  • max_iterations - Maximum local iterations per cyclic execution region (SCC); default: 1000

  • max_concurrency - Max parallel node executions (default: unlimited)

  • error_handling - How to handle node execution errors:

    • "raise" (default): Re-raise the original exception. Clean traceback, no wrapper.

    • "continue": Return RunResult with status=FAILED and partial values instead of raising.

  • event_processors - Optional list of event processors to observe execution (supports AsyncEventProcessor)

  • checkpoint - Optional low-level checkpoint snapshot (values + steps) for explicit fork restores.

  • workflow_id - Optional workflow identifier for lineage tracking. With a checkpointer:

    • omitted: auto-generated for run()

    • existing: strict resume only (no runtime values; same graph structure). Existing persisted workflows may be active, paused, or failed; completed workflows are terminal.

    • new + checkpoint: explicit fork

  • override_workflow - Convenience shortcut for existing workflow_ids. When True and the workflow_id already exists, run() auto-forks from that workflow (generates a new workflow ID and uses its checkpoint) instead of raising strict resume errors.

  • fork_from - Workflow ID to fork from directly (no manual checkpoint plumbing). Requires a checkpointer.

  • retry_from - Workflow ID to retry from directly (records retry lineage metadata). Requires a checkpointer.

  • Lineage hashing: checkpoint compatibility uses a structural hash; a separate code hash is recorded for observability/caching workflows.

  • **input_values - Input shorthand (merged with values)

Returns: RunResult with outputs and status

Example:

Concurrency Control

The max_concurrency parameter limits how many nodes execute simultaneously:

Concurrency limits are shared across:

  • All nodes in a superstep

  • Nested GraphNodes

  • All items in map() calls

This prevents overwhelming external services when processing large batches.
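A shared limit like this is commonly implemented with a single semaphore that every scheduled task must acquire. The sketch below models the idea with asyncio (it is not the hypergraph scheduler; the run_node/run_all helpers are assumptions for illustration):

```python
import asyncio

async def run_node(sem, counter):
    # every node acquires the same shared semaphore before doing work
    async with sem:
        counter["active"] += 1
        counter["peak"] = max(counter["peak"], counter["active"])
        await asyncio.sleep(0.01)  # stand-in for node work
        counter["active"] -= 1

async def run_all(n_nodes, max_concurrency):
    sem = asyncio.Semaphore(max_concurrency)
    counter = {"active": 0, "peak": 0}
    await asyncio.gather(*(run_node(sem, counter) for _ in range(n_nodes)))
    return counter["peak"]
```

Because the semaphore is shared, the peak number of simultaneously active tasks never exceeds the limit, no matter how many tasks are scheduled.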

map()

Execute graph multiple times concurrently.

Args:

  • graph - The graph to execute

  • values - Optional input values

  • map_over - Parameter name(s) to iterate over

  • map_mode - "zip" or "product"

  • clone - Deep-copy mutable values for each iteration. True clones all non-map_over values; pass a list of names to clone selectively.

  • select - Runtime select overrides are not supported. Configure output scope on the graph with graph.select(...) before execution.

  • on_missing - How to handle missing selected outputs ("ignore", "warn", or "error")

  • entrypoint - Runtime entrypoint overrides are not supported.

  • max_concurrency - Shared limit across all executions

  • error_handling - How to handle failures:

    • "raise" (default): Stop on first failure and raise the exception

    • "continue": Collect all results, including failures as RunResult with status=FAILED

  • event_processors - Optional list of event processors to observe execution

  • workflow_id - Optional workflow identifier for checkpoint persistence and resume. Creates per-item child runs that can be skipped on re-run. See Resuming Batches.

  • **input_values - Input shorthand (merged with values)

Example:

For very large batches, prefer setting max_concurrency explicitly. If max_concurrency=None and the fan-out is extremely large, AsyncRunner.map() raises ValueError to avoid unbounded task creation.

capabilities


RunResult

Result of a graph execution.

Attributes

Convenience Properties

Dict-like Access

Partial Values on Failure

By default, run() raises the original exception on node failure. To get partial results instead, use error_handling="continue":

This is useful for debugging — you can inspect which nodes completed successfully.
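The two error_handling modes can be modeled as a branch around node execution: either re-raise the original exception, or return a failed result carrying whatever values were computed so far. This is a minimal stand-in (the Result class and execute helper are assumptions, not the RunResult implementation):

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Result:  # minimal stand-in for RunResult
    status: str
    values: dict = field(default_factory=dict)
    error: Optional[Exception] = None

def execute(nodes, inputs, error_handling="raise"):
    values = dict(inputs)
    for name, fn in nodes:
        try:
            values[name] = fn(values)
        except Exception as exc:
            if error_handling == "raise":
                raise  # original exception, clean traceback
            return Result("FAILED", values, exc)  # partial values preserved
    return Result("COMPLETED", values)

nodes = [("a", lambda v: v["x"] * 2), ("b", lambda v: v["a"] / 0)]
res = execute(nodes, {"x": 5}, error_handling="continue")
# res.status == "FAILED"; res.values["a"] == 10 shows node "a" completed
```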

Progressive Disclosure


MapResult

Result of a batch map() execution. Wraps individual RunResult items with batch-level metadata.

Supports the read-only sequence protocol: len(), iter(), and [int] indexing work; mutating list operations do not.
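A read-only sequence like this can be modeled with collections.abc.Sequence over an internal tuple. This is an illustrative stand-in, not the real MapResult class:

```python
from collections.abc import Sequence

class ReadOnlyResults(Sequence):
    """Minimal model of MapResult's sequence behaviour."""
    def __init__(self, items):
        self._items = tuple(items)  # tuple: no append/extend/__setitem__

    def __len__(self):
        return len(self._items)

    def __getitem__(self, index):
        return self._items[index]

batch = ReadOnlyResults(["r0", "r1"])
# len(batch) == 2; batch[0] == "r0"; iteration works; there is no append()
```

Subclassing Sequence supplies iteration and membership tests for free while exposing no mutating methods.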

Attributes

Status Precedence

FAILED > PAUSED > COMPLETED. If any item failed, the batch status is FAILED; if any item paused (and none failed), it is PAUSED. An empty batch is COMPLETED.
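The precedence rule reduces to a short function (an illustrative sketch; the real MapResult computes this internally):

```python
def batch_status(item_statuses):
    # FAILED > PAUSED > COMPLETED; an empty batch counts as COMPLETED
    if "FAILED" in item_statuses:
        return "FAILED"
    if "PAUSED" in item_statuses:
        return "PAUSED"
    return "COMPLETED"
```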


RunStatus

Enum for execution status.

Usage:

With the default error_handling="raise", node failures raise before returning a RunResult, so the status will be COMPLETED or PAUSED.


Errors

MissingInputError

Raised when required inputs are not provided.

IncompatibleRunnerError

Raised when runner can't execute graph.

InfiniteLoopError

Raised when cyclic graph exceeds max iterations.


Execution Model

Input Normalization

Runners accept inputs in two equivalent ways:

Rules:

  • values + kwargs are merged

  • duplicate keys raise ValueError

  • option names like select, map_over, max_concurrency are reserved for runner options

  • reserved option names in kwargs raise ValueError

  • if an input name matches an option name, pass that input through values={...}
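The merge rules above can be sketched as follows. This is a simplified model of the documented behaviour, not hypergraph's normalization code; the normalize_inputs helper and the exact RESERVED set are assumptions:

```python
RESERVED = {"select", "map_over", "map_mode", "max_concurrency", "error_handling"}

def normalize_inputs(values=None, **input_values):
    """Merge values with kwargs; reject reserved names and duplicates."""
    merged = dict(values or {})
    for key, val in input_values.items():
        if key in RESERVED:
            raise ValueError(f"{key!r} is a runner option; pass it via values={{...}}")
        if key in merged:
            raise ValueError(f"duplicate input {key!r}")
        merged[key] = val
    return merged
```

Note that values={...} is the escape hatch: an input whose name collides with a runner option can only be supplied through the values dict.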

Execution Phases

Runners execute graphs in two phases:

  1. Build a static execution plan from the active graph

  2. Walk that plan until each region reaches quiescence

The static plan is a DAG of strongly connected components (SCCs):

  • DAG region — a component with no feedback loop, usually runs in one pass

  • Cyclic region — a component with feedback, iterates locally until no node in that region is ready

  • Gate edges — participate in planning so gate-driven loops stay in one region, but gate decisions still act as runtime activation

Within one topological layer, runners still use supersteps as the execution batch:

  1. Find ready nodes inside the current layer

  2. Execute them (sequentially for Sync, concurrently for Async)

  3. Update values, versions, and routing decisions

  4. Repeat until that layer reaches quiescence
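The superstep loop above can be sketched for a plain DAG region: repeatedly find nodes whose inputs are available, run them, and stop when nothing is ready. This is a toy model, not the hypergraph scheduler (the node table format is an assumption):

```python
def run_supersteps(nodes, values):
    """nodes: {name: (fn, [param names])}. Runs ready nodes in waves until quiescent."""
    done = set()
    while True:
        # 1. find ready nodes: not yet run, all inputs present
        ready = [n for n, (fn, params) in nodes.items()
                 if n not in done and all(p in values for p in params)]
        if not ready:          # 4. quiescence: nothing left to run
            return values
        for name in ready:     # 2. execute the wave (sequentially here)
            fn, params = nodes[name]
            values[name] = fn(*(values[p] for p in params))
            done.add(name)     # 3. update state

nodes = {
    "doubled": (lambda x: x * 2, ["x"]),
    "plus_one": (lambda doubled: doubled + 1, ["doubled"]),
}
out = run_supersteps(nodes, {"x": 5})
# out["doubled"] == 10, out["plus_one"] == 11
```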


Value Resolution Order

When collecting inputs for a node, values are resolved in this order:

  1. Edge value - Output from upstream node

  2. Input value - Provided via values/kwargs. On resume/fork, checkpoint state is restored first, then runtime inputs apply (runtime values win). See Run Lineage.

  3. Bound value - From graph.bind()

  4. Function default - From function signature

Note: Fork/resume restoration rehydrates execution state from checkpoint snapshot data (including prior computed values) and uses staleness checks to decide which nodes re-execute.
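The four-step precedence reduces to a first-match lookup across the sources in order (a sketch of the documented rule only; it ignores resume/fork restoration, and the resolve helper is an assumption):

```python
def resolve(param, edge_values, input_values, bound_values, defaults):
    # precedence: edge value > runtime input > graph.bind() value > function default
    for source in (edge_values, input_values, bound_values, defaults):
        if param in source:
            return source[param]
    raise KeyError(f"missing required input: {param!r}")  # stands in for MissingInputError
```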

Cyclic Graphs

Graphs with cycles (feedback loops) execute as local SCC regions until quiescent.

The runner:

  1. Tracks value and wait_for freshness via versions

  2. Re-executes nodes in the current cyclic region when their inputs become fresh

  3. Stops when that region has no more ready nodes, or max_iterations is hit
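The iterate-until-quiescent loop, with the max_iterations cap, can be sketched generically (an illustrative model; the step/done callables are assumptions, and the RuntimeError stands in for InfiniteLoopError):

```python
def run_cycle(step, state, done, max_iterations=1000):
    """Iterate one cyclic region until done(state) (quiescence) or the cap is hit."""
    for _ in range(max_iterations):
        if done(state):            # no node in the region is ready anymore
            return state
        state = step(state)        # one local iteration of the region
    raise RuntimeError("max_iterations exceeded")  # stand-in for InfiniteLoopError

# refine a value until it stops changing
result = run_cycle(lambda s: min(s + 3, 10), 0, lambda s: s >= 10)
# result == 10 (converges in a handful of iterations)
```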


Nested Graphs

GraphNodes (nested graphs) are executed by the same runner:

The runner automatically:

  • Delegates to the inner graph

  • Shares concurrency limits (AsyncRunner)

  • Propagates the cache backend to nested graphs

  • Propagates errors
