Graph Package Guide
Overview
Graph combines controllable workflow orchestration with extensible agent capabilities. It is suitable for:

- Type-safe state management and predictable routing.
- LLM decision making, tool-calling loops, and optional Human-in-the-Loop (HITL).
- Reusable components that can run standalone or be composed as sub-agents.
Highlights:

- Schema-driven State and Reducers to avoid data races when concurrent branches write the same field.
- Deterministic parallelism in BSP style (Plan / Execute / Update).
- Built-in node types wrap LLM, Tools, and Agent calls to reduce boilerplate.
- Streaming events, checkpoints, and interrupts for observability and recovery.
- Node-level retry/backoff with exponential delay and jitter, plus executor-level defaults and rich retry metadata in events.
Quick Start
Minimal Workflow
Below is a classic “prepare → ask LLM → optionally call tools” loop using `graph.MessagesStateSchema()` (which predefines `graph.StateKeyMessages`, `graph.StateKeyUserInput`, `graph.StateKeyLastResponse`, etc.).
```mermaid
flowchart LR
START([start]):::startNode --> P[prepare]:::processNode
P --> A[ask LLM]:::llmNode
A -. tool_calls .-> T[tools]:::toolNode
A -- no tool_calls --> F[fallback]:::processNode
T --> A
F --> END([finish]):::endNode
classDef startNode fill:#e1f5e1,stroke:#4caf50,stroke-width:2px
classDef endNode fill:#ffe1e1,stroke:#f44336,stroke-width:2px
classDef llmNode fill:#e3f2fd,stroke:#2196f3,stroke-width:2px
classDef toolNode fill:#fff3e0,stroke:#ff9800,stroke-width:2px
classDef processNode fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
```
The Graph package allows you to model complex AI workflows as directed graphs, where nodes represent processing steps and edges represent data flow and control flow. It is particularly suitable for building AI applications that require conditional routing, state management, and multi-step processing.
Usage Pattern
The usage of the Graph package follows this pattern:
- Create Graph: Use the `StateGraph` builder to define the workflow structure
- Create GraphAgent: Wrap the compiled Graph as an Agent
- Create Runner: Use Runner to manage sessions and execution environment
- Execute Workflow: Execute workflow through Runner and handle results
This pattern provides:
- Type Safety: Ensures data consistency through state schema
- Session Management: Supports concurrent execution for multiple users and sessions
- Event Stream: Real-time monitoring of workflow execution progress
- Error Handling: Unified error handling and recovery mechanisms
Agent Integration
GraphAgent implements the `agent.Agent` interface and can:

- Act as an Independent Agent: execute directly through Runner
- Act as a SubAgent: be used as a sub-agent by other Agents (such as LLMAgent)
- Host SubAgents: register child agents via `graphagent.WithSubAgents` and invoke them through `AddAgentNode`
This design lets GraphAgent plug into other agents while orchestrating its own specialized sub-agents.
Key Features
- Type-safe state management: Use Schema to define state structure, support custom Reducers
- Conditional routing: Dynamically select execution paths based on state
- LLM node integration: Built-in support for large language models
- Tool nodes: Support function calls and external tool integration
- Agent nodes: Delegate parts of the workflow to registered sub-agents
- Streaming execution: Support real-time event streams and progress tracking
- Concurrency safety: Thread-safe graph execution
- Checkpoint-based Time Travel: Navigate through execution history and restore previous states
- Human-in-the-Loop (HITL): Support for interactive workflows with interrupt and resume capabilities
- Atomic checkpointing: Atomic storage of checkpoints with pending writes for reliable recovery
- Checkpoint Lineage: Track related checkpoints forming execution threads with parent-child relationships
Core Concepts
1. Graph
A graph is the core structure of a workflow, consisting of nodes and edges.
Virtual Nodes:

- `Start`: virtual start node, automatically connected through `SetEntryPoint()`
- `End`: virtual end node, automatically connected through `SetFinishPoint()`
- These nodes don't need to be explicitly created; the system handles the connections automatically
2. Node
A node represents a processing step in the workflow.
3. State
State is a data container passed between nodes.
Built-in State Keys:
The Graph package provides some built-in state keys, mainly for internal system communication:
User-accessible Built-in Keys:

