# Nodes

Nodes are the building blocks of hypergraph. Wrap functions, compose graphs, adapt interfaces.

* **FunctionNode** - Wrap any Python function (sync, async, generator)
* **GraphNode** - Nest a graph as a node for hierarchical composition
* **HyperNode** - Abstract base class defining the common interface

## HyperNode

**HyperNode** is the abstract base class for all node types. It defines the minimal interface that all nodes share: name, inputs, outputs, and rename capabilities.

### Cannot Be Instantiated Directly

```python
from hypergraph import HyperNode

HyperNode()  # TypeError: HyperNode cannot be instantiated directly
```

Use `FunctionNode` (via `@node` decorator) or other concrete node types instead.

### Core Attributes

Every HyperNode has these attributes (set by subclass `__init__`):

```python
node.name: str                           # Public node name
node.inputs: tuple[str, ...]             # Input parameter names
node.outputs: tuple[str, ...]            # Output value names
node._rename_history: list[RenameEntry]  # Internal: tracks renames for error messages
```

### Public Methods

### `with_name(name: str) -> HyperNode`

Return a new node with a different name.

```python
from hypergraph import node

@node(output_name="result")
def process(x: int) -> int:
    return x * 2

renamed = process.with_name("preprocessor")
print(renamed.name)     # "preprocessor"
print(process.name)     # "process" (original unchanged)
```

**Returns:** New node instance (immutable pattern)

**Raises:** None (always succeeds)

### `with_inputs(mapping=None, /, **kwargs) -> HyperNode`

Return a new node with renamed inputs.

```python
@node(output_name="result")
def process(text: str, config: dict) -> str:
    return text.upper()

# Using keyword args
adapted = process.with_inputs(text="raw_input", config="settings")
print(adapted.inputs)  # ("raw_input", "settings")

# Using dict (for reserved keywords or dynamic renames)
adapted = process.with_inputs({"text": "raw_input", "class": "category"})
```

**Args:**

* `mapping` (optional, positional-only): Dict of `{old_name: new_name}`
* `**kwargs`: Additional renames as keyword arguments

**Returns:** New node instance with updated inputs

**Raises:**

* `RenameError` - If any old name not found in current inputs
* `RenameError` - Includes helpful history if name was already renamed

### `with_outputs(mapping=None, /, **kwargs) -> HyperNode`

Return a new node with renamed outputs.

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

# Using keyword args
adapted = process.with_outputs(result="doubled")
print(adapted.outputs)  # ("doubled",)

# Using dict
adapted = process.with_outputs({"result": "doubled"})
```

**Args:**

* `mapping` (optional, positional-only): Dict of `{old_name: new_name}`
* `**kwargs`: Additional renames as keyword arguments

**Returns:** New node instance with updated outputs

**Raises:**

* `RenameError` - If any old name not found in current outputs
* `RenameError` - Includes helpful history if name was already renamed

### Immutability Pattern

All `with_*` methods return new instances. The original is never modified:

```python
original = process
v1 = original.with_name("v1")
v2 = original.with_name("v2")
v3 = v1.with_inputs(x="input")

print(original.name)  # "process" (unchanged)
print(v1.name)        # "v1"
print(v2.name)        # "v2"
print(v3.name)        # "v1" (same as v1)
print(v3.inputs)      # ("input",) (renamed from v1)
```

### Type Checking

Use `isinstance()` to check node types:

```python
from hypergraph import HyperNode, FunctionNode

node = FunctionNode(lambda x: x)

