RAG Pipeline
When to Use
The Pipeline
query → embed → retrieve → generate → answer

Complete Implementation
import asyncio

from anthropic import Anthropic
from hypergraph import Graph, node, AsyncRunner
from openai import OpenAI
# Initialize clients
anthropic = Anthropic()
openai = OpenAI()
# ═══════════════════════════════════════════════════════════════
# EMBEDDING
# ═══════════════════════════════════════════════════════════════
@node(output_name="embedding")
async def embed(query: str) -> list[float]:
"""
Embed the query for vector search.
Uses OpenAI's embedding model.
"""
response = openai.embeddings.create(
model="text-embedding-3-large",
input=query,
)
return response.data[0].embedding
# ═══════════════════════════════════════════════════════════════
# RETRIEVAL
# ═══════════════════════════════════════════════════════════════
@node(output_name="docs")
async def retrieve(embedding: list[float], top_k: int = 5) -> list[dict]:
"""
Search the vector database for relevant documents.
Returns documents with content and metadata.
"""
results = await vector_db.search(
vector=embedding,
limit=top_k,
include_metadata=True,
)
return [
{
"content": r["content"],
"source": r["metadata"].get("source", "unknown"),
"score": r["score"],
}
for r in results
]
# ═══════════════════════════════════════════════════════════════
# GENERATION
# ═══════════════════════════════════════════════════════════════
@node(output_name="answer")
def generate(docs: list[dict], query: str) -> str:
"""
Generate an answer using Claude Sonnet 4.5.
Cites sources from retrieved documents.
"""
# Format context with source attribution
context_parts = []
for i, doc in enumerate(docs, 1):
context_parts.append(f"[{i}] {doc['source']}:\n{doc['content']}")
context = "\n\n".join(context_parts)
message = anthropic.messages.create(
model="claude-sonnet-4-5-20250929",
max_tokens=2048,
system="""You are a helpful assistant that answers questions based on the provided context.
Always cite your sources using [1], [2], etc.
If the context doesn't contain the answer, say so clearly.""",
messages=[{
"role": "user",
"content": f"Context:\n{context}\n\nQuestion: {query}",
}],
)
return message.content[0].text
# ═══════════════════════════════════════════════════════════════
# COMPOSE THE PIPELINE
# ═══════════════════════════════════════════════════════════════
rag_pipeline = Graph([embed, retrieve, generate], name="rag")
# Check what inputs are needed
print(rag_pipeline.inputs.required) # ('query',)
print(rag_pipeline.inputs.optional) # ('top_k',)
# ═══════════════════════════════════════════════════════════════
# RUN THE PIPELINE
# ═══════════════════════════════════════════════════════════════
async def main():
runner = AsyncRunner()
result = await runner.run(rag_pipeline, {
"query": "How do I create a graph in hypergraph?",
"top_k": 5,
})
print(f"Answer:\n{result['answer']}")
print(f"\nRetrieved {len(result['docs'])} documents")
# asyncio.run(main())

With Streaming
With Reranking
With Query Expansion
Testing
What's Next?
Last updated