Skip to content

使用 workflow-py 搭建 Deep Research 工作流

参考 Google Gemini API: LlamaIndex 设计，引入 Arize Phoenix 进行模型监控。

py
import asyncio
import logging
from time import sleep
from typing import Literal

import phoenix
from llama_index.core.agent.workflow import (
    AgentOutput,
    AgentWorkflow,
    FunctionAgent,
    ToolCall,
    ToolCallResult,
)
from llama_index.llms.openai_like import OpenAILike
from llama_index_instrumentation import get_dispatcher
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter as GrpcOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter as HttpOTLPSpanExporter,
)
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from phoenix.otel import register
from pydantic_settings import BaseSettings, SettingsConfigDict
from tavily import AsyncTavilyClient
from workflows import Context


# Ref: from mcp.server.fastmcp.utilities.logging import configure_logging
def configure_logging(
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
) -> None:
    """Install the root logging configuration used by this script.

    A rich handler writing to stderr (with rich tracebacks) is used when the
    optional ``rich`` package can be imported; otherwise a plain
    ``logging.StreamHandler`` is used. Records are formatted as the bare
    message.

    Args:
        level: Logging level name handed to ``logging.basicConfig``.
    """
    selected: logging.Handler
    try:
        from rich.console import Console
        from rich.logging import RichHandler

        selected = RichHandler(console=Console(stderr=True), rich_tracebacks=True)
    except ImportError:
        # rich is optional; fall back to the standard library handler.
        selected = logging.StreamHandler()

    logging.basicConfig(level=level, format="%(message)s", handlers=[selected])


# Logging is configured once at import time, before the module logger is created.
configure_logging(level="INFO")
logger = logging.getLogger(__name__)


class Settings(BaseSettings):
    # Runtime configuration for this script. Values come from environment
    # variables prefixed with DEEP_RESEARCH_ and from a UTF-8 `.env` file;
    # keys that do not match a field are ignored (extra="ignore").
    model_config = SettingsConfigDict(
        env_prefix="DEEP_RESEARCH_",
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )
    # Model identifier passed to the OpenAI-compatible LLM client.
    model_name: str = "gpt-4o"
    # API base URL for the LLM client; an empty string is passed on as None.
    base_url: str = ""
    # API key for the LLM client; an empty string is passed on as None.
    api_key: str = ""
    # API key handed to the Tavily search client.
    tavily_api_key: str = ""
    # Master switch: when false, setup_tracer() logs and returns without configuring anything.
    use_trace: bool = True
    # When true, a local Phoenix app is launched and research() keeps the process alive until Ctrl+C.
    launch_local_tracer: bool = True
    # Selects the gRPC OTLP span exporter instead of the HTTP one.
    use_grpc_trace_exporter: bool = False
    # Endpoint given both to phoenix's register() and to the OTLP span exporter.
    trace_endpoint: str = "http://localhost:6006/v1/traces"
    # Project name given to phoenix's register().
    otel_project_name: str = "workflow-deep-research-python"


# Instrumentation dispatcher for this module; used as the `@dispatcher.span` decorator on deep_research.
dispatcher = get_dispatcher(__name__)
# Settings are resolved once, at import time.
settings = Settings()


def setup_tracer():
    """Configure tracing for the workflow according to ``settings``.

    Does nothing except log a message when ``settings.use_trace`` is false.
    Otherwise registers a tracer provider for the configured project and
    endpoint, attaches a ``SimpleSpanProcessor`` with a gRPC or HTTP OTLP
    exporter (chosen by ``settings.use_grpc_trace_exporter``), instruments
    LlamaIndex with that provider and, when
    ``settings.launch_local_tracer`` is set, launches the local Phoenix app.
    """
    if not settings.use_trace:
        logger.info(f"Tracing is disabled, config: {settings.use_trace=}")
        return

    logger.info(
        f"Setting up tracer {settings.otel_project_name=}, {settings.trace_endpoint=}"
    )
    endpoint = settings.trace_endpoint
    provider = register(
        project_name=settings.otel_project_name,
        verbose=False,
        endpoint=endpoint,
    )

    # Both exporter classes take the same `endpoint` keyword, so only the class differs.
    exporter_cls = (
        GrpcOTLPSpanExporter
        if settings.use_grpc_trace_exporter
        else HttpOTLPSpanExporter
    )
    provider.add_span_processor(SimpleSpanProcessor(exporter_cls(endpoint=endpoint)))
    LlamaIndexInstrumentor().instrument(tracer_provider=provider)

    if settings.launch_local_tracer:
        phoenix.launch_app()


# LLM client shared by all three agents; empty api_key / base_url settings are passed as None.
llm = OpenAILike(
    model=settings.model_name,
    api_key=settings.api_key or None,
    api_base=settings.base_url or None,
    is_function_calling_model=True,
    is_chat_model=True,
)

# Async Tavily client used by the `search_web` tool.
tavily_client = AsyncTavilyClient(api_key=settings.tavily_api_key)

# Tracing is configured at import time (only a log line when `use_trace` is false).
setup_tracer()


