Skip to content

Grap API

LangGraph 的核心是将代理工作流程建模为graphs。我们可以使用三个关键组件来定义代理的行为:

  1. State: 共享数据结构,用于表示应用程序的当前快照。它可以是任何数据类型,但通常使用共享状态模式来定义。

  2. Nodes: 用于编码代理逻辑的函数。它们接收当前状态作为输入,执行一些计算或产生副作用,并返回更新后的状态。

  3. Edges: 根据当前状态确定接下来要执行哪个Node的函数。它们可以是条件分支,也可以是固定转换。

通过组合NodesEdges,您可以创建复杂的循环工作流,并随着时间的推移不断演化状态。然而,真正的强大之处在于 LangGraph 如何管理这种状态。

需要强调的是:NodesEdges只不过是函数——它们可以包含 LLM,也可以只是普通的代码。

简而言之:Nodes执行工作,Edges指示下一步该做什么

LangGraph 的底层图算法使用message passing来定义通用程序。当一个节点完成其操作时,它会沿着一条或多条边向其他节点发送消息。这些接收节点随后执行各自的函数,并将生成的消息传递给下一组节点,如此循环往复。受谷歌 Pregel 系统的启发,该程序以离散的"super-steps."进行运行。

超步骤可以看作是对图节点的一次迭代。并行运行的节点属于同一个超步骤,而顺序运行的节点则属于不同的超级步骤。在图执行开始时,所有节点都处于inactive状态。当节点在其任何传入边(or "channels")上收到新消息(state) 时,它会变为active。然后,活动节点运行其函数并响应更新。在每个超步骤结束时,没有传入消息的节点通过将自己标记为inactive来投票halt。当所有节点都处于inactive状态且没有消息在传输中时,图执行终止。

StateGraph

StateGraph 类是主要使用的图类。它由用户定义的 State 对象进行参数化。

Compiling your graph

要构建graph,首先要定义state,然后添加nodesedges,最后进行编译。那么,编译graph究竟是什么?为什么需要编译graph?

编译是一个非常简单的步骤。它会对图的结构进行一些基本检查(例如,检查是否存在孤立节点)。你也可以在这里指定运行时参数,例如 checkpointers 和断点。只需调用以下.compile方法即可编译你的graph:

python
graph = graph_builder.compile(...)

警告

必须先编译graph才能使用它。

State

定义图时,首先要定义图的StateState包括schema of the graph以及用于指定如何更新statereducer functionsState的模式将是图中所有NodesEdges的输入模式,可以是 TypedDictPydantic 模型。所有Nodes都会向State发出更新,然后使用指定的 reducer 函数应用这些更新。

Schema

指定图的模式的主要官方方法是使用 TypedDict。如果你想在状态中提供默认值,请使用dataclass。 如果您需要递归数据验证,也支持使用 Pydantic BaseModel 作为你的图状态(但请注意,Pydantic 的性能不如 TypedDict or dataclass)。

默认情况下,该图将具有相同的输入和输出模式。如果你想更改这一点,也可以直接指定显式的输入和输出模式。当你有很多键,其中一些明确用于输入,另一些用于输出时,这非常有用。有关更多信息,请参阅指南

Multiple schemas

通常情况下,图中的所有节点都使用同一个模式进行通信。这意味着它们将读写相同的状态通道。但是,在某些情况下,我们希望对其进行更多控制:

  • 内部节点可以传递图中的输入/输出不需要的信息。
  • 我们可能还需要为图使用不同的输入/输出模式。例如,输出可能只包含一个相关的输出键。

节点可以向图中的私有状态通道写入数据,用于节点内部通信。我们可以简单地定义一个私有模式,PrivateState

也可以为图定义明确的输入和输出模式。在这些情况下,我们定义一个包含与图操作相关的 _all_ 键的 "internal" 模式。但是,我们还定义了inputoutput模式,它们是 "internal" 模式的子集,以限制图的输入和输出。有关更多详细信息,请参阅指南

python
class InputState(TypedDict):
    user_input: str

class OutputState(TypedDict):
    graph_output: str

class OverallState(TypedDict):
    foo: str
    user_input: str
    graph_output: str

class PrivateState(TypedDict):
    bar: str

