# Getting Started

Learn the core concepts: nodes, graphs, and runners.

* **Nodes** - Wrap functions with named inputs and outputs
* **Graphs** - Connect nodes automatically via matching names
* **Runners** - Execute graphs synchronously or asynchronously

## Core Concepts

### Nodes

A **node** is a function wrapped as a graph component. Nodes have:

* **Inputs** - Parameter names from the function signature
* **Outputs** - Named values produced by the function
* **A name** - Identifier for the node (defaults to function name)

```python
from hypergraph import node

@node(output_name="result")
def add(x: int, y: int) -> int:
    return x + y

# Properties
print(add.name)      # "add"
print(add.inputs)    # ("x", "y")
print(add.outputs)   # ("result",)
```

### Automatic Edge Inference

Hypergraph connects nodes automatically based on matching names. If node A produces output "embedding" and node B takes input "embedding", they're connected.

```python
@node(output_name="embedding")
def embed(text: str) -> list[float]:
    return model.embed(text)

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return db.search(embedding)

# Edges inferred: embed → retrieve (via "embedding")
```

Name your outputs and inputs consistently — edges are inferred automatically.

## Creating Your First Node

### Simple Function

```python
from hypergraph import node

@node(output_name="doubled")
def double(x: int) -> int:
    """Double the input."""
    return x * 2

# Call directly
result = double(5)
print(result)  # 10

# Access properties
print(double.inputs)   # ("x",)
print(double.outputs)  # ("doubled",)
print(double.is_async)  # False
print(double.is_generator)  # False
```

### Side-Effect Only Nodes

If a function doesn't return a value, omit `output_name`:

```python
@node  # No output_name - side-effect only
def log(message: str) -> None:
    print(f"[LOG] {message}")

print(log.outputs)  # ()
```

If a function has a return type annotation but no `output_name`, hypergraph warns you:

```python
@node  # Missing output_name!
def fetch_data(url: str) -> dict:
    return requests.get(url).json()

# Warning: Function 'fetch_data' has return type '<class 'dict'>' but no output_name.
# If you want to capture the return value, use @node(output_name='...')
```

### Multiple Outputs

Functions can produce multiple outputs:

```python
@node(output_name=("mean", "std"))
def statistics(data: list) -> tuple[float, float]:
    """Calculate mean and standard deviation."""
    mean = sum(data) / len(data)
    std = (sum((x - mean) ** 2 for x in data) / len(data)) ** 0.5
    return mean, std

print(statistics.outputs)  # ("mean", "std")
```

The return value must be a tuple matching the number of output names. Unpacking is automatic.

## Working with Nodes

### Renaming Inputs and Outputs

Use `with_inputs()` and `with_outputs()` to rename without creating new functions:

```python
@node(output_name="result")
def process(text: str) -> str:
    return text.upper()

# Rename to fit your graph's naming convention
adapted = process.with_inputs(text="raw_input").with_outputs(result="processed")

print(adapted.inputs)   # ("raw_input",)
print(adapted.outputs)  # ("processed",)

# Original unchanged
print(process.inputs)   # ("text",)
print(process.outputs)  # ("result",)
```

### Renaming the Node

```python
preprocessor = process.with_name("string_preprocessor")
print(preprocessor.name)  # "string_preprocessor"
```

### Chaining Renames

All rename methods return new instances:

```python
custom = (
    process
    .with_name("preprocessor")
    .with_inputs(text="raw")
    .with_outputs(result="cleaned")
)

print(custom.name)     # "preprocessor"
print(custom.inputs)   # ("raw",)
print(custom.outputs)  # ("cleaned",)
```

## Building Graphs

Now that you know how to create nodes, let's compose them into graphs.

### Basic Graph Construction

A **Graph** connects nodes together automatically based on matching parameter names. If node A produces output "x" and node B takes input "x", they're connected.

```python
from hypergraph import node, Graph

@node(output_name="result")
def add(a: int, b: int) -> int:
    return a + b

@node(output_name="final")
def double(result: int) -> int:
    return result * 2

# Create graph - edges are inferred automatically
g = Graph([add, double])

print(list(g.nodes.keys()))  # ['add', 'double']
print(g.outputs)             # ('result', 'final')
print(g.inputs.required)     # ('a', 'b')
```

The graph automatically connects `add` → `double` because `add` produces "result" and `double` consumes "result".

### Graph Properties

```python
# What inputs does the graph need?
print(g.inputs.required)  # ('a', 'b')
print(g.inputs.optional)  # ()

# What outputs does the graph produce?
print(g.outputs)  # ('result', 'final')

# Is there a cycle? (A→B→A)
print(g.has_cycles)  # False

# Are any nodes async?
print(g.has_async_nodes)  # False
```