- `StateKeyUserInput`: user input (one-shot, cleared after consumption, persisted by LLM nodes)
- `StateKeyOneShotMessages`: one-shot messages (complete override for the current round, cleared after consumption)
- `StateKeyLastResponse`: last response (used to set the final output; the Executor reads this value as the result)
- `StateKeyMessages`: message history (durable, supports append + MessageOp patch operations)
- `StateKeyNodeResponses`: per-node responses map. The key is the node ID; the value is the node's final textual response. Use `StateKeyLastResponse` for the final serial output; when multiple parallel nodes converge, read each node's output from `StateKeyNodeResponses`.
- `StateKeyMetadata`: metadata (general metadata storage available to users)
System Internal Keys (users should not use directly):
- `StateKeySession`: session information (automatically set by GraphAgent)
- `StateKeyExecContext`: execution context (automatically set by Executor)
- `StateKeyToolCallbacks`: tool callbacks (automatically set by Executor)
- `StateKeyModelCallbacks`: model callbacks (automatically set by Executor)
Users should use custom state keys to store business data, and only use user-accessible built-in state keys when necessary.
4. State Schema
State schema defines the structure and behavior of state:
Usage Guide
Node I/O Conventions
Nodes communicate exclusively through the shared state. Each node returns a state delta which is merged into the graph state using the schema’s reducers. Downstream nodes read whatever upstream nodes wrote.
- Common built-in keys (user-facing)
  - `user_input`: one-shot input for the next LLM/Agent node; cleared after consumption.
  - `one_shot_messages`: full message override for the next LLM call; cleared after consumption.
  - `messages`: durable conversation history (LLM/Tools append here); supports MessageOp patches.
  - `last_response`: the last textual assistant response.
  - `node_responses`: Map[nodeID]any — per-node final textual response. Use `last_response` for the most recent.
- Function node
  - Input: the entire state
  - Output: return a `graph.State` delta with custom keys (declare them in the schema), e.g. `{"parsed_time": "..."}`
- LLM node
  - Input priority: `one_shot_messages` → `user_input` → `messages`
  - Output:
    - Appends the assistant message to `messages`
    - Sets `last_response`
    - Sets `node_responses[<llm_node_id>]`
- Tools node
  - Input: scans `messages` for the latest assistant message with `tool_calls`
  - Output: appends tool responses to `messages`
- Agent node (sub-agent)
  - Input: state is injected into the sub-agent's `Invocation.RunOptions.RuntimeState`. Model/Tool callbacks can access it via `agent.InvocationFromContext(ctx)`.
  - Output on finish:
    - Sets `last_response`
    - Sets `node_responses[<agent_node_id>]`
    - Clears `user_input`
Recommended patterns

- Add your own keys in the schema (e.g., `parsed_time`, `final_payload`) and write/read them in function nodes.
- To feed structured hints into an LLM node, write `one_shot_messages` in the previous node (e.g., prepend a system message with parsed context).
- To consume an upstream node's text, read `last_response` immediately downstream, or fetch it from `node_responses[that_node_id]` later.
See examples:

- `examples/graph/io_conventions` — Function + LLM + Agent I/O
- `examples/graph/io_conventions_tools` — adds a Tools node path and shows how to capture tool JSON
- `examples/graph/retry` — node-level retry/backoff demonstration
Constant references (import and keys)

- Import: `import "trpc.group/trpc-go/trpc-agent-go/graph"`
- Defined in: `graph/state.go`
- User-facing keys
  - `user_input` → `graph.StateKeyUserInput`
  - `one_shot_messages` → `graph.StateKeyOneShotMessages`
  - `messages` → `graph.StateKeyMessages`
  - `last_response` → `graph.StateKeyLastResponse`
  - `node_responses` → `graph.StateKeyNodeResponses`
- Other useful keys
  - `session` → `graph.StateKeySession`
  - `metadata` → `graph.StateKeyMetadata`
  - `current_node_id` → `graph.StateKeyCurrentNodeID`
  - `exec_context` → `graph.StateKeyExecContext`
  - `tool_callbacks` → `graph.StateKeyToolCallbacks`
  - `model_callbacks` → `graph.StateKeyModelCallbacks`
  - `agent_callbacks` → `graph.StateKeyAgentCallbacks`
  - `parent_agent` → `graph.StateKeyParentAgent`
Event metadata keys (StateDelta)

- Import: `import "trpc.group/trpc-go/trpc-agent-go/graph"`
- Defined in: `graph/events.go`
- Model metadata: `_model_metadata` → `graph.MetadataKeyModel` (struct `graph.ModelExecutionMetadata`)
- Tool metadata: `_tool_metadata` → `graph.MetadataKeyTool` (struct `graph.ToolExecutionMetadata`)
- Node metadata: `_node_metadata` → `graph.MetadataKeyNode` (struct `graph.NodeExecutionMetadata`). Includes retry info: `Attempt`, `MaxAttempts`, `NextDelay`, `Retrying`, and timing fields.
1. Creating GraphAgent and Runner
Users mainly use the Graph package by creating a GraphAgent and then driving it through a Runner. This is the recommended usage pattern.
2. Using LLM Nodes
LLM nodes implement a fixed three-stage input rule without extra configuration:

- OneShot first: if `one_shot_messages` exists, use it as the input for this round.
- UserInput next: otherwise, if `user_input` exists, persist it once to history.
- History default: otherwise, use the durable `messages` as input.
Important notes:

- The system prompt is only used for this round and is not persisted to state.
- One-shot keys (`user_input` / `one_shot_messages`) are automatically cleared after successful execution.
- All state updates are atomic.
- GraphAgent/Runner only sets `user_input` and no longer pre-populates `messages` with a user message. This allows any pre-LLM node to modify `user_input` and have it take effect in the same round.
Three input paradigms

- OneShot (`StateKeyOneShotMessages`):
  - When present, only the provided `[]model.Message` is used for this round, typically including a full system prompt and user prompt. Automatically cleared afterwards.
  - Use case: a dedicated pre-node constructs the full prompt and must fully override the input.
- UserInput (`StateKeyUserInput`):
  - When non-empty, the LLM node uses the durable `messages` plus this round's user input to call the model. After the call, it writes the user input and assistant reply to `messages` using `MessageOp` operations (e.g., `AppendMessages`, `ReplaceLastUser`) atomically, and clears `user_input` to avoid repeated appends.
  - Use case: conversational flows where pre-nodes may adjust the user input.
- Messages only (just `StateKeyMessages`):
  - Common in tool-call loops. After the first round via `user_input`, routing to tools and back to the LLM, since `user_input` is cleared, the LLM uses only `messages` (history). The tail is often a `tool` response, enabling the model to continue reasoning based on tool outputs.
Atomic updates with Reducer and MessageOp

The Graph package supports `MessageOp` patch operations (e.g., `ReplaceLastUser`, `AppendMessages`) on message state via `MessageReducer` to achieve atomic merges. Benefits:

- Pre-LLM nodes can modify `user_input`. The LLM node returns a single state delta with the needed patch operations (replace the last user message, append the assistant message) for one-shot, race-free persistence.
- Backwards compatible with appending `[]Message`, while providing more expressive updates for complex cases.

Example: modify `user_input` in a pre-node before entering the LLM node.
3. GraphAgent Configuration Options
GraphAgent supports various configuration options:
Model/tool callbacks are configured per node, e.g. `AddLLMNode(..., graph.WithModelCallbacks(...))` or `AddToolsNode(..., graph.WithToolCallbacks(...))`.

Once sub-agents are registered, you can delegate within the graph via agent nodes. The agent node uses its ID for the lookup, so keep `AddAgentNode("assistant")` aligned with `subAgent.Info().Name == "assistant"`.
4. Conditional Routing
5. Tool Node Integration
Tool-call pairing and second entry into the LLM:

- Scan `messages` backward from the tail to find the most recent `assistant(tool_calls)` message; stop at a `user` message to ensure correct pairing.
- When returning from tools to the LLM node, since `user_input` is cleared, the LLM follows the “Messages only” branch and continues based on the tool response in history.
Placeholder Variables in LLM Instructions

LLM nodes support placeholder injection in their `instruction` string (same rules as LLMAgent):

- `{key}` → replaced by `session.State["key"]`
- `{key?}` → optional; missing values become empty
- `{user:subkey}`, `{app:subkey}`, `{temp:subkey}` → access user/app/temp scopes (session services merge app/user state into the session with these prefixes)
Notes:

- GraphAgent writes the current `*session.Session` into graph state under `StateKeySession`; the LLM node reads values from there.
- Unprefixed keys (e.g., `research_topics`) must be present directly in `session.State`.
See the runnable example: `examples/graph/placeholder`.
6. Node Retry & Backoff
Configure per‑node retry with exponential backoff and optional jitter. Failed attempts do not produce writes; only a successful attempt applies its state delta and routing.
- Per-node policy via `WithRetryPolicy`.
- Default policy via the Executor (applies when a node has none).

Notes:

- Interrupts are never retried.
- Backoff delay is clamped by the current step deadline when set (`WithStepTimeout`).
- Events carry retry metadata so UIs/CLIs can display progress.

Example: `examples/graph/retry` shows an unstable node that retries before a final LLM answer.
7. Runner Configuration
Runner provides session management and execution environment:
8. Message State Schema
For conversational applications, you can use predefined message state schema:
9. State Key Usage Scenarios
User-defined State Keys: Used to store business logic data.
Built-in State Keys: Used for system integration.
Advanced Features
1. Interrupt and Resume (Human-in-the-Loop)
The Graph package supports human-in-the-loop (HITL) workflows through interrupt and resume functionality. This enables workflows to pause execution, wait for human input or approval, and then resume from the exact point where they were interrupted.
Basic Usage
The example shows how to declare nodes, connect edges, and run. Next, we’ll cover execution with GraphAgent + Runner, then core concepts and common practices.
Execution
- Wrap the compiled graph with `graphagent.New` (as a generic `agent.Agent`) and hand it to `runner.Runner` to manage sessions and streaming events.
Minimal GraphAgent + Runner:
Session backends:

- In-memory: `session/inmemory` (used by examples)
- Redis: `session/redis` (more common in production)
GraphAgent Options
Core Concepts
State Management
GraphAgent uses a Schema + Reducer model to manage state. You first define the state shape and merge rules; later nodes have clear expectations about the origin and lifecycle of keys they read/write.
Built‑in Schema
Custom Schema
Reducers ensure fields are merged safely per predefined rules, which is critical under concurrent execution.
Tip: define constants for business keys to avoid scattered magic strings.
Node Types
GraphAgent provides four built‑in node types:
Function Node
The most basic node, for custom logic:
LLM Node
Integrates an LLM and auto‑manages conversation history:
Tools Node
Executes tool calls in sequence:
Reading Tool Results into State
After a tools node, add a function node that collects tool outputs from `graph.StateKeyMessages` and writes a structured result into state.

Reference example: `examples/graph/io_conventions_tools`.
Edges and Routing
Edges define control flow between nodes:
Tip: setting entry and finish points implicitly connects to virtual Start/End nodes:
- `SetEntryPoint("first")` is equivalent to Start → first.
- `SetFinishPoint("last")` is equivalent to last → End.

There's no need to add these two edges explicitly.

Constants: `graph.Start == "__start__"`, `graph.End == "__end__"`.
Command Mode (Dynamic Routing / Fan‑out)
Nodes can return `graph.State`, or `*graph.Command` / `[]*graph.Command`, to update state and direct the next hop.

When using command-based routing, you don't need static edges to `GoTo` targets; just ensure the target nodes exist and call `SetFinishPoint` where appropriate.
Architecture
Overall Architecture
GraphAgent’s architecture manages complexity via clear layering. Each layer has a well‑defined responsibility and communicates through standard interfaces.
```mermaid
flowchart TB
subgraph "Runner Layer"
R[Runner]:::runnerClass
S[Session Service]:::sessionClass
end
subgraph "GraphAgent"
GA[GraphAgent Wrapper]:::agentClass
CB[Callbacks]:::callbackClass
end
subgraph "Graph Engine"
SG[StateGraph Builder]:::builderClass
G[Graph]:::graphClass
E[Executor]:::executorClass
end
subgraph "Execution Components"
P[Planning]:::phaseClass
EX[Execution]:::phaseClass
U[Update]:::phaseClass
end
subgraph "Storage"
CP[Checkpoint]:::storageClass
ST[State Store]:::storageClass
end
R --> GA
GA --> G
G --> E
E --> P
E --> EX
E --> U
E --> CP
classDef runnerClass fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef sessionClass fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef agentClass fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef callbackClass fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef builderClass fill:#fff8e1,stroke:#f57c00,stroke-width:2px
classDef graphClass fill:#f1f8e9,stroke:#689f38,stroke-width:2px
classDef executorClass fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phaseClass fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef storageClass fill:#efebe9,stroke:#5d4037,stroke-width:2px
```
Core Modules
Overview of core components:
- `graph/state_graph.go` — StateGraph builder. Provides a fluent, declarative Go API to build graphs via method chaining (AddNode → AddEdge → Compile), covering nodes, edges, and conditional routing.
- `graph/graph.go` — compiled runtime. Implements channel-based, event-triggered execution. Node results merge into State; channels drive routing and carry sentinel values (not business data).
- `graph/executor.go` — BSP executor. The heart of the system, inspired by Google's Pregel. Implements BSP (Bulk Synchronous Parallel) supersteps: Planning → Execution → Update.
- `graph/checkpoint/*` — checkpoints and recovery. Optional checkpoint persistence (e.g., SQLite). Atomically saves state and pending writes; supports lineage- and checkpoint-based recovery.
- `agent/graphagent/graph_agent.go` — bridge between Graph and Agent. Adapts a compiled Graph into a generic Agent, reusing sessions, callbacks, and streaming.
Execution Model
GraphAgent adapts Pregel’s BSP (Bulk Synchronous Parallel) to a single‑process runtime and adds checkpoints, HITL interrupts/resumes, and time travel:
```mermaid
sequenceDiagram
autonumber
participant R as Runner
participant GA as GraphAgent
participant EX as Executor
participant CK as Checkpoint Saver
participant DB as Storage
participant H as Human
R->>GA: Run(invocation)
GA->>EX: Execute(graph, state, options)
GA-->>R: Stream node/tool/model events
loop Each superstep (BSP)
EX->>EX: Planning — compute frontier
par Parallel node execution
EX->>EX: Run node i (shallow state copy)
EX-->>GA: node-start event (author=nodeID)
and
EX->>EX: Run node j (shallow state copy)
EX-->>GA: node-start event
end
alt Node triggers Interrupt(key,prompt)
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen,reason=interrupt)
CK->>DB: atomic commit
EX-->>GA: interrupt event(checkpoint_id,prompt)
GA-->>R: propagate + pause
R->>H: ask for input/approval
H-->>R: provide decision/value
R->>GA: Run(resume) runtime_state{
R->>GA: checkpoint_id,resume_map}
GA->>EX: ResumeFromCheckpoint(checkpoint_id,resume_map)
EX->>CK: Load checkpoint
CK->>EX: state/frontier/pending_writes/versions_seen
EX->>EX: rebuild frontier and apply resume values
else Normal
EX-->>GA: node-complete events (incl. tool/model)
EX->>EX: Update — merge via reducers
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen)
CK->>DB: atomic commit
end
end
Note over EX,CK: versions_seen prevents re-execution
Note over EX,CK: pending_writes rebuilds channels
Note over EX,CK: parent_id forms lineage for time travel
opt Time travel (rewind/branch)
R->>GA: Run(runtime_state{checkpoint_id})
GA->>EX: ResumeFromCheckpoint(checkpoint_id)
EX->>CK: Load checkpoint + lineage
CK->>EX: Restore state; may create new lineage_id
end
EX-->>GA: done event (last_response)
GA-->>R: final output
```
```mermaid
flowchart TB
%% Execution panorama (compact wiring)
subgraph Client
R[Runner]:::runner --> GA[GraphAgent]:::agent
end
subgraph Engine[Graph Engine]
GA --> EX[Executor]:::executor
subgraph BSP["BSP Superstep"]
P[Planning]:::phase --> X[Execution]:::phase --> U[Update]:::phase
end
end
N["Nodes: LLM / Tools / Function / Agent"]:::process
CK[(Checkpoint)]:::storage
H[Human]:::human
EX --> BSP
EX --> N
EX -.-> CK
GA <--> H
GA --> R
classDef runner fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef agent fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef executor fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phase fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
classDef storage fill:#efebe9,stroke:#6d4c41,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
```
Key points:
- Planning: determine runnable nodes from the channel frontier.
- Execution: each node gets a shallow state copy (maps.Copy) and runs in parallel.
- Update: reducers merge updates safely for concurrency.
This design enables per‑step observability and safe interruption/recovery.
Runtime Isolation and Event Snapshots
- The Executor is reusable and concurrency-safe. Per-run state lives in `ExecutionContext` (channel versions, pending writes, last checkpoint, etc.).
- Each event's `StateDelta` is a deep-copy snapshot containing only serializable and allowed keys; internal keys (execution context, callbacks, etc.) are filtered out for external telemetry and persistence.
Executor Configuration
Defaults and Notes
- Defaults (Executor)
  - `ChannelBufferSize = 256`, `MaxSteps = 100`, `CheckpointSaveTimeout = 10s`
  - Per-step/node timeouts are available on the `Executor` via `WithStepTimeout` / `WithNodeTimeout` (not exposed by `GraphAgent` options yet)
- Sessions
  - Prefer the Redis session backend in production; set TTLs and cleanup
  - The Runner seeds multi-turn `graph.StateKeyMessages` from session events automatically
- Checkpoints
  - Use stable `namespace` names (e.g., `svc:prod:flowX`); audit and clean up by lineage via `CheckpointManager`
- Events/backpressure
  - Tune `WithChannelBufferSize`; filter events by `author`/`object` to reduce noise
- Naming and keys
  - Use constants for node IDs, route labels, and state keys; define reducers for non-trivial merges
- Governance
  - Insert HITL on critical paths; prefer storing sensitive details under `graph.StateKeyMetadata` rather than `graph.StateKeyMessages`