def node_1(state: InputState) -> OverallState:
    # Write to OverallState
    return {"foo": state["user_input"] + " name"}

def node_2(state: OverallState) -> PrivateState:
    # Read from OverallState, write to PrivateState
    return {"bar": state["foo"] + " is"}

def node_3(state: PrivateState) -> OutputState:
    # Read from PrivateState, write to OutputState
    return {"graph_output": state["bar"] + " Lance"}

builder = StateGraph(OverallState,input_schema=InputState,output_schema=OutputState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
builder.add_node("node_3", node_3)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", "node_3")
builder.add_edge("node_3", END)

graph = builder.compile()
graph.invoke({"user_input":"My"})
# {'graph_output': 'My name is Lance'}

这里有两点微妙而重要的信息需要注意:

  1. 我们将 state: InputState 作为输入模式传递给 node_1。 但是,我们向 OverallState 中的一个通道 foo 写入数据。我们如何才能向一个未包含在输入模式中的状态通道写入数据呢?这是因为 节点可以向图状态中的任何状态通道写入数据 图状态是初始化时定义的状态通道的并集,其中包括 OverallState 以及过滤器 InputStateOutputState

  2. 我们可以使用以下方式初始化图:

    python
    StateGraph(
        OverallState,
        input_schema=InputState,
        output_schema=OutputState
    )

    那么,我们如何在 node_2 中写入 PrivateState 呢?如果在 StateGraph 初始化时没有传入该模式,图如何获得对该模式的访问权限?

    我们可以这样做,因为只要状态模式定义存在,_nodes 也可以声明额外的状态channels__。 我们可以这样做,因为 _nodes 也可以声明额外的状态 channels_,只要状态模式定义存在。在这种情况下, PrivateState模式已经定义,因此我们可以将bar 添加为图中的新状态通道并向其写入数据。

Reducers

Reducer 是理解节点更新如何应用于 State 的关键。 State 中的每个键都有其独立的 reducer 函数。 如果没有明确指定 reducer 函数,则假定对该键的所有更新都应覆盖它。reducer有几种不同的类型,首先是默认类型的reducer:

Default Reducer

这两个例子展示了如何使用默认的 reducer:

python
# [示例A]
from typing_extensions import TypedDict

class State(TypedDict):
    foo: int
    bar: list[str]

在这个例子中,没有任何键指定 reducer 函数。假设图的输入是:

{"foo": 1, "bar": ["hi"]}.那么,假设第一个Node返回 {"foo": 2}。 这被视为对状态的更新。请注意, Node 不需要返回完整的State模式,只需返回更新即可。 应用此更新后, State 将变为 {"foo": 2, "bar": ["hi"]}。如果第二个节点返回 {"bar": ["bye"]},则 State将为 {"foo": 2, "bar": ["bye"]}

python
# [示例B]
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: int
    bar: Annotated[list[str], add]

在这个例子中,我们使用了带 Annotated的类型来为第二个键 (bar)指定一个 reducer 函数(operator.add)。

请注意,第一个键保持不变。假设图的输入为{"foo": 1, "bar": ["hi"]}。那么假设第一个Node返回 {"foo": 2}。这被视为对状态的更新。请注意,Node不需要返回完整的State模式,只需返回更新即可。 应用此更新后,State将变为 {"foo": 2, "bar": ["hi"]}。 如果第二个节点返回{"bar": ["bye"]},则State将为{"foo": 2, "bar": ["hi", "bye"]}。请注意,这里的bar键是通过将两个列表相加而更新的。

Overwrite

NOTE

在某些情况下,您可能需要绕过 reducer 直接覆盖状态值。LangGraph 提供了Overwrite 类型来实现此目的。点击此处了解如何使用 Overwrite

Working with Messages in Graph State

Why use messages?

Most modern LLM providers have a chat model interface that accepts a list of messages as input. LangChain's chat model interface in particular accepts a list of message objects as inputs. These messages come in a variety of forms such as @[HumanMessage] (user input) or @[AIMessage] (LLM response).

To read more about what message objects are, please refer to the Messages conceptual guide.

Using Messages in your Graph

In many cases, it is helpful to store prior conversation history as a list of messages in your graph state. To do so, we can add a key (channel) to the graph state that stores a list of Message objects and annotate it with a reducer function (see messages key in the example below). The reducer function is vital to telling the graph how to update the list of Message objects in the state with each state update (for example, when a node sends an update). If you don't specify a reducer, every state update will overwrite the list of messages with the most recently provided value. If you wanted to simply append messages to the existing list, you could use operator.add as a reducer.

However, you might also want to manually update messages in your graph state (e.g. human-in-the-loop). If you were to use operator.add, the manual state updates you send to the graph would be appended to the existing list of messages, instead of updating existing messages. To avoid that, you need a reducer that can keep track of message IDs and overwrite existing messages, if updated. To achieve this, you can use the prebuilt @[add_messages] function. For brand new messages, it will simply append to existing list, but it will also handle the updates for existing messages correctly.

Serialization

In addition to keeping track of message IDs, the @[add_messages] function will also try to deserialize messages into LangChain Message objects whenever a state update is received on the messages channel.

See more information on LangChain serialization/deserialization here. This allows sending graph inputs / state updates in the following format:

python
# this is supported
{"messages": [HumanMessage(content="message")]}

# and this is also supported
{"messages": [{"type": "human", "content": "message"}]}

Since the state updates are always deserialized into LangChain Messages when using @[add_messages], you should use dot notation to access message attributes, like state["messages"][-1].content.

Below is an example of a graph that uses @[add_messages] as its reducer function.

python
from langchain.messages import AnyMessage
from langgraph.graph.message import add_messages
from typing import Annotated
from typing_extensions import TypedDict

class GraphState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]