### Binding Values

You can pre-fill some inputs with `bind()`:

```python
# Bind 'a' to always be 10
bound = g.bind(a=10)

print(bound.inputs.required)  # ('b',) - only 'b' is needed now
print(bound.inputs.bound)     # {'a': 10} - 'a' is pre-filled
```

Binding is immutable - it returns a new graph, leaving the original unchanged.

## Type Validation with strict\_types

This is hypergraph's core feature: **catch type errors at graph construction time**, before you run anything.

### Why This Matters

Without type validation, you might wire nodes incorrectly and only discover the error at runtime:

```python
@node(output_name="count")
def count_words(text: str) -> int:
    return len(text.split())

@node(output_name="result")
def process_list(count: list) -> int:  # Expects list, not int!
    return len(count)

# Without strict_types, this "works" at construction...
g = Graph([count_words, process_list])

# ...but fails at runtime when int meets list
```

### Enable strict\_types

Add `strict_types=True` to catch these errors immediately:

```python
from hypergraph import node, Graph

@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

# Types match - construction succeeds
g = Graph([producer, consumer], strict_types=True)
print(g.strict_types)  # True
```

### Type Mismatch Errors

When types don't match, you get a clear error at construction time:

```python
@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: str) -> str:  # Expects str, but producer gives int
    return value.upper()

# This raises GraphConfigError immediately!
Graph([producer, consumer], strict_types=True)

# Error: Type mismatch on edge 'producer' → 'consumer' (value='value')
#   Output type: int
#   Input type:  str
#
# How to fix:
#   - Change producer's return type to match str
#   - Change consumer's parameter type to match int
#   - Use a Union type if both are valid
```

### Missing Annotations

With `strict_types=True`, all connected nodes must have type annotations:

```python
@node(output_name="value")
def producer():  # Missing return type!
    return 42

@node(output_name="result")
def consumer(value: int) -> int:
    return value * 2

Graph([producer, consumer], strict_types=True)

# Error: Missing type annotation in strict_types mode
#   -> Node 'producer' output 'value' has no type annotation
#
# How to fix:
#   Add type annotation: def producer(...) -> ReturnType
```

### Union Types

Union types work as you'd expect - a more specific type satisfies a broader one:

```python
@node(output_name="value")
def producer() -> int:
    return 42

@node(output_name="result")
def consumer(value: int | str) -> str:  # Accepts int OR str
    return str(value)

# Works! int satisfies int | str
g = Graph([producer, consumer], strict_types=True)
```

### When to Use strict\_types

* **Development**: Enable it to catch wiring mistakes early
* **Production**: Enable it for safety in critical pipelines
* **Prototyping**: Disable it (default) for quick experiments

```python
# Quick prototype - skip type checking
g = Graph([node1, node2])  # strict_types=False by default

# Production code - validate everything
g = Graph([node1, node2], strict_types=True)
```

## Execution Modes

Hypergraph supports four execution modes, auto-detected from the function signature:

### 1. Synchronous Function (Default)

```python
@node(output_name="result")
def sync_process(x: int) -> int:
    return x * 2

print(sync_process.is_async)      # False
print(sync_process.is_generator)  # False
```

### 2. Asynchronous Function

```python
import httpx

@node(output_name="data")
async def fetch(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        return (await client.get(url)).json()

print(fetch.is_async)      # True
print(fetch.is_generator)  # False
```

### 3. Synchronous Generator

```python
from typing import Iterator

@node(output_name="chunks")
def chunk_text(text: str, size: int = 100) -> Iterator[str]:
    """Yield text in chunks."""
    for i in range(0, len(text), size):
        yield text[i:i+size]

print(chunk_text.is_async)      # False
print(chunk_text.is_generator)  # True
```

### 4. Asynchronous Generator

```python
from typing import AsyncIterator

from openai import AsyncOpenAI

client = AsyncOpenAI()

@node(output_name="tokens")
async def stream_llm(prompt: str) -> AsyncIterator[str]:
    """Stream LLM response tokens."""
    stream = await client.responses.create(
        model="gpt-5.2",
        input=prompt,
        stream=True,
    )
    async for event in stream:
        if event.type == "response.output_text.delta":
            yield event.delta

print(stream_llm.is_async)      # True
print(stream_llm.is_generator)  # True
```

## Function Node Properties

Every node created with `@node` has these properties:

### `name`

The public identifier for this node (defaults to function name).

```python
@node(output_name="result")
def process(x): pass

print(process.name)  # "process"
print(process.with_name("custom").name)  # "custom"
```

### `inputs`

Tuple of input parameter names from the function signature.

