Skip to content

LlamaIndex Workflows 工作流框架入门到进阶

工作流(Workflow)是构建复杂 AI 应用的核心编排框架。本文将基于 LlamaIndex Workflows 项目,深入浅出地介绍工作流的概念、设计原理和实践技巧,从入门到高级用法,帮助你掌握这一强大的工具。

1 工作流框架概述

LlamaIndex Workflows 是一个用于编排和链接复杂多步骤系统的框架。它采用异步优先、事件驱动的架构,使得构建 AI 应用变得更加简洁和可维护。

1.1 适用场景

工作流框架特别适合以下场景:

  1. AI 智能体:创建能够推理、决策并跨多个步骤执行操作的智能系统
  2. 文档处理管道:构建用于摄取、分析、总结和路由文档的系统
  3. 多模型 AI 应用:协调不同 AI 模型(LLM、视觉模型等)解决复杂任务
  4. 研究助手:开发能够搜索、分析、综合信息并提供全面答案的工作流
  5. 内容生成系统:创建具有人工审核环节的内容生成、审核和发布管道
  6. 客户支持自动化:构建能够理解、分类和响应客户询问的智能路由系统

1.2 核心特性

工作流框架具有以下核心特性:

  1. 异步优先:工作流围绕 Python 的异步功能构建,步骤是异步函数,从 asyncio 队列处理传入事件并向其他队列发送新事件
  2. 事件驱动:工作流由步骤(Step)和事件(Event)组成,围绕事件和步骤组织代码使其更易于推理和测试
  3. 状态管理:每次工作流运行都是自包含的,可以在其中保存信息、序列化工作流状态并稍后恢复
  4. 可观测性:工作流自动支持可观测性,可以直接使用 Arize Phoenix 和 OpenTelemetry 等工具

2 基础概念

在深入代码之前,让我们先理解工作流框架的核心概念。

2.1 事件(Event)

事件是工作流中步骤之间传递的轻量级、可序列化的数据载体。所有事件都继承自 Event 基类:

python
from workflows.events import Event, StartEvent, StopEvent

class MyEvent(Event):
    """自定义事件"""
    message: str
    score: int

框架提供了几个内置事件类型:

事件类型说明
StartEvent工作流的入口事件,启动工作流时自动创建
StopEvent工作流的终止事件,返回此事件表示工作流完成
InputRequiredEvent人机交互事件,表示需要人工输入
HumanResponseEvent人工响应事件,携带用户的输入

2.2 步骤(Step)

步骤是使用 @step 装饰器标记的异步函数,是工作流的基本执行单元。每个步骤:

  1. 接收特定类型的事件作为输入
  2. 执行某些操作
  3. 返回一个或多个事件类型
python
from workflows import step

@step
async def process(self, ev: MyEvent) -> StopEvent:
    # 处理逻辑
    return StopEvent(result="完成")

2.3 上下文(Context)

上下文对象 Context 是工作流运行时的全局状态容器,提供:

  1. 状态存储:在步骤之间共享数据
  2. 事件发送:向其他步骤发送事件
  3. 事件收集:等待并收集多个事件
  4. 事件流写入:向外部观察者发送事件
python
from workflows import Context

@step
async def my_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
    # 使用状态存储
    await ctx.store.set("key", "value")
    value = await ctx.store.get("key")

    # 发送事件到流
    ctx.write_event_to_stream(ProgressEvent(progress=50))

    return StopEvent(result="完成")

2.4 工作流类图

下面的类图展示了工作流框架的核心组件关系:

3 入门示例:第一个工作流

让我们从一个简单的例子开始,创建一个基础的工作流。

3.1 安装

首先安装工作流框架:

bash
pip install llama-index-workflows

3.2 基础工作流

下面是一个完整的基础工作流示例,演示了如何定义事件、状态模型和工作流类,以及如何在多次运行间保持状态。

点击查看完整示例
py
"""
示例 1: 基础工作流
演示最简单的工作流定义和运行
"""

import asyncio

from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


# 1. 定义自定义事件
class ProcessEvent(Event):
    """处理事件,携带消息列表"""

    msg: list[str]


# 2. 定义状态模型(可选)
class RunState(BaseModel):
    """工作流运行状态"""

    num_runs: int = Field(default=0)


# 3. 定义工作流类
class MyWorkflow(Workflow):
    """一个简单的工作流示例"""

    @step
    async def start(self, ctx: Context[RunState], ev: StartEvent) -> ProcessEvent:
        """
        工作流入口步骤
        - 接收 StartEvent 作为输入
        - 返回自定义的 ProcessEvent
        """
        # 使用 ctx.store.edit_state() 安全地修改状态
        async with ctx.store.edit_state() as state:
            state.num_runs += 1
            return ProcessEvent(msg=[ev.input_msg] * state.num_runs)

    @step
    async def process(self, ctx: Context[RunState], ev: ProcessEvent) -> StopEvent:
        """
        处理步骤
        - 接收 ProcessEvent 作为输入
        - 返回 StopEvent 结束工作流
        """
        data_length = len("".join(ev.msg))
        new_msg = f"处理了 {len(ev.msg)} 次,数据长度: {data_length}"
        return StopEvent(result=new_msg)


async def main():
    # 创建工作流实例
    workflow = MyWorkflow()

    # 创建上下文对象(用于在多次运行间共享状态)
    ctx = Context(workflow)

    # 第一次运行
    result = await workflow.run(input_msg="Hello, world!", ctx=ctx)
    print("第一次运行结果:", result)

    # 第二次运行(状态会保持)
    result = await workflow.run(input_msg="Hello, world!", ctx=ctx)
    print("第二次运行结果:", result)


if __name__ == "__main__":
    asyncio.run(main())

运行结果:

text
第一次运行结果: 处理了 1 次,数据长度: 13
第二次运行结果: 处理了 2 次,数据长度: 26

3.3 工作流执行流程

下面的流程图展示了上述工作流的执行过程:

4 分支与条件路由

在实际应用中,我们经常需要根据条件执行不同的处理逻辑。工作流框架通过事件类型实现分支路由。

4.1 分支工作流示例

分支工作流通过返回不同类型的事件来实现条件路由。下面的示例展示了如何根据输入内容的情感倾向选择不同的处理路径。

点击查看完整示例
py
"""
示例 2: 多步骤分支工作流
演示如何在工作流中实现分支和条件路由
"""

import asyncio

from workflows import Workflow, step
from workflows.events import Event, StartEvent, StopEvent


# 定义多个事件类型实现分支逻辑
class AnalyzeEvent(Event):
    """分析事件"""

    content: str


class PositiveEvent(Event):
    """正面结果事件"""

    score: float
    content: str


class NegativeEvent(Event):
    """负面结果事件"""

    score: float
    content: str


