Skip to content
OpenInfoHub
Go back

LangGraph 核心特性详解:持久化、流式输出与人机协作

在上一篇文章中,我们介绍了 LangGraph 的基础概念和快速开始。本文将深入探讨 LangGraph 的核心特性,这些特性使其成为构建生产级 Agent 系统的强大工具。

目录

持久化(Persistence)

LangGraph 内置了持久化层,通过检查点(checkpointer)实现。当使用检查点编译图时,检查点会在每个超级步骤保存图状态的快照。这些检查点保存到线程中,可以在图执行后访问。

线程(Threads)

线程是分配给检查点保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。

调用带有检查点的图时,必须在配置的 configurable 部分指定 thread_id

config = {"configurable": {"thread_id": "1"}}

检查点(Checkpoints)

线程在特定时间点的状态称为检查点。检查点是在每个超级步骤保存的图状态快照,由 StateSnapshot 对象表示,具有以下关键属性:

让我们看一个简单的例子:

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from typing import Annotated, TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}

workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = MemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": "", "bar": []}, config)

运行图后,我们会看到 4 个检查点:

  1. 空检查点,START 作为下一个要执行的节点
  2. 包含用户输入的检查点,node_a 作为下一个节点
  3. 包含 node_a 输出的检查点,node_b 作为下一个节点
  4. 包含 node_b 输出的检查点,没有下一个节点

获取状态

与保存的图状态交互时,必须指定线程标识符。可以通过调用 graph.get_state(config) 查看图的最新状态:

# 获取最新状态快照
config = {"configurable": {"thread_id": "1"}}
state = graph.get_state(config)

# 获取特定检查点的状态快照
config = {
    "configurable": {
        "thread_id": "1",
        "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"
    }
}
state = graph.get_state(config)

获取状态历史

可以通过调用 graph.get_state_history(config) 获取给定线程的完整图执行历史:

config = {"configurable": {"thread_id": "1"}}
history = list(graph.get_state_history(config))

检查点将按时间顺序排列,最新的检查点在列表的第一位。

更新状态

除了从特定检查点重放图之外,我们还可以编辑图状态。使用 update_state 方法:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

重要提示:更新会传递给 reducer 函数(如果为某些通道定义了 reducer)。这意味着 update_state 不会自动覆盖每个通道的值,而只覆盖没有 reducer 的通道。

例如,如果状态定义为:

from typing import Annotated
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

当前状态为 {"foo": 1, "bar": ["a"]},执行更新:

graph.update_state(config, {"foo": 2, "bar": ["b"]})

新状态将是 {"foo": 2, "bar": ["a", "b"]}foo 键被完全更改,但 bar 键使用 add reducer,所以 “b” 被追加到状态中。

重放(Replay)

也可以重放先前的图执行。如果使用 thread_idcheckpoint_id 调用图,那么我们将重放对应于 checkpoint_id 的检查点之前执行的步骤,并且只执行检查点之后的步骤。

config = {
    "configurable": {
        "thread_id": "1",
        "checkpoint_id": "0c62ca34-ac19-445d-bbb0-5b4984975b2a"
    }
}
graph.invoke(None, config=config)

LangGraph 知道特定步骤是否之前已执行。如果已执行,LangGraph 只是重放该步骤而不重新执行。

记忆存储(Memory Store)

状态模式指定了在图执行时填充的一组键。如上所述,状态可以由检查点写入线程。但是,如果我们想在线程之间保留某些信息怎么办?

考虑聊天机器人的情况,我们希望在与该用户的所有聊天对话(即线程)中保留有关用户的特定信息!

仅使用检查点,我们无法跨线程共享信息。这就是 Store 接口的动机。

基本用法

from langgraph.store.memory import InMemoryStore
import uuid

store = InMemoryStore()

# 记忆按元组命名空间
user_id = "1"
namespace_for_memory = (user_id, "memories")

# 保存记忆
memory_id = str(uuid.uuid4())
memory = {"food_preference": "我喜欢披萨"}
store.put(namespace_for_memory, memory_id, memory)

# 读取记忆
memories = store.search(namespace_for_memory)
print(memories[-1].dict())

输出:

