最近做通用 Agent 技术选型上有两个方向: 1. vanilla agent 2. 框架(langgraph agent, crew AI)等。最终考虑到细节上选择了框架方向,具体框架选择了 langgraph。
langgraph 呢是 langchain 团队做的一个底层 Agent 开发框架,其上 langchain 就是基于 langgraph 做的。我对 langchain 的认知还停留在 2023 年,彼时他只是一个简单的链式反应工具,做Agent 很重不够灵活,我呢一直是鄙视的态度看待。用他的产品也是仅用了他的文档处理相关的,比如confluence解析等。
最近他们突然融资到 1.25 亿刀
,
然后工作需要,就去仔细研究了下他的底层 langgraph,这一看扫去了我之前对他的认知。下面细说下
Langgraph 的能力
核心是任务执行编排的能力,周边有比较完善的工具链支撑,其次是与模型相关的基础组件接入,生态上有很多社区应用,其特性也比较好的支撑多agent架构。
具体功能特性:
- Human in the loop
- 节点,边,条件边
- 过程流式输出
- 持久化存储中间数据
设计理念
文档的 Thinking in Langgraph 部分讲了如何将一个任务传统任务落地带 Langgraph里,要先考虑怎么把任务拆成离散的 单点任务, 单点任务之间的流转关系是如何的。
其中单点任务就是 Node 流转关系,对应 Edge。 其中可以使用LLM节点来做些动态判断下一跳,就可以完成 Agent 里的规划了。
使用中央 Store 来存储整个 graph 运行过程中的 State。每个 Node 是一个 Function,输入 State 输出 State,或者输出 Command。Command 是内部的一个模式,Command里包含了State更新内容,及下一跳的目的
优略点
目前来看 langgraph 还是比较适合 workflow 类型的 agent 开发, workflow 的场景比较单任务处理的场景。
不适合对于多会话类型的 Agent,这种一般是需要大量来回交互才把需求明确的场景。或者是需要做不少额外的配置才能支持。这块社区有文章细说传送。
底层核心
作为一个比较灵活的任务编排工具,其核心是 channel + pregel 完成的。是支撑所有 feature 的基石。这块在官方文档里也有提到,是参考了 google 的 pregel 图计算算法。
首先是 pregel 完成图执行 runtime,完成各个节点运行过程,其次集合 channel 完成数据通信,总体状态管理等。
这两块的具体实现是挺复杂的,具体源码及功能分析过程,见附录。是用的 claude-code 工具完成的分析过程
似曾相识
State 的设计:函数式编程的思想。跟当初学习前端开发里 的 React 里的状态库 Rudux 库很相似。也获取所有复杂状态管理都是大概的吧。这块细看的时候总感觉很熟悉,下面看下一些具体的内容
更改State的方式:
# langgraph 里更改 State 的方式
def node_1(state: InputState) -> OverallState:
# Write to OverallState
return {"foo": state["user_input"] + " name"}
// Redux 里更改 State 的方式
function reducer(state = initialState, action) {
switch (action.type) {
case 'SET_FOO':
return { ...state, foo: action.payload };
default:
return state;
}
}
总体都是 state in -> process - > state out 的设计思想
应了那句话:技术思想都大概的,只是在不同的应用场景下有不同的实现罢了
一个孜孜不倦的成功者
最早2年前了,我是很不看好langchain的工具的,原因是他很重,也很繁琐,在 ai 应用发展日新月异的背景下,不适合作为技术选型的。
随着 AI 应用的稳定,langchain 团队算是笃定了 AI 应用的方向,不断的随着大模型背景技术的发展,迭代自己的框架。现在新版本的 langgraph 已经让我刮目相看了。感觉能胜任绝大数的场景开发。
作为 startup 公司也算比较成功了,拿到了很很多投资,可以说是AI发展背景下的一个赢家。其核心一方便是风口好,一方便也是他们坚持不懈的打造自己的产品,赢得了信任。
附录
- 一些基于 langgraph 的 agent
- pregel 核心
> 分析下 @libs/langgraph/langgraph/pregel/ 里的功能
⎿ Listed directory libs/langgraph/langgraph/pregel/
⏺ I'll analyze the functionality in the @libs/langgraph/langgraph/pregel/
directory. Let me explore the codebase to understand what each file does.
⏺ Explore(Analyze pregel module functionality)
⎿ Done (19 tool uses · 0 tokens · 2m 35s)
⏺ Based on my analysis of the @libs/langgraph/langgraph/pregel/ directory,
here's a comprehensive overview of the functionality:
LangGraph Pregel System Analysis
The Pregel system implements a Bulk Synchronous Parallel (BSP) execution
model for LangGraph applications. Here's what each component does:
Core Architecture
Main Entry Point (__init__.py)
- Exports Pregel (main orchestrator) and NodeBuilder (fluent API for node
construction)
Core Execution Components:
- main.py: Contains the main Pregel class that orchestrates graph
execution and NodeBuilder for constructing nodes
- _loop.py: Implements the BSP execution cycle with step-by-step execution
and checkpoint management
- _runner.py: Manages parallel task execution with retry logic and error
handling
- _algo.py: Core BSP algorithm implementation with functions for task
planning, write application, and interruption logic
Execution Model (BSP - Bulk Synchronous Parallel)
The system follows a 3-phase execution model:
1. Plan: Select nodes whose trigger channels have been updated
2. Execute: Run selected nodes in parallel until
completion/failure/timeout
3. Update: Apply node writes to channels for visibility in next step
Channel System
- _read.py: Defines PregelNode and channel reading mechanisms
- _write.py: Handles channel writing operations
- _io.py: Manages input/output operations for the graph
Channel types include:
- LastValue: Stores most recent value (default)
- Topic: PubSub-style with accumulation support
- BinaryOperatorAggregate: Applies binary operators for aggregation
- Context: Manages external resource lifecycles
State Management & Persistence
- _checkpoint.py: Implements checkpoint creation, restoration, and
migration for state persistence
- _validate.py: Validates graph structure, channel connectivity, and
naming conflicts
Supporting Components
- _executor.py: Task execution utilities
- _retry.py: Retry mechanisms for failed operations
- _utils.py: Utility functions
- _config.py: Configuration management
- _messages.py: Message handling between nodes
- _log.py: Logging functionality
- debug.py: Debugging support with task tracking and state snapshots
- remote.py: Remote execution capabilities via LangGraph SDK
- protocol.py: Type definitions and protocols
- types.py: Type definitions
Key Features
- Fault Tolerance: Checkpoint-based state management enables recovery
- Parallel Execution: Nodes execute concurrently within supersteps
- Streaming Support: Real-time output during execution with multiple
streaming modes
- Subgraph Support: Handles nested graph execution
- Interrupt/Resume: Graph execution can be interrupted and resumed
- Remote Execution: Integration with LangGraph SDK for distributed
execution
The system provides a robust foundation for building complex graph-based
applications with strong consistency guarantees, fault tolerance, and
flexible execution models.
> 使用中文回复一次
⏺ 基于我对 @libs/langgraph/langgraph/pregel/ 目录的分析,这里是功能概述:
LangGraph Pregel 系统分析
Pregel 系统为 LangGraph
应用实现了**批量同步并行(BSP)**执行模型。以下是各组件的功能:
核心架构
主入口 (__init__.py)
- 导出 Pregel(主协调器)和 NodeBuilder(节点构建的流畅API)
核心执行组件:
- main.py: 包含主要的 Pregel 类来协调图执行,以及 NodeBuilder 用于构建节点
- _loop.py: 实现 BSP 执行循环,支持逐步执行和检查点管理
- _runner.py: 管理并行任务执行,包含重试逻辑和错误处理
- _algo.py: BSP 算法的核心实现,包含任务规划、写入应用和中断逻辑
执行模型(BSP - 批量同步并行)
系统遵循3阶段执行模型:
1. 规划: 选择触发通道已更新的节点
2. 执行: 并行运行选中的节点直到完成/失败/超时
3. 更新: 将节点写入应用到通道,以便下一步可见
通道系统
- _read.py: 定义 PregelNode 和通道读取机制
- _write.py: 处理通道写入操作
- _io.py: 管理图的输入/输出操作
通道类型包括:
- LastValue: 存储最新值(默认)
- Topic: 支持累积的发布订阅样式
- BinaryOperatorAggregate: 应用二元运算符进行聚合
- Context: 管理外部资源生命周期
状态管理与持久化
- _checkpoint.py: 实现检查点创建、恢复和迁移,用于状态持久化
- _validate.py: 验证图结构、通道连接性和命名冲突
支持组件
- _executor.py: 任务执行工具
- _retry.py: 失败操作的重试机制
- _utils.py: 工具函数
- _config.py: 配置管理
- _messages.py: 节点间消息处理
- _log.py: 日志功能
- debug.py: 调试支持,包含任务跟踪和状态快照
- remote.py: 通过 LangGraph SDK 的远程执行能力
- protocol.py: 类型定义和协议
- types.py: 类型定义
关键特性
- 容错: 基于检查点的状态管理支持恢复
- 并行执行: 节点在超步内并发执行
- 流式支持: 执行期间实时输出,支持多种流模式
- 子图支持: 处理嵌套图执行
- 中断/恢复: 图执行可以被中断和恢复
- 远程执行: 与 LangGraph SDK 集成实现分布式执行
该系统为构建具有强一致性保证、容错能力和灵活执行模型的复杂图应用提供了坚实
基础。
> 中断恢复的核心逻辑是什么
⏺ 我来深入分析 LangGraph Pregel 系统的中断恢复核心逻辑。
⏺ Read(libs/langgraph/langgraph/pregel/_loop.py)
⎿ Read 1321 lines
⏺ Read(libs/langgraph/langgraph/pregel/_algo.py)
⎿ Read 1132 lines
⏺ Read(libs/langgraph/langgraph/pregel/_checkpoint.py)
⎿ Read 89 lines
⏺ Search(pattern: "should_interrupt|interrupt_before|interrupt_after|GraphInterr
upt|_suppress_interrupt", path: "libs/langgraph/langgraph/pregel",
output_mode: "content")
⎿ Found 20 lines (ctrl+o to expand)
⏺ Read(libs/langgraph/langgraph/pregel/_runner.py)
⎿ Read 100 lines
⏺ Search(pattern: "CONFIG_KEY_RESUMING|resume|RESUME", path:
"libs/langgraph/langgraph/pregel", output_mode: "content")
⎿ Found 20 lines (ctrl+o to expand)
⏺ 基于我对 LangGraph Pregel
系统中断恢复机制的深入分析,以下是中断恢复的核心逻辑:
中断恢复的核心逻辑
1. 检查点机制 (Checkpoint System)
核心原理:
- 每个执行步骤都会创建检查点,保存完整的图状态
- 检查点包含:channel_values(通道值)、channel_versions(通道版本)、vers
ions_seen(已见版本)
- 支持增量保存和恢复,只保存变化的通道值
关键函数:
# _checkpoint.py
create_checkpoint() # 创建检查点
channels_from_checkpoint() # 从检查点恢复通道状态
copy_checkpoint() # 复制检查点用于备份
2. 中断检测机制
中断触发条件:
- interrupt_before: 在执行指定节点前中断
- interrupt_after: 在执行指定节点后中断
- 通过 should_interrupt() 函数检测是否需要中断
中断判断逻辑:
# _algo.py:140-170
def should_interrupt(checkpoint, interrupt_nodes, tasks):
# 检查自上次中断以来是否有通道更新
any_updates_since_prev_interrupt = any(
version > seen.get(chan, null_version)
for chan, version in checkpoint["channel_versions"].items()
)
# 如果有更新且触发节点在interrupt_nodes列表中,则中断
3. 中断状态保存
中断信息保存:
- 中断时保存 INTERRUPT 写入到检查点
- 记录中断的任务ID和中断数据
- 通过 put_writes() 将中断信息持久化
关键代码:
# _runner.py:436-442
if isinstance(exception, GraphInterrupt):
# 保存中断到检查点
if exception.args[0]:
writes = [(INTERRUPT, exception.args[0])]
if resumes := [w for w in task.writes if w[0] == RESUME]:
writes.extend(resumes)
self.put_writes()(task.id, writes)
4. 恢复机制
恢复触发条件:
- 通过 Command(resume=...) 显式恢复
- 检测到 CONFIG_KEY_RESUMING 标志
- 输入为 None 且存在历史检查点
恢复过程:
# _loop.py:618-733
def _first(self, input_keys, updated_channels):
# 判断是否从上一个检查点恢复
is_resuming = bool(
self.checkpoint["channel_versions"] and
configurable.get(CONFIG_KEY_RESUMING,
self.input is None or isinstance(self.input, Command))
)
# 如果是恢复状态,处理恢复逻辑
if is_resuming:
# 设置中断版本跟踪
self.checkpoint["versions_seen"].setdefault(INTERRUPT, {})
for k in self.channels:
if k in self.checkpoint["channel_versions"]:
version = self.checkpoint["channel_versions"][k]
self.checkpoint["versions_seen"][INTERRUPT][k] = version
5. 中断与恢复的协调
待处理中断管理:
# _loop.py:588-616
def _pending_interrupts(self):
"""返回没有对应恢复值的待处理中断ID集合"""
pending_interrupts = {}
pending_resumes = set()
for task_id, write_type, value in self.checkpoint_pending_writes:
if write_type == INTERRUPT:
pending_interrupts[task_id] = value[0].id
elif write_type == RESUME:
pending_resumes.add(task_id)
# 只保留未恢复的中断
hanging_interrupts = {
interrupt_id for interrupt_id in pending_interrupts.values()
if interrupt_id not in resumed_interrupt_ids
}
return hanging_interrupts
6. 关键恢复技术点
1. 版本追踪:通过 versions_seen 跟踪每个节点已处理的通道版本
2. 状态一致性:确保恢复时通道状态与中断时完全一致
3. 任务重放:跳过已完成的任务,只执行未完成的任务
4. 写入重放:重新应用中断时的待处理写入
5. 嵌套图支持:支持子图的中断和恢复
7. 恢复流程总结
1. 检测恢复状态:检查是否存在历史检查点和恢复标志
2. 恢复通道状态:从检查点重建所有通道的值和版本
3. 处理待处理写入:重新应用中断时的写入操作
4. 确定执行起点:基于版本追踪决定从哪个步骤继续
5. 跳过已完成任务:避免重复执行已经完成的任务
6. 继续执行:从断点处继续正常的BSP执行循环
这种设计确保了精确一次的执行语义,即使在复杂的中断场景下也能保持一致性和正
确性。
> 是怎么知道具体中断代码地方的?
⏺ 这是个很好的问题!让我深入分析 LangGraph 是如何精确定位中断位置的。
⏺ Search(pattern: "task_path|path.*interrupt|interrupt.*path", path:
"libs/langgraph/langgraph/pregel", output_mode: "content")
⎿ Found 20 lines (ctrl+o to expand)
⏺ Search(pattern: "task_id|task.*id|id.*task", path:
"libs/langgraph/langgraph/pregel", output_mode: "content", head_limit:
30)
⎿ Found 30 lines (ctrl+o to expand)
⏺ Read(libs/langgraph/langgraph/pregel/_algo.py)
⎿ Read 50 lines
⏺ Read(libs/langgraph/langgraph/pregel/_algo.py)
⎿ Read 50 lines
⏺ Search(pattern: "PregelExecutableTask|PregelTask", path:
"libs/langgraph/langgraph/pregel", output_mode: "content", head_limit:
20)
⎿ Found 20 lines (ctrl+o to expand)
⏺ Read(libs/langgraph/langgraph/types.py)
⎿ Read 569 lines
⏺ Search(pattern: "_scratchpad|scratchpad|interrupt_counter", path:
"libs/langgraph/langgraph/pregel", output_mode: "content")
⎿ Found 20 lines (ctrl+o to expand)
⏺ Read(libs/langgraph/langgraph/_internal/_scratchpad.py)
⎿ Read 20 lines
⏺ Search(pattern: "versions_seen|channel_versions", path:
"libs/langgraph/langgraph/pregel", output_mode: "content")
⎿ Found 20 lines (ctrl+o to expand)
⏺ 基于深入分析,我来详细解释 LangGraph 是如何精确定位中断代码位置的:
中断位置精确识别的核心技术
1. 任务路径 (Task Path) 系统
任务路径结构:
# 任务路径是一个元组,唯一标识任务在图中的位置
path: tuple[str | int | tuple, ...]
# 不同类型的任务有不同的路径格式:
# PULL 任务: (PULL, node_name)
# PUSH 任务: (PUSH, parent_task_path, push_write_index, parent_task_id,
Call)
# SEND 任务: (PUSH, send_index)
路径生成:
# _algo.py:785-793
task_id = task_id_func(
checkpoint_id_bytes,
checkpoint_ns,
str(step),
name,
PULL,
*triggers,
)
2. 任务ID生成机制
唯一ID生成:
# 基于任务路径、检查点ID、命名空间等生成唯一ID
# 使用 XXH3 哈希算法确保唯一性和确定性
def _xxhash_str(namespace: bytes, *parts: str | bytes) -> str:
hex = xxh3_128_hexdigest(
namespace + b"".join(p.encode() if isinstance(p, str) else p for p
in parts)
)
return f"{hex[:8]}-{hex[8:12]}-{hex[12:16]}-{hex[16:20]}-{hex[20:32]}"
3. 中断计数器系统
Scratchpad 中断计数:
# _internal/_scratchpad.py:13-19
@dataclass
class PregelScratchpad:
step: int
stop: int
call_counter: Callable[[], int] # 调用计数器
interrupt_counter: Callable[[], int] # 中断计数器
get_null_resume: Callable[[bool], Any]
resume: list[Any]
subgraph_counter: Callable[[], int]
中断位置记录:
# types.py:401-524
def interrupt(value: Any) -> Any:
# 跟踪中断索引
scratchpad = conf[CONFIG_KEY_SCRATCHPAD]
idx = scratchpad.interrupt_counter() # 原子递增
# 检查之前的恢复值
if scratchpad.resume:
if idx < len(scratchpad.resume):
# 如果是恢复状态,返回对应的恢复值
return scratchpad.resume[idx]
# 没有恢复值,抛出中断异常
raise GraphInterrupt((Interrupt.from_ns(value=value,
ns=conf[CONFIG_KEY_CHECKPOINT_NS]),))
4. 版本追踪机制
通道版本管理:
# _algo.py:146-170
def should_interrupt(checkpoint, interrupt_nodes, tasks):
version_type =
type(next(iter(checkpoint["channel_versions"].values()), None))
null_version = version_type()
seen = checkpoint["versions_seen"].get(INTERRUPT, {})
# 检查自上次中断以来是否有通道更新
any_updates_since_prev_interrupt = any(
version > seen.get(chan, null_version)
for chan, version in checkpoint["channel_versions"].items()
)
版本记录:
# _loop.py:683-691
if is_resuming:
self.checkpoint["versions_seen"].setdefault(INTERRUPT, {})
for k in self.channels:
if k in self.checkpoint["channel_versions"]:
version = self.checkpoint["channel_versions"][k]
self.checkpoint["versions_seen"][INTERRUPT][k] = version
5. 检查点命名空间系统
命名空间层次:
# 格式: "parent_ns:node_name:task_id"
checkpoint_ns = f"{parent_ns}{NS_SEP}{name}" if parent_ns else name
task_checkpoint_ns = f"{checkpoint_ns}:{task_id}"
6. 中断位置精确定位流程
1. 任务执行时:每个任务有唯一的 task_id 和 path
2. 中断发生时:interrupt() 函数被调用
3. 位置记录:中断计数器记录这是第几个中断
4. 异常抛出:GraphInterrupt 包含 Interrupt 对象,带有:
- value: 中断值
- id: 基于命名空间生成的唯一ID
5. 状态保存:中断信息写入检查点,关联到具体任务ID
6. 恢复时:通过任务ID和中断计数器精确定位中断位置
7. 多中断处理
中断序列管理:
# _loop.py:588-616
def _pending_interrupts(self):
"""返回没有对应恢复值的待处理中断ID集合"""
pending_interrupts = {}
pending_resumes = set()
for task_id, write_type, value in self.checkpoint_pending_writes:
if write_type == INTERRUPT:
pending_interrupts[task_id] = value[0].id
elif write_type == RESUME:
pending_resumes.add(task_id)
# 只保留未恢复的中断
hanging_interrupts = {
interrupt_id for interrupt_id in pending_interrupts.values()
if interrupt_id not in resumed_interrupt_ids
}
return hanging_interrupts
总结
LangGraph 通过以下机制精确识别中断位置:
1. 任务路径:唯一标识任务在图结构中的位置
2. 任务ID:基于路径、检查点ID等生成的唯一标识符
3. 中断计数器:记录每个任务内的中断顺序
4. 版本追踪:确保恢复时状态一致性
5. 命名空间:支持嵌套图的中断定位
6. 检查点持久化:将中断信息与任务ID绑定保存
这套系统确保了即使在复杂的嵌套图结构中,也能精确地知道在哪个节点的哪一行代
码发生了中断,并在恢复时准确回到该位置继续执行。
> /context
⎿
Context Usage
⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛀ ⛀ ⛀ ⛁ claude-sonnet-4-5-20250929 · 111k/200k tokens (56%)
⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁
⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ ⛁ System prompt: 2.3k tokens (1.2%)
⛁ ⛁ ⛁ ⛁ ⛁ ⛶ ⛶ ⛶ ⛶ ⛶ ⛁ System tools: 11.3k tokens (5.7%)
⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛁ MCP tools: 200 tokens (0.1%)
⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛁ Custom agents: 63 tokens (0.0%)
⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛁ Messages: 52.2k tokens (26.1%)
⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛶ ⛝ ⛝ ⛝ ⛶ Free space: 89k (44.5%)
⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ Autocompact buffer: 45.0k tokens (22.5%)
⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝ ⛝
MCP tools · /mcp
└ mcp__ide__getDiagnostics (ide): 70 tokens
└ mcp__ide__executeCode (ide): 130 tokens
Custom agents · /agents
└ git-commit-push (User): 63 tokens
SlashCommand Tool · 6 commands
└ Total: 892 tokens
> /cost
⎿ Total cost: $0.0000
Total duration (API): 6m 54s
Total duration (wall): 22m 47s
Total code changes: 0 lines added, 0 lines removed
Usage by model:
claude-sonnet: 0 input, 0 output, 0 cache read, 0 cache write
($0.0000)
claude-haiku: 0 input, 0 output, 0 cache read, 0 cache write
($0.0000)
- channel 分析过程
> 分析下 @libs/langgraph/langgraph/channels/ 里的核心功能
⎿ Listed directory libs/langgraph/langgraph/channels/
⏺ I'll analyze the core functionality in the
@libs/langgraph/langgraph/channels/ directory. Let me examine the files to
understand what channels are and how they work in LangGraph.
⏺ Read(libs/langgraph/langgraph/channels/base.py)
⎿ Read 106 lines
⏺ Read(libs/langgraph/langgraph/channels/__init__.py)
⎿ Read 28 lines
⏺ Read(libs/langgraph/langgraph/channels/last_value.py)
⎿ Read 152 lines
⏺ Read(libs/langgraph/langgraph/channels/topic.py)
⎿ Read 95 lines
⏺ Read(libs/langgraph/langgraph/channels/binop.py)
⎿ Read 135 lines
⏺ Read(libs/langgraph/langgraph/channels/ephemeral_value.py)
⎿ Read 80 lines
⏺ Read(libs/langgraph/langgraph/channels/named_barrier_value.py)
⎿ Read 168 lines
⏺ Read(libs/langgraph/langgraph/channels/any_value.py)
⎿ Read 73 lines
⏺ Read(libs/langgraph/langgraph/channels/untracked_value.py)
⎿ Read 74 lines
⏺ LangGraph Channels 核心功能分析
基于对 @libs/langgraph/langgraph/channels/ 目录的分析,这里是 LangGraph
中通道(Channels)系统的核心功能:
核心概念
通道(Channel) 是 LangGraph 中用于在图的节点之间传递状态和数据的核心抽象
。它们提供了状态管理、持久化和并发控制机制。
基础架构
BaseChannel (base.py)
- 抽象基类:定义了所有通道必须实现的接口
- 泛型设计:支持
Value(值类型)、Update(更新类型)、Checkpoint(检查点类型)
- 核心方法:
- get() - 获取当前值
- update() - 接收更新序列
- checkpoint() - 序列化状态
- from_checkpoint() - 从检查点恢复
- consume() - 消费通知
- finish() - 完成通知
通道类型及用途
1. LastValue (last_value.py)
- 功能:存储最后接收到的值,每步最多接收一个值
- 用途:简单的状态保持,适合单值更新的场景
- 特点:严格的单值约束,多个更新会抛出异常
2. LastValueAfterFinish
- 功能:类似 LastValue,但值只在 finish() 调用后才可用
- 用途:延迟发布模式,确保所有处理完成后再提供值
- 特点:一次性使用,获取后自动清除
3. Topic (topic.py)
- 功能:发布-订阅主题,支持累积或非累积模式
- 用途:消息广播、事件流处理
- 特点:
- accumulate=True:跨步骤累积消息
- accumulate=False:每步后清空消息
- 支持单个值或列表值作为更新
4. BinaryOperatorAggregate (binop.py)
- 功能:使用二元操作符聚合值
- 用途:累加器、计数器、状态聚合
- 特点:
- 支持自定义操作符(如 operator.add)
- 支持 Overwrite 特殊值进行重置
- 自动处理类型转换(如 Sequence → list)
5. EphemeralValue (ephemeral_value.py)
- 功能:存储上一步的值,当前步后清除
- 用途:临时状态传递、中间结果缓存
- 特点:生命周期仅限相邻步骤之间
6. NamedBarrierValue (named_barrier_value.py)
- 功能:等待所有指定名称的值都到达后才可用
- 用途:同步屏障、多源数据聚合
- 特点:
- 必须收集所有指定值才能获取
- 获取后自动重置(可选)
- 支持 AfterFinish 变体
7. AnyValue (any_value.py)
- 功能:存储最后值,假设多个值都相等
- 用途:并发安全的状态更新
- 特点:不验证值相等性,直接取最后一个
8. UntrackedValue (untracked_value.py)
- 功能:存储最后值但不参与检查点
- 用途:临时状态、不需要持久化的数据
- 特点:checkpoint() 始终返回 MISSING
设计模式
1. 状态生命周期管理:
- 持久化状态(LastValue, BinaryOperatorAggregate)
- 临时状态(EphemeralValue, UntrackedValue)
- 累积状态(Topic with accumulate=True)
2. 并发控制:
- 严格单值(LastValue)
- 多值聚合(Topic, BinaryOperatorAggregate)
- 屏障同步(NamedBarrierValue)
3. 错误处理:
- EmptyChannelError - 空通道访问
- InvalidUpdateError - 无效更新
- 类型安全验证
使用场景
- 工作流状态管理:LastValue 用于保持工作流状态
- 消息传递:Topic 用于节点间通信
- 数据聚合:BinaryOperatorAggregate 用于统计和累加
- 同步控制:NamedBarrierValue 用于多分支汇聚
- 临时缓存:EphemeralValue 用于中间结果传递
这个通道系统为 LangGraph
提供了灵活而强大的状态管理和数据流控制能力,支持复杂的图计算场景。