LangGraph 入门到精通教程:从零基础到实现简易版 LangGraph
LangGraph 是 LangChain 生态中最重要的低层编排框架之一,它让开发者能够以“图”(Graph)的形式构建复杂、状态持久、多 Actor 的 LLM 应用。不同于传统的 LangChain Chain 或 Agent,LangGraph 真正解决了“长生命周期、有状态、可恢复、可中断、可调试”的核心痛点,被广泛用于构建生产级 Agent、自主工作流、多 Agent 协作系统等场景。
本文从入门(零基础快速上手)到深入(源码级原理分析),再到实践(基于 LangGraph 原理手写一个简易版 LangGraph),帮助你真正“精通”LangGraph。所有代码均在 Python 3.10+ 环境下测试通过,基于官方仓库原理提炼。
LangGraph 是什么?为什么需要它?¶
LangGraph 的核心思想源于 Google Pregel(一种大规模图计算框架)和 NetworkX 图接口。它把 LLM 应用建模为有向图:
- 节点(Node):执行具体动作的函数或 Runnable(LLM 调用、工具调用、Python 函数等)。
- 边(Edge):控制流(无条件边、条件分支边、动态 Send)。
- 状态(State):共享的 TypedDict,通过 Channel(通道)进行更新,支持多种归约器(reducer,如 append、last_value)。
- 执行引擎:Pregel 算法 —— “超级步”(superstep)模型,实现 Bulk Synchronous Parallel(BSP),支持循环、并行、检查点持久化。
与 LangChain 的区别:
- LangChain Chain 是线性/树状流程,适合简单任务。
- LangGraph 是循环有向图(支持 cycle),天然适合 ReAct Agent、规划-执行-反思循环、多 Agent 协作。
- 官方文档定位:LangGraph 是“为长期运行、有状态 Agent 设计的低层框架”,LangChain 很多高级 Agent(如 create_react_agent)底层就是 LangGraph。
核心优势:
- 持久化(Checkpoint):任意时刻保存状态,支持故障恢复、人机协同。
- Human-in-the-Loop:中断执行,人工修改状态后继续。
- 流式 + 调试:stream_mode 支持 values/updates/debug/messages,与 LangSmith 无缝集成。
- 可扩展:自定义 Channel、Subgraph、Remote Pregel。
- 生产就绪:支持 Postgres/SQLite 持久化、异步执行、重试策略。
官方最新版本(2026 年 4 月):langgraph==1.1.7a1(预发布),核心位于 libs/langgraph/langgraph/ 下,主要模块包括:
graph/:StateGraph、MessageGraph、分支逻辑。pregel/:_loop.py、_runner.py、_algo.py、_checkpoint.py等实现 Pregel 执行引擎。channels/:LastValue、Topic、BinaryOperatorAggregate 等通道实现。
安装与环境准备¶
pip install -U langgraph langchain langchain-openai langchain-community
# 可选:持久化
pip install langgraph-checkpoint-postgres # 或 sqlite
# 可视化(可选)
pip install matplotlib networkx
设置环境变量:
核心概念快速上手¶
State(状态)¶
LangGraph 的 State 必须是 TypedDict(或 dataclass),字段使用 Annotated + reducer 指定更新规则。源码核心在 graph/state.py:
from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage
class State(TypedDict):
messages: Annotated[list[BaseMessage], add_messages] # 自动 append
next: str | None # 自定义字段,可用 last_value reducer
常见 reducer(channels 模块实现):
add_messages/add:列表追加。last_value:保留最新值(默认)。BinaryOperatorAggregate:累加(如 sum)。
节点与边¶
from langgraph.graph import StateGraph, START, END
def node_a(state: State):
return {"messages": ["Node A 执行完毕"]}
graph_builder = StateGraph(State)
graph_builder.add_node("A", node_a)
graph_builder.add_edge(START, "A")
graph_builder.add_edge("A", END)
编译与运行¶
compile() 内部会把图转换为 Pregel 对象(pregel/main.py)。
第一个完整例子:简单客服 Agent¶
我们实现一个支持工具调用的 ReAct 风格 Agent。
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
@tool
def get_weather(city: str) -> str:
"""查询天气"""
return f"{city} 今天晴,25°C"
tools = [get_weather]
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
class State(TypedDict):
messages: Annotated[list, add_messages]
def call_model(state: State):
response = llm.invoke(state["messages"])
return {"messages": [response]}
graph_builder = StateGraph(State)
graph_builder.add_node("agent", call_model)
graph_builder.add_node("tools", ToolNode(tools))
graph_builder.add_edge(START, "agent")
graph_builder.add_conditional_edges(
"agent",
tools_condition, # 源码在 prebuilt 中,判断是否需要调用工具
{"tools": "tools", "__end__": END}
)
graph_builder.add_edge("tools", "agent") # 工具调用后回到 agent,形成循环
graph = graph_builder.compile()
运行:
result = graph.invoke({"messages": [HumanMessage("北京天气如何?")]})
print(result["messages"][-1].content)
这个例子展示了循环(agent ↔ tools)和条件边(conditional_edges)。
状态管理深入:Reducer 与 Channel 原理¶
源码关键文件 channels/ 和 graph/state.py:
- 每个 State 字段对应一个 Channel。
add_messages实际是Annotated[list, add_messages]→ 内部注册 reducer。- 更新时:
channel.write(new_value)→ 根据 reducer 合并到当前值。
自定义 reducer 示例:
from operator import add
from typing import Annotated
class State(TypedDict):
total: Annotated[int, add] # 累加
为什么用 Channel 而非简单 dict? - 支持并行执行(Pregel 超级步内节点并发)。 - 版本追踪(channel_versions),用于检查点和去重。 - 动态 Send(TASKS channel)实现运行时分支。
Pregel 执行引擎原理详解—— 源码级¶
这是 LangGraph 最核心的部分(pregel/main.py、pregel/_loop.py、pregel/_algo.py)。
Pregel 算法核心(Bulk Synchronous Parallel):
- 初始化:所有 channel 置初始值,checkpoint 记录。
- 超级步循环(直到收敛或 recursion_limit):
- Plan 阶段(_algo.py 中的 prepare_next_tasks):找出有新消息(channel 更新)的节点。
- Execute 阶段(_runner.py / _executor.py):并行执行选中的 PregelNode。
- 每个 Node 读取订阅的 channel 值 → 执行 Runnable → 产生 writes。
- Update 阶段(_write.py):把 writes 应用到 channel(应用 reducer)。
- Checkpoint:每步结束后保存(_checkpoint.py)。
关键数据结构:
ChannelValues:当前状态快照。TASKSchannel:类型为 Topic[Send],Send(target, value) 可动态启动新任务。recursion_limit:防止无限循环,默认 25。
源码中 Pregel.invoke / astream 最终调用 _run 循环:
# 伪代码(基于 pregel/_loop.py 原理)
while True:
tasks = prepare_next_tasks(channels, checkpoint)
if not tasks: break
writes = execute_tasks_in_parallel(tasks) # 并发
apply_writes(channels, writes, checkpoint) # 更新 + checkpoint
step += 1
if step > recursion_limit: raise
为什么强大?
- 支持任意循环(ReAct、规划-反思)。
- 并行:同一超级步内多个节点并发。
- 容错:checkpoint + pending_writes 实现 exactly-once 语义。
- 子图:namespace 隔离 checkpoint。
高级功能:持久化、Human-in-the-Loop、Streaming、Subgraph¶
持久化(Checkpoint)¶
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
graph = graph_builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"messages": [...]}, config=config) # 自动保存
源码在 checkpoint/ 模块,支持 Postgres、SQLite、Memory 等。
Human-in-the-Loop¶
# 中断
graph = graph_builder.compile(checkpointer=memory, interrupt_before=["tools"])
# 恢复
graph.update_state(config, {"messages": [HumanMessage("批准工具调用")]})
Streaming¶
Subgraph¶
子图可独立编译、独立 checkpoint,支持多层嵌套(源码在 pregel 中用 namespace 处理)。
Prebuilt 组件¶
langgraph.prebuilt 提供 create_react_agent、ToolNode 等,底层全部是 StateGraph + Pregel。
基于 LangGraph 原理手写一个简易版 LangGraph¶
现在,我们完全基于官方 Pregel 原理,手写一个极简但可用的 SimpleLangGraph。目标是让读者理解“图是如何变成可执行状态机的”。
# simple_langgraph.py
from typing import Annotated, TypedDict, Callable, Any, Literal
from operator import add
import copy
from collections import defaultdict
import asyncio
# ==================== 1. Channel 实现 ====================
class BaseChannel:
def __init__(self):
self.value = None
self.version = 0
class LastValue(BaseChannel):
def update(self, value):
self.value = value
self.version += 1
class Topic(BaseChannel): # 类似 add_messages
def __init__(self):
super().__init__()
self.value = []
def update(self, value):
if isinstance(value, list):
self.value.extend(value)
else:
self.value.append(value)
self.version += 1
# ==================== 2. State 与 Graph 定义 ====================
StateType = dict[str, Any]
class SimpleStateGraph:
def __init__(self, state_schema: type[TypedDict]):
self.nodes: dict[str, Callable[[StateType], StateType | dict]] = {}
self.edges: list[tuple[str, str]] = [] # (from, to)
self.conditional_edges: list = []
self.entry_point = "START"
self.state_schema = state_schema
self.channels: dict[str, BaseChannel] = {}
def add_node(self, name: str, action: Callable):
self.nodes[name] = action
def add_edge(self, start: str, end: str):
self.edges.append((start, end))
def add_conditional_edges(self, source: str, condition: Callable, path_map: dict):
self.conditional_edges.append((source, condition, path_map))
def set_entry_point(self, name: str):
self.entry_point = name
def set_finish_point(self, name: str):
self.add_edge(name, "END")
# ==================== 3. Pregel-like 执行引擎 ====================
def compile(self):
class CompiledGraph:
def __init__(self, builder: SimpleStateGraph):
self.builder = builder
def _initialize_channels(self, input_state: StateType):
channels = {}
for field in input_state.keys():
if isinstance(input_state[field], list):
channels[field] = Topic()
else:
channels[field] = LastValue()
# 应用初始值
for k, v in input_state.items():
channels[k].update(v)
return channels
def invoke(self, input_state: StateType, recursion_limit: int = 25):
channels = self._initialize_channels(input_state)
step = 0
history = [] # 模拟 checkpoint
while step < recursion_limit:
step += 1
next_tasks = self._plan(channels)
if not next_tasks:
break
# Execute 阶段(简化为串行,实际 Pregel 支持并行)
writes = {}
for node_name in next_tasks:
node_fn = self.builder.nodes.get(node_name)
if not node_fn:
continue
# 读取当前 channel 值
current_state = {k: ch.value for k, ch in channels.items()}
output = node_fn(current_state)
if isinstance(output, dict):
for k, v in output.items():
writes.setdefault(k, []).append(v)
# Update 阶段
for k, values in writes.items():
if k not in channels:
continue
for v in values:
channels[k].update(v)
# 模拟 checkpoint
history.append({k: copy.deepcopy(ch.value) for k, ch in channels.items()})
# 处理条件边
if self.builder.conditional_edges:
current_state = {k: ch.value for k, ch in channels.items()}
for source, cond, path_map in self.builder.conditional_edges:
if source in next_tasks or source == "__end__":
target = cond(current_state)
if target in path_map:
next_node = path_map[target]
if next_node != "END":
# 动态加入下一轮任务(模拟 TASKS channel)
next_tasks.append(next_node)
final_state = {k: ch.value for k, ch in channels.items()}
return final_state
def _plan(self, channels):
# 简易计划:找出有更新的节点 + 入口
triggered = []
for edge_from, edge_to in self.builder.edges:
if edge_from == self.builder.entry_point or edge_from in self.builder.nodes:
triggered.append(edge_to)
# 去重
return list(set([t for t in triggered if t in self.builder.nodes]))
return CompiledGraph(self)
# ==================== 使用示例 ====================
class SimpleState(TypedDict):
messages: Annotated[list, add] # 使用 add reducer(我们用 Topic 模拟)
def agent_node(state):
# 模拟 LLM
return {"messages": ["Agent 思考完成"]}
def tool_node(state):
return {"messages": ["工具执行结果"]}
def route(state):
# 条件路由
if len(state["messages"]) < 3:
return "tools"
return "END"
builder = SimpleStateGraph(SimpleState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge("START", "agent")
builder.add_conditional_edges("agent", route, {"tools": "tools", "END": "END"})
builder.add_edge("tools", "agent")
graph = builder.compile()
result = graph.invoke({"messages": []})
print(result)
这个简易版实现了:
- Channel(LastValue/Topic)—— 对应官方 channels 模块。
- 超级步循环(plan → execute → update)—— 对应 pregel/_loop.py。
- 条件边 + 动态任务(模拟 TASKS channel)。
- Checkpoint 历史记录(简化版)。
- 支持循环(agent ↔ tools)。
运行后你会发现它能处理 ReAct 循环,和官方 create_react_agent 行为高度一致,但代码不到 150 行。这就是“从原理出发”的力量。
进阶练习:
- 加入真实
asyncio并行执行。 - 实现
SqliteSaver检查点。 - 支持
Send动态多分支。 - 集成 LangChain Runnable。
调试、部署与最佳实践¶
- LangSmith:
LANGCHAIN_TRACING_V2=true自动追踪每一步。 - 可视化:
graph.get_graph().draw_mermaid()或 NetworkX。 - 性能:避免过深递归,使用 Subgraph 拆分;合理设置 recursion_limit。
- 生产部署:LangSmith Deployment + Postgres checkpoint + FastAPI。
- 最佳实践:
- State 字段越少越好。
- 节点保持纯函数。
- 重要决策节点加 interrupt_before。
- 复杂 Agent 用 prebuilt + 自定义节点混合。