LlamaIndex Workflows 项目深入总结
1. 总体定位
LlamaIndex Workflows 是一个“事件驱动 + 异步并发 + 可恢复上下文 + 插件化运行时”的工作流编排框架,用于构建:LLM 多阶段推理链、检索与综合流水线、人机协同(Human-in-the-loop)流程、可重入/可暂停的复杂 Agent 系统。
框架特点:
- 强类型与签名验证:运行前静态检查事件输入/输出图,避免“悬空事件”与“未消费事件”。
- 真正事件驱动:控制循环(Control Loop)以“Tick”驱动状态转移,纯函数化 Reducer 保证可测试与可回放。
- 上下文持久化与恢复:
Context可序列化(包括状态、事件缓冲、等待点),支持跨运行延续与外部注入。 - 资源与重试策略原语内建:依赖注入 + 失败自恢复。
- HITL(Human-In-The-Loop)一等公民:
InputRequiredEvent与HumanResponseEvent显式建模。 - 插件化运行时:可替换调度/事件流实现,支持快照回放、分布式/外部执行适配。
2. 顶层使用者关注的原语
| 原语 | 文件 | 说明 | 关键职责 | 可扩展点 |
|---|---|---|---|---|
| Workflow | workflow.py | 编排器基类 | 收集步骤、运行、验证事件图 | 子类定义步骤;可注入资源与重试策略 |
| @step | decorators.py | 步骤声明装饰器 | 解析函数签名,封装 StepConfig | 支持自由函数 + 方法;扩展签名规则 |
| Event | events.py | 全部事件基类 | 轻量载荷 + 动态字段 + 可序列化 | 自定义领域事件;继承内置 Start/Stop/HITL |
| Context | context/context.py | 运行态上下文 | 状态存储 + 事件派发 + 恢复 | 支持自定义序列化、插件、更换状态模型 |
| WorkflowHandler | handler.py | 运行句柄 | 等待最终结果 + 流式事件 | 可扩展流式过滤策略(内部事件暴露) |
| Resource | resource.py | 依赖注入原语 | 工厂 + 缓存控制 | 自定义生命周期管理(连接池等) |
| RetryPolicy | retry_policy.py | 重试策略协议 | 判定下次重试间隔 | 定制指数退避/熔断策略 |
| Client | client/client.py | 远程调用 | HTTP + 事件流消费 | 支持自定义认证/传输层 |
| Serializable Event Envelope | protocol/serializable_events.py | 事件跨进程表示 | 元数据 + 回放反射 | 适配第三方传输格式(SSE/Kafka) |
3. 中层运行时原语(控制循环的“骨骼”)
| 原语 | 文件 | 角色 | 摘要 |
|---|---|---|---|
| WorkflowBroker | runtime/broker.py | 运行协调者 | 创建 handler,注册 run,上送事件到 runtime,暴露 collect/wait API |
| Control Loop Runner | runtime/control_loop.py | 主状态机驱动 | 以 Tick 驱动 _reduce_tick 纯函数,产出 Commands 并执行 |
| WorkflowPluginRegistry | runtime/workflow_registry.py | 注册表 | 保证每个 plugin 与 workflow 的注册唯一性;维护 run 上下文 |
| Tick* | runtime/types/ticks.py | 驱动输入 | StepResult / AddEvent / Timeout / Cancel / Publish 五类 Tick |
| Command* | runtime/types/commands.py | 纯函数输出 | 指令集合:RunWorker / QueueEvent / PublishEvent / CompleteRun / Fail / Halt |
| BrokerState | runtime/types/internal_state.py | 全局可还原状态 | 步骤 worker 池、事件队列、收集缓冲、等待者(waiters)、运行标记 |
| InternalStepWorkerState | 同上 | 每步运行态 | 维护队列 + in_progress + 收集事件 + 等待条件 |
| InProgressState | 同上 | 单个执行单元快照 | 捕获启动时共享状态,支持乐观重试一致性策略 |
| EventAttempt | 同上 | 事件重试上下文 | 记录 attempts 与首失败时间,用于重试策略判定 |
| StepWorkerFunction | runtime/types/step_function.py | 包装用户 @step | 设置 ContextVar + 资源注入 + 结果归一化 |
| StepFunctionResult 枚举族 | runtime/types/results.py | 工作器产物 | Result / Failed / Add/DeleteCollectedEvent / Add/DeleteWaiter |
| StepStateChanged | events.py | 内部进度事件 | 暴露状态转换(PREPARING/RUNNING/NOT_RUNNING)供调试/观测 |
说明
这些“中层原语”将用户定义的业务函数与底层调度逻辑解耦,使得:测试可直接针对 _reduce_tick、状态可快照/回放、分布式运行时可替换 WorkflowRuntime 实现。
控制循环核心简化流程
Tick 与 Command 关系图
4. HITL(人类参与)原语模型
| 事件 | 方向 | 说明 | 验证规则特殊性 |
|---|---|---|---|
| InputRequiredEvent | 步骤 → 外部 | 步骤暂停并向调用方发出需要用户输入的信号 | 可被视为“未消费产出”但不触发验证错误 |
| HumanResponseEvent | 外部 → 步骤 | 用户/操作者反馈数据 | 允许出现在“只消费不产出”集合中 |
典型流:
5. Context 与状态存储
Context 初始化时会:
- 收集所有步骤的
context_state_type泛型约束(若存在冲突抛错)。 - 若传入历史快照则反序列化为
BrokerState + InMemoryStateStore。 - 每次
workflow.run()时创建新的WorkflowBroker(旧 broker 如存在先 shutdown)。
状态存储:InMemoryStateStore 提供:
get_state()/set_state()原子替换模型- 事务编辑:
async with store.edit_state(): state.xxx += 1 - 路径访问:
await store.get("user.profile.name")/await store.set("user.profile.name", "Ada") - 支持
DictState(动态键值)与自定义 Pydantic 模型 - 自定义序列化策略:
JsonSerializer/PickleSerializer
序列化上下文:
6. 资源注入与执行环境隔离
步骤签名通过 typing.Annotated[T, Resource(factory, cache=...)] 声明依赖:
- 执行前收集资源列表 -> 调用
ResourceManager.get() cache=False时每次执行都重新构造(适合一次性连接)- 可扩展:将 Resource 工厂替换为连接池、远程客户端等
7. 重试机制
RetryPolicy.next(elapsed_time, attempts, error) -> delay | None
- 返回 delay:重新排队事件(保持原首次时间戳,累积 attempts)
- 返回 None:视为失败 -> 发布
StopEvent()+ 抛CommandFailWorkflow ConstantDelayRetryPolicy:固定次数 + 固定间隔;可自定义指数退避或随机抖动策略。
8. 事件序列化协议(跨进程/网络)
文件:protocol/serializable_events.py
- 写出:
EventEnvelope.from_event(event)—— - 带元数据回放:
EventEnvelopeWithMetadata.from_event(event)—— 增加types(继承链)与qualified_name - 读取:
EventEnvelope.parse(client_data, registry)—— 优先注册表匹配;否则模块反射;失败聚合错误信息 - 用途:HTTP / SSE / 存储回放 / 外部系统集成(Kafka 等)
9. 运行时内部状态快照与回放
快照来源:WorkflowRuntime 若实现 as_snapshottable(plugin),即可 replay() Tick 序列再用 rebuild_state_from_ticks 重建 BrokerState。
优势:
- 调试:无需逐步调试协程,只看 Tick 序列及对应 Commands。
- 迁移:状态跨进程移动 / 分布式恢复。
- 审计:完整执行轨迹(包含内部事件
StepStateChanged)。
10. 事件图验证(Workflow._validate())
检查点:
- 存在唯一
StartEvent输入类型。 - 存在唯一
StopEvent输出类型。 - 无步骤直接消费
StopEvent(只能由终结逻辑_done接受)。 - 所有消费事件(除 HITL 或 Stop)均在产生集合;所有产出事件(除 Stop/HITL)均被消费。违反即抛
WorkflowValidationError。 - 返回是否使用 HITL(InputRequiredEvent 产生 或 HumanResponseEvent 消费)。
11. 原语关系总览
12. 示例(加强版,涵盖:收集事件 + 等待人类)
from typing import Annotated
from pydantic import BaseModel
from workflows import Workflow, step, Context
from workflows.events import StartEvent, StopEvent, InputRequiredEvent, HumanResponseEvent, Event
from workflows.resource import Resource
class MyState(BaseModel):
greetings: list[str] = []
def get_time():
import time
return time.time()
class GreetEvent(Event):
name: str
class CollectWorkflow(Workflow):
@step
async def start(self, ctx: Context[MyState], ev: StartEvent) -> InputRequiredEvent:
return InputRequiredEvent(prefix="请输入你的姓名: ")
@step
async def human(self, ctx: Context[MyState], ev: HumanResponseEvent, ts: Annotated[float, Resource(get_time)]) -> GreetEvent:
async with ctx.store.edit_state() as st:
st.greetings.append(ev.response)
ctx.write_event_to_stream(ev)
return GreetEvent(name=ev.response)
@step
async def done(self, ctx: Context[MyState], ev: GreetEvent) -> StopEvent:
async with ctx.store.edit_state() as st:
return StopEvent(result=st.greetings)
# 调用方伪代码:
# handler = CollectWorkflow().run()
# async for ev in handler.stream_events():
# if isinstance(ev, InputRequiredEvent):
# handler.ctx.send_event(HumanResponseEvent(response="Alice"))
# result = await handler13. 开发/测试速览
# 安装依赖
uv sync
# 运行全部测试
uv run pytest -q
# 聚焦运行时控制循环测试
uv run pytest tests/runtime/test_control_loop.py -vv
# 查看覆盖率
uv run pytest --cov=src/workflows --cov-report=term-missing14. 设计关键取舍与优势
| 方面 | 取舍 | 优势 | 可能的扩展 |
|---|---|---|---|
| Reducer + Tick | 将调度逻辑拆为纯函数 | 易测试/回放/审计 | 事件溯源与时间旅行调试工具 |
| Context 可序列化 | 强约束快照格式 | 支持跨运行/跨进程迁移 | 引入增量快照与压缩策略 |
| 资源注入 | 通过签名反射实现 | 用户无需手写装配代码 | 生命周期 hooks(on_create/on_close) |
| 重试策略协议 | 简单 next() 接口 | 可快速定制各种策略 | 插入可观测统计(失败率、退避次数) |
| HITL 事件特殊豁免 | 验证逻辑更复杂 | 明确人类交互节点 | 自动生成 UI 表单/提示模板 |
| 内部事件 StepStateChanged | 泄露调度细节 | 调试与监控更直观 | 与分布式追踪整合(trace span enrich) |
15. 常见调试 & 排错建议
| 症状 | 可能原因 | 排查路径 |
|---|---|---|
| WorkflowValidationError: produced/consumed 不匹配 | 某步骤事件注解遗漏或返回 None 过多 | 检查步骤返回类型 Union;确认未丢弃事件 |
| StopEvent 未出现超时退出 | 步骤逻辑未触发终止分支 | 增加日志或返回内部 StepStateChanged 观察活动步 |
| 重试不生效 | 未配置 RetryPolicy 或 attempts 超限 | 查看 retry_policy 实例参数 |
| wait_for_event 卡住 | 未发送 HumanResponseEvent 或 requirements 不匹配 | 检查 waiter_id 与 requirements 键值 |
| 状态恢复失败 | 自定义状态模型缺省字段或类型不兼容 | 确认 Pydantic 模型默认值;对比序列化快照结构 |
16. 全局原语速查表
| 分类 | 名称 | 作用一句话 | 关键文件 |
|---|---|---|---|
| 事件 | StartEvent / StopEvent | 入口 / 终止 | events.py |
| 事件 | InputRequiredEvent / HumanResponseEvent | 人机交互 | events.py |
| 事件 | InternalDispatchEvent / StepStateChanged | 内部调度可视化 | events.py |
| 编排 | Workflow / WorkflowMeta | 步骤收集 + 验证 + 启动 | workflow.py |
| 步骤 | @step / StepConfig / StepFunction | 签名解析 + 运行配置 | decorators.py |
| 运行 | WorkflowBroker | 启动/桥接 runtime 与 handler | runtime/broker.py |
| 运行 | Control Loop / _reduce_tick | 纯函数状态机 | runtime/control_loop.py |
| 运行 | Tick*, Command* | 状态变迁输入/输出 | runtime/types/ |
| 运行 | BrokerState / InternalStepWorkerState | 可快照运行态 | runtime/types/internal_state.py |
| 执行 | StepWorkerFunction | 封装用户逻辑并产出结果命令 | runtime/types/step_function.py |
| 结果 | StepFunctionResult 族 | 描述执行产出类型 | runtime/types/results.py |
| 上下文 | Context / InMemoryStateStore / DictState | 全局可恢复状态 + KV 操作 | context/ |
| 序列化 | BaseSerializer / JsonSerializer / PickleSerializer | 状态与事件值编码 | context/serializers.py |
| 注入 | Resource / ResourceManager | 依赖工厂与缓存 | resource.py |
| 重试 | RetryPolicy / ConstantDelayRetryPolicy | 失败恢复策略 | retry_policy.py |
| 协议 | EventEnvelope / EventEnvelopeWithMetadata | 跨进程事件结构 | protocol/serializable_events.py |
| 远程 | WorkflowClient | HTTP + 流式消费 | client/client.py |
17. 核心模块概览
本节总结 packages/llama-index-workflows/src/workflows 目录下的主要模块及其职责,帮助读者快速定位功能入口并理解系统分层。
17.1 核心编排层
| 文件 | 职责 |
|---|---|
__init__.py | 仅对外暴露 Workflow、Context 与 step,构成最小可用 API 面;py.typed 确保类型提示向下游分发 |
workflow.py | 事件驱动编排器,负责收集/验证步骤、推断 StartEvent 与 StopEvent、控制并发、创建 Context 并产出 WorkflowHandler,也是资源注入与运行期校验的核心 |
decorators.py | 实现 @step 装饰器、StepConfig、StepFunction 协议,通过反射签名推断事件、上下文、资源与重试策略,确保每个步骤都具备明确的输入输出 |
events.py | 定义事件体系,包括基础 Event、StartEvent、StopEvent、HITL 相关的 InputRequiredEvent/HumanResponseEvent,以及内部观测事件 InternalDispatchEvent/StepStateChanged |
handler.py | WorkflowHandler 继承 asyncio.Future,统一承载运行态:等待结果、流式消费事件、访问 Context、取消运行,并缓存最终 StopEvent |
errors.py | 集中声明所有自定义异常(配置/运行/取消/超时等),供运行时、客户端与服务器一致处理 |
types.py | 维护 StopEventT/RunResultT 类型别名,兼顾当前的向后兼容与未来以 StopEvent 为唯一返回值的规划 |
utils.py | 提供步骤签名解析 (inspect_signature)、校验 (validate_step_signature)、自由函数识别及 nanoid 生成等辅助工具 |
resource.py | 用 _Resource、ResourceDefinition 与 ResourceManager 抽象依赖注入,支持在步骤签名中通过 typing.Annotated 声明资源并控制缓存粒度 |
retry_policy.py | 定义重试策略协议与 ConstantDelayRetryPolicy,让步骤能够基于尝试次数/时间决定是否继续 |
17.2 上下文与状态子系统
| 文件 | 职责 |
|---|---|
context/__init__.py | 导出 Context 及序列化器族 |
context/context.py | Context 本体,串联 workflow、broker 与 runtime,负责状态存取、事件流、collect_events/wait_for_event、序列化快照、重放与并发控制 |
context/context_types.py | Pydantic 数据模型,定义序列化格式(V0/V1)、SerializedContext、SerializedStepWorkerState 等,并在加载时补齐重试次数与 Waiter 信息 |
context/state_store.py | 提供默认 DictState 与泛型 InMemoryStateStore,支持异步锁、路径式读写、事务编辑以及序列化/反序列化,确保全局状态类型安全 |
context/serializers.py | BaseSerializer 抽象以及 JSON 优先的 JsonSerializer、JSON→Pickle 兜底的 PickleSerializer,支持 Pydantic/LlamaIndex 组件的合规编码 |
context/utils.py | get_qualified_name 与 import_module_from_qualified_name,为跨进程序列化提供模块定位能力 |
17.3 协议与客户端
| 文件 | 职责 |
|---|---|
client/__init__.py | 暴露 WorkflowClient |
client/client.py | 基于 httpx 的异步客户端,覆盖健康检查、列出 workflow、运行(同步/异步)、获取/取消 handler、SSE 事件流以及发送事件,内部 _serialize_event 兼容事件实例或原始字典 |
protocol/__init__.py | 定义 HTTP API 的 Pydantic 模型(HandlerData、HandlersListResponse、WorkflowGraphResponse 等)与 Status literal,保障 server/client 一致 |
protocol/serializable_events.py | 封装 EventEnvelope/EventEnvelopeWithMetadata,负责事件载荷标准化、注册表查找、Legacy 兼容及反序列化校验 |
17.4 运行时与插件体系
| 文件 | 职责 |
|---|---|
plugins/basic.py | 默认插件 basic_runtime,AsyncioWorkflowRuntime 以单进程邮件箱实现 send_event、事件流、快照回放等能力 |
runtime/broker.py | WorkflowBroker 是调度中枢,连接 workflow、context 与 runtime,负责启动 handler、调度 step worker、写事件流、收集 waiters/资源,并暴露 running_steps 等调试接口 |
runtime/control_loop.py | 事件溯源 reducer,_ControlLoopRunner 监听 ticks,通过 _reduce_tick 产出 WorkflowCommand,调度 step 协程并处理 timeout/cancel/StopEvent 发布;control_loop() 是 runtime 入口,rebuild_state_from_ticks() 用于快照重建 |
runtime/workflow_registry.py | WorkflowPluginRegistry 使用弱引用 (IdentityWeakKeyDict) 确保"每个 workflow 在每个插件内只注册一次",并维护 run_id → RegisteredRunContext |
runtime/types/commands.py | 定义控制循环命令(CommandRunWorker、CommandQueueEvent、CommandCompleteRun 等)及其退出条件 |
runtime/types/internal_state.py | 以 BrokerState/InternalStepConfig/InternalStepWorkerState 描述全部运行态,可序列化到 SerializedContext 并从中恢复 |
runtime/types/plugin.py | 描述插件与 runtime 所需接口(发送 tick、写事件流、sleep/replay 等),并提供 SnapshottableRuntime/as_snapshottable 帮助判断运行时是否可回放 |
runtime/types/results.py | 定义传入 step 的 StepWorkerContext、StepWorkerState、StepWorkerWaiter、Returns 以及 step 执行返回的多种结果(成功、失败、添加/删除 waiter、收集事件) |
runtime/types/step_function.py | as_step_worker_function 包装用户步骤,注入上下文、资源、调度同步函数进 executor,捕获 WaitingForEvent,并将结果写回 Returns |
runtime/types/ticks.py | 列举驱动控制循环的 tick 类型:步结果、外部事件、取消、发布、超时 |
runtime/types/_identity_weak_ref.py | 提供基于对象身份的弱引用字典,供注册表清理 workflow-插件映射 |
17.5 服务器与持久化
| 文件 | 职责 |
|---|---|
server/__init__.py | 导出 WorkflowServer、AbstractWorkflowStore、SqliteWorkflowStore |
server/server.py | 基于 Starlette/uvicorn 的 HTTP 服务,注册 run/run-nowait、事件流、schema、图结构、handler 管理等路由,管理 handler/Context 生命周期并支持断电恢复 |
server/abstract_workflow_store.py | 定义持久化接口、HandlerQuery 与 PersistentHandler(含 StopEvent 序列化策略) |
server/memory_workflow_store.py | 简单的内存实现,基于 dict + 筛选谓词 |
server/sqlite/sqlite_workflow_store.py | SQLite 后端,负责迁移(migrate.py)、条件查询、UPSERT 与 JSON 化上下文;server/sqlite/migrations/ 存放 SQL 脚本 |
server/representation_utils.py | 解析 workflow 步骤/事件关系,生成节点/边和 HITL 场景中的 external_step,供前端可视化使用 |
server/static/ | 前端 UI 静态资源,便于查看 workflow 状态与事件流 |
server/__main__.py | CLI 入口,动态加载用户脚本中的 WorkflowServer,并根据环境变量启动 uvicorn |
17.6 测试与辅助工具
| 文件 | 职责 |
|---|---|
testing/runner.py | WorkflowTestRunner 可以运行 workflow、收集流式事件、统计事件类型并返回结果上下文,便于在 pytest 中做端到端断言 |
testing/__init__.py | 输出测试 runner,保持 API 简洁 |
17.7 整体体系结构
LlamaIndex Workflows 以 Workflow/Context/Runtime 三层分离的方式构建:
- API 层(workflow + decorators + events):聚焦开发体验,定义步骤语义和事件模型。
- 运行态(context + runtime + broker):负责状态存储、事件路由、并行调度与可重放控制循环。
- 外围生态(protocol/client/server/testing):提供远程调用、服务端托管、可视化与测试工具。
这种分层保证了:
- 编排代码仍以 Python 协程编写,但运行期可以替换为不同插件(例如 Durable 托管)。
- 上下文、状态与事件序列化具备稳定格式,可跨进程/跨机器迁移。
- 服务器、客户端与测试工具共用统一协议,减少调试与集成成本。
18. 总结
LlamaIndex Workflows 将"用户可见的步骤与事件"与"内部纯函数化调度状态机"分层抽象:
- 顶层开发者只需思考:事件类型 + 步骤逻辑 + 状态模型 +(可选)资源与重试策略。
- 中层运行时提供:可回放、可验证、可替换、可观测的执行语义。
- 底层原语(Tick / Command / BrokerState)保证调度透明与可调试性,为未来:分布式执行、断点续跑、执行图可视化、时序审计 打下稳定基础。
通过本文件的原语清单与关系图,读者可快速定位:扩展点、调试入口、以及二次封装(如:DSL、图形化编排、远程执行代理)的位置。
展望
下一步可演进方向:事件持久化队列接入(Kafka / Redis Stream)、分布式 StepWorker 调度、执行轨迹 Web 可视化、对接 Langfuse / Phoenix 全链路追踪等。