在上一篇文章中,我们介绍了 LangGraph 的基础概念和快速开始。本文将深入探讨 LangGraph 的核心特性,这些特性使其成为构建生产级 Agent 系统的强大工具。
目录
持久化(Persistence)
LangGraph 内置了持久化层,通过检查点(checkpointer)实现。当使用检查点编译图时,检查点会在每个超级步骤保存图状态的快照。这些检查点保存到线程中,可以在图执行后访问。
线程(Threads)
线程是分配给检查点保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。
调用带有检查点的图时,必须在配置的 configurable 部分指定 thread_id:
config = {"configurable": {"thread_id": "1"}}
检查点(Checkpoints)
线程在特定时间点的状态称为检查点。检查点是在每个超级步骤保存的图状态快照,由 StateSnapshot 对象表示,具有以下关键属性:
- config:与此检查点关联的配置
- metadata:与此检查点关联的元数据
- values:此时间点的状态通道值
- next:图中下一个要执行的节点名称元组
- tasks:包含下一个要执行任务信息的 PregelTask 对象元组
让我们看一个简单的例子:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated, TypedDict
from operator import add
class State(TypedDict):
foo: str
bar: Annotated[list[str], add]
def node_a(state: State):
return {"foo": "a", "bar": ["a"]}
def node_b(state: State):
return {"foo": "b", "bar": ["b"]}
workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)
checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": "", "bar": []}, config)
运行图后,我们会看到 4 个检查点:
- 空检查点,START 作为下一个要执行的节点
- 包含用户输入的检查点,node_a 作为下一个节点
- 包含 node_a 输出的检查点,node_b 作为下一个节点
- 包含 node_b 输出的检查点,没有下一个节点
获取状态
与保存的图状态交互时,必须指定线程标识符。可以通过调用 graph.get_state(config) 查看图的最新状态:
# 获取最新状态快照
config = {"configurable": {"thread_id": "1"}}
state = graph.get_state(config)
# 获取特定检查点的状态快照
config = {
"configurable": {
"thread_id": "1",
"checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"
}
}
state = graph.get_state(config)
获取状态历史
可以通过调用 graph.get_state_history(config) 获取给定线程的完整图执行历史:
config = {"configurable": {"thread_id": "1"}}
history = list(graph.get_state_history(config))
检查点将按时间顺序排列,最新的检查点在列表的第一位。
更新状态
除了从特定检查点重放图之外,我们还可以编辑图状态。使用 update_state 方法:
graph.update_state(config, {"foo": 2, "bar": ["b"]})
重要提示:更新会传递给 reducer 函数(如果为某些通道定义了 reducer)。这意味着 update_state 不会自动覆盖每个通道的值,而只覆盖没有 reducer 的通道。
例如,如果状态定义为:
from typing import Annotated
from operator import add
class State(TypedDict):
foo: int
bar: Annotated[list[str], add]
当前状态为 {"foo": 1, "bar": ["a"]},执行更新:
graph.update_state(config, {"foo": 2, "bar": ["b"]})
新状态将是 {"foo": 2, "bar": ["a", "b"]}。foo 键被完全更改,但 bar 键使用 add reducer,所以 “b” 被追加到状态中。
重放(Replay)
也可以重放先前的图执行。如果使用 thread_id 和 checkpoint_id 调用图,那么我们将重放对应于 checkpoint_id 的检查点之前执行的步骤,并且只执行检查点之后的步骤。
config = {
"configurable": {
"thread_id": "1",
"checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
}
}
graph.invoke(None, config=config)
LangGraph 知道特定步骤是否之前已执行。如果已执行,LangGraph 只是重放该步骤而不重新执行。
记忆存储(Memory Store)
状态模式指定了在图执行时填充的一组键。如上所述,状态可以由检查点写入线程。但是,如果我们想在线程之间保留某些信息怎么办?
考虑聊天机器人的情况,我们希望在与该用户的所有聊天对话(即线程)中保留有关用户的特定信息!
仅使用检查点,我们无法跨线程共享信息。这就是 Store 接口的动机。
基本用法
from langgraph.store.memory import InMemoryStore
import uuid
store = InMemoryStore()
# 记忆按元组命名空间
user_id = "1"
namespace_for_memory = (user_id, "memories")
# 保存记忆
memory_id = str(uuid.uuid4())
memory = {"food_preference": "我喜欢披萨"}
store.put(namespace_for_memory, memory_id, memory)
# 读取记忆
memories = store.search(namespace_for_memory)
print(memories[-1].dict())
输出:
{
'value': {'food_preference': '我喜欢披萨'},
'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
'namespace': ['1', 'memories'],
'created_at': '2024-10-02T17:22:31.590602+00:00',
'updated_at': '2024-10-02T17:22:31.590605+00:00'
}
语义搜索
Store 还支持语义搜索,允许根据含义而不是精确匹配查找记忆。要启用此功能,请使用嵌入模型配置 store:
from langchain.embeddings import init_embeddings
store = InMemoryStore(
index={
"embed": init_embeddings("openai:text-embedding-3-small"),
"dims": 1536,
"fields": ["food_preference", "$"]
}
)
# 使用自然语言查询查找记忆
memories = store.search(
namespace_for_memory,
query="用户喜欢吃什么?",
limit=3
)
在 LangGraph 中使用
Store 与检查点协同工作:检查点将状态保存到线程,Store 允许我们存储任意信息以跨线程访问。
from dataclasses import dataclass
from langgraph.checkpoint.memory import MemorySaver
from langgraph.runtime import Runtime
@dataclass
class Context:
user_id: str
checkpointer = MemorySaver()
# 编译图时传入 checkpointer 和 store
builder = StateGraph(MessagesState, context_schema=Context)
# ... 添加节点和边 ...
graph = builder.compile(checkpointer=checkpointer, store=store)
# 在节点中访问 store
async def call_model(state: MessagesState, runtime: Runtime[Context]):
# 从运行时上下文获取用户 ID
user_id = runtime.context.user_id
namespace = (user_id, "memories")
# 搜索记忆
memories = await runtime.store.asearch(
namespace,
query=state["messages"][-1].content,
limit=3
)
# 在模型调用中使用记忆
info = "\n".join([d.value["memory"] for d in memories])
# ... 使用记忆进行模型调用
流式输出(Streaming)
LangGraph 实现了流式系统以提供实时更新。流式输出对于增强基于 LLM 的应用程序的响应性至关重要。通过逐步显示输出,即使在完整响应准备好之前,流式输出也能显著改善用户体验。
LangGraph 流式输出的能力
- 流式图状态:使用
updates和values模式获取状态更新/值 - 流式子图输出:包括父图和任何嵌套子图的输出
- 流式 LLM 令牌:从任何地方捕获令牌流:节点内部、子图或工具
- 流式自定义数据:直接从工具函数发送自定义更新或进度信号
- 使用多种流式模式:从 values、updates、messages、custom 或 debug 中选择
支持的流式模式
| 模式 | 描述 |
|---|---|
| values | 在图的每个步骤后流式传输状态的完整值 |
| updates | 在图的每个步骤后流式传输状态的更新 |
| custom | 从图节点内部流式传输自定义数据 |
| messages | 从调用 LLM 的任何图节点流式传输 LLM 令牌和元数据的 2 元组 |
| debug | 在图执行过程中流式传输尽可能多的信息 |
基本用法
for chunk in graph.stream(inputs, stream_mode="updates"):
print(chunk)
流式图状态示例
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
topic: str
joke: str
def refine_topic(state: State):
return {"topic": state["topic"] + " 和猫"}
def generate_joke(state: State):
return {"joke": f"这是一个关于 {state['topic']} 的笑话"}
graph = (
StateGraph(State)
.add_node(refine_topic)
.add_node(generate_joke)
.add_edge(START, "refine_topic")
.add_edge("refine_topic", "generate_joke")
.add_edge("generate_joke", END)
.compile()
)
# 流式更新
for chunk in graph.stream(
{"topic": "冰淇淋"},
stream_mode="updates",
):
print(chunk)
输出:
{'refine_topic': {'topic': '冰淇淋和猫'}}
{'generate_joke': {'joke': '这是一个关于冰淇淋和猫的笑话'}}
流式多种模式
可以将列表作为 stream_mode 参数传递以同时流式传输多种模式:
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
print(f"{mode}: {chunk}")
流式 LLM 令牌
使用 messages 流式模式从图的任何部分逐令牌流式传输 LLM 输出:
from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START
@dataclass
class MyState:
topic: str
joke: str = ""
model = init_chat_model(model="gpt-4.1-mini")
def call_model(state: MyState):
"""调用 LLM 生成笑话"""
model_response = model.invoke(
[{"role": "user", "content": f"生成一个关于 {state.topic} 的笑话"}]
)
return {"joke": model_response.content}
graph = (
StateGraph(MyState)
.add_node(call_model)
.add_edge(START, "call_model")
.compile()
)
# 流式 LLM 令牌
for message_chunk, metadata in graph.stream(
{"topic": "冰淇淋"},
stream_mode="messages",
):
if message_chunk.content:
print(message_chunk.content, end="|", flush=True)
按节点过滤
要仅从特定节点流式传输令牌,使用 stream_mode="messages" 并通过流式元数据中的 langgraph_node 字段过滤输出:
for msg, metadata in graph.stream(
inputs,
stream_mode="messages",
):
if msg.content and metadata["langgraph_node"] == "some_node_name":
print(msg.content, end="|", flush=True)
流式自定义数据
要从 LangGraph 节点或工具内部发送自定义用户定义的数据:
from langgraph.config import get_stream_writer
def node(state: State):
# 获取流写入器以发送自定义数据
writer = get_stream_writer()
# 发出自定义键值对
writer({"custom_key": "在节点内生成自定义数据"})
return {"answer": "一些数据"}
# 设置 stream_mode="custom" 以在流中接收自定义数据
for chunk in graph.stream(inputs, stream_mode="custom"):
print(chunk)
使用任何 LLM
可以使用 stream_mode="custom" 从任何 LLM API 流式传输数据 - 即使该 API 没有实现 LangChain 聊天模型接口:
from langgraph.config import get_stream_writer
def call_arbitrary_model(state):
"""调用任意模型并流式传输输出的示例节点"""
writer = get_stream_writer()
# 使用自定义流式客户端生成 LLM 令牌
for chunk in your_custom_streaming_client(state["topic"]):
writer({"custom_llm_chunk": chunk})
return {"result": "已完成"}
# 设置 stream_mode="custom" 以在流中接收自定义数据
for chunk in graph.stream(
{"topic": "猫"},
stream_mode="custom",
):
print(chunk)
人机协作(Human-in-the-Loop)
检查点使人机协作工作流成为可能,允许人类检查、中断和批准图步骤。
使用 interrupt()
要在执行过程中暂停并获取人工输入,使用 interrupt() 函数:
from langgraph.types import Command, interrupt
def lookup_customer_history(state: State):
if not state.get('customer_id'):
# 暂停并请求用户输入
user_input = interrupt({
"message": "需要客户 ID",
"request": "请提供客户的账户 ID 以查找其订阅历史"
})
return Command(
update={"customer_id": user_input['customer_id']},
goto="lookup_customer_history"
)
# 现在继续查找
customer_data = fetch_customer_history(state['customer_id'])
return Command(
update={"customer_history": customer_data},
goto="draft_response"
)
运行和恢复
# 初始运行 - 将在 interrupt() 处暂停
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)
# 检查中断
print(f"中断信息: {result['__interrupt__']}")
# 准备好后,提供人工输入以恢复
from langgraph.types import Command
human_response = Command(
resume={
"customer_id": "CUST-12345"
}
)
# 恢复执行
final_result = app.invoke(human_response, config)
图在遇到 interrupt() 时暂停,将所有内容保存到检查点,然后等待。它可以在几天后恢复,从中断处继续执行。thread_id 确保此对话的所有状态一起保留。
检查点库
LangGraph 提供了几个检查点实现:
- langgraph-checkpoint:基础接口和内存检查点(InMemorySaver)
- langgraph-checkpoint-sqlite:SQLite 数据库检查点,适合实验和本地工作流
- langgraph-checkpoint-postgres:PostgreSQL 数据库检查点,用于生产环境
- langgraph-checkpoint-cosmosdb:Azure Cosmos DB 检查点,用于 Azure 生产环境
使用 SQLite 检查点
pip install langgraph-checkpoint-sqlite
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver
conn = sqlite3.connect("checkpoint.db")
checkpointer = SqliteSaver(conn)
graph = workflow.compile(checkpointer=checkpointer)
使用 PostgreSQL 检查点
pip install langgraph-checkpoint-postgres
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:password@localhost:5432/dbname"
)
checkpointer.setup()
graph = workflow.compile(checkpointer=checkpointer)
关键要点
-
持久化是核心能力:通过检查点,Agent 可以从故障中恢复,支持长时间运行的工作流
-
线程管理对话:每个对话或任务使用唯一的 thread_id,状态在该线程中持久化
-
记忆跨线程共享:Store 接口允许在多个对话之间保留信息
-
流式输出提升体验:多种流式模式支持实时反馈,从状态更新到 LLM 令牌
-
人机协作是一等公民:interrupt() 函数使暂停和恢复执行变得简单
下一步
在下一篇文章中,我们将探讨:
- 应用结构:如何组织 LangGraph 项目
- 测试策略:如何测试 Agent 和工作流
- 部署选项:如何将 LangGraph 应用部署到生产环境
- 可观测性:如何监控和调试生产中的 Agent
参考资源
内容基于 LangGraph 官方文档改编,遵循内容许可限制