MessagesState

由于在状态中拥有消息列表非常常见,因此存在一个名为 MessagesState 的预置状态,使使用消息变得容易。MessagesState 定义有一个 messages 键,它是 AnyMessage 对象的列表,并使用 add_messages reducer。通常,需要跟踪的状态不仅仅是消息,因此我们看到人们会对该状态进行子类化,并添加更多字段,例如:

python
from langgraph.graph import MessagesState

class State(MessagesState):
    documents: list[str]

Nodes

在 LangGraph 中,节点是接受以下参数的 Python 函数(可以是同步或异步):

  1. state – The state of the graph
  2. config – 一个 RunnableConfig 对象,其中包含配置信息(例如 thread_id)和跟踪信息(例如 tags)。
  3. runtime – 一个Runtime对象,其中包含runtime context和其他信息,例如storestream_writer

NetworkX 类似,您可以使用 add_node 方法将这些节点添加到图中。

python
from dataclasses import dataclass
from typing_extensions import TypedDict

from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime

class State(TypedDict):
    input: str
    results: str

@dataclass
class Context:
    user_id: str

builder = StateGraph(State)

def plain_node(state: State):
    return state

def node_with_runtime(state: State, runtime: Runtime[Context]):
    print("In node: ", runtime.context.user_id)
    return {"results": f"Hello, {state['input']}!"}

def node_with_config(state: State, config: RunnableConfig):
    print("In node with thread_id: ", config["configurable"]["thread_id"])
    return {"results": f"Hello, {state['input']}!"}


builder.add_node("plain_node", plain_node)
builder.add_node("node_with_runtime", node_with_runtime)
builder.add_node("node_with_config", node_with_config)
...

在后台,函数会被转换为 RunnableLambda,从而为函数添加批处理和异步支持,以及原生跟踪和调试功能。 如果向图中添加节点时未指定名称,则会为其赋予与函数名称等效的默认名称。

python
builder.add_node(my_node)
# 然后,您可以通过引用 `"my_node"` 来创建指向/来自该节点的边。

START Node

The @[START] Node is a special node that represents the node that sends user input to the graph. The main purpose for referencing this node is to determine which nodes should be called first.

python
from langgraph.graph import START

graph.add_edge(START, "node_a")

END Node

The END Node is a special node that represents a terminal node. This node is referenced when you want to denote which edges have no actions after they are done.

python
from langgraph.graph import END

graph.add_edge("node_a", END)

Node Caching

