LangChain 1.0 核心特性
流式传输 API (Streaming API)
LangGraph SDK 允许你从 LangSmith Deployment API 流式传输输出。这是构建响应式 AI 应用的关键能力。
基本用法
python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
# 使用名为 "agent" 的部署图
assistant_id = "agent"
# 创建线程
thread = await client.threads.create()
thread_id = thread["thread_id"]
# 创建流式运行
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input=inputs,
stream_mode="updates"
):
print(chunk.data)支持的流式模式
LangGraph 提供多种流式模式以适应不同的使用场景:
| 模式 | 描述 | 对应的 LangGraph 库方法 |
|---|---|---|
values | 在每个超级步骤后流式传输完整的图状态 | .stream() / .astream() with stream_mode="values" |
updates | 流式传输图中每一步之后的状态更新。如果在同一步中进行多次更新(例如运行多个节点),这些更新会分别流式传输 | .stream() / .astream() with stream_mode="updates" |
messages-tuple | 流式传输 LLM 令牌和元数据(对聊天应用很有用) | .stream() / .astream() with stream_mode="messages" |
debug | 在图执行过程中流式传输尽可能多的信息 | .stream() / .astream() with stream_mode="debug" |
custom | 从图内部流式传输自定义数据 | .stream() / .astream() with stream_mode="custom" |
events | 流式传输所有事件(包括图的状态);主要在迁移大型 LCEL 应用时有用 | .astream_events() |
流式传输多种模式
你可以将列表作为 stream_mode 参数传递,以同时流式传输多种模式。流式输出将是 (mode, chunk) 元组:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input=inputs,
stream_mode=["updates", "custom"]
):
print(chunk)流式传输图状态
Stream Mode: updates
用于流式传输每一步后节点返回的状态更新:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="updates"
):
print(chunk.data)输出示例:
python
{'run_id': '1f02c2b3-3cef-68de-b720-eec2a4a8e920', 'attempt': 1}
{'refine_topic': {'topic': 'ice cream and cats'}}
{'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}Stream Mode: values
用于流式传输每一步后图的完整状态:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="values"
):
print(chunk.data)子图流式传输
要在流式输出中包含子图的输出,可以在父图的 .stream() 方法中设置 subgraphs=True:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"foo": "foo"},
stream_subgraphs=True, # 设置为 True 以流式传输子图输出
stream_mode="updates",
):
print(chunk)LLM 令牌流式传输
使用 messages-tuple 流式模式可以逐令牌流式传输大型语言模型(LLM)的输出:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="messages-tuple",
):
if chunk.event != "messages":
continue
message_chunk, metadata = chunk.data
if message_chunk["content"]:
print(message_chunk["content"], end="|", flush=True)流式输出是一个元组 (message_chunk, metadata):
message_chunk: 来自 LLM 的令牌或消息片段metadata: 包含图节点和 LLM 调用详细信息的字典
过滤 LLM 令牌
- 按 LLM 调用过滤:可以将标签与 LLM 调用关联
- 按节点过滤:使用
stream_mode="messages"并通过流式元数据中的langgraph_node字段过滤输出
调试模式
使用 debug 流式模式在图执行期间流式传输尽可能多的信息:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="debug"
):
print(chunk.data)流式传输自定义数据
发送自定义用户定义的数据:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"query": "example"},
stream_mode="custom"
):
print(chunk.data)流式传输事件
流式传输所有事件,包括图的状态:
python
async for chunk in client.runs.stream(
thread_id,
assistant_id,
input={"topic": "ice cream"},
stream_mode="events"
):
print(chunk.data)无状态运行
如果你不想在检查点数据库中持久化流式运行的输出,可以创建无状态运行而无需创建线程:
python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
async for chunk in client.runs.stream(
None, # 传递 None 而不是 thread_id UUID
assistant_id,
input=inputs,
stream_mode="updates"
):
print(chunk.data)加入并流式传输
LangSmith 允许你加入一个活动的后台运行并从中流式传输输出:
python
from langgraph_sdk import get_client
client = get_client(url=<DEPLOYMENT_URL>, api_key=<API_KEY>)
async for chunk in client.runs.join_stream(
thread_id,
run_id, # 要加入的现有运行的 run_id
):
print(chunk)注意: 使用 .join_stream 时,输出不会被缓冲,因此在加入之前产生的任何输出都不会被接收。
核心优势
- 灵活的流式模式: 支持多种流式模式,适应不同的应用场景
- 实时响应: 逐令牌流式传输 LLM 输出,提供更好的用户体验
- 状态透明: 可以实时查看图的执行状态和更新
- 子图支持: 能够流式传输嵌套图的输出
- 调试友好: 提供详细的调试模式以便排查问题
- 灵活的状态管理: 支持有状态和无状态运行
- 后台运行加入: 可以随时加入正在运行的任务并开始流式传输