Integrating with Multi‑Agent Systems
GraphAgent is designed to be part of the tRPC‑Agent‑Go multi‑agent ecosystem, not an island. It implements the standard Agent interface and collaborates with other agent types.
GraphAgent as an Agent
GraphAgent implements the standard Agent interface:
Advanced Orchestration
End‑to‑end business flow: entry normalization → smart routing → multiple pods (Email, Weather, Research) → parallel fan‑out/aggregation → final composition and publish.
```mermaid
flowchart LR
%% Layout
subgraph UE["User & Entry"]
U((User)):::human --> IN["entry<br/>normalize"]:::process
end
subgraph FAB["Graph Orchestration"]
Rtr["where_to_go<br/>router"]:::router
Compose["compose<br/>LLM"]:::llm
end
IN --> Rtr
%% Email Agent (expanded)
subgraph EC["Email Agent"]
direction LR
CE["classifier<br/>LLM"]:::llm --> WE["writer<br/>LLM"]:::llm
end
%% Weather Agent (expanded)
subgraph WA["Weather Agent"]
direction LR
LE["locate<br/>LLM"]:::llm --> WT["weather tool"]:::tool
end
%% Routing from router to pods
Rtr -- email --> CE
Rtr -- weather --> LE
Rtr -- other --> REPLY["reply<br/>LLM"]:::llm
%% Fanout Pipeline (fanout → workers → aggregate)
subgraph FP["Fanout Pipeline"]
direction LR
Fan["plan_fanout"]:::process --> W1["worker A"]:::process
Fan --> W2["worker B"]:::process
Fan --> W3["worker C"]:::process
W1 --> Agg["aggregate"]:::process
W2 --> Agg
W3 --> Agg
end
Rtr -- research --> Fan
%% Human-in-the-loop (optional)
Compose -. review .- HG["human<br/>review"]:::human
%% Compose final (minimal wiring)
Agg --> Compose
WE --> Compose
WT --> Compose
REPLY --> Compose
Compose --> END([END]):::terminal
%% Styles
classDef router fill:#fff7e0,stroke:#f5a623,stroke-width:2px
classDef llm fill:#e3f2fd,stroke:#1e88e5,stroke-width:2px
classDef tool fill:#fff3e0,stroke:#fb8c00,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef terminal fill:#ffebee,stroke:#e53935,stroke-width:2px
```
Highlights:

- `where_to_go` can be LLM-decided or function-driven (conditional edges).
- The Fanout Pipeline uses Command `GoTo` at runtime, then aggregates.
- Optional human review follows aggregation to gate critical output.
- A single checkpoint display at Compose balances clarity and recoverability.
Embedding Agents in a Graph
Inside a graph, you can call existing sub‑agents as nodes. The example below shows how to create sub‑agents, declare the corresponding nodes, and inject them when constructing the GraphAgent.
Hybrid Pattern Example
Embed dynamic decision‑making within a structured flow:
Core Mechanics in Depth
State Management: Schema + Reducer
State is a central challenge in graph workflows. We designed a Schema + Reducer mechanism that provides type safety and supports high‑concurrency atomic updates.
```mermaid
flowchart LR
subgraph "State Schema"
MS["messages: MessageList"]:::schemaClass
UI["user_input: string"]:::schemaClass
LR["last_response: string"]:::schemaClass
NR["node_responses: Map"]:::schemaClass
end
subgraph "Reducers"
R1["MessageReducer + MessageOp"]:::reducerClass
R2["MergeReducer (Map)"]:::reducerClass
R3["ReplaceReducer (String)"]:::reducerClass
end
subgraph "Node Outputs"
N1[Node 1 Output]:::nodeOutputClass
N2[Node 2 Output]:::nodeOutputClass
N3[Node 3 Output]:::nodeOutputClass
end
N1 --> R1
N2 --> R2
N3 --> R3
R1 --> MS
R2 --> NR
R3 --> LR
classDef schemaClass fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef reducerClass fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef nodeOutputClass fill:#fff8e1,stroke:#f57f17,stroke-width:2px
```
Graph state is a `map[string]any` with runtime validation provided by `StateSchema`. The reducer mechanism ensures safe merging and avoids conflicts under concurrent updates.
Common State Keys
- User-visible: `graph.StateKeyUserInput`, `graph.StateKeyOneShotMessages`, `graph.StateKeyMessages`, `graph.StateKeyLastResponse`, `graph.StateKeyNodeResponses`, `graph.StateKeyMetadata`
- Internal: `session`, `exec_context`, `tool_callbacks`, `model_callbacks`, `agent_callbacks`, `current_node_id`, `parent_agent`
- Command/Resume: `__command__`, `__resume_map__`

Constants live in `graph/state.go` and `graph/keys.go`. Prefer referencing the constants over hard-coding strings.
Node‑level Callbacks & Generation Parameters
Per-node options (see `graph/state_graph.go`):

- `graph.WithPreNodeCallback` / `graph.WithPostNodeCallback` / `graph.WithNodeErrorCallback`
- LLM nodes: `graph.WithGenerationConfig`, `graph.WithModelCallbacks`
- Tool nodes: `graph.WithToolCallbacks`
- Agent nodes: `graph.WithAgentNodeEventCallback`