```python
@node(output_name="result")
def add(x: int, y: int) -> int: pass

print(add.inputs)  # ("x", "y")
```

### `outputs`

Tuple of output names (empty if no output\_name).

```python
@node(output_name="sum")
def add(x: int, y: int) -> int: pass

print(add.outputs)  # ("sum",)
```

### `func`

Direct reference to the underlying Python function.

```python
@node(output_name="result")
def double(x: int) -> int:
    return x * 2

result = double.func(5)  # Call directly
print(result)  # 10
```

### `cache`

Whether this node's results are cached (default: False).

```python
@node(output_name="result", cache=True)
def expensive(x: int) -> int:
    return x ** 100

print(expensive.cache)  # True
```

### `is_async`

True if the function is async or async generator.

```python
@node(output_name="data")
async def fetch(url: str) -> dict: pass

print(fetch.is_async)  # True
```

### `is_generator`

True if the function yields values (sync or async generator).

```python
@node(output_name="items")
def produce(n: int):
    for i in range(n):
        yield i

print(produce.is_generator)  # True
```

### `definition_hash`

SHA256 hash of the function's source code (for cache invalidation).

```python
@node(output_name="result")
def process(x: int) -> int:
    return x * 2

print(len(process.definition_hash))  # 64 (hex string)
```

## Error Handling: RenameError

When you try to rename a name that doesn't exist, you get a helpful error:

```python
@node(output_name="result")
def process(x: int) -> int: pass

# Try to rename non-existent input
process.with_inputs(y="renamed")
# RenameError: 'y' not found. Current inputs: ('x',)
```

If you renamed and then try to use the old name:

```python
renamed = process.with_inputs(x="input")
renamed.with_inputs(x="different")  # ERROR - x was already renamed to "input"

# Error message shows history:
# RenameError: 'x' was renamed to 'input'. Current inputs: ('input',)
```

## Running Graphs

Graphs need a **runner** to execute. Hypergraph provides two runners:

* `SyncRunner` - Sequential execution for synchronous nodes
* `AsyncRunner` - Concurrent execution with async support

### Basic Execution with SyncRunner

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="embedding")
def embed(query: str) -> list[float]:
    return [0.1, 0.2, 0.3]  # Simplified example

@node(output_name="docs")
def retrieve(embedding: list[float]) -> list[str]:
    return ["doc1", "doc2"]

graph = Graph([embed, retrieve])

# Create runner and execute
runner = SyncRunner()
result = runner.run(graph, {"query": "What is RAG?"})  # "query" is the graph's only external input

# Access results
print(result["docs"])         # ["doc1", "doc2"]
print(result["embedding"])    # [0.1, 0.2, 0.3]
print(result.status)          # RunStatus.COMPLETED
```

### Async Execution with Concurrent Nodes

```python
import asyncio
from hypergraph import Graph, node, AsyncRunner

@node(output_name="response")
async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)  # Simulate API call
    return f"Response to: {prompt}"

@node(output_name="embedding")
async def embed(text: str) -> list[float]:
    await asyncio.sleep(0.1)  # Simulate API call
    return [0.1, 0.2, 0.3]

# Independent nodes run concurrently
graph = Graph([call_llm, embed])

async def main():
    runner = AsyncRunner()
    result = await runner.run(
        graph,
        {"prompt": "Hello", "text": "World"},
        max_concurrency=10,  # Limit parallel operations
    )
    print(result["response"])    # "Response to: Hello"
    print(result["embedding"])   # [0.1, 0.2, 0.3]

asyncio.run(main())
```

### Batch Processing with runner.map()

Process multiple inputs efficiently:

```python
from hypergraph import Graph, node, SyncRunner

@node(output_name="doubled")
def double(x: int) -> int:
    return x * 2

graph = Graph([double])
runner = SyncRunner()

# Process multiple values
results = runner.map(
    graph,
    {"x": [1, 2, 3, 4, 5]},  # x is a list
    map_over="x",            # Iterate over x
)

# Each result contains one output
print([r["doubled"] for r in results])  # [2, 4, 6, 8, 10]
```

### Map Modes: zip vs product

When mapping over multiple parameters:

```python
@node(output_name="result")
def add(a: int, b: int) -> int:
    return a + b

graph = Graph([add])
runner = SyncRunner()

# Zip mode (default): parallel iteration, equal lengths required
results = runner.map(
    graph,
    {"a": [1, 2, 3], "b": [10, 20, 30]},
    map_over=["a", "b"],
    map_mode="zip",  # (1,10), (2,20), (3,30)
)
print([r["result"] for r in results])  # [11, 22, 33]