isinstance(node, HyperNode)      # True
isinstance(node, FunctionNode)   # True
```

***

## FunctionNode

**FunctionNode** wraps a Python function as a graph node. Created via the `@node` decorator or `FunctionNode()` constructor directly.

### Constructor

```python
def __init__(
    self,
    source: Callable | FunctionNode,
    name: str | None = None,
    output_name: str | tuple[str, ...] | None = None,
    *,
    rename_inputs: dict[str, str] | None = None,
    cache: bool = False,
    hide: bool = False,
    emit: str | tuple[str, ...] | None = None,
    wait_for: str | tuple[str, ...] | None = None,
) -> None: ...
```

**Args:**

* `source` (required): Function to wrap, or existing FunctionNode (extracts underlying function)
* `name`: Public node name. Defaults to `source.__name__` if source is a function
* `output_name`: Name(s) for the output value(s). If None, node is side-effect only (outputs = ())
  * `str` - Single output (becomes 1-tuple)
  * `tuple[str, ...]` - Multiple outputs
  * `None` - Side-effect only, no outputs
* `rename_inputs`: Optional dict `{old_param: new_param}` for input renaming
* `cache`: Whether to cache results (default: False)
* `hide`: Whether to hide this node from visualization (default: False)
* `emit`: Ordering-only output name(s). Auto-produced when the node runs
* `wait_for`: Ordering-only input name(s). Node waits until these values exist and are fresh

**Returns:** FunctionNode instance

**Raises:**

* `ValueError` - If function source cannot be retrieved (for definition\_hash)
* `UserWarning` - If function has return annotation but no output\_name provided

### Creating from a function

```python
from hypergraph import FunctionNode

def double(x: int) -> int:
    return x * 2

node = FunctionNode(double, name="double_value", output_name="result")

print(node.name)     # "double_value"
print(node.inputs)   # ("x",)
print(node.outputs)  # ("result",)
```

### Creating from existing FunctionNode

When source is a FunctionNode, only the underlying function is extracted. All other config is discarded:

```python
# Original node
original = FunctionNode(double, name="original_name", output_name="original_output")

# Creating fresh from existing FunctionNode
fresh = FunctionNode(original, name="new_name", output_name="new_output")

print(fresh.func is original.func)     # True (same function)
print(fresh.name)                      # "new_name" (new config)
print(fresh.outputs)                   # ("new_output",)
print(original.name)                   # "original_name" (unchanged)
```

### Properties

### `func: Callable`

The wrapped Python function. Read-only.

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

# Call directly
result = process.func(5)
print(result)  # 10

# Or use __call__
result = process(5)
print(result)  # 10
```

### `name: str`

Public node name (may differ from function name).

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

print(process.name)  # "process"

renamed = process.with_name("preprocessor")
print(renamed.name)  # "preprocessor"
```

### `inputs: tuple[str, ...]`

Input parameter names from function signature (after renaming).

```python
@node(output_name="result")
def add(x: int, y: int) -> int:
    return x + y

print(add.inputs)  # ("x", "y")

adapted = add.with_inputs(x="a", y="b")
print(adapted.inputs)  # ("a", "b")
```

### `outputs: tuple[str, ...]`

Output value names (empty if no output\_name).

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

print(process.outputs)  # ("result",)

# Multiple outputs
@node(output_name=("mean", "std"))
def stats(data: list) -> tuple:
    return ...

print(stats.outputs)  # ("mean", "std")

# Side-effect only
@node
def log(msg: str) -> None:
    print(msg)

print(log.outputs)  # ()
```

### `data_outputs: tuple[str, ...]`

Output names that carry data (excludes emit outputs). Same as `outputs` when no `emit` is set.

```python
@node(output_name="result", emit="done")
def producer(x: int) -> int:
    return x + 1

print(producer.outputs)       # ("result", "done")
print(producer.data_outputs)  # ("result",)
```

### `wait_for: tuple[str, ...]`

Ordering-only inputs. Empty tuple when not set.

```python
@node(output_name="result", wait_for="signal")
def consumer(x: int) -> int:
    return x

print(consumer.wait_for)  # ("signal",)
```

### `cache: bool`

Whether results are cached (default: False). Set via constructor.

```python
@node(output_name="result", cache=True)
def expensive(x: int) -> int:
    return x ** 100

print(expensive.cache)  # True
```

### `definition_hash: str`

SHA256 hash of function source code (64-character hex string). Computed at node creation.

Used for cache invalidation - if function source changes, hash changes, and cached results are invalidated.

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

hash_val = process.definition_hash
print(len(hash_val))  # 64
print(hash_val)       # "a3f5f6d7e8c9b0a1..." (example)
```

**Raises ValueError** if source cannot be retrieved (built-ins, C extensions):

```python
# This will raise ValueError
import os
node = FunctionNode(os.path.exists, output_name="exists")
# ValueError: Cannot hash function exists: could not get source code
```

### `is_async: bool`

True if function is async or async generator. Read-only, auto-detected.

```python
# Sync function
@node(output_name="result")
def sync_func(x: int) -> int:
    return x * 2

