Graph Package Guide
Overview
Graph combines controllable workflow orchestration with extensible agent capabilities. It is suitable for:
- Type-safe state management and predictable routing.
- LLM decision making, tool-calling loops, and optional Human in the Loop (HITL).
- Reusable components that can run standalone or be composed as sub‑agents.
Highlights:
- Schema‑driven State and Reducers to avoid data races when concurrent branches write the same field.
- Deterministic parallelism with BSP style (Plan / Execute / Update).
- Built‑in node types wrap LLM, Tools, and Agent to reduce boilerplate.
- Streaming events, checkpoints, and interrupts for observability and recovery.
- Node‑level retry/backoff with exponential delay and jitter, plus executor‑level defaults and rich retry metadata in events.
Quick Start
Minimal Workflow
Below is a classic “prepare → ask LLM → optionally call tools” loop using graph.MessagesStateSchema() (predefines graph.StateKeyMessages, graph.StateKeyUserInput, graph.StateKeyLastResponse, etc.).
flowchart LR
START([start]):::startNode --> P[prepare]:::processNode
P --> A[ask LLM]:::llmNode
A -. tool_calls .-> T[tools]:::toolNode
A -- no tool_calls --> F[fallback]:::processNode
T --> A
F --> END([finish]):::endNode
classDef startNode fill:#e1f5e1,stroke:#4caf50,stroke-width:2px
classDef endNode fill:#ffe1e1,stroke:#f44336,stroke-width:2px
classDef llmNode fill:#e3f2fd,stroke:#2196f3,stroke-width:2px
classDef toolNode fill:#fff3e0,stroke:#ff9800,stroke-width:2px
classDef processNode fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
The Graph package allows you to model complex AI workflows as directed graphs, where nodes represent processing steps and edges represent data flow and control flow. It is particularly suitable for building AI applications that require conditional routing, state management, and multi-step processing.
Usage Pattern
The usage of the Graph package follows this pattern:
- Create Graph: Use the `StateGraph` builder to define the workflow structure
- Create GraphAgent: Wrap the compiled Graph as an Agent
- Create Runner: Use Runner to manage sessions and execution environment
- Execute Workflow: Execute workflow through Runner and handle results
This pattern provides:
- Type Safety: Ensures data consistency through state schema
- Session Management: Supports concurrent execution for multiple users and sessions
- Event Stream: Real-time monitoring of workflow execution progress
- Error Handling: Unified error handling and recovery mechanisms
Agent Integration
GraphAgent implements the agent.Agent interface and can:
- Act as Independent Agent: Execute directly through Runner
- Act as SubAgent: Be used as a sub-agent by other Agents (such as LLMAgent)
- Host SubAgents: Register child agents via `graphagent.WithSubAgents` and invoke them through `AddAgentNode`
This design lets GraphAgent plug into other agents while orchestrating its own specialized sub-agents.
Key Features
- Type-safe state management: Use Schema to define state structure, support custom Reducers
- Conditional routing: Dynamically select execution paths based on state
- LLM node integration: Built-in support for large language models
- Tool nodes: Support function calls and external tool integration
- Agent nodes: Delegate parts of the workflow to registered sub-agents
- Streaming execution: Support real-time event streams and progress tracking
- Concurrency safety: Thread-safe graph execution
- Checkpoint-based Time Travel: Navigate through execution history and restore previous states
- Human-in-the-Loop (HITL): Support for interactive workflows with interrupt and resume capabilities
- Atomic checkpointing: Atomic storage of checkpoints with pending writes for reliable recovery
- Checkpoint Lineage: Track related checkpoints forming execution threads with parent-child relationships
Core Concepts
1. Graph
A graph is the core structure of a workflow, consisting of nodes and edges:
Virtual Nodes:
- `Start`: Virtual start node, automatically connected through `SetEntryPoint()`
- `End`: Virtual end node, automatically connected through `SetFinishPoint()`
- These nodes don't need to be explicitly created; the system handles the connections automatically
2. Node
A node represents a processing step in the workflow:
3. State
State is a data container passed between nodes:
Built-in State Keys:
The Graph package provides some built-in state keys, mainly for internal system communication:
User-accessible Built-in Keys:
- `StateKeyUserInput`: User input (one-shot, cleared after consumption, persisted by LLM nodes)
- `StateKeyOneShotMessages`: One-shot messages (complete override for the current round, cleared after consumption)
- `StateKeyLastResponse`: Last response (used to set the final output; the Executor reads this value as the result)
- `StateKeyMessages`: Message history (durable, supports append plus MessageOp patch operations)
- `StateKeyNodeResponses`: Per-node responses map. The key is the node ID, the value is that node's final textual response. Use `StateKeyLastResponse` for the final serial output; when multiple parallel nodes converge, read each node's output from `StateKeyNodeResponses`.
- `StateKeyMetadata`: Metadata (general metadata storage available to users)
System Internal Keys (users should not use directly):
- `StateKeySession`: Session information (automatically set by GraphAgent)
- `StateKeyExecContext`: Execution context (automatically set by Executor)
- `StateKeyToolCallbacks`: Tool callbacks (automatically set by Executor)
- `StateKeyModelCallbacks`: Model callbacks (automatically set by Executor)
Users should use custom state keys to store business data, and only use user-accessible built-in state keys when necessary.
4. State Schema
State schema defines the structure and behavior of state:
Usage Guide
Node I/O Conventions
Nodes communicate exclusively through the shared state. Each node returns a state delta which is merged into the graph state using the schema’s reducers. Downstream nodes read whatever upstream nodes wrote.
- Common built-in keys (user-facing)
  - `user_input`: One-shot input for the next LLM/Agent node. Cleared after consumption.
  - `one_shot_messages`: Full message override for the next LLM call. Cleared after consumption.
  - `messages`: Durable conversation history (LLM/Tools append here). Supports MessageOp patches.
  - `last_response`: The last textual assistant response.
  - `node_responses`: map[nodeID]any — per-node final textual response. Use `last_response` for the most recent one.
- Function node
  - Input: the entire state
  - Output: a `graph.State` delta with custom keys (declare them in the schema), e.g. `{"parsed_time": "..."}`
- LLM node
  - Input priority: `one_shot_messages` → `user_input` → `messages`
  - Output:
    - Appends the assistant message to `messages`
    - Sets `last_response`
    - Sets `node_responses[<llm_node_id>]`
- Tools node
  - Input: scans `messages` for the latest assistant message with `tool_calls`
  - Output: appends tool responses to `messages`
- Agent node (sub-agent)
  - Input: the state is injected into the sub-agent's `Invocation.RunOptions.RuntimeState`. Model/Tool callbacks can access it via `agent.InvocationFromContext(ctx)`.
  - Output on finish:
    - Sets `last_response`
    - Sets `node_responses[<agent_node_id>]`
    - Clears `user_input`
Recommended patterns
- Add your own keys in the schema (e.g., `parsed_time`, `final_payload`) and write/read them in function nodes.
- To feed structured hints into an LLM node, write `one_shot_messages` in the previous node (e.g., prepend a system message with parsed context).
- To consume an upstream node's text, read `last_response` immediately downstream or fetch it from `node_responses[that_node_id]` later.
See examples:
- `examples/graph/io_conventions` — Function + LLM + Agent I/O
- `examples/graph/io_conventions_tools` — Adds a Tools node path and shows how to capture tool JSON
- `examples/graph/retry` — Node-level retry/backoff demonstration
Constant references (import and keys)
- Import: `import "trpc.group/trpc-go/trpc-agent-go/graph"`
- Defined in: `graph/state.go`
- User-facing keys
  - `user_input` → `graph.StateKeyUserInput`
  - `one_shot_messages` → `graph.StateKeyOneShotMessages`
  - `messages` → `graph.StateKeyMessages`
  - `last_response` → `graph.StateKeyLastResponse`
  - `node_responses` → `graph.StateKeyNodeResponses`
- Other useful keys
  - `session` → `graph.StateKeySession`
  - `metadata` → `graph.StateKeyMetadata`
  - `current_node_id` → `graph.StateKeyCurrentNodeID`
  - `exec_context` → `graph.StateKeyExecContext`
  - `tool_callbacks` → `graph.StateKeyToolCallbacks`
  - `model_callbacks` → `graph.StateKeyModelCallbacks`
  - `agent_callbacks` → `graph.StateKeyAgentCallbacks`
  - `parent_agent` → `graph.StateKeyParentAgent`
Snippet:
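For illustration, a minimal sketch of a function node that reads and writes these keys (the node signature `func(ctx context.Context, state graph.State) (any, error)` and the metadata value shape are assumptions here; see graph/state.go for the authoritative constants):

```go
import (
	"context"

	"trpc.group/trpc-go/trpc-agent-go/graph"
)

// annotate is a hypothetical function node: it reads the one-shot user input
// and returns a small state delta; reducers merge the delta into shared state.
func annotate(ctx context.Context, state graph.State) (any, error) {
	input, _ := state[graph.StateKeyUserInput].(string)
	return graph.State{
		graph.StateKeyMetadata: map[string]any{"input_chars": len(input)},
	}, nil
}
```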
Event metadata keys (StateDelta)
- Import: `import "trpc.group/trpc-go/trpc-agent-go/graph"`
- Defined in: `graph/events.go`
- Model metadata: `_model_metadata` → `graph.MetadataKeyModel` (struct `graph.ModelExecutionMetadata`)
- Tool metadata: `_tool_metadata` → `graph.MetadataKeyTool` (struct `graph.ToolExecutionMetadata`)
- Node metadata: `_node_metadata` → `graph.MetadataKeyNode` (struct `graph.NodeExecutionMetadata`). Includes retry info (`Attempt`, `MaxAttempts`, `NextDelay`, `Retrying`) and timing fields.
Snippet:
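As a sketch, metadata can be decoded from an event's StateDelta roughly as follows (this assumes StateDelta carries JSON-encoded payloads and that events come from the `event` package; adjust to the actual types in graph/events.go):

```go
import (
	"encoding/json"

	"trpc.group/trpc-go/trpc-agent-go/event"
	"trpc.group/trpc-go/trpc-agent-go/graph"
)

// modelMetadata extracts model execution metadata from an event, if present.
func modelMetadata(evt *event.Event) (*graph.ModelExecutionMetadata, bool) {
	raw, ok := evt.StateDelta[graph.MetadataKeyModel]
	if !ok {
		return nil, false
	}
	var md graph.ModelExecutionMetadata
	if err := json.Unmarshal(raw, &md); err != nil {
		return nil, false
	}
	return &md, true
}
```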
1. Creating GraphAgent and Runner
Users mainly use the Graph package by creating GraphAgent and then using it through Runner. This is the recommended usage pattern:
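The original listing is not reproduced here; the following compact sketch assembles the pieces this guide references (`graph.MessagesStateSchema`, `NewStateGraph`, `AddLLMNode`, `Compile`, `graphagent.New`, `runner.NewRunner`). Constructor and option signatures (for example the OpenAI model constructor and the session-service option) are indicative rather than exact; see the runnable versions under examples/graph:

```go
package main

import (
	"context"
	"fmt"

	"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
	"trpc.group/trpc-go/trpc-agent-go/graph"
	"trpc.group/trpc-go/trpc-agent-go/model"
	"trpc.group/trpc-go/trpc-agent-go/model/openai"
	"trpc.group/trpc-go/trpc-agent-go/runner"
	"trpc.group/trpc-go/trpc-agent-go/session/inmemory"
)

func main() {
	ctx := context.Background()

	// 1. Create the graph with the predefined message schema.
	schema := graph.MessagesStateSchema()
	sg := graph.NewStateGraph(schema)
	llm := openai.New("gpt-4o-mini") // assumption: any model implementation works here
	sg.AddLLMNode("ask", llm, "You are a helpful assistant.", nil)
	sg.SetEntryPoint("ask")
	sg.SetFinishPoint("ask")
	g, err := sg.Compile()
	if err != nil {
		panic(err)
	}

	// 2. Wrap the compiled graph as an Agent.
	ga, err := graphagent.New("demo-graph", g)
	if err != nil {
		panic(err)
	}

	// 3. Create a Runner with a session backend.
	r := runner.NewRunner("demo-app", ga, runner.WithSessionService(inmemory.NewSessionService()))

	// 4. Execute the workflow and consume the event stream.
	events, err := r.Run(ctx, "user-1", "session-1", model.NewUserMessage("Hello!"))
	if err != nil {
		panic(err)
	}
	for evt := range events {
		if evt.Response != nil && len(evt.Response.Choices) > 0 {
			fmt.Print(evt.Response.Choices[0].Delta.Content)
		}
	}
}
```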
2. Using LLM Nodes
LLM nodes implement a fixed three-stage input rule without extra configuration:
- OneShot first: If `one_shot_messages` exists, use it as the input for this round.
- UserInput next: Otherwise, if `user_input` exists, persist it once to history.
- History default: Otherwise, use the durable `messages` as input.
Important notes:
- System prompt is only used for this round and is not persisted to state.
- One-shot keys (`user_input` / `one_shot_messages`) are automatically cleared after successful execution.
- All state updates are atomic.
- GraphAgent/Runner only sets `user_input` and no longer pre-populates `messages` with a user message. This allows any pre-LLM node to modify `user_input` and have it take effect in the same round.
Three input paradigms
- OneShot (`StateKeyOneShotMessages`):
  - When present, only the provided `[]model.Message` is used for this round, typically including a full system prompt and user prompt. Automatically cleared afterwards.
  - Use case: a dedicated pre-node constructs the full prompt and must fully override the input.
- UserInput (`StateKeyUserInput`):
  - When non-empty, the LLM node uses the durable `messages` plus this round's user input to call the model. After the call, it writes the user input and assistant reply to `messages` using `MessageOp` (e.g., `AppendMessages`, `ReplaceLastUser`) atomically, and clears `user_input` to avoid repeated appends.
  - Use case: conversational flows where pre-nodes may adjust the user input.
- Messages only (just `StateKeyMessages`):
  - Common in tool-call loops. After the first round via `user_input`, routing to tools and back to the LLM, `user_input` has been cleared, so the LLM uses only `messages` (history). The tail is often a `tool` response, enabling the model to continue reasoning based on tool outputs.
Atomic updates with Reducer and MessageOp
The Graph package supports MessageOp patch operations (e.g., `ReplaceLastUser`, `AppendMessages`) on message state via `MessageReducer` to achieve atomic merges. Benefits:
- Pre-LLM nodes can modify `user_input`. The LLM node returns a single state delta with the needed patch operations (replace the last user message, append the assistant message) for one-shot, race-free persistence.
- Backwards compatible with appending `[]Message`, while providing more expressive updates for complex cases.
Example: modify user_input in a pre-node before entering the LLM node.
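For instance, a sketch of such a pre-node (node and edge names are illustrative; the node signature is assumed):

```go
// Hypothetical pre-node: rewrite user_input for this round. The LLM node "ask"
// picks up the modified value and persists it to messages via MessageOp.
sg.AddNode("normalize_input", func(ctx context.Context, state graph.State) (any, error) {
	in, _ := state[graph.StateKeyUserInput].(string)
	return graph.State{
		graph.StateKeyUserInput: "Answer concisely: " + in,
	}, nil
})
sg.AddEdge("normalize_input", "ask")
```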
3. GraphAgent Configuration Options
GraphAgent supports various configuration options:
Model/tool callbacks are configured per node, e.g. `AddLLMNode(..., graph.WithModelCallbacks(...))` or `AddToolsNode(..., graph.WithToolCallbacks(...))`.
Once sub-agents are registered you can delegate within the graph via agent nodes:
The agent node uses its ID for the lookup, so keep `AddAgentNode("assistant")` aligned with `subAgent.Info().Name == "assistant"`.
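A sketch tying the two together (the `WithSubAgents` signature is an assumption; `assistantAgent` is a sub-agent built elsewhere, e.g. an `llmagent` whose name is "assistant"):

```go
// Declare an agent node whose ID matches the sub-agent's name.
sg.AddAgentNode("assistant")
sg.SetEntryPoint("assistant")
sg.SetFinishPoint("assistant")
g, err := sg.Compile()
if err != nil {
	// handle error
}

// Register the sub-agent when constructing the GraphAgent.
ga, err := graphagent.New("orchestrator", g,
	graphagent.WithSubAgents([]agent.Agent{assistantAgent}),
)
// Hand ga to runner.NewRunner as shown in the Quick Start.
```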
4. Conditional Routing
4.1 Named Ends (Per‑node Ends)
When a node produces business outcomes (e.g., approve/reject/manual_review) and you want to route by those semantic labels, declare node‑local Named Ends (Ends).
Why this helps:
- Central, declarative mapping from labels to concrete targets at the node site.
- Compile-time validation: `Compile()` verifies every end target exists (or is the special `graph.End`).
- Unified routing: reused by both `Command.GoTo` and conditional edges.
- Decoupling: nodes express outcomes in business terms; the mapping ties outcomes to graph structure.
API:
Command‑style routing (Command.GoTo):
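For example, a routing node might return a command like the sketch below (only the `GoTo` field is shown; labels resolve through `pathMap`/Ends as described next, and the node/label names are illustrative):

```go
sg.AddNode("decide", func(ctx context.Context, state graph.State) (any, error) {
	amount, _ := state["amount"].(float64)
	target := "approve"
	if amount > 10000 {
		// "manual_review" can be a label declared in this node's Ends
		// or a concrete node ID.
		target = "manual_review"
	}
	return &graph.Command{GoTo: target}, nil
})
```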
Conditional edges can reuse Ends: when AddConditionalEdges(from, condition, pathMap) receives a nil pathMap or no match is found, the executor tries the node’s Ends; if still no match, the return string is treated as a concrete node ID.
Resolution precedence:
1. Explicit mapping in the conditional edge's `pathMap`.
2. The node's Ends mapping (label → concrete target).
3. Treat the return string as a concrete node ID.
Compile‑time checks:
- `WithEndsMap` / `WithEnds` targets are validated in `Compile()`.
- Targets must exist in the graph or be the special constant `graph.End`.
Notes:
- Use the constant `graph.End` to terminate; do not use the string "END".
- With `Command.GoTo`, you don't need to add a static `AddEdge(from, to)` for the target; ensure the target exists and call `SetFinishPoint(target)` if it should end the graph.
Runnable example: examples/graph/multiends.
4.2 Multi‑conditional Fan‑out
Sometimes a single decision needs to spawn multiple branches in parallel for independent processing (e.g., route to both summarization and tagging).
API:
Notes:
- Results are de‑duplicated before triggering; repeated keys do not trigger a target more than once in the same step.
- Resolution precedence for each branch key mirrors single-conditional routing: 1) explicit `pathMap`; 2) the node's Ends; 3) treat the key as a concrete node ID.
- Visualization: when `pathMap` is omitted, DOT falls back to the node's Ends mapping to render dashed conditional edges.
5. Tool Node Integration
Enable parallel tool execution for the Tools node (aligns with LLMAgent’s option):
Tool-call pairing and second entry into LLM:
- Scan `messages` backward from the tail to find the most recent `assistant(tool_calls)`; stop at `user` to ensure correct pairing.
- When returning from tools to the LLM node, `user_input` has been cleared, so the LLM follows the "Messages only" branch and continues based on the tool response in history.
Placeholder Variables in LLM Instructions
LLM nodes support placeholder injection in their instruction string (same rules as LLMAgent). Both native {key} and Mustache {{key}} syntaxes are accepted (Mustache is normalized to the native form automatically):
- `{key}` / `{{key}}` → replaced by `session.State["key"]`
- `{key?}` / `{{key?}}` → optional; missing values become empty
- `{user:subkey}`, `{app:subkey}`, `{temp:subkey}` (and their Mustache forms) → access user/app/temp scopes (session services merge app/user state into the session with these prefixes)
Notes:
- GraphAgent writes the current `*session.Session` into graph state under `StateKeySession`; the LLM node reads values from there.
- Unprefixed keys (e.g., `research_topics`) must be present directly in `session.State`.
Example:
See the runnable example: examples/graph/placeholder.
Injecting retrieval output and user input
- Upstream nodes can place ephemeral values into the session's `temp:` namespace so the LLM instruction can read them with placeholders.
- Pattern: write the value under a `temp:`-prefixed key in an upstream node, then reference it as `{temp:subkey}` in the LLM instruction.
Example: examples/graph/retrieval_placeholder.
Best practices for placeholders and session state
- Ephemeral vs persistent: write per-turn values to `temp:*` on `session.State` (session state). Persistent configuration should go through `SessionService` with `user:*` / `app:*`.
- Why direct write is OK: LLM nodes expand placeholders from the session object present in graph state; see graph/state_graph.go. GraphAgent puts the session into state; see agent/graphagent/graph_agent.go.
- Service guardrails: the in-memory service intentionally disallows writing `temp:*` (and `app:*` via the user updater); see session/inmemory/service.go.
- Concurrency: when multiple branches run in parallel, avoid having multiple nodes mutate the same `session.State` keys. Prefer composing in a single node before the LLM, or store intermediate values in graph state and then write once to `temp:*`.
- Observability: if you want parts of the prompt to appear in completion events, also store a compact summary in graph state (e.g., under `metadata`). The final event serializes non-internal final state; see graph/events.go.
6. Node Retry & Backoff
Configure per‑node retry with exponential backoff and optional jitter. Failed attempts do not produce writes; only a successful attempt applies its state delta and routing.
- Per-node policy via `WithRetryPolicy`:
- Default policy via Executor (applies when a node has none):
Notes
- Interrupts are never retried.
- Backoff delay is clamped by the current step deadline when set (`WithStepTimeout`).
- Events carry retry metadata so UIs/CLIs can display progress.
Example: examples/graph/retry shows an unstable node that retries before a final LLM answer.
7. Runner Configuration
Runner provides session management and execution environment:
8. Message State Schema
For conversational applications, you can use predefined message state schema:
9. State Key Usage Scenarios
User-defined State Keys: Used to store business logic data.
Built-in State Keys: Used for system integration.
Advanced Features
1. Interrupt and Resume (Human-in-the-Loop)
The Graph package supports human-in-the-loop (HITL) workflows through interrupt and resume functionality. This enables workflows to pause execution, wait for human input or approval, and then resume from the exact point where they were interrupted.
Basic Usage
Turn the diagram into a runnable workflow:
The example shows how to declare nodes, connect edges, and run. Next, we’ll cover execution with GraphAgent + Runner, then core concepts and common practices.
Execution
- Wrap the compiled graph with `graphagent.New` (as a generic `agent.Agent`) and hand it to `runner.Runner` to manage sessions and streaming events.
Minimal GraphAgent + Runner:
Session backends:
- In-memory: `session/inmemory` (used by examples)
- Redis: `session/redis` (more common in production)
GraphAgent Options
Core Concepts
State Management
GraphAgent uses a Schema + Reducer model to manage state. You first define the state shape and merge rules; later nodes have clear expectations about the origin and lifecycle of keys they read/write.
Built‑in Schema
Custom Schema
Reducers ensure fields are merged safely per predefined rules, which is critical under concurrent execution.
Tip: define constants for business keys to avoid scattered magic strings.
Node Types
GraphAgent provides four built‑in node types:
Function Node
The most basic node, for custom logic:
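A minimal sketch (the custom key `word_count` is illustrative and should be declared in the schema; imports `strings`):

```go
sg.AddNode("count_words", func(ctx context.Context, state graph.State) (any, error) {
	input, _ := state[graph.StateKeyUserInput].(string)
	return graph.State{
		"word_count": len(strings.Fields(input)),
	}, nil
}, graph.WithName("count words"))
```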
LLM Node
Integrates an LLM and auto‑manages conversation history:
Node Cache
Enable caching for pure function-like nodes to avoid repeated computation.
- Graph-level settings:
  - `WithCache(cache Cache)` sets the cache backend (an in-memory implementation is provided for testing)
  - `WithCachePolicy(policy *CachePolicy)` sets the default cache policy (key function + Time To Live, TTL)
- Node-level override: `WithNodeCachePolicy(policy *CachePolicy)`
- Clear by nodes: `ClearCache(nodes ...string)`
References:
- Graph accessors and setters: graph/graph.go
- Defaults and in-memory backend:
- Interface/policy + canonical JSON + SHA‑256: graph/cache.go
- In-memory cache with read-write lock and deep copy: graph/cache.go
- Executor:
- Try Get before executing a node; on hit, skip the node function and only run callbacks + writes: graph/executor.go
- Persist Set after successful execution: graph/executor.go
- Attach the `_cache_hit` flag on node.complete events: graph/executor.go
Minimal usage:
Advanced usage:
- Field-based keys (recommended)
- Custom selector (for complex projections)
- Versioned namespace (avoid stale cache across versions)
- Per-node TTL (Time To Live)
- Clear cache (per node)
- Read cache-hit marker (`_cache_hit`)
Advanced usage:
- Field-based keys (recommended): declare `WithCacheKeyFields("n", "user_id")` on a node; internally this maps the sanitized input to `{n, user_id}` before default canonicalization and hashing.
- Custom selector: `WithCacheKeySelector(func(m map[string]any) any { return map[string]any{"n": m["n"], "uid": m["uid"]} })`
- Versioned namespace: `WithGraphVersion("v2025.03")` expands the namespace to `__writes__:<version>:<node>`, reducing stale cache collisions across code changes.
Notes:
- Prefer caching only pure functions (no side effects)
- TTL=0 means no expiration; consider a persistent backend (Redis/SQLite) in production
- Key function sanitizes input to avoid volatile/non-serializable fields being part of the key: graph/cache_key.go
- Call `ClearCache("nodeID")` after code changes, or include a function identifier/version in the key
Runner + GraphAgent usage example:
Example:
- Interactive + Runner + GraphAgent: examples/graph/nodecache/main.go
Tools Node
Executes tool calls in sequence:
Reading Tool Results into State
After a tools node, add a function node to collect tool outputs from graph.StateKeyMessages and write a structured result into state:
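A sketch of such a collector node (the `model.RoleTool` check and the message field names are assumptions about the model package; the `tool_outputs` key is illustrative):

```go
sg.AddNode("collect_tools", func(ctx context.Context, state graph.State) (any, error) {
	msgs, _ := state[graph.StateKeyMessages].([]model.Message)
	var outputs []string
	for _, m := range msgs {
		// Assumption: tool responses are appended as messages with the tool role.
		if m.Role == model.RoleTool {
			outputs = append(outputs, m.Content)
		}
	}
	return graph.State{"tool_outputs": outputs}, nil
})
```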
Reference example: examples/graph/io_conventions_tools.
Edges and Routing
Edges define control flow between nodes:
Tip: setting entry and finish points implicitly connects to virtual Start/End nodes:
- `SetEntryPoint("first")` is equivalent to Start → first.
- `SetFinishPoint("last")` is equivalent to last → End. There's no need to add these two edges explicitly.
Constants: graph.Start == "__start__", graph.End == "__end__".
Command Mode (Dynamic Routing / Fan‑out)
Nodes can return graph.State, or *graph.Command / []*graph.Command to update state and direct the next hop:
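For example, a fan-out node might return several commands at once (node IDs are illustrative):

```go
// Each Command routes to one target; all three workers run in the next superstep.
sg.AddNode("plan_fanout", func(ctx context.Context, state graph.State) (any, error) {
	return []*graph.Command{
		{GoTo: "worker_a"},
		{GoTo: "worker_b"},
		{GoTo: "worker_c"},
	}, nil
})
```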
When using command‑based routing, you don’t need static edges to GoTo targets; just ensure the target nodes exist and call SetFinishPoint where appropriate.
Architecture
Overall Architecture
GraphAgent’s architecture manages complexity via clear layering. Each layer has a well‑defined responsibility and communicates through standard interfaces.
flowchart TB
subgraph "Runner Layer"
R[Runner]:::runnerClass
S[Session Service]:::sessionClass
end
subgraph "GraphAgent"
GA[GraphAgent Wrapper]:::agentClass
CB[Callbacks]:::callbackClass
end
subgraph "Graph Engine"
SG[StateGraph Builder]:::builderClass
G[Graph]:::graphClass
E[Executor]:::executorClass
end
subgraph "Execution Components"
P[Planning]:::phaseClass
EX[Execution]:::phaseClass
U[Update]:::phaseClass
end
subgraph "Storage"
CP[Checkpoint]:::storageClass
ST[State Store]:::storageClass
end
R --> GA
GA --> G
G --> E
E --> P
E --> EX
E --> U
E --> CP
classDef runnerClass fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef sessionClass fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef agentClass fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef callbackClass fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef builderClass fill:#fff8e1,stroke:#f57c00,stroke-width:2px
classDef graphClass fill:#f1f8e9,stroke:#689f38,stroke-width:2px
classDef executorClass fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phaseClass fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef storageClass fill:#efebe9,stroke:#5d4037,stroke-width:2px
Core Modules
Overview of core components:
graph/state_graph.go — StateGraph builder
Provides a fluent, declarative Go API to build graphs via method chaining (AddNode → AddEdge → Compile) covering nodes, edges, and conditional routing.
graph/graph.go — Compiled runtime
Implements channel‑based, event‑triggered execution. Node results merge into State; channels are used to drive routing and carry sentinel values (not business data).
graph/executor.go — BSP executor
Heart of the system, inspired by Google’s Pregel. Implements BSP (Bulk Synchronous Parallel) supersteps: Planning → Execution → Update.
graph/checkpoint/* — Checkpoints and recovery
Optional checkpoint persistence (e.g., sqlite). Atomically saves state and pending writes; supports lineage/checkpoint‑based recovery.
agent/graphagent/graph_agent.go — Bridge between Graph and Agent
Adapts a compiled Graph into a generic Agent, reusing sessions, callbacks, and streaming.
Execution Model
GraphAgent adapts Pregel’s BSP (Bulk Synchronous Parallel) to a single‑process runtime and adds checkpoints, HITL interrupts/resumes, and time travel:
sequenceDiagram
autonumber
participant R as Runner
participant GA as GraphAgent
participant EX as Executor
participant CK as Checkpoint Saver
participant DB as Storage
participant H as Human
R->>GA: Run(invocation)
GA->>EX: Execute(graph, state, options)
GA-->>R: Stream node/tool/model events
loop Each superstep (BSP)
EX->>EX: Planning — compute frontier
par Parallel node execution
EX->>EX: Run node i (shallow state copy)
EX-->>GA: node-start event (author=nodeID)
and
EX->>EX: Run node j (shallow state copy)
EX-->>GA: node-start event
end
alt Node triggers Interrupt(key,prompt)
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen,reason=interrupt)
CK->>DB: atomic commit
EX-->>GA: interrupt event(checkpoint_id,prompt)
GA-->>R: propagate + pause
R->>H: ask for input/approval
H-->>R: provide decision/value
R->>GA: Run(resume) runtime_state{
R->>GA: checkpoint_id,resume_map}
GA->>EX: ResumeFromCheckpoint(checkpoint_id,resume_map)
EX->>CK: Load checkpoint
CK->>EX: state/frontier/pending_writes/versions_seen
EX->>EX: rebuild frontier and apply resume values
else Normal
EX-->>GA: node-complete events (incl. tool/model)
EX->>EX: Update — merge via reducers
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen)
CK->>DB: atomic commit
end
end
Note over EX,CK: versions_seen prevents re-execution
Note over EX,CK: pending_writes rebuilds channels
Note over EX,CK: parent_id forms lineage for time travel
opt Time travel (rewind/branch)
R->>GA: Run(runtime_state{checkpoint_id})
GA->>EX: ResumeFromCheckpoint(checkpoint_id)
EX->>CK: Load checkpoint + lineage
CK->>EX: Restore state; may create new lineage_id
end
EX-->>GA: done event (last_response)
GA-->>R: final output
flowchart TB
%% Execution panorama (compact wiring)
subgraph Client
R[Runner]:::runner --> GA[GraphAgent]:::agent
end
subgraph Engine[Graph Engine]
GA --> EX[Executor]:::executor
subgraph BSP["BSP Superstep"]
P[Planning]:::phase --> X[Execution]:::phase --> U[Update]:::phase
end
end
N[Nodes: LLM / Tools / Function / Agent]:::process
CK[(Checkpoint)]:::storage
H[Human]:::human
EX --> BSP
EX --> N
EX -.-> CK
GA <--> H
GA --> R
classDef runner fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef agent fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef executor fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phase fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
classDef storage fill:#efebe9,stroke:#6d4c41,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
Key points:
- Planning: determine runnable nodes from the channel frontier.
- Execution: each node gets a shallow state copy (maps.Copy) and runs in parallel.
- Update: reducers merge updates safely for concurrency.
This design enables per‑step observability and safe interruption/recovery.
Runtime Isolation and Event Snapshots
- The Executor is reusable and concurrency‑safe. Per‑run state lives in
ExecutionContext(channel versions, pending writes, last checkpoint, etc.). - Each event’s
StateDeltais a deep‑copy snapshot containing only serializable and allowed keys; internal keys (execution context, callbacks, etc.) are filtered out for external telemetry and persistence.
Executor Configuration
Defaults and Notes
- Defaults (Executor)
  - `ChannelBufferSize = 256`, `MaxSteps = 100`, `CheckpointSaveTimeout = 10s`
  - Per-step/node timeouts are available on `Executor` via `WithStepTimeout` / `WithNodeTimeout` (not exposed by `GraphAgent` options yet)
- Sessions
  - Prefer the Redis session backend in production; set TTLs and cleanup
  - Runner seeds multi-turn `graph.StateKeyMessages` from session events automatically
- Checkpoints
  - Use stable `namespace` names (e.g., `svc:prod:flowX`); audit and clean up by lineage via `CheckpointManager`
- Events/backpressure
  - Tune `WithChannelBufferSize`; filter events by `author`/`object` to reduce noise
- Naming and keys
  - Use constants for node IDs, route labels, and state keys; define reducers for non-trivial merges
- Governance
  - Insert HITL on critical paths; prefer storing sensitive details under `graph.StateKeyMetadata` rather than `graph.StateKeyMessages`
Integrating with Multi‑Agent Systems
GraphAgent is designed to be part of the tRPC‑Agent‑Go multi‑agent ecosystem, not an island. It implements the standard Agent interface and collaborates with other agent types.
GraphAgent as an Agent
GraphAgent implements the standard Agent interface:
Advanced Orchestration
End‑to‑end business flow: entry normalization → smart routing → multiple pods (Email, Weather, Research) → parallel fan‑out/aggregation → final composition and publish.
flowchart LR
%% Layout
subgraph UE["User & Entry"]
U((User)):::human --> IN["entry<br/>normalize"]:::process
end
subgraph FAB["Graph Orchestration"]
Rtr["where_to_go<br/>router"]:::router
Compose["compose<br/>LLM"]:::llm
end
IN --> Rtr
%% Email Agent (expanded)
subgraph EC["Email Agent"]
direction LR
CE["classifier<br/>LLM"]:::llm --> WE["writer<br/>LLM"]:::llm
end
%% Weather Agent (expanded)
subgraph WA["Weather Agent"]
direction LR
LE["locate<br/>LLM"]:::llm --> WT["weather tool"]:::tool
end
%% Routing from router to pods
Rtr -- email --> CE
Rtr -- weather --> LE
Rtr -- other --> REPLY["reply<br/>LLM"]:::llm
%% Fanout Pipeline (fanout → workers → aggregate)
subgraph FP["Fanout Pipeline"]
direction LR
Fan["plan_fanout"]:::process --> W1["worker A"]:::process
Fan --> W2["worker B"]:::process
Fan --> W3["worker C"]:::process
W1 --> Agg["aggregate"]:::process
W2 --> Agg
W3 --> Agg
end
Rtr -- research --> Fan
%% Human-in-the-loop (optional)
Compose -. review .- HG["human<br/>review"]:::human
%% Compose final (minimal wiring)
Agg --> Compose
WE --> Compose
WT --> Compose
REPLY --> Compose
Compose --> END([END]):::terminal
%% Styles
classDef router fill:#fff7e0,stroke:#f5a623,stroke-width:2px
classDef llm fill:#e3f2fd,stroke:#1e88e5,stroke-width:2px
classDef tool fill:#fff3e0,stroke:#fb8c00,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef terminal fill:#ffebee,stroke:#e53935,stroke-width:2px
Highlights:
- `where_to_go` can be LLM-decided or function-driven (conditional edges).
- Fanout Pipeline uses Command GoTo at runtime, then aggregates.
- Optional human review follows aggregation to gate critical output.
- Single checkpoint display at Compose balances clarity and recoverability.
Embedding Agents in a Graph
Inside a graph, you can call existing sub‑agents as nodes. The example below shows how to create sub‑agents, declare the corresponding nodes, and inject them when constructing the GraphAgent.
Passing only results: map last_response to downstream user_input
Scenario: A → B → C as black boxes. Downstream should only consume upstream’s result text as this turn’s input, without pulling full session history.
- Approach 1 (dependency-free, universally available): add a pre-node callback to the target Agent node that assigns the parent's `last_response` to `user_input`. Optionally isolate messages.
- Approach 2 (enhanced option, more concise):
Notes: Both approaches ensure B only sees A’s result, and C only sees B’s. The option is more concise when available; the callback is zero‑dependency and works everywhere.
Hybrid Pattern Example
Embed dynamic decision‑making within a structured flow:
Core Mechanics in Depth
State Management: Schema + Reducer
State is a central challenge in graph workflows. We designed a Schema + Reducer mechanism that provides type safety and supports high‑concurrency atomic updates.
flowchart LR
subgraph "State Schema"
MS[messages: MessageList]:::schemaClass
UI[user_input: string]:::schemaClass
LR[last_response: string]:::schemaClass
NR[node_responses: Map]:::schemaClass
end
subgraph "Reducers"
R1[MessageReducer + MessageOp]:::reducerClass
R2[MergeReducer (Map)]:::reducerClass
R3[ReplaceReducer (String)]:::reducerClass
end
subgraph "Node Outputs"
N1[Node 1 Output]:::nodeOutputClass
N2[Node 2 Output]:::nodeOutputClass
N3[Node 3 Output]:::nodeOutputClass
end
N1 --> R1
N2 --> R2
N3 --> R3
R1 --> MS
R2 --> NR
R3 --> LR
classDef schemaClass fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef reducerClass fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef nodeOutputClass fill:#fff8e1,stroke:#f57f17,stroke-width:2px
Graph state is a map[string]any with runtime validation provided by StateSchema. The reducer mechanism ensures safe merging and avoids conflicts under concurrent updates.
Common State Keys
- User-visible: `graph.StateKeyUserInput`, `graph.StateKeyOneShotMessages`, `graph.StateKeyMessages`, `graph.StateKeyLastResponse`, `graph.StateKeyNodeResponses`, `graph.StateKeyMetadata`
- Internal: `session`, `exec_context`, `tool_callbacks`, `model_callbacks`, `agent_callbacks`, `current_node_id`, `parent_agent`
- Command/Resume: `__command__`, `__resume_map__`
Constants live in graph/state.go and graph/keys.go. Prefer referencing constants over hard‑coding strings.
Node‑level Callbacks, Tools & Generation Parameters
Per‑node options (see graph/state_graph.go):
- Callbacks: `graph.WithPreNodeCallback` / `graph.WithPostNodeCallback` / `graph.WithNodeErrorCallback`
- LLM nodes: `graph.WithGenerationConfig`, `graph.WithModelCallbacks`
- Tooling: `graph.WithToolCallbacks`, `graph.WithToolSets` (supply ToolSets in addition to `tools []tool.Tool`), `graph.WithRefreshToolSetsOnRun` (rebuild tools from ToolSets on each run for dynamic sources such as MCP)
- Agent nodes: `graph.WithAgentNodeEventCallback`
ToolSets in Graphs vs Agents
graph.WithToolSets is a per‑node, compile‑time configuration. It attaches one or more tool.ToolSet instances to a specific LLM node when you build the graph:
Key points:
- Graph structure (including node ToolSets) is immutable after `Compile()`. Changing ToolSets requires rebuilding the graph or providing a new `GraphAgent`.
- Runtime-level ToolSet changes should be handled at the Agent level (for example, `llmagent.AddToolSet`, `llmagent.RemoveToolSet`, `llmagent.SetToolSets`) or by swapping the underlying Agent used by a graph Agent node.
Additionally, graph.WithName/graph.WithDescription add friendly labels; graph.WithDestinations declares potential dynamic destinations (for static checks/visualization only).
LLM Input Rules: Three‑Stage Design
The LLM input pipeline looks simple but solves common context‑management problems in AI apps.
Built‑in selection logic (no extra config):
- Prefer `graph.StateKeyOneShotMessages`: fully override the inputs (system/user) for this turn; cleared after execution.
- Else use `graph.StateKeyUserInput`: append this turn's user message to `graph.StateKeyMessages`, then atomically write back user + assistant; finally clear `graph.StateKeyUserInput`.
- Else use `graph.StateKeyMessages` only: common in tool loops re-entering the LLM (since `graph.StateKeyUserInput` has been cleared).
The benefit: preprocess nodes can rewrite graph.StateKeyUserInput and take effect in the same turn, while seamlessly integrating with the tool loop (tool_calls → tools → LLM).
Examples (showing the three paths):
Instruction Placeholder Injection
AddLLMNode’s instruction supports placeholders, same syntax as llmagent:
- `{key}` / `{key?}`: read from `session.State`; the optional `?` yields empty when the value is missing.
- `{user:subkey}`, `{app:subkey}`, `{temp:subkey}`: read by namespace.
GraphAgent stores the current *session.Session into state (graph.StateKeySession) and expands placeholders before the LLM call.
Tip: GraphAgent seeds graph.StateKeyMessages from prior session events for multi‑turn continuity. When resuming from a checkpoint, a plain "resume" message is not injected as graph.StateKeyUserInput, preserving the recovered state.
Concurrency and State Safety
When a node has multiple outgoing edges, parallel execution is triggered automatically:
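For example (node IDs are illustrative; keys written by both branches need reducers with merge semantics):

```go
// Both branches are triggered in the same superstep once "prepare" completes.
sg.AddEdge("prepare", "summarize")
sg.AddEdge("prepare", "tag")
sg.AddEdge("summarize", "merge")
sg.AddEdge("tag", "merge")
```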
Internally, the executor constructs shallow copies (maps.Copy) per task and merges under a lock, with reducers ensuring safe concurrent updates.
Node I/O Conventions
Nodes communicate only via the shared State. Each node returns a state delta that merges via the Schema’s reducers.
-
Function nodes
- Input: full
State(read keys declared in your schema) - Output: write business keys only (e.g.,
{"parsed_time":"..."}); avoid internal keys
- Input: full
-
LLM nodes
- Input priority:
graph.StateKeyOneShotMessages→graph.StateKeyUserInput→graph.StateKeyMessages - Output: append to
graph.StateKeyMessagesatomically, setgraph.StateKeyLastResponse, setgraph.StateKeyNodeResponses[<llm_node_id>]
- Input priority:
-
Tools nodes
- Read the latest assistant message with
tool_callsfor the current round and append tool responses tograph.StateKeyMessages - Multiple tools execute in the order returned by the LLM
- Read the latest assistant message with
- Agent nodes
- Receive graph
StateviaInvocation.RunOptions.RuntimeState - Output: set
graph.StateKeyLastResponseandgraph.StateKeyNodeResponses[<agent_node_id>];graph.StateKeyUserInputis cleared after execution
- Receive graph
Good practice:
- Sequential reads: consume the immediate upstream text from
graph.StateKeyLastResponse. - Parallel/merge reads: read specific node outputs from
graph.StateKeyNodeResponses[<nodeID>]. - Declare business keys in your schema with suitable reducers to avoid data races.
API Cheat Sheet
-
Build graph
- `graph.NewStateGraph(schema)` → builder
- `AddNode(id, func, ...opts)` / `AddLLMNode(id, model, instruction, tools, ...opts)`
- `AddToolsNode(id, tools, ...opts)` / `AddAgentNode(id, ...opts)`
- `AddEdge(from, to)` / `AddConditionalEdges(from, condition, pathMap)`
- `AddToolsConditionalEdges(llmNode, toolsNode, fallback)`
- `SetEntryPoint(nodeID)` / `SetFinishPoint(nodeID)` / `Compile()`
-
State keys (user‑visible)
graph.StateKeyUserInput,graph.StateKeyOneShotMessages,graph.StateKeyMessages,graph.StateKeyLastResponse,graph.StateKeyNodeResponses,graph.StateKeyMetadata
-
Per‑node options
- LLM/tools:
graph.WithGenerationConfig,graph.WithModelCallbacksgraph.WithToolCallbacks,graph.WithToolSets
- Callbacks:
graph.WithPreNodeCallback,graph.WithPostNodeCallback,graph.WithNodeErrorCallback
- LLM/tools:
- Execution
graphagent.New(name, compiledGraph, ...opts)→runner.NewRunner(app, agent)→Run(...)
See examples under examples/graph for end‑to‑end patterns (basic/parallel/multi‑turn/interrupts/tools/placeholder).
Visualization (DOT/Image)
Graph can export a Graphviz DOT (Directed Graph Language) description and render images via the dot (Graph Visualization layout engine) executable.
- `WithDestinations` draws dotted gray edges for declared dynamic routes (visualization + static checks only; it does not affect runtime).
- Conditional edges render as dashed gray edges with branch labels.
- Regular edges render as solid lines.
- Virtual `Start`/`End` nodes can be shown or hidden via an option.
Example:
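A sketch using the API listed below (option names follow the reference; the output path is illustrative and rendering requires the `dot` executable to be installed):

```go
// Export the DOT description of a compiled graph.
dot, err := g.DOT(graph.WithRankDir(graph.RankDirLR), graph.WithGraphLabel("order flow"))
if err != nil {
	// handle error
}
fmt.Println(dot)

// Render an image directly (e.g., PNG output).
if err := g.RenderImage(ctx, "png", "order_flow.png", graph.WithIncludeStartEnd(true)); err != nil {
	// handle error
}
```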
API reference:
- `g.DOT(...)` / `g.WriteDOT(w, ...)` on a compiled `*graph.Graph`
- `g.RenderImage(ctx, format, outputPath, ...)` (e.g., `png`/`svg`)
- Options: `WithRankDir(graph.RankDirLR|graph.RankDirTB)`, `WithIncludeDestinations(bool)`, `WithIncludeStartEnd(bool)`, `WithGraphLabel(string)`
Full example: examples/graph/visualization
Advanced Features
Checkpoints and Recovery
To support time‑travel and reliable recovery, configure a checkpoint saver on the Executor or GraphAgent. Below uses the SQLite saver to persist checkpoints and resume from a specific checkpoint.
Checkpoint Management
Use the manager to list, query, and delete checkpoints:
Use a stable business identifier for namespace in production (e.g., svc:prod:flowX) for clear auditing.
Events at a Glance
- Authors
  - Node-level: node ID (fallback `graph.AuthorGraphNode`)
  - Pregel phases: `graph.AuthorGraphPregel`
  - Executor/system: `graph.AuthorGraphExecutor`
  - User input: `user` (no exported constant)
- Object types (subset)
  - Node: `graph.ObjectTypeGraphNodeStart` | `graph.ObjectTypeGraphNodeComplete` | `graph.ObjectTypeGraphNodeError`
  - Pregel: `graph.ObjectTypeGraphPregelPlanning` | `graph.ObjectTypeGraphPregelExecution` | `graph.ObjectTypeGraphPregelUpdate`
  - Channel/state: `graph.ObjectTypeGraphChannelUpdate` / `graph.ObjectTypeGraphStateUpdate`
  - Checkpoints: `graph.ObjectTypeGraphCheckpoint`, `graph.ObjectTypeGraphCheckpointCreated`, `graph.ObjectTypeGraphCheckpointCommitted`, `graph.ObjectTypeGraphCheckpointInterrupt`
- Node:
See “Event Monitoring” for a full streaming example and metadata parsing.
Human‑in‑the‑Loop
Introduce human confirmation on critical paths. The example shows a basic interrupt → resume flow:
Helpers:
You can also inject resume values at entry via a command (no need to jump to a specific node first). Pass it via Runner runtime state:
Event Monitoring
The event stream carries execution progress and incremental outputs. The example shows how to iterate events and distinguish graph events vs model deltas:
You can also filter by the event’s Author field:
- Node-level events (model, tools, node start/stop): `Author = <nodeID>` (or `graph-node` if unavailable)
- Pregel (planning/execution/update/errors): `Author = graph.AuthorGraphPregel`
- Executor-level (state updates/checkpoints): `Author = graph.AuthorGraphExecutor`
- User input (Runner writes): `Author = user`
This convention lets you subscribe to a specific node’s stream without passing streaming context through nodes (streaming travels via the event channel; state stays structured in a LangGraph‑like style).
Example: consume only node ask’s streaming output and print the final message when done.
Event Metadata (StateDelta)
Each event also carries StateDelta, which includes execution metadata for models/tools:
Emit selected values from node callbacks
By default, mid‑run events like graph.state.update report which keys were updated (metadata‑only). Concrete values are not included to keep the stream lightweight and avoid exposing intermediate, potentially conflicting updates. The final graph.execution event’s StateDelta carries the serialized final snapshot of allowed keys (see implementations in graph/executor.go:2001, graph/events.go:1276, graph/events.go:1330).
If you only need to surface a few values from the result of a specific node right after it completes, register an After‑node callback and emit a small custom event containing just those values:
Steps:
- Register `WithPostNodeCallback` on the target node.
- In the callback, read `result any`; when the node returns `graph.State`, this is the node's state delta.
- Pick the needed keys, serialize them to JSON, and attach them to a new event's `StateDelta`.
- Send via `agent.EmitEvent`.
Example:
Recommendations:
- Emit only necessary keys to control bandwidth and avoid leaking sensitive data.
- Internal/volatile keys are filtered from final snapshots and should not be emitted (see graph/internal_keys.go:16).
- For textual intermediate outputs, prefer existing model streaming events (
choice.Delta.Content).
You can also configure agent‑level callbacks:
Troubleshooting
-
Graph has no entry point
- Error: "graph must have an entry point". Call `SetEntryPoint()` and ensure the node exists.
- Error: "graph must have an entry point". Call
-
Edge target/source does not exist
- Error mentions missing node. Define nodes before wiring edges/condition maps.
-
Tools don’t run after LLM
- Ensure the LLM actually returned `tool_calls` and that you used `AddToolsConditionalEdges(ask, tools, fallback)`.
- Pairing walks from the latest assistant(tool_calls) until a new user; verify messages ordering.
- Ensure the LLM actually returned
-
No streaming events observed
- Increase `WithChannelBufferSize` and filter by `Author`/object types.
- Verify you're consuming events from `Runner.Run(...)` and not from direct `Executor` calls.
- Increase
-
Resume did not continue where expected
- Pass `agent.WithRuntimeState(map[string]any{ graph.CfgKeyCheckpointID: "..." })`.
- Provide `ResumeMap` for HITL continuation when needed. A plain "resume" message is not added to `graph.StateKeyUserInput`.
- Pass
- State conflicts in parallel
- Define reducers for lists/maps (e.g., `StringSliceReducer`, `MergeReducer`); avoid overwriting the same key from multiple branches without merge semantics.
- Define reducers for lists/maps (e.g.,
Real‑World Example
Approval Workflow
Summary
This guide introduced the core usage of the graph package and GraphAgent: declaring nodes and routes, safely merging state via Schema + Reducers, and leveraging events, checkpoints, and interrupts for observability and recovery. For structured flows (approvals, content moderation, stepwise processing), Graph provides stable, auditable execution. For intelligent decisions, extend with LLM nodes and sub‑agents.
References & Examples
- Repository: https://github.com/trpc-group/trpc-agent-go
- Graph examples: `examples/graph` (basic/parallel/multi-turn/interrupts and recovery)
  - I/O conventions: `io_conventions`, `io_conventions_tools`
  - Parallel/fan-out: `parallel`, `fanout`, `diamond`
  - Placeholders: `placeholder`
  - Checkpoints/interrupts: `checkpoint`, `interrupt`
- Further reading: `graph/state_graph.go`, `graph/executor.go`, `agent/graphagent`