使用 workflow-py 搭建 Deep Research 工作流
参考 Google Gemini API: LLamaIndex 设计,引入 Arize Phoenix 进行模型监控。
py
import asyncio
import logging
from time import sleep
from typing import Literal
import phoenix
from google.genai import types
from llama_index.core.agent.workflow import (
AgentOutput,
AgentWorkflow,
FunctionAgent,
ToolCall,
ToolCallResult,
)
from llama_index.llms.google_genai import GoogleGenAI
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from phoenix.otel import register
from pydantic_settings import BaseSettings, SettingsConfigDict
from workflows import Context
# Ref: from mcp.server.fastmcp.utilities.logging import configure_logging
def configure_logging(
level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
) -> None:
handlers: list[logging.Handler] = []
try:
from rich.console import Console
from rich.logging import RichHandler
handlers.append(RichHandler(console=Console(stderr=True), rich_tracebacks=True))
except ImportError:
pass
if not handlers:
handlers.append(logging.StreamHandler())
logging.basicConfig(
level=level,
format="%(message)s",
handlers=handlers,
)
configure_logging(level="INFO")
logger = logging.getLogger(__name__)
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="DEEP_RESEARCH_",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
model_name: str = "gemini-3-pro-preview"
api_key: str = ""
use_trace: bool = True
launch_local_tracer: bool = True
trace_endpoint: str = "http://localhost:6006/v1/traces"
otel_project_name: str = "workflow-deep-research-python"
settings = Settings()
def setup_tracer():
if not settings.use_trace:
logger.info(f"Tracing is disabled, config: {settings.use_trace=}")
return
logger.info(
f"Setting up tracer {settings.otel_project_name=}, {settings.trace_endpoint=}"
)
tracer_provider = register(project_name=settings.otel_project_name, verbose=False)
span_exporter = OTLPSpanExporter(endpoint=settings.trace_endpoint)
span_processor = SimpleSpanProcessor(span_exporter)
tracer_provider.add_span_processor(span_processor)
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)
if settings.launch_local_tracer:
phoenix.launch_app()
setup_tracer()
google_search_tool = types.Tool(google_search=types.GoogleSearch())
llm = GoogleGenAI(
model=settings.model_name,
api_key=settings.api_key,
)
llm_with_search = GoogleGenAI(
model=settings.model_name,
api_key=settings.api_key,
generation_config=types.GenerateContentConfig(tools=[google_search_tool]),
)
async def search_web(ctx: Context, query: str) -> str:
"""Useful for searching the web about a specific query or topic"""
response = await llm_with_search.acomplete(f"""Please research given this query or topic,
and return the result\n<query_or_topic>{query}</query_or_topic>""")
return response.text
async def record_notes(ctx: Context, notes: str, notes_title: str) -> str:
"""Useful for recording notes on a given topic."""
current_state = await ctx.store.get("state")
if "research_notes" not in current_state:
current_state["research_notes"] = {}
current_state["research_notes"][notes_title] = notes
await ctx.store.set("state", current_state)
return "Notes recorded."
async def write_report(ctx: Context, report_content: str) -> str:
"""Useful for writing a report on a given topic."""
current_state = await ctx.store.get("state")
current_state["report_content"] = report_content
await ctx.store.set("state", current_state)
return "Report written."
async def review_report(ctx: Context, review: str) -> str:
"""Useful for reviewing a report and providing feedback."""
current_state = await ctx.store.get("state")
current_state["review"] = review
await ctx.store.set("state", current_state)
return "Report reviewed."
research_agent = FunctionAgent(
name="ResearchAgent",
description="Useful for searching the web for information on a given topic and recording notes on the topic.",
system_prompt=(
"You are the ResearchAgent that can search the web for information on a given topic and record notes on the topic. "
"Once notes are recorded and you are satisfied, you should hand off control to the WriteAgent to write a report on the topic."
),
llm=llm,
tools=[search_web, record_notes],
can_handoff_to=["WriteAgent"],
)
write_agent = FunctionAgent(
name="WriteAgent",
description="Useful for writing a report on a given topic.",
system_prompt=(
"You are the WriteAgent that can write a report on a given topic. "
"Your report should be in a markdown format. The content should be grounded in the research notes. "
"Once the report is written, you should get feedback at least once from the ReviewAgent."
),
llm=llm,
tools=[write_report],
can_handoff_to=["ReviewAgent", "ResearchAgent"],
)
review_agent = FunctionAgent(
name="ReviewAgent",
description="Useful for reviewing a report and providing feedback.",
system_prompt=(
"You are the ReviewAgent that can review a report and provide feedback. "
"Your feedback should either approve the current report or request changes for the WriteAgent to implement."
),
llm=llm,
tools=[review_report],
can_handoff_to=["ResearchAgent", "WriteAgent"],
)
agent_workflow = AgentWorkflow(
agents=[research_agent, write_agent, review_agent],
root_agent=research_agent.name,
initial_state={
"research_notes": {},
"report_content": "Not written yet.",
"review": "Review required.",
},
)
async def deep_research():
research_topic = """Write me a report on the history of the web.
Briefly describe the history of the world wide web, including
the development of the internet and the development of the web,
including 21st century developments"""
handler = agent_workflow.run(user_msg=research_topic)
current_agent = None
async for event in handler.stream_events():
if (
hasattr(event, "current_agent_name")
and event.current_agent_name != current_agent
):
current_agent = event.current_agent_name
logger.info(f"\n{'=' * 50}")
logger.info(f"🤖 Agent: {current_agent}")
logger.info(f"{'=' * 50}\n")
elif isinstance(event, AgentOutput):
if event.response.content:
logger.info("📤 Output: %s", event.response.content)
if event.tool_calls:
logger.info(
"🛠️ Planning to use tools: %s",
[call.tool_name for call in event.tool_calls],
)
elif isinstance(event, ToolCallResult):
logger.info("🔧 Tool Result (%s):", event.tool_name)
logger.info(" Arguments: %s", event.tool_kwargs)
logger.info(" Output: %s", event.tool_output)
elif isinstance(event, ToolCall):
logger.info("🔨 Calling Tool: %s", event.tool_name)
logger.info(" With arguments: %s", event.tool_kwargs)
state = await handler.ctx.store.get("state")
logger.info("Report Content:\n%s", state["report_content"])
logger.info("\n------------\nFinal Review:\n%s", state["review"])
def research() -> None:
asyncio.run(deep_research())
if settings.launch_local_tracer:
logger.info("Ctrl+C to exit and stop the local tracer.")
while True:
try:
sleep(1)
except KeyboardInterrupt:
break
if __name__ == "__main__":
research()toml
[project]
name = "python-deep-research"
version = "0.1.0"
description = "Deep Research workflow using workflow-py"
readme = "README.md"
authors = []
requires-python = ">=3.13"
dependencies = [
"arize-phoenix>=12.25.1",
"llama-index-core>=0.14.10",
"llama-index-llms-google-genai>=0.8.2",
"llama-index-tools-google>=0.6.2",
"openinference-instrumentation-llama-index>=4.3.9",
"pydantic>=2.12.5",
"pydantic-settings>=2.12.0",
"rich>=14.2.0",
]
[project.scripts]
python-deep-research = "python_deep_research:main"
[build-system]
requires = ["uv_build>=0.8.13,<0.9.0"]
build-backend = "uv_build"ini
# DEEP_RESEARCH
DEEP_RESEARCH_API_KEY=your_deep_research_api_key