Streaming

Stream LLM responses token-by-token so users see output as it's generated, not after the full response completes.

When to Use

  • Chat interfaces — Show responses as they're generated

  • Long-form content — Don't make users wait for full generation

  • Stoppable nodes — Let users cancel long-running generation

Basic Pattern: NodeContext

Use NodeContext to stream tokens and support cooperative stop:

from hypergraph import Graph, node, AsyncRunner, NodeContext
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

@node(output_name="response")
async def stream_response(messages: list, ctx: NodeContext, system: str = "") -> str:
    """Stream tokens from Claude with stop support."""
    response = ""

    async with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        system=system,
        messages=messages,
    ) as stream:
        async for text in stream.text_stream:
            if ctx.stop_requested:
                break
            response += text
            ctx.stream(text)  # emit StreamingChunkEvent for live UI

    return response


graph = Graph([stream_response])
runner = AsyncRunner()

result = await runner.run(graph, {
    "messages": [{"role": "user", "content": "Explain quantum computing"}],
    "system": "You are a helpful physics tutor.",
})

ctx.stream(chunk) emits a StreamingChunkEvent — a side-channel for live UI preview. It does not affect the return value. The node controls its own output type.

ctx.stop_requested is a cooperative stop signal. The node checks it and decides when to break. See NodeContext API for details.

Adding ctx: NodeContext is optional. Nodes without it work exactly as before — the framework detects the type hint and injects it automatically (same pattern as FastAPI's Request).
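
For contrast, a node without the parameter:

```python
from hypergraph import node

@node(output_name="summary")
def summarize(text: str) -> str:
    # No NodeContext type hint, so nothing is injected; this runs
    # as a plain function.
    return text[:200]
```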

Streaming with OpenAI
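
The same pattern works with the OpenAI SDK. A sketch using AsyncOpenAI; the model name is a placeholder, substitute your own:

```python
from hypergraph import node, NodeContext
from openai import AsyncOpenAI

client = AsyncOpenAI()

@node(output_name="response")
async def stream_response(messages: list, ctx: NodeContext, system: str = "") -> str:
    """Stream tokens from an OpenAI model with stop support."""
    response = ""
    stream = await client.chat.completions.create(
        model="gpt-4o",  # placeholder model name
        messages=[{"role": "system", "content": system}, *messages],
        stream=True,
    )
    async for chunk in stream:
        if ctx.stop_requested:
            break
        if not chunk.choices:  # e.g. a trailing usage-only chunk
            continue
        delta = chunk.choices[0].delta.content or ""
        response += delta
        ctx.stream(delta)
    return response
```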

Streaming in RAG Pipelines

Combine retrieval (fast) with streaming generation:
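
A sketch of a two-node pipeline. Here search_index is a hypothetical stand-in for your vector store, and the retrieve node's output_name is assumed to feed the generate node's context parameter by name, following the pattern above:

```python
from hypergraph import Graph, node, NodeContext
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

def search_index(query: str, k: int = 3) -> list[str]:
    """Hypothetical stand-in for a vector-store query."""
    raise NotImplementedError

@node(output_name="context")
def retrieve(query: str) -> str:
    """Fast, synchronous retrieval step."""
    return "\n\n".join(search_index(query, k=3))

@node(output_name="answer")
async def generate(query: str, context: str, ctx: NodeContext) -> str:
    """Stream the grounded answer token-by-token."""
    answer = ""
    async with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        system=f"Answer using this context:\n{context}",
        messages=[{"role": "user", "content": query}],
    ) as stream:
        async for text in stream.text_stream:
            if ctx.stop_requested:
                break
            answer += text
            ctx.stream(text)
    return answer

graph = Graph([retrieve, generate])
```

Retrieval finishes quickly, so the first streamed token arrives almost as soon as generation starts.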

Consuming Streaming Events

ctx.stream() emits StreamingChunkEvents through the event system. Consume them with an event processor or via .iter():
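
A minimal consumer sketch. It assumes StreamingChunkEvent is importable from the package root, that .iter() yields events as the graph runs, and that the chunk text lives on a .chunk attribute; check the event-system reference for the exact names:

```python
from hypergraph import AsyncRunner, StreamingChunkEvent

runner = AsyncRunner()

async for event in runner.iter(graph, {"messages": messages}):
    if isinstance(event, StreamingChunkEvent):
        # Assumed attribute name for the streamed text.
        print(event.chunk, end="", flush=True)
```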

Multi-Turn Streaming with Stop

Stream responses in a conversation loop with stop support:
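
A sketch of the loop, reusing the stream_response node from above. It assumes runner.run returns a mapping keyed by the node's output_name:

```python
history: list = []

async def chat_loop():
    while True:
        user_input = input("> ")
        if user_input.lower() in {"exit", "quit"}:
            break
        history.append({"role": "user", "content": user_input})
        result = await runner.run(graph, {"messages": history})
        # "response" is the output_name declared on stream_response
        history.append({"role": "assistant", "content": result["response"]})
```

Stop support comes from the node itself: it checks ctx.stop_requested on every chunk, so a stop request ends the current turn with a partial response rather than killing the loop.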

To stop mid-stream from another coroutine or endpoint:
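
A sketch of the shape this takes. Note that request_stop() is an assumed name for whatever hook sets ctx.stop_requested; see the NodeContext API page for the real one:

```python
import asyncio

# Run generation as a background task so another coroutine can reach it.
run_task = asyncio.create_task(runner.run(graph, {"messages": history}))

async def stop_generation():
    """Called from e.g. a /stop endpoint."""
    runner.request_stop()   # assumed stop hook, see NodeContext API
    return await run_task   # node breaks out and returns the partial text
```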

Error Handling in Streams

Handle streaming errors gracefully:
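
One pragmatic policy is to keep whatever streamed before the failure. A sketch using the Anthropic SDK's APIError base class:

```python
from hypergraph import node, NodeContext
from anthropic import AsyncAnthropic, APIError

client = AsyncAnthropic()

@node(output_name="response")
async def stream_with_fallback(messages: list, ctx: NodeContext) -> str:
    """Keep partial output if the stream fails mid-generation."""
    response = ""
    try:
        async with client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=1024,
            messages=messages,
        ) as stream:
            async for text in stream.text_stream:
                if ctx.stop_requested:
                    break
                response += text
                ctx.stream(text)
    except APIError:
        if response:
            # Surface what already streamed instead of dropping it.
            return response + "\n\n[response truncated: stream error]"
        raise  # nothing streamed yet, let the runner handle the failure
    return response
```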

Testing Streaming Nodes

Nodes with NodeContext are testable as plain Python — pass a mock:

No framework setup needed — the function is a plain async function.

What's Next?