# Runners

Runners execute graphs. They handle the execution loop, node scheduling, and concurrency.

* **SyncRunner** - Sequential execution for synchronous nodes
* **DaftRunner** - Columnar execution via Daft for DAG-only graphs (batch, distributed)
* **AsyncRunner** - Concurrent execution with async support and `max_concurrency`
* **RunResult** - Output values, status, and error information
* **map()** - Batch processing with zip or cartesian product modes

## Overview

| Runner        | Async Nodes       | Cycles | Distributed | Returns                   |
| ------------- | ----------------- | ------ | ----------- | ------------------------- |
| `SyncRunner`  | No                | Yes    | No          | `RunResult`               |
| `DaftRunner`  | Yes (Daft-native) | No     | Yes         | `RunResult` / `MapResult` |
| `AsyncRunner` | Yes               | Yes    | No          | `Coroutine[RunResult]`    |

## SyncRunner

Sequential execution for synchronous graphs.

`SyncRunner` is not built for interrupts or human-in-the-loop (HITL) flows. If a graph contains `InterruptNode`s, `SyncRunner` raises `IncompatibleRunnerError`; use `AsyncRunner` instead.

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()
result = runner.run(graph, {"x": 5})

print(result["doubled"])  # 10
```

### Constructor

```python
class SyncRunner:
    def __init__(
        self,
        cache: CacheBackend | None = None,
        checkpointer: Checkpointer | None = None,
    ) -> None: ...
```

**Args:**

* `cache` — Optional [cache backend](https://gilad-rubin.gitbook.io/hypergraph/patterns/08-caching) for node result caching. Nodes opt in with `@node(..., cache=True)`. Supports `InMemoryCache`, `DiskCache`, or any `CacheBackend` implementation.
* `checkpointer` — Optional [checkpointer](https://gilad-rubin.gitbook.io/hypergraph/how-to-guides/batch-processing#checkpointing-with-map) for persistent run history. For `run()`, enables strict lineage semantics (resume/fork) and auto-generates `workflow_id` when omitted. For `map()`, persistence is enabled when `workflow_id` is provided. Accepts `SqliteCheckpointer` or any `SyncCheckpointerProtocol` implementation.

### run()

```python
def run(
    self,
    graph: Graph,
    values: dict[str, Any] | None = None,
    *,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    max_iterations: int | None = None,
    error_handling: Literal["raise", "continue"] = "raise",
    event_processors: list[EventProcessor] | None = None,
    checkpoint: Checkpoint | None = None,
    workflow_id: str | None = None,
    override_workflow: bool = False,
    fork_from: str | None = None,
    retry_from: str | None = None,
    **input_values: Any,
) -> RunResult: ...
```

Execute a graph once.

**Args:**

* `graph` - The graph to execute
* `values` - Optional input values as `{param_name: value}`
* `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
* `on_missing` - How to handle missing selected outputs:
  * `"ignore"` (default): silently omit missing outputs
  * `"warn"`: warn about missing outputs, return what's available
  * `"error"`: raise error if any selected output is missing
* `max_iterations` - Max local iterations per cyclic execution region (SCC); default 1000
* `error_handling` - How to handle node execution errors:
  * `"raise"` (default): Re-raise the original exception (e.g., `ValueError`). Clean traceback, no wrapper.
  * `"continue"`: Return `RunResult` with `status=FAILED` and partial values instead of raising.
* `event_processors` - Optional list of [event processors](https://gilad-rubin.gitbook.io/hypergraph/api-reference/events) to observe execution
* `checkpoint` - Optional low-level checkpoint snapshot (`values + steps`) for explicit fork restores.
* `workflow_id` - Optional workflow identifier for lineage tracking. With a checkpointer:
  * omitted: auto-generated for `run()`
  * existing: strict resume only (no runtime values; same graph structure). Existing persisted workflows may be `active`, `paused`, or `failed`; completed workflows are terminal.
  * new + `checkpoint`: explicit fork
* `override_workflow` - Convenience shortcut for existing `workflow_id`s. When `True` and the `workflow_id` already exists, `run()` auto-forks from that workflow (generates a new workflow ID and uses its checkpoint) instead of raising strict resume errors.
* `fork_from` - Workflow ID to fork from directly (no manual checkpoint plumbing). Requires a checkpointer.
* `retry_from` - Workflow ID to retry from directly (records retry lineage metadata). Requires a checkpointer.
* Lineage hashing: checkpoint compatibility uses a structural hash; a separate code hash is recorded for observability/caching workflows.
* `**input_values` - Input shorthand (merged with `values`)

**Returns:** `RunResult` with outputs and status

**Raises:**

* `MissingInputError` - Required input not provided
* `IncompatibleRunnerError` - Graph contains async nodes or `InterruptNode`s
* `GraphConfigError` - If graph is cyclic and has no configured entrypoint
* `ValueError` - If runtime `select` or `entrypoint` overrides are passed
* Node execution errors (e.g., `ValueError`, `TypeError`) when `error_handling="raise"` (the default)

**Example:**

```python
# Basic execution — raises on failure (default)
result = runner.run(graph, {"query": "What is RAG?"})

# kwargs shorthand
result = runner.run(graph, query="What is RAG?")

# Configure output scope on the graph
scoped = graph.select("final_answer")
result = runner.run(scoped, values)

# Limit iterations for cyclic graphs
result = runner.run(cyclic_graph, values, max_iterations=50)

# Strict output checking
result = runner.run(graph.select("answer"), values, on_missing="error")

# With progress bars
from hypergraph import RichProgressProcessor
result = runner.run(graph, values, event_processors=[RichProgressProcessor()])

# Collect partial results instead of raising on failure
from hypergraph import RunStatus
result = runner.run(graph, {"x": 5}, error_handling="continue")
if result.status == RunStatus.FAILED:
    print(result.error)        # the original exception
    print(result.values)       # outputs from nodes that completed before the failure

# Fork from an existing run (workflow-id based)
from hypergraph.checkpointers import SqliteCheckpointer
cp = SqliteCheckpointer("./runs.db")
runner = SyncRunner(checkpointer=cp)

runner.run(graph, {"x": 5}, workflow_id="job-1")
result = runner.run(
    graph,
    {"x": 100},
    fork_from="job-1",
)

# Convenience override (auto-forks if "job-1" already exists)
result = runner.run(
    graph,
    {"x": 100},
    workflow_id="job-1",
    override_workflow=True,
)
```

### map()

```python
def map(
    self,
    graph: Graph,
    values: dict[str, Any] | None = None,
    *,
    map_over: str | list[str],
    map_mode: Literal["zip", "product"] = "zip",
    clone: bool | list[str] = False,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    error_handling: Literal["raise", "continue"] = "raise",
    event_processors: list[EventProcessor] | None = None,
    workflow_id: str | None = None,
    **input_values: Any,
) -> MapResult: ...
```

Execute a graph multiple times with different inputs.

**Args:**

* `graph` - The graph to execute
* `values` - Optional input values. Parameters in `map_over` should be lists
* `map_over` - Parameter name(s) to iterate over
* `map_mode` - `"zip"` for parallel iteration, `"product"` for cartesian product
* `clone` - Deep-copy mutable values for each iteration. `True` clones all non-`map_over` values; pass a list of names to clone selectively. Prevents cross-iteration mutation.
* `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
* `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
* `error_handling` - How to handle failures:
  * `"raise"` (default): Stop on first failure and raise the exception
  * `"continue"`: Collect all results, including failures as `RunResult` with `status=FAILED`
* `event_processors` - Optional list of [event processors](https://gilad-rubin.gitbook.io/hypergraph/api-reference/events) to observe execution
* `workflow_id` - Optional workflow identifier for checkpoint persistence and resume. Creates a parent batch run with per-item child runs (`{workflow_id}/0`, `{workflow_id}/1`, ...). On re-run, completed items are skipped. See [Resuming Batches](https://gilad-rubin.gitbook.io/hypergraph/how-to-guides/batch-processing#resuming-batches).
* `**input_values` - Input shorthand (merged with `values`)

**Returns:** [`MapResult`](#mapresult) wrapping per-iteration RunResults with batch metadata
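The two modes mirror built-in `zip` and `itertools.product` over the mapped parameters. A standalone sketch of the input combinations each mode generates (plain Python with a hypothetical `expand` helper, not the library's implementation):

```python
from itertools import product

def expand(values: dict, map_over: list[str], map_mode: str = "zip") -> list[dict]:
    """Build one input dict per iteration, mirroring map()'s zip/product modes."""
    seqs = [values[name] for name in map_over]
    # Non-mapped values are broadcast unchanged into every iteration
    broadcast = {k: v for k, v in values.items() if k not in map_over}
    combos = zip(*seqs) if map_mode == "zip" else product(*seqs)
    return [{**broadcast, **dict(zip(map_over, combo))} for combo in combos]

expand({"a": [1, 2], "b": [10, 20], "c": 7}, ["a", "b"], "zip")
# → [{"c": 7, "a": 1, "b": 10}, {"c": 7, "a": 2, "b": 20}]
expand({"a": [1, 2], "b": [10, 20], "c": 7}, ["a", "b"], "product")
# → 4 dicts: (a, b) = (1, 10), (1, 20), (2, 10), (2, 20), each with c=7
```

Note that `"zip"` requires the mapped lists to be the same length, while `"product"` runs one iteration per combination.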

**Example:**

```python
# Single parameter
results = runner.map(graph, {"x": [1, 2, 3]}, map_over="x")

# kwargs shorthand
results = runner.map(graph, map_over="x", x=[1, 2, 3])

# Batch-level metadata
print(results.summary())   # "3 items | 3 completed | 12ms"
print(results["doubled"])  # [2, 4, 6] — collect values across items

# Multiple parameters with zip
results = runner.map(
    graph,
    {"a": [1, 2], "b": [10, 20]},
    map_over=["a", "b"],
    map_mode="zip",  # (1,10), (2,20)
)

# Continue on errors — aggregate status
results = runner.map(
    graph,
    {"x": [1, 2, 3]},
    map_over="x",
    error_handling="continue",
)
if results.failed:
    print(f"{len(results.failures)} items failed")
```

### capabilities

```python
@property
def capabilities(self) -> RunnerCapabilities: ...
```

Returns capabilities for compatibility checking:

```python
runner = SyncRunner()
caps = runner.capabilities

caps.supports_cycles       # True
caps.supports_async_nodes  # False
caps.supports_streaming    # False
caps.returns_coroutine     # False
caps.supports_interrupts   # False
```

***

## DaftRunner

Translation runner: converts DAGs into chained Daft `df.with_column()` UDF calls.

```python
from hypergraph import DaftRunner
```

`DaftRunner` translates each node into a Daft UDF and chains them via `df.with_column()`. The entire graph becomes a single Daft query plan executed columnar-style. This is a good fit when:

* you want columnar batch execution over a dataset
* you need distributed fan-out via Daft
* your graph is a DAG (no cycles, no gates, no interrupts)

It supports `FunctionNode` and `GraphNode` (including nested `map_over`). Async nodes are handled natively by Daft's async UDF support.

### Constructor

```python
class DaftRunner:
    def __init__(
        self,
        *,
        cache: CacheBackend | None = None,
    ) -> None: ...
```

**Args:**

* `cache` - Optional cache backend for node-level caching.

**Raises:**

* `ImportError` - If the `daft` dependency is not installed. Install with `pip install 'hypergraph[daft]'`.

### run()

```python
def run(
    self,
    graph: Graph,
    values: dict[str, Any] | None = None,
    *,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    entrypoint: str | None = None,
    max_iterations: int | None = None,
    error_handling: Literal["raise", "continue"] = "raise",
    event_processors: list[EventProcessor] | None = None,
    **input_values: Any,
) -> RunResult: ...
```

Execute a graph once as a single-row Daft plan.

**Args:**

* `graph` - The graph to execute (must be a DAG)
* `values` - Optional input values as `{param_name: value}`
* `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)`.
* `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
* `entrypoint` - Runtime entrypoint overrides are not supported
* `max_iterations` - Accepted for API compatibility but not used (DaftRunner does not support cycles)
* `error_handling` - `"raise"` re-raises the original exception; `"continue"` returns a failed `RunResult`
* `event_processors` - Accepted but ignored with a warning (DaftRunner does not support events)
* `**input_values` - Input shorthand (merged with `values`)

**Example:**

```python
from hypergraph import DaftRunner, Graph, node

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = DaftRunner()
result = runner.run(graph, x=5)

print(result["doubled"])  # 10
```

### map()

```python
def map(
    self,
    graph: Graph,
    values: dict[str, Any] | None = None,
    *,
    map_over: str | list[str],
    map_mode: Literal["zip", "product"] = "zip",
    clone: bool | list[str] = False,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    error_handling: Literal["raise", "continue"] = "raise",
    event_processors: list[EventProcessor] | None = None,
    **input_values: Any,
) -> MapResult: ...
```

Execute a graph for each item via Daft columnar execution.

All items are packed into a single Daft DataFrame, and the entire graph executes as chained `df.with_column()` UDF calls. This is the primary batch entrypoint when your data is a Python collection.

**Args:**

* `graph` - The graph to execute (must be a DAG)
* `values` - Optional input values. Parameters in `map_over` should be lists
* `map_over` - Parameter name(s) to iterate over
* `map_mode` - `"zip"` for parallel iteration or `"product"` for cartesian product
* `clone` - Deep-copy mutable broadcast values for each row. `True` clones all non-`map_over` values; pass a list of names to clone selectively
* `select` - Runtime select overrides are not supported
* `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
* `error_handling` - `"raise"` re-raises the first failed item's original exception; `"continue"` falls back to per-item execution and preserves failures inside `MapResult`
* `event_processors` - Accepted but ignored with a warning
* `**input_values` - Input shorthand (merged with `values`)

**Example:**

```python
from hypergraph import DaftRunner, Graph, node

@node(output_name="sentences")
def split_sentences(document: str) -> list[str]:
    return [part.strip() for part in document.split(".") if part.strip()]

@node(output_name="cleaned")
def clean_sentence(text: str) -> str:
    return " ".join(text.lower().split())

sentence_graph = Graph([clean_sentence], name="sentence_graph")
workflow = Graph(
    [
        split_sentences,
        sentence_graph.as_node(name="analyze").with_inputs(text="sentences").map_over("sentences"),
    ]
)

runner = DaftRunner()
results = runner.map(
    workflow,
    {"document": ["Refund requested. Checkout blocked.", "Weekly roadmap update."]},
    map_over="document",
)

print(results["cleaned"])  # [['refund requested', 'checkout blocked'], ['weekly roadmap update']]
```

### map\_dataframe()

```python
def map_dataframe(
    self,
    graph: Graph,
    dataframe: DataFrame,
    *,
    columns: str | Iterable[str] | None = None,
    values: dict[str, Any] | None = None,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    error_handling: Literal["raise", "continue"] = "raise",
    clone: bool | list[str] = False,
    **input_values: Any,
) -> MapResult: ...
```

Execute one graph run per Daft DataFrame row.

Use this when your dataset already lives in a Daft DataFrame. Each row becomes one graph execution.

**Args:**

* `graph` - The graph to execute (must be a DAG)
* `dataframe` - Daft DataFrame supplying row-wise inputs
* `columns` - Optional subset of DataFrame columns to use as graph inputs. Defaults to all columns.
* `values` / `**input_values` - Additional broadcast inputs merged into every row. Must not overlap with DataFrame column names.
* `select` - Runtime select overrides are not supported
* `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
* `error_handling` - Same contract as `map()`: `"raise"` re-raises the first failed row, `"continue"` falls back to per-item execution
* `clone` - Deep-copy mutable broadcast values for each row

**Example:**

```python
import daft
from hypergraph import DaftRunner, Graph, node

@node(output_name="cleaned_text")
def clean(text: str) -> str:
    return " ".join(text.lower().strip().split())

@node(output_name="word_count")
def count(cleaned_text: str) -> int:
    return len(cleaned_text.split())

graph = Graph([clean, count], name="text_pipeline")

frame = daft.from_pydict({
    "text": ["  Alpha beta alpha  ", "Gamma delta epsilon zeta eta"],
})

runner = DaftRunner()
result_df = runner.map_dataframe(graph, frame)
# result_df is a Daft DataFrame with columns: text, cleaned_text, word_count
result_df.show()
```

Broadcast values (shared across all rows) are passed as keyword arguments and captured in UDF closures:

```python
@node(output_name="greeting")
def greet(name: str, prefix: str) -> str:
    return f"{prefix}, {name}!"

graph = Graph([greet])
df = daft.from_pydict({"name": ["Alice", "Bob"]})

result_df = DaftRunner().map_dataframe(graph, df, prefix="Hi")
# Each row gets prefix="Hi" via the UDF closure
```

### capabilities

```python
runner = DaftRunner()
caps = runner.capabilities

caps.supports_cycles          # False
caps.supports_gates           # False
caps.supports_async_nodes     # True  (Daft handles async UDFs natively)
caps.supports_interrupts      # False
caps.supports_events          # False
caps.supports_distributed     # True
caps.supports_checkpointing   # False
```

### @stateful

Decorator to mark a class for once-per-worker initialization. DaftRunner wraps stateful objects with `@daft.cls` instead of `@daft.func`, so heavy resources (ML models, DB connections) are created once per worker process rather than once per row.

```python
from hypergraph import DaftRunner, Graph, node
from hypergraph.runners.daft import stateful

@stateful
class Embedder:
    def __init__(self):
        self.model = load_heavy_model()

    def embed(self, text: str) -> list[float]:
        return self.model.encode(text)

@node(output_name="embedding")
def embed(text: str, embedder: Embedder) -> list[float]:
    return embedder.embed(text)

graph = Graph([embed]).bind(embedder=Embedder())
runner = DaftRunner()
results = runner.map(graph, {"text": texts}, map_over="text")
```

The class must support zero-argument construction (`__init__()` with no required args) so Daft can re-create it on each worker.

### batch=True

Use `batch=True` on the `@node` decorator for vectorized `@daft.func.batch` execution. Batch UDFs receive `daft.Series` instead of scalar values, useful for NumPy/Arrow operations.

```python
import daft
from hypergraph import DaftRunner, Graph, node

@node(output_name="normalized", batch=True)
def normalize(values: daft.Series) -> daft.Series:
    arr = values.to_pylist()
    mean = sum(arr) / len(arr)
    std = (sum((x - mean) ** 2 for x in arr) / len(arr)) ** 0.5
    if std == 0:
        return daft.Series.from_pylist([0.0] * len(arr))
    return daft.Series.from_pylist([round((x - mean) / std, 4) for x in arr])

graph = Graph([normalize], name="batch_norm")
runner = DaftRunner()
results = runner.map(graph, {"values": [10.0, 20.0, 30.0]}, map_over="values")
```

***

## AsyncRunner

Concurrent execution with async support.

```python
import asyncio
from hypergraph import Graph, node, AsyncRunner

@node(output_name="data")
async def fetch(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

graph = Graph([fetch])
runner = AsyncRunner()
result = await runner.run(graph, {"url": "https://api.example.com"})
```

### Constructor

```python
class AsyncRunner:
    def __init__(
        self,
        cache: CacheBackend | None = None,
        checkpointer: Checkpointer | None = None,
    ) -> None: ...
```

**Args:**

* `cache` — Optional [cache backend](https://gilad-rubin.gitbook.io/hypergraph/patterns/08-caching) for node result caching. Nodes opt in with `@node(..., cache=True)`.
* `checkpointer` — Optional checkpointer for persistent run history. For `run()`, enables strict lineage semantics (resume/fork) and auto-generates `workflow_id` when omitted. For `map()`, persistence is enabled when `workflow_id` is provided. Accepts `SqliteCheckpointer` or any `Checkpointer` implementation.

### run()

```python
async def run(
    self,
    graph: Graph,
    values: dict[str, Any] | None = None,
    *,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    entrypoint: str | None = None,
    max_iterations: int | None = None,
    max_concurrency: int | None = None,
    error_handling: Literal["raise", "continue"] = "raise",
    event_processors: list[EventProcessor] | None = None,
    checkpoint: Checkpoint | None = None,
    workflow_id: str | None = None,
    override_workflow: bool = False,
    fork_from: str | None = None,
    retry_from: str | None = None,
    **input_values: Any,
) -> RunResult: ...
```

Execute a graph asynchronously.

**Args:**

* `graph` - The graph to execute
* `values` - Optional input values
* `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
* `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
* `entrypoint` - Runtime entrypoint overrides are not supported. Configure entrypoints on the graph via `Graph(..., entrypoint=...)` or `graph.with_entrypoint(...)`.
* `max_iterations` - Max local iterations per cyclic execution region (SCC); default 1000
* `max_concurrency` - Max parallel node executions (default: unlimited)
* `error_handling` - How to handle node execution errors:
  * `"raise"` (default): Re-raise the original exception. Clean traceback, no wrapper.
  * `"continue"`: Return `RunResult` with `status=FAILED` and partial values instead of raising.
* `event_processors` - Optional list of [event processors](https://gilad-rubin.gitbook.io/hypergraph/api-reference/events) to observe execution (supports `AsyncEventProcessor`)
* `checkpoint` - Optional low-level checkpoint snapshot (`values + steps`) for explicit fork restores.
* `workflow_id` - Optional workflow identifier for lineage tracking. With a checkpointer:
  * omitted: auto-generated for `run()`
  * existing: strict resume only (no runtime values; same graph structure). Existing persisted workflows may be `active`, `paused`, or `failed`; completed workflows are terminal.
  * new + `checkpoint`: explicit fork
* `override_workflow` - Convenience shortcut for existing `workflow_id`s. When `True` and the `workflow_id` already exists, `run()` auto-forks from that workflow (generates a new workflow ID and uses its checkpoint) instead of raising strict resume errors.
* `fork_from` - Workflow ID to fork from directly (no manual checkpoint plumbing). Requires a checkpointer.
* `retry_from` - Workflow ID to retry from directly (records retry lineage metadata). Requires a checkpointer.
* Lineage hashing: checkpoint compatibility uses a structural hash; a separate code hash is recorded for observability/caching workflows.
* `**input_values` - Input shorthand (merged with `values`)

**Returns:** `RunResult` with outputs and status

**Example:**

```python
# Basic async execution
result = await runner.run(graph, {"query": "What is RAG?"})

# kwargs shorthand
result = await runner.run(graph, query="What is RAG?")

# Limit concurrency (important for rate-limited APIs)
result = await runner.run(
    graph,
    {"prompts": prompts},
    max_concurrency=10,
)
```

### Concurrency Control

The `max_concurrency` parameter limits how many nodes execute simultaneously:

```python
# Process 100 items, but only 5 API calls at once
runner = AsyncRunner()
result = await runner.run(
    graph,
    {"items": large_list},
    max_concurrency=5,
)
```

Concurrency limits are shared across:

* All nodes in a superstep
* Nested GraphNodes
* All items in `map()` calls

This prevents overwhelming external services when processing large batches.
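A shared `asyncio.Semaphore` is the standard mechanism for this kind of limit. A standalone sketch (plain asyncio, a plausible mechanism rather than the library's actual implementation) that verifies the cap holds:

```python
import asyncio

async def run_limited(n_tasks: int, max_concurrency: int) -> int:
    """Run n_tasks dummy jobs under one shared semaphore; return peak concurrency observed."""
    sem = asyncio.Semaphore(max_concurrency)
    active = 0
    peak = 0

    async def job():
        nonlocal active, peak
        async with sem:           # at most max_concurrency jobs inside at once
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.001)  # stand-in for an API call
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak

peak = asyncio.run(run_limited(100, 5))
# peak never exceeds 5, regardless of how many tasks are scheduled
```

Because the semaphore is shared, the limit applies across the whole run rather than per node, which is why a single `max_concurrency` value also bounds nested graphs and `map()` fan-out.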

### map()

```python
async def map(
    self,
    graph: Graph,
    values: dict[str, Any] | None = None,
    *,
    map_over: str | list[str],
    map_mode: Literal["zip", "product"] = "zip",
    clone: bool | list[str] = False,
    select: str | list[str] = "**",
    on_missing: Literal["ignore", "warn", "error"] = "ignore",
    entrypoint: str | None = None,
    max_concurrency: int | None = None,
    error_handling: Literal["raise", "continue"] = "raise",
    event_processors: list[EventProcessor] | None = None,
    workflow_id: str | None = None,
    **input_values: Any,
) -> MapResult: ...
```

Execute a graph multiple times concurrently.

**Args:**

* `graph` - The graph to execute
* `values` - Optional input values
* `map_over` - Parameter name(s) to iterate over
* `map_mode` - `"zip"` or `"product"`
* `clone` - Deep-copy mutable values for each iteration. `True` clones all non-`map_over` values; pass a list of names to clone selectively.
* `select` - Runtime select overrides are not supported. Configure output scope on the graph with `graph.select(...)` before execution.
* `on_missing` - How to handle missing selected outputs (`"ignore"`, `"warn"`, or `"error"`)
* `entrypoint` - Runtime entrypoint overrides are not supported.
* `max_concurrency` - Shared limit across all executions
* `error_handling` - How to handle failures:
  * `"raise"` (default): Stop on first failure and raise the exception
  * `"continue"`: Collect all results, including failures as `RunResult` with `status=FAILED`
* `event_processors` - Optional list of [event processors](https://gilad-rubin.gitbook.io/hypergraph/api-reference/events) to observe execution
* `workflow_id` - Optional workflow identifier for checkpoint persistence and resume. Creates per-item child runs that can be skipped on re-run. See [Resuming Batches](https://gilad-rubin.gitbook.io/hypergraph/how-to-guides/batch-processing#resuming-batches).
* `**input_values` - Input shorthand (merged with `values`)
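Why `clone` matters: without it, a node that mutates a shared broadcast value leaks that mutation into later iterations. A standalone illustration (plain Python) of the hazard and the per-iteration deep copy that `clone=True` effectively applies:

```python
import copy

def map_plain(items: list, shared: list) -> list:
    """Iterations share one mutable value; writes accumulate across them."""
    out = []
    for x in items:
        shared.append(x)          # "node" mutates a broadcast input
        out.append(list(shared))
    return out

def map_cloned(items: list, shared: list) -> list:
    """Each iteration gets its own deep copy; mutations stay isolated."""
    out = []
    for x in items:
        local = copy.deepcopy(shared)
        local.append(x)
        out.append(local)
    return out

map_plain([1, 2], [])   # → [[1], [1, 2]]  (second iteration sees the first's write)
map_cloned([1, 2], [])  # → [[1], [2]]     (iterations isolated)
```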

**Example:**

```python
# Process documents concurrently
results = await runner.map(
    graph,
    {"doc": documents},
    map_over="doc",
    max_concurrency=20,  # Limit total concurrent operations
)

# kwargs shorthand
results = await runner.map(graph, map_over="doc", doc=documents)

# Continue on errors with async
results = await runner.map(
    graph,
    {"doc": documents},
    map_over="doc",
    max_concurrency=20,
    error_handling="continue",
)
```

For very large batches, prefer setting `max_concurrency` explicitly. If `max_concurrency=None` and the fan-out is extremely large, `AsyncRunner.map()` raises `ValueError` to avoid unbounded task creation.

### capabilities

```python
@property
def capabilities(self) -> RunnerCapabilities: ...
```

```python
runner = AsyncRunner()
caps = runner.capabilities

caps.supports_cycles       # True
caps.supports_async_nodes  # True
caps.supports_streaming    # False (Phase 2)
caps.returns_coroutine     # True
caps.supports_interrupts   # True
```

***

## RunResult

Result of a graph execution.

```python
from hypergraph import RunResult, RunStatus

result = runner.run(graph, values)

# Access outputs (dict-like)
value = result["output_name"]
value = result.get("output_name", default)
exists = "output_name" in result

# Check status
if result.status == RunStatus.COMPLETED:
    process(result.values)
else:
    handle_error(result.error)
```

### Attributes

```python
@dataclass
class RunResult:
    values: dict[str, Any]      # Output values
    status: RunStatus           # COMPLETED, FAILED, or PAUSED
    run_id: str                 # Unique identifier (auto-generated)
    workflow_id: str | None     # Optional workflow tracking
    error: BaseException | None # Exception if FAILED
    pause: PauseInfo | None     # Pause info if PAUSED (InterruptNode)
```

### Convenience Properties

```python
result.completed  # True if status == COMPLETED
result.paused     # True if status == PAUSED
result.failed     # True if status == FAILED
```

### Dict-like Access

```python
# These are equivalent
result["key"]
result.values["key"]

# Safe access with default
result.get("key", default_value)

# Check existence
"key" in result
```

### Partial Values on Failure

By default, `run()` raises the original exception on node failure. To get partial results instead, use `error_handling="continue"`:

```python
# Use error_handling="continue" to get partial results instead of raising
result = runner.run(graph, {"x": 5}, error_handling="continue")

if result.status == RunStatus.FAILED:
    # values contains outputs from nodes that succeeded before the failure
    partial = result.values  # e.g. {"step1_output": 10}
    error = result.error     # the exception that caused the failure
```

This is useful for debugging — you can inspect which nodes completed successfully.

### Progressive Disclosure

```python
# One-line summary
result.summary()   # "3 nodes | 12ms | 0 errors | slowest: generate (8ms)"

# JSON-serializable metadata (no raw values or exception objects)
result.to_dict()   # {"status": "completed", "run_id": "run-abc", "log": {...}}
```

***

## MapResult

Result of a batch `map()` execution. Wraps individual `RunResult` items with batch-level metadata.

Supports read-only sequence protocol — `len()`, `iter()`, `[int]` work; mutable list ops do not.

```python
from hypergraph import MapResult

results = runner.map(graph, {"x": [1, 2, 3]}, map_over="x")

# Sequence protocol (backward compatible)
len(results)     # 3
results[0]       # RunResult
for r in results: ...

# String key access — collect values across items
results["doubled"]           # [2, 4, 6]
results.get("doubled", 0)   # [2, 4, 6] (with default for missing)

# Aggregate status
results.status       # RunStatus.COMPLETED (or FAILED if any failed)
results.completed    # True if all completed
results.failed       # True if any failed
results.failures     # List of failed RunResult items

# Progressive disclosure
results.summary()    # "3 items | 3 completed | 12ms"
results.to_dict()    # JSON-serializable batch metadata + per-item results
```

### Attributes

```python
@dataclass(frozen=True)
class MapResult:
    results: tuple[RunResult, ...]   # Individual results
    run_id: str | None               # None for empty maps
    total_duration_ms: float         # Wall-clock batch time
    map_over: tuple[str, ...]        # Parameter names iterated
    map_mode: str                    # "zip" or "product"
    graph_name: str                  # Name of the executed graph
```

### Status Precedence

`FAILED > PAUSED > COMPLETED`. If any item failed, the batch status is FAILED. Empty batch → COMPLETED.
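The precedence rule can be stated as a small fold over item statuses. A standalone sketch using a local `RunStatus` stand-in (not the library's aggregation code):

```python
from enum import Enum

class RunStatus(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

def batch_status(statuses: list[RunStatus]) -> RunStatus:
    """FAILED > PAUSED > COMPLETED; an empty batch counts as COMPLETED."""
    if RunStatus.FAILED in statuses:
        return RunStatus.FAILED
    if RunStatus.PAUSED in statuses:
        return RunStatus.PAUSED
    return RunStatus.COMPLETED

batch_status([RunStatus.COMPLETED, RunStatus.PAUSED])  # → RunStatus.PAUSED
batch_status([])                                       # → RunStatus.COMPLETED
```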

***

## RunStatus

Enum for execution status.

```python
from hypergraph import RunStatus

class RunStatus(Enum):
    COMPLETED = "completed"  # Success
    FAILED = "failed"        # Error occurred
    PAUSED = "paused"        # Waiting for human input (InterruptNode)
```

**Usage:**

```python
# With error_handling="continue", check the status to handle failures
result = runner.run(graph, values, error_handling="continue")

match result.status:
    case RunStatus.COMPLETED:
        return result["output"]
    case RunStatus.PAUSED:
        print(result.pause.value)  # Value from InterruptNode
    case RunStatus.FAILED:
        raise result.error  # Re-raise manually if needed
```

With the default `error_handling="raise"`, node failures raise before returning a `RunResult`, so the status will be `COMPLETED` or `PAUSED`.

***

## Errors

### MissingInputError

Raised when required inputs are not provided.

```python
from hypergraph import MissingInputError

try:
    result = runner.run(graph, {})  # Missing required input
except MissingInputError as e:
    print(e)
    # Missing required input(s): ['query']
    #
    # How to fix:
    #   Provide value for 'query' in the values dict
```

### IncompatibleRunnerError

Raised when a runner cannot execute the given graph — for example, `SyncRunner` on a graph containing async nodes.

```python
from hypergraph import IncompatibleRunnerError

@node(output_name="data")
async def fetch(url: str) -> dict:
    return {}

graph = Graph([fetch])

try:
    SyncRunner().run(graph, {"url": "..."})
except IncompatibleRunnerError as e:
    print(e)
    # SyncRunner cannot execute async nodes.
    # Found async node: 'fetch'
    #
    # How to fix:
    #   Use AsyncRunner instead
```

### InfiniteLoopError

Raised when a cyclic graph exceeds the iteration limit (`max_iterations`).

```python
from hypergraph import InfiniteLoopError

try:
    result = runner.run(cyclic_graph, values, max_iterations=100)
except InfiniteLoopError as e:
    print(e)
    # Graph execution exceeded 100 iterations
```

***

## Execution Model

### Input Normalization

Runners accept inputs in two equivalent ways:

```python
# explicit dict
runner.run(graph, values={"query": "hello", "llm": llm})

# kwargs shorthand
runner.run(graph, query="hello", llm=llm)
```

Rules:

* `values` + kwargs are merged
* duplicate keys raise `ValueError`
* option names like `select`, `map_over`, `max_concurrency` are reserved for runner options
* reserved option names in kwargs raise `ValueError`
* if an input name matches an option name, pass that input through `values={...}`

```python
# input named "select" must go through values
runner.run(graph, values={"select": "fast"})
```

### Planning and Supersteps

Runners execute graphs in two phases:

1. Build a static execution plan from the active graph
2. Walk that plan until each region reaches quiescence

The static plan is a DAG of **strongly connected components** (SCCs):

* **DAG region** — a component with no feedback loop, usually runs in one pass
* **Cyclic region** — a component with feedback, iterates locally until no node in that region is ready
* **Gate edges** — participate in planning so gate-driven loops stay in one region, but gate decisions still act as runtime activation

Within a single topological layer, runners still execute in supersteps:

1. Find ready nodes inside the current layer
2. Execute them (sequentially for Sync, concurrently for Async)
3. Update values, versions, and routing decisions
4. Repeat until that layer reaches quiescence

```
Layer 1: [embed]
Layer 2: [retrieve]
Layer 3: [generate]
```

For cycles:

```
Layer 1: [generate, should_continue]  → local iteration until quiescent
Layer 2: [finalize]
```
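The superstep loop for one region can be sketched in plain Python (the graph model and readiness check are simplified stand-ins for the runner's internals):

```python
def run_region(nodes, values, max_iterations=100):
    """Execute one region superstep-by-superstep until quiescent.

    `nodes` maps a node name to (inputs, fn); a node is "ready" when all
    of its inputs are present and at least one became fresh since it ran.
    """
    fresh = set(values)            # keys updated since the last superstep
    for _ in range(max_iterations):
        ready = [
            (name, inputs, fn)
            for name, (inputs, fn) in nodes.items()
            if set(inputs) <= values.keys() and fresh & set(inputs)
        ]
        if not ready:              # quiescence: no node has fresh inputs
            return values
        fresh = set()
        for name, inputs, fn in ready:
            out_key, out_val = fn(*(values[i] for i in inputs))
            if values.get(out_key) != out_val:
                values[out_key] = out_val
                fresh.add(out_key)
    raise RuntimeError(f"exceeded {max_iterations} iterations")

# A one-node feedback loop, like the increment example below
nodes = {"increment": (["count"], lambda c: ("count", c + 1 if c < 5 else c))}
run_region(nodes, {"count": 0})  # {'count': 5}
```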

### Value Resolution Order

When collecting inputs for a node, values are resolved in this order:

1. **Edge value** - Output from upstream node
2. **Input value** - Provided via `values`/kwargs. On resume/fork, checkpoint state is restored first, then runtime inputs apply (runtime values win). See [Run Lineage](https://gilad-rubin.gitbook.io/hypergraph/how-to-guides/batch-processing#run-lineage-resume-vs-fork).
3. **Bound value** - From `graph.bind()`
4. **Function default** - From function signature

```python
@node(output_name="result")
def process(x: int = 10) -> int:  # default=10
    return x * 2

graph = Graph([process]).bind(x=5)  # bound=5

# Edge value wins (if exists)
# Then input value: runner.run(graph, {"x": 3})  → x=3
# Then bound value: runner.run(graph, {})        → x=5
# Then default: (if no bind) runner.run(graph, {}) → x=10
```

> **Note:** Fork/resume restoration rehydrates execution state from checkpoint snapshot data (including prior computed values) and uses staleness checks to decide which nodes re-execute.
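The four-level lookup can be sketched as a plain function (a stand-in for illustration, not the library's internal API):

```python
def resolve(name, edge_values, input_values, bound_values, defaults):
    """Resolve one input: edge > runtime input > bound value > default."""
    for source in (edge_values, input_values, bound_values, defaults):
        if name in source:
            return source[name]
    raise KeyError(f"Missing required input: {name!r}")

resolve("x", {}, {"x": 3}, {"x": 5}, {"x": 10})  # 3  (input beats bound)
resolve("x", {}, {}, {"x": 5}, {"x": 10})        # 5  (bound beats default)
```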

### Cyclic Graphs

Graphs with cycles (feedback loops) execute as local SCC regions until quiescent:

```python
@node(output_name="count")
def increment(count: int) -> int:
    return count + 1 if count < 5 else count

# Cycle: count feeds back into increment
# Runs until the cyclic region has no more ready work
```

The runner:

1. Tracks value and `wait_for` freshness via versions
2. Re-executes nodes in the current cyclic region when their inputs become fresh
3. Stops when that region has no more ready nodes, or `max_iterations` is hit
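The version-based freshness check for a single node in a cycle can be sketched like this (the version counters are a simplified stand-in for the runner's tracking; an output that does not change does not count as fresh):

```python
def run_cycle(node_fn, inputs, values, max_iterations=100):
    """Re-run `node_fn` while its inputs stay fresh; stop at quiescence."""
    versions = {k: 1 for k in values}   # every initial value starts fresh
    seen = {k: 0 for k in values}       # versions the node last consumed
    for _ in range(max_iterations):
        if not any(versions[k] > seen[k] for k in inputs):
            return values               # quiescent: nothing fresh to consume
        seen = {k: versions[k] for k in inputs}
        key, val = node_fn(*(values[k] for k in inputs))
        if values.get(key) != val:      # unchanged output is not "fresh"
            values[key] = val
            versions[key] = versions.get(key, 0) + 1
    raise RuntimeError(f"Graph execution exceeded {max_iterations} iterations")

# The increment example: count feeds back until it stabilizes at 5
run_cycle(lambda c: ("count", c + 1 if c < 5 else c), ["count"], {"count": 0})
# {'count': 5}
```

A cycle that never stabilizes (e.g. unconditional `c + 1`) exhausts `max_iterations` instead, which is what `InfiniteLoopError` guards against in the real runner.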

***

## Nested Graphs

GraphNodes (nested graphs) are executed by the same runner:

```python
inner = Graph([double], name="inner")
outer = Graph([inner.as_node(), triple])

runner = SyncRunner()
result = runner.run(outer, {"x": 5})
```

The runner automatically:

* Delegates to the inner graph
* Shares concurrency limits (AsyncRunner)
* Propagates the cache backend to nested graphs
* Propagates errors