async def search_web(ctx: Context, query: str) -> str:
    """Useful for searching the web about a specific query or topic"""
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    # `ctx` is accepted but not used by this tool.
    payload = await tavily_client.search(
        query=query,
        search_depth="advanced",
        max_results=5,
        include_answer=True,
    )

    # Optional summary first, then one markdown section per search hit.
    answer = payload.get("answer")
    sections: list[str] = [f"Summary: {answer}\n"] if answer else []
    sections.extend(
        f"### {hit.get('title', '')}\nURL: {hit.get('url', '')}\n{hit.get('content', '')}\n"
        for hit in payload.get("results", [])
    )

    if not sections:
        return "No results found."
    return "\n".join(sections)


async def record_notes(ctx: Context, notes: str, notes_title: str) -> str:
    """Useful for recording notes on a given topic."""
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    store = ctx.store
    state = await store.get("state")
    # Create the notes mapping on first use, then store (or overwrite) the entry for this title.
    state.setdefault("research_notes", {})[notes_title] = notes
    await store.set("state", state)
    return "Notes recorded."


async def get_notes(ctx: Context) -> str:
    """Useful for retrieving the current research notes."""
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    state = await ctx.store.get("state")
    notes_by_title = state.get("research_notes", {})
    if notes_by_title:
        # One markdown section per note, separated by a blank line.
        sections = [
            f"### {heading}\n{body}" for heading, body in notes_by_title.items()
        ]
        return "\n\n".join(sections)
    return "No research notes found."


async def write_report(ctx: Context, report_content: str) -> str:
    """Useful for writing a report on a given topic."""
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    store = ctx.store
    state = await store.get("state")
    # Replace any previous report with the new content.
    state.update(report_content=report_content)
    await store.set("state", state)
    return "Report written."


async def get_report(ctx: Context) -> str:
    """Useful for retrieving the current report content."""
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    state = await ctx.store.get("state")
    if "report_content" in state:
        return state["report_content"]
    # No report has been stored in the state yet.
    return "No report content found."


async def review_report(ctx: Context, approved: bool, feedback: str) -> str:
    """Useful for reviewing a report and providing feedback.

    Args:
        approved: Whether the report is approved (True) or needs changes (False).
        feedback: The detailed feedback explaining the review decision.
    """
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    verdict = "approved" if approved else "needs changes"
    store = ctx.store
    state = await store.get("state")
    # Only the latest review is kept; earlier ones are overwritten.
    state["review"] = dict(approved=approved, feedback=feedback)
    await store.set("state", state)
    return f"Report {verdict}: {feedback}"


async def get_review(ctx: Context) -> str:
    """Useful for retrieving the latest review feedback on the report."""
    # Docstring kept verbatim: presumably it is surfaced to the LLM as the tool description (confirm).
    state = await ctx.store.get("state")
    latest = state.get("review", {})
    # Only a non-empty dict counts as a recorded review.
    if latest and isinstance(latest, dict):
        status = "Approved" if latest.get("approved") else "Changes Requested"
        feedback = latest.get("feedback", "No details.")
        return f"Review Status: {status}\nFeedback: {feedback}"
    return "No review feedback yet."


# Research stage: may call `search_web` and `record_notes`, and may hand off only to WriteAgent.
research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Useful for searching the web for information on a given topic and recording notes on the topic.",
    system_prompt=(
        "You are the ResearchAgent that can search the web for information on a given topic and record notes on the topic. "
        "You should search for information from multiple angles to ensure comprehensive coverage. "
        "Once you have gathered enough notes, hand off control to the WriteAgent to write a report."
    ),
    llm=llm,
    tools=[search_web, record_notes],
    can_handoff_to=["WriteAgent"],
)

# Writing stage: reads notes / report / review from the shared state, writes the report,
# and may hand off to ReviewAgent or back to ResearchAgent.
write_agent = FunctionAgent(
    name="WriteAgent",
    description="Useful for writing a report based on research notes, or revising a report based on review feedback.",
    system_prompt=(
        "You are the WriteAgent that can write a report on a given topic. "
        "Your report should be in markdown format. The content must be grounded in the research notes. "
        "Follow these steps:\n"
        "1. First, call get_notes to retrieve all research notes.\n"
        "2. If you are revising a report, call get_review to understand the reviewer's feedback, then call get_report to get the current report.\n"
        "3. Write or revise the report using write_report.\n"
        "4. Hand off to ReviewAgent for feedback.\n"
        "If the reviewer requests more research, hand off to ResearchAgent instead."
    ),
    llm=llm,
    tools=[write_report, get_report, get_notes, get_review],
    can_handoff_to=["ReviewAgent", "ResearchAgent"],
)