print(sync_func.is_async)  # False

# Async function
@node(output_name="result")
async def async_func(x: int) -> int:
    return x * 2

print(async_func.is_async)  # True

# Async generator
@node(output_name="items")
async def async_gen(n: int):
    for i in range(n):
        yield i

print(async_gen.is_async)  # True
```

### `is_generator: bool`

True if function yields values (sync or async generator). Read-only, auto-detected.

```python
# Sync function
@node(output_name="result")
def sync_func(x: int) -> int:
    return x * 2

print(sync_func.is_generator)  # False

# Sync generator
@node(output_name="items")
def sync_gen(n: int):
    for i in range(n):
        yield i

print(sync_gen.is_generator)  # True

# Async generator
@node(output_name="items")
async def async_gen(n: int):
    for i in range(n):
        yield i

print(async_gen.is_generator)  # True
```

### Special Methods

### \_\_call\_\_(\*args, \*\*kwargs)

Call the wrapped function directly. Delegates to `self.func(*args, **kwargs)`.

```python
@node(output_name="result")
def double(x: int) -> int:
    return x * 2

# Both equivalent
result1 = double(5)
result2 = double.func(5)

print(result1)  # 10
print(result2)  # 10
```

### \_\_repr\_\_() -> str

Informative string representation showing function name and node config.

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

print(repr(process))
# FunctionNode(process, outputs=('result',))

renamed = process.with_name("preprocessor")
print(repr(renamed))
# FunctionNode(process as 'preprocessor', outputs=('result',))
```

***

## @node Decorator

```python
@node
def foo(x): ...

# or

@node(output_name="result", cache=True)
def foo(x): ...
```

Decorator to wrap a function as a FunctionNode. Can be used with or without parentheses.

### Signature

```python
def node(
    source: Callable | None = None,
    output_name: str | tuple[str, ...] | None = None,
    *,
    rename_inputs: dict[str, str] | None = None,
    cache: bool = False,
    hide: bool = False,
    emit: str | tuple[str, ...] | None = None,
    wait_for: str | tuple[str, ...] | None = None,
) -> FunctionNode | Callable[[Callable], FunctionNode]: ...
```

**Args:**