class BranchWorkflow(Workflow):
    """演示分支逻辑的工作流"""

    @step
    async def start(self, ev: StartEvent) -> AnalyzeEvent:
        """入口步骤:接收输入并传递给分析步骤"""
        # StartEvent 继承自 DictLikeModel,使用 .get() 方法访问动态属性
        return AnalyzeEvent(content=ev.get("text", ""))

    @step
    async def analyze(self, ev: AnalyzeEvent) -> PositiveEvent | NegativeEvent:
        """
        分析步骤:根据内容决定走哪个分支
        返回类型注解使用 Union 类型表示可能的多个输出
        """
        # 简单的情感分析模拟
        positive_words = ["好", "棒", "优秀", "喜欢", "开心", "happy", "good", "great"]
        negative_words = ["差", "坏", "糟糕", "讨厌", "难过", "bad", "sad", "terrible"]

        content_lower = ev.content.lower()
        positive_count = sum(1 for word in positive_words if word in content_lower)
        negative_count = sum(1 for word in negative_words if word in content_lower)

        if positive_count >= negative_count:
            score = positive_count / (positive_count + negative_count + 1)
            return PositiveEvent(score=score, content=ev.content)
        else:
            score = negative_count / (positive_count + negative_count + 1)
            return NegativeEvent(score=score, content=ev.content)

    @step
    async def handle_positive(self, ev: PositiveEvent) -> StopEvent:
        """处理正面结果"""
        return StopEvent(
            result={
                "sentiment": "positive",
                "score": ev.score,
                "message": f"这是一条正面内容: {ev.content}",
            }
        )

    @step
    async def handle_negative(self, ev: NegativeEvent) -> StopEvent:
        """处理负面结果"""
        return StopEvent(
            result={
                "sentiment": "negative",
                "score": ev.score,
                "message": f"这是一条负面内容: {ev.content}",
            }
        )


async def main():
    workflow = BranchWorkflow()

    # 测试正面内容
    result1 = await workflow.run(text="今天天气很好,我很开心!")
    print("正面测试结果:", result1)

    # 测试负面内容
    result2 = await workflow.run(text="这个产品太差了,真糟糕!")
    print("负面测试结果:", result2)


if __name__ == "__main__":
    asyncio.run(main())

4.2 分支流程图

5 并行处理与事件收集

工作流框架支持并行处理多个任务,并使用 collect_events 收集结果。这对于提升处理性能和实现分布式计算非常有用。

5.1 并行处理配置

使用 @step(num_workers=N) 配置步骤的并行工作者数量:

python
@step(num_workers=4)  # 最多 4 个并行执行
async def worker(self, ctx: Context, ev: WorkerEvent) -> WorkerResultEvent:
    # 并行处理逻辑
    await asyncio.sleep(0.1)
    return WorkerResultEvent(result="处理完成")

5.2 事件分发与收集

使用 ctx.send_event() 分发多个任务,然后在收集步骤中聚合结果。下面是一个完整的并行处理示例:

点击查看完整示例
py
"""
示例 3: 事件收集与并行处理
演示如何使用 collect_events 和 send_event 实现并行处理
"""

import asyncio

from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class WorkerEvent(Event):
    """工作事件,用于分发任务"""

    task_id: int
    data: str


class WorkerResultEvent(Event):
    """工作结果事件"""

    task_id: int
    result: str


class GatherEvent(Event):
    """收集信号事件"""

    total_tasks: int


class WorkflowState(BaseModel):
    """工作流状态"""

    processed_count: int = Field(default=0)
    total_tasks: int = Field(default=0)
    results: list[str] = Field(default_factory=list)


class ParallelWorkflow(Workflow):
    """演示并行处理的工作流"""

    @step
    async def start(
        self, ctx: Context[WorkflowState], ev: StartEvent
    ) -> GatherEvent | WorkerEvent:
        """
        入口步骤:分发多个任务
        使用 ctx.send_event() 发送多个事件
        注意:返回类型需要声明 WorkerEvent,以便工作流验证器知道它会被产生
        """
        # StartEvent 继承自 DictLikeModel,使用 get() 访问动态属性
        tasks = ev.get("tasks", [])
        num_tasks = len(tasks)

        # 分发任务给工作步骤
        for i, task in enumerate(tasks):
            ctx.send_event(WorkerEvent(task_id=i, data=task))

        # 返回收集信号
        return GatherEvent(total_tasks=num_tasks)

    @step(num_workers=4)  # 配置并行工作者数量
    async def worker(
        self, ctx: Context[WorkflowState], ev: WorkerEvent
    ) -> WorkerResultEvent:
        """
        工作步骤:处理单个任务
        num_workers=4 表示最多可以有 4 个并行执行
        """
        # 模拟处理任务
        await asyncio.sleep(0.1)  # 模拟耗时操作
        result = f"处理完成: {ev.data.upper()}"

        async with ctx.store.edit_state() as state:
            state.processed_count += 1

        return WorkerResultEvent(task_id=ev.task_id, result=result)

    @step
    async def gather(
        self, ctx: Context[WorkflowState], ev: WorkerResultEvent | GatherEvent
    ) -> StopEvent | None:
        """
        收集步骤:等待所有任务完成
        使用 collect_events 收集多个事件
        """
        # 我们需要收集 GatherEvent 和所有 WorkerResultEvent
        # 首先检查是否收到了 GatherEvent
        if isinstance(ev, GatherEvent):
            # 存储需要收集的总数
            async with ctx.store.edit_state() as state:
                state.total_tasks = ev.total_tasks
                state.results = []

        if isinstance(ev, WorkerResultEvent):
            # 收集结果
            async with ctx.store.edit_state() as state:
                state.results.append(ev.result)

                if len(state.results) >= state.total_tasks:
                    # 所有任务完成
                    return StopEvent(
                        result={
                            "total_processed": state.processed_count,
                            "results": state.results,
                        }
                    )

        return None  # 继续等待更多事件


async def main():
    workflow = ParallelWorkflow()

    # 创建任务列表
    tasks = ["任务A", "任务B", "任务C", "任务D", "任务E"]

    result = await workflow.run(tasks=tasks)
    print("并行处理结果:", result)


if __name__ == "__main__":
    asyncio.run(main())

5.3 并行执行流程图

下面的流程图展示了并行处理的完整流程,包括任务分发、并行执行和结果收集:

5.4 性能优化建议

在实现并行处理时,需要注意以下几点:

  1. 合理设置工作者数量:根据任务类型和系统资源选择合适的 num_workers

    • CPU 密集型任务:通常设置为 CPU 核心数
    • I/O 密集型任务:可以设置为核心数的 2-4 倍
    • 混合型任务:需要根据实际测试调整
  2. 避免资源竞争:使用 async with ctx.store.edit_state() 确保状态更新的原子性

  3. 错误处理:考虑部分任务失败的情况,决定是继续处理还是立即终止

  4. 监控和日志:记录任务处理时间和成功率,便于性能分析

6 人机交互(Human-in-the-Loop)

工作流框架内置支持人机交互模式(HITL),允许在工作流执行过程中暂停等待人工输入。这对于需要人工审核、决策或提供额外信息的场景非常有用。

6.1 HITL 工作流示例

人机交互工作流允许在执行过程中暂停并等待用户输入。这对于需要人工审核、确认或提供额外信息的场景非常有用。

点击查看完整示例
py
"""
示例 4: 人机交互(Human-in-the-Loop)工作流
演示如何实现需要人工输入的工作流
"""

import asyncio

from workflows import Workflow, step
from workflows.events import (
    HumanResponseEvent,
    InputRequiredEvent,
    StartEvent,
    StopEvent,
)