# Review stage: reads the report, records an approve/reject decision via `review_report`,
# and may hand off to WriteAgent or ResearchAgent. The prompt tells it not to hand off once approved.
review_agent = FunctionAgent(
    name="ReviewAgent",
    description="Useful for reviewing a report and providing feedback.",
    system_prompt=(
        "You are the ReviewAgent that reviews reports for quality and completeness. "
        "Follow these steps strictly:\n"
        "1. First, call get_report to read the current report content.\n"
        "2. Evaluate the report for accuracy, completeness, structure, and clarity.\n"
        "3. Call review_report with your decision (approved=True/False) and detailed feedback.\n"
        "4. If approved (approved=True), respond with a brief summary and do NOT hand off to any agent — the workflow will end.\n"
        "5. If changes are needed (approved=False), provide specific feedback on what to improve, "
        "then hand off to WriteAgent for revisions, or to ResearchAgent if more research is needed."
    ),
    llm=llm,
    tools=[get_report, review_report],
    can_handoff_to=["WriteAgent", "ResearchAgent"],
)


# Workflow wiring: ResearchAgent is the root agent. The initial state provides the three
# keys ("research_notes", "report_content", "review") that the tool functions read and write.
agent_workflow = AgentWorkflow(
    agents=[research_agent, write_agent, review_agent],
    root_agent=research_agent.name,
    initial_state={
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": {},
    },
)


@dispatcher.span
async def deep_research(research_topic: str) -> str:
    """Run the multi-agent workflow on a topic and return the final report.

    While the run is in progress its events are streamed and logged: agent
    switches, agent output, planned tool calls, tool invocations and tool
    results (truncated to 500 characters). After the stream ends the run
    itself is awaited, so an error raised inside the workflow propagates to
    the caller instead of being dropped, and the report is then read from the
    workflow state.

    Args:
        research_topic: The user message passed to the workflow.

    Returns:
        The ``report_content`` entry of the workflow state.

    Raises:
        Exception: Whatever error the workflow run finished with, re-raised
            when the handler is awaited.
    """
    handler = agent_workflow.run(user_msg=research_topic)
    banner = "=" * 50
    current_agent = None
    async for event in handler.stream_events():
        # Announce agent switches independently of the payload logging below,
        # so an event that both switches agent and carries output is not skipped.
        if (
            hasattr(event, "current_agent_name")
            and event.current_agent_name != current_agent
        ):
            current_agent = event.current_agent_name
            logger.info("\n%s", banner)
            logger.info("🤖 Agent: %s", current_agent)
            logger.info("%s\n", banner)

        if isinstance(event, AgentOutput):
            if event.response.content:
                logger.info("📤 Output: %s", event.response.content)
            if event.tool_calls:
                logger.info(
                    "🛠️  Planning to use tools: %s",
                    [call.tool_name for call in event.tool_calls],
                )
        elif isinstance(event, ToolCallResult):
            logger.info("🔧 Tool Result (%s):", event.tool_name)
            logger.info("  Arguments: %s", event.tool_kwargs)
            logger.info("  Output: %s ...", str(event.tool_output)[:500])
        elif isinstance(event, ToolCall):
            logger.info("🔨 Calling Tool: %s", event.tool_name)
            logger.info("  With arguments: %s", event.tool_kwargs)

    # Draining the event stream does not retrieve the run's outcome; awaiting
    # the handler does, and re-raises any exception the workflow ended with.
    await handler

    state = await handler.ctx.store.get("state")
    logger.info("Report Content:\n%s", state["report_content"])
    logger.info("\n------------\nFinal Review:\n%s", state["review"])
    return state["report_content"]


def research() -> None:
    """Prompt for a research topic, run the workflow, then idle for the local tracer.

    An empty answer at the prompt selects the built-in default topic. When
    ``settings.launch_local_tracer`` is set, the function keeps sleeping in
    one-second steps after the run finishes, until interrupted with Ctrl+C.
    """
    default_topic = """Write me a report on the history of the web.
    Briefly describe the history of the world wide web, including
    the development of the internet and the development of the web,
    including 21st century developments"""
    typed_topic = input("Enter a research topic: ")
    asyncio.run(deep_research(typed_topic if typed_topic else default_topic))

    if not settings.launch_local_tracer:
        return

    logger.info("Ctrl+C to exit and stop the local tracer.")
    try:
        while True:
            sleep(1)
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to leave the idle loop.
        pass


# Script entry point: run the interactive research flow when executed directly.
if __name__ == "__main__":
    research()
toml
[project]
name = "python-deep-research"
version = "0.1.0"
description = "Deep Research workflow using workflow-py"
readme = "README.md"
authors = []
requires-python = ">=3.13"
dependencies = [
    "arize-phoenix>=12.25.1",
    "llama-index-core>=0.14.10",
    "llama-index-llms-google-genai>=0.8.2",
    "llama-index-llms-openai-like>=0.7.1",
    "llama-index-tools-google>=0.6.2",
    "openinference-instrumentation-llama-index>=4.3.9",
    "opentelemetry-instrumentation-httpx>=0.60b1",
    "pydantic>=2.12.5",
    "pydantic-settings>=2.12.0",
    "pytz>=2025.2",
    "rich>=14.2.0",
    "tavily-python>=0.5.0",
]
ini
# DEEP_RESEARCH
DEEP_RESEARCH_API_KEY=your_deep_research_api_key