Skip to content

LlamaIndex Workflows 项目深入总结

1. 总体定位

LlamaIndex Workflows 是一个“事件驱动 + 异步并发 + 可恢复上下文 + 插件化运行时”的工作流编排框架,用于构建:LLM 多阶段推理链、检索与综合流水线、人机协同(Human-in-the-loop)流程、可重入/可暂停的复杂 Agent 系统。

框架特点:

  1. 强类型与签名验证:运行前静态检查事件输入/输出图,避免“悬空事件”与“未消费事件”。
  2. 真正事件驱动:控制循环(Control Loop)以“Tick”驱动状态转移,纯函数化 Reducer 保证可测试与可回放。
  3. 上下文持久化与恢复:Context 可序列化(包括状态、事件缓冲、等待点),支持跨运行延续与外部注入。
  4. 资源与重试策略原语内建:依赖注入 + 失败自恢复。
  5. HITL(Human-In-The-Loop)一等公民:InputRequiredEventHumanResponseEvent 显式建模。
  6. 插件化运行时:可替换调度/事件流实现,支持快照回放、分布式/外部执行适配。

2. 顶层使用者关注的原语

原语文件说明关键职责可扩展点
Workflowworkflow.py编排器基类收集步骤、运行、验证事件图子类定义步骤;可注入资源与重试策略
@stepdecorators.py步骤声明装饰器解析函数签名,封装 StepConfig支持自由函数 + 方法;扩展签名规则
Eventevents.py全部事件基类轻量载荷 + 动态字段 + 可序列化自定义领域事件;继承内置 Start/Stop/HITL
Contextcontext/context.py运行态上下文状态存储 + 事件派发 + 恢复支持自定义序列化、插件、更换状态模型
WorkflowHandlerhandler.py运行句柄等待最终结果 + 流式事件可扩展流式过滤策略(内部事件暴露)
Resourceresource.py依赖注入原语工厂 + 缓存控制自定义生命周期管理(连接池等)
RetryPolicyretry_policy.py重试策略协议判定下次重试间隔定制指数退避/熔断策略
Clientclient/client.py远程调用HTTP + 事件流消费支持自定义认证/传输层
Serializable Event Envelopeprotocol/serializable_events.py事件跨进程表示元数据 + 回放反射适配第三方传输格式(SSE/Kafka)

3. 中层运行时原语(控制循环的“骨骼”)

原语文件角色摘要
WorkflowBrokerruntime/broker.py运行协调者创建 handler,注册 run,上送事件到 runtime,暴露 collect/wait API
Control Loop Runnerruntime/control_loop.py主状态机驱动以 Tick 驱动 _reduce_tick 纯函数,产出 Commands 并执行
WorkflowPluginRegistryruntime/workflow_registry.py注册表保证每个 plugin 与 workflow 的注册唯一性;维护 run 上下文
Tick*runtime/types/ticks.py驱动输入StepResult / AddEvent / Timeout / Cancel / Publish 五类 Tick
Command*runtime/types/commands.py纯函数输出指令集合:RunWorker / QueueEvent / PublishEvent / CompleteRun / Fail / Halt
BrokerStateruntime/types/internal_state.py全局可还原状态步骤 worker 池、事件队列、收集缓冲、等待者(waiters)、运行标记
InternalStepWorkerState同上每步运行态维护队列 + in_progress + 收集事件 + 等待条件
InProgressState同上单个执行单元快照捕获启动时共享状态,支持乐观重试一致性策略
EventAttempt同上事件重试上下文记录 attempts 与首失败时间,用于重试策略判定
StepWorkerFunctionruntime/types/step_function.py包装用户 @step设置 ContextVar + 资源注入 + 结果归一化
StepFunctionResult 枚举族runtime/types/results.py工作器产物Result / Failed / Add/DeleteCollectedEvent / Add/DeleteWaiter
StepStateChangedevents.py内部进度事件暴露状态转换(PREPARING/RUNNING/NOT_RUNNING)供调试/观测

说明

这些“中层原语”将用户定义的业务函数与底层调度逻辑解耦,使得:测试可直接针对 _reduce_tick、状态可快照/回放、分布式运行时可替换 WorkflowRuntime 实现。