class HITLWorkflow(Workflow):
    """
    人机交互工作流示例
    当需要人工输入时,发送 InputRequiredEvent
    等待 HumanResponseEvent 接收用户响应
    """

    @step
    async def start(self, ev: StartEvent) -> InputRequiredEvent:
        """
        入口步骤:请求用户输入
        返回 InputRequiredEvent 会暂停工作流并等待人工响应
        InputRequiredEvent 支持动态属性,可以传递任意键值对
        """
        # InputRequiredEvent 继承自 DictLikeModel,支持动态属性
        return InputRequiredEvent(
            prompt="请输入您的名字: ",  # type: ignore[call-arg]
            step="name_input",  # type: ignore[call-arg]
        )

    @step
    async def process_name(self, ev: HumanResponseEvent) -> InputRequiredEvent:
        """
        处理用户名称并请求更多输入
        HumanResponseEvent 同样支持动态属性
        """
        name = ev.get("response", "用户")
        # InputRequiredEvent 继承自 DictLikeModel,支持动态属性
        return InputRequiredEvent(
            prompt=f"你好 {name}!请输入您想查询的内容: ",  # type: ignore[call-arg]
            step="query_input",  # type: ignore[call-arg]
            name=name,  # type: ignore[call-arg]
        )

    @step
    async def process_query(self, ev: HumanResponseEvent) -> StopEvent:
        """
        处理用户查询并返回结果
        注意:这里需要通过事件属性判断来自哪个步骤
        """
        # 在实际应用中,可以通过 context 或其他方式区分不同的响应
        query = ev.get("response", "")
        return StopEvent(
            result={
                "status": "completed",
                "query": query,
                "message": f"已收到您的查询: {query}",
            }
        )


async def main():
    """
    演示如何与 HITL 工作流交互
    """
    workflow = HITLWorkflow()

    # 启动工作流
    handler = workflow.run()

    # 流式处理事件
    async for event in handler.stream_events():
        if isinstance(event, InputRequiredEvent):
            # 收到需要输入的事件
            print(f"\n系统请求: {event.get('prompt', '')}")

            # 模拟用户输入(实际应用中应该从用户获取)
            # 使用 .get() 方法访问动态属性
            if event.get("step") == "name_input":
                user_input = "张三"  # 模拟用户输入
            else:
                user_input = "今天的天气如何?"  # 模拟用户输入

            print(f"用户输入: {user_input}")

            # 发送用户响应 - HumanResponseEvent 支持动态属性
            handler.ctx.send_event(HumanResponseEvent(response=user_input))  # type: ignore[call-arg]

        elif isinstance(event, StopEvent):
            print(f"\n工作流完成: {event.result}")
            break

    # 获取最终结果
    result = await handler
    print(f"\n最终结果: {result}")


if __name__ == "__main__":
    asyncio.run(main())

6.2 HITL 时序图

6.3 实现要点

实现 HITL 工作流时需要注意以下几点:

  1. 事件类型:使用 InputRequiredEvent 请求输入,使用 HumanResponseEvent 传递响应
  2. 动态属性:这两个事件都继承自 DictLikeModel,支持动态添加属性
  3. 事件流:通过 stream_events() 监听事件,识别需要人工输入的时机
  4. 响应处理:收到用户输入后,通过 handler.ctx.send_event() 发送响应事件

6.4 应用场景

HITL 模式适用于以下场景:

  • 内容审核:AI 生成内容后需要人工审核和修改
  • 决策支持:在关键决策点需要人工确认
  • 数据标注:在数据处理流程中需要人工标注
  • 异常处理:当自动处理失败时,请求人工介入

7 状态持久化

工作流框架支持状态的序列化和反序列化,使得工作流可以跨会话恢复。这对于实现长时间运行的对话系统、可恢复的数据处理管道等场景非常重要。

7.1 状态持久化示例

下面的示例展示了如何实现一个可持久化的对话工作流,支持保存对话状态并在后续会话中恢复:

点击查看完整示例
py
"""
示例 5: 状态持久化与恢复
演示如何序列化和反序列化工作流状态
"""

import asyncio
import json

from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class ProcessEvent(Event):
    """处理事件"""

    batch_id: int


class WorkflowState(BaseModel):
    """工作流状态"""

    processed_count: int = Field(default=0)
    results: list[str] = Field(default_factory=list)
    last_batch: int = Field(default=0)


class StatefulWorkflow(Workflow):
    """
    可持久化的批处理工作流
    演示如何在多次运行间保持状态
    """

    @step
    async def start(self, ctx: Context[WorkflowState], ev: StartEvent) -> ProcessEvent:
        """初始化步骤"""
        batch_id = ev.get("batch_id", 0)

        async with ctx.store.edit_state() as state:
            if state.processed_count > 0:
                print("[恢复] 从上次中断的位置继续")
                print(f"[恢复] 已处理 {state.processed_count} 个批次")
            else:
                print("[初始化] 开始新的处理流程")

        return ProcessEvent(batch_id=batch_id)

    @step
    async def process(self, ctx: Context[WorkflowState], ev: ProcessEvent) -> StopEvent:
        """处理步骤"""
        batch_id = ev.batch_id

        # 模拟处理
        await asyncio.sleep(0.1)
        result = f"批次 {batch_id} 处理完成"

        async with ctx.store.edit_state() as state:
            state.processed_count += 1
            state.results.append(result)
            state.last_batch = batch_id

        print(f"[处理] {result} (总计: {state.processed_count} 批次)")

        return StopEvent(
            result={
                "batch_id": batch_id,
                "processed_count": state.processed_count,
                "results": state.results,
            }
        )


async def demo_state_persistence():
    """
    演示状态持久化功能
    """
    workflow = StatefulWorkflow()

    print("=== 第一阶段:处理批次 1-3 ===\n")

    # 创建上下文并处理前三个批次
    ctx = Context(workflow)
    result = None

    for batch_id in range(1, 4):
        result = await workflow.run(ctx=ctx, batch_id=batch_id)
        await asyncio.sleep(0.1)

    if result:
        print(f"\n第一阶段完成,共处理 {result['processed_count']} 个批次\n")

    # 保存状态
    saved_state = ctx.to_dict()
    print("=== 状态已保存 ===")

    # 显示保存的状态信息
    state_info = saved_state.get("state_store", {}).get("states", {}).get("root", {})
    print("保存的状态信息:")
    print(f"  - 已处理批次数: {state_info.get('processed_count', 0)}")
    print(f"  - 最后批次: {state_info.get('last_batch', 0)}")
    print(f"  - 结果列表: {state_info.get('results', [])}\n")

    # 将状态保存到文件(模拟)
    state_json = json.dumps(saved_state, indent=2, ensure_ascii=False)
    print(f"状态 JSON 大小: {len(state_json)} 字节\n")

    # 模拟服务重启
    print("=== 模拟服务重启... ===\n")
    await asyncio.sleep(0.5)

    # 从保存的状态恢复
    print("=== 第二阶段:从保存点恢复 ===\n")
    restored_ctx = Context.from_dict(workflow, saved_state)

    # 继续处理后续批次
    print("继续处理批次 4-6:\n")
    final_result = None

    for batch_id in range(4, 7):
        final_result = await workflow.run(ctx=restored_ctx, batch_id=batch_id)
        await asyncio.sleep(0.1)

    if final_result:
        print(f"\n第二阶段完成,总计处理 {final_result['processed_count']} 个批次\n")

        print("=== 演示完成 ===")
        print("所有处理结果:")
        for i, res in enumerate(final_result["results"], 1):
            print(f"  {i}. {res}")


async def main():
    await demo_state_persistence()


if __name__ == "__main__":
    asyncio.run(main())