* `source`: The function (when used without parens like `@node`)
* `output_name`: Output name(s). If None, side-effect only (outputs = ())
* `rename_inputs`: Optional dict to rename inputs
* `cache`: Enable result caching for this node. Requires a cache backend on the runner. See [Caching](https://gilad-rubin.gitbook.io/hypergraph/patterns/08-caching). Not allowed on GraphNode
* `hide`: Whether to hide this node from visualization (default: False)
* `emit`: Ordering-only output name(s). Auto-produced when the node runs. Used with `wait_for` to enforce execution order without data dependency. See [Ordering](https://gilad-rubin.gitbook.io/hypergraph/patterns/03-agentic-loops#ordering-with-emitwait_for)
* `wait_for`: Ordering-only input name(s). Node won't run until these values exist and are fresh. Must reference an `emit` or `output_name` of another node

**Returns:**

* FunctionNode if source provided (decorator without parens)
* Decorator function if source is None (decorator with parens)

### Usage Without Parentheses

```python
@node
def double(x: int) -> int:
    return x * 2

print(double.name)     # "double"
print(double.outputs)  # ()  ← Side-effect only! Warning emitted.
```

The decorator always uses `func.__name__` for the node name. To customize, use FunctionNode directly.

### Usage With Parentheses

```python
@node(output_name="result")
def double(x: int) -> int:
    return x * 2

print(double.name)     # "double"
print(double.outputs)  # ("result",)
```

### With All Parameters

```python
@node(
    output_name="result",
    cache=True,
)
def expensive_operation(x: int) -> int:
    return x ** 100

print(expensive_operation.name)   # "expensive_operation"
print(expensive_operation.cache)  # True
```

### Warning on Missing output\_name

If your function has a return type annotation but no output\_name, a warning is emitted:

```python
@node  # Missing output_name!
def fetch(url: str) -> dict:
    return requests.get(url).json()

# UserWarning: Function 'fetch' has return type '<class 'dict'>' but no output_name.
# If you want to capture the return value, use @node(output_name='...').
# Otherwise, ignore this warning for side-effect only nodes.
```

This helps catch accidental omissions. If the function is truly side-effect only, add type hints:

```python
from typing import NoReturn

@node
def log(msg: str) -> None:  # Explicitly None → no warning
    print(msg)

# or

@node
def log(msg: str):  # No return annotation → no warning
    print(msg)
```

***

## RenameError

Exception raised when a rename operation references a non-existent name.

```python
from hypergraph import RenameError, node

@node(output_name="result")
def process(x: int) -> int:
    return x * 2

try:
    process.with_inputs(y="renamed")  # 'y' doesn't exist
except RenameError as e:
    print(e)
    # 'y' not found. Current inputs: ('x',)
```

### Error Messages Include History

When a name was previously renamed, the error message helps you understand what happened:

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

# Rename x to input
renamed = process.with_inputs(x="input")

# Try to use old name
try:
    renamed.with_inputs(x="something_else")
except RenameError as e:
    print(e)
    # 'x' was renamed to 'input'. Current inputs: ('input',)
```

### Exception Details

* Type: `Exception`
* Module: `hypergraph.nodes._rename`
* Public export: `from hypergraph import RenameError`

***

## Execution Modes

FunctionNode supports four execution modes, auto-detected from the function signature:

### 1. Synchronous Function

```python
@node(output_name="result")
def sync(x: int) -> int:
    return x * 2

print(sync.is_async)      # False
print(sync.is_generator)  # False
```

### 2. Asynchronous Function

```python
@node(output_name="data")
async def async_func(url: str) -> dict:
    async with client.get(url) as resp:
        return await resp.json()

print(async_func.is_async)      # True
print(async_func.is_generator)  # False
```

### 3. Synchronous Generator

```python
from typing import Iterator

@node(output_name="chunks")
def sync_gen(text: str, size: int = 100) -> Iterator[str]:
    for i in range(0, len(text), size):
        yield text[i:i+size]

print(sync_gen.is_async)      # False
print(sync_gen.is_generator)  # True
```

### 4. Asynchronous Generator

```python
from typing import AsyncIterator

@node(output_name="tokens")
async def async_gen(prompt: str) -> AsyncIterator[str]:
    async for chunk in llm.stream(prompt):
        yield chunk.text

print(async_gen.is_async)      # True
print(async_gen.is_generator)  # True
```

***

## Complete Example

Combining all features:

```python
from hypergraph import node, FunctionNode

# Define a function
def calculate(x: int, y: int) -> tuple[int, int]:
    return x + y, x * y

# Create node with full config
node_instance = FunctionNode(
    source=calculate,
    name="arithmetic",
    output_name=("sum", "product"),
    rename_inputs={"x": "first", "y": "second"},
    cache=True,
)

# Access properties
print(node_instance.name)           # "arithmetic"
print(node_instance.inputs)         # ("first", "second")
print(node_instance.outputs)        # ("sum", "product")
print(node_instance.cache)          # True
print(node_instance.is_async)       # False
print(node_instance.is_generator)   # False

# Transform with fluent API
adapted = (
    node_instance
    .with_name("math_ops")
    .with_inputs(first="a", second="b")
    .with_outputs(sum="total", product="multiply")
)

print(adapted.name)     # "math_ops"
print(adapted.inputs)   # ("a", "b")
print(adapted.outputs)  # ("total", "multiply")

# Call the function
result = node_instance(5, 3)
print(result)           # (8, 15)
```

## GraphNode

**GraphNode** wraps a Graph for use as a node in another graph. This enables hierarchical composition: a graph can contain other graphs as nodes.

### Creating GraphNode

Create via `Graph.as_node()` rather than directly:

```python
from hypergraph import node, Graph

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

# Inner graph must have a name
inner = Graph([double], name="doubler")

# Wrap as node
gn = inner.as_node()
print(gn.name)     # "doubler"
print(gn.inputs)   # ('x',)
print(gn.outputs)  # ('doubled',)
```

### Overriding the Name

You can override the name when calling `as_node()`:

```python
gn = inner.as_node(name="my_doubler")
print(gn.name)  # "my_doubler"
```

### Properties

GraphNode inherits from HyperNode and has these properties:

#### `name: str`

The node name. Either from `graph.name` or explicitly provided to `as_node()`.

#### `inputs: tuple[str, ...]`

All inputs of the wrapped graph (required + optional + entry point params).

```python
gn = inner.as_node()
print(gn.inputs)  # ('x',)
```

#### `outputs: tuple[str, ...]`

All outputs of the wrapped graph.

```python
gn = inner.as_node()
print(gn.outputs)  # ('doubled',)
```

#### `graph: Graph`

The wrapped Graph instance.

```python
gn = inner.as_node()
print(gn.graph.name)  # "doubler"
```

#### `is_async: bool`

True if the wrapped graph contains any async nodes.

```python
@node(output_name="data")
async def fetch(url: str) -> dict:
    return {}

async_graph = Graph([fetch], name="fetcher")
gn = async_graph.as_node()
print(gn.is_async)  # True
```

#### `definition_hash: str`

Delegates to the wrapped graph's `definition_hash`.

```python
gn = inner.as_node()
print(gn.definition_hash == inner.definition_hash)  # True
```

### Type Annotation Forwarding

GraphNode forwards type annotations from the inner graph for `strict_types` validation:

```python
@node(output_name="value")
def producer() -> int:
    return 42

inner = Graph([producer], name="inner")
gn = inner.as_node()

# Type forwarding works
print(gn.get_output_type("value"))  # <class 'int'>

# Allows strict_types validation in outer graph
@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

outer = Graph([gn, consumer], strict_types=True)  # Works!
```

### Nested Composition Example

```python
from hypergraph import node, Graph

# Level 1: Simple nodes
@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

@node(output_name="tripled")
def triple(doubled: int) -> int:
    return doubled * 3

# Level 2: Inner graph
inner = Graph([double, triple], name="multiply")
print(inner.inputs.required)  # ('x',)
print(inner.outputs)          # ('doubled', 'tripled')

# Level 3: Wrap and use in outer graph
@node(output_name="final")
def finalize(tripled: int) -> str:
    return f"Result: {tripled}"

outer = Graph([inner.as_node(), finalize])
print(outer.inputs.required)  # ('x',)
print(outer.outputs)          # ('doubled', 'tripled', 'final')
```

### Rename Methods

GraphNode supports the same rename methods as other nodes:

```python
gn = inner.as_node()

# Rename inputs
adapted = gn.with_inputs(x="input_value")
print(adapted.inputs)  # ('input_value',)

# Rename outputs
adapted = gn.with_outputs(doubled="result")
print(adapted.outputs)  # ('result',)

# Rename the node itself
adapted = gn.with_name("my_processor")
print(adapted.name)  # "my_processor"
```

### map\_over()

Configure a GraphNode to iterate over input parameters. When the outer graph runs, the inner graph executes multiple times—once per value in the mapped parameters.

```python
def map_over(
    self,
    *params: str,
    mode: Literal["zip", "product"] = "zip",
    error_handling: Literal["raise", "continue"] = "raise",
) -> GraphNode: ...
```

**Args:**

* `*params` - Input parameter names to iterate over
* `mode` - How to combine multiple parameters:
  * `"zip"` (default): Parallel iteration, equal-length lists required
  * `"product"`: Cartesian product, all combinations
* `error_handling` - How to handle failures during mapped execution:
  * `"raise"` (default): Stop on first failure and raise the error
  * `"continue"`: Collect all results, using `None` as placeholder for failed items (preserving list length)

**Returns:** New GraphNode with map\_over configuration

**Raises:**

* `ValueError` - If no parameters specified
* `ValueError` - If parameter not in node's inputs

**Example: Basic Iteration**

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

# Inner graph
inner = Graph([double], name="inner")

# Configure for iteration over x
gn = inner.as_node().map_over("x")

# Use in outer graph
outer = Graph([gn])

runner = SyncRunner()
result = runner.run(outer, {"x": [1, 2, 3]})

# Output is a list of results
print(result["doubled"])  # [2, 4, 6]
```

**Example: Zip Mode (Multiple Parameters)**

```python
@node(output_name="sum")
def add(a: int, b: int) -> int:
    return a + b

inner = Graph([add], name="adder")
gn = inner.as_node().map_over("a", "b", mode="zip")

outer = Graph([gn])
result = runner.run(outer, {"a": [1, 2, 3], "b": [10, 20, 30]})

# Pairs: (1,10), (2,20), (3,30)
print(result["sum"])  # [11, 22, 33]
```

**Example: Product Mode**

```python
gn = inner.as_node().map_over("a", "b", mode="product")

outer = Graph([gn])
result = runner.run(outer, {"a": [1, 2], "b": [10, 20]})

# All combinations: (1,10), (1,20), (2,10), (2,20)
print(result["sum"])  # [11, 21, 12, 22]
```

**Example: Error Handling**

```python
# Continue on errors — failed items become None, preserving list length
gn = inner.as_node().map_over("x", error_handling="continue")

outer = Graph([gn])
result = runner.run(outer, {"x": [1, 2, "bad_input", 4]})

# result["doubled"] → [2, 4, None, 8]
# None placeholders keep list aligned with inputs
```

**Output Types with map\_over**

When `map_over` is configured, output types are automatically wrapped in `list[]`:

```python
@node(output_name="value")
def produce() -> int:
    return 42

inner = Graph([produce], name="inner")

# Without map_over
gn = inner.as_node()
print(gn.get_output_type("value"))  # <class 'int'>

# With map_over
gn_mapped = gn.map_over("x")
print(gn_mapped.get_output_type("value"))  # list[int]
```

This enables `strict_types=True` validation in outer graphs.

**Rename Integration**

When you rename inputs, map\_over configuration updates automatically:

```python
gn = inner.as_node().map_over("x")
renamed = gn.with_inputs(x="input_value")

# map_over now references "input_value"
print(renamed.inputs)  # ('input_value',)
```

### map\_config Property

Check the current map\_over configuration:

```python
@property
def map_config(self) -> tuple[list[str], Literal["zip", "product"], ErrorHandling] | None: ...
```

```python
gn = inner.as_node()
print(gn.map_config)  # None

gn_mapped = gn.map_over("x", "y", mode="product")
print(gn_mapped.map_config)  # (['x', 'y'], 'product', 'raise')
```

### Error: Missing Name

If neither the graph nor `as_node()` provides a name, an error is raised:

```python
unnamed = Graph([double])  # No name
unnamed.as_node()

# ValueError: GraphNode requires a name. Either set name on Graph(..., name='x')
# or pass name to as_node(name='x')
```

***

## NodeContext

**NodeContext** provides framework capabilities to nodes that need them: cooperative stop signals and live streaming. Injected automatically when detected in the function signature via type hint.

### Usage

Add `ctx: NodeContext` to any node function:

```python
from hypergraph import node, NodeContext

@node(output_name="response")
async def llm_reply(messages: list, ctx: NodeContext) -> str:
    response = ""
    async for chunk in llm.stream(messages):
        if ctx.stop_requested:
            break
        response += chunk
        ctx.stream(chunk)
    return response
```

The framework detects `NodeContext` in the signature and injects it at execution time. The parameter is excluded from the node's inputs — it never appears in `node.inputs` and cannot be provided via `bind()` or `values`.

```python
llm_reply.inputs   # ("messages",) — ctx is not a graph input
llm_reply.outputs  # ("response",)
```

### Properties

#### `stop_requested: bool`

Read-only. `True` when `runner.stop(workflow_id)` has been called. The node checks this cooperatively and decides when to break.

```python
@node(output_name="results")
async def process_batch(items: list, ctx: NodeContext) -> list:
    results = []
    for item in items:
        if ctx.stop_requested:
            break
        results.append(await process(item))
    return results
```

### Methods

#### `stream(chunk: Any) -> None`

Emit a `StreamingChunkEvent` for live UI preview. Does not affect the node's return value — the node controls its own output type.

```python
@node(output_name="response")
async def generate(prompt: str, ctx: NodeContext) -> str:
    response = ""
    async for chunk in llm.stream(prompt):
        response += chunk
        ctx.stream(chunk)  # UI sees tokens live
    return response         # output: final string
```

Streaming is a side-channel. The framework doesn't accumulate chunks, manage reducers, or touch output types. `ctx.stream()` is silently skipped if `stop_requested` is `True`.

### Injection Mechanism

NodeContext uses the same type-hint inspection that powers automatic edge inference. This is the same pattern FastAPI uses for `Request` and `BackgroundTasks`:

* The **type annotation** determines injection, not the parameter name. `ctx`, `context`, `nc` — all work.
* Functions **without** `NodeContext` work exactly as before. Backward compatible.
* Testing is plain Python: `llm_reply(messages=["hi"], ctx=mock_context)`.

### Testing

```python
from unittest.mock import MagicMock

def test_llm_reply_stops():
    ctx = MagicMock(spec=NodeContext)
    ctx.stop_requested = True

    result = llm_reply(messages=["hello"], ctx=ctx)
    assert result == ""  # stopped immediately
```

No framework setup needed — pass a mock or stub directly.

***

## InterruptNode

**InterruptNode** is a thin FunctionNode subclass that acts as a declarative pause point for human-in-the-loop workflows. When the handler returns `None`, execution pauses. When it returns a value, the interrupt auto-resolves.

### `@interrupt` Decorator

The `@interrupt` decorator creates an InterruptNode from a function, just like `@node` creates a FunctionNode:

```python
from hypergraph import interrupt

@interrupt(output_name="decision")
def approval(draft: str) -> str | None:
    return None  # pause for human review
    # return "auto-approved"  # returns value -> auto-resolve
```

**Decorator args:**

* `output_name` (required): Output name(s)
* `rename_inputs`: Optional dict to rename inputs
* `cache`: Enable result caching (default: `False`)
* `emit`: Ordering-only output name(s) (see [emit/wait\_for](https://gilad-rubin.gitbook.io/hypergraph/patterns/03-agentic-loops#ordering-with-emitwait_for))
* `wait_for`: Ordering-only input name(s)
* `hide`: Whether to hide from visualization

### Constructor

Like FunctionNode, InterruptNode can also be created via the constructor. `output_name` is required.

```python
from hypergraph import InterruptNode

def my_handler(draft: str) -> str:
    return "approved"

approval = InterruptNode(my_handler, output_name="decision")
# Or with all options:
approval = InterruptNode(
    my_handler,
    name="review",
    output_name="decision",
    emit="reviewed",
    wait_for="ready",
)
```

### Properties

| Property          | Type              | Description                                     |
| ----------------- | ----------------- | ----------------------------------------------- |
| `inputs`          | `tuple[str, ...]` | Input parameter names (from function signature) |
| `outputs`         | `tuple[str, ...]` | All output names (data + emit)                  |
| `data_outputs`    | `tuple[str, ...]` | Data-only outputs (excluding emit)              |
| `is_interrupt`    | `bool`            | Always `True`                                   |
| `cache`           | `bool`            | Whether caching is enabled (default: `False`)   |
| `hide`            | `bool`            | Whether hidden from visualization               |
| `wait_for`        | `tuple[str, ...]` | Ordering-only inputs                            |
| `is_async`        | `bool`            | True if handler is async                        |
| `is_generator`    | `bool`            | True if handler yields                          |
| `definition_hash` | `str`             | SHA256 hash of function source                  |

### Methods

#### Inherited: `with_name()`, `with_inputs()`, `with_outputs()`

All HyperNode rename methods work as expected.

### Example: Pause and Resume

```python
from hypergraph import Graph, node, AsyncRunner, interrupt

@node(output_name="draft")
def make_draft(query: str) -> str:
    return f"Draft for: {query}"

@interrupt(output_name="decision")
def approval(draft: str) -> str | None:
    return None  # pause for human review

@node(output_name="result")
def finalize(decision: str) -> str:
    return f"Final: {decision}"

graph = Graph([make_draft, approval, finalize])
runner = AsyncRunner()

# Pauses at the interrupt
result = await runner.run(graph, {"query": "hello"})
assert result.paused
assert result.pause.value == "Draft for: hello"

# Resume with response
result = await runner.run(graph, {
    "query": "hello",
    result.pause.response_key: "approved",
})
assert result["result"] == "Final: approved"
```

For a full guide with multiple interrupts, nested graphs, and handler patterns, see [Human-in-the-Loop](https://gilad-rubin.gitbook.io/hypergraph/patterns/07-human-in-the-loop).
