# Agentic Loops

Agentic workflows iterate until a goal is achieved. The graph cycles back to earlier nodes based on runtime conditions.

## When to Use

* **Multi-turn conversations**: User asks, system responds, user follows up
* **Iterative refinement**: Generate, evaluate, improve until quality threshold
* **Tool-using agents**: Call tools, observe results, decide next action
* **Retry patterns**: Attempt, check result, retry if needed

## The Core Pattern

Use `@route` to decide whether to continue or stop:

```python
from hypergraph import Graph, node, route, END, SyncRunner

@node(output_name="draft")
def generate(prompt: str, feedback: str = "") -> str:
    """Generate content, incorporating any feedback."""
    full_prompt = f"{prompt}\n\nFeedback to address: {feedback}" if feedback else prompt
    return llm.generate(full_prompt)

@node(output_name="score")
def evaluate(draft: str) -> float:
    """Score the draft quality (0-1)."""
    return quality_model.score(draft)

@node(output_name="feedback")
def critique(draft: str, score: float) -> str:
    """Generate feedback for improvement."""
    if score >= 0.8:
        return ""  # Good enough
    return critic_model.generate(f"Critique this draft:\n{draft}")

@route(targets=["generate", END])
def should_continue(score: float, attempts: int) -> str:
    """Decide whether to continue refining."""
    if score >= 0.8:
        return END  # Quality achieved
    if attempts >= 5:
        return END  # Max attempts reached
    return "generate"  # Keep refining

@node(output_name="attempts")
def count_attempts(attempts: int = 0) -> int:
    """Track iteration count."""
    return attempts + 1

# Build the loop
refinement_loop = Graph([
    generate,
    evaluate,
    critique,
    count_attempts,
    should_continue,
])

# Run until done
runner = SyncRunner()
result = runner.run(refinement_loop, {"prompt": "Write a haiku about Python"})

print(f"Final draft: {result['draft']}")
print(f"Final score: {result['score']}")
print(f"Attempts: {result['attempts']}")
```

## How It Works

```
┌─────────────────────────────────────────┐
│                                         │
│   generate → evaluate → critique        │
│       ↑                    ↓            │
│       └──── should_continue ────→ END   │
│                                         │
└─────────────────────────────────────────┘
```

1. `generate` creates a draft
2. `evaluate` scores it
3. `critique` provides feedback
4. `should_continue` decides:
   * Return `END` → graph completes
   * Return `"generate"` → loop back
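
The same control flow, written as a plain-Python while loop, can make the cycle easier to see. This is only a sketch: the deterministic stubs below stand in for `llm`, `quality_model`, and `critic_model`, and in the real graph the runner drives the cycle for you:

```python
# Plain-Python equivalent of the refinement loop, with deterministic stubs
def generate(prompt: str, feedback: str = "") -> str:
    return prompt + (" [revised]" if feedback else "")

def evaluate(draft: str) -> float:
    return 0.9 if "[revised]" in draft else 0.5  # stub: revisions score higher

def critique(draft: str, score: float) -> str:
    return "" if score >= 0.8 else "Add more detail."

def run_refinement(prompt: str, threshold: float = 0.8, max_attempts: int = 5) -> dict:
    feedback, attempts = "", 0
    while True:
        draft = generate(prompt, feedback)   # generate
        score = evaluate(draft)              # evaluate
        attempts += 1                        # count_attempts
        if score >= threshold or attempts >= max_attempts:  # should_continue -> END
            return {"draft": draft, "score": score, "attempts": attempts}
        feedback = critique(draft, score)    # critique, then loop back
```

With these stubs the loop terminates on the second attempt: the first draft scores 0.5, the critique triggers a revision, and the revision clears the 0.8 threshold.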

## The END Sentinel

`END` is a special value that terminates execution:

```python
from hypergraph import END

@route(targets=["next_step", END])
def check_done(result: dict) -> str:
    if result["complete"]:
        return END
    return "next_step"
```

**Important**: Always include `END` in your targets when you want the option to stop.

## Multi-Turn Conversation

A conversation loop that continues until the assistant says goodbye or the conversation gets too long:

```python
@node(output_name="response")
def generate_response(messages: list, context: str) -> str:
    """Generate assistant response."""
    return llm.chat(messages, system=context)

@node(output_name="messages")
def update_history(messages: list, user_input: str, response: str) -> list:
    """Append new messages to history."""
    return messages + [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": response},
    ]

@route(targets=["generate_response", END])
def should_continue_chat(response: str, messages: list) -> str:
    """Check if conversation should continue."""
    # End if assistant said goodbye or max turns reached
    if "goodbye" in response.lower() or len(messages) > 20:
        return END
    return "generate_response"

chat_loop = Graph([
    generate_response,
    update_history,
    should_continue_chat,
])
```

### Shared State for Chat Loops

When multiple nodes read and write `messages`, auto-wiring may struggle with ambiguity. Use `shared` to declare `messages` as shared state — auto-wiring handles everything else, and you only add ordering edges for the gaps:

```python
@node(output_name="messages")
def add_user_message(messages: list, user_input: str) -> list:
    return [*messages, {"role": "user", "content": user_input}]

@node(output_name="response")
def generate_response(messages: list) -> str:
    return llm.chat(messages)

@node(output_name="messages")
def add_response(messages: list, response: str) -> list:
    return [*messages, {"role": "assistant", "content": response}]

@route(targets=["add_user_message", END])
def should_continue_chat(messages: list) -> str:
    return "add_user_message" if len(messages) < 20 else END

chat_loop = Graph(
    [add_user_message, generate_response, add_response, should_continue_chat],
    shared=["messages"],
    entrypoint="add_user_message",
    edges=[
        (add_user_message, generate_response),   # ordering
        (add_response, should_continue_chat),     # ordering
    ],
)

# messages needs an initial value
result = runner.run(chat_loop, {"messages": [], "user_input": "Hello!"})
```

See [Shared State](https://gilad-rubin.gitbook.io/hypergraph/api-reference/graph#shared-state) for details.

## Tool-Using Agent

An agent that decides which tool to call:

```python
@node(output_name="action")
def decide_action(observation: str, goal: str) -> dict:
    """Decide next action based on observation."""
    return agent_model.decide(observation, goal)

@node(output_name="observation")
def execute_action(action: dict) -> str:
    """Execute the chosen action."""
    tool_name = action["tool"]
    tool_args = action["args"]
    return tools[tool_name](**tool_args)

@route(targets=["decide_action", END])
def check_goal_achieved(action: dict, observation: str) -> str:
    """Check if the goal is achieved."""
    # observation isn't used in the body, but depending on it ensures
    # this route runs after execute_action on every iteration
    if action["tool"] == "finish":
        return END
    return "decide_action"

agent_loop = Graph([decide_action, execute_action, check_goal_achieved])
```
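
Stripped of the graph machinery, this is a classic observe-decide-act cycle. Here it is as a plain-Python sketch; the `search` tool and the one-shot decision rule are illustrative stubs (only the `"finish"` convention comes from the example above):

```python
# Stub tool registry; "finish" follows the convention used by the route above
tools = {
    "search": lambda query: f"results for {query!r}",
    "finish": lambda: "done",
}

def decide_action(observation: str, goal: str) -> dict:
    # Stub policy: search once, then finish
    if "results" in observation:
        return {"tool": "finish", "args": {}}
    return {"tool": "search", "args": {"query": goal}}

def execute_action(action: dict) -> str:
    return tools[action["tool"]](**action["args"])

def run_agent(goal: str) -> str:
    observation = ""
    while True:
        action = decide_action(observation, goal)  # decide_action
        observation = execute_action(action)       # execute_action
        if action["tool"] == "finish":             # check_goal_achieved -> END
            return observation
```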

## Quality Gate Pattern

Ensure output meets quality standards before proceeding:

```python
@node(output_name="content")
def generate_content(topic: str, content: str = "") -> str:
    # On retries, the previous draft feeds back in as `content`
    if content:
        return llm.generate(f"Improve this: {content}")
    return llm.generate(f"Write about: {topic}")

@node(output_name="validation")
def validate(content: str) -> dict:
    return {
        "has_intro": "introduction" in content.lower(),
        "has_conclusion": "conclusion" in content.lower(),
        "min_length": len(content) > 500,
        "no_errors": grammar_check(content),
    }

@node(output_name="all_valid")
def check_validation(validation: dict) -> bool:
    return all(validation.values())

@node(output_name="attempts")
def count_attempts(attempts: int = 0) -> int:
    """Track iteration count (without this, the attempt cap never fires)."""
    return attempts + 1

@route(targets=["generate_content", END])
def quality_gate(all_valid: bool, attempts: int) -> str:
    if all_valid:
        return END
    if attempts >= 3:
        return END  # Give up after 3 attempts
    return "generate_content"

quality_loop = Graph([
    generate_content,
    validate,
    check_validation,
    count_attempts,
    quality_gate,
])
```

## Ordering with emit/wait\_for

In cyclic graphs, you sometimes need a node to wait for another node to finish — even when there's no direct data dependency. Use `emit` and `wait_for` to enforce execution order.

**The problem**: In a chat loop, `should_continue` reads `messages`. But `accumulate` also reads `messages` and produces the updated version. Without ordering, `should_continue` might see stale messages from the previous turn.

**The fix**: `accumulate` emits a signal when it finishes. `should_continue` waits for that signal.

```python
@node(output_name="response")
def generate(messages: list) -> str:
    return llm.chat(messages)

@node(output_name="messages", emit="turn_done")
def accumulate(messages: list, response: str) -> list:
    return messages + [{"role": "assistant", "content": response}]

@route(targets=["generate", END], wait_for="turn_done")
def should_continue(messages: list) -> str:
    if len(messages) >= 10:
        return END
    return "generate"
```

**How it works**:

* `emit="turn_done"` declares an ordering-only output. A sentinel value is auto-produced when `accumulate` runs — your function doesn't return it.
* `wait_for="turn_done"` declares an ordering-only input. `should_continue` won't run until `turn_done` exists and is fresh (produced since `should_continue` last ran).
* `emit` names appear in `node.outputs` but not in `node.data_outputs`. They're filtered from the final result.
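
The freshness rule can be pictured with a toy version counter. This is only a mental model of the semantics described above, not the library's actual scheduler:

```python
class Signal:
    """Ordering-only value: a version counter bumped on each emit."""
    def __init__(self):
        self.version = 0

    def emit(self):
        self.version += 1

class Waiter:
    """Ready only when the signal is newer than what it last consumed."""
    def __init__(self, signal: Signal):
        self.signal = signal
        self.last_seen = 0

    def ready(self) -> bool:
        return self.signal.version > self.last_seen

    def consume(self):
        self.last_seen = self.signal.version
```

Before `accumulate` runs, the waiter is blocked; after each emit it becomes ready exactly once, then blocks again until the next iteration produces a fresh signal.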

**When to use emit/wait\_for vs data edges**:

* If node B needs node A's output value → use a data edge (parameter matching)
* If node B just needs to run after node A → use `emit`/`wait_for`

**Validation**: emit names must not overlap with `output_name`, and wait\_for names must not overlap with function parameters. Referencing a nonexistent emit/output in `wait_for` raises `GraphConfigError` at build time.

## Entry Points

When a parameter is both an input and output of a cycle (like `history` or `iteration`), it becomes an **entrypoint parameter** — an initial value needed to start the first iteration. Provide these in the `values` dict when calling `runner.run()`:

```python
result = runner.run(graph, {
    "prompt": "...",
    "history": [],       # Entry point: initial value before first iteration
    "iteration": 0,      # Entry point: starting counter
})
```

You can check what entrypoints a graph has via `graph.inputs.entrypoints`. This returns a dict mapping node names to the cycle parameters they need:

```python
print(graph.inputs.entrypoints)
# {'accumulate_history': ('history',), 'increment': ('iteration',)}
```

Pick ONE entrypoint per cycle and provide its parameters. For full details, see [InputSpec](https://gilad-rubin.gitbook.io/hypergraph/api-reference/inputspec).

## Tracking State Across Iterations

Use a node to accumulate state:

```python
@node(output_name="history")
def accumulate_history(history: list, new_item: str) -> list:
    """Append new item to history."""
    return history + [new_item]

@node(output_name="iteration")
def increment(iteration: int = 0) -> int:
    """Track iteration count."""
    return iteration + 1
```

Provide initial values when running:

```python
result = runner.run(graph, {
    "history": [],      # Start with empty history
    "iteration": 0,     # Start at iteration 0
    "prompt": "...",
})
```
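
To see how these accumulators evolve, here is a hand-driven trace in plain Python (in the real graph, the runner re-invokes the nodes on each pass around the cycle):

```python
def accumulate_history(history: list, new_item: str) -> list:
    return history + [new_item]

def increment(iteration: int = 0) -> int:
    return iteration + 1

history, iteration = [], 0   # the initial values you pass to runner.run
for item in ["draft-1", "draft-2", "draft-3"]:
    history = accumulate_history(history, item)
    iteration = increment(iteration)
# history is now ["draft-1", "draft-2", "draft-3"] and iteration is 3
```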

## Shared Outputs in a Cycle

When multiple nodes produce the same output name in a cycle — like two nodes both producing `messages` — the graph needs to know their execution order. There are two approaches: ordering signals (`emit`/`wait_for`) and explicit edges.

### With emit/wait\_for

Use `emit`/`wait_for` to prove ordering between same-name producers. This keeps auto-inference for edge wiring while adding ordering constraints:

```python
@node(output_name="query")
def generate_query(messages: list) -> str:
    """Get the next user query (e.g., from input or an LLM)."""
    return get_user_input(messages)

@node(output_name="messages", emit="query_done")
def accumulate_query(messages: list, query: str) -> list:
    return messages + [{"role": "user", "content": query}]

@node(output_name="response")
def generate_response(messages: list) -> str:
    """Generate an assistant response from the conversation so far."""
    return llm.chat(messages)

@node(output_name="messages", wait_for="query_done")
def accumulate_response(messages: list, response: str) -> list:
    return messages + [{"role": "assistant", "content": response}]

@route(targets=["generate_query", END])
def should_continue(messages: list) -> str:
    return END if len(messages) >= 10 else "generate_query"

# Allowed: accumulate_query emits "query_done", accumulate_response waits for it
graph = Graph([
    generate_query, accumulate_query,
    generate_response, accumulate_response,
    should_continue,
])
```

### With Explicit Edges

When cycles share common output names like `messages`, `df`, or `state`, explicit edges let you declare the topology directly instead of inventing signal names. Pass `edges` to `Graph()` to disable auto-inference and wire edges manually:

```python
from hypergraph import Graph, node, route, END

@node(output_name="messages")
def add_query(messages: list, query: str) -> list:
    return [*messages, {"role": "user", "content": query}]

@node(output_name="response")
def generate(messages: list) -> str:
    return llm.chat(messages)

@node(output_name="messages")
def add_response(messages: list, response: str) -> list:
    return [*messages, {"role": "assistant", "content": response}]

@route(targets=["add_query", END])
def should_continue(messages: list) -> str:
    return END if len(messages) >= 10 else "add_query"

chat = Graph(
    [add_query, generate, add_response, should_continue],
    edges=[
        (add_query, generate),                   # messages
        (generate, add_response),                # response
        (add_response, should_continue),         # messages
        (add_response, add_query),               # messages (cycle)
    ],
)
```

Each edge is a `(source, target)` tuple. Values are auto-detected from the intersection of source outputs and target inputs. When there's no overlap, the edge becomes ordering-only (structural dependency, no data flow).

You can also use 3-tuples to specify values explicitly: `(source, target, "messages")` or `(source, target, ["messages", "response"])`.
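
As a mental model of the auto-detection rule (not the library's implementation), the carried values are simply the names the source produces that the target consumes:

```python
def edge_values(source_outputs: list, target_inputs: list):
    """Toy sketch: intersection of source outputs and target inputs."""
    values = [name for name in source_outputs if name in target_inputs]
    return values if values else None  # None -> ordering-only edge

edge_values(["messages"], ["messages", "response"])  # ["messages"]
edge_values(["response"], ["messages"])              # None -> ordering-only
```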

**When to use which approach:**

| Situation                                             | Use                                             |
| ----------------------------------------------------- | ----------------------------------------------- |
| DAGs (no cycles)                                      | Auto-inference (no `edges` needed)              |
| Cycles with unique output names                       | Auto-inference + `emit`/`wait_for` for ordering |
| Cycles with shared output names (`messages`, `state`) | Explicit edges                                  |

For more on the `edges` parameter, see the [Graph API reference](https://gilad-rubin.gitbook.io/hypergraph/api-reference/graph#explicit-edges).

## Preventing Infinite Loops

Hypergraph enforces an iteration limit at runtime and raises `InfiniteLoopError` when a loop exceeds it:

```python
# This will raise InfiniteLoopError if the loop runs too long
runner = SyncRunner()
result = runner.run(graph, inputs, max_iterations=100)  # Safety limit
```

Best practices:

1. Always have a termination condition (max attempts, quality threshold)
2. Include `END` in your route targets
3. Track iteration count and bail out if needed
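
The same belt-and-braces guard, sketched as a plain-Python helper (illustrative only; in hypergraph the runner's `max_iterations` plays this role):

```python
def bounded_loop(step, should_stop, max_iterations: int = 100):
    """Run `step` until `should_stop(state)` or the safety limit trips."""
    state = None
    for i in range(1, max_iterations + 1):
        state = step(i)
        if should_stop(state):   # termination condition (practices 1 and 2)
            return state
    # practice 3: bail out instead of spinning forever
    raise RuntimeError(f"loop exceeded {max_iterations} iterations")
```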

## What's Next?

* [Hierarchical Composition](https://gilad-rubin.gitbook.io/hypergraph/patterns/04-hierarchical) — Nest loops inside DAGs
* [Multi-Agent](https://gilad-rubin.gitbook.io/hypergraph/patterns/05-multi-agent) — Coordinate multiple agents
* [Real-World: Multi-Turn RAG](https://gilad-rubin.gitbook.io/hypergraph/real-world-examples/multi-turn-rag) — Complete example