7.2 状态持久化流程图

下面的流程图展示了状态持久化和恢复的完整流程:

7.3 状态管理最佳实践

  1. 使用类型化状态:定义 Pydantic 模型作为状态类型,获得类型检查和验证
  2. 原子操作:使用 async with ctx.store.edit_state() as state 进行原子状态更新
  3. 定期保存:在关键步骤后保存状态,防止数据丢失
  4. 版本控制:为状态结构添加版本号,便于迁移

8 事件流与实时更新

工作流框架支持事件流,允许实时向外部观察者发送进度更新。这对于长时间运行的任务非常有用,可以让用户随时了解处理进度。

8.1 事件流示例

下面的示例展示了如何使用 write_event_to_stream() 发送进度事件,并通过事件流监听实时更新:

点击查看完整示例
py
"""
示例 6: 事件流与实时更新
演示如何使用事件流实现实时进度更新
"""

import asyncio

from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class ProgressEvent(Event):
    """进度更新事件"""

    current: int
    total: int
    message: str


class ProcessingWorkflow(Workflow):
    """
    带进度更新的处理工作流
    通过 write_event_to_stream 向外部发送进度更新
    """

    @step
    async def process(self, ctx: Context, ev: StartEvent) -> StopEvent:
        """
        处理步骤:模拟长时间运行的任务并报告进度
        """
        items = ev.get("items", [])
        total = len(items)
        results = []

        for i, item in enumerate(items):
            # 模拟处理每个项目
            await asyncio.sleep(0.2)
            result = f"已处理: {item}"
            results.append(result)

            # 发送进度事件到流
            ctx.write_event_to_stream(
                ProgressEvent(
                    current=i + 1,
                    total=total,
                    message=f"正在处理 {item}... ({i + 1}/{total})",
                )
            )

        return StopEvent(
            result={
                "status": "completed",
                "processed_count": total,
                "results": results,
            }
        )


async def main():
    """
    演示如何监听事件流
    """
    workflow = ProcessingWorkflow()

    # 准备处理的项目
    items = ["文档A", "文档B", "文档C", "文档D", "文档E"]

    print("开始处理任务...")
    print("-" * 40)

    # 启动工作流
    handler = workflow.run(items=items)

    # 监听事件流
    async for event in handler.stream_events():
        if isinstance(event, ProgressEvent):
            # 显示进度条
            progress = event.current / event.total * 100
            bar_length = 30
            filled_length = int(bar_length * event.current / event.total)
            bar = "█" * filled_length + "░" * (bar_length - filled_length)
            print(f"\r[{bar}] {progress:.0f}% - {event.message}", end="", flush=True)

        elif isinstance(event, StopEvent):
            print()  # 换行
            break

    # 获取最终结果
    result = await handler
    print("-" * 40)
    print(f"处理完成!共处理 {result['processed_count']} 个项目")
    print("结果:")
    for r in result["results"]:
        print(f"  - {r}")


if __name__ == "__main__":
    asyncio.run(main())

8.2 事件流时序图

下面的时序图展示了事件流的工作机制:

8.3 应用场景

事件流特别适用于以下场景:

  • 长时间任务:数据处理、模型训练等耗时操作
  • 批量处理:展示批量处理的实时进度
  • 用户体验:避免用户长时间等待无响应
  • 监控告警:实时监控任务状态,及时发现问题

9 高级用法

9.1 重试策略

工作流框架支持为步骤配置重试策略:

python
from workflows.retry_policy import RetryPolicy

retry_policy = RetryPolicy(
    max_retries=3,
    initial_delay=1.0,
    max_delay=10.0,
    backoff_factor=2.0
)

@step(retry_policy=retry_policy)
async def unreliable_step(self, ev: MyEvent) -> StopEvent:
    # 可能失败的操作
    result = await call_external_api()
    return StopEvent(result=result)

重试策略流程图

9.2 资源注入

使用 AnnotatedResource 实现依赖注入:

python
from typing import Annotated
from workflows.resource import Resource

def get_llm():
    return OpenAI(model="gpt-4")

@step
async def generate(
    self,
    ev: StartEvent,
    llm: Annotated[OpenAI, Resource(get_llm)]
) -> StopEvent:
    response = await llm.achat([...])
    return StopEvent(result=response)

9.3 超时配置

python
# 设置工作流超时(秒)
workflow = MyWorkflow(timeout=60.0)

# 禁用超时
workflow = MyWorkflow(timeout=None)

9.4 并发限制

python
# 限制并发运行数
workflow = MyWorkflow(num_concurrent_runs=5)

10 工作流设计模式

工作流设计模式是经过实践检验的解决方案,用于处理常见的工作流架构问题。理解和应用这些模式可以帮助你构建更加优雅、可维护和高效的工作流系统。

10.1 管道模式(Pipeline Pattern)

管道模式是最基础的工作流模式,数据在多个步骤之间顺序流动,每个步骤对数据进行转换或增强。

10.1.1 模式特点

  1. 线性流程:数据按固定顺序流经各个步骤
  2. 单一输入输出:每个步骤只有一个输入和一个输出
  3. 数据转换:每个步骤都对数据进行某种形式的转换
  4. 易于理解:流程清晰,易于维护和调试

10.1.2 适用场景

  • 文档处理流程:解析 → 清洗 → 分析 → 存储
  • 数据 ETL:提取 → 转换 → 加载
  • 内容生成:生成 → 审核 → 优化 → 发布
  • 图像处理:读取 → 预处理 → 增强 → 保存

10.1.3 架构图

10.1.4 完整示例

点击查看管道模式完整示例
py
"""
示例 7: 管道模式(Pipeline Pattern)
演示线性处理流程,每个步骤只有一个输入和一个输出
适用场景:文档处理、数据转换、内容生成等需要顺序执行的任务
"""

import asyncio

from workflows import Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class PreprocessedEvent(Event):
    """预处理后的事件"""

    content: str
    metadata: dict


class ProcessedEvent(Event):
    """处理后的事件"""

    content: str
    metadata: dict
    analysis: dict


class PostprocessedEvent(Event):
    """后处理后的事件"""

    final_content: str
    summary: str
    metadata: dict


