Batch Processing

Use runner.map() to process multiple inputs through the same graph.

The Pattern: Think Singular, Scale with Map

```python
# 1. Write logic for ONE item
@node(output_name="features")
def extract(document: str) -> dict:
    return analyze(document)

# 2. Build a graph
graph = Graph([extract])

# 3. Scale to many items
results = runner.map(graph, {"document": documents}, map_over="document")
```

This pattern is one of hypergraph's biggest advantages in practice:

  • write the logic for one item

  • compose it into a graph

  • scale it with runner.map() or a mapped GraphNode

That same shape works for:

  • ETL and document ingestion

  • feature extraction

  • model comparison

  • evaluation datasets

  • nested workflows where each item may branch internally

Basic Usage
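
A fuller sketch of the three steps, assuming the `node`, `Graph`, and `runner` objects shown in the pattern above (import paths and runner construction are not shown on this page):

```python
@node(output_name="summary")
def summarize(document: str) -> str:
    # stand-in for real per-item logic
    return document[:100]

graph = Graph([summarize])

documents = ["first document ...", "second document ...", "third document ..."]

# Fan the single-item graph out over the whole list.
results = runner.map(graph, {"document": documents}, map_over="document")

for item in results:   # MapResult supports len(), iteration, and indexing
    print(item)
```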

Map Over Multiple Parameters

Zip Mode (Default)

Parallel iteration — lists must be equal length:

Product Mode

Cartesian product — all combinations:
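
The cartesian semantics are the same as `itertools.product` (again a plain-Python illustration; the model and prompt values are made up):

```python
from itertools import product

models = ["gpt-4o", "claude"]
prompts = ["p1", "p2", "p3"]

# Product mode: every model is paired with every prompt -> 2 * 3 = 6 runs.
combos = list(product(models, prompts))
```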

Fixed Parameters

Parameters not in map_over are fixed across all iterations:
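
A sketch, assuming the `runner.map()` signature shown above; the extra `model` parameter is illustrative:

```python
# "model" is not listed in map_over, so the same value is passed
# to every iteration; only "document" varies per item.
results = runner.map(
    graph,
    {"document": documents, "model": "gpt-4o"},
    map_over="document",
)
```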

Async Batch Processing

Use AsyncRunner for concurrent processing:
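
A sketch, assuming `AsyncRunner` mirrors the synchronous `map()` signature (its constructor arguments are not documented on this page):

```python
import asyncio

async def main():
    runner = AsyncRunner()
    # Items are processed concurrently rather than one at a time.
    results = await runner.map(
        graph,
        {"document": documents},
        map_over="document",
    )
    return results

results = asyncio.run(main())
```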

Nested Graphs with Map

Fan out a nested graph over a collection:
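
A sketch of the shape, using the `Graph(..., name=...)`, `.as_node()`, and `.map_over(...)` calls named on this page; the node functions are illustrative:

```python
# Single-item workflow, built and tested on its own first.
item_graph = Graph([extract, summarize], name="process_item")

# Reuse it as one node of a larger graph, fanned out over "document".
process_all = item_graph.as_node().map_over("document")

pipeline = Graph([load_documents, process_all, aggregate])
```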

For many real systems, this is the most natural Hypergraph pattern:

  1. Build and test the single-item workflow first

  2. Name it with Graph(..., name="...")

  3. Reuse it as a node with .as_node()

  4. Add .map_over(...) when you need batch scale

This keeps the core logic small and reusable instead of mixing per-item logic with batch orchestration.

Working with MapResult

runner.map() returns a MapResult — a read-only sequence with batch-level metadata and aggregate accessors. It's fully backward compatible: len(), iteration, and indexing all work as before.

Error Handling

Control what happens when individual items fail using the error_handling parameter.

Fail-Fast (Default)

By default, map() stops on the first failure and raises the exception. This is useful during development and when failures indicate a systematic bug:
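
A sketch of the default behavior, assuming the `map()` call shown earlier; the exception that propagates is whatever the failing node raised:

```python
try:
    results = runner.map(graph, {"document": documents}, map_over="document")
except Exception as exc:
    # The first failing item aborts the batch and its error surfaces here.
    print(f"batch aborted: {exc}")
```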

Continue on Error

Use error_handling="continue" to collect all results, including failures. MapResult provides aggregate status and filtering:
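
A sketch using the `error_handling` parameter named above; the aggregate accessors on `MapResult` are not named on this page, so none are shown:

```python
results = runner.map(
    graph,
    {"document": documents},
    map_over="document",
    error_handling="continue",   # collect failures instead of raising
)
# Every input gets a slot in the MapResult; inspect per-item status
# via the aggregate accessors MapResult provides.
```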

Error Handling in Nested Graphs

When using map_over() on a nested graph, error handling works the same way. Failed items produce None placeholders to preserve list alignment with inputs:
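
The alignment guarantee is easy to picture in plain Python (this illustrates the shape of the output list, not the library call):

```python
inputs  = ["a.txt", "b.txt", "c.txt", "d.txt"]
outputs = ["A", None, "C", None]    # items at index 1 and 3 failed

# Because positions line up, failures can be traced back to their inputs.
failed = [doc for doc, out in zip(inputs, outputs) if out is None]
```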

map_over() does not support nested graphs that contain interrupts. If the wrapped graph has an @interrupt, graph construction raises a GraphConfigError. For human-in-the-loop batch workflows, use AsyncRunner.map() with one item per run instead.

runner.map() vs map_over

Two batch patterns, different tradeoffs. Both start from the same idea — write logic for one item, scale to many — but they give different guarantees:

|                  | runner.map()                                     | map_over                                    |
| ---------------- | ------------------------------------------------ | ------------------------------------------- |
| Returns          | MapResult (N RunResults)                         | One RunResult with list outputs             |
| Error isolation  | Per-item — failures don't affect other items     | Whole step — one failure can fail the batch |
| Tracing          | Per-item RunLogs with full routing/timing        | One RunLog (batch is a single step)         |
| Checkpointing    | Parent batch run + per-item child runs           | Persisted as one run step                   |
| Product mode     | Yes — map_mode="product" with multi-key map_over | Yes — mode="product" for cartesian product  |
| Use in pipelines | Top-level batch processing                       | Step inside a larger graph                  |

When to use runner.map()

  • Processing independent items (scraping, embedding, classification)

  • When you need per-item RunLogs for debugging

  • When batch items should behave like separate workflow runs

  • Quick fan-out over a single parameter

  • With workflow_id: persisted batch with per-item child runs

When to use map_over

  • Batch step inside a larger pipeline (load → process_all → aggregate)

  • When you need checkpoint persistence for the batch

  • Cartesian product mode (mode="product")

  • When the batch is part of a nested graph hierarchy

Checkpointing with map()

Pass a workflow_id to runner.map() to persist batch results. This creates a parent batch run and per-item child runs:
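
A sketch, assuming `workflow_id` is a keyword argument to `map()` as described above; the ID value is illustrative and checkpointer setup is not shown:

```python
results = runner.map(
    graph,
    {"document": documents},
    map_over="document",
    workflow_id="ingest-2024-06",   # illustrative ID; creates a parent
)                                   # batch run plus per-item child runs
```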

From the CLI:

Without workflow_id, runner.map() still works but results exist only in-process.

Run Lineage: Resume vs Fork

run() now uses strict, git-like lineage semantics when a checkpointer is configured:

  • Same workflow_id means "same lineage"

  • Resume is strict: no new runtime values

  • Structural graph changes require fork

  • Completed workflows are terminal (fork to branch)

When workflow_id is omitted and a checkpointer exists, run() auto-generates one and returns it in result.workflow_id.

Resume (same workflow_id, no new values)

Resume is intended for ACTIVE/PAUSED/FAILED workflows (for example: still-running work, interrupt-paused workflows, or retry after failure), not for appending new work to completed lineages.
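
A sketch of a resume, assuming `run()` accepts the graph and a `workflow_id` keyword as described on this page; note that no new runtime values are passed:

```python
# Re-run with the same workflow_id and no new inputs: execution
# picks up the existing ACTIVE/PAUSED/FAILED lineage.
result = runner.run(graph, workflow_id="ingest-2024-06")
```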

Fork (new workflow_id, optional overrides)

Fork by workflow ID when you want to branch history, override inputs, or run a changed graph:

Retry is symmetrical:

If you pass runtime values with an existing workflow ID, run() raises InputOverrideRequiresForkError. If graph structure changed for an existing workflow ID, run() raises GraphChangedError.
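
A sketch of the first failure mode, assuming the exception name above is importable from the library (its module path is not shown here) and that `run()` takes inputs as a dict:

```python
try:
    # Passing new runtime values against an existing lineage is rejected.
    runner.run(graph, {"document": new_documents}, workflow_id="ingest-2024-06")
except InputOverrideRequiresForkError:
    # Fork instead: re-run with a new workflow_id plus the overrides.
    ...
```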

Resuming Batches

When you re-run map() with the same workflow_id, completed items are automatically skipped. Only failed or unfinished items are re-executed:
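
A sketch of a batch retry, assuming the same `map()` signature as above:

```python
# First attempt: some items fail, the rest complete and are persisted.
runner.map(
    graph,
    {"document": documents},
    map_over="document",
    workflow_id="ingest-2024-06",
    error_handling="continue",
)

# Second attempt with the SAME workflow_id: completed items are
# skipped automatically; only failed/unfinished items run again.
runner.map(
    graph,
    {"document": documents},
    map_over="document",
    workflow_id="ingest-2024-06",
    error_handling="continue",
)
```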

This makes it safe to retry large batches — you only pay for the items that actually need re-processing.

CLI batch execution

You can also run batch operations directly from the terminal:

See Debug Workflows — CLI for the full CLI reference.

When to Use Map vs Loop

| Use runner.map() or map_over | Use a Python loop            |
| ---------------------------- | ---------------------------- |
| Same graph, different inputs | Different graphs per item    |
| Want parallel execution      | Need sequential dependencies |
| Processing a collection      | One-off processing           |

What's Next?