tRPC-Agent-Go:GraphAgent 让 AI 工作流与 Agent 无缝融合
在 AI Agent 开发中,如何平衡智能性与可控性一直是核心挑战。tRPC-Agent-Go 的 GraphAgent 模块提供了一种独特的解决方案:通过图编排引擎与 Agent 框架的深度融合,让开发者既能享受 LLM 的智能决策能力,又能保持对执行流程的精确控制。本文将深入介绍 GraphAgent 的设计理念、核心机制和最佳实践,帮助你快速构建 AI Graph Agent 系统。
背景
围绕 AI Agent 的实践,这几年大体分成两条路线:
- 工作流编排:用节点和边显式控制流程(代表:LangGraph 、LangChain 、Eino )。优点是可控、可调试;不足是灵活性有限。
- 自主协作:把 LLM 作为最小原子让其自主决策(代表:Microsoft AutoGen 、Google ADK 、Agno )。优点是简单、自适应;不足是行为不可预测、难审计。
tRPC-Agent-Go 最初更偏向“自主协作”。随着首批用户在真实业务中落地,我们陆续收到大量关于“图编排(Graph)”的需求:希望具备可控的执行路径、完整的可观测性与可恢复能力(审计/合规、人工把关、精确恢复等),同时又不丢失 LLM 的智能决策优势。
基于这些反馈,我们推出了 GraphAgent——它既不是单独的工作流引擎,也不是纯 Agent,而是把两者融合到一起:
- 作为 Graph:BSP 执行模型、条件/工具路由、天然并行。
- 作为 Agent:实现统一接口,既能被上层调用,也能在图内调用子 Agent。
- 附加能力:检查点时间旅行、Human‑in‑the‑Loop、A2A 集成。
下面从“如何使用”到“设计实现”逐层展开。
快速开始
仓库地址 :
* GitHub 开源仓库:github.com/trpc-group/trpc-agent-go
第一个例子
先看一个最简单却完整的工作流:
flowchart LR
START([start]):::startNode --> P[prepare]:::processNode
P --> A[ask LLM]:::llmNode
A -. tool_calls .-> T[tools]:::toolNode
A -- no tool_calls --> F[fallback]:::processNode
T --> A
F --> END([finish]):::endNode
classDef startNode fill:#e1f5e1,stroke:#4caf50,stroke-width:2px
classDef endNode fill:#ffe1e1,stroke:#f44336,stroke-width:2px
classDef llmNode fill:#e3f2fd,stroke:#2196f3,stroke-width:2px
classDef toolNode fill:#fff3e0,stroke:#ff9800,stroke-width:2px
classDef processNode fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
用代码把这个图变成 tRPC-Agent-Go 的 GraphAgent 实现:
package main
import (
"context"
"fmt"
"strings"
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/model"
"trpc.group/trpc-go/trpc-agent-go/model/openai"
"trpc.group/trpc-go/trpc-agent-go/runner"
"trpc.group/trpc-go/trpc-agent-go/tool"
"trpc.group/trpc-go/trpc-agent-go/tool/function"
)
func main () {
ctx := context . Background ()
// 推荐:先定义 Schema,再构建图
// 对话类应用可直接复用消息模式(包含 messages/user_input/last_response/node_responses 等)
// 非对话/结构化任务可使用 NewStateSchema 显式定义字段与 Reducer
// 这里使用内置的消息模式
sg := graph . NewStateGraph ( graph . MessagesStateSchema ())
// 常量定义(避免魔法字符串)
const (
NodePrepare = "prepare"
NodeAsk = "ask"
NodeTools = "tools"
NodeFallback = "fallback"
NodeFinish = "finish"
ModelName = "gpt-4o-mini"
SystemPromptGeneral = "你是一个助手,可以使用计算器工具"
OutputKeyFinal = "final_output"
ToolNameCalculator = "calculator"
)
// prepare 节点:清洗输入
sg . AddNode ( NodePrepare , func ( ctx context . Context , s graph . State ) ( any , error ) {
input := s [ graph . StateKeyUserInput ].( string )
return graph . State { graph . StateKeyUserInput : strings . TrimSpace ( input )}, nil
})
// LLM 节点:智能决策
model := openai . New ( ModelName )
// 定义工具(注意泛型和 context)
type CalcInput struct {
Expression string `json:"expression"`
}
type CalcOutput struct {
Result float64 `json:"result"`
}
calcTool := function . NewFunctionTool [ CalcInput , CalcOutput ](
func ( ctx context . Context , in CalcInput ) ( CalcOutput , error ) {
// 简单计算器实现
return CalcOutput { Result : 42 }, nil // 示例
},
function . WithName ( ToolNameCalculator ),
function . WithDescription ( "计算数学表达式" ),
)
tools := map [ string ] tool . Tool { ToolNameCalculator : calcTool }
sg . AddLLMNode ( NodeAsk , model , SystemPromptGeneral , tools )
// 工具执行节点
sg . AddToolsNode ( NodeTools , tools )
// fallback 节点:无需工具时的处理
sg . AddNode ( NodeFallback , func ( ctx context . Context , s graph . State ) ( any , error ) {
return graph . State { graph . StateKeyLastResponse : "已直接回答" }, nil
})
// finish 节点:汇总输出
sg . AddNode ( NodeFinish , func ( ctx context . Context , s graph . State ) ( any , error ) {
response := s [ graph . StateKeyLastResponse ].( string )
return graph . State { OutputKeyFinal : response }, nil
})
// 配置路由
sg . SetEntryPoint ( NodePrepare )
sg . AddEdge ( NodePrepare , NodeAsk )
// AddToolsConditionalEdges 用于从 LLM 节点根据是否有 tool_calls 跳转到 tools 或 fallback 节点
// 工具选择由 LLM 决定,tools 节点可注册多个工具并执行调用
// LLM 和 tools 可循环,未调用工具时走 fallback 路径
sg . AddToolsConditionalEdges ( NodeAsk , NodeTools , NodeFallback ) // 条件路由
sg . AddEdge ( NodeTools , NodeAsk )
sg . AddEdge ( NodeFallback , NodeFinish )
sg . SetFinishPoint ( NodeFinish )
// 编译图
g , err := sg . Compile ()
if err != nil {
panic ( err )
}
// 创建 GraphAgent(使用不同变量名避免与 agent 包冲突)
graphAgent , err := graphagent . New ( "demo" , g )
if err != nil {
panic ( err )
}
// 创建 Runner 并执行
r := runner . NewRunner ( "app" , graphAgent )
// 运行并处理事件流
const (
UserID = "user1"
SessionID = "session1"
UserMessage = "1+1等于多少?"
)
eventCh , err := r . Run ( ctx , UserID , SessionID , model . NewUserMessage ( UserMessage ))
if err != nil {
panic ( err )
}
// 消费事件流(流式增量 + 最终消息)
for ev := range eventCh {
if ev . Error != nil {
fmt . Printf ( "错误: %s\n" , ev . Error . Message )
continue
}
if ev . Response == nil || len ( ev . Response . Choices ) == 0 {
continue
}
ch := ev . Response . Choices [ 0 ]
// 流式增量
if ev . Response . IsPartial && ch . Delta . Content != "" {
fmt . Print ( ch . Delta . Content )
continue
}
// 最终消息
if ! ev . Response . IsPartial && ch . Message . Content != "" {
fmt . Println ( "\n输出:" , ch . Message . Content )
}
}
}
在上述例子中我们可以看到使用 Graph Agent 需要显式建立各种节点,并对这些节点进行连边操作,接下来我们就详细介绍一下 Graph Agent 的各个核心概念。
核心概念
状态管理
GraphAgent 采用 Schema + Reducer 模式管理状态。先明确状态结构与合并规则,后续节点输入/输出的 key 就有了清晰来源与生命周期约定。
使用内置 Schema
import (
"trpc.group/trpc-go/trpc-agent-go/graph"
)
schema := graph . MessagesStateSchema ()
// 预定义字段(键名常量)与语义:
// - graph.StateKeyMessages ("messages") 对话历史([]model.Message;MessageReducer + MessageOp 原子合并)
// - graph.StateKeyUserInput ("user_input") 用户输入(string;一次性,成功执行后清空)
// - graph.StateKeyLastResponse ("last_response") 最后响应(string)
// - graph.StateKeyNodeResponses ("node_responses") 各节点输出(map[string]any;并行汇总读取)
// - graph.StateKeyMetadata ("metadata") 元数据(map[string]any;MergeReducer 合并)
// 其他一次性/系统键(按需使用):
// - graph.StateKeyOneShotMessages ("one_shot_messages") 一次性覆盖本轮输入([]model.Message)
// - graph.StateKeySession ("session") 会话对象(系统使用)
// - graph.StateKeyExecContext ("exec_context") 执行上下文(事件流等,系统使用)
自定义 Schema
import (
"reflect"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
schema := graph . NewStateSchema ()
// 添加自定义字段
schema . AddField ( "counter" , graph . StateField {
Type : reflect . TypeOf ( 0 ),
Default : func () any { return 0 },
Reducer : func ( old , new any ) any {
return old .( int ) + new .( int ) // 累加
},
})
// 字符串列表使用内置 Reducer
schema . AddField ( "items" , graph . StateField {
Type : reflect . TypeOf ([] string {}),
Default : func () any { return [] string {} },
Reducer : graph . StringSliceReducer ,
})
Reducer 机制确保状态字段按预定义规则安全合并,这在并发执行时尤其重要。
提示:建议为业务键定义常量,避免散落魔法字符串。
节点类型
GraphAgent 提供了四种内置节点类型:
Function 节点
最基础的节点,执行自定义逻辑:
import (
"context"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
const (
StateKeyInput = "input"
StateKeyOutput = "output"
)
sg . AddNode ( "process" , func ( ctx context . Context , state graph . State ) ( any , error ) {
data := state [ StateKeyInput ].( string )
processed := transform ( data )
// Function 节点需显式指定输出 key
return graph . State { StateKeyOutput : processed }, nil
})
LLM 节点
集成语言模型,自动管理对话历史:
import (
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/model/openai"
)
const (
LLMModelName = "gpt-4o-mini"
LLMSystemPrompt = "系统提示词"
LLMNodeAssistant = "assistant"
)
model := openai . New ( LLMModelName )
sg . AddLLMNode ( LLMNodeAssistant , model , LLMSystemPrompt , tools )
// LLM 节点的输入输出规则:
// 输入优先级: one_shot_messages > user_input > messages
// 输出: last_response、messages(原子更新)、node_responses(包含当前节点输出,便于并行汇总)
执行工具调用,注意是顺序执行 :
import (
"trpc.group/trpc-go/trpc-agent-go/graph"
)
const nodeTools = "tools"
sg . AddToolsNode ( nodeTools , tools )
// 多个工具会按 LLM 返回的顺序依次执行
// 如需并行,应该使用多个节点 + 并行边
Agent 节点
嵌入子 Agent,实现多 Agent 协作:
import (
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
)
const (
SubAgentNameAnalyzer = "analyzer"
GraphAgentNameMain = "main"
)
// 重要:节点 ID 必须与子 Agent 名称一致
sg . AddAgentNode ( SubAgentNameAnalyzer )
// Agent 实例在 GraphAgent 创建时注入
analyzer := createAnalyzer () // 名称必须是 "analyzer"
graphAgent , _ := graphagent . New ( GraphAgentNameMain , g ,
graphagent . WithSubAgents ([] agent . Agent { analyzer }))
边与路由
边定义了节点间的执行流转:
import (
"context"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
// 常量集中管理,提升可读性
const (
NodeA = "nodeA"
NodeB = "nodeB"
NodeDecision = "decision"
NodePathA = "pathA"
NodePathB = "pathB"
RouteToPathA = "route_to_pathA"
RouteToPathB = "route_to_pathB"
StateKeyFlag = "flag"
)
// 普通边:顺序执行
sg . AddEdge ( NodeA , NodeB )
// 条件边:动态路由(第三个参数为路径映射,建议显式提供以做静态校验)
// 先定义目标节点
sg . AddNode ( NodePathA , handlerA )
sg . AddNode ( NodePathB , handlerB )
// 再添加条件路由
sg . AddConditionalEdges ( NodeDecision ,
func ( ctx context . Context , s graph . State ) ( string , error ) {
if s [ StateKeyFlag ].( bool ) {
return RouteToPathA , nil
}
return RouteToPathB , nil
}, map [ string ] string {
RouteToPathA : NodePathA ,
RouteToPathB : NodePathB ,
})
// 工具条件边:处理 LLM 工具调用
const (
NodeLLM = "llm"
NodeToolsUse = "tools"
NodeFallback = "fallback"
)
sg . AddToolsConditionalEdges ( NodeLLM , NodeToolsUse , NodeFallback )
// 并行边:自动并行执行
const (
NodeSplit = "split"
NodeBranch1 = "branch1"
NodeBranch2 = "branch2"
)
sg . AddEdge ( NodeSplit , NodeBranch1 )
sg . AddEdge ( NodeSplit , NodeBranch2 ) // branch1 和 branch2 会并行执行
架构设计
整体架构
GraphAgent 的架构设计体现了我们对复杂系统的理解:通过清晰的分层来管理复杂性。每一层都有明确的职责,层与层之间通过标准接口通信。
flowchart TB
subgraph "Runner Layer"
R[Runner]:::runnerClass
S[Session Service]:::sessionClass
end
subgraph "GraphAgent"
GA[GraphAgent Wrapper]:::agentClass
CB[Callbacks]:::callbackClass
end
subgraph "Graph Engine"
SG[StateGraph Builder]:::builderClass
G[Graph]:::graphClass
E[Executor]:::executorClass
end
subgraph "Execution Components"
P[Planning]:::phaseClass
EX[Execution]:::phaseClass
U[Update]:::phaseClass
end
subgraph "Storage"
CP[Checkpoint]:::storageClass
ST[State Store]:::storageClass
end
R --> GA
GA --> G
G --> E
E --> P
E --> EX
E --> U
E --> CP
classDef runnerClass fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef sessionClass fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef agentClass fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef callbackClass fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef builderClass fill:#fff8e1,stroke:#f57c00,stroke-width:2px
classDef graphClass fill:#f1f8e9,stroke:#689f38,stroke-width:2px
classDef executorClass fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phaseClass fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef storageClass fill:#efebe9,stroke:#5d4037,stroke-width:2px
核心模块解析
每个核心文件都体现了我们对业界最佳实践的学习和创新:
graph/state_graph.go - StateGraph 构建器
提供链式声明式 Go API 来构建图结构,通过 fluent 方法链(AddNode → AddEdge → Compile)定义节点、边和条件路由。
graph/graph.go - 编译后的运行时
实现基于通道(Channel)的事件触发式执行机制。节点执行结果合并入 State;通道仅用于触发路由,写入哨兵值(sentinel value)而非业务数据。
graph/executor.go - Pregel/BSP 执行器
这是系统心脏,借鉴了 Google Pregel 论文。实现 BSP(Bulk Synchronous Parallel)风格的三阶段循环:Planning → Execution → Update。
graph/checkpoint/* - 检查点和恢复机制
通过 SQLite 事务实现原子落盘,保存状态快照和 PendingWrites(待执行的通道写入)。支持父子关系的检查点分支,实现基础的时间旅行和分支管理功能。
agent/graphagent/graph_agent.go - 生态融合的桥梁
GraphAgent 外壳负责会话注入、回调透传,让 Graph 能够无缝融入到 tRPC-Agent-Go 的生态中。
执行模型
GraphAgent 借鉴了 Google Pregel 的 BSP(Bulk Synchronous Parallel)模型,但适配到了单进程环境:
sequenceDiagram
participant R as Runner
participant GA as GraphAgent
participant E as Executor
participant N as Nodes
R->>GA: Run(invocation)
GA->>E: Execute(graph, state)
loop Each Superstep
E->>E: Planning: 分析通道确定待执行节点
E->>N: Execution: 并行执行节点
N-->>E: 返回状态更新
E->>E: Update: 合并状态(通过 Reducer)
end
E-->>GA: 完成事件
GA-->>R: 事件流
执行过程的关键点:
Planning Phase : 基于通道状态确定本步要执行的节点
Execution Phase : 每个节点获得状态的浅拷贝(maps.Copy),并行执行
Update Phase : 通过 Reducer 合并各节点的状态更新,保证并发安全
这种设计让每一步都能被清晰观测、安全中断和恢复。
与多 Agent 系统集成
GraphAgent 的设计初衷就是成为 tRPC-Agent-Go 多 Agent 生态的一部分,而不是独立存在。它实现了标准的 Agent 接口,可以和其他 Agent 类型无缝协作。
GraphAgent 作为 Agent
GraphAgent 实现了标准 Agent 接口:
import (
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/agent/chainagent"
)
// 可以直接在 ChainAgent, ParallelAgent, CycleAgent 中使用
chain := chainagent . New ( "chain" ,
chainagent . WithSubAgents ([] agent . Agent {
graphAgent1 , // 结构化流程1
graphAgent2 , // 结构化流程2
}))
在图中嵌入 Agent
在图内部,我们也可以把已有的子 Agent 作为一个节点来调用。下面的示例展示了如何创建子 Agent、声明对应节点,并在 GraphAgent 构造时注入。
import (
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
)
// 创建子 Agent
const (
SubAgentAnalyzer = "analyzer"
SubAgentReviewer = "reviewer"
)
analyzer := createAnalyzer () // 名称必须是 "analyzer"
reviewer := createReviewer () // 名称必须是 "reviewer"
// 在图中声明 Agent 节点
sg . AddAgentNode ( SubAgentAnalyzer )
sg . AddAgentNode ( SubAgentReviewer )
// 创建 GraphAgent 时注入子 Agent
graphAgent , _ := graphagent . New ( "workflow" , g ,
graphagent . WithSubAgents ([] agent . Agent {
analyzer ,
reviewer ,
}))
// 注意:子 Agent 只接收 user_input,输出写入 node_responses[nodeID]
混合模式示例
结构化流程中嵌入动态决策:
import (
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/agent/chainagent"
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
sg := graph . NewStateGraph ( schema )
// 结构化的数据准备
sg . AddNode ( "prepare" , prepareData )
// 动态决策点 - 使用 ChainAgent
dynamicAgent := chainagent . New ( "analyzer" ,
chainagent . WithSubAgents ([] agent . Agent { ... }))
sg . AddAgentNode ( "analyzer" )
// 继续结构化流程
sg . AddNode ( "finalize" , finalizeResults )
// 连接流程
sg . SetEntryPoint ( "prepare" )
sg . AddEdge ( "prepare" , "analyzer" ) // 交给动态 Agent
sg . AddEdge ( "analyzer" , "finalize" ) // 回到结构化流程
sg . SetFinishPoint ( "finalize" )
// 创建时注入
graphAgent , _ := graphagent . New ( "hybrid" , g ,
graphagent . WithSubAgents ([] agent . Agent { dynamicAgent }))
核心机制详解
状态管理:Schema + Reducer 模式
状态管理是图工作流的核心挑战之一。我们设计了一套基于 Schema + Reducer 的状态管理机制,既保证了类型安全,又支持高并发的原子更新。
flowchart LR
subgraph "State Schema"
MS[messages: MessageList]:::schemaClass
UI[user_input: string]:::schemaClass
LR[last_response: string]:::schemaClass
NR[node_responses: Map]:::schemaClass
end
subgraph "State Operations"
R1[MessageReducer]:::reducerClass
R2[AppendReducer]:::reducerClass
R3[DefaultReducer]:::reducerClass
end
subgraph "Concurrent Updates"
N1[Node 1 Output]:::nodeOutputClass
N2[Node 2 Output]:::nodeOutputClass
N3[Node 3 Output]:::nodeOutputClass
end
N1 --> R1
N2 --> R2
N3 --> R3
R1 --> MS
R2 --> NR
R3 --> LR
classDef schemaClass fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef reducerClass fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef nodeOutputClass fill:#fff8e1,stroke:#f57f17,stroke-width:2px
Graph 的状态底层是 map[string]any,通过 StateSchema 提供运行时类型校验和字段验证。Reducer 机制确保状态字段按预定义规则安全合并,避免并发更新冲突。
LLM 输入规则:三段式设计
LLM 节点的输入处理是我们花了很多时间打磨的功能。看起来简单的三段式规则,实际上解决了 AI 应用中最常见的上下文管理问题。
LLM 节点内置了一套固定的输入选择逻辑(无需额外配置):
优先用 one_shot_messages :完全覆盖本轮输入(含 system/user),执行后清空
其次用 user_input :在 messages 基础上追加本轮 user,再把 assistant 回答一起原子写回,随后清空 user_input
否则仅用 messages :常见于工具回路二次进 LLM(user_input 已被清空)
这套规则的精妙之处在于,它既保证了"预处理节点可以改写 user_input 并在同一轮生效",又与工具循环(tool_calls → tools → LLM)自然衔接。
示例(技术解析级别的小片段,演示三种输入路径):
// OneShot:完全覆盖本轮输入(包含 system/user),适合“前置节点构造完整 prompt”
import (
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/model"
)
const (
SystemPrompt = "你是审慎可靠的助手"
UserPrompt = "请用要点总结这段文本"
)
sg . AddNode ( "prepare_prompt" , func ( ctx context . Context , s graph . State ) ( any , error ) {
oneShot := [] model . Message {
model . NewSystemMessage ( SystemPrompt ),
model . NewUserMessage ( UserPrompt ),
}
return graph . State { graph . StateKeyOneShotMessages : oneShot }, nil
})
// 后续进入 LLM 节点时将仅使用 one_shot_messages,并在执行后清空
// UserInput:在历史 messages 基础上附加本轮用户输入
import (
"strings"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
const (
StateKeyCleanedInput = "cleaned_input"
)
sg . AddNode ( "clean_input" , func ( ctx context . Context , s graph . State ) ( any , error ) {
in := strings . TrimSpace ( s [ graph . StateKeyUserInput ].( string ))
return graph . State {
graph . StateKeyUserInput : in , // 将清洗后的输入写回,LLM 节点会把 user+assistant 原子写入 messages
StateKeyCleanedInput : in , // 同时保留业务自定义键
}, nil
})
// Messages-only:工具回路返回后,user_input 已清空;LLM 仅基于 messages(含 tool 响应)继续推理
import (
"trpc.group/trpc-go/trpc-agent-go/graph"
)
sg . AddToolsNode ( "exec_tools" , tools )
sg . AddToolsConditionalEdges ( "ask" , "exec_tools" , "fallback" )
// 再次回到 "ask"(或下游 LLM 节点)时,由于 user_input 已清空,将走 messages-only 分支
并发执行和状态安全
当一个节点有多条出边时,会自动触发并行执行:
import (
"trpc.group/trpc-go/trpc-agent-go/graph"
)
// 这样的图结构会自动并行执行
stateGraph .
AddNode ( "analyze" , analyzeData ).
AddNode ( "generate_report" , generateReport ).
AddNode ( "call_external_api" , callAPI ).
AddEdge ( "analyze" , "generate_report" ). // 这两个会并行执行
AddEdge ( "analyze" , "call_external_api" ) //
内部实现保证了并发安全:执行器为每个任务构造浅拷贝(maps.Copy)并在合并时加锁,同时通过 Reducer 机制来安全地合并并发更新。
高级特性
检查点与恢复
为了支持时间旅行与可靠恢复,可以为执行器配置检查点保存器。下面的示例演示如何使用 SQLite Saver 持久化检查点,并在后续运行中从特定检查点恢复。
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
"trpc.group/trpc-go/trpc-agent-go/agent" // 用于 WithRuntimeState
"trpc.group/trpc-go/trpc-agent-go/agent/graphagent" // 发布为可执行 Agent
"trpc.group/trpc-go/trpc-agent-go/graph" // 恢复配置键
"trpc.group/trpc-go/trpc-agent-go/graph/checkpoint/sqlite"
"trpc.group/trpc-go/trpc-agent-go/model"
)
// 配置检查点
db , _ := sql . Open ( "sqlite3" , "./checkpoints.db" )
saver , _ := sqlite . NewSaver ( db )
graphAgent , _ := graphagent . New ( "workflow" , g ,
graphagent . WithCheckpointSaver ( saver ))
// 执行时自动保存检查点(默认每步保存)
// 从检查点恢复
eventCh , err := r . Run ( ctx , userID , sessionID ,
model . NewUserMessage ( "resume" ),
agent . WithRuntimeState ( map [ string ] any {
graph . CfgKeyCheckpointID : "ckpt-123" ,
}),
)
Human-in-the-Loop
在关键路径上引入人工确认(HITL)能够显著提升可控性。下面的示例展示一个“中断—恢复”的基本流程:
import (
"context"
"fmt"
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/model"
)
const (
StateKeyContent = "content"
StateKeyDecision = "decision"
InterruptKeyReview = "review_key"
)
sg . AddNode ( "review" , func ( ctx context . Context , s graph . State ) ( any , error ) {
content := s [ StateKeyContent ].( string )
// 中断并等待人工输入
result , err := graph . Interrupt ( ctx , s , InterruptKeyReview ,
fmt . Sprintf ( "请审核: %s" , content ))
if err != nil {
return nil , err
}
return graph . State { StateKeyDecision : result }, nil
})
// 恢复执行(需要 import agent 包)
eventCh , err := r . Run ( ctx , userID , sessionID ,
model . NewUserMessage ( "resume" ),
agent . WithRuntimeState ( map [ string ] any {
graph . CfgKeyCheckpointID : checkpointID ,
graph . StateKeyResumeMap : map [ string ] any {
"review_key" : "approved" ,
},
}),
)
事件监控
事件流承载了整个图的执行过程与增量输出。下面的示例展示了如何遍历事件并区分图事件与模型增量:
import (
"fmt"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
for ev := range eventCh {
if ev . Response == nil {
continue
}
// 按对象类型分流(Graph 扩展事件类型见 graph/events.go)
switch ev . Response . Object {
case graph . ObjectTypeGraphNodeStart :
fmt . Println ( "节点开始" )
case graph . ObjectTypeGraphNodeComplete :
fmt . Println ( "节点完成" )
case graph . ObjectTypeGraphChannelUpdate :
fmt . Println ( "通道更新" )
case graph . ObjectTypeGraphCheckpoint , graph . ObjectTypeGraphCheckpointCommitted :
fmt . Println ( "检查点事件" )
}
// 同时处理模型增量/最终输出
if len ( ev . Response . Choices ) > 0 {
ch := ev . Response . Choices [ 0 ]
if ev . Response . IsPartial && ch . Delta . Content != "" {
fmt . Print ( ch . Delta . Content )
} else if ! ev . Response . IsPartial && ch . Message . Content != "" {
fmt . Println ( "\n输出:" , ch . Message . Content )
}
}
}
在实际使用中,建议结合 Event 的 Author 字段进行过滤:
节点级事件(模型、工具、节点起止):Author = <nodeID>(若无法获取 nodeID,则为 graph-node)
Pregel(规划/执行/更新/错误):Author = graph-pregel
执行器级别事件(状态更新/检查点等):Author = graph-executor
用户输入事件(Runner 写入):Author = user
利用这一约定,你可以精准订阅某个节点的流式输出,而无需在节点之间传递流式上下文(流式由事件通道统一承载,状态仍按 LangGraph 风格以结构化 State 传递)。
示例:仅消费节点 ask 的流式输出,并在完成时打印最终消息。
import (
"fmt"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
const NodeIDWatch = "ask"
for ev := range eventCh {
// 仅关注来自指定节点的事件
if ev . Author != NodeIDWatch {
continue
}
if ev . Response == nil || len ( ev . Response . Choices ) == 0 {
continue
}
choice := ev . Response . Choices [ 0 ]
// 节点的流式增量(Delta)
if ev . Response . IsPartial && choice . Delta . Content != "" {
fmt . Print ( choice . Delta . Content )
continue
}
// 节点的最终完整消息
if ! ev . Response . IsPartial && choice . Message . Content != "" {
fmt . Println ( "\n[ask] 最终输出:" , choice . Message . Content )
}
}
也可以在 Agent 级别配置回调:
import (
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/model"
)
// 方式一:构造回调并注册(推荐)
cb := agent . NewCallbacks ().
RegisterBeforeAgent ( func ( ctx context . Context , inv * agent . Invocation ) ( * model . Response , error ) {
// 返回非空 *model.Response 可直接短路此轮执行
return nil , nil
}).
RegisterAfterAgent ( func ( ctx context . Context , inv * agent . Invocation , runErr error ) ( * model . Response , error ) {
// 可对最终响应做统一修改/替换
return nil , nil
})
graphAgent , _ := graphagent . New ( "workflow" , g ,
graphagent . WithAgentCallbacks ( cb ),
)
实际案例
审批工作流
import (
"context"
"fmt"
"strings"
"trpc.group/trpc-go/trpc-agent-go/graph"
"trpc.group/trpc-go/trpc-agent-go/model/openai"
)
func buildApprovalWorkflow () ( * graph . Graph , error ) {
sg := graph . NewStateGraph ( graph . MessagesStateSchema ())
// AI 初审(定义 LLM 模型)
const (
ModelNameApprove = "gpt-4o-mini"
PromptApproveDecision = "判断申请是否符合要求,回复 approve 或 reject"
NodeAIReview = "ai_review"
NodeHumanReview = "human_review"
NodeApprove = "approve"
NodeReject = "reject"
RouteHumanReview = "route_human_review"
RouteReject = "route_reject"
RouteApprove = "route_approve"
StateKeyApplication = "application"
StateKeyDecision = "decision"
)
llm := openai . New ( ModelNameApprove )
sg . AddLLMNode ( NodeAIReview , llm , PromptApproveDecision , nil )
// 条件路由到人工审核或拒绝
sg . AddConditionalEdges ( NodeAIReview ,
func ( ctx context . Context , s graph . State ) ( string , error ) {
resp := s [ graph . StateKeyLastResponse ].( string )
if strings . Contains ( resp , "approve" ) {
return RouteHumanReview , nil
}
return RouteReject , nil
}, map [ string ] string {
RouteHumanReview : NodeHumanReview ,
RouteReject : NodeReject ,
})
// 人工审核节点
sg . AddNode ( NodeHumanReview , func ( ctx context . Context , s graph . State ) ( any , error ) {
app := s [ StateKeyApplication ].( string )
decision , err := graph . Interrupt ( ctx , s , "approval" ,
fmt . Sprintf ( "请审批: %s" , app ))
if err != nil {
return nil , err
}
return graph . State { StateKeyDecision : decision }, nil
})
// 结果处理
sg . AddNode ( NodeApprove , func ( ctx context . Context , s graph . State ) ( any , error ) {
// 执行批准逻辑
return graph . State { "status" : "approved" }, nil
})
sg . AddNode ( NodeReject , func ( ctx context . Context , s graph . State ) ( any , error ) {
return graph . State { "status" : "rejected" }, nil
})
// 配置流程
sg . SetEntryPoint ( NodeAIReview )
sg . AddConditionalEdges ( NodeHumanReview ,
func ( ctx context . Context , s graph . State ) ( string , error ) {
if s [ StateKeyDecision ] == "approve" {
return RouteApprove , nil
}
return RouteReject , nil
}, map [ string ] string {
RouteApprove : NodeApprove ,
RouteReject : NodeReject ,
})
return sg . Compile ()
}
总结
本文介绍了 tRPC-Agent-Go 的 GraphAgent 模块,详细阐述了其用法、设计理念与实现方式。GraphAgent 将图编排与 Agent 形态有机融合,既能满足工程上对确定性工作流的需求,又兼顾了智能决策的灵活性。它既是一个具备 BSP 超级步、条件和工具路由、并行等特性的 Graph,也实现了统一接口、支持嵌套子 Agent 的 Agent 能力。状态管理方面,GraphAgent 采用 Schema 与 Reducer 机制来维护共享 State,并通过 MessageOp 实现消息历史的原子更新,有效避免并发和重复问题。事件系统则统一承载流式增量与过程信息,结合 author 字段(如 nodeID、graph-pregel、graph-executor、user)可以精准过滤所需事件。此外,GraphAgent 还具备检查点时间旅行(支持谱系和原子持久化)、Human-in-the-Loop 中断与恢复、A2A 集成等可控能力。
GraphAgent 特别适用于审批与合规、内容审核、分步数据处理、人机协同等需要完整审计和精确恢复的关键业务场景,或者需要将多个 Agent 以结构化方式编排的复杂流程。而对于开放式对话、单步任务或主要依赖自主探索的场景,直接使用 LLMAgent 或 ChainAgent 往往更加轻便。
在实际落地时,建议对话式应用优先采用 MessagesStateSchema(),而非对话场景则可以用 NewStateSchema() 显式定义字段和 Reducer。节点名、路由标签和状态键建议用常量统一管理,并结合 AddConditionalEdges(..., map[string]string{...}) 明确静态可达性。通过事件流可以驱动 UI 或日志系统,按 Response.Object 区分事件类型,按 Author 精确过滤到具体节点的增量和最终结果。需要可恢复能力时可启用检查点,涉及关键决策时可引入中断与恢复机制,复杂流程下还可以嵌入子 Agent。
值得强调的是 GraphAgent 与 tRPC-Agent-Go 多 Agent 系统生态的完美融合:GraphAgent 与 ChainAgent/ParallelAgent/CycleAgent/LLMAgent 等可以灵活组合,你既可以在图中嵌入决策 Agent,也可以让 ChainAgent/ParallelAgent/CycleAgent 调度多个 GraphAgent,从而将结构化与智能化的优势结合起来,更好地服务于业务目标。
参考资料
代码仓库 :
- tRPC-Agent-Go 仓库
外部参考 :
- Google Pregel - BSP 模型论文
- Building effective agents - Anthropic 的 Agent 设计指南
使用与交流
欢迎大家使用 tRPC-Agent-Go 框架!如需详细的使用文档和示例,请访问 tRPC-Agent-Go 仓库 。
欢迎通过 GitHub Issues 讨论框架使用经验、分享最佳实践、提出改进建议。让我们一起推动 Go 语言在 AI Agent 领域的发展!
2026-06-09 08:42:05
2026-06-09 08:42:05