class PipelineWorkflow(Workflow):
    """
    管道模式工作流
    数据流:输入 -> 预处理 -> 处理 -> 后处理 -> 输出
    """

    @step
    async def preprocess(self, ev: StartEvent) -> PreprocessedEvent:
        """
        预处理步骤:清理和规范化输入
        """
        raw_content = ev.get("content", "")

        # 模拟预处理:清理空白字符、转换格式等
        cleaned = raw_content.strip().lower()
        metadata = {
            "original_length": len(raw_content),
            "cleaned_length": len(cleaned),
            "preprocessing_time": 0.1,
        }

        print(
            f"[预处理] 输入长度: {metadata['original_length']}, 清理后: {metadata['cleaned_length']}"
        )

        await asyncio.sleep(0.1)  # 模拟处理时间
        return PreprocessedEvent(content=cleaned, metadata=metadata)

    @step
    async def process(self, ev: PreprocessedEvent) -> ProcessedEvent:
        """
        处理步骤:核心业务逻辑
        """
        content = ev.content

        # 模拟处理:分析内容、提取信息等
        word_count = len(content.split())
        char_count = len(content)
        analysis = {
            "word_count": word_count,
            "char_count": char_count,
            "avg_word_length": char_count / word_count if word_count > 0 else 0,
        }

        print(f"[处理] 词数: {word_count}, 字符数: {char_count}")

        # 合并元数据
        metadata = {**ev.metadata, "processing_time": 0.2}

        await asyncio.sleep(0.2)  # 模拟处理时间
        return ProcessedEvent(content=content, metadata=metadata, analysis=analysis)

    @step
    async def postprocess(self, ev: ProcessedEvent) -> PostprocessedEvent:
        """
        后处理步骤:格式化和生成最终输出
        """
        # 生成摘要
        words = ev.content.split()
        summary = " ".join(words[:10]) + "..." if len(words) > 10 else ev.content

        # 格式化最终内容
        final_content = f"""
处理结果
========
内容: {ev.content}
分析: 共 {ev.analysis["word_count"]} 词, {ev.analysis["char_count"]} 字符
平均词长: {ev.analysis["avg_word_length"]:.2f}
        """.strip()

        print(f"[后处理] 生成摘要: {summary[:30]}...")

        # 更新元数据
        metadata = {**ev.metadata, "postprocessing_time": 0.1}

        await asyncio.sleep(0.1)  # 模拟处理时间
        return PostprocessedEvent(
            final_content=final_content, summary=summary, metadata=metadata
        )

    @step
    async def output(self, ev: PostprocessedEvent) -> StopEvent:
        """
        输出步骤:返回最终结果
        """
        total_time = sum(
            ev.metadata.get(k, 0)
            for k in ["preprocessing_time", "processing_time", "postprocessing_time"]
        )

        result = {
            "content": ev.final_content,
            "summary": ev.summary,
            "metadata": {**ev.metadata, "total_time": total_time},
        }

        print(f"[输出] 处理完成,总耗时: {total_time:.2f}s")

        return StopEvent(result=result)


async def main():
    """
    演示管道模式工作流
    """
    workflow = PipelineWorkflow()

    print("=== 管道模式工作流示例 ===\n")

    # 测试文本
    test_content = """
    Hello World! This is a test document for demonstrating
    the pipeline pattern in workflow processing. Each step
    transforms the data sequentially.
    """

    result = await workflow.run(content=test_content)

    print("\n=== 最终结果 ===")
    print(f"摘要: {result['summary']}")
    print(f"\n{result['content']}")
    print(f"\n元数据: {result['metadata']}")


if __name__ == "__main__":
    asyncio.run(main())

10.1.5 关键要点

  1. 步骤独立性:每个步骤应该是自包含的,不依赖其他步骤的内部实现
  2. 错误处理:在每个步骤中添加适当的错误处理,避免整个管道崩溃
  3. 性能优化:对于耗时的步骤,考虑添加缓存或异步处理
  4. 可观测性:在关键步骤记录日志和指标,便于调试和监控

10.2 扇出-扇入模式(Fan-out Fan-in Pattern)

扇出-扇入模式通过并行处理多个独立任务,然后聚合结果,充分利用系统资源提升处理效率。

10.2.1 模式特点

  1. 并行执行:多个任务同时处理
  2. 结果聚合:收集所有并行任务的结果
  3. 高吞吐量:充分利用多核和异步能力
  4. 资源控制:可配置工作者数量,避免资源耗尽

10.2.2 适用场景

  • 批量数据处理:同时处理数千条记录
  • 多源数据聚合:从多个 API 并行获取数据
  • 分布式计算:将大任务拆分为小任务并行计算
  • 并行搜索:在多个索引或数据库中同时搜索

10.2.3 架构图

10.2.4 完整示例

点击查看扇出-扇入模式完整示例
py
"""
示例 8: 扇出-扇入模式(Fan-out Fan-in Pattern)
演示并行处理大量任务后聚合结果
适用场景:批量数据处理、并行计算、分布式任务等
"""

import asyncio
from typing import Any

from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class DispatchEvent(Event):
    """分发信号事件"""

    total_tasks: int


class TaskEvent(Event):
    """单个任务事件"""

    task_id: int
    data: Any


class TaskResultEvent(Event):
    """任务结果事件"""

    task_id: int
    result: Any
    processing_time: float


class WorkflowState(BaseModel):
    """工作流状态"""

    total_tasks: int = Field(default=0)
    completed_tasks: int = Field(default=0)
    results: dict[int, Any] = Field(default_factory=dict)


class FanOutFanInWorkflow(Workflow):
    """
    扇出-扇入模式工作流
    架构:
    1. 分发器(Fan-out):将任务分发给多个工作者
    2. 工作者(Workers):并行处理任务
    3. 收集器(Fan-in):聚合所有结果
    """

    @step
    async def dispatcher(
        self, ctx: Context[WorkflowState], ev: StartEvent
    ) -> DispatchEvent | TaskEvent:
        """
        分发器步骤:扇出任务到多个工作者
        """
        tasks = ev.get("tasks", [])
        num_tasks = len(tasks)

        print(f"[分发器] 开始分发 {num_tasks} 个任务")

        # 初始化状态
        async with ctx.store.edit_state() as state:
            state.total_tasks = num_tasks
            state.completed_tasks = 0
            state.results = {}

        # 扇出:发送多个任务事件
        for i, task_data in enumerate(tasks):
            ctx.send_event(TaskEvent(task_id=i, data=task_data))
            print(f"  -> 分发任务 #{i}: {task_data}")

        # 返回分发信号
        return DispatchEvent(total_tasks=num_tasks)

    @step(num_workers=4)  # 配置 4 个并行工作者
    async def worker(
        self, ctx: Context[WorkflowState], ev: TaskEvent
    ) -> TaskResultEvent:
        """
        工作者步骤:并行处理单个任务
        """
        task_id = ev.task_id
        data = ev.data

        print(f"  [工作者 #{task_id}] 开始处理: {data}")

        # 模拟处理任务
        start_time = asyncio.get_event_loop().time()
        await asyncio.sleep(0.5)  # 模拟耗时操作

        # 模拟不同类型的处理结果
        if isinstance(data, (int, float)):
            result = data**2  # 数字求平方
        elif isinstance(data, str):
            result = data.upper()  # 字符串转大写
        else:
            result = str(data)

        processing_time = asyncio.get_event_loop().time() - start_time

        print(f"  [工作者 #{task_id}] 完成处理: {result} (耗时 {processing_time:.2f}s)")

        return TaskResultEvent(
            task_id=task_id, result=result, processing_time=processing_time
        )

    @step
    async def collector(
        self,
        ctx: Context[WorkflowState],
        ev: TaskResultEvent | DispatchEvent,
    ) -> StopEvent | None:
        """
        收集器步骤:扇入所有结果并聚合
        """
        # 处理分发信号(初始化)
        if isinstance(ev, DispatchEvent):
            print(f"[收集器] 准备收集 {ev.total_tasks} 个结果")
            return None  # 继续等待结果

        # 处理任务结果
        if isinstance(ev, TaskResultEvent):
            async with ctx.store.edit_state() as state:
                state.results[ev.task_id] = {
                    "result": ev.result,
                    "processing_time": ev.processing_time,
                }
                state.completed_tasks += 1

                completed = state.completed_tasks
                total = state.total_tasks

                print(
                    f"[收集器] 收集进度: {completed}/{total} "
                    f"({completed / total * 100:.0f}%)"
                )

                # 检查是否所有任务都完成
                if completed >= total:
                    # 所有任务完成,计算统计信息
                    processing_times = [
                        r["processing_time"] for r in state.results.values()
                    ]
                    avg_time = sum(processing_times) / len(processing_times)
                    max_time = max(processing_times)
                    min_time = min(processing_times)

                    # 按任务 ID 排序结果
                    sorted_results = [state.results[i]["result"] for i in range(total)]

                    return StopEvent(
                        result={
                            "status": "completed",
                            "total_tasks": total,
                            "results": sorted_results,
                            "statistics": {
                                "avg_processing_time": avg_time,
                                "max_processing_time": max_time,
                                "min_processing_time": min_time,
                                "total_processing_time": sum(processing_times),
                            },
                        }
                    )

        return None  # 继续等待更多结果