LangGraph 支持根据节点的输入缓存tasks/nodes。要使用缓存:

  • 编译图时指定缓存(或指定入口点时)
  • 为节点指定缓存策略。每个缓存策略支持:
    • key_func 用于根据节点的输入生成缓存键,默认为使用 pickle 对输入进行hash运算。
    • ttl,缓存的生存时间(以秒为单位)。如果未指定,则缓存永不过期。
python
import time
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

class State(TypedDict):
    x: int
    result: int


builder = StateGraph(State)


def expensive_node(state: State) -> dict[str, int]:
    # 昂贵的计算
    time.sleep(2)
    return {"result": state["x"] * 2}


builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))
builder.set_entry_point("expensive_node")
builder.set_finish_point("expensive_node")

graph = builder.compile(cache=InMemoryCache())

print(graph.invoke({"x": 5}, stream_mode='updates'))    
# [{'expensive_node': {'result': 10}}]
print(graph.invoke({"x": 5}, stream_mode='updates'))    
# [{'expensive_node': {'result': 10}, '__metadata__': {'cached': True}}]
  1. First run takes two seconds to run (due to mocked expensive computation).
  2. Second run utilizes cache and returns quickly.

Edges

Edges 定义了逻辑的路由方式以及图的终止方式。这是你的代理如何工作以及不同节点如何相互通信的重要组成部分。 以下是一些主要的edges类型:

  • Normal Edges: 直接从一个节点跳转到下一个节点。
  • Conditional Edges: 调用函数来确定接下来要访问的节点。
  • Entry Point: 当用户输入到达时,决定先调用哪个节点。
  • Conditional Entry Point: 调用一个函数来确定当用户输入到达时首先调用哪个(哪些)节点。

一个节点可以有多条出边。如果一个节点有多条出边,那么所有这些目标节点都将在下一个超级步骤中并行执行。

Normal Edges

如果你总是想从节点 A 到节点 B,可以直接使用 add_edge 方法。

python
graph.add_edge("node_a", "node_b")

Conditional Edges

如果你想选择性地路由到一条或多条边(或者选择性地终止),你可以使用 add_conditional_edges 方法。此方法接受一个节点名称和一个"routing function",该函数将在该节点执行后调用:

python
graph.add_conditional_edges("node_a", routing_function)

与节点类似,routing_function接受图的当前state并返回一个值。

默认情况下,返回值 routing_function 用作要将状态发送到下一个节点的节点(或节点列表)的名称。所有这些节点都将作为下一个超级步骤的一部分并行运行。

您还可以选择提供一个字典,将 routing_function's 的输出映射到下一个节点的名称。

python
graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False: "node_c"})

TIP

如果要将状态更新和路由合并到一个函数中,请使用Command而不是条件边。

Entry point

入口点是graph启动时运行的第一个node(s)。你可以使用虚拟 START 节点的 add_edge 方法向要执行的第一个节点添加边,以指定从哪里进入图。

python
from langgraph.graph import START

graph.add_edge(START, "node_a")

Conditional entry point

条件入口点允许您根据自定义逻辑从不同的节点开始。您可以使用虚拟 START 节点中的 add_conditional_edges 来实现此目的。

python
from langgraph.graph import START

graph.add_conditional_edges(START, routing_function)

您还可以选择提供一个字典,将 routing_function's 的输出映射到下一个节点的名称。

python
graph.add_conditional_edges(START, routing_function, {True: "node_b", False: "node_c"})

Send

默认情况下,NodesEdges是预先定义的,并且在相同的共享状态下运行。然而,有时可能无法事先知道确切的边界,或者您可能希望同时存在不同版本的 Statemap-reduce 设计模式就是一个常见的例子。在这种设计模式中,第一个节点可以生成对象列表,而你可能希望将其他节点应用于所有这些对象。对象的数量可能事先未知(意味着边的数量可能未知),并且下游Node的输入State 应该不同(每个生成的对象对应一个状态)。

为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。 Send 函数接受两个参数:第一个参数是节点的名称,第二个参数是要传递给该节点的状态。

python
def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state['subjects']]

graph.add_conditional_edges("node_a", continue_to_jokes)

Command

将控制流 (edges) 和状态更新 (nodes) 结合起来可能很有用。例如,你可能希望在同一个节点中同时执行状态更新并决定接下来要访问哪个节点。LangGraph 提供了一种方法,即通过节点函数返回 Command 对象来实现:

python
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    return Command(
        # state update
        update={"foo": "bar"},
        # control flow
        goto="my_other_node"
    )

使用 Command 还可以实现动态控制流行为(与条conditional edges相同):

python
def my_node(state: State) -> Command[Literal["my_other_node"]]:
    if state["foo"] == "bar":
        return Command(update={"foo": "baz"}, goto="my_other_node")

NOTE

在节点函数中返回 Command 时,必须添加返回类型注解,其中包含节点路由到的节点名称列表,例如 Command[Literal["my_other_node"]]。这是图形渲染所必需的,它告诉 LangGraph my_node 可以导航到 my_other_node

何时应该使用 Command 而不是条件边?

  • 当您需要同时更新图状态路由到不同的节点时,请使用 Command。例如,在实现multi-agent handoffs切换时,需要将请求路由到不同的代理并将一些信息传递给该代理。
  • 使用conditional edges在节点之间进行有条件路由,而无需更新状态。

如果您使用subgraphs,您可能需要从子图中的一个节点导航到另一个子图(即父图中的另一个节点)。为此,您可以在 Command 中指定 graph=Command.PARENT

python
def my_node(state: State) -> Command[Literal["other_subgraph"]]:
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT
    )

NOTE

graph设置为 Command.PARENT 将导航到最近的父图。 当你从子图节点向父图节点发送更新,且更新的键同时存在于父图和子图的state schemas中时,您必须在父图状态中为要更新的键定义一个 reducer。请参见此示例

这在实现multi-agent handoffs交接时尤其有用。详情请参阅本指南

Using inside tools

A common use case is updating graph state from inside a tool. For example, in a customer support application you might want to look up customer information based on their account number or ID in the beginning of the conversation.

Refer to this guide for detail.

Human-in-the-loop

@[Command] is an important part of human-in-the-loop workflows: when using interrupt() to collect user input, @[Command] is then used to supply the input and resume execution via Command(resume="User input"). Check out this conceptual guide for more information.

Graph migrations

LangGraph can easily handle migrations of graph definitions (nodes, edges, and state) even when using a checkpointer to track state.

  • For threads at the end of the graph (i.e. not interrupted) you can change the entire topology of the graph (i.e. all nodes and edges, remove, add, rename, etc)
  • For threads currently interrupted, we support all topology changes other than renaming / removing nodes (as that thread could now be about to enter a node that no longer exists) -- if this is a blocker please reach out and we can prioritize a solution.
  • For modifying state, we have full backwards and forwards compatibility for adding and removing keys
  • State keys that are renamed lose their saved state in existing threads
  • State keys whose types change in incompatible ways could currently cause issues in threads with state from before the change -- if this is a blocker please reach out and we can prioritize a solution.

Runtime context

When creating a graph, you can specify a context_schema for runtime context passed to nodes. This is useful for passing information to nodes that is not part of the graph state. For example, you might want to pass dependencies such as model name or a database connection.

python
@dataclass
class ContextSchema:
    llm_provider: str = "openai"

graph = StateGraph(State, context_schema=ContextSchema)

You can then pass this context into the graph using the context parameter of the invoke method.

python
graph.invoke(inputs, context={"llm_provider": "anthropic"})

You can then access and use this context inside a node or conditional edge:

python
from langgraph.runtime import Runtime

def node_a(state: State, runtime: Runtime[ContextSchema]):
    llm = get_llm(runtime.context.llm_provider)
    # ...

See this guide for a full breakdown on configuration.

Recursion limit

The recursion limit sets the maximum number of super-steps the graph can execute during a single execution. Once the limit is reached, LangGraph will raise GraphRecursionError. By default this value is set to 25 steps. The recursion limit can be set on any graph at runtime, and is passed to invoke/stream via the config dictionary. Importantly, recursion_limit is a standalone config key and should not be passed inside the configurable key as all other user-defined configuration. See the example below:

python
graph.invoke(inputs, config={"recursion_limit": 5}, context={"llm": "anthropic"})

Read this how-to to learn more about how the recursion limit works.

Accessing and handling the recursion counter

The current step counter is accessible in config["metadata"]["langgraph_step"] within any node, allowing for proactive recursion handling before hitting the recursion limit. This enables you to implement graceful degradation strategies within your graph logic.

