Graph 包使用指南
概述
Graph 将可控的工作流编排与可扩展的 Agent 能力结合,适用于: - 类型安全的状态管理与可预测路由; - LLM 决策、工具调用循环、可选的 Human in the Loop(HITL); - 可复用的组件,既可独立运行,也可作为子 Agent 组合。
特点: - Schema 驱动的 State 与 Reducer,避免并发分支写入同一字段时的数据竞争; - BSP 风格(计划/执行/合并)的确定性并行; - 内置节点类型封装 LLM、工具与 Agent,减少重复代码; - 流式事件、检查点与中断,便于观测与恢复。 - 节点级重试/退避(指数退避与抖动),支持执行器默认重试策略与带重试元数据的事件观测。
快速开始
最小工作流
下面是一个经典的“prepare → ask LLM → 可能调用工具”的循环,使用 graph.MessagesStateSchema()
(已定义 graph.StateKeyMessages
、graph.StateKeyUserInput
、graph.StateKeyLastResponse
等键)。
flowchart LR
START([start]):::startNode --> P[prepare]:::processNode
P --> A[ask LLM]:::llmNode
A -. tool_calls .-> T[tools]:::toolNode
A -- no tool_calls --> F[fallback]:::processNode
T --> A
F --> END([finish]):::endNode
classDef startNode fill:#e1f5e1,stroke:#4caf50,stroke-width:2px
classDef endNode fill:#ffe1e1,stroke:#f44336,stroke-width:2px
classDef llmNode fill:#e3f2fd,stroke:#2196f3,stroke-width:2px
classDef toolNode fill:#fff3e0,stroke:#ff9800,stroke-width:2px
classDef processNode fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
Graph 包允许您将复杂的 AI 工作流建模为有向图,其中节点代表处理步骤,边代表数据流和控制流。它特别适合构建需要条件路由、状态管理和多步骤处理的 AI 应用。
使用模式
Graph 包的使用遵循以下模式:
- 创建 Graph:使用
StateGraph
构建器定义工作流结构 - 创建 GraphAgent:将编译后的 Graph 包装为 Agent
- 创建 Runner:使用 Runner 管理会话和执行环境
- 执行工作流:通过 Runner 执行工作流并处理结果
这种模式提供了:
- 类型安全:通过状态模式确保数据一致性
- 会话管理:支持多用户、多会话的并发执行
- 事件流:实时监控工作流执行进度
- 错误处理:统一的错误处理和恢复机制
Agent 集成
GraphAgent 实现了 agent.Agent
接口,可以:
- 作为独立 Agent:通过 Runner 直接执行
- 作为 SubAgent:被其他 Agent(如 LLMAgent)作为子 Agent 使用
- 挂载 SubAgent:通过
graphagent.WithSubAgents
配置子 Agent,并在图中使用AddAgentNode
委托执行
这种设计使得 GraphAgent 既能接入其他 Agent,也能在自身工作流中灵活调度子 Agent。
主要特性
- 类型安全的状态管理:使用 Schema 定义状态结构,支持自定义 Reducer
- 条件路由:基于状态动态选择执行路径
- LLM 节点集成:内置对大型语言模型的支持
- 工具节点:支持函数调用和外部工具集成
- Agent 节点:通过子 Agent 将其他 Agent 融入图中
- 流式执行:支持实时事件流和进度跟踪
- 并发安全:线程安全的图执行
- 基于检查点的时间旅行:浏览执行历史并恢复之前的状态
- 人机协作 (HITL):支持带有中断和恢复功能的交互式工作流
- 原子检查点:原子存储检查点和待写入数据,确保可靠的恢复
- 检查点谱系:跟踪形成执行线程的相关检查点及其父子关系
核心概念
1. 图 (Graph)
图是工作流的核心结构,由节点和边组成:
虚拟节点:
Start
:虚拟起始节点,通过SetEntryPoint()
自动连接End
:虚拟结束节点,通过SetFinishPoint()
自动连接- 这些节点不需要显式创建,系统会自动处理连接
2. 节点 (Node)
节点代表工作流中的一个处理步骤:
3. 状态 (State)
状态是在节点间传递的数据容器:
内置状态键:
Graph 包提供了一些内置状态键,主要用于系统内部通信:
用户可访问的内置键:
StateKeyUserInput
:用户输入(一次性,消费后清空,由 LLM 节点自动持久化)StateKeyOneShotMessages
:一次性消息(完整覆盖本轮输入,消费后清空)StateKeyLastResponse
:最后响应(用于设置最终输出,Executor 会读取此值作为结果)StateKeyMessages
:消息历史(持久化,支持 append + MessageOp 补丁操作)StateKeyNodeResponses
:按节点存储的响应映射。键为节点 ID,值为该 节点的最终文本响应。StateKeyLastResponse
用于串行路径上的最终输 出;当多个并行节点在某处汇合时,应从StateKeyNodeResponses
中按节 点读取各自的输出。StateKeyMetadata
:元数据(用户可用的通用元数据存储)
系统内部键(用户不应直接使用):
StateKeySession
:会话信息(由 GraphAgent 自动设置)StateKeyExecContext
:执行上下文(由 Executor 自动设置)StateKeyToolCallbacks
:工具回调(由 Executor 自动设置)StateKeyModelCallbacks
:模型回调(由 Executor 自动设置)
用户应该使用自定义状态键来存储业务数据,只在必要时使用用户可访问的内置状态键。
4. 状态模式 (StateSchema)
状态模式定义状态的结构和行为:
使用指南
节点 I/O 约定
节点之间仅通过共享状态 State 传递数据。每个节点返回一个 state delta,按 Schema 的 Reducer 合并到全局 State,下游节点从 State 读取上游产出。
- 常用内置键(对用户可见)
user_input
:一次性用户输入,被下一个 LLM/Agent 节点消费后清空one_shot_messages
:一次性完整消息覆盖,用于下一次 LLM 调用,执行后清空messages
:持久化的消息历史(LLM/Tools 会追加),支持 MessageOp 补丁last_response
:最近一次助手文本回复node_responses
:map[nodeID]any,按节点保存最终文本回复。最近结果用last_response
- 函数节点(Function node)
- 输入:完整 State
- 输出:返回
graph.State
增量,写入自定义键(需在 Schema 中声明),如{"parsed_time":"..."}
- LLM 节点
- 输入优先级:
one_shot_messages
→user_input
→messages
- 输出:
- 向
messages
追加助手消息 - 设置
last_response
- 设置
node_responses[<llm_node_id>]
- 向
- 输入优先级:
- Tools 节点
- 输入:从
messages
中寻找最新的带tool_calls
的助手消息 - 输出:向
messages
追加工具返回消息
- 输入:从
- Agent 节点(子代理)
- 输入:Graph 的 State 通过
Invocation.RunOptions.RuntimeState
传入子代理- 子代理的 Model/Tool 回调可通过
agent.InvocationFromContext(ctx)
访问
- 子代理的 Model/Tool 回调可通过
- 结束输出:
- 设置
last_response
- 设置
node_responses[<agent_node_id>]
- 清空
user_input
- 设置
- 输入:Graph 的 State 通过
推荐用法
- 在 Schema 中声明业务字段(如
parsed_time
、final_payload
),函数节点写入/读取。 - 需要给 LLM 节点注入结构化提示时,可在前置节点写入
one_shot_messages
(例如加入包含解析信息的 system message)。 - 需要消费上游文本结果时:紧邻下游读取
last_response
,或在任意后续节点读取node_responses[节点ID]
。
示例:
examples/graph/io_conventions
:函数 + LLM + Agent 的 I/O 演示examples/graph/io_conventions_tools
:加入 Tools 节点,展示如何获取工具 JSON 并落入 Stateexamples/graph/retry
:节点级重试/退避演示
状态键常量与来源(可直接引用)
- 导入包:
import "trpc.group/trpc-go/trpc-agent-go/graph"
- 常量定义位置:
graph/state.go
- 用户可见、常用键
user_input
→ 常量graph.StateKeyUserInput
one_shot_messages
→ 常量graph.StateKeyOneShotMessages
messages
→ 常量graph.StateKeyMessages
last_response
→ 常量graph.StateKeyLastResponse
node_responses
→ 常量graph.StateKeyNodeResponses
- 其他常用键
session
→graph.StateKeySession
metadata
→graph.StateKeyMetadata
current_node_id
→graph.StateKeyCurrentNodeID
exec_context
→graph.StateKeyExecContext
tool_callbacks
→graph.StateKeyToolCallbacks
model_callbacks
→graph.StateKeyModelCallbacks
agent_callbacks
→graph.StateKeyAgentCallbacks
parent_agent
→graph.StateKeyParentAgent
使用示例:
事件元数据键(StateDelta)
- 导入包:
import "trpc.group/trpc-go/trpc-agent-go/graph"
- 常量定义位置:
graph/events.go
- 模型元数据:
_model_metadata
→graph.MetadataKeyModel
(结构体graph.ModelExecutionMetadata
) - 工具元数据:
_tool_metadata
→graph.MetadataKeyTool
(结构体graph.ToolExecutionMetadata
) - 节点元数据:
_node_metadata
→graph.MetadataKeyNode
(结构体graph.NodeExecutionMetadata
)。包含重试字段:Attempt
、MaxAttempts
、NextDelay
、Retrying
及时间相关信息。
使用示例:
1. 创建 GraphAgent 和 Runner
用户主要通过创建 GraphAgent 然后通过 Runner 来使用 Graph 包。这是推荐的使用模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
|
2. 使用 LLM 节点
LLM 节点实现了固定的三段式输入规则,无需配置:
- OneShot 优先:若存在
one_shot_messages
,以它为本轮输入。 - UserInput 其次:否则若存在
user_input
,自动持久化一次。 - 历史默认:否则以持久化历史作为输入。
重要说明:
- SystemPrompt 仅用于本次输入,不落持久化状态。
- 一次性键(
user_input
/one_shot_messages
)在成功执行后自动清空。 - 所有状态更新都是原子性的,确保一致性。
- GraphAgent/Runner 仅设置
user_input
,不再预先把用户消息写入messages
。这样可以允许在 LLM 节点之前的任意节点对user_input
进行修改,并能在同一轮生效。
三种输入范式
-
OneShot(
StateKeyOneShotMessages
):- 当该键存在时,本轮仅使用这里提供的
[]model.Message
调用模型, 通常包含完整的 system prompt 与 user prompt。调用后自动清空。 - 适用场景:前置节点专门构造 prompt 的工作流,需完全覆盖本轮输入。
- 当该键存在时,本轮仅使用这里提供的
-
UserInput(
StateKeyUserInput
):- 当
user_input
非空时,LLM 节点会取持久化历史messages
,并将 本轮的用户输入合并后发起调用。结束后会把用户输入与助手回复通过MessageOp
(例如AppendMessages
、ReplaceLastUser
)原子性写入 到messages
,并自动清空user_input
以避免重复追加。 - 适用场景:普通对话式工作流,允许在前置节点动态调整用户输入。
- 当
- Messages only(仅
StateKeyMessages
):- 多用于工具调用回路。当第一轮经由
user_input
发起后,路由到工具 节点执行,再回到 LLM 节点时,因为user_input
已被清空,LLM 将走 “Messages only” 分支,以历史中的 tool 响应继续推理。
- 多用于工具调用回路。当第一轮经由
LLM 指令中的占位符
LLM 节点的 instruction
支持占位符注入(与 LLMAgent 规则一致):
{key}
→ 替换为session.State["key"]
{key?}
→ 可选,缺失时替换为空{user:subkey}
、{app:subkey}
、{temp:subkey}
→ 访问用户/应用/临时命名空间(SessionService 会将 app/user 作用域合并到 session,并带上前缀)
说明:
- GraphAgent 会把当前
*session.Session
写入图状态的StateKeySession
,LLM 节点据此读取注入值 - 无前缀键(如
research_topics
)需要直接存在于session.State
示例:
可参考可运行示例:examples/graph/placeholder
。
通过 Reducer 与 MessageOp 实现的原子更新
Graph 包的消息状态支持 MessageOp
补丁操作(如 ReplaceLastUser
、
AppendMessages
等),由 MessageReducer
实现原子合并。这带来两个
直接收益:
- 允许在 LLM 节点之前修改
user_input
,LLM 节点会据此在一次返回中将 需要的操作(例如替换最后一条用户消息、追加助手消息)以补丁形式返回, 执行器一次性落库,避免竞态与重复。` - 兼容传统的直接
[]Message
追加用法,同时为复杂更新提供更高的表达力。
示例:在前置节点修改 user_input
,随后进入 LLM 节点。
3. GraphAgent 配置选项
GraphAgent 支持多种配置选项:
模型/工具回调需要在节点级配置,例如
AddLLMNode(..., graph.WithModelCallbacks(...))
或AddToolsNode(..., graph.WithToolCallbacks(...))
。
配置了子 Agent 后,可以在图中使用 Agent 节点委托执行:
Agent 节点会以节点 ID 作为查找键,因此需确保
AddAgentNode("assistant")
与subAgent.Info().Name == "assistant"
一致。
4. 条件路由
5. 工具节点集成
工具调用配对机制与二次进入 LLM:
- 从
messages
尾部向前扫描最近的assistant(tool_calls)
;遇到user
则停止,确保配对正确。 - 当工具节点完成后返回到 LLM 节点时,
user_input
已被清空,LLM 将走 “Messages only” 分支,以历史中的 tool 响应继续推理。
6. 节点重试与退避
为节点配置指数退避的重试策略(可选抖动)。失败的尝试不会产生写入;只有成功的一次才会落库并触发路由。
- 节点级策略(
WithRetryPolicy
):
- 执行器默认策略(当节点未配置时生效):
注意事项
- 中断(interrupt)不参与重试。
- 当设置了步骤超时(WithStepTimeout
)时,退避时间会被当前步骤的截止时间钳制。
- 事件会携带重试元数据,便于 CLI/UI 展示进度:
示例:examples/graph/retry
展示了一个会先失败后成功的节点,并在成功后进入下游 LLM 输出最终答案。
7. Runner 配置
Runner 提供了会话管理和执行环境:
8. 消息状态模式
对于对话式应用,可以使用预定义的消息状态模式:
9. 状态键使用场景
用户自定义状态键:用于存储业务逻辑数据
内置状态键:用于系统集成
高级功能
1. 中断和恢复(人机协作)
Graph 包通过中断和恢复功能支持人机协作 (HITL) 工作流。这使得工作流可以暂停执行,等待人工输入或审批,然后从中断的确切位置恢复。
基本用法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
|
上面的例子展示了如何声明节点、连边并运行。接下来先介绍执行方式与会话管理,然后进入核心概念与常见用法。
执行方式
- 用
graphagent.New
包装成通用agent.Agent
,交给runner.Runner
管理会话与事件流。
最小 GraphAgent + Runner 例子:
Runner 会话后端可选项:
- 内存:session/inmemory
(默认示例使用)
- Redis:session/redis
(生产更常用)
GraphAgent 配置选项
核心概念
状态管理
GraphAgent 采用 Schema + Reducer 模式管理状态。先明确状态结构与合并规则,后续节点输入/输出的 key 就有了清晰来源与生命周期约定。
使用内置 Schema
自定义 Schema
Reducer 机制确保状态字段按预定义规则安全合并,这在并发执行时尤其重要。
提示:建议为业务键定义常量,避免散落魔法字符串。
节点类型
GraphAgent 提供了四种内置节点类型:
Function 节点
最基础的节点,执行自定义逻辑:
LLM 节点
集成语言模型,自动管理对话历史:
Tools 节点
执行工具调用,注意是顺序执行:
将工具结果写入 State
在 Tools 节点之后,添加一个函数节点,从 graph.StateKeyMessages
汇总工具结果并写入结构化 State:
参考示例:examples/graph/io_conventions_tools
。
边与路由
边定义了节点间的执行流转:
提示:设置入口与结束点时,会隐式连接到虚拟的 Start/End 节点:
- SetEntryPoint("first")
等效于创建 Start -> first
的连边;
- SetFinishPoint("last")
等效于创建 last -> End
的连边。
无需显式添加这两条边。
常量名:graph.Start == "__start__"
,graph.End == "__end__"
。
命令模式(动态路由 / Fan-out)
节点除返回 graph.State
外,也可以返回 *graph.Command
或 []*graph.Command
,以同时更新状态并指定下一跳:
使用命令模式进行路由时,无需为 GoTo
目标添加显式静态边;仅需保证目标节点存在,并在需要作为终点时设置 SetFinishPoint
。
架构设计
整体架构
GraphAgent 的架构设计体现了我们对复杂系统的理解:通过清晰的分层来管理复杂性。每一层都有明确的职责,层与层之间通过标准接口通信。
flowchart TB
subgraph "Runner Layer"
R[Runner]:::runnerClass
S[Session Service]:::sessionClass
end
subgraph "GraphAgent"
GA[GraphAgent Wrapper]:::agentClass
CB[Callbacks]:::callbackClass
end
subgraph "Graph Engine"
SG[StateGraph Builder]:::builderClass
G[Graph]:::graphClass
E[Executor]:::executorClass
end
subgraph "Execution Components"
P[Planning]:::phaseClass
EX[Execution]:::phaseClass
U[Update]:::phaseClass
end
subgraph "Storage"
CP[Checkpoint]:::storageClass
ST[State Store]:::storageClass
end
R --> GA
GA --> G
G --> E
E --> P
E --> EX
E --> U
E --> CP
classDef runnerClass fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef sessionClass fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef agentClass fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef callbackClass fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef builderClass fill:#fff8e1,stroke:#f57c00,stroke-width:2px
classDef graphClass fill:#f1f8e9,stroke:#689f38,stroke-width:2px
classDef executorClass fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phaseClass fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef storageClass fill:#efebe9,stroke:#5d4037,stroke-width:2px
核心模块解析
核心组件一览:
graph/state_graph.go
- StateGraph 构建器
提供链式声明式 Go API 来构建图结构,通过 fluent 方法链(AddNode → AddEdge → Compile)定义节点、边和条件路由。
graph/graph.go
- 编译后的运行时
实现基于通道(Channel)的事件触发式执行机制。节点执行结果合并入 State;通道仅用于触发路由,写入哨兵值(sentinel value)而非业务数据。
graph/executor.go
- BSP 执行器
这是系统心脏,借鉴了 Google Pregel 论文。实现 BSP(Bulk Synchronous Parallel)风格的三阶段循环:Planning → Execution → Update。
graph/checkpoint/*
- 检查点和恢复机制
提供可选的检查点持久化(如 sqlite),原子保存状态与待写入动作,支持按谱系/检查点恢复。
agent/graphagent/graph_agent.go
- Graph 与 Agent 的桥梁
将编译后的 Graph 适配为通用 Agent,可复用会话、回调与事件流。
执行模型
GraphAgent 借鉴了 Google Pregel 的 BSP(Bulk Synchronous Parallel)模型,但适配到了单进程环境;在此基础上还支持检查点、HITL 中断/恢复与时间旅行:
sequenceDiagram
autonumber
participant R as Runner
participant GA as GraphAgent
participant EX as Executor
participant CK as Checkpoint Saver
participant DB as Storage
participant H as Human
R->>GA: Run(invocation)
GA->>EX: Execute(graph, state, options)
GA-->>R: Stream node/tool/model events
loop 每个超级步 (BSP)
EX->>EX: Planning — 计算前沿(Frontier)
par 并行执行节点
EX->>EX: 执行节点 i(状态浅拷贝)
EX-->>GA: 节点开始事件(author=nodeID)
and
EX->>EX: 执行节点 j(状态浅拷贝)
EX-->>GA: 节点开始事件
end
alt 节点触发 Interrupt(key,prompt)
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen,reason=interrupt)
CK->>DB: 原子提交
EX-->>GA: interrupt 事件(checkpoint_id,prompt)
GA-->>R: 转发中断事件并暂停
R->>H: 请求人工输入/审批
H-->>R: 提交决策/值
R->>GA: Run(resume) runtime_state{
R->>GA: checkpoint_id,resume_map}
GA->>EX: ResumeFromCheckpoint(checkpoint_id,resume_map)
EX->>CK: Load checkpoint
CK->>EX: state/frontier/pending_writes/versions_seen
EX->>EX: 重建前沿并应用恢复值
else 正常执行
EX-->>GA: 节点完成事件(含 tool/model 事件)
EX->>EX: Update — Reducer 合并状态
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen)
CK->>DB: 原子提交
end
end
Note over EX,CK: versions_seen 避免重复执行;
Note over EX,CK: pending_writes 重建通道;
Note over EX,CK: parent_id 形成谱系以支持时间旅行
opt 时间旅行(回溯/分支)
R->>GA: Run(runtime_state{checkpoint_id})
GA->>EX: ResumeFromCheckpoint(checkpoint_id)
EX->>CK: Load checkpoint + lineage
CK->>EX: 恢复状态并可创建新 lineage_id
end
EX-->>GA: done 事件(last_response)
GA-->>R: 输出最终消息
flowchart TB
%% 执行全景图(精简连线)
subgraph Client
R[Runner]:::runner --> GA[GraphAgent]:::agent
end
subgraph Engine[Graph Engine]
GA --> EX[Executor]:::executor
subgraph BSP["BSP Superstep"]
P[Planning]:::phase --> X[Execution]:::phase --> U[Update]:::phase
end
end
N[Nodes: LLM / Tools / Function / Agent]:::process
CK[(Checkpoint)]:::storage
H[Human]:::human
EX --> BSP
EX --> N
EX -.-> CK
GA <--> H
GA --> R
classDef runner fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef agent fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef executor fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phase fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
classDef storage fill:#efebe9,stroke:#6d4c41,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
执行过程的关键点:
- Planning Phase: 基于通道状态确定本步要执行的节点
- Execution Phase: 每个节点获得状态的浅拷贝(maps.Copy),并行执行
- Update Phase: 通过 Reducer 合并各节点的状态更新,保证并发安全
这种设计让每一步都能被清晰观测、安全中断和恢复。
运行态隔离与事件快照
- 执行器(Executor)可复用且并发安全,单次运行态存放于
ExecutionContext
,包括通道版本、待写(pending writes)、最近检查点等。 - 事件的
StateDelta
使用深拷贝快照,只包含可序列化且允许的键;内部键(如执行上下文、回调等)会被过滤,便于带外观测与持久化。
执行器配置
与多 Agent 系统集成
GraphAgent 的设计初衷就是成为 tRPC-Agent-Go 多 Agent 生态的一部分,而不是独立存在。它实现了标准的 Agent 接口,可以和其他 Agent 类型无缝协作。
GraphAgent 作为 Agent
GraphAgent 实现了标准 Agent 接口:
高级编排
下图展示复杂业务编排:入口清洗 → 智能路由 → 多子编队(Email、Weather、Research)→ 并行 fanout/聚合 → 最终合成与发布。
flowchart LR
%% Layout
subgraph UE["User & Entry"]
U((User)):::human --> IN["entry<br/>normalize"]:::process
end
subgraph FAB["Graph Orchestration"]
Rtr["where_to_go<br/>router"]:::router
Compose["compose<br/>LLM"]:::llm
end
IN --> Rtr
%% Email Agent (expanded)
subgraph EC["Email Agent"]
direction LR
CE["classifier<br/>LLM"]:::llm --> WE["writer<br/>LLM"]:::llm
end
%% Weather Agent (expanded)
subgraph WA["Weather Agent"]
direction LR
LE["locate<br/>LLM"]:::llm --> WT["weather tool"]:::tool
end
%% Routing from router to pods
Rtr -- email --> CE
Rtr -- weather --> LE
Rtr -- other --> REPLY["reply<br/>LLM"]:::llm
%% Fanout Pipeline (fanout → workers → aggregate)
subgraph FP["Fanout Pipeline"]
direction LR
Fan["plan_fanout"]:::process --> W1["worker A"]:::process
Fan --> W2["worker B"]:::process
Fan --> W3["worker C"]:::process
W1 --> Agg["aggregate"]:::process
W2 --> Agg
W3 --> Agg
end
Rtr -- research --> Fan
%% Human-in-the-loop (optional)
Compose -. review .- HG["human<br/>review"]:::human
%% Compose final (minimal wiring)
Agg --> Compose
WE --> Compose
WT --> Compose
REPLY --> Compose
Compose --> END([END]):::terminal
%% Styles
classDef router fill:#fff7e0,stroke:#f5a623,stroke-width:2px
classDef llm fill:#e3f2fd,stroke:#1e88e5,stroke-width:2px
classDef tool fill:#fff3e0,stroke:#fb8c00,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef terminal fill:#ffebee,stroke:#e53935,stroke-width:2px
要点: - 智能路由 where_to_go 可由 LLM 决策或函数节点实现(条件边)。 - Fanout Pipeline 使用 Command GoTo 进行运行时 fanout,三路并行后在 aggregate 节点聚合。 - 可选的人机把关位于聚合之后,确保关键输出经人工确认。 - 仅在 Compose 处展示一次保存检查点,既不喧宾夺主,又能体现可恢复能力。
在图中嵌入 Agent
在图内部,我们也可以把已有的子 Agent 作为一个节点来调用。下面的示例展示了如何创建子 Agent、声明对应节点,并在 GraphAgent 构造时注入。
混合模式示例
结构化流程中嵌入动态决策:
核心机制详解
状态管理:Schema + Reducer 模式
状态管理是图工作流的核心挑战之一。我们设计了一套基于 Schema + Reducer 的状态管理机制,既保证了类型安全,又支持高并发的原子更新。
flowchart LR
subgraph "State Schema"
MS[messages: MessageList]:::schemaClass
UI[user_input: string]:::schemaClass
LR[last_response: string]:::schemaClass
NR[node_responses: Map]:::schemaClass
end
subgraph "State Operations"
R1[MessageReducer]:::reducerClass
R2[AppendReducer]:::reducerClass
R3[DefaultReducer]:::reducerClass
end
subgraph "Concurrent Updates"
N1[Node 1 Output]:::nodeOutputClass
N2[Node 2 Output]:::nodeOutputClass
N3[Node 3 Output]:::nodeOutputClass
end
N1 --> R1
N2 --> R2
N3 --> R3
R1 --> MS
R2 --> NR
R3 --> LR
classDef schemaClass fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef reducerClass fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef nodeOutputClass fill:#fff8e1,stroke:#f57f17,stroke-width:2px
Graph 的状态底层是 map[string]any
,通过 StateSchema
提供运行时类型校验和字段验证。Reducer 机制确保状态字段按预定义规则安全合并,避免并发更新冲突。
常用键常量参考
- 用户可见:
graph.StateKeyUserInput
、graph.StateKeyOneShotMessages
、graph.StateKeyMessages
、graph.StateKeyLastResponse
、graph.StateKeyNodeResponses
、graph.StateKeyMetadata
- 系统内部:
session
、exec_context
、tool_callbacks
、model_callbacks
、agent_callbacks
、current_node_id
、parent_agent
- 命令/恢复:
__command__
、__resume_map__
常量均定义在 graph/state.go
与 graph/keys.go
,建议通过常量引用,避免硬编码。
节点级回调与生成参数
节点可通过可选项注册回调或参数(见 graph/state_graph.go
):
- graph.WithPreNodeCallback
/ graph.WithPostNodeCallback
/ graph.WithNodeErrorCallback
- LLM 节点可用 graph.WithGenerationConfig
、graph.WithModelCallbacks
- 工具节点可用 graph.WithToolCallbacks
- Agent 节点可用 graph.WithAgentNodeEventCallback
此外,graph.WithName
/graph.WithDescription
可为节点添加友好的名称与描述;graph.WithDestinations
可声明潜在动态路由目标(仅用于静态校验/可视化)。
LLM 输入规则:三段式设计
LLM 节点的输入处理是我们花了很多时间打磨的功能。看起来简单的三段式规则,实际上解决了 AI 应用中最常见的上下文管理问题。
LLM 节点内置了一套固定的输入选择逻辑(无需额外配置):
- 优先用
graph.StateKeyOneShotMessages
:完全覆盖本轮输入(含 system/user),执行后清空 - 其次用
graph.StateKeyUserInput
:在graph.StateKeyMessages
基础上追加本轮 user,再把 assistant 回答一起原子写回,随后清空graph.StateKeyUserInput
- 否则仅用
graph.StateKeyMessages
:常见于工具回路二次进 LLM(graph.StateKeyUserInput
已被清空)
这套规则的精妙之处在于,它既保证了"预处理节点可以改写 graph.StateKeyUserInput
并在同一轮生效",又与工具循环(tool_calls → tools → LLM)自然衔接。
示例(技术解析级别的小片段,演示三种输入路径):
指令占位符注入
AddLLMNode
的 instruction
支持占位符,语法与 llmagent
一致:
- {key}
/ {key?}
:从会话 session.State
读取键值,可选后缀 ?
缺失时为空;
- {user:subkey}
、{app:subkey}
、{temp:subkey}
:按命名空间读取。
GraphAgent 会把当前 *session.Session
放入状态(graph.StateKeySession
键),LLM 节点会在执行前对指令进行占位符展开。
提示:GraphAgent 会从会话事件播种 graph.StateKeyMessages
以保证多轮连贯;从检查点恢复时,若用户消息仅为 "resume",不会注入到 graph.StateKeyUserInput
,以避免干扰已恢复的状态。
并发执行和状态安全
当一个节点有多条出边时,会自动触发并行执行:
内部实现保证了并发安全:执行器为每个任务构造浅拷贝(maps.Copy)并在合并时加锁,同时通过 Reducer 机制来安全地合并并发更新。
节点 I/O 约定与常用键
节点之间仅通过共享 State
传递数据,节点函数返回的增量由 Schema 的 Reducer 合并。
- 函数节点(Function)
- 输入:完整
State
(按 Schema 声明读取) - 输出:只写业务键(例如
{"parsed_time":"..."}
),不要写内部键
- 输入:完整
- LLM 节点
- 输入优先级:
graph.StateKeyOneShotMessages
→graph.StateKeyUserInput
→graph.StateKeyMessages
- 输出:原子写回
graph.StateKeyMessages
、设置graph.StateKeyLastResponse
、设置graph.StateKeyNodeResponses[<llm_node_id>]
- 输入优先级:
- 工具节点(Tools)
- 自
graph.StateKeyMessages
尾部配对当前轮的assistant(tool_calls)
,按顺序追加工具返回到graph.StateKeyMessages
- 多个工具按 LLM 返回顺序顺序执行
- 自
- Agent 节点
- 通过
Invocation.RunOptions.RuntimeState
接收 Graph 的State
- 输出:设置
graph.StateKeyLastResponse
与graph.StateKeyNodeResponses[<agent_node_id>]
;执行成功后会清空graph.StateKeyUserInput
- 通过
实践建议:
- 串行读取:紧邻下游直接读取 graph.StateKeyLastResponse
;
- 并行/汇合读取:从 graph.StateKeyNodeResponses[<nodeID>]
读取指定节点输出;
- 为业务键在 Schema 中声明合适的 Reducer,避免并发写入冲突。
API 速查表
- 构图
graph.NewStateGraph(schema)
→ 构建器AddNode(id, func, ...opts)
/AddLLMNode(id, model, instruction, tools, ...opts)
AddToolsNode(id, tools, ...opts)
/AddAgentNode(id, ...opts)
AddEdge(from, to)
/AddConditionalEdges(from, condition, pathMap)
AddToolsConditionalEdges(llmNode, toolsNode, fallback)
SetEntryPoint(nodeID)
/SetFinishPoint(nodeID)
/Compile()
- 常用 State 键(用户可见)
graph.StateKeyUserInput
、graph.StateKeyOneShotMessages
、graph.StateKeyMessages
、graph.StateKeyLastResponse
、graph.StateKeyNodeResponses
、graph.StateKeyMetadata
- 节点级可选项
graph.WithGenerationConfig
、graph.WithModelCallbacks
、graph.WithToolCallbacks
graph.WithPreNodeCallback
、graph.WithPostNodeCallback
、graph.WithNodeErrorCallback
- 执行
graphagent.New(name, compiledGraph, ...opts)
→runner.NewRunner(app, agent)
→Run(...)
更多端到端用法见 examples/graph
(基础/并行/多轮/中断/工具/占位符)。
高级特性
检查点与恢复
为了支持时间旅行与可靠恢复,可以为执行器或 GraphAgent 配置检查点保存器。下面演示使用 SQLite Saver 持久化检查点并从特定检查点恢复。
检查点管理
使用管理器可以便捷地浏览、查询与删除检查点:
建议在生产中为 namespace
使用稳定的业务标识(如 svc:prod:flowX
),便于审计与对账。
默认值与注意事项
- 默认值(Executor)
ChannelBufferSize = 256
、MaxSteps = 100
、CheckpointSaveTimeout = 10s
- 步/节点超时可通过
Executor
的WithStepTimeout
/WithNodeTimeout
配置(目前 GraphAgent 选项未直接暴露)
- 会话
- 生产环境优先使用 Redis Session;设置合理 TTL 与清理策略
- Runner 会自动从会话事件播种多轮
graph.StateKeyMessages
- 检查点
- 采用稳定的
namespace
命名(如svc:prod:flowX
);使用CheckpointManager
按谱系审计与清理
- 采用稳定的
- 事件与背压
- 调整
WithChannelBufferSize
;按author
/object
过滤事件降低噪音
- 调整
- 命名与键
- 节点/路由标签/状态键使用常量;为需要合并的键声明 Reducer
- 治理与合规
- 关键路径引入 HITL;敏感信息优先落到
graph.StateKeyMetadata
,避免混入graph.StateKeyMessages
事件速览
- Author 约定
- 节点级:节点 ID(无法获取时为
graph.AuthorGraphNode
) - Pregel 阶段:
graph.AuthorGraphPregel
- 执行器/系统:
graph.AuthorGraphExecutor
- 用户输入:
user
(未导出常量)
- 节点级:节点 ID(无法获取时为
- 对象类型(子集)
- 节点:
graph.ObjectTypeGraphNodeStart | graph.ObjectTypeGraphNodeComplete | graph.ObjectTypeGraphNodeError
- Pregel:
graph.ObjectTypeGraphPregelPlanning | graph.ObjectTypeGraphPregelExecution | graph.ObjectTypeGraphPregelUpdate
- 通道/状态:
graph.ObjectTypeGraphChannelUpdate
/graph.ObjectTypeGraphStateUpdate
- 检查点:
graph.ObjectTypeGraphCheckpoint
、graph.ObjectTypeGraphCheckpointCreated
、graph.ObjectTypeGraphCheckpointCommitted
、graph.ObjectTypeGraphCheckpointInterrupt
- 节点:
更多示例见下文“事件监控”。
Human-in-the-Loop
在关键路径上引入人工确认(HITL)能够显著提升可控性。下面的示例展示一个“中断—恢复”的基本流程:
恢复辅助函数:
也可以在执行入口通过命令注入恢复值(无需提前到特定节点)。使用 Runner 传入 RuntimeState
即可:
事件监控
事件流承载了整个图的执行过程与增量输出。下面的示例展示了如何遍历事件并区分图事件与模型增量:
在实际使用中,建议结合 Event 的 Author
字段进行过滤:
- 节点级事件(模型、工具、节点起止):
Author = <nodeID>
(若无法获取 nodeID,则为graph-node
) - Pregel(规划/执行/更新/错误):
Author = graph.AuthorGraphPregel
- 执行器级别事件(状态更新/检查点等):
Author = graph.AuthorGraphExecutor
- 用户输入事件(Runner 写入):
Author = user
利用这一约定,你可以精准订阅某个节点的流式输出,而无需在节点之间传递流式上下文(流式由事件通道统一承载,状态仍按 LangGraph 风格以结构化 State 传递)。
示例:仅消费节点 ask
的流式输出,并在完成时打印最终消息。
事件元数据(StateDelta)
每个事件还携带 StateDelta
,可读取模型/工具等执行元数据:
也可以在 Agent 级别配置回调:
常见问题排查
- 报错 "graph must have an entry point"
- 未设置入口点。调用
SetEntryPoint()
,并确保目标节点已定义。
- 未设置入口点。调用
- 报错目标/源节点不存在
- 在连边/条件路由前先定义节点;条件路由的
pathMap
目标也需存在。
- 在连边/条件路由前先定义节点;条件路由的
- 工具未执行
- 确认 LLM 返回了
tool_calls
,并使用了AddToolsConditionalEdges(ask, tools, fallback)
; - 工具名需与模型声明一致;
- 配对规则是从最近一次
assistant(tool_calls)
回溯到下一个user
,检查消息顺序。
- 确认 LLM 返回了
- 没有观察到流式事件
- 调大
WithChannelBufferSize
并按Author
/对象类型过滤; - 确认从
Runner.Run(...)
消费事件。
- 调大
- 从检查点恢复未按预期继续
- 通过
agent.WithRuntimeState(map[string]any{ graph.CfgKeyCheckpointID: "..." })
传入; - HITL 恢复时提供
ResumeMap
;纯 "resume" 文本不会注入到graph.StateKeyUserInput
。
- 通过
- 并行下状态冲突
- 为列表/映射等声明合并型 Reducer(如
StringSliceReducer
、MergeReducer
),避免多个分支覆盖同一键。
- 为列表/映射等声明合并型 Reducer(如
实际案例
审批工作流
总结
本文介绍了 graph
包与 GraphAgent 的核心用法:如何声明节点与路由、如何通过 Schema 与 Reducer 安全合并状态、以及如何利用事件、检查点与中断实现可观测与可恢复。对于结构化流程(审批、内容审核、分步数据处理等),Graph 提供稳定、可审计的执行路径;对于需要智能决策的环节,可通过 LLM 节点与子 Agent 灵活扩展。
参考与示例
- 代码仓库: https://github.com/trpc-group/trpc-agent-go
- Graph 示例:
examples/graph
目录(基础/并行/多轮/中断与恢复等)- I/O 约定:
io_conventions
、io_conventions_tools
- 并行 / 扇出:
parallel
、fanout
、diamond
- 占位符:
placeholder
- 检查点 / 中断:
checkpoint
、interrupt
- I/O 约定:
- 进一步阅读:
graph/state_graph.go
、graph/executor.go
、agent/graphagent