async def main():
    """
    演示扇出-扇入模式工作流
    """
    workflow = FanOutFanInWorkflow()

    print("=== 扇出-扇入模式工作流示例 ===\n")

    # 准备任务:混合类型的数据
    tasks = [
        10,
        "hello",
        25,
        "world",
        7,
        "python",
        100,
        "workflow",
    ]

    print(f"待处理任务: {tasks}\n")

    # 运行工作流
    start_time = asyncio.get_event_loop().time()
    result = await workflow.run(tasks=tasks)
    end_time = asyncio.get_event_loop().time()

    print("\n=== 处理结果 ===")
    print(f"状态: {result['status']}")
    print(f"总任务数: {result['total_tasks']}")
    print(f"结果: {result['results']}")
    print("\n=== 性能统计 ===")
    stats = result["statistics"]
    print(f"平均处理时间: {stats['avg_processing_time']:.3f}s")
    print(f"最长处理时间: {stats['max_processing_time']:.3f}s")
    print(f"最短处理时间: {stats['min_processing_time']:.3f}s")
    print(f"总处理时间: {stats['total_processing_time']:.3f}s")
    print(f"实际运行时间: {end_time - start_time:.3f}s")
    print(
        f"并行加速比: {stats['total_processing_time'] / (end_time - start_time):.2f}x"
    )


if __name__ == "__main__":
    asyncio.run(main())

10.2.5 关键要点

  1. 并发控制:使用 num_workers 限制并发数,避免资源耗尽
  2. 状态同步:使用上下文存储来安全地收集结果,注意使用 async with ctx.store.edit_state() 进行原子操作
  3. 错误处理:处理部分任务失败的情况,决定是继续还是终止
  4. 性能监控:记录处理时间,计算并行加速比

10.3 路由模式(Routing Pattern)

路由模式根据输入的类型、内容或条件,将请求分发到不同的处理器,实现智能化的请求处理。

10.3.1 模式特点

  1. 条件分支:根据规则选择处理路径
  2. 类型安全:通过事件类型确保路由正确
  3. 扩展性强:易于添加新的处理器
  4. 职责分离:不同类型的请求由专门的处理器处理

10.3.2 适用场景

  • 请求分类:将用户请求路由到不同的服务
  • 智能客服:根据问题类型选择处理策略
  • 数据路由:根据数据特征选择存储位置
  • 工作流引擎:根据业务规则执行不同流程

10.3.3 架构图

10.3.4 完整示例

点击查看路由模式完整示例
py
"""
示例 9: 路由模式(Routing Pattern)
演示根据输入类型或条件选择不同处理路径
适用场景:请求分类、条件处理、智能路由等
"""

import asyncio
from enum import Enum

from workflows import Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class RequestType(str, Enum):
    """请求类型枚举"""

    QUERY = "query"  # 查询请求
    COMMAND = "command"  # 命令请求
    ANALYSIS = "analysis"  # 分析请求


class ClassifiedEvent(Event):
    """分类后的事件"""

    request_type: RequestType
    content: str
    confidence: float


class QueryEvent(Event):
    """查询事件"""

    query: str


class CommandEvent(Event):
    """命令事件"""

    command: str
    params: dict


class AnalysisEvent(Event):
    """分析事件"""

    data: str


class RoutingWorkflow(Workflow):
    """
    路由模式工作流
    架构:
    1. 分类器:识别输入类型
    2. 路由器:根据类型分发到不同处理器
    3. 处理器:针对不同类型的专门处理逻辑
    """

    @step
    async def classifier(self, ev: StartEvent) -> ClassifiedEvent:
        """
        分类器步骤:识别请求类型
        """
        content = ev.get("content", "")

        print(f"[分类器] 分析输入: {content}")

        # 简单的分类逻辑
        content_lower = content.lower()

        if any(
            keyword in content_lower
            for keyword in [
                "what",
                "when",
                "where",
                "who",
                "how",
                "查询",
                "什么",
                "如何",
            ]
        ):
            request_type = RequestType.QUERY
            confidence = 0.9
        elif any(
            keyword in content_lower
            for keyword in ["run", "execute", "start", "stop", "执行", "运行", "停止"]
        ):
            request_type = RequestType.COMMAND
            confidence = 0.85
        elif any(
            keyword in content_lower
            for keyword in ["analyze", "statistics", "report", "分析", "统计", "报告"]
        ):
            request_type = RequestType.ANALYSIS
            confidence = 0.8
        else:
            request_type = RequestType.QUERY  # 默认为查询
            confidence = 0.5

        print(f"[分类器] 识别为 {request_type.value} 类型 (置信度: {confidence:.0%})")

        return ClassifiedEvent(
            request_type=request_type, content=content, confidence=confidence
        )

    @step
    async def router(
        self, ev: ClassifiedEvent
    ) -> QueryEvent | CommandEvent | AnalysisEvent:
        """
        路由器步骤:根据分类结果路由到不同处理器
        """
        print(f"[路由器] 路由到 {ev.request_type.value} 处理器")

        if ev.request_type == RequestType.QUERY:
            return QueryEvent(query=ev.content)
        elif ev.request_type == RequestType.COMMAND:
            # 简单解析命令和参数
            parts = ev.content.split()
            command = parts[0] if parts else "unknown"
            params = {"args": parts[1:] if len(parts) > 1 else []}
            return CommandEvent(command=command, params=params)
        else:  # ANALYSIS
            return AnalysisEvent(data=ev.content)

    @step
    async def handle_query(self, ev: QueryEvent) -> StopEvent:
        """
        查询处理器:处理查询类请求
        """
        print(f"[查询处理器] 处理查询: {ev.query}")

        await asyncio.sleep(0.2)  # 模拟查询数据库

        # 模拟查询结果
        result = {
            "type": "query",
            "query": ev.query,
            "answer": f"这是对 '{ev.query}' 的查询结果",
            "source": "knowledge_base",
            "processing_time": 0.2,
        }

        print("[查询处理器] 查询完成")

        return StopEvent(result=result)

    @step
    async def handle_command(self, ev: CommandEvent) -> StopEvent:
        """
        命令处理器:处理命令类请求
        """
        print(f"[命令处理器] 执行命令: {ev.command}")
        print(f"[命令处理器] 参数: {ev.params}")

        await asyncio.sleep(0.3)  # 模拟命令执行

        # 模拟命令执行结果
        result = {
            "type": "command",
            "command": ev.command,
            "params": ev.params,
            "status": "success",
            "output": f"命令 '{ev.command}' 执行成功",
            "processing_time": 0.3,
        }

        print("[命令处理器] 命令执行完成")

        return StopEvent(result=result)

    @step
    async def handle_analysis(self, ev: AnalysisEvent) -> StopEvent:
        """
        分析处理器:处理分析类请求
        """
        print(f"[分析处理器] 分析数据: {ev.data}")

        await asyncio.sleep(0.4)  # 模拟数据分析

        # 模拟分析结果
        words = ev.data.split()
        result = {
            "type": "analysis",
            "data": ev.data,
            "statistics": {
                "word_count": len(words),
                "char_count": len(ev.data),
                "avg_word_length": (len(ev.data) / len(words) if words else 0),
            },
            "insights": [
                f"数据包含 {len(words)} 个词",
                f"平均词长为 {len(ev.data) / len(words) if words else 0:.2f} 字符",
            ],
            "processing_time": 0.4,
        }

        print("[分析处理器] 分析完成")

        return StopEvent(result=result)


