Runner 组件使用手册
概述
Runner 提供了运行 Agent 的接口,负责会话管理和事件流处理。Runner 的核心职责是:获取或创建会话、生成 Invocation ID、通过 agent.RunWithPlugins 调用 Agent、处理返回的事件流并将非 partial 响应事件追加到会话中。
🎯 核心特性
💾 会话管理 :通过 sessionService 获取/创建会话,默认使用 inmemory.NewSessionService()
🔄 事件处理 :接收 Agent 事件流,将非 partial 响应事件追加到会话中
🆔 ID 生成 :自动生成 Invocation ID 和事件 ID
📊 可观测集成 :集成 telemetry/trace,自动记录 span
✅ 完成事件 :在 Agent 事件流结束后生成 runner-completion 事件
🔌 插件 :在 Runner 上注册一次,全局作用于该 Runner 管理的 Agent、Tool 和模型调用。
架构设计
┌─────────────────────┐
│ Runner │ - 会话管理
└─────────┬───────────┘ - 事件流处理
│
│ agent.RunWithPlugins(ctx, invocation, r.agent)
│
┌─────────▼───────────┐
│ Agent │ - 接收 Invocation
└─────────┬───────────┘ - 返回 <-chan *event.Event
│
│ 具体实现由 Agent 决定
│
┌─────────▼───────────┐
│ Agent 实现 │ 如 LLMAgent, ChainAgent 等
└─────────────────────┘
🚀 快速开始
📋 环境要求
Go 1.21 或更高版本
有效的 LLM API 密钥(OpenAI 兼容接口)
Redis(可选,用于分布式会话管理)
💡 最简示例
package main
import (
"context"
"fmt"
"trpc.group/trpc-go/trpc-agent-go/agent"
"trpc.group/trpc-go/trpc-agent-go/agent/llmagent"
"trpc.group/trpc-go/trpc-agent-go/model"
"trpc.group/trpc-go/trpc-agent-go/model/openai"
"trpc.group/trpc-go/trpc-agent-go/runner"
)
func main () {
// 1. 创建模型
llmModel := openai . New ( "DeepSeek-V3-Online-64K" )
// 2. 创建 Agent
a := llmagent . New ( "assistant" ,
llmagent . WithModel ( llmModel ),
llmagent . WithInstruction ( "你是一个有帮助的AI助手" ),
llmagent . WithGenerationConfig ( model . GenerationConfig { Stream : true }), // 启用流式输出
)
// 3. 创建 Runner
r := runner . NewRunner ( "my-app" , a )
defer r . Close () // 确保资源被清理 (trpc-agent-go >= v0.5.0)
// 4. 运行对话
ctx := context . Background ()
userMessage := model . NewUserMessage ( "你好!" )
eventChan , err := r . Run ( ctx , "user1" , "session1" , userMessage , agent . WithRequestID ( "request-ID" ))
if err != nil {
panic ( err )
}
// 5. 处理响应
for event := range eventChan {
if event . Error != nil {
fmt . Printf ( "错误: %s\n" , event . Error . Message )
continue
}
if len ( event . Response . Choices ) > 0 {
fmt . Print ( event . Response . Choices [ 0 ]. Delta . Content )
}
// Recommended: stop when Runner emits its completion event.
if event . IsRunnerCompletion () {
break
}
}
}
🚀 运行示例
# 进入示例目录
cd examples/runner
# 设置API密钥
export OPENAI_API_KEY = "your-api-key"
# 基础运行
go run main.go
# 使用Redis会话
docker run -d -p 6379 :6379 redis:alpine
go run main.go -session redis
# 自定义模型
go run main.go -model "gpt-4o-mini"
💬 交互式功能
运行示例后,支持以下特殊命令:
/history - 请求 AI 显示对话历史
/new - 开始新的会话(重置对话上下文)
/exit - 结束对话
当 AI 使用工具时,会显示详细的调用过程:
🔧 工具调用:
• calculator (ID: call_abc123)
参数: {"operation":"multiply","a":25,"b":4}
🔄 执行中...
✅ 工具响应 (ID: call_abc123): {"operation":"multiply","a":25,"b":4,"result":100}
🤖 助手: 我为您计算了 25 × 4 = 100。
🔧 核心 API
Runner 创建
// 基础创建
r := runner . NewRunner ( appName , agent , options ... )
// 常用选项
r := runner . NewRunner ( "my-app" , agent ,
runner . WithSessionService ( sessionService ), // 会话服务
)
🧩 按请求动态创建 Agent(Agent Factory)
默认情况下,runner.NewRunner(...) 需要你先把 agent.Agent 完整构建好,然后
Runner 会在每次请求里复用同一个 Agent 实例。
如果你的 Agent 配置需要 跟当前请求绑定 (例如:提示词、模型、沙箱实例、工具集),
可以用 “Agent Factory” 在每次 Runner.Run(...) 时动态创建一个新的 Agent。
方式 A:默认 Agent 按需创建
r := runner . NewRunnerWithAgentFactory (
"my-app" ,
"assistant" ,
func ( ctx context . Context , ro agent . RunOptions ) ( agent . Agent , error ) {
// 你可以从 ro(或 ro.RuntimeState / ro.CustomAgentConfigs)读取
// 本次请求的参数,然后据此构建 Agent。
a := llmagent . New ( "assistant" ,
llmagent . WithInstruction ( ro . Instruction ),
)
return a , nil
},
)
方式 B:注册多个命名工厂,并通过名字选择
r := runner . NewRunner ( "my-app" , defaultAgent ,
runner . WithAgentFactory ( "sandboxed" , func (
ctx context . Context ,
ro agent . RunOptions ,
) ( agent . Agent , error ) {
return llmagent . New ( "sandboxed" ), nil
}),
)
events , err := r . Run ( ctx , userID , sessionID , message ,
agent . WithAgentByName ( "sandboxed" ),
)
_ = events
_ = err
说明:
每次调用 Runner.Run(...),Factory 会被调用一次。
agent.WithAgent(...) 依然优先生效(测试时很方便)。
🔌 插件
Runner 插件是一类全局、Runner 作用域的 Hook(钩子)。只需要在创建 Runner 时
注册一次,后续该 Runner 执行的所有 Agent、Tool 和模型调用都会自动生效。
import "trpc.group/trpc-go/trpc-agent-go/plugin"
r := runner . NewRunner ( "my-app" , a ,
runner . WithPlugins (
plugin . NewLogging (),
plugin . NewGlobalInstruction ( "You must follow security policies." ),
),
)
defer r . Close ()
说明:
插件名在同一个 Runner 内必须唯一。
插件按注册顺序执行。
如果插件实现了 plugin.Closer,Runner 会在 Close() 时调用它。
🔄 Ralph Loop Mode
Ralph Loop 是一种“外部循环(outer loop)”模式:不依赖 LLM 主观判断“我已经完成了”,
而是用可验证的完成条件来决定是否继续迭代执行。
常见完成条件:
Assistant 输出包含完成承诺(completion promise),例如
<promise>DONE</promise>。
校验命令退出码为 0(例如 go test ./...)。
通过 runner.Verifier 扩展自定义校验。
强烈建议设置 MaxIterations 作为安全阀。
r := runner . NewRunner ( "my-app" , a ,
runner . WithRalphLoop ( runner . RalphLoopConfig {
MaxIterations : 20 ,
CompletionPromise : "DONE" ,
VerifyCommand : "go test ./... -count=1" ,
VerifyTimeout : 2 * time . Minute ,
}),
)
当达到 MaxIterations 仍未满足完成条件时,Runner 会发出一个 error event,其错误类型为
stop_agent_error。
运行对话
// 执行单次对话
eventChan , err := r . Run ( ctx , userID , sessionID , message , options ... )
RequestID(request identifier,请求标识)与运行控制
每次调用 Runner.Run 都是一轮 run 。如果你需要取消某次 run,或者查询它的
运行状态,就需要一个 request identifier(requestID,请求标识)。
推荐由调用方自己生成 requestID,并通过 agent.WithRequestID 传入(比如用
Universally Unique Identifier(UUID,通用唯一标识)生成一个唯一字符串)。
Runner 会把它保存到 RunOptions.RequestID,并注入到每个事件 event.Event 的
event.RequestID 字段里。
requestID := "req-123"
eventChan , err := r . Run (
ctx ,
userID ,
sessionID ,
message ,
agent . WithRequestID ( requestID ),
)
if err != nil {
panic ( err )
}
managed := r .( runner . ManagedRunner )
status , ok := managed . RunStatus ( requestID )
_ = status
_ = ok
// 用 requestID 取消本次 run。
managed . Cancel ( requestID )
DetachedCancel(忽略父 ctx cancel)
在 Go 里,context.Context(通常命名为 ctx)同时承载“取消信号”和“截止时间”。
默认情况下,父 ctx 被取消(cancel)时,Runner 会停止这次 run。
如果你希望父 ctx 的 cancel 不影响 run,但仍然要用超时来限制总运行时长,可以:
eventChan , err := r . Run (
ctx ,
userID ,
sessionID ,
message ,
agent . WithRequestID ( requestID ),
agent . WithDetachedCancel ( true ),
agent . WithMaxRunDuration ( 30 * time . Second ),
)
Runner 会取以下两者中较早的时间作为真正的超时上限:
父 ctx 的 deadline(如果存在)
MaxRunDuration(如果设置了)
中断恢复(工具优先继续执行)
在真实业务里,用户可能在 Agent 还处于“工具调用阶段”时中断:
会话里的最后一条消息是带 tool_calls 的 assistant 消息;
但对应的工具结果(tool result)还没来得及写回 Session。
之后如果你想在同一个 sessionID 上“继续上次的任务”,可以开启
WithResume(true),让 Runner 先把上次未完成的工具调用执行完,再进入
下一轮 LLM 调用:
eventChan , err := r . Run (
ctx ,
userID ,
sessionID ,
model . Message {}, // 没有新的用户输入
agent . WithResume ( true ), // 开启恢复模式
)
开启 WithResume(true) 时,Runner 会:
读取当前 Session 中最新的一条事件;
如果最后一条是「带 tool_calls 的 assistant 回复」,且之后没有对应的
工具结果事件:
使用当前 Agent 注册的工具集合和回调,执行这些“未完成的工具调用”;
把工具执行结果写入 Session(作为 tool 消息事件);
工具执行结束后,再按正常流程发起新一轮 LLM 调用,此时模型能看到
“上一次的 tool_calls + 对应的工具结果”,不会重复要求调用同一工具。
如果最后一条事件是 user / tool 消息,或者是普通的 assistant 文本回复,
则 WithResume(true) 不会做任何额外处理,行为等同于普通的 Run 调用。
部分模型在生成 tool_calls 时,可能产出非严格 JSON 的参数(例如对象 key 未加引号、尾逗号等),从而导致工具执行或外部解析失败。
在 runner.Run 中启用 agent.WithToolCallArgumentsJSONRepairEnabled(true) 后,框架会对 toolCall.Function.Arguments 做一次尽力修复,详细使用方法可参照 ToolCall参数自动修复 。
传入对话历史(auto-seed + 复用 Session)
如果上游服务已经维护了会话历史,并希望让 Agent 看见这些上下文,可以直接传入整段
[]model.Message。Runner 会在 Session 为空时自动将其写入 Session,并在随后的轮次将
新事件(工具调用、后续回复等)继续写入。
方式 A:使用便捷函数 runner.RunWithMessages
msgs := [] model . Message {
model . NewSystemMessage ( "你是一个有帮助的助手" ),
model . NewUserMessage ( "第一条用户输入" ),
model . NewAssistantMessage ( "上一轮助手回复" ),
model . NewUserMessage ( "新的问题是什么?" ),
}
ch , err := runner . RunWithMessages ( ctx , r , userID , sessionID , msgs , agent . WithRequestID ( "request-ID" ))
示例:examples/runwithmessages(使用 RunWithMessages;Runner 会 auto-seed 并复用 Session)
方式 B:通过 RunOption 显式传入(与 Python ADK 一致的理念)
msgs := [] model . Message { /* 同上 */ }
ch , err := r . Run ( ctx , userID , sessionID , model . Message {}, agent . WithMessages ( msgs ))
注意:当显式传入 []model.Message 时,Runner 会在 Session 为空时自动把这段历史写入
Session。内容处理器不会读取这个选项,它只会从 Session 事件中派生消息(或在 Session
没有事件时回退到单条 invocation.Message)。RunWithMessages 仍会把最新的用户消息写入
invocation.Message。
✅ 图式流程的“优雅结束”与最终结果读取
很多同学在使用 GraphAgent(图式智能体)时,会误把 Response.IsFinalResponse() 当作“流程完成”的信号。请注意:IsFinalResponse() 只是“大模型本轮回复已结束”,但图上后续节点(例如 output 汇总节点)仍可能在继续执行。
最稳妥、统一的做法是:以 Runner 的“完成事件”作为运行结束的唯一判据:
for e := range eventChan {
// ... 处理流式分片、工具可视化等
if e . IsRunnerCompletion () { // Runner 的终止事件
break
}
}
此外,Runner 会把图在完成时的最终快照传递到这条“最后事件”里,因此你可以直接从该事件的 StateDelta 里读取图的最终输出(例如 graph.StateKeyLastResponse 对应的文本):
import (
"encoding/json"
"fmt"
"trpc.group/trpc-go/trpc-agent-go/graph"
)
for e := range eventChan {
if e . IsRunnerCompletion () {
if b , ok := e . StateDelta [ graph . StateKeyLastResponse ]; ok {
var final string
_ = json . Unmarshal ( b , & final )
fmt . Println ( "\nFINAL:" , final )
}
break
}
}
这样应用层可以始终“看最后一条事件”来判断流程结束并读取最终结果,避免因为提前退出而错过 output 等后续节点。
图完成前就致命退出时,如何只看最后一条事件拿到错误信息
有些运行不会走到最终的 graph.execution 完成事件,就已经因为致命错误提前结束。一个很常见的场景是:
某个节点回调先发出一条自定义 StateDelta,里面带了致命错误详情
随后流程直接中止,图本身来不及产出正常的最终快照
这时 Runner 仍然会发出最后那条 runner.completion 事件。对于这种真正的致命错误
(不包括 stop_agent_error 这种受控停止),Runner 现在会把“可安全兜底”的业务状态
带到这条最后事件上:
StateDelta:错误路径上累计出来的状态增量
这里有两个细节要注意:
Response.Error 仍然只保留在原始致命错误事件上,这样下游翻译层依然可以把
runner.completion 当作正常的结束信号处理。
graph.MetadataKeyNode、graph.MetadataKeyTool 这类图元数据键会在兜底复制时被过滤掉,
避免 AGUI 这类消费者把节点/工具生命周期事件重复翻译一遍。
这样业务代码就可以继续保持同一个规则:优先看最后一条事件里的业务错误详情,而不是为了拿错误信息去遍历整条事件流。
示例:
import (
"encoding/json"
"fmt"
"trpc.group/trpc-go/trpc-agent-go/event"
)
const stateKeyNodeFatal = "node_fatal_error"
func readLastEvent ( eventChan <- chan * event . Event ) error {
for e := range eventChan {
if ! e . IsRunnerCompletion () {
continue
}
if b , ok := e . StateDelta [ stateKeyNodeFatal ]; ok {
var detail map [ string ] any
if err := json . Unmarshal ( b , & detail ); err == nil {
fmt . Printf ( "fatal detail: %+v\n" , detail )
}
}
return nil
}
return nil
}
建议按下面这个心智模型理解:
正常成功且图产出了完成事件:从 completion event 的 StateDelta 里读取最终输出
(例如 graph.StateKeyLastResponse)
图完成前就致命退出:从同一条 completion event 的自定义 fatal key 里读取错误信息;
如果还需要结构化的 Response.Error,它仍然保留在原始致命错误事件上
stop_agent_error:仍然被视为“受控停止”信号,不会再重复镜像到 completion event
🔁 开关:让 Graph 的 LLM 节点输出最终响应事件
在 GraphAgent 里,一次 Run 可能会在多个节点里多次调用 LLM。当开启流式输出时,
一次模型调用通常会产生一串事件:
分片(partial)事件:IsPartial=true、Done=false,增量文本在
choice.Delta.Content
最终(final)事件:IsPartial=false、Done=true,完整文本在
choice.Message.Content
默认情况下,Graph 的 LLM 节点只输出分片事件,不输出最终 Done=true 的 assistant 消息
事件。这样可以避免“中间节点的输出”被当作普通助手回复(例如被 Runner 写进会话,或被
上层用户界面直接展示)。
如果你希望 Graph 的 LLM 节点也输出最终 Done=true 的 assistant 消息事件,可以开启这个
RunOption:
eventChan , err := r . Run (
ctx ,
userID ,
sessionID ,
message ,
agent . WithGraphEmitFinalModelResponses ( true ),
)
行为总结:
先讲清楚一句话:这个开关控制的是“图里每个 LLM 节点是否要额外输出最终 Done=true
的消息事件”,它不等价于“Runner 的完成事件一定会带(或一定不会带)
Response.Choices”。
假设你的图是:llm1 -> llm2 -> llm3,最后由 llm3 产出最终答案:
情况 1:agent.WithGraphEmitFinalModelResponses(false)(默认)
llm1/llm2/llm3:只输出分片事件(Done=false),不输出最终 Done=true 的
assistant 消息事件。
Runner 完成事件:为了让“只看最后一条事件也能拿到最终答案”,Runner 会把 llm3
的最终结果回显到完成事件的 Response.Choices(前提是图的完成事件里带了
Response.Choices)。同时,最终文本也始终能从
StateDelta[graph.StateKeyLastResponse] 读取。
情况 2:agent.WithGraphEmitFinalModelResponses(true)
llm1/llm2/llm3:除了分片事件外,还会各自输出最终 Done=true 的 assistant
消息事件(因此中间节点也可能出现完整 assistant 消息,Runner 也可能把这些非分片事件
写入会话)。
Runner 完成事件:为了避免和 llm3 的最终消息重复展示,Runner 会用响应 ID 做去重;
当它确认“最终消息已在前面的事件里出现过”时,就会省略回显,因此完成事件的
Response.Choices 可能为空,这是预期行为。最终文本仍然以
StateDelta[graph.StateKeyLastResponse] 为准。
建议:在 GraphAgent 场景里,请始终以 Runner “完成事件”的 StateDelta 作为最终输出的
唯一来源(例如 graph.StateKeyLastResponse)。当开启该选项时,请把“完成事件”里的
Response.Choices 当作可选字段,不要作为唯一依赖。
🎛️ 开关:StreamMode
Runner 支持在事件到达业务代码之前先做一次过滤:你可以用一个 RunOption 来选择
“本次运行”向 eventChan 转发哪些类别的事件。
使用 agent.WithStreamMode(...):
eventChan , err := r . Run (
ctx ,
userID ,
sessionID ,
message ,
agent . WithStreamMode ( agent . StreamModeMessages ),
)
支持的模式(图式工作流):
messages:模型输出事件(例如 chat.completion.chunk)
updates:graph.state.update / graph.channel.update / graph.execution
checkpoints:graph.checkpoint.*
tasks:任务生命周期事件(graph.node.*、graph.pregel.*)
debug:等价于 checkpoints + tasks
custom:节点主动发出的自定义事件(graph.node.custom)
注意事项:
当选择 agent.StreamModeMessages 时,Runner 会为本次运行自动开启 Graph 的最终响应事件
输出。若你需要关闭该行为,请在 agent.WithStreamMode(...) 之后调用
agent.WithGraphEmitFinalModelResponses(false) 覆盖。
StreamMode 只影响 Runner 向你的 eventChan 转发哪些事件;Runner 内部仍会处理并持久化
所有事件。
对于图式工作流,部分事件类型(例如 graph.checkpoint.*)只会在选择对应模式时才会产生。
Runner 总会额外发出一条 runner.completion 完成事件。
💾 会话管理
内存会话(默认)
import "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
sessionService := inmemory . NewSessionService ()
r := runner . NewRunner ( "app" , agent ,
runner . WithSessionService ( sessionService ))
Redis 会话(分布式)
import "trpc.group/trpc-go/trpc-agent-go/session/redis"
// 创建 Redis 会话服务
sessionService , err := redis . NewService (
redis . WithRedisClientURL ( "redis://localhost:6379" ))
r := runner . NewRunner ( "app" , agent ,
runner . WithSessionService ( sessionService ))
会话配置
// Redis 支持的配置选项
sessionService , err := redis . NewService (
redis . WithRedisClientURL ( "redis://localhost:6379" ),
redis . WithSessionEventLimit ( 1000 ), // 限制会话事件数量
// redis.WithRedisInstance("redis-instance"), // 或使用实例名
)
🤖 Agent 配置
Runner 的核心职责是管理 Agent 的执行流程。创建好的 Agent 需要通过 Runner 执行。
基础 Agent 创建
// 创建基础 Agent(详细配置参见 agent.md)
agent := llmagent . New ( "assistant" ,
llmagent . WithModel ( model ),
llmagent . WithInstruction ( "你是一个有帮助的AI助手" ))
// 使用 Runner 执行 Agent
r := runner . NewRunner ( "my-app" , agent )
在请求级别切换 Agent
Runner 支持在构造时注册多个可选 Agent,并在单次 Run 时切换。
reader := llmagent . New ( "agent1" , llmagent . WithModel ( model ))
writer := llmagent . New ( "agent2" , llmagent . WithModel ( model ))
r := runner . NewRunner ( "appName" , reader , // 使用 reader agent 作为默认 agent
runner . WithAgent ( "writer" , writer ), // 按名称注册可选 Agent
)
// 使用 reader agent 作为默认 agent
ch , err := r . Run ( ctx , userID , sessionID , msg )
// 通过 Agent Name 指定使用 writer agent
ch , err := r . Run ( ctx , userID , sessionID , msg , agent . WithAgentByName ( "writer" ))
// 直接传入实例,无需预注册。
custom := llmagent . New ( "custom" , llmagent . WithModel ( model ))
ch , err := r . Run ( ctx , userID , sessionID , msg , agent . WithAgent ( custom ))
runner.NewRunner("appName", agent):在创建 runner 时设置默认 Agent;
runner.WithAgent("agentName", agent): 在创建 Runner 时预注册一个 Agent,供后续请求按名称切换;
agent.WithAgentByName("agentName"): 在单次请求里通过名称选用已注册的 Agent;
agent.WithAgent(agent): 在单次请求里直接传入一个 Agent 实例临时覆盖,无需预注册。
Agent 生效优先级:agent.WithAgent > agent.WithAgentByName > runner.NewRunner 设置的默认 Agent。
生成配置
Runner 会将生成配置传递给 Agent:
// 辅助函数
func intPtr ( i int ) * int { return & i }
func floatPtr ( f float64 ) * float64 { return & f }
genConfig := model . GenerationConfig {
MaxTokens : intPtr ( 2000 ),
Temperature : floatPtr ( 0.7 ),
Stream : true , // 启用流式输出
}
agent := llmagent . New ( "assistant" ,
llmagent . WithModel ( model ),
llmagent . WithGenerationConfig ( genConfig ))
工具集成
工具配置在 Agent 中完成,Runner 负责运行包含工具的 Agent:
// 创建工具(详细配置参见 tool.md)
tools := [] tool . Tool {
function . NewFunctionTool ( myFunction , function . WithName ( "my_tool" )),
// 更多工具...
}
// 将工具添加到 Agent
agent := llmagent . New ( "assistant" ,
llmagent . WithModel ( model ),
llmagent . WithTools ( tools ))
// Runner 运行配置了工具的 Agent
r := runner . NewRunner ( "my-app" , agent )
工具调用流程 :Runner 本身不直接处理工具调用,具体流程如下:
传递工具 :Runner 通过 Invocation 将上下文传递给 Agent
Agent 处理 :Agent.Run 方法负责具体的工具调用逻辑
事件转发 :Runner 接收 Agent 返回的事件流并转发
会话记录 :将非 partial 响应事件追加到会话中
多 Agent 支持
Runner 可以执行复杂的多 Agent 结构(详细配置参见 multiagent.md):
import "trpc.group/trpc-go/trpc-agent-go/agent/chainagent"
// 创建多 Agent 组合
multiAgent := chainagent . New ( "pipeline" ,
chainagent . WithSubAgents ([] agent . Agent { agent1 , agent2 }))
// 使用同一个 Runner 执行
r := runner . NewRunner ( "multi-app" , multiAgent )
📊 事件处理
事件类型
import "trpc.group/trpc-go/trpc-agent-go/event"
for event := range eventChan {
// 错误事件
if event . Error != nil {
fmt . Printf ( "错误: %s\n" , event . Error . Message )
continue
}
// 流式内容
if len ( event . Response . Choices ) > 0 {
choice := event . Response . Choices [ 0 ]
fmt . Print ( choice . Delta . Content )
}
// 工具调用
if len ( event . Response . Choices ) > 0 && len ( event . Response . Choices [ 0 ]. Message . ToolCalls ) > 0 {
for _ , toolCall := range event . Response . Choices [ 0 ]. Message . ToolCalls {
fmt . Printf ( "调用工具: %s\n" , toolCall . Function . Name )
}
}
// 完成事件
if event . Done {
break
}
}
完整事件处理示例
import (
"fmt"
"strings"
)
func processEvents ( eventChan <- chan * event . Event ) error {
var fullResponse strings . Builder
for event := range eventChan {
// 处理错误
if event . Error != nil {
return fmt . Errorf ( "事件错误: %w" , event . Error )
}
// 处理工具调用
if len ( event . Response . Choices ) > 0 && len ( event . Response . Choices [ 0 ]. Message . ToolCalls ) > 0 {
fmt . Println ( "🔧 工具调用:" )
for _ , toolCall := range event . Response . Choices [ 0 ]. Message . ToolCalls {
fmt . Printf ( " • %s (ID: %s)\n" ,
toolCall . Function . Name , toolCall . ID )
fmt . Printf ( " 参数: %s\n" ,
string ( toolCall . Function . Arguments ))
}
}
// 处理工具响应
if event . Response != nil {
for _ , choice := range event . Response . Choices {
if choice . Message . Role == model . RoleTool {
fmt . Printf ( "✅ 工具响应 (ID: %s): %s\n" ,
choice . Message . ToolID , choice . Message . Content )
}
}
}
// 处理流式内容
if len ( event . Response . Choices ) > 0 {
content := event . Response . Choices [ 0 ]. Delta . Content
if content != "" {
fmt . Print ( content )
fullResponse . WriteString ( content )
}
}
if event . Done {
fmt . Println () // 换行
break
}
}
return nil
}
🔮 执行上下文管理
Runner 创建并管理 Invocation 结构:
// Runner 创建的 Invocation 包含以下字段:
invocation := agent . NewInvocation (
agent . WithInvocationAgent ( r . agent ), // Agent 实例
agent . WithInvocationSession ( Session ), // 会话对象
agent . WithInvocationEndInvocation ( false ), // 结束标志
agent . WithInvocationMessage ( message ), // 用户消息
agent . WithInvocationRunOptions ( ro ), // 运行选项
)
// 注:Invocation 还包含其他字段如 AgentName、Branch、Model、
// TransferInfo、AgentCallbacks、ModelCallbacks、ToolCallbacks 等,
// 但这些字段由 Agent 内部使用和管理
✅ 使用注意事项
错误处理
// 处理 Runner.Run 的错误
eventChan , err := r . Run ( ctx , userID , sessionID , message )
if err != nil {
log . Printf ( "Runner 执行失败: %v" , err )
return err
}
// 处理事件流中的错误
for event := range eventChan {
if event . Error != nil {
log . Printf ( "事件错误: %s" , event . Error . Message )
continue
}
// 处理正常事件
}
安全中断执行
当你调用 Runner.Run 时,框架会启动 goroutines 来持续产出事件,直到本次 run
结束。
这里有两种“停止”,非常容易混淆:
停止读取事件 (你的代码不读 eventChan 了)
停止本次 run (agent 停止模型/工具调用并退出)
如果你只是停止读取,但 run 还在继续,agent goroutine 可能会在写事件通道时阻塞,
从而引发 goroutine 泄漏或“卡住的 run”。
安全姿势永远是:
触发取消 (ctx cancel / requestID cancel / StopError)
把事件通道读到关闭为止
方式 A:Ctrl+C(命令行程序)
在命令行程序里,常见做法是把 Ctrl+C 转换为 ctx cancel:
ctx , stop := signal . NotifyContext ( context . Background (), os . Interrupt )
defer stop ()
eventCh , err := r . Run ( ctx , userID , sessionID , message )
if err != nil {
return err
}
for range eventCh {
// 一直读到通道关闭:要么 ctx 被取消,要么 run 正常结束。
}
方式 B:取消上下文(推荐默认做法)
用 context.WithCancel 包裹 runner.Run 的 ctx,在你希望中断时调用
cancel()(例如:轮次上限、token 预算超限、用户点击“停止”等)。
llmflow 将 context.Canceled 视为正常退出,会关闭 agent 事件通道,
runner 的消费循环也会正常结束,避免写端阻塞。
ctx , cancel := context . WithCancel ( context . Background ())
defer cancel ()
eventCh , err := r . Run ( ctx , userID , sessionID , message )
if err != nil {
return err
}
turns := 0
for evt := range eventCh {
if evt . Error != nil {
log . Printf ( "事件错误: %s" , evt . Error . Message )
continue
}
// ... 处理事件 ...
if evt . IsFinalResponse () {
break
}
turns ++
if turns >= maxTurns {
cancel () // 停止后续模型或工具调用
}
}
如果你需要“尽快返回”(例如 HTTP handler 超时),但仍想避免写端阻塞,可以用单独
的 goroutine 去 drain:
// eventCh 是 Runner.Run 返回的事件通道。
// cancel 是 context.WithCancel 返回的取消函数。
go func () {
for range eventCh {
}
}()
cancel ()
return nil
方式 C:按 requestID 取消(ManagedRunner)
在服务端场景里,你经常需要在“另一个 goroutine / 另一个请求”里取消某次 run。
这时可以用 request identifier(requestID)来定位并取消。
生成 requestID,并通过 agent.WithRequestID 传入 Run。
将 runner 转换为 runner.ManagedRunner。
调用 Cancel(requestID)。
requestID := "req-123"
eventCh , err := r . Run (
ctx ,
userID ,
sessionID ,
message ,
agent . WithRequestID ( requestID ),
)
if err != nil {
return err
}
mr := r .( runner . ManagedRunner )
_ = mr . Cancel ( requestID )
for range eventCh {
}
方式 D:在 run 内部触发停止(StopError)
有时最适合决定“现在就停止”的位置是在工具、回调或处理器内部(例如:策略校验、
预算限制、业务规则)。
你可以返回 agent.NewStopError("原因")(也可以与其他错误 join / wrap)。
llmflow 会把它转换为 stop_agent_error 事件并停止流程。
但“硬截止”(强制时间上限)仍建议用 ctx deadline (context.WithTimeout /
agent.WithMaxRunDuration)来实现。
常见误区
只 break 事件循环 :run 可能还在后台继续,并在写通道时阻塞。
全部使用 context.Background():你没有办法取消,就无法中断 run。
工具实现忽略 ctx:取消是协作式的;长耗时工具应检查 ctx.Done(),
或把 ctx 传入网络/DB 请求。
可运行示例:
examples/cancelrun(Enter/Ctrl+C 取消、drain 事件通道)
examples/managedrunner(requestID cancel、detached cancel、最长运行时长)
资源管理
🔒 关闭 Runner(重要)
Runner 在不使用时必须调用 Close() 方法,否则会导致 goroutine 泄漏(要求 trpc-agent-go >= v0.5.0)。
Runner 只关闭它自己创建的资源
当 Runner 创建时如果未提供 Session Service,会自动创建默认的 inmemory Session Service。该 Service 内部会启动后台 goroutines(用于异步处理 summary、基于 TTL 的会话清理等任务)。Runner 只管理这个自己创建的 inmemory Session Service 的生命周期。 如果你通过 WithSessionService() 提供了自己的 Session Service,你需要自己管理它的生命周期——Runner 不会关闭它。
如果不调用拥有 inmemory Session Service 的 Runner 的 Close(),这些后台 goroutines 将永远运行,造成资源泄漏。
推荐做法 :
// ✅ 推荐:使用 defer 确保资源被清理
r := runner . NewRunner ( "my-app" , agent )
defer r . Close () // 确保在函数退出时关闭 (trpc-agent-go >= v0.5.0)
// 使用 runner
eventChan , err := r . Run ( ctx , userID , sessionID , message )
if err != nil {
return err
}
for event := range eventChan {
// 处理事件
if event . IsRunnerCompletion () {
break
}
}
当你提供自己的 Session Service 时 :
// 你创建并管理 session service 的生命周期
sessionService := redis . NewService ( redis . WithRedisClientURL ( "redis://localhost:6379" ))
defer sessionService . Close () // 你负责关闭它
// Runner 使用但不拥有这个 session service
r := runner . NewRunner ( "my-app" , agent ,
runner . WithSessionService ( sessionService ))
defer r . Close () // 这不会关闭 sessionService(因为是你提供的) (trpc-agent-go >= v0.5.0)
// ... 使用 runner
长期运行的服务 :
type Service struct {
runner runner . Runner
sessionService session . Service // 如果你自己管理它
}
func NewService () * Service {
r := runner . NewRunner ( "my-app" , agent )
return & Service { runner : r }
}
func ( s * Service ) Start () error {
// 启动服务逻辑
return nil
}
// 在服务关闭时调用 Close
func ( s * Service ) Stop () error {
// 关闭 runner(它会关闭自己拥有的 inmemory session service)
// 要求 trpc-agent-go >= v0.5.0
if err := s . runner . Close (); err != nil {
return err
}
// 如果你提供了自己的 session service,在这里关闭它
if s . sessionService != nil {
return s . sessionService . Close ()
}
return nil
}
注意事项 :
✅ Close() 是幂等的,多次调用是安全的
✅ Runner 只关闭它默认创建的 inmemory Session Service
✅ 如果你通过 WithSessionService() 提供了自己的 Session Service,Runner 不会关闭它(你需要自己管理)
❌ 如果 Runner 拥有 inmemory Session Service 但不调用 Close(),会导致 goroutine 泄漏
Context 生命周期控制
// 使用 context 控制单次运行的生命周期
ctx , cancel := context . WithCancel ( context . Background ())
defer cancel ()
// 确保消费完所有事件
eventChan , err := r . Run ( ctx , userID , sessionID , message )
if err != nil {
return err
}
for event := range eventChan {
// 处理事件
if event . Done {
break
}
}
状态检查
import (
"context"
"fmt"
"trpc.group/trpc-go/trpc-agent-go/model"
"trpc.group/trpc-go/trpc-agent-go/runner"
)
// 检查 Runner 是否能正常工作
func checkRunner ( r runner . Runner , ctx context . Context ) error {
testMessage := model . NewUserMessage ( "测试" )
eventChan , err := r . Run ( ctx , "test-user" , "test-session" , testMessage )
if err != nil {
return fmt . Errorf ( "Runner.Run 失败: %v" , err )
}
// 检查事件流
for event := range eventChan {
if event . Error != nil {
return fmt . Errorf ( "收到错误事件: %s" , event . Error . Message )
}
if event . Done {
break
}
}
return nil
}
📝 总结
Runner 组件是 tRPC-Agent-Go 框架的核心,提供了完整的对话管理和 Agent 编排能力。通过合理使用会话管理、工具集成和事件处理,可以构建强大的智能对话应用。
2026-03-07 02:41:29
2025-08-25 07:06:16