The AG-UI (Agent-User Interaction) protocol is maintained by the open-source AG-UI Protocol project. It enables agents built in different languages, frameworks, and execution environments to deliver their runtime outputs to user interfaces through a unified event stream. The protocol tolerates loosely matched payloads and supports transports such as SSE and WebSocket.
tRPC-Agent-Go ships with native AG-UI integration. It provides an SSE server implementation by default, while also allowing you to swap in a custom service.Service to use transports like WebSocket and to extend the event translation logic.
Getting Started
Assuming you already have an agent, you can expose it via the AG-UI protocol with just a few lines of code:
    import (
        "log"
        "net/http"

        "trpc.group/trpc-go/trpc-agent-go/runner"
        "trpc.group/trpc-go/trpc-agent-go/server/agui"
    )

    // Create the agent.
    agent := newAgent()
    // Build the Runner that will execute the agent.
    runner := runner.NewRunner(agent.Info().Name, agent)
    // Create the AG-UI server and mount it on an HTTP route.
    server, err := agui.New(runner, agui.WithPath("/agui"))
    if err != nil {
        log.Fatalf("create agui server failed: %v", err)
    }
    // Start the HTTP listener.
    if err := http.ListenAndServe("127.0.0.1:8080", server.Handler()); err != nil {
        log.Fatalf("server stopped with error: %v", err)
    }
Note: If WithPath is not specified, the AG-UI server mounts at / by default.
For an in-depth guide to Runners, refer to the runner documentation.
On the client side you can pair the server with frameworks that understand the AG-UI protocol, such as CopilotKit. It provides React/Next.js components with built-in SSE subscriptions. The sample at examples/agui/client/copilotkit builds a web UI that communicates with the agent through AG-UI, as shown below.
Advanced Usage
Custom transport
The AG-UI specification does not enforce a transport. The framework uses SSE by default, but you can implement the service.Service interface to switch to WebSocket or any other transport.
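A minimal sketch of what a custom transport could look like, assuming the service only has to expose an http.Handler (the Getting Started code passes server.Handler() to http.ListenAndServe). The Service interface, the ndjsonService type, and the fixed event payloads are hypothetical stand-ins rather than the library's definitions, and the sketch streams newline-delimited JSON instead of WebSocket so that it stays within the standard library.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Service is a hypothetical stand-in for the framework's service.Service;
// the assumption is that a transport only has to provide an http.Handler.
type Service interface {
	Handler() http.Handler
}

// ndjsonService streams one JSON object per line instead of SSE frames.
type ndjsonService struct {
	events []string // pre-encoded events, used here in place of a real agent run
}

// Handler returns the HTTP handler that writes the event stream.
func (s *ndjsonService) Handler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/x-ndjson")
		flusher, canFlush := w.(http.Flusher)
		for _, e := range s.events {
			fmt.Fprintln(w, e)
			if canFlush {
				flusher.Flush() // push each event to the client immediately
			}
		}
	})
}

// streamOnce runs a single request against svc and returns the response body.
func streamOnce(svc Service) string {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/", nil)
	svc.Handler().ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	svc := &ndjsonService{events: []string{`{"type":"RUN_STARTED"}`, `{"type":"RUN_FINISHED"}`}}
	fmt.Print(streamOnce(svc))
}
```

A real implementation would translate the runner's events as they arrive instead of replaying a fixed slice; how the custom service is registered with agui.New is not shown here.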
translator.New converts internal events into the standard AG-UI events. To enrich the stream while keeping the default behaviour, implement translator.Translator and use the AG-UI Custom event type to carry extra data.
For example, when using React Planner, if you want to apply different custom events to different tags, you can achieve this by implementing a custom Translator, as shown in the image below.
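The wrapping idea can be sketched as follows. All types here (InternalEvent, AGUIEvent, Translator, defaultTranslator) are hypothetical stand-ins for the framework and SDK types, and the Tag field and the "tag." naming scheme are assumptions made for illustration only.

```go
package main

import "fmt"

// InternalEvent and AGUIEvent are hypothetical stand-ins for the framework's
// internal event type and the AG-UI SDK event type.
type InternalEvent struct {
	Tag     string // assumed field, e.g. a planner tag
	Content string
}

type AGUIEvent struct {
	Type  string // "TEXT_MESSAGE_CONTENT", "CUSTOM", ...
	Name  string // only set for custom events
	Value string
}

// Translator is a stand-in for translator.Translator.
type Translator interface {
	Translate(e InternalEvent) ([]AGUIEvent, error)
}

// defaultTranslator plays the role of the translator built by translator.New.
type defaultTranslator struct{}

func (defaultTranslator) Translate(e InternalEvent) ([]AGUIEvent, error) {
	return []AGUIEvent{{Type: "TEXT_MESSAGE_CONTENT", Value: e.Content}}, nil
}

// taggedTranslator keeps the default output and appends one custom event
// for every tagged internal event.
type taggedTranslator struct {
	inner Translator
}

func (t taggedTranslator) Translate(e InternalEvent) ([]AGUIEvent, error) {
	out, err := t.inner.Translate(e)
	if err != nil {
		return nil, err
	}
	if e.Tag != "" {
		out = append(out, AGUIEvent{Type: "CUSTOM", Name: "tag." + e.Tag, Value: e.Content})
	}
	return out, nil
}

func main() {
	tr := taggedTranslator{inner: defaultTranslator{}}
	events, err := tr.Translate(InternalEvent{Tag: "planning", Content: "step 1"})
	if err != nil {
		fmt.Println("translate failed:", err)
		return
	}
	for _, ev := range events {
		fmt.Printf("%s %q %q\n", ev.Type, ev.Name, ev.Value)
	}
}
```

Delegating to the inner translator first means the default events are unchanged, so existing clients keep working and only clients that understand the custom event names see the extra data.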
By default, the AG-UI Runner does not append extra agent.RunOptions to the underlying runner.Run. Implement RunOptionResolver, inject it with aguirunner.WithRunOptionResolver, and translate client-provided configuration (for example, modelName or knowledgeFilter) from ForwardedProps.
RunOptionResolver executes for every incoming RunAgentInput. Its return value is forwarded to runner.Run in order. Returning an error surfaces a RunError to the client, while returning nil means no extra options are added.
Event Translation Callback
AG-UI provides an event translation callback mechanism, allowing custom logic to be inserted before and after the event translation process.
translator.BeforeTranslateCallback: Triggered before the internal event is translated into an AG-UI event. The return value convention:
Return (customEvent, nil): Use customEvent as the input event for translation.
Return (nil, nil): Retain the current event and continue with the subsequent callbacks. If all callbacks return nil, the original event will be used.
Return an error: Terminates the current execution, and the client will receive a RunError.
translator.AfterTranslateCallback: Triggered after the AG-UI event translation is completed and just before it is sent to the client. The return value convention:
Return (customEvent, nil): Use customEvent as the final event to be sent to the client.
Return (nil, nil): Retain the current event and continue with the subsequent callbacks. If all callbacks return nil, the original event will be sent.
Return an error: Terminates the current execution, and the client will receive a RunError.
    import (
        "context"

        aguievents "github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/core/events"
        "trpc.group/trpc-go/trpc-agent-go/event"
        "trpc.group/trpc-go/trpc-agent-go/server/agui"
        aguirunner "trpc.group/trpc-go/trpc-agent-go/server/agui/runner"
        "trpc.group/trpc-go/trpc-agent-go/server/agui/translator"
    )

    callbacks := translator.NewCallbacks().
        RegisterBeforeTranslate(func(ctx context.Context, evt *event.Event) (*event.Event, error) {
            // Logic to execute before event translation.
            return nil, nil
        }).
        RegisterAfterTranslate(func(ctx context.Context, evt aguievents.Event) (aguievents.Event, error) {
            // Logic to execute after event translation.
            if msg, ok := evt.(*aguievents.TextMessageContentEvent); ok {
                // Modify the message content in the event.
                return aguievents.NewTextMessageContentEvent(msg.MessageID, msg.Delta+" [via callback]"), nil
            }
            return nil, nil
        })

    server, err := agui.New(runner, agui.WithAGUIRunnerOptions(aguirunner.WithTranslateCallbacks(callbacks)))
Event translation callbacks can be used in various scenarios, such as:
Custom Event Handling: Modify event data or add additional business logic during the translation process.
Monitoring and Reporting: Insert monitoring and reporting logic before and after event translation. A full example of integrating with Langfuse observability platform can be found at examples/agui/server/langfuse.
RunAgentInput Hook
You can use WithRunAgentInputHook to mutate the AG-UI request before it reaches the runner. For example, a hook can read other_content from ForwardedProps and append it to the latest user message. The hook's return value is handled as follows:
Returning nil keeps the original input object, including any in-place edits made by the hook.
Returning a custom *adapter.RunAgentInput replaces the original input.
Returning an error aborts the request and the client receives a RunError event.
Session Storage and Event Aggregation
When constructing the AG-UI Runner, pass in SessionService. Events generated by real-time conversations will be written into the session through SessionService, making it convenient to replay the history later via MessagesSnapshot.
In streaming response scenarios, a single reply usually consists of multiple incremental text events. Writing all of them directly into the session can put significant pressure on SessionService.
To address this, the framework first aggregates events and then writes them into the session. In addition, it performs a periodic flush once per second by default, and each flush writes the current aggregation result into the session.
aggregator.WithEnabled(true) is used to control whether event aggregation is enabled. It is enabled by default. When enabled, it aggregates consecutive text events that share the same messageId. When disabled, no aggregation is performed on AG-UI events.
aguirunner.WithFlushInterval(time.Second) is used to control the periodic flush interval of aggregated results. The default is 1 second. Setting it to 0 disables the periodic flush mechanism.
    import (
        "time"

        "trpc.group/trpc-go/trpc-agent-go/runner"
        "trpc.group/trpc-go/trpc-agent-go/server/agui"
        "trpc.group/trpc-go/trpc-agent-go/server/agui/aggregator"
        aguirunner "trpc.group/trpc-go/trpc-agent-go/server/agui/runner"
        "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
    )

    runner := runner.NewRunner(agent.Info().Name, agent)
    sessionService := inmemory.NewSessionService()
    server, err := agui.New(
        runner,
        agui.WithPath("/agui"),
        agui.WithMessagesSnapshotPath("/history"),
        agui.WithMessagesSnapshotEnabled(true),
        agui.WithAppName(appName),
        agui.WithSessionService(sessionService),
        agui.WithAGUIRunnerOptions(
            aguirunner.WithUserIDResolver(userIDResolver),
            // Enable event aggregation (enabled by default).
            aguirunner.WithAggregationOption(aggregator.WithEnabled(true)),
            // Set the periodic flush interval for aggregation results (default is 1 second).
            aguirunner.WithFlushInterval(time.Second),
        ),
    )
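The aggregation rule itself, merging consecutive text events that share the same messageId, can be sketched independently of the framework. TextEvent and aggregate are hypothetical names used for illustration; the real aggregator also handles flushing and non-text events, which this sketch leaves out.

```go
package main

import "fmt"

// TextEvent is a hypothetical stand-in for an incremental AG-UI text event.
type TextEvent struct {
	MessageID string
	Delta     string
}

// aggregate merges runs of consecutive events that share a MessageID, so a
// streamed reply is stored as a few large events instead of many small ones.
func aggregate(events []TextEvent) []TextEvent {
	var out []TextEvent
	for _, e := range events {
		if n := len(out); n > 0 && out[n-1].MessageID == e.MessageID {
			out[n-1].Delta += e.Delta
			continue
		}
		out = append(out, e)
	}
	return out
}

func main() {
	stream := []TextEvent{
		{MessageID: "m1", Delta: "Hel"},
		{MessageID: "m1", Delta: "lo"},
		{MessageID: "m2", Delta: "Hi"},
		{MessageID: "m1", Delta: "!"},
	}
	for _, e := range aggregate(stream) {
		fmt.Printf("%s: %q\n", e.MessageID, e.Delta)
	}
}
```

Only adjacent events are merged, so the final m1 event stays separate and the original ordering of the stream is preserved.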
If more complex aggregation strategies are required, you can implement aggregator.Aggregator and inject it through a custom factory. Note that although an aggregator is created separately for each session, avoiding cross-session state management and concurrency handling, the aggregation methods themselves may still be called concurrently, so concurrency must still be handled properly.
For example, while remaining compatible with the default text aggregation, you can accumulate the content of custom events of type "think" and then persist them.
    import (
        "context"
        "strings"
        "sync"

        aguievents "github.com/ag-ui-protocol/ag-ui/sdks/community/go/pkg/core/events"
        "trpc.group/trpc-go/trpc-agent-go/runner"
        "trpc.group/trpc-go/trpc-agent-go/server/agui"
        "trpc.group/trpc-go/trpc-agent-go/server/agui/aggregator"
        aguirunner "trpc.group/trpc-go/trpc-agent-go/server/agui/runner"
        "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
    )

    // thinkAggregator wraps the default aggregator and buffers custom "think" events.
    // thinkEventTypeContent is the application-defined name of those events; it is not declared in this snippet.
    type thinkAggregator struct {
        mu    sync.Mutex
        inner aggregator.Aggregator
        think strings.Builder
    }

    func newAggregator(ctx context.Context, opt ...aggregator.Option) aggregator.Aggregator {
        return &thinkAggregator{inner: aggregator.New(ctx, opt...)}
    }

    func (c *thinkAggregator) Append(ctx context.Context, event aguievents.Event) ([]aguievents.Event, error) {
        // Use a mutex to ensure concurrent safety of the inner aggregator and the think buffer,
        // avoiding race conditions when multiple events are appended concurrently.
        c.mu.Lock()
        defer c.mu.Unlock()

        // For custom "think" events, use a strategy of "accumulate only, do not emit immediately":
        // 1. Force flush inner first to fully emit previously accumulated normal events.
        // 2. Append the current think content to the buffer only, without returning it immediately.
        // This ensures that think fragments are not interrupted by normal events, and that
        // previous normal events are emitted before new thinking content.
        if custom, ok := event.(*aguievents.CustomEvent); ok && custom.Name == string(thinkEventTypeContent) {
            flushed, err := c.inner.Flush(ctx)
            if err != nil {
                return nil, err
            }
            // Only participate in accumulation when the value is a string, to avoid polluting
            // the buffer with values of unexpected types.
            if v, ok := custom.Value.(string); ok {
                c.think.WriteString(v)
            }
            // The current think event only participates in internal accumulation and is not
            // immediately visible externally. The return value is the previously aggregated
            // results from inner.
            return flushed, nil
        }

        // Non-think events follow the default inner aggregation logic to ensure that the
        // original text aggregation behavior is preserved.
        events, err := c.inner.Append(ctx, event)
        if err != nil {
            return nil, err
        }
        // If there is no accumulated think content, directly return the aggregation results
        // from inner without additional wrapping.
        if c.think.Len() == 0 {
            return events, nil
        }

        // If there is accumulated think content, insert a single aggregated think event before
        // the current batch of events:
        // 1. Package the buffer content into a single CustomEvent.
        // 2. Clear the buffer to avoid sending duplicate content.
        // 3. Place this think event before the current events to preserve the time order:
        //    output the complete thinking content first, then subsequent events.
        think := aguievents.NewCustomEvent(string(thinkEventTypeContent), aguievents.WithValue(c.think.String()))
        c.think.Reset()
        out := make([]aguievents.Event, 0, len(events)+1)
        out = append(out, think)
        out = append(out, events...)
        return out, nil
    }

    func (c *thinkAggregator) Flush(ctx context.Context) ([]aguievents.Event, error) {
        // Flush likewise needs to ensure concurrent safety of the inner aggregator and the think buffer.
        c.mu.Lock()
        defer c.mu.Unlock()

        // Flush inner first to ensure that all normal events are emitted according to its own
        // aggregation strategy.
        events, err := c.inner.Flush(ctx)
        if err != nil {
            return nil, err
        }
        // If there is still unflushed content in the think buffer, wrap it into a single
        // aggregated think event and insert it at the front of the current batch, ensuring that
        // the aggregated thinking content is not reordered by events produced by subsequent flushes.
        if c.think.Len() > 0 {
            think := aguievents.NewCustomEvent(string(thinkEventTypeContent), aguievents.WithValue(c.think.String()))
            c.think.Reset()
            events = append([]aguievents.Event{think}, events...)
        }
        return events, nil
    }

    runner := runner.NewRunner(agent.Info().Name, agent)
    sessionService := inmemory.NewSessionService()
    server, err := agui.New(
        runner,
        agui.WithPath("/agui"),
        agui.WithMessagesSnapshotPath("/history"),
        agui.WithMessagesSnapshotEnabled(true),
        agui.WithAppName(appName),
        agui.WithSessionService(sessionService),
        agui.WithAGUIRunnerOptions(
            aguirunner.WithUserIDResolver(userIDResolver),
            aguirunner.WithAggregatorFactory(newAggregator),
        ),
    )
Message Snapshot
Message snapshots restore historical conversations when a page is first opened or a connection is re-established. The feature is controlled by agui.WithMessagesSnapshotEnabled(true) and is disabled by default. Once enabled, AG-UI exposes both the chat route and the snapshot route:
The chat route defaults to / and can be customised with agui.WithPath;
The snapshot route defaults to /history, can be customised with WithMessagesSnapshotPath, and returns the event flow RUN_STARTED → MESSAGES_SNAPSHOT → RUN_FINISHED.
When enabling message snapshots, configure the following options:
agui.WithMessagesSnapshotEnabled(true) enables the snapshot endpoint;
agui.WithMessagesSnapshotPath sets the custom snapshot route, defaulting to /history;
agui.WithAppName(name) specifies the application name;
agui.WithSessionService(service) injects the session.Service used to look up historical events;
aguirunner.WithUserIDResolver(resolver) customises how userID is resolved, defaulting to "user".
While serving a snapshot request, the framework parses threadId from the RunAgentInput as the SessionID, combines it with the custom UserIDResolver to obtain userID, and then builds a session.Key together with appName. It queries the session through session.Service, converts the stored events (Session.Events) into AG-UI messages, and wraps them in a MESSAGES_SNAPSHOT event alongside matching RUN_STARTED and RUN_FINISHED events.
    import (
        "context"
        "log"
        "net/http"

        "trpc.group/trpc-go/trpc-agent-go/server/agui"
        "trpc.group/trpc-go/trpc-agent-go/server/agui/adapter"
        aguirunner "trpc.group/trpc-go/trpc-agent-go/server/agui/runner"
        "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
    )

    resolver := func(ctx context.Context, input *adapter.RunAgentInput) (string, error) {
        if user, ok := input.ForwardedProps["userId"].(string); ok && user != "" {
            return user, nil
        }
        return "anonymous", nil
    }

    sessionService := inmemory.NewSessionService()
    server, err := agui.New(
        runner,
        agui.WithPath("/chat"),                     // Custom chat route, defaults to "/"
        agui.WithAppName("demo-app"),               // AppName used to build the session key
        agui.WithSessionService(sessionService),    // Session Service used to query sessions
        agui.WithMessagesSnapshotEnabled(true),     // Enable message snapshots
        agui.WithMessagesSnapshotPath("/history"),  // Snapshot route, defaults to "/history"
        agui.WithAGUIRunnerOptions(
            aguirunner.WithUserIDResolver(resolver), // Custom userID resolver
        ),
    )
    if err != nil {
        log.Fatalf("create agui server failed: %v", err)
    }
    if err := http.ListenAndServe("127.0.0.1:8080", server.Handler()); err != nil {
        log.Fatalf("server stopped with error: %v", err)
    }
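The lookup described above (threadId as SessionID, resolved userID, and appName forming the session key, then wrapping stored messages in three events) can be sketched with stand-in types. SessionKey, Message, Event, and the map-based store are hypothetical and only mirror the described flow, not the framework's session.Service.

```go
package main

import "fmt"

// SessionKey, Message and Event are hypothetical stand-ins for session.Key,
// AG-UI messages and AG-UI events.
type SessionKey struct {
	AppName   string
	UserID    string
	SessionID string
}

type Message struct {
	Role    string
	Content string
}

type Event struct {
	Type     string
	Messages []Message // only set for MESSAGES_SNAPSHOT
}

// snapshotKey builds the lookup key; the request's threadId is the SessionID.
func snapshotKey(appName, userID, threadID string) SessionKey {
	return SessionKey{AppName: appName, UserID: userID, SessionID: threadID}
}

// buildSnapshot wraps the stored messages between run start and run finish.
func buildSnapshot(messages []Message) []Event {
	return []Event{
		{Type: "RUN_STARTED"},
		{Type: "MESSAGES_SNAPSHOT", Messages: messages},
		{Type: "RUN_FINISHED"},
	}
}

func main() {
	// An in-memory map plays the role of the session service.
	store := map[SessionKey][]Message{
		snapshotKey("demo-app", "anonymous", "thread-1"): {
			{Role: "user", Content: "hello"},
			{Role: "assistant", Content: "hi there"},
		},
	}
	history := store[snapshotKey("demo-app", "anonymous", "thread-1")]
	for _, ev := range buildSnapshot(history) {
		fmt.Println(ev.Type, len(ev.Messages))
	}
}
```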
The format of the MessagesSnapshot event is described in the AG-UI documentation on messages.
Setting the BasePath for Routes
agui.WithBasePath sets the base route prefix for the AG-UI service. The default value is /, and it is used to mount the chat route and message snapshot route under a unified prefix, avoiding conflicts with existing services.
agui.WithPath and agui.WithMessagesSnapshotPath only define sub-routes under the base path. The framework will use url.JoinPath to concatenate them with the base path to form the final accessible routes.
    server, err := agui.New(
        runner,
        agui.WithBasePath("/agui"),                // Set the AG-UI prefix route
        agui.WithPath("/chat"),                    // Set the chat route, default is "/"
        agui.WithMessagesSnapshotEnabled(true),    // Enable message snapshot feature
        agui.WithMessagesSnapshotPath("/history"), // Set the message snapshot route, default is "/history"
    )
    if err != nil {
        log.Fatalf("create agui server failed: %v", err)
    }
In this case, the chat route will be /agui/chat, and the message snapshot route will be /agui/history.
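The route composition can be checked directly with the standard library's url.JoinPath (available since Go 1.19). The joinRoute helper is a hypothetical name for illustration; only url.JoinPath itself comes from the text above.

```go
package main

import (
	"fmt"
	"net/url"
)

// joinRoute joins a base path and a sub-route the way the text describes,
// by delegating to url.JoinPath.
func joinRoute(basePath, subPath string) (string, error) {
	return url.JoinPath(basePath, subPath)
}

func main() {
	for _, sub := range []string{"/chat", "/history"} {
		route, err := joinRoute("/agui", sub)
		if err != nil {
			fmt.Println("join failed:", err)
			return
		}
		fmt.Println(route) // prints /agui/chat, then /agui/history
	}
}
```

Because url.JoinPath cleans the result, a leading slash on the sub-route does not produce a double slash in the final route.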
Best Practices
Generating Documents
If a long-form document is inserted directly into the main conversation, it can easily “flood” the dialogue, making it hard for users to distinguish between conversation content and document content. To solve this, it’s recommended to use a document panel to carry long documents. By defining a workflow in the AG-UI event stream — “open document panel → write document content → close document panel” — you can pull long documents out of the main conversation and avoid disturbing normal interactions. A sample approach is as follows.
Backend: Define tools and constrain call order
Provide the Agent with two tools, "open document panel" and "close document panel", and constrain the generation order in the prompt.
When entering the document generation flow, execute in the following order:
Call the “open document panel” tool first
Then output the document content
Finally call the “close document panel” tool
Converted into an AG-UI event stream, it looks roughly like this: