# Streaming

## When to Use

## Basic Pattern: NodeContext
from hypergraph import Graph, node, AsyncRunner, NodeContext
from anthropic import Anthropic
client = Anthropic()
@node(output_name="response")
async def stream_response(messages: list, ctx: NodeContext, system: str = "") -> str:
"""Stream tokens from Claude with stop support."""
response = ""
with client.messages.stream(
model="claude-sonnet-4-5-20250929",
max_tokens=1024,
system=system,
messages=messages,
) as stream:
for text in stream.text_stream:
if ctx.stop_requested:
break
response += text
ctx.stream(text) # emit StreamingChunkEvent for live UI
return response
graph = Graph([stream_response])
runner = AsyncRunner()
result = await runner.run(graph, {
"messages": [{"role": "user", "content": "Explain quantum computing"}],
"system": "You are a helpful physics tutor.",
})Streaming with OpenAI
Streaming in RAG Pipelines
Consuming Streaming Events
Multi-Turn Streaming with Stop
Error Handling in Streams
Testing Streaming Nodes
What's Next?
Last updated