控制循环核心简化流程

Tick 与 Command 关系图

4. HITL(人类参与)原语模型

事件方向说明验证规则特殊性
InputRequiredEvent步骤 → 外部步骤暂停并向调用方发出需要用户输入的信号可被视为“未消费产出”但不触发验证错误
HumanResponseEvent外部 → 步骤用户/操作者反馈数据允许出现在“只消费不产出”集合中

典型流:

5. Context 与状态存储

Context 初始化时会:

  1. 收集所有步骤的 context_state_type 泛型约束(若存在冲突抛错)。
  2. 若传入历史快照则反序列化为 BrokerState + InMemoryStateStore
  3. 每次 workflow.run() 时创建新的 WorkflowBroker(旧 broker 如存在先 shutdown)。

状态存储:InMemoryStateStore 提供:

  • get_state()/set_state() 原子替换模型
  • 事务编辑:async with store.edit_state(): state.xxx += 1
  • 路径访问:await store.get("user.profile.name") / await store.set("user.profile.name", "Ada")
  • 支持 DictState(动态键值)与自定义 Pydantic 模型
  • 自定义序列化策略:JsonSerializer / PickleSerializer

序列化上下文:

6. 资源注入与执行环境隔离

步骤签名通过 typing.Annotated[T, Resource(factory, cache=...)] 声明依赖:

  • 执行前收集资源列表 -> 调用 ResourceManager.get()
  • cache=False 时每次执行都重新构造(适合一次性连接)
  • 可扩展:将 Resource 工厂替换为连接池、远程客户端等

7. 重试机制

RetryPolicy.next(elapsed_time, attempts, error) -> delay | None

  • 返回 delay:重新排队事件(保持原首次时间戳,累积 attempts)
  • 返回 None:视为失败 -> 发布 StopEvent() + 抛 CommandFailWorkflow
  • ConstantDelayRetryPolicy:固定次数 + 固定间隔;可自定义指数退避或随机抖动策略。

8. 事件序列化协议(跨进程/网络)

文件:protocol/serializable_events.py

  • 写出EventEnvelope.from_event(event) ——
  • 带元数据回放EventEnvelopeWithMetadata.from_event(event) —— 增加 types(继承链)与 qualified_name
  • 读取EventEnvelope.parse(client_data, registry) —— 优先注册表匹配;否则模块反射;失败聚合错误信息
  • 用途:HTTP / SSE / 存储回放 / 外部系统集成(Kafka 等)

9. 运行时内部状态快照与回放

快照来源:WorkflowRuntime 若实现 as_snapshottable(plugin),即可 replay() Tick 序列再用 rebuild_state_from_ticks 重建 BrokerState

优势:

  • 调试:无需逐步调试协程,只看 Tick 序列及对应 Commands。
  • 迁移:状态跨进程移动 / 分布式恢复。
  • 审计:完整执行轨迹(包含内部事件 StepStateChanged)。

