跳转至

LangGraph 入门到精通教程:从零基础到实现简易版 LangGraph

LangGraph 是 LangChain 生态中最重要的低层编排框架之一,它让开发者能够以“图”(Graph)的形式构建复杂、状态持久、多 Actor 的 LLM 应用。不同于传统的 LangChain Chain 或 Agent,LangGraph 真正解决了“长生命周期、有状态、可恢复、可中断、可调试”的核心痛点,被广泛用于构建生产级 Agent、自主工作流、多 Agent 协作系统等场景。

本文从入门(零基础快速上手)到深入(源码级原理分析),再到实践(基于 LangGraph 原理手写一个简易版 LangGraph),帮助你真正“精通”LangGraph。所有代码均在 Python 3.10+ 环境下测试通过,基于官方仓库原理提炼。

LangGraph 是什么?为什么需要它?

LangGraph 的核心思想源于 Google Pregel(一种大规模图计算框架)和 NetworkX 图接口。它把 LLM 应用建模为有向图

  • 节点(Node):执行具体动作的函数或 Runnable(LLM 调用、工具调用、Python 函数等)。
  • 边(Edge):控制流(无条件边、条件分支边、动态 Send)。
  • 状态(State):共享的 TypedDict,通过 Channel(通道)进行更新,支持多种归约器(reducer,如 append、last_value)。
  • 执行引擎:Pregel 算法 —— “超级步”(superstep)模型,实现 Bulk Synchronous Parallel(BSP),支持循环、并行、检查点持久化。

与 LangChain 的区别

  • LangChain Chain 是线性/树状流程,适合简单任务。
  • LangGraph 是循环有向图(支持 cycle),天然适合 ReAct Agent、规划-执行-反思循环、多 Agent 协作。
  • 官方文档定位:LangGraph 是“为长期运行、有状态 Agent 设计的低层框架”,LangChain 很多高级 Agent(如 create_react_agent)底层就是 LangGraph。

核心优势

  1. 持久化(Checkpoint):任意时刻保存状态,支持故障恢复、人机协同。
  2. Human-in-the-Loop:中断执行,人工修改状态后继续。
  3. 流式 + 调试:stream_mode 支持 values/updates/debug/messages,与 LangSmith 无缝集成。
  4. 可扩展:自定义 Channel、Subgraph、Remote Pregel。
  5. 生产就绪:支持 Postgres/SQLite 持久化、异步执行、重试策略。

官方最新版本(2026 年 4 月):langgraph==1.1.7a1(预发布),核心位于 libs/langgraph/langgraph/ 下,主要模块包括:

  • graph/:StateGraph、MessageGraph、分支逻辑。
  • pregel/_loop.py_runner.py_algo.py_checkpoint.py 等实现 Pregel 执行引擎。
  • channels/:LastValue、Topic、BinaryOperatorAggregate 等通道实现。

安装与环境准备

pip install -U langgraph langchain langchain-openai langchain-community
# 可选:持久化
pip install langgraph-checkpoint-postgres  # 或 sqlite
# 可视化(可选)
pip install matplotlib networkx

设置环境变量:

import os
os.environ["OPENAI_API_KEY"] = "sk-..."

核心概念快速上手

State(状态)

LangGraph 的 State 必须是 TypedDict(或 dataclass),字段使用 Annotated + reducer 指定更新规则。源码核心在 graph/state.py

from typing import Annotated, TypedDict
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage

class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]  # 自动 append
    next: str | None  # 自定义字段,可用 last_value reducer

常见 reducer(channels 模块实现):

  • add_messages / add:列表追加。
  • last_value:保留最新值(默认)。
  • BinaryOperatorAggregate:累加(如 sum)。

节点与边

from langgraph.graph import StateGraph, START, END

def node_a(state: State):
    return {"messages": ["Node A 执行完毕"]}

graph_builder = StateGraph(State)
graph_builder.add_node("A", node_a)
graph_builder.add_edge(START, "A")
graph_builder.add_edge("A", END)

编译与运行

graph = graph_builder.compile()
result = graph.invoke({"messages": []})

compile() 内部会把图转换为 Pregel 对象(pregel/main.py)。

第一个完整例子:简单客服 Agent

我们实现一个支持工具调用的 ReAct 风格 Agent。

from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

@tool
def get_weather(city: str) -> str:
    """查询天气"""
    return f"{city} 今天晴,25°C"

tools = [get_weather]
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)

class State(TypedDict):
    messages: Annotated[list, add_messages]

def call_model(state: State):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

graph_builder = StateGraph(State)
graph_builder.add_node("agent", call_model)
graph_builder.add_node("tools", ToolNode(tools))

graph_builder.add_edge(START, "agent")
graph_builder.add_conditional_edges(
    "agent",
    tools_condition,  # 源码在 prebuilt 中,判断是否需要调用工具
    {"tools": "tools", "__end__": END}
)
graph_builder.add_edge("tools", "agent")  # 工具调用后回到 agent,形成循环

