Event 使用文档
Event 是 trpc-agent-go 中 Agent 与用户之间通信的核心机制。它就像一个消息信封,承载着 Agent 的响应内容、工具调用结果、错误信息等。通过 Event,你可以实时了解 Agent 的工作状态,处理流式响应,实现多 Agent 协作,以及追踪工具执行。
Event 概述
Event 是 Agent 与用户之间通信的载体。
用户通过 runner.Run()
方法获取事件流,然后监听事件通道来处理 Agent 的响应。
Event 结构
Event
表示 Agent 与用户之间的一次事件,结构定义如下:
| type Event struct {
// Response 是 Event 的基础响应结构,承载 LLM 的响应
*model.Response
// InvocationID 是本次调用的唯一标识
InvocationID string `json:"invocationId"`
// Author 是事件的发起者
Author string `json:"author"`
// ID 是事件的唯一标识符
ID string `json:"id"`
// Timestamp 是事件的时间戳
Timestamp time.Time `json:"timestamp"`
// Branch 是分支标识符,用于多 Agent 协作
Branch string `json:"branch,omitempty"`
// RequiresCompletion 表示此事件是否需要完成信号
RequiresCompletion bool `json:"requiresCompletion,omitempty"`
// CompletionID 用于此事件的完成信号
CompletionID string `json:"completionId,omitempty"`
// LongRunningToolIDs 是长运行函数调用的 ID 集合
// Agent 客户端将从此字段了解哪些函数调用是长时间运行的
// 仅对函数调用事件有效
LongRunningToolIDs map[string]struct{} `json:"longRunningToolIDs,omitempty"`
}
|
model.Response
是 Event 的基础响应结构,承载了 LLM 的响应、工具调用以及错误等信息,定义如下:
| type Response struct {
// 响应唯一标识
ID string `json:"id"`
// 对象类型(如 "chat.completion", "error" 等),帮助客户端识别处理方式
Object string `json:"object"`
// 创建时间戳
Created int64 `json:"created"`
// 使用的模型名称
Model string `json:"model"`
// 响应可选项,LLM 可能生成多个候选响应供用户选择,默认只有 1 个
Choices []Choice `json:"choices"`
// 使用统计信息,记录 token 使用情况
Usage *Usage `json:"usage,omitempty"`
// 系统指纹
SystemFingerprint *string `json:"system_fingerprint,omitempty"`
// 错误信息
Error *ResponseError `json:"error,omitempty"`
// 时间戳
Timestamp time.Time `json:"timestamp"`
// 表示整个对话是否完成
Done bool `json:"done"`
// 是否为部分响应
IsPartial bool `json:"is_partial"`
}
type Choice struct {
// 选择索引
Index int `json:"index"`
// 完整消息,包含整个响应
Message Message `json:"message,omitempty"`
// 增量消息,用于流式响应,只包含当前块的新内容
// 例如:完整响应 "Hello, how can I help you?" 在流式响应中:
// 第一个事件:Delta.Content = "Hello"
// 第二个事件:Delta.Content = ", how"
// 第三个事件:Delta.Content = " can I help you?"
Delta Message `json:"delta,omitempty"`
// 完成原因
FinishReason *string `json:"finish_reason,omitempty"`
}
type Message struct {
// 消息发起人的角色,例如 "system", "user", "assistant", "tool"
Role string `json:"role"`
// 消息内容
Content string `json:"content"`
// 多模式消息的内容片段
ContentParts []ContentPart `json:"content_parts,omitempty"`
// 工具响应所使用的工具的 ID
ToolID string `json:"tool_id,omitempty"`
// 工具响应所使用的工具的名称
ToolName string `json:"tool_name,omitempty"`
// 可选的工具调用
ToolCalls []ToolCall `json:"tool_calls,omitempty"`
}
type Usage struct {
// 提示词使用的 Token 数量.
PromptTokens int `json:"prompt_tokens"`
// 补全使用的 Token 数量.
CompletionTokens int `json:"completion_tokens"`
// 响应中使用的总 Token 数量.
TotalTokens int `json:"total_tokens"`
}
|
Event 类型
Event 在以下场景中会被创建和发送:
- 用户消息事件:用户发送消息时自动创建
- Agent 响应事件:Agent 生成响应时创建
- 流式响应事件:流式模式下每个响应块都会创建
- 工具调用事件:Agent 调用工具时创建
- 错误事件:发生错误时创建
- Agent 转移事件:Agent 转移给其他 Agent 时创建
- 完成事件:Agent 执行完成时创建
根据 model.Response.Object
字段,Event 可以分为以下类型:
| const (
// 错误事件
ObjectTypeError = "error"
// 工具响应事件
ObjectTypeToolResponse = "tool.response"
// 预处理事件
ObjectTypePreprocessingBasic = "preprocessing.basic"
ObjectTypePreprocessingContent = "preprocessing.content"
ObjectTypePreprocessingIdentity = "preprocessing.identity"
ObjectTypePreprocessingInstruction = "preprocessing.instruction"
ObjectTypePreprocessingPlanning = "preprocessing.planning"
// 后处理事件
ObjectTypePostprocessingPlanning = "postprocessing.planning"
ObjectTypePostprocessingCodeExecution = "postprocessing.code_execution"
// Agent 转移事件
ObjectTypeTransfer = "agent.transfer"
// Runner 完成事件
ObjectTypeRunnerCompletion = "runner.completion"
)
|
Event 创建
在开发自定义 Agent 类型或 Processor 时,需要创建 Event。
Event 提供了三种创建方法,适用于不同场景。
| // 创建新事件
func New(invocationID, author string, opts ...Option) *Event
// 创建错误事件
func NewErrorEvent(invocationID, author, errorType, errorMessage string) *Event
// 从响应创建事件
func NewResponseEvent(invocationID, author string, response *model.Response) *Event
|
参数说明:
invocationID string
:调用唯一标识
author string
:事件发起者
opts ...Option
:可选的配置选项(仅 New 方法)
errorType string
:错误类型(仅 NewErrorEvent 方法)
errorMessage string
:错误消息(仅 NewErrorEvent 方法)
response *model.Response
:响应对象(仅 NewResponseEvent 方法)
框架支持以下 Option 用以配置 Event:
WithBranch(branch string)
:设置事件的分支标识
WithResponse(response *model.Response)
:设置事件的响应内容
WithObject(o string)
:设置事件的类型
示例:
| // 创建基本事件
evt := event.New("invoke-123", "agent")
// 创建带分支的事件
evt := event.New("invoke-123", "agent", event.WithBranch("main"))
// 创建错误事件
evt := event.NewErrorEvent("invoke-123", "agent", "api_error", "请求超时")
// 从响应创建事件
response := &model.Response{
Object: "chat.completion",
Done: true,
Choices: []model.Choice{{Message: model.Message{Role: "assistant", Content: "Hello!"}}},
}
evt := event.NewResponseEvent("invoke-123", "agent", response)
|
Event 方法
Event 提供了 Clone
方法,用于创建 Event 的深拷贝。
| func (e *Event) Clone() *Event
|
Event 使用示例
这个示例展示了如何在实际应用中使用 Event 处理 Agent 的流式响应、工具调用和错误处理。
核心流程
- 发送用户消息:通过
runner.Run()
启动 Agent 处理
- 接收事件流:实时处理 Agent 返回的事件
- 处理不同类型事件:区分流式内容、工具调用、错误等
- 可视化输出:为用户提供友好的交互体验
代码示例
| // processMessage 处理单次消息交互
func (c *multiTurnChat) processMessage(ctx context.Context, userMessage string) error {
message := model.NewUserMessage(userMessage)
// 通过 runner 运行 agent
eventChan, err := c.runner.Run(ctx, c.userID, c.sessionID, message)
if err != nil {
return fmt.Errorf("failed to run agent: %w", err)
}
// 处理响应
return c.processResponse(eventChan)
}
// processResponse 处理响应,包括流式响应和工具调用可视化
func (c *multiTurnChat) processResponse(eventChan <-chan *event.Event) error {
fmt.Print("🤖 Assistant: ")
var (
fullContent string // 累积的完整内容
toolCallsDetected bool // 是否检测到工具调用
assistantStarted bool // Assistant 是否已开始回复
)
for event := range eventChan {
// 处理单个事件
if err := c.handleEvent(event, &toolCallsDetected, &assistantStarted, &fullContent); err != nil {
return err
}
// 检查是否为最终事件
if event.Done && !c.isToolEvent(event) {
fmt.Printf("\n")
break
}
}
return nil
}
// handleEvent 处理单个事件
func (c *multiTurnChat) handleEvent(
event *event.Event,
toolCallsDetected *bool,
assistantStarted *bool,
fullContent *string,
) error {
// 1. 处理错误事件
if event.Error != nil {
fmt.Printf("\n❌ Error: %s\n", event.Error.Message)
return nil
}
// 2. 处理工具调用
if c.handleToolCalls(event, toolCallsDetected, assistantStarted) {
return nil
}
// 3. 处理工具响应
if c.handleToolResponses(event) {
return nil
}
// 4. 处理内容
c.handleContent(event, toolCallsDetected, assistantStarted, fullContent)
return nil
}
// handleToolCalls 检测并显示工具调用
func (c *multiTurnChat) handleToolCalls(
event *event.Event,
toolCallsDetected *bool,
assistantStarted *bool,
) bool {
if len(event.Choices) > 0 && len(event.Choices[0].Message.ToolCalls) > 0 {
*toolCallsDetected = true
if *assistantStarted {
fmt.Printf("\n")
}
fmt.Printf("🔧 Tool calls initiated:\n")
for _, toolCall := range event.Choices[0].Message.ToolCalls {
fmt.Printf(" • %s (ID: %s)\n", toolCall.Function.Name, toolCall.ID)
if len(toolCall.Function.Arguments) > 0 {
fmt.Printf(" Args: %s\n", string(toolCall.Function.Arguments))
}
}
fmt.Printf("\n🔄 Executing tools...\n")
return true
}
return false
}
// handleToolResponses 检测并显示工具响应
func (c *multiTurnChat) handleToolResponses(event *event.Event) bool {
if event.Response != nil && len(event.Response.Choices) > 0 {
for _, choice := range event.Response.Choices {
if choice.Message.Role == model.RoleTool && choice.Message.ToolID != "" {
fmt.Printf("✅ Tool response (ID: %s): %s\n",
choice.Message.ToolID,
strings.TrimSpace(choice.Message.Content))
return true
}
}
}
return false
}
// handleContent 处理并显示内容
func (c *multiTurnChat) handleContent(
event *event.Event,
toolCallsDetected *bool,
assistantStarted *bool,
fullContent *string,
) {
if len(event.Choices) > 0 {
choice := event.Choices[0]
content := c.extractContent(choice)
if content != "" {
c.displayContent(content, toolCallsDetected, assistantStarted, fullContent)
}
}
}
// extractContent 根据流式模式提取内容
func (c *multiTurnChat) extractContent(choice model.Choice) string {
if c.streaming {
// 流式模式:使用增量内容
return choice.Delta.Content
}
// 非流式模式:使用完整消息内容
return choice.Message.Content
}
// displayContent 将内容打印到控制台
func (c *multiTurnChat) displayContent(
content string,
toolCallsDetected *bool,
assistantStarted *bool,
fullContent *string,
) {
if !*assistantStarted {
if *toolCallsDetected {
fmt.Printf("\n🤖 Assistant: ")
}
*assistantStarted = true
}
fmt.Print(content)
*fullContent += content
}
// isToolEvent 检查事件是否为工具响应
func (c *multiTurnChat) isToolEvent(event *event.Event) bool {
if event.Response == nil {
return false
}
// 检查是否有工具调用
if len(event.Choices) > 0 && len(event.Choices[0].Message.ToolCalls) > 0 {
return true
}
// 检查是否有工具 ID
if len(event.Choices) > 0 && event.Choices[0].Message.ToolID != "" {
return true
}
// 检查是否为工具角色
for _, choice := range event.Response.Choices {
if choice.Message.Role == model.RoleTool {
return true
}
}
return false
}
|