10. 事件图验证(Workflow._validate()

检查点:

  1. 存在唯一 StartEvent 输入类型。
  2. 存在唯一 StopEvent 输出类型。
  3. 无步骤直接消费 StopEvent(只能由终结逻辑 _done 接受)。
  4. 所有消费事件(除 HITL 或 Stop)均在产生集合;所有产出事件(除 Stop/HITL)均被消费。违反即抛 WorkflowValidationError
  5. 返回是否使用 HITL(InputRequiredEvent 产生 或 HumanResponseEvent 消费)。

11. 原语关系总览


12. 示例(加强版,涵盖:收集事件 + 等待人类)

python
from typing import Annotated
from pydantic import BaseModel
from workflows import Workflow, step, Context
from workflows.events import StartEvent, StopEvent, InputRequiredEvent, HumanResponseEvent, Event
from workflows.resource import Resource

class MyState(BaseModel):
    greetings: list[str] = []

def get_time():
    import time
    return time.time()

class GreetEvent(Event):
    name: str

class CollectWorkflow(Workflow):
    @step
    async def start(self, ctx: Context[MyState], ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="请输入你的姓名: ")

    @step
    async def human(self, ctx: Context[MyState], ev: HumanResponseEvent, ts: Annotated[float, Resource(get_time)]) -> GreetEvent:
        async with ctx.store.edit_state() as st:
            st.greetings.append(ev.response)
        ctx.write_event_to_stream(ev)
        return GreetEvent(name=ev.response)

    @step
    async def done(self, ctx: Context[MyState], ev: GreetEvent) -> StopEvent:
        async with ctx.store.edit_state() as st:
            return StopEvent(result=st.greetings)

# 调用方伪代码:
# handler = CollectWorkflow().run()
# async for ev in handler.stream_events():
#     if isinstance(ev, InputRequiredEvent):
#         handler.ctx.send_event(HumanResponseEvent(response="Alice"))
# result = await handler

13. 开发/测试速览

bash
# 安装依赖
uv sync

# 运行全部测试
uv run pytest -q

# 聚焦运行时控制循环测试
uv run pytest tests/runtime/test_control_loop.py -vv

# 查看覆盖率
uv run pytest --cov=src/workflows --cov-report=term-missing

14. 设计关键取舍与优势

方面取舍优势可能的扩展
Reducer + Tick将调度逻辑拆为纯函数易测试/回放/审计事件溯源与时间旅行调试工具
Context 可序列化强约束快照格式支持跨运行/跨进程迁移引入增量快照与压缩策略
资源注入通过签名反射实现用户无需手写装配代码生命周期 hooks(on_create/on_close)
重试策略协议简单 next() 接口可快速定制各种策略插入可观测统计(失败率、退避次数)
HITL 事件特殊豁免验证逻辑更复杂明确人类交互节点自动生成 UI 表单/提示模板
内部事件 StepStateChanged泄露调度细节调试与监控更直观与分布式追踪整合(trace span enrich)

15. 常见调试 & 排错建议

症状可能原因排查路径
WorkflowValidationError: produced/consumed 不匹配某步骤事件注解遗漏或返回 None 过多检查步骤返回类型 Union;确认未丢弃事件
StopEvent 未出现超时退出步骤逻辑未触发终止分支增加日志或返回内部 StepStateChanged 观察活动步
重试不生效未配置 RetryPolicy 或 attempts 超限查看 retry_policy 实例参数
wait_for_event 卡住未发送 HumanResponseEvent 或 requirements 不匹配检查 waiter_idrequirements 键值
状态恢复失败自定义状态模型缺省字段或类型不兼容确认 Pydantic 模型默认值;对比序列化快照结构

16. 全局原语速查表

分类名称作用一句话关键文件
事件StartEvent / StopEvent入口 / 终止events.py
事件InputRequiredEvent / HumanResponseEvent人机交互events.py
事件InternalDispatchEvent / StepStateChanged内部调度可视化events.py
编排Workflow / WorkflowMeta步骤收集 + 验证 + 启动workflow.py
步骤@step / StepConfig / StepFunction签名解析 + 运行配置decorators.py
运行WorkflowBroker启动/桥接 runtime 与 handlerruntime/broker.py
运行Control Loop / _reduce_tick纯函数状态机runtime/control_loop.py
运行Tick*, Command*状态变迁输入/输出runtime/types/
运行BrokerState / InternalStepWorkerState可快照运行态runtime/types/internal_state.py
执行StepWorkerFunction封装用户逻辑并产出结果命令runtime/types/step_function.py
结果StepFunctionResult 族描述执行产出类型runtime/types/results.py
上下文Context / InMemoryStateStore / DictState全局可恢复状态 + KV 操作context/
序列化BaseSerializer / JsonSerializer / PickleSerializer状态与事件值编码context/serializers.py
注入Resource / ResourceManager依赖工厂与缓存resource.py
重试RetryPolicy / ConstantDelayRetryPolicy失败恢复策略retry_policy.py
协议EventEnvelope / EventEnvelopeWithMetadata跨进程事件结构protocol/serializable_events.py
远程WorkflowClientHTTP + 流式消费client/client.py

17. 核心模块概览

本节总结 packages/llama-index-workflows/src/workflows 目录下的主要模块及其职责,帮助读者快速定位功能入口并理解系统分层。

17.1 核心编排层

文件职责
__init__.py仅对外暴露 WorkflowContextstep,构成最小可用 API 面;py.typed 确保类型提示向下游分发
workflow.py事件驱动编排器,负责收集/验证步骤、推断 StartEventStopEvent、控制并发、创建 Context 并产出 WorkflowHandler,也是资源注入与运行期校验的核心
decorators.py实现 @step 装饰器、StepConfigStepFunction 协议,通过反射签名推断事件、上下文、资源与重试策略,确保每个步骤都具备明确的输入输出
events.py定义事件体系,包括基础 EventStartEventStopEvent、HITL 相关的 InputRequiredEvent/HumanResponseEvent,以及内部观测事件 InternalDispatchEvent/StepStateChanged
handler.pyWorkflowHandler 继承 asyncio.Future,统一承载运行态:等待结果、流式消费事件、访问 Context、取消运行,并缓存最终 StopEvent
errors.py集中声明所有自定义异常(配置/运行/取消/超时等),供运行时、客户端与服务器一致处理
types.py维护 StopEventT/RunResultT 类型别名,兼顾当前的向后兼容与未来以 StopEvent 为唯一返回值的规划
utils.py提供步骤签名解析 (inspect_signature)、校验 (validate_step_signature)、自由函数识别及 nanoid 生成等辅助工具
resource.py_ResourceResourceDefinitionResourceManager 抽象依赖注入,支持在步骤签名中通过 typing.Annotated 声明资源并控制缓存粒度
retry_policy.py定义重试策略协议与 ConstantDelayRetryPolicy,让步骤能够基于尝试次数/时间决定是否继续

17.2 上下文与状态子系统

文件职责
context/__init__.py导出 Context 及序列化器族
context/context.pyContext 本体,串联 workflow、broker 与 runtime,负责状态存取、事件流、collect_events/wait_for_event、序列化快照、重放与并发控制
context/context_types.pyPydantic 数据模型,定义序列化格式(V0/V1)、SerializedContextSerializedStepWorkerState 等,并在加载时补齐重试次数与 Waiter 信息
context/state_store.py提供默认 DictState 与泛型 InMemoryStateStore,支持异步锁、路径式读写、事务编辑以及序列化/反序列化,确保全局状态类型安全
context/serializers.pyBaseSerializer 抽象以及 JSON 优先的 JsonSerializer、JSON→Pickle 兜底的 PickleSerializer,支持 Pydantic/LlamaIndex 组件的合规编码
context/utils.pyget_qualified_nameimport_module_from_qualified_name,为跨进程序列化提供模块定位能力

17.3 协议与客户端

文件职责
client/__init__.py暴露 WorkflowClient
client/client.py基于 httpx 的异步客户端,覆盖健康检查、列出 workflow、运行(同步/异步)、获取/取消 handler、SSE 事件流以及发送事件,内部 _serialize_event 兼容事件实例或原始字典
protocol/__init__.py定义 HTTP API 的 Pydantic 模型(HandlerDataHandlersListResponseWorkflowGraphResponse 等)与 Status literal,保障 server/client 一致
protocol/serializable_events.py封装 EventEnvelope/EventEnvelopeWithMetadata,负责事件载荷标准化、注册表查找、Legacy 兼容及反序列化校验

17.4 运行时与插件体系

文件职责
plugins/basic.py默认插件 basic_runtimeAsyncioWorkflowRuntime 以单进程邮件箱实现 send_event、事件流、快照回放等能力
runtime/broker.pyWorkflowBroker 是调度中枢,连接 workflow、context 与 runtime,负责启动 handler、调度 step worker、写事件流、收集 waiters/资源,并暴露 running_steps 等调试接口
runtime/control_loop.py事件溯源 reducer,_ControlLoopRunner 监听 ticks,通过 _reduce_tick 产出 WorkflowCommand,调度 step 协程并处理 timeout/cancel/StopEvent 发布;control_loop() 是 runtime 入口,rebuild_state_from_ticks() 用于快照重建
runtime/workflow_registry.pyWorkflowPluginRegistry 使用弱引用 (IdentityWeakKeyDict) 确保"每个 workflow 在每个插件内只注册一次",并维护 run_id → RegisteredRunContext
runtime/types/commands.py定义控制循环命令(CommandRunWorkerCommandQueueEventCommandCompleteRun 等)及其退出条件
runtime/types/internal_state.pyBrokerState/InternalStepConfig/InternalStepWorkerState 描述全部运行态,可序列化到 SerializedContext 并从中恢复
runtime/types/plugin.py描述插件与 runtime 所需接口(发送 tick、写事件流、sleep/replay 等),并提供 SnapshottableRuntime/as_snapshottable 帮助判断运行时是否可回放
runtime/types/results.py定义传入 step 的 StepWorkerContextStepWorkerStateStepWorkerWaiterReturns 以及 step 执行返回的多种结果(成功、失败、添加/删除 waiter、收集事件)
runtime/types/step_function.pyas_step_worker_function 包装用户步骤,注入上下文、资源、调度同步函数进 executor,捕获 WaitingForEvent,并将结果写回 Returns
runtime/types/ticks.py列举驱动控制循环的 tick 类型:步结果、外部事件、取消、发布、超时
runtime/types/_identity_weak_ref.py提供基于对象身份的弱引用字典,供注册表清理 workflow-插件映射

17.5 服务器与持久化

文件职责
server/__init__.py导出 WorkflowServerAbstractWorkflowStoreSqliteWorkflowStore
server/server.py基于 Starlette/uvicorn 的 HTTP 服务,注册 run/run-nowait、事件流、schema、图结构、handler 管理等路由,管理 handler/Context 生命周期并支持断电恢复
server/abstract_workflow_store.py定义持久化接口、HandlerQueryPersistentHandler(含 StopEvent 序列化策略)
server/memory_workflow_store.py简单的内存实现,基于 dict + 筛选谓词
server/sqlite/sqlite_workflow_store.pySQLite 后端,负责迁移(migrate.py)、条件查询、UPSERT 与 JSON 化上下文;server/sqlite/migrations/ 存放 SQL 脚本
server/representation_utils.py解析 workflow 步骤/事件关系,生成节点/边和 HITL 场景中的 external_step,供前端可视化使用
server/static/前端 UI 静态资源,便于查看 workflow 状态与事件流
server/__main__.pyCLI 入口,动态加载用户脚本中的 WorkflowServer,并根据环境变量启动 uvicorn

17.6 测试与辅助工具

文件职责
testing/runner.pyWorkflowTestRunner 可以运行 workflow、收集流式事件、统计事件类型并返回结果上下文,便于在 pytest 中做端到端断言
testing/__init__.py输出测试 runner,保持 API 简洁

17.7 整体体系结构

LlamaIndex Workflows 以 Workflow/Context/Runtime 三层分离的方式构建:

  1. API 层(workflow + decorators + events):聚焦开发体验,定义步骤语义和事件模型。
  2. 运行态(context + runtime + broker):负责状态存储、事件路由、并行调度与可重放控制循环。
  3. 外围生态(protocol/client/server/testing):提供远程调用、服务端托管、可视化与测试工具。

这种分层保证了:

  • 编排代码仍以 Python 协程编写,但运行期可以替换为不同插件(例如 Durable 托管)。
  • 上下文、状态与事件序列化具备稳定格式,可跨进程/跨机器迁移。
  • 服务器、客户端与测试工具共用统一协议,减少调试与集成成本。

18. 总结

LlamaIndex Workflows 将"用户可见的步骤与事件"与"内部纯函数化调度状态机"分层抽象:

  • 顶层开发者只需思考:事件类型 + 步骤逻辑 + 状态模型 +(可选)资源与重试策略。
  • 中层运行时提供:可回放、可验证、可替换、可观测的执行语义。
  • 底层原语(Tick / Command / BrokerState)保证调度透明与可调试性,为未来:分布式执行、断点续跑、执行图可视化、时序审计 打下稳定基础。

通过本文件的原语清单与关系图,读者可快速定位:扩展点、调试入口、以及二次封装(如:DSL、图形化编排、远程执行代理)的位置。

展望

下一步可演进方向:事件持久化队列接入(Kafka / Redis Stream)、分布式 StepWorker 调度、执行轨迹 Web 可视化、对接 Langfuse / Phoenix 全链路追踪等。