async def main():
    """
    演示路由模式工作流
    """
    workflow = RoutingWorkflow()

    print("=== 路由模式工作流示例 ===\n")

    # 测试不同类型的请求
    test_cases = [
        "What is the weather today?",  # 查询
        "run python script.py",  # 命令
        "analyze the sales data for last quarter",  # 分析
        "如何使用工作流框架?",  # 查询
    ]

    for i, test_input in enumerate(test_cases, 1):
        print(f"\n{'=' * 50}")
        print(f"测试用例 {i}: {test_input}")
        print("=" * 50)

        result = await workflow.run(content=test_input)

        print(f"\n结果类型: {result['type']}")
        if result["type"] == "query":
            print(f"查询: {result['query']}")
            print(f"答案: {result['answer']}")
        elif result["type"] == "command":
            print(f"命令: {result['command']}")
            print(f"状态: {result['status']}")
            print(f"输出: {result['output']}")
        elif result["type"] == "analysis":
            print(f"统计: {result['statistics']}")
            print("洞察:")
            for insight in result["insights"]:
                print(f"  - {insight}")

        print(f"处理时间: {result['processing_time']}s")


if __name__ == "__main__":
    asyncio.run(main())

10.3.5 关键要点

  1. 分类准确性:确保分类逻辑准确,可以使用机器学习模型提升分类效果
  2. 默认处理:为无法分类的请求提供默认处理路径
  3. 监控指标:记录各类型请求的分布和处理时间
  4. 动态路由:支持运行时修改路由规则

10.4 循环模式(Loop Pattern)

循环模式通过迭代执行来实现收敛计算、优化算法或重试逻辑,直到满足终止条件。

10.4.1 模式特点

  1. 迭代执行:重复执行相同或相似的操作
  2. 条件终止:根据收敛条件或最大迭代次数停止
  3. 状态演进:每次迭代更新状态
  4. 自适应:可根据中间结果调整处理策略

10.4.2 适用场景

  • 迭代优化:梯度下降、牛顿法等优化算法
  • 收敛计算:数值分析、物理模拟
  • 重试逻辑:带退避策略的 API 调用
  • 游戏 AI:蒙特卡洛树搜索、强化学习

10.4.3 架构图

10.4.4 完整示例

点击查看循环模式完整示例
py
"""
示例 10: 循环模式(Loop Pattern)
演示迭代处理直到满足条件的工作流
适用场景:迭代优化、收敛计算、重试逻辑等
"""

import asyncio
import random

from pydantic import BaseModel, Field
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StopEvent


class IterationEvent(Event):
    """迭代事件"""

    iteration: int
    value: float
    converged: bool


class WorkflowState(BaseModel):
    """工作流状态"""

    max_iterations: int = Field(default=10)
    tolerance: float = Field(default=0.01)
    history: list[float] = Field(default_factory=list)


class LoopWorkflow(Workflow):
    """
    循环模式工作流
    架构:
    1. 初始化:设置初始值和终止条件
    2. 迭代处理:执行计算或处理
    3. 条件检查:判断是否满足终止条件
    4. 循环或终止:根据条件决定继续迭代或结束
    """

    @step
    async def initialize(
        self, ctx: Context[WorkflowState], ev: StartEvent
    ) -> IterationEvent:
        """
        初始化步骤:设置参数和初始值
        """
        max_iterations = ev.get("max_iterations", 10)
        tolerance = ev.get("tolerance", 0.01)
        initial_value = ev.get("initial_value", 100.0)

        print(f"[初始化] 最大迭代次数: {max_iterations}")
        print(f"[初始化] 收敛容限: {tolerance}")
        print(f"[初始化] 初始值: {initial_value}")

        async with ctx.store.edit_state() as state:
            state.max_iterations = max_iterations
            state.tolerance = tolerance
            state.history = [initial_value]

        return IterationEvent(iteration=0, value=initial_value, converged=False)

    @step
    async def process(
        self, ctx: Context[WorkflowState], ev: IterationEvent
    ) -> IterationEvent:
        """
        处理步骤:执行迭代计算
        """
        current_iteration = ev.iteration + 1
        current_value = ev.value

        print(f"\n[迭代 {current_iteration}] 当前值: {current_value:.4f}")

        # 模拟迭代算法(例如:梯度下降、牛顿法等)
        # 这里使用简单的收敛模拟:逐步接近目标值 10.0
        target = 10.0
        learning_rate = 0.1

        # 添加一些随机噪声
        noise = random.uniform(-0.5, 0.5)
        new_value = current_value - learning_rate * (current_value - target) + noise

        await asyncio.sleep(0.1)  # 模拟计算时间

        # 计算变化量
        change = abs(new_value - current_value)
        print(f"[迭代 {current_iteration}] 新值: {new_value:.4f} (变化: {change:.4f})")

        # 更新历史记录
        async with ctx.store.edit_state() as state:
            state.history.append(new_value)

        return IterationEvent(
            iteration=current_iteration, value=new_value, converged=False
        )

    @step
    async def check_convergence(
        self, ctx: Context[WorkflowState], ev: IterationEvent
    ) -> IterationEvent | StopEvent:
        """
        条件检查步骤:判断是否满足终止条件
        """
        state = await ctx.store.get_state()

        # 检查是否达到最大迭代次数
        if ev.iteration >= state.max_iterations:
            print(f"\n[收敛检查] 达到最大迭代次数 {state.max_iterations}")
            return StopEvent(
                result={
                    "status": "max_iterations_reached",
                    "iterations": ev.iteration,
                    "final_value": ev.value,
                    "history": state.history,
                    "converged": False,
                }
            )

        # 检查是否收敛(连续两次迭代的变化小于容限)
        if len(state.history) >= 2:
            change = abs(state.history[-1] - state.history[-2])

            if change < state.tolerance:
                print(
                    f"\n[收敛检查] 已收敛!变化量 {change:.6f} < 容限 {state.tolerance}"
                )
                return StopEvent(
                    result={
                        "status": "converged",
                        "iterations": ev.iteration,
                        "final_value": ev.value,
                        "history": state.history,
                        "converged": True,
                    }
                )

        # 未收敛,继续迭代
        print("[收敛检查] 未收敛,继续迭代...")

        # 返回新的迭代事件以触发下一轮迭代
        return IterationEvent(iteration=ev.iteration, value=ev.value, converged=False)