Additionally, `graph.WithName` / `graph.WithDescription` add friendly labels; `graph.WithDestinations` declares potential dynamic destinations (for static checks/visualization only).
LLM Input Rules: Three‑Stage Design
The LLM input pipeline looks simple but solves common context‑management problems in AI apps.
Built-in selection logic (no extra config):

- Prefer `graph.StateKeyOneShotMessages`: fully override the inputs (system/user) for this turn; cleared after execution.
- Else use `graph.StateKeyUserInput`: append this turn's user message to `graph.StateKeyMessages`, then atomically write back user + assistant; finally clear `graph.StateKeyUserInput`.
- Else use `graph.StateKeyMessages` only: common on tool loops re-entering the LLM (since `graph.StateKeyUserInput` has been cleared).

The benefit: preprocess nodes can rewrite `graph.StateKeyUserInput` and take effect in the same turn, while integrating seamlessly with the tool loop (tool_calls → tools → LLM).
Examples (showing the three paths):
Instruction Placeholder Injection
`AddLLMNode`'s `instruction` supports placeholders, with the same syntax as `llmagent`:

- `{key}` / `{key?}`: read from `session.State`; the optional `?` yields empty when missing.
- `{user:subkey}`, `{app:subkey}`, `{temp:subkey}`: read by namespace.

GraphAgent stores the current `*session.Session` into state (`graph.StateKeySession`) and expands placeholders before the LLM call.
Tip: GraphAgent seeds `graph.StateKeyMessages` from prior session events for multi-turn continuity. When resuming from a checkpoint, a plain "resume" message is not injected as `graph.StateKeyUserInput`, preserving the recovered state.
Concurrency and State Safety
When a node has multiple outgoing edges, parallel execution is triggered automatically:
Internally, the executor constructs shallow copies (maps.Copy) per task and merges under a lock, with reducers ensuring safe concurrent updates.
Node I/O Conventions
Nodes communicate only via the shared `State`. Each node returns a state delta that merges via the Schema's reducers.

- Function nodes
  - Input: the full `State` (read keys declared in your schema)
  - Output: write business keys only (e.g., `{"parsed_time":"..."}`); avoid internal keys
- LLM nodes
  - Input priority: `graph.StateKeyOneShotMessages` → `graph.StateKeyUserInput` → `graph.StateKeyMessages`
  - Output: append to `graph.StateKeyMessages` atomically, set `graph.StateKeyLastResponse`, and set `graph.StateKeyNodeResponses[<llm_node_id>]`
- Tools nodes
  - Read the latest assistant message with `tool_calls` for the current round and append tool responses to `graph.StateKeyMessages`
  - Multiple tools execute in the order returned by the LLM
- Agent nodes
  - Receive the graph `State` via `Invocation.RunOptions.RuntimeState`
  - Output: set `graph.StateKeyLastResponse` and `graph.StateKeyNodeResponses[<agent_node_id>]`; `graph.StateKeyUserInput` is cleared after execution

Good practice:

- Sequential reads: consume the immediate upstream text from `graph.StateKeyLastResponse`.
- Parallel/merge reads: read specific node outputs from `graph.StateKeyNodeResponses[<nodeID>]`.
- Declare business keys in your schema with suitable reducers to avoid data races.
API Cheat Sheet
- Build graph
  - `graph.NewStateGraph(schema)` → builder
  - `AddNode(id, func, ...opts)` / `AddLLMNode(id, model, instruction, tools, ...opts)`
  - `AddToolsNode(id, tools, ...opts)` / `AddAgentNode(id, ...opts)`
  - `AddEdge(from, to)` / `AddConditionalEdges(from, condition, pathMap)`
  - `AddToolsConditionalEdges(llmNode, toolsNode, fallback)`
  - `SetEntryPoint(nodeID)` / `SetFinishPoint(nodeID)` / `Compile()`
- State keys (user-visible)
  - `graph.StateKeyUserInput`, `graph.StateKeyOneShotMessages`, `graph.StateKeyMessages`, `graph.StateKeyLastResponse`, `graph.StateKeyNodeResponses`, `graph.StateKeyMetadata`
- Per-node options
  - `graph.WithGenerationConfig`, `graph.WithModelCallbacks`, `graph.WithToolCallbacks`
  - `graph.WithPreNodeCallback`, `graph.WithPostNodeCallback`, `graph.WithNodeErrorCallback`
- Execution
  - `graphagent.New(name, compiledGraph, ...opts)` → `runner.NewRunner(app, agent)` → `Run(...)`