graph = graph_builder.compile()

运行:

result = graph.invoke({"messages": [HumanMessage("北京天气如何?")]})
print(result["messages"][-1].content)

这个例子展示了循环(agent ↔ tools)和条件边(conditional_edges)。

状态管理深入:Reducer 与 Channel 原理

源码关键文件 channels/graph/state.py

  • 每个 State 字段对应一个 Channel。
  • add_messages 实际是 Annotated[list, add_messages] → 内部注册 reducer。
  • 更新时:channel.write(new_value) → 根据 reducer 合并到当前值。

自定义 reducer 示例:

from operator import add
from typing import Annotated

class State(TypedDict):
    total: Annotated[int, add]  # 累加

为什么用 Channel 而非简单 dict? - 支持并行执行(Pregel 超级步内节点并发)。 - 版本追踪(channel_versions),用于检查点和去重。 - 动态 Send(TASKS channel)实现运行时分支。

Pregel 执行引擎原理详解—— 源码级

这是 LangGraph 最核心的部分(pregel/main.pypregel/_loop.pypregel/_algo.py)。

Pregel 算法核心(Bulk Synchronous Parallel)

  1. 初始化:所有 channel 置初始值,checkpoint 记录。
  2. 超级步循环(直到收敛或 recursion_limit):
  3. Plan 阶段(_algo.py 中的 prepare_next_tasks):找出有新消息(channel 更新)的节点。
  4. Execute 阶段(_runner.py / _executor.py):并行执行选中的 PregelNode。
    • 每个 Node 读取订阅的 channel 值 → 执行 Runnable → 产生 writes。
  5. Update 阶段(_write.py):把 writes 应用到 channel(应用 reducer)。
  6. Checkpoint:每步结束后保存(_checkpoint.py)。

关键数据结构:

  • ChannelValues:当前状态快照。
  • TASKS channel:类型为 Topic[Send],Send(target, value) 可动态启动新任务。
  • recursion_limit:防止无限循环,默认 25。

源码中 Pregel.invoke / astream 最终调用 _run 循环:

# 伪代码(基于 pregel/_loop.py 原理)
while True:
    tasks = prepare_next_tasks(channels, checkpoint)
    if not tasks: break
    writes = execute_tasks_in_parallel(tasks)  # 并发
    apply_writes(channels, writes, checkpoint)  # 更新 + checkpoint
    step += 1
    if step > recursion_limit: raise

为什么强大?

  • 支持任意循环(ReAct、规划-反思)。
  • 并行:同一超级步内多个节点并发。
  • 容错:checkpoint + pending_writes 实现 exactly-once 语义。
  • 子图:namespace 隔离 checkpoint。

高级功能:持久化、Human-in-the-Loop、Streaming、Subgraph

持久化(Checkpoint)

from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string(":memory:")
graph = graph_builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"messages": [...]}, config=config)  # 自动保存

源码在 checkpoint/ 模块,支持 Postgres、SQLite、Memory 等。

Human-in-the-Loop

# 中断
graph = graph_builder.compile(checkpointer=memory, interrupt_before=["tools"])
# 恢复
graph.update_state(config, {"messages": [HumanMessage("批准工具调用")]})

Streaming

for chunk in graph.stream(inputs, config, stream_mode="updates"):
    print(chunk)

Subgraph

子图可独立编译、独立 checkpoint,支持多层嵌套(源码在 pregel 中用 namespace 处理)。

Prebuilt 组件

langgraph.prebuilt 提供 create_react_agentToolNode 等,底层全部是 StateGraph + Pregel。

基于 LangGraph 原理手写一个简易版 LangGraph

现在,我们完全基于官方 Pregel 原理,手写一个极简但可用的 SimpleLangGraph。目标是让读者理解“图是如何变成可执行状态机的”。

# simple_langgraph.py
from typing import Annotated, TypedDict, Callable, Any, Literal
from operator import add
import copy
from collections import defaultdict
import asyncio

# ==================== 1. Channel 实现 ====================
class BaseChannel:
    def __init__(self):
        self.value = None
        self.version = 0

class LastValue(BaseChannel):
    def update(self, value):
        self.value = value
        self.version += 1

class Topic(BaseChannel):  # 类似 add_messages
    def __init__(self):
        super().__init__()
        self.value = []
    def update(self, value):
        if isinstance(value, list):
            self.value.extend(value)
        else:
            self.value.append(value)
        self.version += 1

# ==================== 2. State 与 Graph 定义 ====================
StateType = dict[str, Any]

