
Runner Component User Guide

Overview

Runner provides the interface for running Agents and is responsible for session management and event stream processing. Its core responsibilities are: obtaining or creating sessions, generating an Invocation ID, calling the Agent (via agent.RunWithPlugins), processing the returned event stream, and appending non-partial response events to the session.

๐ŸŽฏ Key Features

  • ๐Ÿ’พ Session Management: Obtain/create sessions via sessionService, using inmemory.NewSessionService() by default.
  • ๐Ÿ”„ Event Handling: Receive Agent event streams and append non-partial response events to the session.
  • ๐Ÿ†” ID Generation: Automatically generate Invocation IDs and event IDs.
  • ๐Ÿ“Š Observability Integration: Integrates telemetry/trace to automatically record spans.
  • โœ… Completion Event: Generates a runner.completion event after the Agent event stream ends.
  • ๐Ÿ”Œ Plugins: Register once on a Runner to apply global hooks across agent, tool, and model lifecycles.

Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚       Runner        โ”‚  - Session management.
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  - Event stream processing.
          โ”‚
          โ”‚ agent.RunWithPlugins(ctx, invocation, r.agent)
          โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚       Agent         โ”‚  - Receives Invocation.
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜  - Returns <-chan *event.Event.
          โ”‚
          โ”‚ Implementation is determined by the Agent.
          โ”‚
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚    Agent Impl       โ”‚  e.g., LLMAgent, ChainAgent.
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

๐Ÿš€ Quick Start

๐Ÿ“‹ Requirements

  • Go 1.21 or later.
  • Valid LLM API key (OpenAI-compatible interface).
  • Redis (optional, for distributed session management).

๐Ÿ’ก Minimal Example

package main

import (
    "context"
    "fmt"

    "trpc.group/trpc-go/trpc-agent-go/agent"
    "trpc.group/trpc-go/trpc-agent-go/agent/llmagent"
    "trpc.group/trpc-go/trpc-agent-go/model"
    "trpc.group/trpc-go/trpc-agent-go/model/openai"
    "trpc.group/trpc-go/trpc-agent-go/runner"
)

func main() {
    // 1. Create model.
    llmModel := openai.New("DeepSeek-V3-Online-64K")

    // 2. Create Agent.
    a := llmagent.New("assistant",
        llmagent.WithModel(llmModel),
        llmagent.WithInstruction("You are a helpful AI assistant."),
        llmagent.WithGenerationConfig(model.GenerationConfig{Stream: true}), // Enable streaming output.
    )

    // 3. Create Runner.
    r := runner.NewRunner("my-app", a)
    defer r.Close()  // Ensure cleanup (trpc-agent-go >= v0.5.0)

    // 4. Run conversation.
    ctx := context.Background()
    userMessage := model.NewUserMessage("Hello!")

    eventChan, err := r.Run(ctx, "user1", "session1", userMessage, agent.WithRequestID("request-ID"))
    if err != nil {
        panic(err)
    }

    // 5. Handle responses.
    for event := range eventChan {
        if event.Error != nil {
            fmt.Printf("Error: %s\n", event.Error.Message)
            continue
        }

        if len(event.Response.Choices) > 0 {
            fmt.Print(event.Response.Choices[0].Delta.Content)
        }

        // Recommended: stop when Runner emits its completion event.
        if event.IsRunnerCompletion() {
            break
        }
    }
}

๐Ÿš€ Run the Example

# Enter the example directory.
cd examples/runner

# Set API key.
export OPENAI_API_KEY="your-api-key"

# Basic run.
go run main.go

# Use Redis session.
docker run -d -p 6379:6379 redis:alpine
go run main.go -session redis

# Custom model.
go run main.go -model "gpt-4o-mini"

๐Ÿ’ฌ Interactive Features

After running the example, the following special commands are supported:

  • /history - Ask AI to show conversation history.
  • /new - Start a new session (reset conversation context).
  • /exit - End the conversation.

When the AI uses tools, the detailed invocation process is displayed:

๐Ÿ”ง Tool Call:
   โ€ข calculator (ID: call_abc123)
     Params: {"operation":"multiply","a":25,"b":4}

๐Ÿ”„ Executing...
โœ… Tool Response (ID: call_abc123): {"operation":"multiply","a":25,"b":4,"result":100}

๐Ÿค– Assistant: I calculated 25 ร— 4 = 100 for you.

๐Ÿ”ง Core API

Create Runner

// Basic creation.
r := runner.NewRunner(appName, agent, options...)

// Common options.
r := runner.NewRunner("my-app", agent,
    runner.WithSessionService(sessionService),  // Session service.
)

๐Ÿงฉ Request-Scoped Agent Creation (Agent Factory)

By default, runner.NewRunner(...) takes a fully built agent.Agent and reuses that same instance for every request.

If your agent needs request-specific configuration (for example, prompt, model, sandbox instance, tools), you can build a fresh agent for every run.

Option A: Create the default agent on demand

r := runner.NewRunnerWithAgentFactory(
    "my-app",
    "assistant",
    func(ctx context.Context, ro agent.RunOptions) (agent.Agent, error) {
        // Use ro (or ro.RuntimeState / ro.CustomAgentConfigs) to decide
        // how to build the agent for this request.
        a := llmagent.New("assistant",
            llmagent.WithInstruction(ro.Instruction),
        )
        return a, nil
    },
)

Option B: Register named factories and select them by name

r := runner.NewRunner("my-app", defaultAgent,
    runner.WithAgentFactory("sandboxed", func(
        ctx context.Context,
        ro agent.RunOptions,
    ) (agent.Agent, error) {
        return llmagent.New("sandboxed"), nil
    }),
)

events, err := r.Run(ctx, userID, sessionID, message,
    agent.WithAgentByName("sandboxed"),
)
_ = events
_ = err

Notes:

  • The factory is called once per Runner.Run(...).
  • agent.WithAgent(...) still overrides everything (useful for tests).

Resource Ownership Inside Agent Factories

AgentFactory is ideal for request-scoped Agent construction, but it does not transfer ownership of resources created inside the factory.

  • The Runner only asks the factory for an agent.Agent.
  • Runner.Close() only closes resources created or owned by the Runner itself; it does not automatically close request-scoped tool.ToolSet instances, temporary MCP connections, sandbox sessions, or similar resources created inside the factory.
  • The reason is structural: the agent.Agent interface does not expose a Close() method, so the Runner has no generic way to reclaim those resources.

Recommended patterns:

  • If a ToolSet or external connection can be reused across requests, create it once outside the factory, reuse it inside the factory, and close it during application shutdown (see the sketch after this list).
  • If a resource must be created per request, the caller should clean it up explicitly after that run finishes. Common patterns are wrapping the Agent with cleanup logic, or running cleanup from an after-agent callback.
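
A minimal sketch of the first pattern, assuming a hypothetical newSharedToolSet constructor and a hypothetical buildAgentWith helper that attaches the shared ToolSet to the Agent it builds:

// Created once at application startup; closed by the application, not by Runner.Close().
// newSharedToolSet is a placeholder for your real ToolSet constructor (for example, an MCP ToolSet).
sharedToolSet := newSharedToolSet()
defer sharedToolSet.Close()

r := runner.NewRunnerWithAgentFactory(
    "my-app",
    "assistant",
    func(ctx context.Context, ro agent.RunOptions) (agent.Agent, error) {
        // Reuse the long-lived ToolSet; only request-scoped configuration is built here.
        // buildAgentWith is a placeholder for your own Agent construction.
        return buildAgentWith(sharedToolSet, ro), nil
    },
)
defer r.Close() // Closes Runner-owned resources only; sharedToolSet is untouched.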

This boundary is especially important when using MCP ToolSets. See the ToolSet lifecycle notes in the tool documentation for more details.

Resume the Next User Turn at a Specific Agent

In a multi-Agent conversation, a transferred SubAgent may ask the user for missing information. The next request is a brand-new Runner.Run(...) call, so Runner needs an explicit signal if that next user message should resume at the same Agent instead of the normal entry Agent.

Enable the one-shot route consumer on Runner:

r := runner.NewRunner(
    "crm-app",
    coordinatorAgent,
    runner.WithAwaitUserReplyRouting(true),
)

There are two ways to produce that route:

  1. LLMAgent: enable llmagent.WithAwaitUserReplyTool(true) and instruct the model to call await_user_reply immediately before it asks the user for missing information (see the sketch after this list).
  2. Custom Agent implementations: call agent.MarkAwaitingUserReply(invocation) before you emit the final clarifying question event.
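
For route 1, a minimal sketch of the clarifying LLMAgent (llmModel is assumed to be created as in the Quick Start; pair the agent with the WithAwaitUserReplyRouting Runner shown above):

clarifier := llmagent.New("clarifier",
    llmagent.WithModel(llmModel),
    llmagent.WithAwaitUserReplyTool(true),
    llmagent.WithInstruction(
        "If required information is missing, call await_user_reply and then ask the user for it."),
)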

Low-Level Example for a Custom Agent

type clarifierAgent struct{}

func (a *clarifierAgent) Run(
    ctx context.Context,
    inv *agent.Invocation,
) (<-chan *event.Event, error) {
    ch := make(chan *event.Event, 1)
    go func() {
        defer close(ch)

        if missingPhoneNumber(inv.Message) {
            _ = agent.MarkAwaitingUserReply(inv)
            _ = agent.EmitEvent(
                ctx,
                inv,
                ch,
                event.NewResponseEvent(
                    inv.InvocationID,
                    inv.AgentName,
                    &model.Response{
                        Done: true,
                        Choices: []model.Choice{{
                            Index: 0,
                            Message: model.Message{
                                Role:    model.RoleAssistant,
                                Content: "What phone number should I save?",
                            },
                        }},
                    },
                ),
            )
            return
        }

        _ = agent.EmitEvent(
            ctx,
            inv,
            ch,
            event.NewResponseEvent(
                inv.InvocationID,
                inv.AgentName,
                &model.Response{
                    Done: true,
                    Choices: []model.Choice{{
                        Index: 0,
                        Message: model.Message{
                            Role:    model.RoleAssistant,
                            Content: "Profile updated.",
                        },
                    }},
                },
            ),
        )
    }()
    return ch, nil
}

Behavior summary:

  • The route is stored in session state as a stable agent path, not by mutating message roles.
  • It is consumed once, right before the next user turn starts.
  • agent.WithAgent(...) and agent.WithAgentByName(...) still take precedence.
  • If the recorded Agent path no longer exists, Runner clears the stale route and falls back to the default entry Agent.
  • Nested SubAgents are resumed by their full invocation path, so the common coordinator + WithSubAgents(...) setup works without manually registering every child Agent.

Advanced note:

  • The built-in Runner records the stable root lookup key automatically, including AgentFactory cases where the runtime Info().Name differs from the registered factory name.
  • If you build and run invocations manually outside Runner, and the stable root lookup key is different from inv.AgentName, call agent.SetAwaitUserReplyRootLookupName(inv, rootLookupName) before the Agent emits its final clarifying reply.

๐Ÿ”Œ Plugins

Runner plugins are global, runner-scoped hooks. Register plugins once and they will apply automatically to all agents, tools, and model calls executed by that Runner.

import "trpc.group/trpc-go/trpc-agent-go/plugin"

r := runner.NewRunner("my-app", a,
    runner.WithPlugins(
        plugin.NewLogging(),
        plugin.NewGlobalInstruction("You must follow security policies."),
    ),
)
defer r.Close()

Notes:

  • Plugin names must be unique per Runner.
  • Plugins run in the order they are registered.
  • If a plugin implements plugin.Closer, Runner will call it in Close().

๐Ÿ”„ Ralph Loop Mode

Ralph Loop is an "outer loop" mode. Instead of trusting a Large Language Model (LLM) to decide when it is done, Runner will keep iterating until a verifiable completion condition is met.

Common completion conditions:

  • A completion promise in the assistant output (for example, <promise>DONE</promise>).
  • A verification command exits with code 0 (for example, go test ./...).
  • Additional custom checks via runner.Verifier.
  • MaxIterations is always recommended as a safety valve.

r := runner.NewRunner("my-app", a,
    runner.WithRalphLoop(runner.RalphLoopConfig{
        MaxIterations:     20,
        CompletionPromise: "DONE",
        VerifyCommand:     "go test ./... -count=1",
        VerifyTimeout:     2 * time.Minute,
    }),
)

When MaxIterations is reached without success, Runner emits an error event with error type stop_agent_error.

Run Conversation

// Execute a single conversation.
eventChan, err := r.Run(ctx, userID, sessionID, message, options...)

Request ID (requestID) and Run Control

Each call to Runner.Run is a run. If you want to cancel a run or query its status, you need a request identifier (requestID).

You can provide your own requestID (recommended) via agent.WithRequestID (for example, a Universally Unique Identifier (UUID)). Runner injects it into every emitted event.Event (event.RequestID).

requestID := "req-123"

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithRequestID(requestID),
)
if err != nil {
    panic(err)
}

managed := r.(runner.ManagedRunner)
status, ok := managed.RunStatus(requestID)
_ = status
_ = ok

// Cancel the run by requestID.
managed.Cancel(requestID)

Queue a New User Message into the Same Run

Sometimes you do not want to start a second run. You want to keep the current requestID, queue a new role=user message, and insert it only after the current assistant round is finished.

Use runner.EnqueueUserMessage(...):

requestID := "req-123"

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    model.NewUserMessage("Draft a launch note."),
    agent.WithRequestID(requestID),
)
if err != nil {
    panic(err)
}

go func() {
    time.Sleep(time.Second)
    err := runner.EnqueueUserMessage(
        r,
        requestID,
        model.NewUserMessage("Also make the tone warmer."),
    )
    if err != nil {
        log.Printf("enqueue steer failed: %v", err)
    }
}()

_ = eventChan

Think of one assistant output as one round:

  • If the assistant only replies with text, the round ends at that reply
  • If the assistant emits tool_calls, the round ends only after that whole tool batch finishes

The queued user message can only be inserted between rounds. It is never inserted in the middle of a round.

The simplest valid shape is:

user(Q1)
assistant(tool_call A)
tool(result A)
user(Q2, queued steer)
assistant(...)

If one assistant message emits multiple tool calls, the framework still waits for the whole round:

user(Q1)
assistant(tool_calls A, B)
tool(result A)
tool(result B)
user(Q2, queued steer)
assistant(...)

It will not insert like this:

user(Q1)
assistant(tool_calls A, B)
tool(result A)
user(Q2, queued steer)
tool(result B)

because that would break the tool_call -> tool_response structure of the same assistant round.

So the behavior is:

  • This does not start a second run
  • The message is queued first, not written to session immediately
  • It is appended only after the previous assistant round and its tool work are fully finished
  • This keeps the tool_call -> tool_response structure intact
  • If the run has already finished, enqueue returns an error

If you want the implementation-level mapping, this happens after one runOneStep() finishes and before the next runOneStep() starts.

Runnable example: examples/steer/

Per-Request App Name Override (multi-tenant isolation)

By default, Runner uses the appName supplied at construction for session keys and event filter keys. If a single Runner instance serves multiple projects or tenants, you can override the app name on each Run call with agent.WithAppName:

// One runner, two projects.
r := runner.NewRunner("default-app", myAgent)

// Project A โ€” sessions are stored under "project-a".
evA, _ := r.Run(ctx, userID, sessionID, msg,
    agent.WithAppName("project-a"),
)

// Project B โ€” sessions are stored under "project-b", fully isolated from A.
evB, _ := r.Run(ctx, userID, sessionID, msg,
    agent.WithAppName("project-b"),
)

When WithAppName is not provided (or the value is empty), the runner falls back to the constructor-supplied default app name. The override affects:

Dimension                Default (no override)    With WithAppName("X")
session.Key.AppName      constructor appName      "X"
Default EventFilterKey   constructor appName      "X"

Other runner-level registrations (observability appid, agent registry) remain bound to the original constructor appName.

Note

appName must not be empty. If neither the constructor nor WithAppName provides a non-empty value, the session service returns session.ErrAppNameRequired.

Detached Cancellation (background execution)

In Go, context.Context (often named ctx) carries both cancellation and a deadline. By default, Runner stops when ctx is cancelled.

If you want the run to continue after a parent cancellation, enable detached cancellation and use a timeout to bound the total runtime:

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithRequestID(requestID),
    agent.WithDetachedCancel(true),
    agent.WithMaxRunDuration(30*time.Second),
)

Runner enforces the earlier of:

  • the parent context deadline (if any)
  • MaxRunDuration (if set)

Resume Interrupted Runs (tools-first resume)

In long-running conversations, users may interrupt the agent while it is still in a tool-calling phase (for example, the last message in the session is an assistant message with tool_calls, but no tool result has been written yet). When you later reuse the same sessionID, you can ask the Runner to resume from that point instead of asking the model to repeat the tool calls:

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    model.Message{},                // no new user message
    agent.WithResume(true),         // enable resume mode
)

When WithResume(true) is set:

  • Runner inspects the latest persisted session event.
  • If the last event is an assistant response that contains tool_calls and there is no later tool result, Runner will execute those pending tools first (using the same tool set and callbacks as a normal step) and persist the tool results into the session.
  • After tools finish, the normal LLM cycle continues using the updated session history, so the model sees both the original tool calls and their results.

If the last event is a user or tool message (or a plain assistant reply without tool_calls), WithResume(true) is a no-op and the flow behaves like a normal Run call.

Tool Call Arguments Auto Repair

Some models may emit non-strict JSON arguments for tool_calls (for example, unquoted object keys or trailing commas), which can break tool execution or external parsing.

When agent.WithToolCallArgumentsJSONRepairEnabled(true) is enabled in runner.Run, the framework will best-effort repair toolCall.Function.Arguments. For detailed usage, see Tool Call Arguments Auto Repair.
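
For example, to enable the repair for a single run (the surrounding arguments are the same as in the earlier snippets):

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    userMessage,
    agent.WithToolCallArgumentsJSONRepairEnabled(true), // Best-effort repair of tool_call arguments JSON.
)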

Provide Conversation History (auto-seed + session reuse)

If your upstream service maintains the conversation and you want the agent to see that context, you can pass a full history ([]model.Message) directly. The runner will seed an empty session with that history automatically and then merge in new session events.

Option A: Use the convenience helper runner.RunWithMessages

msgs := []model.Message{
    model.NewSystemMessage("You are a helpful assistant."),
    model.NewUserMessage("First user input"),
    model.NewAssistantMessage("Previous assistant reply"),
    model.NewUserMessage("Whatโ€™s the next step?"),
}

ch, err := runner.RunWithMessages(ctx, r, userID, sessionID, msgs, agent.WithRequestID("request-ID"))

Example: examples/runwithmessages (uses RunWithMessages; runner auto-seeds and continues reusing the session)

Option B: Pass via RunOption explicitly (same philosophy as ADK Python)

msgs := []model.Message{ /* as above */ }
ch, err := r.Run(ctx, userID, sessionID, model.Message{}, agent.WithMessages(msgs))

When []model.Message is provided, the runner persists that history into the session on first use (if empty). The content processor does not read this option; it only derives messages from session events (or falls back to the single invocation.Message if the session has no events). RunWithMessages still sets invocation.Message to the latest user turn so graph/flow agents that inspect it continue to work.

User Message Rewriting

agent.WithUserMessageRewriter(...) rewrites the current-turn user message before the run starts. The rewritten result is written into the session as the effective input for the current turn and continues to participate in subsequent turns. This is useful for adding business context, normalizing user wording, or splitting one input into multiple messages that are easier for the model to process.

The signature of UserMessageRewriter is:

import (
    "context"

    "trpc.group/trpc-go/trpc-agent-go/model"
)

type UserMessageRewriter func(
    ctx context.Context,
    args *UserMessageRewriteArgs,
) ([]model.Message, error)

type UserMessageRewriteArgs struct {
    AppName         string
    UserID          string
    SessionID       string
    RequestID       string
    OriginalMessage model.Message
}

OriginalMessage is the raw user input for the current turn. The other fields provide stable identifiers for the current run.

The returned messages are processed in order. The last message becomes invocation.Message, and any preceding messages are persisted as leading messages for the same turn. This allows the interface to support both 1 -> 1 rewrites and 1 -> N expansions. If the same call also passes historical messages via agent.WithMessages(...), the rewritten result is written together with that history. The rewriter must not return an empty slice; if it does, the runner returns an error immediately.

Example:

import (
    "context"
    "strings"

    "trpc.group/trpc-go/trpc-agent-go/agent"
    "trpc.group/trpc-go/trpc-agent-go/model"
)

func rewriteUserMessage(
    ctx context.Context,
    args *agent.UserMessageRewriteArgs,
) ([]model.Message, error) {
    raw := strings.TrimSpace(args.OriginalMessage.Content)
    if raw == "" {
        return []model.Message{args.OriginalMessage}, nil
    }
    if needsContext(raw) {
        return []model.Message{
            model.NewUserMessage("Please interpret the following request with the business context below."),
            model.NewUserMessage(raw),
        }, nil
    }
    return []model.Message{
        model.NewUserMessage("Please rewrite the following request into a clearer and more complete user message: " + raw),
    }, nil
}

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    userMessage,
    agent.WithUserMessageRewriter(rewriteUserMessage),
)

A complete example is available at examples/usermessagerewriter.

Override Runtime Surfaces for a Specific Node by nodeID

If you need to change one specific node in a runner.Run(...) call instead of changing the entire agent, pass agent.WithSurfacePatchForNode(nodeID, patch).

var patch agent.SurfacePatch
patch.SetInstruction("Answer in one short paragraph.")

events, err := r.Run(
    ctx,
    userID,
    sessionID,
    model.NewUserMessage("Summarize this report."),
    agent.WithSurfacePatchForNode(nodeID, patch),
)

Prefer obtaining a stable nodeID from structure.Export(...) and then pass it to WithSurfacePatchForNode(...). If you need to patch multiple nodes in the same run, pass multiple WithSurfacePatchForNode(...) options. For full details and more examples, see Agent: Override Runtime Surfaces by nodeID.

Override Code Executor Per Run

If you need to specify a different execution environment for a particular request on an agent that resolves its executor from RunOptions.CodeExecutor, such as LLMAgent, pass agent.WithCodeExecutor(exec) to runner.Run(...).

events, err := r.Run(
    ctx,
    userID,
    sessionID,
    model.NewUserMessage("Run the release checklist skill."),
    agent.WithCodeExecutor(containerExec),
)

Notes:

  • This option applies only to the current runner.Run(...) call and does not change the agent's default configuration.
  • This option only applies to agents that read RunOptions.CodeExecutor. If you use a custom agent, make sure its implementation handles this run option.
  • If the agent was created with llmagent.WithCodeExecutor(...), the executor passed here temporarily overrides that default for this run.
  • Capabilities that resolve their executor from RunOptions.CodeExecutor (for example workspace_exec) use the executor passed here for this run.
  • If you do not want Markdown fenced code blocks in model replies to auto-execute, set llmagent.WithEnableCodeExecutionResponseProcessor(false) when creating the agent. See Skill for more details.

โœ… Detecting End-of-Run and Reading Final Output (Graph-friendly)

When driving a GraphAgent workflow, the LLMโ€™s โ€œfinal responseโ€ is not the end of the workflowโ€”nodes like output may still be pending. Instead of checking Response.IsFinalResponse(), always stop on the Runnerโ€™s terminal completion event:

for e := range eventChan {
    // ... print streaming chunks, etc.
    if e.IsRunnerCompletion() {
        break
    }
}

For convenience, Runner now propagates the graphโ€™s final snapshot into this last event. You can extract the final textual output via graph.StateKeyLastResponse:

import "trpc.group/trpc-go/trpc-agent-go/graph"

for e := range eventChan {
    if e.IsRunnerCompletion() {
        if b, ok := e.StateDelta[graph.StateKeyLastResponse]; ok {
            var final string
            _ = json.Unmarshal(b, &final)
            fmt.Println("\nFINAL:", final)
        }
        break
    }
}

This keeps application code simple and consistent across Agent types while still preserving detailed graph events for advanced use.

Fatal Errors Before a Graph Completion Event

For the full framework-level recommendation, including the standard graph collector and A2A conventions, see Error Handling.

Sometimes a run stops early because of a fatal error before the graph emits its final graph.execution event. A common example is:

  • a node callback emits a custom state delta with fatal-error details
  • the run then aborts before the graph can produce its normal final snapshot

In that case, Runner still emits the final runner.completion event. When the terminal error is a real fatal error (not stop_agent_error), Runner now copies the accumulated fallback business state onto that last event for you:

  • StateDelta: the accumulated state delta from the error path

Two details matter here:

  • Runner keeps the original fatal event as the only carrier of Response.Error, so downstream translators can still treat runner.completion as a normal finish signal.
  • Graph metadata keys such as graph.MetadataKeyNode and graph.MetadataKeyTool are filtered out from the fallback delta to avoid re-translating node/tool lifecycle events in consumers such as AGUI.

This lets application code keep the same simple rule: read the last event first for business-level fatal details, instead of scanning the whole stream to find the callback/error event.

If the graph uses graph.NewExecutionErrorCollector(), any collected execution_errors in that StateDelta may come from the default recoverable contract as well, for example errors that implement Recoverable() bool or errors wrapped by graph.MarkRecoverable(err).

Example:

package main

import (
    "context"
    "encoding/json"
    "fmt"

    "trpc.group/trpc-go/trpc-agent-go/event"
    "trpc.group/trpc-go/trpc-agent-go/graph"
    "trpc.group/trpc-go/trpc-agent-go/model"
)

const stateKeyNodeFatal = "node_fatal_error"

type RunSummary struct {
    TransportError  *model.ResponseError
    FatalDetail     map[string]any
    ExecutionErrors []graph.ExecutionError
}

func ConsumeRun(
    ctx context.Context,
    eventChan <-chan *event.Event,
) (*RunSummary, error) {
    summary := &RunSummary{}

    for {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        case evt, ok := <-eventChan:
            if !ok {
                return summary, nil
            }
            if evt.Response != nil && evt.Response.Error != nil {
                summary.TransportError = evt.Response.Error
            }
            if !evt.IsRunnerCompletion() {
                continue
            }

            if b, ok := evt.StateDelta[stateKeyNodeFatal]; ok {
                var detail map[string]any
                if err := json.Unmarshal(b, &detail); err != nil {
                    return nil, err
                }
                summary.FatalDetail = detail
            }

            executionErrors, err := graph.ExecutionErrorsFromStateDelta(
                evt.StateDelta,
                graph.StateKeyExecutionErrors,
            )
            if err != nil {
                return nil, err
            }
            summary.ExecutionErrors = executionErrors
            return summary, nil
        }
    }
}

func PrintSummary(summary *RunSummary) {
    if summary.TransportError != nil {
        fmt.Printf(
            "transport error: type=%s code=%s message=%s\n",
            summary.TransportError.Type,
            ptrValue(summary.TransportError.Code),
            summary.TransportError.Message,
        )
    }
    if summary.FatalDetail != nil {
        fmt.Printf("fatal detail: %+v\n", summary.FatalDetail)
    }
    for _, record := range summary.ExecutionErrors {
        if record.Error == nil {
            continue
        }
        fmt.Printf(
            "execution error: severity=%s node=%s code=%s message=%s\n",
            record.Severity,
            record.NodeName,
            ptrValue(record.Error.Code),
            record.Error.Message,
        )
    }
}

func ptrValue(value *string) string {
    if value == nil {
        return ""
    }
    return *value
}

Recommended mental model:

  • Success path with graph completion: read final output from the completion eventโ€™s StateDelta (for example, graph.StateKeyLastResponse)
  • Fatal exit before graph completion: read your custom fatal keys from the same completion event; if you also need the structured Response.Error, it remains on the original fatal event
  • stop_agent_error: still behaves like a controlled stop signal and is not duplicated onto the completion event

๐Ÿ” Option: Emit Final Graph LLM Responses

Graph-based agents (for example, GraphAgent) can call a Large Language Model (LLM) many times inside a single run. Each model call can produce a stream of events:

  • Partial chunks: IsPartial=true, Done=false, incremental text in choice.Delta.Content
  • Final message: IsPartial=false, Done=true, full text in choice.Message.Content

By default, graph LLM nodes only emit the partial chunks. This avoids treating intermediate node outputs as normal assistant replies (for example, persisting them into the Session by Runner or showing them to end users).

To opt into the newer behavior (emit the final Done=true assistant message events from graph LLM nodes), enable this RunOption:

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithGraphEmitFinalModelResponses(true),
)

Behavior summary:

First, one key idea: this option controls whether each graph Large Language Model (LLM) node emits an extra final Done=true assistant message event. It does not mean the Runner completion event will always have (or not have) Response.Choices.

Assume your graph is llm1 -> llm2 -> llm3, and llm3 produces the final answer:

  • Case 1: agent.WithGraphEmitFinalModelResponses(false) (default)
    • llm1/llm2/llm3: emit only partial chunks (Done=false), no final Done=true assistant message events.
    • Runner completion event: to keep the โ€œread only the last eventโ€ pattern working, Runner echoes llm3โ€™s final output into completion Response.Choices (when the graph provides final choices). The final text is also always available via StateDelta[graph.StateKeyLastResponse].
  • Case 2: agent.WithGraphEmitFinalModelResponses(true)
    • llm1/llm2/llm3: in addition to partial chunks, each node emits a final Done=true assistant message event (so intermediate nodes may now produce complete assistant messages, and Runner may persist those non-partial events into the Session).
    • Runner completion event: to avoid duplicating the final message, Runner deduplicates by response identifier (ID). When it can confirm the final message already appeared earlier, it omits the echo, so completion Response.Choices may be empty. The final text should still be read from StateDelta[graph.StateKeyLastResponse].

Recommendation: for GraphAgent workflows, always read the final output from the Runner completion eventโ€™s StateDelta (for example, graph.StateKeyLastResponse). Treat Response.Choices on the completion event as optional when this option is enabled.

Option: Keep Only Terminal Graph Message Events

When a graph contains multiple Large Language Model (LLM) nodes or sub-agent nodes, the caller-visible message stream may include intermediate drafts from earlier nodes. To preserve full backward compatibility, that behavior remains the default.

If you want the caller-visible stream to keep only terminal graph message events, enable:

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithGraphTerminalMessagesOnly(true),
)

Behavior summary:

  • Default (false): unchanged. Intermediate graph node message events are still forwarded.
  • Enabled (true): caller-visible message events are limited to terminal LLM nodes and terminal sub-agent nodes.
  • Parallel terminal nodes are all preserved. The option does not collapse a fan-out graph into a single winner.
  • Internal graph execution is unchanged. State handoff, history aggregation, tracing, and token accounting still use the full raw graph event stream.

This option is especially useful when your product experience should stream only the last user-facing graph step, while keeping intermediate graph messages internal.

For graph LLM nodes, pair this option with agent.WithGraphEmitFinalModelResponses(true) when you also want terminal Done=true assistant message events to be forwarded. See examples/graph/terminal_messages_only.

๐ŸŽ›๏ธ Option: StreamMode

Runner can filter the event stream before it reaches your application code. This provides a single, run-level switch to select which categories of events are forwarded to your eventChan.

Use agent.WithStreamMode(...):

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithStreamMode(agent.StreamModeMessages),
)

Supported modes (graph workflows):

  • messages: model output events (for example, chat.completion.chunk)
  • updates: graph.state.update / graph.channel.update / graph.execution
  • checkpoints: graph.checkpoint.*
  • tasks: task lifecycle events (graph.node.*, graph.pregel.*)
  • debug: same as checkpoints + tasks
  • custom: node-emitted events (graph.node.custom)

Notes:

  • When agent.StreamModeMessages is selected, graph-based Large Language Model (LLM) nodes enable final model response events automatically for that run. To override it, call agent.WithGraphEmitFinalModelResponses(false) after agent.WithStreamMode(...), as shown in the example after these notes.
  • StreamMode only affects what Runner forwards to your eventChan. Runner still processes and persists events internally.
  • For graph workflows, some event types (for example, graph.checkpoint.*) are emitted only when their corresponding mode is selected.
  • Runner always emits a final runner.completion event.
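
For example, combining the first note with an explicit override: keep messages mode but suppress the automatically enabled final model response events (option order matters):

eventChan, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithStreamMode(agent.StreamModeMessages),
    agent.WithGraphEmitFinalModelResponses(false), // Must come after WithStreamMode to override its default.
)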

๐Ÿ’พ Session Management

In-memory Session (Default)

import "trpc.group/trpc-go/trpc-agent-go/session/inmemory"

sessionService := inmemory.NewSessionService()
r := runner.NewRunner("app", agent,
    runner.WithSessionService(sessionService))

Redis Session (Distributed)

import "trpc.group/trpc-go/trpc-agent-go/session/redis"

// Create Redis session service.
sessionService, err := redis.NewService(
    redis.WithRedisClientURL("redis://localhost:6379"))

r := runner.NewRunner("app", agent,
    runner.WithSessionService(sessionService))

Session Configuration

// Configuration options supported by Redis.
sessionService, err := redis.NewService(
    redis.WithRedisClientURL("redis://localhost:6379"),
    redis.WithSessionEventLimit(1000),         // Limit number of session events.
    // redis.WithRedisInstance("redis-instance"), // Or use an instance name.
)

๐Ÿค– Agent Configuration

Runner's core responsibility is to manage the Agent execution flow. A created Agent needs to be executed via Runner.

Basic Agent Creation

// Create a basic Agent (see agent.md for detailed configuration).
agent := llmagent.New("assistant",
    llmagent.WithModel(model),
    llmagent.WithInstruction("You are a helpful AI assistant."))

// Execute Agent with Runner.
r := runner.NewRunner("my-app", agent)

Switch Agents Per Request

Runner can register multiple optional agents at construction time and pick one per Run:

reader := llmagent.New("agent1", llmagent.WithModel(model))
writer := llmagent.New("agent2", llmagent.WithModel(model))

r := runner.NewRunner("my-app", reader, // Use reader as the default agent.
    runner.WithAgent("writer", writer), // Register an optional agent by name.
)

// Use the default reader agent.
ch, err := r.Run(ctx, userID, sessionID, msg)

// Pick the writer agent by name.
ch, err = r.Run(ctx, userID, sessionID, msg, agent.WithAgentByName("writer"))

// Override with an instance directly (no pre-registration needed).
custom := llmagent.New("custom", llmagent.WithModel(model))
ch, err = r.Run(ctx, userID, sessionID, msg, agent.WithAgent(custom))

  • runner.NewRunner("my-app", agent): Set the default agent when creating the Runner.
  • runner.WithAgent("agentName", agent): Pre-register an agent by name so later requests can switch via name.
  • agent.WithAgentByName("agentName"): Choose a registered agent by name for a single request without changing the default.
  • agent.WithAgent(agent): Provide an agent instance directly for a single request; highest priority and no pre-registration needed.

Agent selection priority: agent.WithAgent > agent.WithAgentByName > default agent set at construction.

The selected agent name is used as the event author and is recorded via appid.RegisterRunner for observability.

Generation Configuration

Runner passes generation configuration to the Agent:

// Helper functions.
func intPtr(i int) *int           { return &i }
func floatPtr(f float64) *float64 { return &f }

genConfig := model.GenerationConfig{
    MaxTokens:   intPtr(2000),
    Temperature: floatPtr(0.7),
    Stream:      true,  // Enable streaming output.
}

agent := llmagent.New("assistant",
    llmagent.WithModel(model),
    llmagent.WithGenerationConfig(genConfig))

Tool Integration

Tool configuration is done inside the Agent, while Runner is responsible for running the Agent with tools:

// Create tools (see tool.md for detailed configuration).
tools := []tool.Tool{
    function.NewFunctionTool(myFunction, function.WithName("my_tool")),
    // More tools...
}

// Add tools to the Agent.
agent := llmagent.New("assistant",
    llmagent.WithModel(model),
    llmagent.WithTools(tools))

// Runner runs the Agent configured with tools.
r := runner.NewRunner("my-app", agent)

Tool invocation flow: Runner itself does not directly handle tool invocation. The flow is as follows:

  1. Pass tools: Runner passes context to the Agent via Invocation.
  2. Agent processing: Agent.Run handles the tool invocation logic.
  3. Event forwarding: Runner receives the event stream returned by the Agent and forwards it.
  4. Session recording: Append non-partial response events to the session.

Multi-Agent Support

Runner can execute complex multi-Agent structures (see multiagent.md for details):

import "trpc.group/trpc-go/trpc-agent-go/agent/chainagent"

// Create a multi-Agent pipeline.
multiAgent := chainagent.New("pipeline",
    chainagent.WithSubAgents([]agent.Agent{agent1, agent2}))

// Execute with the same Runner.
r := runner.NewRunner("multi-app", multiAgent)

๐Ÿ“Š Event Processing

Completion Semantics

Runner uses a few related but different completion signals:

  • Done=true: the current event itself is complete. This can appear on final assistant messages, tool responses, graph events, and runner completion events.
  • runner.completion / event.IsRunnerCompletion(): the entire Runner.Run() call has finished. This is the recommended condition for stopping consumption of eventChan.

Event Types

import "trpc.group/trpc-go/trpc-agent-go/event"

for event := range eventChan {
    // Error event.
    if event.Error != nil {
        fmt.Printf("Error: %s\n", event.Error.Message)
        continue
    }

    // Streaming content.
    if len(event.Response.Choices) > 0 {
        choice := event.Response.Choices[0]
        fmt.Print(choice.Delta.Content)
    }

    // Tool invocation.
    if len(event.Response.Choices) > 0 && len(event.Response.Choices[0].Message.ToolCalls) > 0 {
        for _, toolCall := range event.Response.Choices[0].Message.ToolCalls {
            fmt.Printf("Call tool: %s\n", toolCall.Function.Name)
        }
    }

    // Entire Runner run finished.
    if event.IsRunnerCompletion() {
        break
    }
}

Complete Event Handling Example

import (
    "fmt"
    "strings"

    "trpc.group/trpc-go/trpc-agent-go/event"
    "trpc.group/trpc-go/trpc-agent-go/model"
)

func processEvents(eventChan <-chan *event.Event) error {
    var fullResponse strings.Builder

    for event := range eventChan {
        // Handle errors.
        if event.Error != nil {
            return fmt.Errorf("event error: %s", event.Error.Message)
        }

        // Handle tool calls.
        if len(event.Response.Choices) > 0 && len(event.Response.Choices[0].Message.ToolCalls) > 0 {
            fmt.Println("๐Ÿ”ง Tool Call:")
            for _, toolCall := range event.Response.Choices[0].Message.ToolCalls {
                fmt.Printf("  โ€ข %s (ID: %s)\n",
                    toolCall.Function.Name, toolCall.ID)
                fmt.Printf("    Params: %s\n",
                    string(toolCall.Function.Arguments))
            }
        }

        // Handle tool responses.
        if event.Response != nil {
            for _, choice := range event.Response.Choices {
                if choice.Message.Role == model.RoleTool {
                    fmt.Printf("โœ… Tool Response (ID: %s): %s\n",
                        choice.Message.ToolID, choice.Message.Content)
                }
            }
        }

        // Handle streaming content.
        if len(event.Response.Choices) > 0 {
            content := event.Response.Choices[0].Delta.Content
            if content != "" {
                fmt.Print(content)
                fullResponse.WriteString(content)
            }
        }

        if event.IsRunnerCompletion() {
            fmt.Println() // New line.
            break
        }
    }

    return nil
}

๐Ÿ”ฎ Execution Context Management

Runner creates and manages the Invocation structure:

// The Invocation created by Runner contains the following fields.
invocation := agent.NewInvocation(
    agent.WithInvocationAgent(r.agent),                               // Agent instance.
    agent.WithInvocationSession(&session.Session{ID: "session-001"}), // Session object.
    agent.WithInvocationEndInvocation(false),                         // End flag.
    agent.WithInvocationMessage(model.NewUserMessage("User input")),  // User message.
    agent.WithInvocationRunOptions(ro),                               // Run options.
)
// Note: Invocation also includes other fields such as AgentName, Branch, Model,
// TransferInfo, AgentCallbacks, ModelCallbacks, ToolCallbacks, etc.,
// but these fields are used and managed internally by the Agent.

โœ… Best Practices

Error Handling

// Handle errors from Runner.Run.
eventChan, err := r.Run(ctx, userID, sessionID, message, agent.WithRequestID("request-ID"))
if err != nil {
    log.Printf("Runner execution failed: %v", err)
    return err
}

// Handle errors in the event stream.
for event := range eventChan {
    if event.Error != nil {
        log.Printf("Event error: %s", event.Error.Message)
        continue
    }
    // Handle normal events.
}

Stopping a Run Safely

When you call Runner.Run, the framework starts goroutines that keep producing events until the run ends.

There are two different โ€œstopsโ€ people often confuse:

  1. Stopping your reader loop (your code stops reading events)
  2. Stopping the run (the agent stops calling models/tools and exits)

If you only stop reading but the run is still active, the agent goroutine may block trying to write to the event channel. This can lead to goroutine leaks and โ€œstuckโ€ runs.

The safe pattern is always:

  1. Trigger cancellation (ctx cancel / requestID cancel / StopError)
  2. Keep draining the event channel until it is closed

Option A: Ctrl+C (terminal programs)

In a CLI or local demo, a common approach is to translate Ctrl+C into context cancellation:

ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

eventCh, err := r.Run(ctx, userID, sessionID, message)
if err != nil {
    return err
}

for range eventCh {
    // Drain until the run stops (ctx canceled or run completed).
}

Option B: Cancel programmatically (context.WithCancel)

Wrap Runner.Run with context.WithCancel and call cancel() when you decide to stop (for example, max turns, token budget, user clicked “Stop”, etc.).

llmflow treats context.Canceled as a graceful exit and closes the agent event channel, so the runner loop can finish cleanly without blocking writers.

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventCh, err := r.Run(ctx, userID, sessionID, message)
if err != nil {
    return err
}

turns := 0
for evt := range eventCh {
    if evt.Error != nil {
        log.Printf("event error: %s", evt.Error.Message)
        continue
    }
    // ... handle evt ...
    if evt.IsRunnerCompletion() {
        break
    }
    turns++
    if turns >= maxTurns {
        cancel() // stop further model/tool calls.
    }
}

If you need to return early (for example, your HTTP handler timed out) but still want to avoid blocking writers, you can drain in a separate goroutine:

go func() {
    for range eventCh {
    }
}()
cancel()
return nil

Option C: Cancel by requestID (ManagedRunner)

In server scenarios, you often want to cancel a run from a different goroutine or even a different request. For that, use a request identifier (requestID).

  1. Generate a requestID and pass it into Run via agent.WithRequestID.
  2. Type-assert the runner to runner.ManagedRunner.
  3. Call Cancel(requestID).
requestID := "req-123"

eventCh, err := r.Run(
    ctx,
    userID,
    sessionID,
    message,
    agent.WithRequestID(requestID),
)
if err != nil {
    return err
}

mr := r.(runner.ManagedRunner)
_ = mr.Cancel(requestID)

for range eventCh {
}

Option D: Stop from inside the run (StopError)

Sometimes the best place to decide โ€œstop nowโ€ is inside a tool, callback, or processor (for example, policy checks, budget limits, or user-defined rules).

Return agent.NewStopError("reason") (or wrap it with other errors). llmflow converts it into a stop_agent_error event and stops the flow.

Still prefer context deadlines (WithTimeout, WithMaxRunDuration) for hard cutoffs.
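
A minimal sketch of the stop site; checkBudget and its inputs are hypothetical and stand for logic invoked from a tool, callback, or processor that can return an error:

// checkBudget stops the run with a controlled signal once the budget is exceeded.
func checkBudget(spentTokens, maxTokens int) error {
    if spentTokens > maxTokens {
        // llmflow converts this into a stop_agent_error event and stops the flow.
        return agent.NewStopError("token budget exceeded")
    }
    return nil
}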

Common mistakes

  • Breaking the event-loop reader without cancellation: the run may keep going and block on channel writes.
  • Using context.Background() everywhere: you cannot stop a run if you have no way to cancel.
  • Writing tools that ignore ctx: cancellation is cooperative; long-running tools should check ctx.Done() or pass ctx into network/DB requests.

See runnable demos:

  • examples/cancelrun (cancel via Enter/Ctrl+C, drain events)
  • examples/managedrunner (requestID cancel, detached cancel, max duration)

Resource Management

๐Ÿ”’ Closing Runner (Important)

You MUST call Close() when the Runner is no longer needed to prevent goroutine leaks (trpc-agent-go >= v0.5.0).

Runner Only Closes Resources It Created

When a Runner is created without providing a Session Service, it automatically creates a default inmemory Session Service. This service starts background goroutines internally (for asynchronous summary processing, TTL-based session cleanup, etc.). Runner only manages the lifecycle of this self-created inmemory Session Service. If you provide your own Session Service via WithSessionService(), you are responsible for managing its lifecycleโ€”Runner won't close it.

If you don't call Close() on a Runner that owns an inmemory Session Service, the background goroutines will run forever, causing resource leaks.

Recommended Practice:

// โœ… Recommended: Use defer to ensure cleanup
r := runner.NewRunner("my-app", agent)
defer r.Close()  // Ensure cleanup on function exit (trpc-agent-go >= v0.5.0)

// Use the runner
eventChan, err := r.Run(ctx, userID, sessionID, message)
if err != nil {
    return err
}

for event := range eventChan {
    // Process events
    if event.IsRunnerCompletion() {
        break
    }
}

When You Provide Your Own Session Service:

// You create and manage the session service lifecycle
sessionService, err := redis.NewService(redis.WithRedisClientURL("redis://localhost:6379"))
if err != nil {
    panic(err)
}
defer sessionService.Close()  // YOU are responsible for closing it

// Runner uses but doesn't own this session service
r := runner.NewRunner("my-app", agent,
    runner.WithSessionService(sessionService))
defer r.Close()  // This will NOT close sessionService (you provided it) (trpc-agent-go >= v0.5.0)

// ... use the runner

Long-Running Services:

type Service struct {
    runner runner.Runner
    sessionService session.Service  // If you manage it yourself
}

func NewService() *Service {
    r := runner.NewRunner("my-app", agent)
    return &Service{runner: r}
}

func (s *Service) Start() error {
    // Service startup logic
    return nil
}

// Call Close when shutting down the service
func (s *Service) Stop() error {
    // Close runner (which closes its owned inmemory session service)
    // trpc-agent-go >= v0.5.0
    if err := s.runner.Close(); err != nil {
        return err
    }

    // If you provided your own session service, close it here
    if s.sessionService != nil {
        return s.sessionService.Close()
    }

    return nil
}

Important Notes:

  • โœ… Close() is idempotent; calling it multiple times is safe
  • โœ… Runner only closes the inmemory Session Service it creates by default
  • โœ… If you provide your own Session Service via WithSessionService(), Runner won't close it (you manage it yourself)
  • โŒ Not calling Close() when Runner owns an inmemory Session Service will cause goroutine leaks

Context Lifecycle Control

// Use context to control the lifecycle of a single run
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Ensure all events are consumed
eventChan, err := r.Run(ctx, userID, sessionID, message)
if err != nil {
    return err
}

for event := range eventChan {
    // Process events
    if event.IsRunnerCompletion() {
        break
    }
}

Health Check

import (
    "context"
    "fmt"

    "trpc.group/trpc-go/trpc-agent-go/model"
    "trpc.group/trpc-go/trpc-agent-go/runner"
)

// Check whether Runner works properly.
func checkRunner(ctx context.Context, r runner.Runner) error {
    testMessage := model.NewUserMessage("test")
    eventChan, err := r.Run(ctx, "test-user", "test-session", testMessage)
    if err != nil {
        return fmt.Errorf("Runner.Run failed: %v", err)
    }

    // Check the event stream.
    for event := range eventChan {
        if event.Error != nil {
            return fmt.Errorf("Received error event: %s", event.Error.Message)
        }
        if event.IsRunnerCompletion() {
            break
        }
    }

    return nil
}

๐Ÿ“ Summary

The Runner component is a core part of the tRPC-Agent-Go framework, providing complete conversation management and Agent orchestration capabilities. By properly using session management, tool integration, and event handling, you can build powerful intelligent conversational applications.