{
    'value': {'food_preference': '我喜欢披萨'},
    'key': '07e0caf4-1631-47b7-b15f-65515d4c1843',
    'namespace': ['1', 'memories'],
    'created_at': '2024-10-02T17:22:31.590602+00:00',
    'updated_at': '2024-10-02T17:22:31.590605+00:00'
}

语义搜索

Store 还支持语义搜索,允许根据含义而不是精确匹配查找记忆。要启用此功能,请使用嵌入模型配置 store:

from langchain.embeddings import init_embeddings

store = InMemoryStore(
    index={
        "embed": init_embeddings("openai:text-embedding-3-small"),
        "dims": 1536,
        "fields": ["food_preference", "$"]
    }
)

# 使用自然语言查询查找记忆
memories = store.search(
    namespace_for_memory,
    query="用户喜欢吃什么?",
    limit=3
)

在 LangGraph 中使用

Store 与检查点协同工作:检查点将状态保存到线程,Store 允许我们存储任意信息以跨线程访问。

from dataclasses import dataclass
from langgraph.checkpoint.memory import MemorySaver
from langgraph.runtime import Runtime

@dataclass
class Context:
    user_id: str

checkpointer = MemorySaver()

# 编译图时传入 checkpointer 和 store
builder = StateGraph(MessagesState, context_schema=Context)
# ... 添加节点和边 ...
graph = builder.compile(checkpointer=checkpointer, store=store)

# 在节点中访问 store
async def call_model(state: MessagesState, runtime: Runtime[Context]):
    # 从运行时上下文获取用户 ID
    user_id = runtime.context.user_id
    namespace = (user_id, "memories")
    
    # 搜索记忆
    memories = await runtime.store.asearch(
        namespace,
        query=state["messages"][-1].content,
        limit=3
    )
    
    # 在模型调用中使用记忆
    info = "\n".join([d.value["memory"] for d in memories])
    # ... 使用记忆进行模型调用

流式输出(Streaming)

LangGraph 实现了流式系统以提供实时更新。流式输出对于增强基于 LLM 的应用程序的响应性至关重要。通过逐步显示输出,即使在完整响应准备好之前,流式输出也能显著改善用户体验。

LangGraph 流式输出的能力

支持的流式模式

模式描述
values在图的每个步骤后流式传输状态的完整值
updates在图的每个步骤后流式传输状态的更新
custom从图节点内部流式传输自定义数据
messages从调用 LLM 的任何图节点流式传输 LLM 令牌和元数据的 2 元组
debug在图执行过程中流式传输尽可能多的信息

基本用法

for chunk in graph.stream(inputs, stream_mode="updates"):
    print(chunk)

流式图状态示例

from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    topic: str
    joke: str

def refine_topic(state: State):
    return {"topic": state["topic"] + " 和猫"}

def generate_joke(state: State):
    return {"joke": f"这是一个关于 {state['topic']} 的笑话"}

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

# 流式更新
for chunk in graph.stream(
    {"topic": "冰淇淋"},
    stream_mode="updates",
):
    print(chunk)

输出:

{'refine_topic': {'topic': '冰淇淋和猫'}}
{'generate_joke': {'joke': '这是一个关于冰淇淋和猫的笑话'}}

流式多种模式

可以将列表作为 stream_mode 参数传递以同时流式传输多种模式:

for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    print(f"{mode}: {chunk}")

流式 LLM 令牌

使用 messages 流式模式从图的任何部分逐令牌流式传输 LLM 输出:

from dataclasses import dataclass
from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START

@dataclass
class MyState:
    topic: str
    joke: str = ""

model = init_chat_model(model="gpt-4.1-mini")