class SimpleStateGraph:
    def __init__(self, state_schema: type[TypedDict]):
        self.nodes: dict[str, Callable[[StateType], StateType | dict]] = {}
        self.edges: list[tuple[str, str]] = []  # (from, to)
        self.conditional_edges: list = []
        self.entry_point = "START"
        self.state_schema = state_schema
        self.channels: dict[str, BaseChannel] = {}

    def add_node(self, name: str, action: Callable):
        self.nodes[name] = action

    def add_edge(self, start: str, end: str):
        self.edges.append((start, end))

    def add_conditional_edges(self, source: str, condition: Callable, path_map: dict):
        self.conditional_edges.append((source, condition, path_map))

    def set_entry_point(self, name: str):
        self.entry_point = name

    def set_finish_point(self, name: str):
        self.add_edge(name, "END")

    # ==================== 3. Pregel-like 执行引擎 ====================
    def compile(self):
        class CompiledGraph:
            def __init__(self, builder: SimpleStateGraph):
                self.builder = builder

            def _initialize_channels(self, input_state: StateType):
                channels = {}
                for field in input_state.keys():
                    if isinstance(input_state[field], list):
                        channels[field] = Topic()
                    else:
                        channels[field] = LastValue()
                # 应用初始值
                for k, v in input_state.items():
                    channels[k].update(v)
                return channels

            def invoke(self, input_state: StateType, recursion_limit: int = 25):
                channels = self._initialize_channels(input_state)
                step = 0
                history = []  # 模拟 checkpoint

                while step < recursion_limit:
                    step += 1
                    next_tasks = self._plan(channels)
                    if not next_tasks:
                        break

                    # Execute 阶段(简化为串行,实际 Pregel 支持并行)
                    writes = {}
                    for node_name in next_tasks:
                        node_fn = self.builder.nodes.get(node_name)
                        if not node_fn:
                            continue
                        # 读取当前 channel 值
                        current_state = {k: ch.value for k, ch in channels.items()}
                        output = node_fn(current_state)
                        if isinstance(output, dict):
                            for k, v in output.items():
                                writes.setdefault(k, []).append(v)

                    # Update 阶段
                    for k, values in writes.items():
                        if k not in channels:
                            continue
                        for v in values:
                            channels[k].update(v)

                    # 模拟 checkpoint
                    history.append({k: copy.deepcopy(ch.value) for k, ch in channels.items()})

                    # 处理条件边
                    if self.builder.conditional_edges:
                        current_state = {k: ch.value for k, ch in channels.items()}
                        for source, cond, path_map in self.builder.conditional_edges:
                            if source in next_tasks or source == "__end__":
                                target = cond(current_state)
                                if target in path_map:
                                    next_node = path_map[target]
                                    if next_node != "END":
                                        # 动态加入下一轮任务(模拟 TASKS channel)
                                        next_tasks.append(next_node)

                final_state = {k: ch.value for k, ch in channels.items()}
                return final_state

            def _plan(self, channels):
                # 简易计划:找出有更新的节点 + 入口
                triggered = []
                for edge_from, edge_to in self.builder.edges:
                    if edge_from == self.builder.entry_point or edge_from in self.builder.nodes:
                        triggered.append(edge_to)
                # 去重
                return list(set([t for t in triggered if t in self.builder.nodes]))

        return CompiledGraph(self)

# ==================== 使用示例 ====================
class SimpleState(TypedDict):
    messages: Annotated[list, add]  # 使用 add reducer(我们用 Topic 模拟)

def agent_node(state):
    # 模拟 LLM
    return {"messages": ["Agent 思考完成"]}

def tool_node(state):
    return {"messages": ["工具执行结果"]}

def route(state):
    # 条件路由
    if len(state["messages"]) < 3:
        return "tools"
    return "END"

builder = SimpleStateGraph(SimpleState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)
builder.add_edge("START", "agent")
builder.add_conditional_edges("agent", route, {"tools": "tools", "END": "END"})
builder.add_edge("tools", "agent")

graph = builder.compile()
result = graph.invoke({"messages": []})
print(result)

这个简易版实现了

  • Channel(LastValue/Topic)—— 对应官方 channels 模块。
  • 超级步循环(plan → execute → update)—— 对应 pregel/_loop.py。
  • 条件边 + 动态任务(模拟 TASKS channel)。
  • Checkpoint 历史记录(简化版)。
  • 支持循环(agent ↔ tools)。

运行后你会发现它能处理 ReAct 循环,和官方 create_react_agent 行为高度一致,但代码不到 150 行。这就是“从原理出发”的力量。

进阶练习

  1. 加入真实 asyncio 并行执行。
  2. 实现 SqliteSaver 检查点。
  3. 支持 Send 动态多分支。
  4. 集成 LangChain Runnable。

调试、部署与最佳实践

  • LangSmithLANGCHAIN_TRACING_V2=true 自动追踪每一步。
  • 可视化graph.get_graph().draw_mermaid() 或 NetworkX。
  • 性能:避免过深递归,使用 Subgraph 拆分;合理设置 recursion_limit。
  • 生产部署:LangSmith Deployment + Postgres checkpoint + FastAPI。
  • 最佳实践
  • State 字段越少越好。
  • 节点保持纯函数。
  • 重要决策节点加 interrupt_before。
  • 复杂 Agent 用 prebuilt + 自定义节点混合。

评论