async def main():
    """
    演示循环模式工作流
    """
    workflow = LoopWorkflow()

    print("=== 循环模式工作流示例 ===")
    print("模拟迭代优化算法,目标值: 10.0\n")

    # 运行工作流
    result = await workflow.run(initial_value=100.0, max_iterations=20, tolerance=0.1)

    print("\n" + "=" * 60)
    print("=== 最终结果 ===")
    print("=" * 60)
    print(f"状态: {result['status']}")
    print(f"是否收敛: {'是' if result['converged'] else '否'}")
    print(f"迭代次数: {result['iterations']}")
    print(f"最终值: {result['final_value']:.4f}")
    print(f"\n迭代历史 (共 {len(result['history'])} 个值):")

    # 打印历史值的简要统计
    history = result["history"]
    print(f"  初始值: {history[0]:.4f}")
    print(f"  最终值: {history[-1]:.4f}")
    print(f"  最大值: {max(history):.4f}")
    print(f"  最小值: {min(history):.4f}")
    print(f"  平均值: {sum(history) / len(history):.4f}")

    # 绘制简单的 ASCII 图表
    print("\n迭代曲线:")
    print_ascii_chart(history)


def print_ascii_chart(values: list[float], height: int = 10, width: int = 50):
    """
    打印简单的 ASCII 图表
    """
    if not values:
        return

    min_val = min(values)
    max_val = max(values)
    range_val = max_val - min_val if max_val != min_val else 1

    # 归一化值到图表高度
    normalized = [int((v - min_val) / range_val * (height - 1)) for v in values]

    # 打印图表
    for h in range(height - 1, -1, -1):
        line = []
        for n in normalized:
            if n == h:
                line.append("●")
            elif n > h:
                line.append("│")
            else:
                line.append(" ")

        # 添加 Y 轴标签
        y_value = min_val + (h / (height - 1)) * range_val
        print(f"{y_value:6.2f}{''.join(line)}")

    # 打印 X 轴
    print(" " * 7 + "└" + "─" * len(values))
    print(" " * 8 + f"0{' ' * (len(values) - 5)}{len(values) - 1}")
    print(" " * 8 + "迭代次数")


if __name__ == "__main__":
    asyncio.run(main())

10.4.5 关键要点

  1. 终止条件:必须有明确的终止条件,避免无限循环
  2. 最大迭代次数:设置上限防止异常情况下的无限循环
  3. 状态记录:保存迭代历史,便于分析和调试
  4. 早停机制:检测到不再改善时提前终止

10.5 混合模式

在实际应用中,我们经常需要组合多种模式来构建复杂的工作流。以下是一些常见的混合模式:

10.5.1 管道 + 扇出-扇入

在管道的某个阶段引入并行处理:

适用场景:文档批处理系统、批量图像处理

10.5.2 路由 + 管道

对不同类型的请求执行不同的处理管道:

适用场景:多类型数据处理、智能客服系统

10.5.3 循环 + 路由

在迭代过程中根据条件选择不同策略:

适用场景:自适应优化算法、动态策略选择

10.6 模式选择指南

下表总结了各种模式的特点和选择建议:

模式适用场景优势注意事项
管道顺序处理、数据转换简单清晰、易维护性能瓶颈在最慢步骤
扇出扇入批量处理、并行计算高吞吐量、资源利用需要管理并发和聚合
路由请求分类、条件处理灵活、可扩展分类准确性很重要
循环迭代优化、收敛计算适应性强、自动优化必须保证能够终止
混合复杂业务场景、大型系统强大、全面增加复杂度和维护成本

选择模式时需要考虑:

  1. 业务需求:明确你要解决什么问题
  2. 性能要求:是否需要高吞吐量或低延迟
  3. 数据特征:数据量、类型、依赖关系
  4. 维护成本:团队能力、系统复杂度
  5. 可扩展性:未来可能的需求变化

11 调试与测试

11.1 启用详细日志

python
import logging
logging.basicConfig(level=logging.DEBUG)

workflow = MyWorkflow(verbose=True)

11.2 单元测试

python
import pytest
from workflows.testing import WorkflowTestRunner

@pytest.mark.asyncio
async def test_my_workflow():
    workflow = MyWorkflow()
    runner = WorkflowTestRunner(workflow)

    # 测试单个步骤
    result = await runner.run_step("process", MyEvent(data="test"))
    assert isinstance(result, StopEvent)

    # 测试完整工作流
    result = await workflow.run(input="test")
    assert result["status"] == "success"

11.3 可观测性

工作流框架自动支持 OpenTelemetry,可以集成 Arize Phoenix 等可观测性平台:

python
from llama_index_instrumentation import get_dispatcher

# 工作流自动生成追踪数据
# 可在 Phoenix Dashboard 查看执行详情

12 总结

LlamaIndex Workflows 是一个功能强大的工作流编排框架,它通过事件驱动的架构使复杂 AI 应用的开发变得更加简洁和可维护。本文全面介绍了工作流框架的核心概念和实践技巧:

12.1 核心知识点

  1. 基础概念:事件、步骤、上下文的核心概念和使用方法
  2. 入门示例:如何创建和运行简单的工作流,理解基本执行流程
  3. 分支路由:如何实现条件分支逻辑,根据不同情况执行不同路径
  4. 并行处理:如何配置并行工作者和收集结果,提升处理效率
  5. 人机交互:如何实现需要人工输入的工作流,支持人工审核和决策
  6. 状态持久化:如何保存和恢复工作流状态,实现跨会话的持续运行
  7. 事件流:如何实现实时进度更新,提升用户体验
  8. 高级用法:重试策略、资源注入、超时配置等高级特性

12.2 设计模式精要

本文重点介绍了四种核心工作流设计模式及其应用:

  1. 管道模式:顺序处理数据,适用于文档处理、ETL 等线性流程
  2. 扇出-扇入模式:并行处理任务后聚合,适用于批量处理和分布式计算
  3. 路由模式:智能分发请求,适用于请求分类和条件处理
  4. 循环模式:迭代优化计算,适用于收敛算法和重试逻辑

每种模式都提供了完整的代码示例、架构图和应用场景说明,可以直接参考使用。

12.3 最佳实践

在使用工作流框架时,建议遵循以下最佳实践:

  1. 从简单开始:先实现基本功能,再逐步增加复杂性
  2. 合理设计事件:事件应该携带必要的信息,但不要过于臃肿
  3. 状态管理:使用类型化的 Pydantic 模型管理状态,确保类型安全
  4. 错误处理:在关键步骤添加错误处理,提高系统健壮性
  5. 可观测性:充分利用框架提供的日志和追踪功能
  6. 性能优化:合理配置并行度,避免资源浪费或竞争
  7. 测试驱动:为工作流和步骤编写单元测试,确保质量

12.4 下一步学习

掌握这些知识后,你可以:

  1. 参考官方文档深入了解更多特性
  2. 尝试构建实际项目,积累实践经验
  3. 探索与其他 LlamaIndex 组件的集成
  4. 贡献代码和文档,参与社区建设

工作流框架为构建复杂 AI 应用提供了坚实的基础,希望本文能帮助你快速上手并应用到实际项目中。

13 参考资料

  1. LlamaIndex Workflows 官方文档:https://docs.llamaindex.ai/en/stable/understanding/workflows/
  2. GitHub 仓库:https://github.com/run-llama/workflows
  3. Pydantic 文档:https://docs.pydantic.dev/
  4. Python asyncio 文档:https://docs.python.org/3/library/asyncio.html