Skip to content

使用 workflow-py 搭建 Deep Research 工作流

参考 Google Gemini API: LlamaIndex 设计,引入 Arize Phoenix 进行模型监控。

py
import asyncio
import logging
from time import sleep
from typing import Literal

import phoenix
from google.genai import types
from llama_index.core.agent.workflow import (
    AgentOutput,
    AgentWorkflow,
    FunctionAgent,
    ToolCall,
    ToolCallResult,
)
from llama_index.llms.google_genai import GoogleGenAI
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from phoenix.otel import register
from pydantic_settings import BaseSettings, SettingsConfigDict
from workflows import Context


# Ref: from mcp.server.fastmcp.utilities.logging import configure_logging
def configure_logging(
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
) -> None:
    """Configure root logging, preferring a Rich handler when available.

    Falls back to a plain ``logging.StreamHandler`` if the optional
    ``rich`` package is not installed.
    """
    chosen: list[logging.Handler] = []
    try:
        from rich.console import Console
        from rich.logging import RichHandler

        chosen.append(RichHandler(console=Console(stderr=True), rich_tracebacks=True))
    except ImportError:
        # rich is optional; fall through to the plain handler below.
        pass

    if not chosen:
        chosen.append(logging.StreamHandler())
    logging.basicConfig(
        level=level,
        format="%(message)s",
        handlers=chosen,
    )


# Apply the default log level at import time and create the module-level logger.
configure_logging(level="INFO")
logger = logging.getLogger(__name__)


class Settings(BaseSettings):
    """Runtime configuration, read from DEEP_RESEARCH_-prefixed env vars and `.env`."""
    model_config = SettingsConfigDict(
        env_prefix="DEEP_RESEARCH_",
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",  # silently ignore unrelated keys in the environment/.env
    )
    # Gemini model name used for both LLM clients below.
    model_name: str = "gemini-3-pro-preview"
    # Google GenAI API key (env: DEEP_RESEARCH_API_KEY).
    api_key: str = ""
    # Master switch for OpenTelemetry/Phoenix tracing (see setup_tracer).
    use_trace: bool = True
    # When tracing, also launch the embedded Phoenix UI via phoenix.launch_app().
    launch_local_tracer: bool = True
    # OTLP HTTP endpoint spans are exported to.
    trace_endpoint: str = "http://localhost:6006/v1/traces"
    # Project name under which traces appear in Phoenix.
    otel_project_name: str = "workflow-deep-research-python"


# Single shared settings instance, resolved once at import time.
settings = Settings()


def setup_tracer():
    """Register an OTEL tracer with Phoenix and instrument LlamaIndex.

    Does nothing when tracing is disabled in settings; optionally starts
    the local Phoenix UI so exported spans can be inspected in-browser.
    """
    if not settings.use_trace:
        logger.info(f"Tracing is disabled, config: {settings.use_trace=}")
        return
    logger.info(
        f"Setting up tracer {settings.otel_project_name=}, {settings.trace_endpoint=}"
    )
    provider = register(project_name=settings.otel_project_name, verbose=False)
    # Export spans synchronously over OTLP/HTTP to the configured endpoint.
    provider.add_span_processor(
        SimpleSpanProcessor(OTLPSpanExporter(endpoint=settings.trace_endpoint))
    )
    LlamaIndexInstrumentor().instrument(tracer_provider=provider)
    if settings.launch_local_tracer:
        phoenix.launch_app()


# Instrument tracing before any LLM client is constructed so all calls are traced.
setup_tracer()


# Built-in Google Search grounding tool for Gemini.
google_search_tool = types.Tool(google_search=types.GoogleSearch())

# Plain Gemini client shared by all agents for reasoning and writing.
llm = GoogleGenAI(
    model=settings.model_name,
    api_key=settings.api_key,
)
# Gemini client with Google Search grounding enabled; used only by search_web.
llm_with_search = GoogleGenAI(
    model=settings.model_name,
    api_key=settings.api_key,
    generation_config=types.GenerateContentConfig(tools=[google_search_tool]),
)


async def search_web(ctx: Context, query: str) -> str:
    """Useful for searching the web about a specific query or topic"""
    # NOTE: the docstring above doubles as the tool description shown to the LLM.
    prompt = f"""Please research given this query or topic,
    and return the result\n<query_or_topic>{query}</query_or_topic>"""
    result = await llm_with_search.acomplete(prompt)
    return result.text


async def record_notes(ctx: Context, notes: str, notes_title: str) -> str:
    """Useful for recording notes on a given topic."""
    # NOTE: the docstring above doubles as the tool description shown to the LLM.
    state = await ctx.store.get("state")
    # Lazily create the notes mapping, then file the note under its title.
    state.setdefault("research_notes", {})[notes_title] = notes
    await ctx.store.set("state", state)
    return "Notes recorded."


async def write_report(ctx: Context, report_content: str) -> str:
    """Useful for writing a report on a given topic."""
    # NOTE: the docstring above doubles as the tool description shown to the LLM.
    state = await ctx.store.get("state")
    state.update(report_content=report_content)
    await ctx.store.set("state", state)
    return "Report written."


async def review_report(ctx: Context, review: str) -> str:
    """Useful for reviewing a report and providing feedback."""
    # NOTE: the docstring above doubles as the tool description shown to the LLM.
    state = await ctx.store.get("state")
    state.update(review=review)
    await ctx.store.set("state", state)
    return "Report reviewed."


