Skip to content

LlamaIndex Agent Workflow 架构解析

本文深入分析 LlamaIndex 中基于 Workflow 实现 Agent 的架构设计,解读核心组件的语义与分工,帮助开发者理解如何使用事件驱动的工作流模式构建智能代理系统。

1. Workflow 基础架构

Workflow 是 LlamaIndex Agent 系统的核心基础设施,它采用 事件驱动 的编排模式来定义和运行应用流程。每个 Workflow 由多个使用 @step 装饰器标记的步骤组成,这些步骤通过接收和发射类型化的 Event 进行通信。

1.1 核心概念

Workflow 架构的核心思想是将复杂的 Agent 行为分解为一系列 有状态的步骤,每个步骤:

  1. 接收特定类型的事件作为输入
  2. 执行相应的业务逻辑
  3. 返回新的事件以触发后续步骤

1.2 关键组件

WorkflowMeta(元类):通过 Python 元类机制自动收集工作流中定义的所有步骤函数,存储在 _step_functions 字典中。

Workflow 基类:提供工作流的基础功能,包括:

  • 超时控制(timeout
  • 事件图验证
  • 并发运行限制
  • 资源管理器注入

Event 事件系统:所有事件都继承自 Event 基类,它是一个轻量级、可序列化的数据载体,支持属性访问和字典访问两种语义。

python
class Event(DictLikeModel):
    """所有工作流事件的基类"""
    pass

class StartEvent(Event):
    """工作流启动事件"""
    pass

class StopEvent(Event):
    """工作流终止事件,携带最终结果"""
    result: Any

2. Agent 事件体系

Agent Workflow 定义了一套专门的事件类型,用于在 Agent 执行过程中传递状态和数据。

2.1 核心事件类型

事件类型语义数据内容
AgentWorkflowStartEvent工作流启动用户消息、聊天历史、内存配置、最大迭代次数
AgentInputLLM 输入准备完成消息列表、当前代理名称
AgentSetup代理设置完成格式化后的消息列表、当前代理名称
AgentOutputLLM 输出响应消息、工具调用列表、结构化响应、原始输出
AgentStream流式输出增量文本、完整响应、工具调用、思考过程
ToolCall工具调用请求工具名称、参数、调用 ID
ToolCallResult工具执行结果工具输出、是否直接返回

2.2 事件流转示例

python
# AgentWorkflowStartEvent 携带的数据
start_event = AgentWorkflowStartEvent(
    user_msg="请帮我查询天气",
    chat_history=[],
    max_iterations=20
)

# AgentOutput 携带的数据结构
agent_output = AgentOutput(
    response=ChatMessage(role="assistant", content="..."),
    tool_calls=[
        ToolSelection(
            tool_id="uuid-xxx",
            tool_name="weather_query",
            tool_kwargs={"city": "北京"}
        )
    ],
    current_agent_name="WeatherAgent"
)

3. BaseWorkflowAgent 抽象基类

BaseWorkflowAgent 是所有 Agent 实现的基类,它同时继承自 WorkflowBaseModelPromptMixin,融合了工作流编排、数据模型和提示词管理三种能力。

3.1 核心属性

python
class BaseWorkflowAgent(Workflow, BaseModel, PromptMixin):
    name: str                              # 代理名称
    description: str                       # 代理描述(多代理场景下用于路由)
    system_prompt: Optional[str]           # 系统提示词
    tools: Optional[List[BaseTool]]        # 可用工具列表
    tool_retriever: Optional[ObjectRetriever]  # 动态工具检索器
    can_handoff_to: Optional[List[str]]    # 可交接的代理列表
    llm: LLM                               # 语言模型
    initial_state: Dict[str, Any]          # 初始状态
    state_prompt: BasePromptTemplate       # 状态提示模板
    output_cls: Optional[Type[BaseModel]]  # 结构化输出类型
    streaming: bool                        # 是否流式输出

3.2 抽象方法

每个具体的 Agent 实现必须覆写三个核心抽象方法:

python
@abstractmethod
async def take_step(
    self, ctx: Context, llm_input: List[ChatMessage],
    tools: Sequence[AsyncBaseTool], memory: BaseMemory
) -> AgentOutput:
    """执行单步推理"""

@abstractmethod
async def handle_tool_call_results(
    self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
    """处理工具调用结果"""

@abstractmethod
async def finalize(
    self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
    """完成代理执行,清理状态"""

3.3 内置步骤方法

BaseWorkflowAgent 预定义了五个标准工作流步骤:

init_run:初始化工作流上下文,处理用户输入和聊天历史。

python
@step
async def init_run(self, ctx: Context, ev: AgentWorkflowStartEvent) -> AgentInput:
    await self._init_context(ctx, ev)
    # 处理用户消息和聊天历史
    # 初始化内存
    return AgentInput(input=input_messages, current_agent_name=self.name)

setup_agent:准备 LLM 输入,注入系统提示词和状态信息。

python
@step
async def setup_agent(self, ctx: Context, ev: AgentInput) -> AgentSetup:
    # 添加系统提示词
    # 格式化状态信息到输入中
    return AgentSetup(input=llm_input, current_agent_name=ev.current_agent_name)

run_agent_step:调用具体 Agent 的 take_step 方法执行推理。

python
@step
async def run_agent_step(self, ctx: Context, ev: AgentSetup) -> AgentOutput:
    agent_output = await self.take_step(ctx, ev.input, tools, memory)
    ctx.write_event_to_stream(agent_output)
    return agent_output

parse_agent_output:解析 Agent 输出,决定是继续工具调用还是返回结果。

python
@step
async def parse_agent_output(self, ctx: Context, ev: AgentOutput
) -> Union[StopEvent, AgentInput, ToolCall, None]:
    if not ev.tool_calls:
        return StopEvent(result=output)
    for tool_call in ev.tool_calls:
        ctx.send_event(ToolCall(...))
    return None

call_tool & aggregate_tool_results:执行工具调用并聚合结果。

4. 具体 Agent 实现

LlamaIndex 提供了三种内置的 Agent 实现,它们采用不同的推理策略。

4.1 FunctionAgent(函数调用代理)

FunctionAgent 利用 LLM 的原生函数调用能力(Function Calling)来决定工具使用。它适用于支持工具调用 API 的模型(如 OpenAI GPT-4、Claude 等)。

核心特点

  • 使用 llm.achat_with_tools() 进行推理
  • 支持并行工具调用(allow_parallel_tool_calls
  • 使用 scratchpad 维护中间对话状态
python
class FunctionAgent(BaseWorkflowAgent):
    scratchpad_key: str = "scratchpad"
    initial_tool_choice: Optional[str] = None
    allow_parallel_tool_calls: bool = True

    async def take_step(self, ctx, llm_input, tools, memory) -> AgentOutput:
        # 从 scratchpad 获取历史消息
        scratchpad = await ctx.store.get(self.scratchpad_key, default=[])
        current_llm_input = [*llm_input, *scratchpad]

        # 调用 LLM 的工具调用 API
        response = await self.llm.achat_with_tools(
            chat_history=current_llm_input,
            tools=tools,
            allow_parallel_tool_calls=self.allow_parallel_tool_calls
        )

        # 解析工具调用
        tool_calls = self.llm.get_tool_calls_from_response(response)
        return AgentOutput(response=response.message, tool_calls=tool_calls)

4.2 ReActAgent(推理行动代理)

ReActAgent 实现了经典的 ReAct(Reasoning + Acting)范式,通过显式的思考-行动-观察循环进行推理。它不依赖 LLM 的原生工具调用能力,而是通过提示词工程实现。

核心特点

  • 使用 ReActChatFormatter 格式化输入
  • 使用 ReActOutputParser 解析输出
  • 维护推理步骤链(current_reasoning
python
class ReActAgent(BaseWorkflowAgent):
    reasoning_key: str = "current_reasoning"
    output_parser: ReActOutputParser
    formatter: ReActChatFormatter

    async def take_step(self, ctx, llm_input, tools, memory) -> AgentOutput:
        # 格式化 ReAct 风格的输入
        current_reasoning = await ctx.store.get(self.reasoning_key, default=[])
        input_chat = self.formatter.format(tools, chat_history=llm_input,
                                           current_reasoning=current_reasoning)

        # 普通 LLM 调用
        response = await self.llm.achat(input_chat)

        # 解析 Thought/Action/Action Input 格式
        reasoning_step = self.output_parser.parse(response.message.content)

        if reasoning_step.is_done:
            return AgentOutput(response=response.message)

        # 构建工具调用
        tool_calls = [ToolSelection(
            tool_name=reasoning_step.action,
            tool_kwargs=reasoning_step.action_input
        )]
        return AgentOutput(response=response.message, tool_calls=tool_calls)

ReAct 格式示例:

text
Thought: 我需要查询北京的天气
Action: weather_query
Action Input: {"city": "北京"}

4.3 CodeActAgent

CodeActAgent 是一种更高级的代理,它通过生成和执行代码来完成任务,而不是简单的工具调用。

5. 多代理工作流

AgentWorkflow 支持多个代理之间的协作和交接(Handoff),实现复杂的多代理系统。

5.1 交接机制

多代理场景下,每个代理会自动获得一个 handoff 工具,用于将控制权转移给其他代理:

python
async def handoff(ctx: Context, to_agent: str, reason: str) -> str:
    """将聊天控制权交给指定代理"""
    await ctx.store.set("next_agent", to_agent)
    return f"Agent {to_agent} is now handling the request..."

5.2 配置示例

python
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent

# 定义多个专业代理
researcher = FunctionAgent(
    name="Researcher",
    description="专门负责信息检索和研究",
    tools=[search_tool, wiki_tool],
    can_handoff_to=["Writer"]
)

writer = FunctionAgent(
    name="Writer",
    description="专门负责内容创作和编辑",
    tools=[write_tool, edit_tool],
    can_handoff_to=["Researcher"]
)

# 创建多代理工作流
workflow = AgentWorkflow(
    agents=[researcher, writer],
    root_agent="Researcher",
    initial_state={"task": "写一篇关于 AI 的文章"}
)

# 运行工作流
result = await workflow.run(user_msg="请帮我写一篇关于 AI 的文章")

5.3 执行流程

6. Context 上下文管理

Context 是每次工作流运行的全局上下文,提供了状态存储、事件传递和流式输出等功能。

6.1 核心功能

python
class Context:
    # 状态存储 - 用于跨步骤共享数据
    store: InMemoryStateStore

    # 事件操作
    def send_event(self, event: Event) -> None:
        """发送事件触发其他步骤"""

    def write_event_to_stream(self, event: Event) -> None:
        """将事件写入输出流(用于实时反馈)"""

    def collect_events(self, ev: Event, expected: List[Type]) -> List[Event]:
        """收集多个事件(用于聚合)"""

    # 序列化支持
    def to_dict(self) -> dict:
        """序列化上下文"""

    @classmethod
    def from_dict(cls, workflow, data: dict) -> Context:
        """反序列化上下文"""

6.2 状态管理示例

python
@step
async def my_step(self, ctx: Context, ev: SomeEvent) -> AnotherEvent:
    # 读取状态
    counter = await ctx.store.get("counter", default=0)

    # 更新状态
    await ctx.store.set("counter", counter + 1)

    # 读取内存
    memory: BaseMemory = await ctx.store.get("memory")
    messages = await memory.aget()

    return AnotherEvent(...)

7. 使用工作流实现 Agent 的步骤

基于以上分析,使用 Workflow 实现自定义 Agent 的完整步骤如下:

7.1 定义事件类型

python
from workflows.events import Event, StartEvent, StopEvent

class MyAgentStartEvent(StartEvent):
    task: str
    context: dict = {}

class MyAgentOutput(Event):
    result: str
    metadata: dict = {}

7.2 继承 BaseWorkflowAgent

python
from llama_index.core.agent.workflow import BaseWorkflowAgent

class MyCustomAgent(BaseWorkflowAgent):
    # 自定义配置
    custom_config: str = "default"

    async def take_step(self, ctx, llm_input, tools, memory) -> AgentOutput:
        # 实现推理逻辑
        response = await self.llm.achat(llm_input)
        # 解析工具调用...
        return AgentOutput(response=response.message, tool_calls=[...])

    async def handle_tool_call_results(self, ctx, results, memory) -> None:
        # 处理工具结果
        for result in results:
            await memory.aput(ChatMessage(
                role="tool",
                content=result.tool_output.content
            ))

    async def finalize(self, ctx, output, memory) -> AgentOutput:
        # 清理状态,返回最终结果
        return output

7.3 添加自定义步骤(可选)

python
from llama_index.core.workflow import step

class MyCustomAgent(BaseWorkflowAgent):
    @step
    async def custom_preprocessing(self, ctx: Context, ev: AgentInput) -> AgentSetup:
        # 自定义预处理逻辑
        enhanced_input = self._enhance_input(ev.input)
        return AgentSetup(input=enhanced_input, current_agent_name=self.name)

7.4 运行 Agent

python
agent = MyCustomAgent(
    name="MyAgent",
    llm=OpenAI(model="gpt-4"),
    tools=[my_tool_1, my_tool_2],
    system_prompt="You are a helpful assistant."
)

# 简单运行
result = await agent.run(user_msg="请帮我完成这个任务")

# 流式运行
handler = agent.run(user_msg="请帮我完成这个任务")
async for event in handler.stream_events():
    if isinstance(event, AgentStream):
        print(event.delta, end="", flush=True)
result = await handler

8. 总结

LlamaIndex 的 Agent Workflow 架构提供了一种灵活、可扩展的方式来构建智能代理系统:

  1. 事件驱动:通过类型化的事件在步骤之间传递数据,实现松耦合
  2. 状态管理:Context 提供统一的状态存储,支持跨步骤共享和持久化
  3. 可扩展性:通过继承 BaseWorkflowAgent 和添加自定义步骤,轻松实现定制化 Agent
  4. 多代理协作AgentWorkflow 支持多代理之间的交接和协作
  5. 流式输出:内置流式事件机制,支持实时反馈

理解这套架构后,开发者可以根据具体需求选择合适的 Agent 实现(FunctionAgent、ReActAgent),或者创建完全自定义的 Agent 来满足特殊场景的需要。