# Product mode: cartesian product, all combinations
results = runner.map(
    graph,
    {"a": [1, 2], "b": [10, 20]},
    map_over=["a", "b"],
    map_mode="product",  # (1,10), (1,20), (2,10), (2,20)
)
print([r["result"] for r in results])  # [11, 21, 12, 22]
```

### Selecting Specific Outputs

By default, `run()` returns all graph outputs. Configure output scope on the graph to filter:

```python
result = runner.run(graph.select("final_answer"), values)
print(result.values.keys())  # Only "final_answer"
```

### Error Handling

```python
from hypergraph import RunStatus

result = runner.run(graph, {"x": 5})

if result.status == RunStatus.COMPLETED:
    print(result["output"])
else:
    print(f"Failed: {result.error}")
```

## Conditional Routing

Use `@route` to control execution flow based on data.

### Basic Routing

```python
from hypergraph import Graph, node, route, END, SyncRunner

@node(output_name="doc_type")
def classify(document: str) -> str:
    """Classify document type."""
    if "diagram" in document.lower():
        return "visual"
    return "text"

@route(targets=["text_processor", "visual_processor"])
def route_by_type(doc_type: str) -> str:
    """Route to appropriate processor."""
    if doc_type == "visual":
        return "visual_processor"
    return "text_processor"

@node(output_name="result")
def text_processor(document: str) -> str:
    return f"Text: {document}"

@node(output_name="result")
def visual_processor(document: str) -> str:
    return f"Visual: {document}"

graph = Graph([classify, route_by_type, text_processor, visual_processor])
runner = SyncRunner()

result = runner.run(graph, {"document": "This has a diagram"})
print(result["result"])  # "Visual: This has a diagram"
```

### Terminating Early with END

```python
from hypergraph import route, END

@route(targets=["process", END])
def check_valid(data: dict) -> str:
    if not data.get("valid"):
        return END  # Stop execution
    return "process"
```

### Agentic Loops

Routing enables cycles for multi-turn workflows:

```python
@route(targets=["generate", END])
def should_continue(attempts: int, quality: float) -> str:
    if quality >= 0.8 or attempts >= 3:
        return END
    return "generate"  # Loop back
```

For more patterns including multi-target routing and real-world RAG examples, see the [Routing Guide](https://gilad-rubin.gitbook.io/hypergraph/patterns/02-routing).

## Next Steps

* [Routing Guide](https://gilad-rubin.gitbook.io/hypergraph/patterns/02-routing) - Conditional routing, agentic loops, and real-world patterns
* Explore [API Reference](https://gilad-rubin.gitbook.io/hypergraph/api-reference/graph) for complete documentation on nodes, graphs, and types
* Read [Runners API](https://gilad-rubin.gitbook.io/hypergraph/api-reference/runners) for detailed runner documentation
* Read [Philosophy](https://gilad-rubin.gitbook.io/hypergraph/design/philosophy) to understand the design principles
* Try building a small pipeline with `strict_types=True` to catch errors early

## Common Patterns

### Creating Multiple Variants

Reuse a function in different contexts:

```python
@node(output_name="embedding")
def embed(text: str) -> list[float]:
    return model.embed(text)

# For search pipeline
search_embed = embed.with_name("search_embedding")

# For chat pipeline
chat_embed = embed.with_name("chat_embedding")

print(search_embed.name)  # "search_embedding"
print(chat_embed.name)    # "chat_embedding"
```

### Reconfiguring an Existing Node

Pass a FunctionNode to `FunctionNode()` to create a fresh node with new configuration. Only the underlying function is extracted - all other settings are discarded:

```python
from hypergraph import node, FunctionNode

@node(output_name="original_output", cache=True)
def process(x: int) -> int:
    return x * 2

# Create new node with different config (extracts just the function)
reconfigured = FunctionNode(
    process,  # Pass the FunctionNode directly
    name="new_name",
    output_name="new_output",
    cache=False,
)

print(reconfigured.name)     # "new_name"
print(reconfigured.outputs)  # ("new_output",)
print(reconfigured.cache)    # False

# Original unchanged
print(process.name)     # "process"
print(process.outputs)  # ("original_output",)
print(process.cache)    # True

# The underlying function is the same
print(reconfigured.func is process.func)  # True
```

This is useful when you want to completely reconfigure a node rather than just rename parts of it.

### Conditional Output Names

Choose output names at creation time:

```python
def create_processor(mode: str):
    output_name = "doubled" if mode == "double" else "incremented"

    @node(output_name=output_name)
    def process(x: int) -> int:
        if mode == "double":
            return x * 2
        else:
            return x + 1

    return process

processor = create_processor("double")
print(processor.outputs)  # ("doubled",)
```