# Researcher: searches the web and records notes, then hands off to the writer.
research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Useful for searching the web for information on a given topic and recording notes on the topic.",
    system_prompt=(
        "You are the ResearchAgent that can search the web for information on a given topic and record notes on the topic. "
        "Once notes are recorded and you are satisfied, you should hand off control to the WriteAgent to write a report on the topic."
    ),
    llm=llm,
    tools=[search_web, record_notes],
    can_handoff_to=["WriteAgent"],
)

# Writer: turns recorded notes into a markdown report, then requests review.
write_agent = FunctionAgent(
    name="WriteAgent",
    description="Useful for writing a report on a given topic.",
    system_prompt=(
        "You are the WriteAgent that can write a report on a given topic. "
        "Your report should be in a markdown format. The content should be grounded in the research notes. "
        "Once the report is written, you should get feedback at least once from the ReviewAgent."
    ),
    llm=llm,
    tools=[write_report],
    can_handoff_to=["ReviewAgent", "ResearchAgent"],
)

# Reviewer: approves the report or sends it back with requested changes.
review_agent = FunctionAgent(
    name="ReviewAgent",
    description="Useful for reviewing a report and providing feedback.",
    system_prompt=(
        "You are the ReviewAgent that can review a report and provide feedback. "
        "Your feedback should either approve the current report or request changes for the WriteAgent to implement."
    ),
    llm=llm,
    tools=[review_report],
    can_handoff_to=["ResearchAgent", "WriteAgent"],
)


# Multi-agent workflow; shared state below is what the tools read and write
# via ctx.store (keys: research_notes, report_content, review).
agent_workflow = AgentWorkflow(
    agents=[research_agent, write_agent, review_agent],
    root_agent=research_agent.name,
    initial_state={
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": "Review required.",
    },
)


async def deep_research():
    """Run the multi-agent workflow on a fixed topic, logging streamed events.

    Logs agent-switch banners, model output, tool calls and tool results as
    they stream in, then prints the final report and review from shared state.
    """
    research_topic = """Write me a report on the history of the web.
    Briefly describe the history of the world wide web, including
    the development of the internet and the development of the web,
    including 21st century developments"""

    handler = agent_workflow.run(user_msg=research_topic)
    missing = object()  # sentinel: distinguishes "no attribute" from a None value
    active_agent = None
    banner = "=" * 50
    async for event in handler.stream_events():
        agent_name = getattr(event, "current_agent_name", missing)
        if agent_name is not missing and agent_name != active_agent:
            # Control moved to a different agent — print a banner once.
            active_agent = agent_name
            logger.info(f"\n{banner}")
            logger.info(f"🤖 Agent: {active_agent}")
            logger.info(f"{banner}\n")
        elif isinstance(event, AgentOutput):
            if event.response.content:
                logger.info("📤 Output: %s", event.response.content)
            if event.tool_calls:
                logger.info(
                    "🛠️  Planning to use tools: %s",
                    [call.tool_name for call in event.tool_calls],
                )
        elif isinstance(event, ToolCallResult):
            logger.info("🔧 Tool Result (%s):", event.tool_name)
            logger.info("  Arguments: %s", event.tool_kwargs)
            logger.info("  Output: %s", event.tool_output)
        elif isinstance(event, ToolCall):
            logger.info("🔨 Calling Tool: %s", event.tool_name)
            logger.info("  With arguments: %s", event.tool_kwargs)
    state = await handler.ctx.store.get("state")
    logger.info("Report Content:\n%s", state["report_content"])
    logger.info("\n------------\nFinal Review:\n%s", state["review"])


def research() -> None:
    """Entry point: run the workflow, then keep the local Phoenix UI alive."""
    asyncio.run(deep_research())

    if settings.launch_local_tracer:
        # Block until interrupted so the launched Phoenix app stays reachable.
        logger.info("Ctrl+C to exit and stop the local tracer.")
        try:
            while True:
                sleep(1)
        except KeyboardInterrupt:
            pass


# Allow running this module directly as a script.
if __name__ == "__main__":
    research()
toml
[project]
name = "python-deep-research"
version = "0.1.0"
description = "Deep Research workflow using workflow-py"
readme = "README.md"
authors = []
# 3.13+ required by the pinned dependency versions below.
requires-python = ">=3.13"
dependencies = [
    "arize-phoenix>=12.25.1",
    "llama-index-core>=0.14.10",
    "llama-index-llms-google-genai>=0.8.2",
    "llama-index-tools-google>=0.6.2",
    "openinference-instrumentation-llama-index>=4.3.9",
    "pydantic>=2.12.5",
    "pydantic-settings>=2.12.0",
    "rich>=14.2.0",
]

[project.scripts]
# NOTE(review): the module shown defines research(), not main — confirm that
# python_deep_research exposes a `main` callable or this entry point will fail.
python-deep-research = "python_deep_research:main"

[build-system]
requires = ["uv_build>=0.8.13,<0.9.0"]
build-backend = "uv_build"
ini
# Deep Research environment variables (loaded with the DEEP_RESEARCH_ prefix)
DEEP_RESEARCH_API_KEY=your_deep_research_api_key