See examples under `examples/graph` for end-to-end patterns (basic/parallel/multi-turn/interrupts/tools/placeholder).
Advanced Features
Checkpoints and Recovery
To support time‑travel and reliable recovery, configure a checkpoint saver on the Executor or GraphAgent. Below uses the SQLite saver to persist checkpoints and resume from a specific checkpoint.
Checkpoint Management
Use the manager to list, query, and delete checkpoints:
Use a stable business identifier for `namespace` in production (e.g., `svc:prod:flowX`) for clear auditing.
Events at a Glance
- Authors
  - Node-level: node ID (fallback `graph.AuthorGraphNode`)
  - Pregel phases: `graph.AuthorGraphPregel`
  - Executor/system: `graph.AuthorGraphExecutor`
  - User input: `user` (no exported constant)
- Object types (subset)
  - Node: `graph.ObjectTypeGraphNodeStart` | `graph.ObjectTypeGraphNodeComplete` | `graph.ObjectTypeGraphNodeError`
  - Pregel: `graph.ObjectTypeGraphPregelPlanning` | `graph.ObjectTypeGraphPregelExecution` | `graph.ObjectTypeGraphPregelUpdate`
  - Channel/state: `graph.ObjectTypeGraphChannelUpdate` / `graph.ObjectTypeGraphStateUpdate`
  - Checkpoints: `graph.ObjectTypeGraphCheckpoint`, `graph.ObjectTypeGraphCheckpointCreated`, `graph.ObjectTypeGraphCheckpointCommitted`, `graph.ObjectTypeGraphCheckpointInterrupt`
See “Event Monitoring” for a full streaming example and metadata parsing.
Human‑in‑the‑Loop
Introduce human confirmation on critical paths. The example shows a basic interrupt → resume flow:
Helpers:
You can also inject resume values at entry via a command (no need to jump to a specific node first). Pass it via Runner runtime state:
Event Monitoring
The event stream carries execution progress and incremental outputs. The example shows how to iterate events and distinguish graph events vs model deltas:
You can also filter by the event's `Author` field:
- Node-level events (model, tools, node start/stop): `Author = <nodeID>` (or `graph-node` if unavailable)
- Pregel (planning/execution/update/errors): `Author = graph.AuthorGraphPregel`
- Executor-level (state updates/checkpoints): `Author = graph.AuthorGraphExecutor`
- User input (Runner writes): `Author = user`
This convention lets you subscribe to a specific node’s stream without passing streaming context through nodes (streaming travels via the event channel; state stays structured in a LangGraph‑like style).
Example: consume only node `ask`'s streaming output and print the final message when done.
Event Metadata (StateDelta)
Each event also carries `StateDelta`, which includes execution metadata for models/tools:
You can also configure agent‑level callbacks:
Troubleshooting
- Graph has no entry point
  - Error: "graph must have an entry point". Call `SetEntryPoint()` and ensure the node exists.
- Edge target/source does not exist
  - Error mentions a missing node. Define nodes before wiring edges/condition maps.
- Tools don't run after LLM
  - Ensure the LLM actually returned `tool_calls` and you used `AddToolsConditionalEdges(ask, tools, fallback)`.
  - Check that tool names in your map match the model's declared tool names.
  - Pairing walks from the latest assistant message with `tool_calls` until a new user message; verify message ordering.
- No streaming events observed
  - Increase `WithChannelBufferSize` and filter by `Author`/object types.
  - Verify you're consuming events from `Runner.Run(...)` and not from direct `Executor` calls.
- Resume did not continue where expected
  - Pass `agent.WithRuntimeState(map[string]any{graph.CfgKeyCheckpointID: "..."})`.
  - Provide `ResumeMap` for HITL continuation when needed. A plain "resume" message is not added to `graph.StateKeyUserInput`.
- State conflicts in parallel
  - Define reducers for lists/maps (e.g., `StringSliceReducer`, `MergeReducer`); avoid overwriting the same key from multiple branches without merge semantics.
Real‑World Example
Approval Workflow
Summary
This guide introduced the core usage of the `graph` package and GraphAgent: declaring nodes and routes, safely merging state via Schema + Reducers, and leveraging events, checkpoints, and interrupts for observability and recovery. For structured flows (approvals, content moderation, stepwise processing), Graph provides stable, auditable execution. For intelligent decisions, extend with LLM nodes and sub-agents.
References & Examples
- Repository: https://github.com/trpc-group/trpc-agent-go
- Graph examples: `examples/graph` (basic/parallel/multi-turn/interrupts and recovery)
  - I/O conventions: `io_conventions`, `io_conventions_tools`
  - Parallel/fan-out: `parallel`, `fanout`, `diamond`
  - Placeholders: `placeholder`
  - Checkpoints/interrupts: `checkpoint`, `interrupt`
- Further reading: `graph/state_graph.go`, `graph/executor.go`, `agent/graphagent`