def call_model(state: MyState):
    """调用 LLM 生成笑话"""
    model_response = model.invoke(
        [{"role": "user", "content": f"生成一个关于 {state.topic} 的笑话"}]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# 流式 LLM 令牌
for message_chunk, metadata in graph.stream(
    {"topic": "冰淇淋"},
    stream_mode="messages",
):
    if message_chunk.content:
        print(message_chunk.content, end="|", flush=True)

按节点过滤

要仅从特定节点流式传输令牌,使用 stream_mode="messages" 并通过流式元数据中的 langgraph_node 字段过滤输出:

for msg, metadata in graph.stream(
    inputs,
    stream_mode="messages",
):
    if msg.content and metadata["langgraph_node"] == "some_node_name":
        print(msg.content, end="|", flush=True)

流式自定义数据

要从 LangGraph 节点或工具内部发送自定义用户定义的数据:

from langgraph.config import get_stream_writer

def node(state: State):
    # 获取流写入器以发送自定义数据
    writer = get_stream_writer()
    # 发出自定义键值对
    writer({"custom_key": "在节点内生成自定义数据"})
    return {"answer": "一些数据"}

# 设置 stream_mode="custom" 以在流中接收自定义数据
for chunk in graph.stream(inputs, stream_mode="custom"):
    print(chunk)

使用任何 LLM

可以使用 stream_mode="custom" 从任何 LLM API 流式传输数据 - 即使该 API 没有实现 LangChain 聊天模型接口:

from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """调用任意模型并流式传输输出的示例节点"""
    writer = get_stream_writer()
    
    # 使用自定义流式客户端生成 LLM 令牌
    for chunk in your_custom_streaming_client(state["topic"]):
        writer({"custom_llm_chunk": chunk})
    
    return {"result": "已完成"}

# 设置 stream_mode="custom" 以在流中接收自定义数据
for chunk in graph.stream(
    {"topic": ""},
    stream_mode="custom",
):
    print(chunk)

人机协作(Human-in-the-Loop)

检查点使人机协作工作流成为可能,允许人类检查、中断和批准图步骤。

使用 interrupt()

要在执行过程中暂停并获取人工输入,使用 interrupt() 函数:

from langgraph.types import Command, interrupt

def lookup_customer_history(state: State):
    if not state.get('customer_id'):
        # 暂停并请求用户输入
        user_input = interrupt({
            "message": "需要客户 ID",
            "request": "请提供客户的账户 ID 以查找其订阅历史"
        })
        return Command(
            update={"customer_id": user_input['customer_id']},
            goto="lookup_customer_history"
        )
    
    # 现在继续查找
    customer_data = fetch_customer_history(state['customer_id'])
    return Command(
        update={"customer_history": customer_data},
        goto="draft_response"
    )

运行和恢复

# 初始运行 - 将在 interrupt() 处暂停
config = {"configurable": {"thread_id": "customer_123"}}
result = app.invoke(initial_state, config)

# 检查中断
print(f"中断信息: {result['__interrupt__']}")

# 准备好后,提供人工输入以恢复
from langgraph.types import Command

human_response = Command(
    resume={
        "customer_id": "CUST-12345"
    }
)

# 恢复执行
final_result = app.invoke(human_response, config)

图在遇到 interrupt() 时暂停,将所有内容保存到检查点,然后等待。它可以在几天后恢复,从中断处继续执行。thread_id 确保此对话的所有状态一起保留。

检查点库

LangGraph 提供了几个检查点实现:

使用 SQLite 检查点

pip install langgraph-checkpoint-sqlite
import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

conn = sqlite3.connect("checkpoint.db")
checkpointer = SqliteSaver(conn)

graph = workflow.compile(checkpointer=checkpointer)

使用 PostgreSQL 检查点

pip install langgraph-checkpoint-postgres
from langgraph.checkpoint.postgres import PostgresSaver

checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:password@localhost:5432/dbname"
)
checkpointer.setup()

graph = workflow.compile(checkpointer=checkpointer)

关键要点

  1. 持久化是核心能力:通过检查点,Agent 可以从故障中恢复,支持长时间运行的工作流

  2. 线程管理对话:每个对话或任务使用唯一的 thread_id,状态在该线程中持久化

  3. 记忆跨线程共享:Store 接口允许在多个对话之间保留信息

  4. 流式输出提升体验:多种流式模式支持实时反馈,从状态更新到 LLM 令牌

  5. 人机协作是一等公民:interrupt() 函数使暂停和恢复执行变得简单

下一步

在下一篇文章中,我们将探讨:

参考资源


内容基于 LangGraph 官方文档改编,遵循内容许可限制


Share this post on:

Previous Post
LangGraph 项目开发与部署实战指南
Next Post
LangGraph 入门指南:构建有状态的 AI Agent