LlamaIndex Agent Workflow 架构解析
本文深入分析 LlamaIndex 中基于 Workflow 实现 Agent 的架构设计,解读核心组件的语义与分工,帮助开发者理解如何使用事件驱动的工作流模式构建智能代理系统。
1. Workflow 基础架构
Workflow 是 LlamaIndex Agent 系统的核心基础设施,它采用 事件驱动 的编排模式来定义和运行应用流程。每个 Workflow 由多个使用 @step 装饰器标记的步骤组成,这些步骤通过接收和发射类型化的 Event 进行通信。
1.1 核心概念
Workflow 架构的核心思想是将复杂的 Agent 行为分解为一系列 有状态的步骤,每个步骤:
- 接收特定类型的事件作为输入
- 执行相应的业务逻辑
- 返回新的事件以触发后续步骤
1.2 关键组件
WorkflowMeta(元类):通过 Python 元类机制自动收集工作流中定义的所有步骤函数,存储在 _step_functions 字典中。
Workflow 基类:提供工作流的基础功能,包括:
- 超时控制(
timeout) - 事件图验证
- 并发运行限制
- 资源管理器注入
Event 事件系统:所有事件都继承自 Event 基类,它是一个轻量级、可序列化的数据载体,支持属性访问和字典访问两种语义。
class Event(DictLikeModel):
"""所有工作流事件的基类"""
pass
class StartEvent(Event):
"""工作流启动事件"""
pass
class StopEvent(Event):
"""工作流终止事件,携带最终结果"""
result: Any2. Agent 事件体系
Agent Workflow 定义了一套专门的事件类型,用于在 Agent 执行过程中传递状态和数据。
2.1 核心事件类型
| 事件类型 | 语义 | 数据内容 |
|---|---|---|
AgentWorkflowStartEvent | 工作流启动 | 用户消息、聊天历史、内存配置、最大迭代次数 |
AgentInput | LLM 输入准备完成 | 消息列表、当前代理名称 |
AgentSetup | 代理设置完成 | 格式化后的消息列表、当前代理名称 |
AgentOutput | LLM 输出 | 响应消息、工具调用列表、结构化响应、原始输出 |
AgentStream | 流式输出 | 增量文本、完整响应、工具调用、思考过程 |
ToolCall | 工具调用请求 | 工具名称、参数、调用 ID |
ToolCallResult | 工具执行结果 | 工具输出、是否直接返回 |
2.2 事件流转示例
# AgentWorkflowStartEvent 携带的数据
start_event = AgentWorkflowStartEvent(
user_msg="请帮我查询天气",
chat_history=[],
max_iterations=20
)
# AgentOutput 携带的数据结构
agent_output = AgentOutput(
response=ChatMessage(role="assistant", content="..."),
tool_calls=[
ToolSelection(
tool_id="uuid-xxx",
tool_name="weather_query",
tool_kwargs={"city": "北京"}
)
],
current_agent_name="WeatherAgent"
)3. BaseWorkflowAgent 抽象基类
BaseWorkflowAgent 是所有 Agent 实现的基类,它同时继承自 Workflow、BaseModel 和 PromptMixin,融合了工作流编排、数据模型和提示词管理三种能力。
3.1 核心属性
class BaseWorkflowAgent(Workflow, BaseModel, PromptMixin):
name: str # 代理名称
description: str # 代理描述(多代理场景下用于路由)
system_prompt: Optional[str] # 系统提示词
tools: Optional[List[BaseTool]] # 可用工具列表
tool_retriever: Optional[ObjectRetriever] # 动态工具检索器
can_handoff_to: Optional[List[str]] # 可交接的代理列表
llm: LLM # 语言模型
initial_state: Dict[str, Any] # 初始状态
state_prompt: BasePromptTemplate # 状态提示模板
output_cls: Optional[Type[BaseModel]] # 结构化输出类型
streaming: bool # 是否流式输出3.2 抽象方法
每个具体的 Agent 实现必须覆写三个核心抽象方法:
@abstractmethod
async def take_step(
self, ctx: Context, llm_input: List[ChatMessage],
tools: Sequence[AsyncBaseTool], memory: BaseMemory
) -> AgentOutput:
"""执行单步推理"""
@abstractmethod
async def handle_tool_call_results(
self, ctx: Context, results: List[ToolCallResult], memory: BaseMemory
) -> None:
"""处理工具调用结果"""
@abstractmethod
async def finalize(
self, ctx: Context, output: AgentOutput, memory: BaseMemory
) -> AgentOutput:
"""完成代理执行,清理状态"""3.3 内置步骤方法
BaseWorkflowAgent 预定义了五个标准工作流步骤:
init_run:初始化工作流上下文,处理用户输入和聊天历史。
@step
async def init_run(self, ctx: Context, ev: AgentWorkflowStartEvent) -> AgentInput:
await self._init_context(ctx, ev)
# 处理用户消息和聊天历史
# 初始化内存
return AgentInput(input=input_messages, current_agent_name=self.name)setup_agent:准备 LLM 输入,注入系统提示词和状态信息。
@step
async def setup_agent(self, ctx: Context, ev: AgentInput) -> AgentSetup:
# 添加系统提示词
# 格式化状态信息到输入中
return AgentSetup(input=llm_input, current_agent_name=ev.current_agent_name)run_agent_step:调用具体 Agent 的 take_step 方法执行推理。
@step
async def run_agent_step(self, ctx: Context, ev: AgentSetup) -> AgentOutput:
agent_output = await self.take_step(ctx, ev.input, tools, memory)
ctx.write_event_to_stream(agent_output)
return agent_outputparse_agent_output:解析 Agent 输出,决定是继续工具调用还是返回结果。
@step
async def parse_agent_output(self, ctx: Context, ev: AgentOutput
) -> Union[StopEvent, AgentInput, ToolCall, None]:
if not ev.tool_calls:
return StopEvent(result=output)
for tool_call in ev.tool_calls:
ctx.send_event(ToolCall(...))
return Nonecall_tool & aggregate_tool_results:执行工具调用并聚合结果。
4. 具体 Agent 实现
LlamaIndex 提供了三种内置的 Agent 实现,它们采用不同的推理策略。
4.1 FunctionAgent(函数调用代理)
FunctionAgent 利用 LLM 的原生函数调用能力(Function Calling)来决定工具使用。它适用于支持工具调用 API 的模型(如 OpenAI GPT-4、Claude 等)。
核心特点:
- 使用
llm.achat_with_tools()进行推理 - 支持并行工具调用(
allow_parallel_tool_calls) - 使用 scratchpad 维护中间对话状态
class FunctionAgent(BaseWorkflowAgent):
scratchpad_key: str = "scratchpad"
initial_tool_choice: Optional[str] = None
allow_parallel_tool_calls: bool = True
async def take_step(self, ctx, llm_input, tools, memory) -> AgentOutput:
# 从 scratchpad 获取历史消息
scratchpad = await ctx.store.get(self.scratchpad_key, default=[])
current_llm_input = [*llm_input, *scratchpad]
# 调用 LLM 的工具调用 API
response = await self.llm.achat_with_tools(
chat_history=current_llm_input,
tools=tools,
allow_parallel_tool_calls=self.allow_parallel_tool_calls
)
# 解析工具调用
tool_calls = self.llm.get_tool_calls_from_response(response)
return AgentOutput(response=response.message, tool_calls=tool_calls)4.2 ReActAgent(推理行动代理)
ReActAgent 实现了经典的 ReAct(Reasoning + Acting)范式,通过显式的思考-行动-观察循环进行推理。它不依赖 LLM 的原生工具调用能力,而是通过提示词工程实现。
核心特点:
- 使用
ReActChatFormatter格式化输入 - 使用
ReActOutputParser解析输出 - 维护推理步骤链(
current_reasoning)
class ReActAgent(BaseWorkflowAgent):
reasoning_key: str = "current_reasoning"
output_parser: ReActOutputParser
formatter: ReActChatFormatter
async def take_step(self, ctx, llm_input, tools, memory) -> AgentOutput:
# 格式化 ReAct 风格的输入
current_reasoning = await ctx.store.get(self.reasoning_key, default=[])
input_chat = self.formatter.format(tools, chat_history=llm_input,
current_reasoning=current_reasoning)
# 普通 LLM 调用
response = await self.llm.achat(input_chat)
# 解析 Thought/Action/Action Input 格式
reasoning_step = self.output_parser.parse(response.message.content)
if reasoning_step.is_done:
return AgentOutput(response=response.message)
# 构建工具调用
tool_calls = [ToolSelection(
tool_name=reasoning_step.action,
tool_kwargs=reasoning_step.action_input
)]
return AgentOutput(response=response.message, tool_calls=tool_calls)ReAct 格式示例:
Thought: 我需要查询北京的天气
Action: weather_query
Action Input: {"city": "北京"}4.3 CodeActAgent
CodeActAgent 是一种更高级的代理,它通过生成和执行代码来完成任务,而不是简单的工具调用。
5. 多代理工作流
AgentWorkflow 支持多个代理之间的协作和交接(Handoff),实现复杂的多代理系统。
5.1 交接机制
多代理场景下,每个代理会自动获得一个 handoff 工具,用于将控制权转移给其他代理:
async def handoff(ctx: Context, to_agent: str, reason: str) -> str:
"""将聊天控制权交给指定代理"""
await ctx.store.set("next_agent", to_agent)
return f"Agent {to_agent} is now handling the request..."5.2 配置示例
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent
# 定义多个专业代理
researcher = FunctionAgent(
name="Researcher",
description="专门负责信息检索和研究",
tools=[search_tool, wiki_tool],
can_handoff_to=["Writer"]
)
writer = FunctionAgent(
name="Writer",
description="专门负责内容创作和编辑",
tools=[write_tool, edit_tool],
can_handoff_to=["Researcher"]
)
# 创建多代理工作流
workflow = AgentWorkflow(
agents=[researcher, writer],
root_agent="Researcher",
initial_state={"task": "写一篇关于 AI 的文章"}
)
# 运行工作流
result = await workflow.run(user_msg="请帮我写一篇关于 AI 的文章")5.3 执行流程
6. Context 上下文管理
Context 是每次工作流运行的全局上下文,提供了状态存储、事件传递和流式输出等功能。
6.1 核心功能
class Context:
# 状态存储 - 用于跨步骤共享数据
store: InMemoryStateStore
# 事件操作
def send_event(self, event: Event) -> None:
"""发送事件触发其他步骤"""
def write_event_to_stream(self, event: Event) -> None:
"""将事件写入输出流(用于实时反馈)"""
def collect_events(self, ev: Event, expected: List[Type]) -> List[Event]:
"""收集多个事件(用于聚合)"""
# 序列化支持
def to_dict(self) -> dict:
"""序列化上下文"""
@classmethod
def from_dict(cls, workflow, data: dict) -> Context:
"""反序列化上下文"""6.2 状态管理示例
@step
async def my_step(self, ctx: Context, ev: SomeEvent) -> AnotherEvent:
# 读取状态
counter = await ctx.store.get("counter", default=0)
# 更新状态
await ctx.store.set("counter", counter + 1)
# 读取内存
memory: BaseMemory = await ctx.store.get("memory")
messages = await memory.aget()
return AnotherEvent(...)7. 使用工作流实现 Agent 的步骤
基于以上分析,使用 Workflow 实现自定义 Agent 的完整步骤如下:
7.1 定义事件类型
from workflows.events import Event, StartEvent, StopEvent
class MyAgentStartEvent(StartEvent):
task: str
context: dict = {}
class MyAgentOutput(Event):
result: str
metadata: dict = {}7.2 继承 BaseWorkflowAgent
from llama_index.core.agent.workflow import BaseWorkflowAgent
class MyCustomAgent(BaseWorkflowAgent):
# 自定义配置
custom_config: str = "default"
async def take_step(self, ctx, llm_input, tools, memory) -> AgentOutput:
# 实现推理逻辑
response = await self.llm.achat(llm_input)
# 解析工具调用...
return AgentOutput(response=response.message, tool_calls=[...])
async def handle_tool_call_results(self, ctx, results, memory) -> None:
# 处理工具结果
for result in results:
await memory.aput(ChatMessage(
role="tool",
content=result.tool_output.content
))
async def finalize(self, ctx, output, memory) -> AgentOutput:
# 清理状态,返回最终结果
return output7.3 添加自定义步骤(可选)
from llama_index.core.workflow import step
class MyCustomAgent(BaseWorkflowAgent):
@step
async def custom_preprocessing(self, ctx: Context, ev: AgentInput) -> AgentSetup:
# 自定义预处理逻辑
enhanced_input = self._enhance_input(ev.input)
return AgentSetup(input=enhanced_input, current_agent_name=self.name)7.4 运行 Agent
agent = MyCustomAgent(
name="MyAgent",
llm=OpenAI(model="gpt-4"),
tools=[my_tool_1, my_tool_2],
system_prompt="You are a helpful assistant."
)
# 简单运行
result = await agent.run(user_msg="请帮我完成这个任务")
# 流式运行
handler = agent.run(user_msg="请帮我完成这个任务")
async for event in handler.stream_events():
if isinstance(event, AgentStream):
print(event.delta, end="", flush=True)
result = await handler8. 总结
LlamaIndex 的 Agent Workflow 架构提供了一种灵活、可扩展的方式来构建智能代理系统:
- 事件驱动:通过类型化的事件在步骤之间传递数据,实现松耦合
- 状态管理:Context 提供统一的状态存储,支持跨步骤共享和持久化
- 可扩展性:通过继承
BaseWorkflowAgent和添加自定义步骤,轻松实现定制化 Agent - 多代理协作:
AgentWorkflow支持多代理之间的交接和协作 - 流式输出:内置流式事件机制,支持实时反馈
理解这套架构后,开发者可以根据具体需求选择合适的 Agent 实现(FunctionAgent、ReActAgent),或者创建完全自定义的 Agent 来满足特殊场景的需要。