使用 workflow-py 搭建 Deep Research 工作流
参考 Google Gemini API 与 LlamaIndex 设计,引入 Arize Phoenix 进行模型监控。
```py
import asyncio
import logging
from time import sleep
from typing import Literal
import phoenix
from llama_index.core.agent.workflow import (
AgentOutput,
AgentWorkflow,
FunctionAgent,
ToolCall,
ToolCallResult,
)
from llama_index.llms.openai_like import OpenAILike
from llama_index_instrumentation import get_dispatcher
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GrpcOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter as HttpOTLPSpanExporter,
)
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from phoenix.otel import register
from pydantic_settings import BaseSettings, SettingsConfigDict
from tavily import AsyncTavilyClient
from workflows import Context
# Adapted from mcp.server.fastmcp.utilities.logging.configure_logging
def configure_logging(
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
) -> None:
    """Configure root logging, preferring rich output when available.

    Installs a ``RichHandler`` writing to stderr when the optional ``rich``
    package is importable; otherwise falls back to a plain ``StreamHandler``.
    """
    selected: list[logging.Handler] = []
    try:
        from rich.console import Console
        from rich.logging import RichHandler

        selected.append(
            RichHandler(console=Console(stderr=True), rich_tracebacks=True)
        )
    except ImportError:
        # rich is optional; the plain handler below covers this case.
        pass
    if not selected:
        selected.append(logging.StreamHandler())
    logging.basicConfig(level=level, format="%(message)s", handlers=selected)
# Configure logging at import time so every module shares the same handlers.
configure_logging(level="INFO")
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
    """Runtime configuration, loaded from DEEP_RESEARCH_* env vars or a .env file."""

    model_config = SettingsConfigDict(
        env_prefix="DEEP_RESEARCH_",
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )
    # LLM settings for the OpenAI-compatible endpoint.
    model_name: str = "gpt-4o"
    base_url: str = ""  # empty -> client library default
    api_key: str = ""  # empty -> passed as None to the client
    # API key for the Tavily web-search client.
    tavily_api_key: str = ""
    # Tracing / observability (Arize Phoenix + OpenTelemetry).
    use_trace: bool = True
    launch_local_tracer: bool = True  # start an in-process Phoenix UI
    use_grpc_trace_exporter: bool = False  # False -> HTTP OTLP exporter
    trace_endpoint: str = "http://localhost:6006/v1/traces"
    otel_project_name: str = "workflow-deep-research-python"
# Instrumentation dispatcher backing the @dispatcher.span decorator below.
dispatcher = get_dispatcher(__name__)
# Settings are read once at import time from the environment / .env file.
settings = Settings()
def setup_tracer() -> None:
    """Wire OpenTelemetry tracing into Arize Phoenix, if enabled in settings.

    Registers a Phoenix tracer provider, attaches an OTLP span exporter
    (gRPC or HTTP depending on ``use_grpc_trace_exporter``), instruments
    LlamaIndex, and optionally launches a local Phoenix UI. No-op when
    ``settings.use_trace`` is False.
    """
    if not settings.use_trace:
        logger.info(f"Tracing is disabled, config: {settings.use_trace=}")
        return
    logger.info(
        f"Setting up tracer {settings.otel_project_name=}, {settings.trace_endpoint=}"
    )
    tracer_provider = register(
        project_name=settings.otel_project_name,
        verbose=False,
        endpoint=settings.trace_endpoint,
    )
    # NOTE(review): phoenix.otel.register() typically already attaches an
    # exporter for `endpoint`; adding a second processor targeting the same
    # endpoint may export each span twice — confirm against the Phoenix docs.
    if settings.use_grpc_trace_exporter:
        span_exporter = GrpcOTLPSpanExporter(endpoint=settings.trace_endpoint)
    else:
        span_exporter = HttpOTLPSpanExporter(endpoint=settings.trace_endpoint)
    span_processor = SimpleSpanProcessor(span_exporter)
    tracer_provider.add_span_processor(span_processor)
    # Route all LlamaIndex spans through the Phoenix tracer provider.
    LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
    if settings.launch_local_tracer:
        # Start an in-process Phoenix UI for local trace inspection.
        phoenix.launch_app()
# Shared OpenAI-compatible LLM client used by all three agents below.
llm = OpenAILike(
    model=settings.model_name,
    api_key=settings.api_key or None,  # empty string -> None (client default)
    api_base=settings.base_url or None,
    is_function_calling_model=True,
    is_chat_model=True,
)
# Async Tavily client backing the search_web tool.
tavily_client = AsyncTavilyClient(api_key=settings.tavily_api_key)
# Configure tracing at import time so every workflow run is captured.
setup_tracer()
async def search_web(ctx: Context, query: str) -> str:
    """Useful for searching the web about a specific query or topic"""
    response = await tavily_client.search(
        query=query,
        search_depth="advanced",
        max_results=5,
        include_answer=True,
    )
    # Build a human-readable digest of the Tavily response.
    sections: list[str] = []
    answer = response.get("answer")
    if answer:
        sections.append(f"Summary: {answer}\n")
    for hit in response.get("results", []):
        sections.append(
            f"### {hit.get('title', '')}\n"
            f"URL: {hit.get('url', '')}\n"
            f"{hit.get('content', '')}\n"
        )
    if not sections:
        return "No results found."
    return "\n".join(sections)
async def record_notes(ctx: Context, notes: str, notes_title: str) -> str:
    """Useful for recording notes on a given topic."""
    current_state = await ctx.store.get("state")
    # setdefault creates the notes mapping on first use without clobbering
    # notes recorded earlier in the run — replaces the membership-check-
    # then-assign pattern with the idiomatic one-liner.
    current_state.setdefault("research_notes", {})[notes_title] = notes
    await ctx.store.set("state", current_state)
    return "Notes recorded."
async def get_notes(ctx: Context) -> str:
    """Useful for retrieving the current research notes."""
    state = await ctx.store.get("state")
    notes = state.get("research_notes", {})
    if not notes:
        return "No research notes found."
    # Render each recorded note as its own markdown section.
    sections = [f"### {title}\n{content}" for title, content in notes.items()]
    return "\n\n".join(sections)
async def write_report(ctx: Context, report_content: str) -> str:
    """Useful for writing a report on a given topic."""
    state = await ctx.store.get("state")
    # Overwrite any previous draft with the latest content.
    state["report_content"] = report_content
    await ctx.store.set("state", state)
    return "Report written."
async def get_report(ctx: Context) -> str:
    """Useful for retrieving the current report content."""
    state = await ctx.store.get("state")
    # Fall back to a sentinel message when no report has been written yet.
    return state.get("report_content", "No report content found.")
async def review_report(ctx: Context, approved: bool, feedback: str) -> str:
    """Useful for reviewing a report and providing feedback.
    Args:
        approved: Whether the report is approved (True) or needs changes (False).
        feedback: The detailed feedback explaining the review decision.
    """
    state = await ctx.store.get("state")
    # Persist the verdict so other agents (and the final summary) can read it.
    state["review"] = {"approved": approved, "feedback": feedback}
    await ctx.store.set("state", state)
    verdict = "approved" if approved else "needs changes"
    return f"Report {verdict}: {feedback}"
async def get_review(ctx: Context) -> str:
    """Useful for retrieving the latest review feedback on the report."""
    state = await ctx.store.get("state")
    review = state.get("review", {})
    # Treat a missing or malformed entry as "not reviewed yet".
    if not isinstance(review, dict) or not review:
        return "No review feedback yet."
    status = "Approved" if review.get("approved") else "Changes Requested"
    feedback = review.get("feedback", "No details.")
    return f"Review Status: {status}\nFeedback: {feedback}"
# Agent 1: searches the web and records notes; hands off to the WriteAgent
# once enough material has been gathered.
research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Useful for searching the web for information on a given topic and recording notes on the topic.",
    system_prompt=(
        "You are the ResearchAgent that can search the web for information on a given topic and record notes on the topic. "
        "You should search for information from multiple angles to ensure comprehensive coverage. "
        "Once you have gathered enough notes, hand off control to the WriteAgent to write a report."
    ),
    llm=llm,
    tools=[search_web, record_notes],
    can_handoff_to=["WriteAgent"],
)
# Agent 2: drafts/revises the markdown report from the recorded notes, then
# hands off to the ReviewAgent (or back to the ResearchAgent for more data).
write_agent = FunctionAgent(
    name="WriteAgent",
    description="Useful for writing a report based on research notes, or revising a report based on review feedback.",
    system_prompt=(
        "You are the WriteAgent that can write a report on a given topic. "
        "Your report should be in markdown format. The content must be grounded in the research notes. "
        "Follow these steps:\n"
        "1. First, call get_notes to retrieve all research notes.\n"
        "2. If you are revising a report, call get_review to understand the reviewer's feedback, then call get_report to get the current report.\n"
        "3. Write or revise the report using write_report.\n"
        "4. Hand off to ReviewAgent for feedback.\n"
        "If the reviewer requests more research, hand off to ResearchAgent instead."
    ),
    llm=llm,
    tools=[write_report, get_report, get_notes, get_review],
    can_handoff_to=["ReviewAgent", "ResearchAgent"],
)
# Agent 3: reviews the report; approving ends the workflow (no handoff),
# rejecting hands back to WriteAgent or ResearchAgent.
review_agent = FunctionAgent(
    name="ReviewAgent",
    description="Useful for reviewing a report and providing feedback.",
    system_prompt=(
        "You are the ReviewAgent that reviews reports for quality and completeness. "
        "Follow these steps strictly:\n"
        "1. First, call get_report to read the current report content.\n"
        "2. Evaluate the report for accuracy, completeness, structure, and clarity.\n"
        "3. Call review_report with your decision (approved=True/False) and detailed feedback.\n"
        "4. If approved (approved=True), respond with a brief summary and do NOT hand off to any agent — the workflow will end.\n"
        "5. If changes are needed (approved=False), provide specific feedback on what to improve, "
        "then hand off to WriteAgent for revisions, or to ResearchAgent if more research is needed."
    ),
    llm=llm,
    tools=[get_report, review_report],
    can_handoff_to=["WriteAgent", "ResearchAgent"],
)
# Multi-agent workflow: research -> write -> review, driven by agent handoffs;
# the ResearchAgent runs first.
agent_workflow = AgentWorkflow(
    agents=[research_agent, write_agent, review_agent],
    root_agent=research_agent.name,
    # Shared state read/written by the tool functions above via ctx.store.
    initial_state={
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": {},
    },
)
@dispatcher.span
async def deep_research(research_topic: str) -> str:
    """Run the multi-agent workflow on *research_topic* and return the report.

    Streams workflow events to the logger as they arrive (agent switches,
    model outputs, tool calls and results), then reads the final report and
    review out of the shared workflow state.
    """
    handler = agent_workflow.run(user_msg=research_topic)
    active_agent = None
    rule = "=" * 50
    async for event in handler.stream_events():
        switched = (
            hasattr(event, "current_agent_name")
            and event.current_agent_name != active_agent
        )
        if switched:
            active_agent = event.current_agent_name
            logger.info(f"\n{rule}")
            logger.info(f"🤖 Agent: {active_agent}")
            logger.info(f"{rule}\n")
        elif isinstance(event, AgentOutput):
            content = event.response.content
            if content:
                logger.info("📤 Output: %s", content)
            if event.tool_calls:
                planned = [call.tool_name for call in event.tool_calls]
                logger.info("🛠️ Planning to use tools: %s", planned)
        elif isinstance(event, ToolCallResult):
            logger.info("🔧 Tool Result (%s):", event.tool_name)
            logger.info("  Arguments: %s", event.tool_kwargs)
            logger.info("  Output: %s ...", str(event.tool_output)[:500])
        elif isinstance(event, ToolCall):
            logger.info("🔨 Calling Tool: %s", event.tool_name)
            logger.info("  With arguments: %s", event.tool_kwargs)
    state = await handler.ctx.store.get("state")
    logger.info("Report Content:\n%s", state["report_content"])
    logger.info("\n------------\nFinal Review:\n%s", state["review"])
    return state["report_content"]
def research() -> None:
    """CLI entry point: prompt for a topic, run the workflow, keep the tracer up."""
    default_topic = """Write me a report on the history of the web.
    Briefly describe the history of the world wide web, including
    the development of the internet and the development of the web,
    including 21st century developments"""
    research_topic = input("Enter a research topic: ") or default_topic
    asyncio.run(deep_research(research_topic))
    if settings.launch_local_tracer:
        # Keep the process alive so the local Phoenix UI stays reachable.
        logger.info("Ctrl+C to exit and stop the local tracer.")
        while True:
            try:
                sleep(1)
            except KeyboardInterrupt:
                break
if __name__ == "__main__":
    research()
```

```toml
[project]
name = "python-deep-research"
version = "0.1.0"
description = "Deep Research workflow using workflow-py"
readme = "README.md"
authors = []
requires-python = ">=3.13"
dependencies = [
"arize-phoenix>=12.25.1",
"llama-index-core>=0.14.10",
"llama-index-llms-google-genai>=0.8.2",
"llama-index-llms-openai-like>=0.7.1",
"llama-index-tools-google>=0.6.2",
"openinference-instrumentation-llama-index>=4.3.9",
"opentelemetry-instrumentation-httpx>=0.60b1",
"pydantic>=2.12.5",
"pydantic-settings>=2.12.0",
"pytz>=2025.2",
"rich>=14.2.0",
"tavily-python>=0.5.0",
]
```

```ini
# DEEP_RESEARCH
DEEP_RESEARCH_API_KEY=your_deep_research_api_key
DEEP_RESEARCH_TAVILY_API_KEY=your_tavily_api_key
```