Runners
Runners execute graphs. They handle the execution loop, node scheduling, and concurrency.
- SyncRunner - Sequential execution for synchronous nodes
- DaftRunner - Columnar execution via Daft for DAG-only graphs (batch, distributed)
- AsyncRunner - Concurrent execution with async support and `max_concurrency`
- RunResult - Output values, status, and error information
- `map()` - Batch processing with zip or cartesian product modes
Overview

| Runner | Async nodes | Cycles | Distributed | Result |
| --- | --- | --- | --- | --- |
| SyncRunner | No | Yes | No | RunResult |
| DaftRunner | Yes (Daft-native) | No | Yes | RunResult / MapResult |
| AsyncRunner | Yes | Yes | No | Coroutine[RunResult] |
SyncRunner
Sequential execution for synchronous graphs.
SyncRunner is not built for interrupts or HITL flows. If a graph contains InterruptNodes, SyncRunner raises IncompatibleRunnerError; use AsyncRunner instead.
```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()
result = runner.run(graph, {"x": 5})
print(result["doubled"])  # 10
```

Constructor
Args:
- `cache` — Optional cache backend for node result caching. Nodes opt in with `@node(..., cache=True)`. Supports `InMemoryCache`, `DiskCache`, or any `CacheBackend` implementation.
- `checkpointer` — Optional checkpointer for persistent run history. For `run()`, enables strict lineage semantics (resume/fork) and auto-generates `workflow_id` when omitted. For `map()`, persistence is enabled when `workflow_id` is provided. Requires `SqliteCheckpointer` or any `SyncCheckpointerProtocol` implementation.
run()
Execute a graph once.
Args:
- `graph` - The graph to execute
- `values` - Optional input values as `{param_name: value}`
- `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
- `on_missing` - How to handle missing selected outputs:
  - `"ignore"` (default): silently omit missing outputs
  - `"warn"`: warn about missing outputs, return what's available
  - `"error"`: raise error if any selected output is missing
- `on_internal_override` - Reserved for compatibility. Internal produced values are rejected deterministically.
- `max_iterations` - Max local iterations per cyclic execution region (SCC) (default: 1000)
- `error_handling` - How to handle node execution errors:
  - `"raise"` (default): Re-raise the original exception (e.g., `ValueError`). Clean traceback, no wrapper.
  - `"continue"`: Return `RunResult` with `status=FAILED` and partial values instead of raising.
- `event_processors` - Optional list of event processors to observe execution
- `checkpoint` - Optional low-level checkpoint snapshot (values + steps) for explicit fork restores.
- `workflow_id` - Optional workflow identifier for lineage tracking. With a checkpointer:
  - omitted: auto-generated for `run()`
  - existing: strict resume only (no runtime values; same graph structure). Existing persisted workflows may be `active`, `paused`, or `failed`; completed workflows are terminal.
  - new + `checkpoint`: explicit fork
- `override_workflow` - Convenience shortcut for existing `workflow_id`s. When `True` and the `workflow_id` already exists, `run()` auto-forks from that workflow (generates a new workflow ID and uses its checkpoint) instead of raising strict resume errors.
- `fork_from` - Workflow ID to fork from directly (no manual checkpoint plumbing). Requires a checkpointer.
- `retry_from` - Workflow ID to retry from directly (records retry lineage metadata). Requires a checkpointer.
- `**input_values` - Input shorthand (merged with `values`)

Lineage hashing: checkpoint compatibility uses a structural hash; a separate code hash is recorded for observability/caching workflows.
Returns: RunResult with outputs and status
Raises:
- `MissingInputError` - Required input not provided
- `IncompatibleRunnerError` - Graph contains async nodes
- `GraphConfigError` - If graph is cyclic and has no configured entrypoint
- `ValueError` - If runtime `select` or `entrypoint` overrides are passed
- Node execution errors (e.g., `ValueError`, `TypeError`) when `error_handling="raise"` (the default)
Example:
map()
Execute a graph multiple times with different inputs.
Args:
- `graph` - The graph to execute
- `values` - Optional input values. Parameters in `map_over` should be lists
- `map_over` - Parameter name(s) to iterate over
- `map_mode` - `"zip"` for parallel iteration, `"product"` for cartesian product
- `clone` - Deep-copy mutable values for each iteration. `True` clones all non-`map_over` values; pass a list of names to clone selectively. Prevents cross-iteration mutation.
- `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
- `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
- `error_handling` - How to handle failures:
  - `"raise"` (default): Stop on first failure and raise the exception
  - `"continue"`: Collect all results, including failures as `RunResult` with `status=FAILED`
- `event_processors` - Optional list of event processors to observe execution
- `workflow_id` - Optional workflow identifier for checkpoint persistence and resume. Creates a parent batch run with per-item child runs (`{workflow_id}/0`, `{workflow_id}/1`, ...). On re-run, completed items are skipped. See Resuming Batches.
- `**input_values` - Input shorthand (merged with `values`)
Returns: MapResult wrapping per-iteration RunResults with batch metadata
Example:
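The `map_mode` options above follow standard Python iteration semantics. A pure-Python sketch of how per-item inputs are formed in each mode (`build_items` is an illustrative helper, not part of the hypergraph API):

```python
from itertools import product

def build_items(map_over_values, map_mode="zip"):
    """Build per-iteration input dicts for the mapped parameters.

    map_over_values: {param_name: list_of_values}
    "zip" pairs lists positionally; "product" takes the cartesian product.
    """
    names = list(map_over_values)
    lists = [map_over_values[n] for n in names]
    combos = zip(*lists) if map_mode == "zip" else product(*lists)
    return [dict(zip(names, combo)) for combo in combos]

# "zip" yields 2 items; "product" yields all 4 combinations
print(build_items({"x": [1, 2], "y": [10, 20]}, "zip"))
print(build_items({"x": [1, 2], "y": [10, 20]}, "product"))
```

With `"zip"`, all mapped lists must align positionally; `"product"` grows multiplicatively with each mapped parameter.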
capabilities
Returns capabilities for compatibility checking:
DaftRunner
Translation runner: converts DAGs into chained Daft df.with_column() UDF calls.
DaftRunner translates each node into a Daft UDF and chains them via df.with_column(). The entire graph becomes a single Daft query plan executed columnar-style. This is a good fit when:
you want columnar batch execution over a dataset
you need distributed fan-out via Daft
your graph is a DAG (no cycles, no gates, no interrupts)
It supports FunctionNode and GraphNode (including nested map_over). Async nodes are handled natively by Daft's async UDF support.
Constructor
Args:
- `cache` - Optional cache backend for node-level caching.
Raises:
- `ImportError` - If the `daft` dependency is not installed. Install with `pip install 'hypergraph[daft]'`.
run()
Execute a graph once via a 1-row Daft plan.
Args:
- `graph` - The graph to execute (must be a DAG)
- `values` - Optional input values as `{param_name: value}`
- `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)`.
- `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
- `entrypoint` - Runtime entrypoint overrides are not supported
- `max_iterations` - Accepted for API compatibility but not used (DaftRunner does not support cycles)
- `error_handling` - `"raise"` re-raises the original exception; `"continue"` returns a failed `RunResult`
- `event_processors` - Accepted but ignored with a warning (DaftRunner does not support events)
- `**input_values` - Input shorthand (merged with `values`)
Example:
map()
Execute a graph for each item via Daft columnar execution.
All items are packed into a single Daft DataFrame, and the entire graph executes as chained df.with_column() UDF calls. This is the primary batch entrypoint when your data is a Python collection.
Args:
- `graph` - The graph to execute (must be a DAG)
- `values` - Optional input values. Parameters in `map_over` should be lists
- `map_over` - Parameter name(s) to iterate over
- `map_mode` - `"zip"` for parallel iteration or `"product"` for cartesian product
- `clone` - Deep-copy mutable broadcast values for each row. `True` clones all non-`map_over` values; pass a list of names to clone selectively
- `select` - Runtime select overrides are not supported
- `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
- `error_handling` - `"raise"` re-raises the first failed item's original exception; `"continue"` falls back to per-item execution and preserves failures inside `MapResult`
- `event_processors` - Accepted but ignored with a warning
- `**input_values` - Input shorthand (merged with `values`)
Example:
map_dataframe()
Execute one graph run per Daft DataFrame row.
Use this when your dataset already lives in a Daft DataFrame. Each row becomes one graph execution.
Args:
- `graph` - The graph to execute (must be a DAG)
- `dataframe` - Daft DataFrame supplying row-wise inputs
- `columns` - Optional subset of DataFrame columns to use as graph inputs. Defaults to all columns.
- `values` / `**input_values` - Additional broadcast inputs merged into every row. Must not overlap with DataFrame column names.
- `select` - Runtime select overrides are not supported
- `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
- `error_handling` - Same contract as `map()`: `"raise"` re-raises the first failed row, `"continue"` falls back to per-item execution
- `clone` - Deep-copy mutable broadcast values for each row
Example:
Broadcast values (shared across all rows) are passed as keyword arguments and captured in UDF closures:
capabilities
@stateful
Decorator to mark a class for once-per-worker initialization. DaftRunner wraps stateful objects with @daft.cls instead of @daft.func, so heavy resources (ML models, DB connections) are created once per worker process rather than once per row.
The class must support zero-argument construction (__init__() with no required args) so Daft can re-create it on each worker.
batch=True
Use batch=True on the @node decorator for vectorized @daft.func.batch execution. Batch UDFs receive daft.Series instead of scalar values, useful for NumPy/Arrow operations.
AsyncRunner
Concurrent execution with async support.
Constructor
Args:
- `cache` — Optional cache backend for node result caching. Nodes opt in with `@node(..., cache=True)`.
- `checkpointer` — Optional checkpointer for persistent run history. For `run()`, enables strict lineage semantics (resume/fork) and auto-generates `workflow_id` when omitted. For `map()`, persistence is enabled when `workflow_id` is provided. Requires `SqliteCheckpointer` or any `Checkpointer` implementation.
run()
Execute a graph asynchronously.
Args:
- `graph` - The graph to execute
- `values` - Optional input values
- `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
- `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
- `entrypoint` - Runtime entrypoint overrides are not supported. Configure entrypoints on the graph via `Graph(..., entrypoint=...)` or `graph.with_entrypoint(...)`.
- `max_iterations` - Max local iterations per cyclic execution region (SCC) (default: 1000)
- `max_concurrency` - Max parallel node executions (default: unlimited)
- `error_handling` - How to handle node execution errors:
  - `"raise"` (default): Re-raise the original exception. Clean traceback, no wrapper.
  - `"continue"`: Return `RunResult` with `status=FAILED` and partial values instead of raising.
- `event_processors` - Optional list of event processors to observe execution (supports `AsyncEventProcessor`)
- `checkpoint` - Optional low-level checkpoint snapshot (values + steps) for explicit fork restores.
- `workflow_id` - Optional workflow identifier for lineage tracking. With a checkpointer:
  - omitted: auto-generated for `run()`
  - existing: strict resume only (no runtime values; same graph structure). Existing persisted workflows may be `active`, `paused`, or `failed`; completed workflows are terminal.
  - new + `checkpoint`: explicit fork
- `override_workflow` - Convenience shortcut for existing `workflow_id`s. When `True` and the `workflow_id` already exists, `run()` auto-forks from that workflow (generates a new workflow ID and uses its checkpoint) instead of raising strict resume errors.
- `fork_from` - Workflow ID to fork from directly (no manual checkpoint plumbing). Requires a checkpointer.
- `retry_from` - Workflow ID to retry from directly (records retry lineage metadata). Requires a checkpointer.
- `**input_values` - Input shorthand (merged with `values`)

Lineage hashing: checkpoint compatibility uses a structural hash; a separate code hash is recorded for observability/caching workflows.
Returns: RunResult with outputs and status
Example:
Concurrency Control
The max_concurrency parameter limits how many nodes execute simultaneously:
Concurrency limits are shared across:
All nodes in a superstep
Nested GraphNodes
All items in
map()calls
This prevents overwhelming external services when processing large batches.
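The behavior of a shared concurrency cap can be sketched with the standard asyncio semaphore pattern (this models the documented contract, not hypergraph's internal scheduler):

```python
import asyncio

async def run_all(n_tasks, max_concurrency):
    """Run n_tasks coroutines, never more than max_concurrency at once."""
    sem = asyncio.Semaphore(max_concurrency)
    state = {"active": 0, "peak": 0}

    async def task(i):
        async with sem:  # one shared cap across all tasks
            state["active"] += 1
            state["peak"] = max(state["peak"], state["active"])
            await asyncio.sleep(0.01)  # stand-in for node work
            state["active"] -= 1
            return i

    results = await asyncio.gather(*(task(i) for i in range(n_tasks)))
    return results, state["peak"]

results, peak = asyncio.run(run_all(10, max_concurrency=3))
assert peak <= 3  # the cap held even though all 10 tasks were scheduled
```

Because the semaphore is shared, fan-out anywhere (supersteps, nested graphs, `map()` items) draws from the same pool of slots.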
map()
Execute graph multiple times concurrently.
Args:
- `graph` - The graph to execute
- `values` - Optional input values
- `map_over` - Parameter name(s) to iterate over
- `map_mode` - `"zip"` or `"product"`
- `clone` - Deep-copy mutable values for each iteration. `True` clones all non-`map_over` values; pass a list of names to clone selectively.
- `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
- `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
- `entrypoint` - Runtime entrypoint overrides are not supported.
- `max_concurrency` - Shared limit across all executions
- `error_handling` - How to handle failures:
  - `"raise"` (default): Stop on first failure and raise the exception
  - `"continue"`: Collect all results, including failures as `RunResult` with `status=FAILED`
- `event_processors` - Optional list of event processors to observe execution
- `workflow_id` - Optional workflow identifier for checkpoint persistence and resume. Creates per-item child runs that can be skipped on re-run. See Resuming Batches.
- `**input_values` - Input shorthand (merged with `values`)
Example:
For very large batches, prefer setting max_concurrency explicitly. If max_concurrency=None and the fan-out is extremely large, AsyncRunner.map() raises ValueError to avoid unbounded task creation.
capabilities
RunResult
Result of a graph execution.
Attributes
Convenience Properties
Dict-like Access
Partial Values on Failure
By default, run() raises the original exception on node failure. To get partial results instead, use error_handling="continue":
This is useful for debugging — you can inspect which nodes completed successfully.
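The raise-vs-continue contract can be sketched in pure Python (the tiny runner below is illustrative, not hypergraph's implementation):

```python
def run_pipeline(steps, inputs, error_handling="raise"):
    """Run named steps in order, mirroring the documented error contract."""
    values, status = dict(inputs), "COMPLETED"
    for name, fn in steps:
        try:
            values[name] = fn(values)
        except Exception:
            if error_handling == "raise":
                raise  # default: the original exception, no wrapper
            status = "FAILED"  # continue mode: keep partial values
            break
    return values, status

steps = [
    ("doubled", lambda v: v["x"] * 2),
    ("parsed", lambda v: int("not-a-number")),  # this step fails
]
values, status = run_pipeline(steps, {"x": 5}, error_handling="continue")
print(status, values)  # FAILED, with the partial "doubled" value intact
```

In continue mode you can inspect `values` to see exactly which upstream steps completed before the failure.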
Progressive Disclosure
MapResult
Result of a batch map() execution. Wraps individual RunResult items with batch-level metadata.
Supports read-only sequence protocol — len(), iter(), [int] work; mutable list ops do not.
Attributes
Status Precedence
FAILED > PAUSED > COMPLETED. If any item failed, the batch status is FAILED. Empty batch → COMPLETED.
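The precedence rule reduces to a few lines (statuses shown as plain strings for illustration):

```python
def batch_status(item_statuses):
    """FAILED > PAUSED > COMPLETED; an empty batch is COMPLETED."""
    if "FAILED" in item_statuses:
        return "FAILED"
    if "PAUSED" in item_statuses:
        return "PAUSED"
    return "COMPLETED"

assert batch_status(["COMPLETED", "FAILED", "PAUSED"]) == "FAILED"
assert batch_status([]) == "COMPLETED"
```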
RunStatus
Enum for execution status.
Usage:
With the default error_handling="raise", node failures raise before returning a RunResult, so the status will be COMPLETED or PAUSED.
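A minimal stand-in for the usage pattern, assuming only the three statuses named on this page; the real enum (and its member values) lives in hypergraph:

```python
from enum import Enum

class RunStatus(Enum):  # stand-in mirroring the statuses documented here
    COMPLETED = "completed"
    PAUSED = "paused"
    FAILED = "failed"

# Typical check after a run with error_handling="continue"
status = RunStatus.COMPLETED
if status is RunStatus.FAILED:
    print("inspect partial values")
elif status is RunStatus.PAUSED:
    print("resume later")
else:
    print("done")
```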
Errors
MissingInputError
Raised when required inputs are not provided.
IncompatibleRunnerError
Raised when a runner can't execute a graph — for example, SyncRunner with async nodes or InterruptNodes.
InfiniteLoopError
Raised when a cyclic graph exceeds `max_iterations`.
Execution Model
Input Normalization
Runners accept inputs in two equivalent ways:
Rules:
- `values` + kwargs are merged
- duplicate keys raise `ValueError`
- option names like `select`, `map_over`, `max_concurrency` are reserved for runner options
- reserved option names in kwargs raise `ValueError`
- if an input name matches an option name, pass that input through `values={...}`
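The merge rules above can be sketched as follows (a simplified model, not hypergraph's actual normalizer; the reserved set is abbreviated):

```python
RESERVED = {"select", "map_over", "max_concurrency"}  # runner option names

def normalize_inputs(values=None, **input_values):
    """Merge values= with kwargs, rejecting duplicates and reserved names."""
    merged = dict(values or {})
    for name, value in input_values.items():
        if name in RESERVED:
            raise ValueError(
                f"{name!r} is a runner option, not an input; "
                f"pass it via values={{...}}"
            )
        if name in merged:
            raise ValueError(f"duplicate input {name!r}")
        merged[name] = value
    return merged

assert normalize_inputs({"x": 1}, y=2) == {"x": 1, "y": 2}
```

Note the escape hatch: an input whose name collides with an option name is still accepted through `values={...}`, since only kwargs are checked against the reserved set.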
Execution Model
Runners execute graphs in two phases:
Build a static execution plan from the active graph
Walk that plan until each region reaches quiescence
The static plan is a DAG of strongly connected components (SCCs):
DAG region — a component with no feedback loop, usually runs in one pass
Cyclic region — a component with feedback, iterates locally until no node in that region is ready
Gate edges — participate in planning so gate-driven loops stay in one region, but gate decisions still act as runtime activation
Within one topo layer, runners still use supersteps as the execution batch:
Find ready nodes inside the current layer
Execute them (sequentially for Sync, concurrently for Async)
Update values, versions, and routing decisions
Repeat until that layer reaches quiescence
For cycles:
Value Resolution Order
When collecting inputs for a node, values are resolved in this order:
1. Edge value - Output from upstream node
2. Input value - Provided via `values`/kwargs. On resume/fork, checkpoint state is restored first, then runtime inputs apply (runtime values win). See Run Lineage.
3. Bound value - From `graph.bind()`
4. Function default - From function signature
Note: Fork/resume restoration rehydrates execution state from checkpoint snapshot data (including prior computed values) and uses staleness checks to decide which nodes re-execute.
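The four-step resolution order can be sketched as a chain of lookups (illustrative only; the error string stands in for the real `MissingInputError`):

```python
_MISSING = object()

def resolve_input(name, edge_values, input_values, bound_values, defaults):
    """Resolve one node parameter: edge > input > bound > default."""
    for source in (edge_values, input_values, bound_values, defaults):
        value = source.get(name, _MISSING)
        if value is not _MISSING:
            return value
    raise KeyError(f"MissingInputError: {name!r}")

# An upstream output shadows a user-provided input for the same name
value = resolve_input("x", {"x": 10}, {"x": 1}, {}, {"x": 0})
assert value == 10
```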
Cyclic Graphs
Graphs with cycles (feedback loops) execute as local SCC regions until quiescent:
The runner:
- Tracks value and `wait_for` freshness via versions
- Re-executes nodes in the current cyclic region when their inputs become fresh
- Stops when that region has no more ready nodes, or `max_iterations` is hit
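The region loop reduces to: re-run ready nodes until none are ready or the iteration cap trips. A pure-Python sketch with a single feedback node (illustrative; hypergraph tracks readiness via value versions rather than this simple predicate):

```python
def run_cyclic_region(step, is_ready, state, max_iterations=1000):
    """Iterate a feedback region until quiescence or the iteration cap."""
    for _ in range(max_iterations):
        if not is_ready(state):
            return state  # quiescent: no node in the region is ready
        state = step(state)
    raise RuntimeError("InfiniteLoopError: exceeded max_iterations")

# Feedback loop: keep doubling until the value reaches a threshold
final = run_cyclic_region(
    step=lambda s: {"x": s["x"] * 2},
    is_ready=lambda s: s["x"] < 100,
    state={"x": 1},
)
assert final["x"] == 128
```

A region that never reaches quiescence hits `max_iterations` and aborts, which is the same backstop `InfiniteLoopError` provides in the real runners.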
Nested Graphs
GraphNodes (nested graphs) are executed by the same runner:
The runner automatically:
Delegates to the inner graph
Shares concurrency limits (AsyncRunner)
Propagates the cache backend to nested graphs
Propagates errors