How it works

The step counter is stored in config["metadata"]["langgraph_step"]. The recursion limit check follows the logic: step > stop where stop = step + recursion_limit + 1. When the limit is exceeded, LangGraph raises a GraphRecursionError.

Accessing the current step counter

You can access the current step counter within any node to monitor execution progress.

python
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph

def my_node(state: dict, config: RunnableConfig) -> dict:
    current_step = config["metadata"]["langgraph_step"]
    print(f"Currently on step: {current_step}")
    return state

Proactive recursion handling

You can check the step counter and proactively route to a different node before hitting the limit. This allows for graceful degradation within your graph.

python
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, END

def reasoning_node(state: dict, config: RunnableConfig) -> dict:
    current_step = config["metadata"]["langgraph_step"]
    recursion_limit = config["recursion_limit"]  # always present, defaults to 25

    # Check if we're approaching the limit (e.g., 80% threshold)
    if current_step >= recursion_limit * 0.8:
        return {
            **state,
            "route_to": "fallback",
            "reason": "Approaching recursion limit"
        }

    # Normal processing
    return {"messages": state["messages"] + ["thinking..."]}

def fallback_node(state: dict, config: RunnableConfig) -> dict:
    """Handle cases where recursion limit is approaching"""
    return {
        **state,
        "messages": state["messages"] + ["Reached complexity limit, providing best effort answer"]
    }

def route_based_on_state(state: dict) -> str:
    if state.get("route_to") == "fallback":
        return "fallback"
    elif state.get("done"):
        return END
    return "reasoning"

# Build graph
graph = StateGraph(dict)
graph.add_node("reasoning", reasoning_node)
graph.add_node("fallback", fallback_node)
graph.add_conditional_edges("reasoning", route_based_on_state)
graph.add_edge("fallback", END)
graph.set_entry_point("reasoning")

app = graph.compile()

Proactive vs reactive approaches

There are two main approaches to handling recursion limits: proactive (monitoring within the graph) and reactive (catching errors externally).

python
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, END
from langgraph.errors import GraphRecursionError

# Proactive Approach (recommended)
def agent_with_monitoring(state: dict, config: RunnableConfig) -> dict:
    """Proactively monitor and handle recursion within the graph"""
    current_step = config["metadata"]["langgraph_step"]
    recursion_limit = config["recursion_limit"]

    # Early detection - route to internal handling
    if current_step >= recursion_limit - 2:  # 2 steps before limit
        return {
            **state,
            "status": "recursion_limit_approaching",
            "final_answer": "Reached iteration limit, returning partial result"
        }

    # Normal processing
    return {"messages": state["messages"] + [f"Step {current_step}"]}

# Reactive Approach (fallback)
try:
    result = graph.invoke(initial_state, {"recursion_limit": 10})
except GraphRecursionError as e:
    # Handle externally after graph execution fails
    result = fallback_handler(initial_state)

The key differences between these approaches are:

ApproachDetectionHandlingControl Flow
Proactive (using langgraph_step)Before limit reachedInside graph via conditional routingGraph continues to completion node
Reactive (catching GraphRecursionError)After limit exceededOutside graph in try/catchGraph execution terminated

Proactive advantages:

  • Graceful degradation within the graph
  • Can save intermediate state in checkpoints
  • Better user experience with partial results
  • Graph completes normally (no exception)

Reactive advantages:

  • Simpler implementation
  • No need to modify graph logic
  • Centralized error handling

Other available metadata

Along with langgraph_step, the following metadata is also available in config["metadata"]:

python
def inspect_metadata(state: dict, config: RunnableConfig) -> dict:
    metadata = config["metadata"]

    print(f"Step: {metadata['langgraph_step']}")
    print(f"Node: {metadata['langgraph_node']}")
    print(f"Triggers: {metadata['langgraph_triggers']}")
    print(f"Path: {metadata['langgraph_path']}")
    print(f"Checkpoint NS: {metadata['langgraph_checkpoint_ns']}")

    return state

Visualization

It's often nice to be able to visualize graphs, especially as they get more complex. LangGraph comes with several built-in ways to visualize graphs. See this how-to guide for more info.

最近更新