RAG Pipeline

A simple Retrieval-Augmented Generation pipeline. Query comes in, documents are retrieved, answer is generated.

When to Use

  • Question-answering over documents

  • Knowledge base search

  • Single-turn information retrieval

For multi-turn conversations with follow-up questions, see Multi-Turn RAG.

The Pipeline

query → embed → retrieve → generate → answer

Complete Implementation

import asyncio

from hypergraph import Graph, node, AsyncRunner
from anthropic import Anthropic
from openai import OpenAI

# Initialize clients
anthropic = Anthropic()
openai = OpenAI()

# ═══════════════════════════════════════════════════════════════
# EMBEDDING
# ═══════════════════════════════════════════════════════════════

@node(output_name="embedding")
async def embed(query: str) -> list[float]:
    """
    Embed the query for vector search.

    Uses OpenAI's embedding model. The `openai` client initialized above is
    the synchronous one, so the HTTP call is offloaded to a worker thread
    with asyncio.to_thread — otherwise this `async def` would block the
    event loop for the duration of the request and serialize the pipeline.

    Returns the embedding vector for the first (only) input string.
    """
    response = await asyncio.to_thread(
        openai.embeddings.create,
        model="text-embedding-3-large",
        input=query,
    )
    return response.data[0].embedding


# ═══════════════════════════════════════════════════════════════
# RETRIEVAL
# ═══════════════════════════════════════════════════════════════

@node(output_name="docs")
async def retrieve(embedding: list[float], top_k: int = 5) -> list[dict]:
    """
    Look up the top_k nearest documents in the vector store.

    Each hit is reduced to a small dict with the document text, its source
    (falling back to "unknown" when the metadata has none), and the
    similarity score.
    """
    # NOTE(review): `vector_db` is assumed to be configured elsewhere in
    # this module/app — confirm it exposes an async `search` method.
    hits = await vector_db.search(
        vector=embedding,
        limit=top_k,
        include_metadata=True,
    )

    docs = []
    for hit in hits:
        docs.append({
            "content": hit["content"],
            "source": hit["metadata"].get("source", "unknown"),
            "score": hit["score"],
        })
    return docs


# ═══════════════════════════════════════════════════════════════
# GENERATION
# ═══════════════════════════════════════════════════════════════

@node(output_name="answer")
def generate(docs: list[dict], query: str) -> str:
    """
    Produce a cited answer from the retrieved documents via Claude.

    Documents are numbered [1]..[N] in the prompt so the model can cite
    them; the system prompt instructs it to use those citation markers.
    """
    # One numbered context entry per retrieved document.
    numbered_entries = [
        f"[{idx}] {doc['source']}:\n{doc['content']}"
        for idx, doc in enumerate(docs, start=1)
    ]
    context = "\n\n".join(numbered_entries)

    system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Always cite your sources using [1], [2], etc.
If the context doesn't contain the answer, say so clearly."""

    user_content = f"Context:\n{context}\n\nQuestion: {query}"

    message = anthropic.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system=system_prompt,
        messages=[{"role": "user", "content": user_content}],
    )

    return message.content[0].text


# ═══════════════════════════════════════════════════════════════
# COMPOSE THE PIPELINE
# ═══════════════════════════════════════════════════════════════

# Wire the three nodes into one pipeline; the graph resolves the data
# flow (embedding -> docs -> answer) from the nodes' parameter names.
rag_pipeline = Graph(
    [embed, retrieve, generate],
    name="rag",
)

# Inspect the inputs the graph inferred from unbound node parameters.
required_inputs = rag_pipeline.inputs.required
optional_inputs = rag_pipeline.inputs.optional
print(required_inputs)  # ('query',)
print(optional_inputs)  # ('top_k',)


# ═══════════════════════════════════════════════════════════════
# RUN THE PIPELINE
# ═══════════════════════════════════════════════════════════════

async def main():
    """Run the RAG pipeline once and print the answer and doc count."""
    runner = AsyncRunner()

    inputs = {
        "query": "How do I create a graph in hypergraph?",
        "top_k": 5,
    }
    result = await runner.run(rag_pipeline, inputs)

    answer = result["answer"]
    retrieved = result["docs"]
    print(f"Answer:\n{answer}")
    print(f"\nRetrieved {len(retrieved)} documents")


# asyncio.run(main())

With Streaming

Stream the generation while retrieval happens first:

With Reranking

Add a reranking step for better relevance:

With Query Expansion

Expand the query for better retrieval:

Testing

What's Next?

Last updated