Copy from hypergraph import Graph, node, route, END, SyncRunner, AsyncRunner
# ═══════════════════════════════════════════════════════════════
# RESEARCHER AGENT
# ═══════════════════════════════════════════════════════════════
@node(output_name="search_results")
def search(topic: str) -> list[dict]:
"""Search for relevant sources."""
return search_api.query(topic, max_results=10)
@node(output_name="analysis")
def analyze(search_results: list[dict]) -> dict:
"""Analyze and synthesize search results."""
return llm.analyze(
f"Analyze these sources and identify key themes:\n{search_results}"
)
@node(output_name="research_summary")
def summarize_research(analysis: dict, topic: str) -> str:
"""Produce research summary."""
return llm.generate(
f"Summarize the research on '{topic}':\n{analysis}"
)
researcher = Graph([search, analyze, summarize_research], name="researcher")
# ═══════════════════════════════════════════════════════════════
# WRITER AGENT
# ═══════════════════════════════════════════════════════════════
@node(output_name="draft")
def write_draft(research_summary: str, outline: str = "") -> str:
"""Write initial draft from research."""
prompt = f"Research:\n{research_summary}"
if outline:
prompt += f"\n\nOutline to follow:\n{outline}"
return llm.generate(prompt)
@node(output_name="refined_draft")
def refine(draft: str, feedback: str = "") -> str:
"""Refine draft based on feedback."""
if not feedback:
return draft
return llm.generate(f"Revise this draft:\n{draft}\n\nFeedback:\n{feedback}")
@node(output_name="report")
def format_report(refined_draft: str) -> str:
"""Format the final report."""
return formatter.apply_template(refined_draft)
writer = Graph([write_draft, refine, format_report], name="writer")
# ═══════════════════════════════════════════════════════════════
# REVIEWER AGENT
# ═══════════════════════════════════════════════════════════════
@node(output_name="fact_check")
def check_facts(report: str, research_summary: str) -> dict:
"""Verify claims against research."""
return fact_checker.verify(report, sources=research_summary)
@node(output_name="style_check")
def check_style(report: str) -> dict:
"""Check writing quality."""
return style_analyzer.analyze(report)
@node(output_name="review_score")
def score_report(fact_check: dict, style_check: dict) -> float:
"""Overall quality score."""
fact_score = fact_check["accuracy"]
style_score = style_check["quality"]
return (fact_score + style_score) / 2
@node(output_name="feedback")
def generate_feedback(fact_check: dict, style_check: dict, review_score: float) -> str:
"""Generate feedback for revision."""
if review_score >= 0.9:
return ""
issues = []
if fact_check["issues"]:
issues.extend(fact_check["issues"])
if style_check["issues"]:
issues.extend(style_check["issues"])
return "\n".join(issues)
reviewer = Graph([check_facts, check_style, score_report, generate_feedback], name="reviewer")
# ═══════════════════════════════════════════════════════════════
# TEAM ORCHESTRATION
# ═══════════════════════════════════════════════════════════════
@route(targets=["writer", END])
def review_gate(review_score: float, revision_count: int = 0) -> str:
"""Decide if report is ready or needs revision."""
if review_score >= 0.9:
return END
if revision_count >= 3:
return END # Accept after 3 revisions
return "writer"
@node(output_name="revision_count")
def track_revisions(revision_count: int = 0) -> int:
return revision_count + 1
# Compose the team
research_team = Graph([
researcher.as_node(),
writer.as_node(),
reviewer.as_node(),
track_revisions,
review_gate,
], name="research_team")
# Run the team
runner = SyncRunner()
result = runner.run(research_team, {"topic": "Quantum Computing in 2024"})
print(result["report"])