This is hypergraph's core insight: real AI workflows naturally nest DAGs inside cycles and cycles inside DAGs. Hierarchical composition makes this explicit and clean.
The Pattern
1. Build a graph for one task
2. Use it as a node in a larger graph
3. The larger graph can be used as a node in an even larger graph
4. Repeat at any depth
# A graph...
rag = Graph([embed, retrieve, generate], name="rag")

# ...becomes a node
workflow = Graph([
    validate,
    rag.as_node(),  # Graph as a single node
    format_output,
])
Why This Matters
You don't build one graph and stop. You build graphs, compose them, and reuse them in many contexts:
Inference: direct execution for user queries
Evaluation: a node inside a test harness
Optimization: a component in a prompt tuning loop
Batch processing: mapped over a dataset
Build once. Use everywhere.
Example 1: DAG Inside a Cycle (Multi-Turn RAG)
A multi-turn conversation is cyclic — the user can keep asking follow-up questions. But retrieval within each turn is a DAG.
The structure: the RAG pipeline (a DAG) runs to completion on each turn, and the outer conversation loop decides whether to continue.
Example 2: Cycle Inside a DAG (Evaluation Harness)
Now flip it: your cyclic conversation graph becomes a node inside an evaluation DAG.
The structure: the cyclic conversation graph runs as a single node inside the evaluation DAG. Same graph, different context. In inference, conversation handles live users; in evaluation, it is a component being tested.
Example 3: Prompt Optimization (Multiple Nesting Levels)
Context engineering and prompt optimization involve three levels of nested loops: an outer human-review loop, an inner variant-testing loop, and the pipeline under test.
Think Singular, Scale with Map
Another dimension of hierarchy: write logic for one item, scale to many.
Why this works:
No batch loops in your code
Each function is testable with a single input
The framework handles fan-out, parallelism, and caching
This combines with hierarchical composition:
The .as_node() API
Convert any graph to a node:
Key properties:
The nested graph runs to completion before the outer graph continues
Inputs and outputs are determined by the nested graph's InputSpec
Type annotations flow through for strict_types validation
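As a rough illustration of the InputSpec idea (a sketch, not the library's actual implementation): a nested graph's external inputs are the names its nodes consume but that no node produces. Applied to the RAG pipeline's wiring:

```python
def external_inputs(nodes: list[tuple[tuple[str, ...], tuple[str, ...]]]) -> set[str]:
    """Names consumed by some node but produced by none: these must
    come from outside, so they become the composed node's inputs."""
    produced = {out for _, outs in nodes for out in outs}
    consumed = {inp for inps, _ in nodes for inp in inps}
    return consumed - produced

# (inputs, outputs) for each node in the rag pipeline
rag_wiring = [
    (("query",), ("embedding",)),                   # embed
    (("embedding",), ("docs",)),                    # retrieve
    (("docs", "query", "history"), ("response",)),  # generate
]

external_inputs(rag_wiring)  # {"query", "history"}
```

This is why calling `rag_pipeline.as_node()` later in this page needs only `query` and `history` from the outer graph: `embedding` and `docs` are produced internally.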
from hypergraph import Graph, node, route, END, AsyncRunner
# ─────────────────────────────────────────────────────────────
# The RAG pipeline (DAG) — processes one turn
# ─────────────────────────────────────────────────────────────
@node(output_name="embedding")
async def embed(query: str) -> list[float]:
    return await embedder.embed(query)

@node(output_name="docs")
async def retrieve(embedding: list[float]) -> list[str]:
    return await vector_db.search(embedding, k=5)

@node(output_name="response")
async def generate(docs: list[str], query: str, history: list) -> str:
    context = "\n".join(docs)
    return await llm.generate(
        system=f"Context:\n{context}",
        messages=history + [{"role": "user", "content": query}],
    )
# This is a DAG — no cycles
rag_pipeline = Graph([embed, retrieve, generate], name="rag")
# ─────────────────────────────────────────────────────────────
# The conversation loop (Cyclic) — wraps the RAG DAG
# ─────────────────────────────────────────────────────────────
@node(output_name="history")
def accumulate(history: list, query: str, response: str) -> list:
    return history + [
        {"role": "user", "content": query},
        {"role": "assistant", "content": response},
    ]

@route(targets=["rag", END])
def should_continue(history: list) -> str:
    # In practice: check for [END] token, max turns, etc.
    if len(history) >= 20:  # Max 10 turns
        return END
    return "rag"  # Continue conversation
# Compose: RAG DAG inside conversation cycle
conversation = Graph([
    rag_pipeline.as_node(),  # The DAG becomes a single node
    accumulate,
    should_continue,
], name="conversation")
Conversation Loop (cyclic)
├── RAG Pipeline (DAG)
│ ├── embed
│ ├── retrieve
│ └── generate
├── accumulate
└── should_continue → loops back to RAG or exits
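To see why this loop terminates, here is a framework-free sketch of the same accumulate/should_continue semantics in plain Python (`END` is a stand-in for hypergraph's sentinel):

```python
END = "__end__"  # stand-in for hypergraph's END sentinel

def accumulate(history, query, response):
    # Each turn appends a user/assistant pair, so history grows by 2
    return history + [
        {"role": "user", "content": query},
        {"role": "assistant", "content": response},
    ]

def should_continue(history):
    return END if len(history) >= 20 else "rag"

history = []
turn = 0
while should_continue(history) == "rag":
    history = accumulate(history, f"question {turn}", f"answer {turn}")
    turn += 1

# Stops after 10 turns: 20 history entries, 2 per turn
```

The route's `len(history) >= 20` bound therefore caps the conversation at exactly 10 turns, since each turn contributes two history entries.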
# ─────────────────────────────────────────────────────────────
# Evaluation harness (DAG) — contains the cyclic conversation
# ─────────────────────────────────────────────────────────────
import json
from statistics import mean

@node(output_name="test_cases")
def load_test_cases(dataset_path: str) -> list[dict]:
    """Load test conversations from a dataset."""
    with open(dataset_path) as f:
        return json.load(f)
@node(output_name="scores")
def score_responses(history: list, expected: str) -> dict:
    """Score the conversation against expected outcomes."""
    final_response = history[-1]["content"]
    return {
        "relevance": compute_relevance(final_response, expected),
        "coherence": compute_coherence(history),
        "turn_count": len(history) // 2,
    }

@node(output_name="report")
def aggregate_metrics(scores: list[dict]) -> dict:
    """Aggregate scores across all test cases."""
    return {
        "avg_relevance": mean([s["relevance"] for s in scores]),
        "avg_coherence": mean([s["coherence"] for s in scores]),
        "avg_turns": mean([s["turn_count"] for s in scores]),
    }
# The evaluation pipeline — a DAG containing our cyclic conversation
evaluation = Graph([
    load_test_cases,
    conversation.as_node(),  # Cyclic graph as a single node
    score_responses,
    aggregate_metrics,
], name="evaluation")

# Run evaluation: the cyclic conversation runs inside the DAG
runner = AsyncRunner()
report = await runner.run(evaluation, {
    "dataset_path": "test_conversations.json",
    "query": "initial query",  # First query for each test case
    "history": [],
})
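The scoring step relies on two conventions of the history format: the final assistant message is the response under test, and each turn contributes two entries. A quick framework-free check of that bookkeeping:

```python
history = [
    {"role": "user", "content": "What is RAG?"},
    {"role": "assistant", "content": "Retrieval-augmented generation."},
    {"role": "user", "content": "Why use it?"},
    {"role": "assistant", "content": "It grounds answers in retrieved docs."},
]

final_response = history[-1]["content"]  # the answer being scored
turn_count = len(history) // 2           # two entries per turn
```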
Outer loop: Human reviews results, provides feedback
└── Inner loop: Run variants, evaluate, select best
└── Pipeline under test: The actual workflow being optimized
from statistics import mean

from hypergraph import END, Graph, SyncRunner, node, route

# ─────────────────────────────────────────────────────────────
# The pipeline being optimized
# ─────────────────────────────────────────────────────────────
@node(output_name="response")
def generate_with_prompt(query: str, system_prompt: str) -> str:
    return llm.generate(system=system_prompt, user=query)

pipeline = Graph([generate_with_prompt], name="pipeline")
runner = SyncRunner()
# ─────────────────────────────────────────────────────────────
# Variant testing loop (cyclic) — tests multiple prompts
# ─────────────────────────────────────────────────────────────
@node(output_name="variants")
def generate_prompt_variants(base_prompt: str, feedback: str) -> list[str]:
    """Generate prompt variations based on feedback."""
    return prompt_generator.create_variants(base_prompt, feedback, n=5)

@node(output_name="results")
def test_variants(variants: list[str], test_queries: list[str]) -> list[dict]:
    """Test each variant on the test set."""
    results = []
    for variant in variants:
        scores = []
        for query in test_queries:
            response = runner.run(pipeline, {"query": query, "system_prompt": variant})
            scores.append(evaluate(response, query))
        results.append({"prompt": variant, "avg_score": mean(scores)})
    return results

@node(output_name="best_prompt")
def select_best(results: list[dict]) -> str:
    return max(results, key=lambda r: r["avg_score"])["prompt"]
@route(targets=["generate_prompt_variants", END])
def optimization_gate(best_prompt: str, target_score: float, results: list) -> str:
    best_score = max(r["avg_score"] for r in results)
    if best_score >= target_score:
        return END
    return "generate_prompt_variants"  # Keep optimizing
variant_tester = Graph([
    generate_prompt_variants,
    test_variants,
    select_best,
    optimization_gate,
], name="variant_tester")
# ─────────────────────────────────────────────────────────────
# Human-in-the-loop wrapper (cyclic) — gets human feedback
# ─────────────────────────────────────────────────────────────
@node(output_name="feedback")
def get_human_feedback(best_prompt: str, results: list) -> str:
    """Display results to human, get feedback for next iteration."""
    display_results(best_prompt, results)
    return input("Feedback (or 'done'): ")

@route(targets=["variant_tester", END])
def human_gate(feedback: str) -> str:
    if feedback.lower() == "done":
        return END
    return "variant_tester"

optimization_loop = Graph([
    variant_tester.as_node(),  # Cyclic graph as a node
    get_human_feedback,
    human_gate,
], name="optimization")
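The selection and gating logic above is ordinary Python; here is a framework-free sketch of how `select_best` and `optimization_gate` interact (`END` is a stand-in for hypergraph's sentinel, and the scores are made up for illustration):

```python
END = "__end__"  # stand-in for hypergraph's END sentinel

def select_best(results):
    return max(results, key=lambda r: r["avg_score"])["prompt"]

def optimization_gate(results, target_score):
    best_score = max(r["avg_score"] for r in results)
    return END if best_score >= target_score else "generate_prompt_variants"

results = [
    {"prompt": "Be concise.", "avg_score": 0.72},
    {"prompt": "Be concise and cite sources.", "avg_score": 0.91},
]

select_best(results)              # "Be concise and cite sources."
optimization_gate(results, 0.85)  # END: target met, loop exits
```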
# Write for ONE document
@node(output_name="features")
def extract_features(document: str) -> dict:
    return {
        "length": len(document),
        "entities": extract_entities(document),
        "sentiment": analyze_sentiment(document),
    }

pipeline = Graph([extract_features])

# Scale to 1000 documents
runner = SyncRunner()
results = runner.map(
    pipeline,
    {"document": documents},  # List of 1000 documents
    map_over="document",
)
# Returns: list of 1000 feature dicts
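Conceptually, `runner.map` is a fan-out over the mapped input. A framework-free sketch of the semantics (the real runner also handles parallelism and caching; the one-field feature extractor here is a simplified stand-in):

```python
def map_pipeline(pipeline, items):
    """Apply a one-item pipeline to each element, collecting results in order."""
    return [pipeline(item) for item in items]

def extract_features(document: str) -> dict:
    return {"length": len(document)}

map_pipeline(extract_features, ["short", "a longer document"])
# [{"length": 5}, {"length": 17}]
```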
# Complex pipeline, still written for one item
analysis = Graph([
    preprocess,
    extract_features,
    classify,
    generate_summary,
], name="analysis")

# Use in batch processing
batch_pipeline = Graph([
    load_documents,
    analysis.as_node().map_over("document"),  # Fan out over documents
    aggregate_results,
])
# Basic usage
graph_node = my_graph.as_node()
# With custom name
graph_node = my_graph.as_node(name="custom_name")
# With input/output renaming
graph_node = my_graph.as_node().with_inputs(old="new")
# With map_over for fan-out
graph_node = my_graph.as_node().map_over("items")
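The renaming in `.with_inputs(old="new")` can be pictured as a keyword-argument adapter. A framework-free sketch of the idea (not the library's implementation; `generate` and `user_question` are hypothetical names for illustration):

```python
def with_inputs(fn, **renames):
    """Wrap fn so it accepts outer names and forwards them under inner names.
    renames maps inner_name -> outer_name, mirroring .with_inputs(old="new")."""
    def wrapped(**kwargs):
        forwarded = {inner: kwargs.pop(outer) for inner, outer in renames.items()}
        forwarded.update(kwargs)  # pass untouched inputs through unchanged
        return fn(**forwarded)
    return wrapped

def generate(query: str) -> str:
    return f"answer to: {query}"

node = with_inputs(generate, query="user_question")
node(user_question="what is hypergraph?")  # "answer to: what is hypergraph?"
```

Renaming like this lets the same inner graph plug into outer graphs whose state uses different names, without editing the inner graph.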