Graph Package Guide
Overview
Graph combines controllable workflow orchestration with extensible agent capabilities. It is suitable for:
- Type-safe state management and predictable routing.
- LLM decision making, tool-calling loops, and optional Human in the Loop (HITL).
- Reusable components that can run standalone or be composed as sub‑agents.
Highlights:
- Schema‑driven State and Reducers to avoid data races when concurrent branches write the same field.
- Deterministic parallelism with BSP style (Plan / Execute / Update).
- Built‑in node types wrap LLM, Tools, and Agent to reduce boilerplate.
- Streaming events, checkpoints, and interrupts for observability and recovery.
- Node‑level retry/backoff with exponential delay and jitter, plus executor‑level defaults and rich retry metadata in events.
- Node event emitter (EventEmitter) for emitting custom events, progress updates, and streaming text from within NodeFunc.
Quick Start
Minimal Workflow
Below is a classic "prepare → ask LLM → optionally call tools" loop using graph.MessagesStateSchema() (predefines graph.StateKeyMessages, graph.StateKeyUserInput, graph.StateKeyLastResponse, etc.).
flowchart LR
START([start]):::startNode --> P[prepare]:::processNode
P --> A[ask LLM]:::llmNode
A -. tool_calls .-> T[tools]:::toolNode
A -- no tool_calls --> F[fallback]:::processNode
T --> A
F --> END([finish]):::endNode
classDef startNode fill:#e1f5e1,stroke:#4caf50,stroke-width:2px
classDef endNode fill:#ffe1e1,stroke:#f44336,stroke-width:2px
classDef llmNode fill:#e3f2fd,stroke:#2196f3,stroke-width:2px
classDef toolNode fill:#fff3e0,stroke:#ff9800,stroke-width:2px
classDef processNode fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
The Graph package allows you to model complex AI workflows as directed graphs, where nodes represent processing steps and edges represent data flow and control flow. It is particularly suitable for building AI applications that require conditional routing, state management, and multi-step processing.
Usage Pattern
The usage of the Graph package follows this pattern:
- Create Graph: Use the StateGraph builder to define the workflow structure
- Create GraphAgent: Wrap the compiled Graph as an Agent
- Create Runner: Use Runner to manage sessions and execution environment
- Execute Workflow: Execute workflow through Runner and handle results
This pattern provides:
- Type Safety: Ensures data consistency through state schema
- Session Management: Supports concurrent execution for multiple users and sessions
- Event Stream: Real-time monitoring of workflow execution progress
- Error Handling: Unified error handling and recovery mechanisms
Agent Integration
GraphAgent implements the agent.Agent interface and can:
- Act as Independent Agent: Execute directly through Runner
- Act as SubAgent: Be used as a sub-agent by other Agents (such as LLMAgent)
- Host SubAgents: Register child agents via graphagent.WithSubAgents and invoke them through AddAgentNode
This design lets GraphAgent plug into other agents while orchestrating its own specialized sub-agents.
Key Features
- Type-safe state management: Use Schema to define state structure, support custom Reducers
- Conditional routing: Dynamically select execution paths based on state
- LLM node integration: Built-in support for large language models
- Tool nodes: Support function calls and external tool integration
- Agent nodes: Delegate parts of the workflow to registered sub-agents
- Streaming execution: Support real-time event streams and progress tracking
- Concurrency safety: Thread-safe graph execution
- Checkpoint-based Time Travel: Navigate through execution history and restore previous states
- Human-in-the-Loop (HITL): Support for interactive workflows with interrupt and resume capabilities
- Atomic checkpointing: Atomic storage of checkpoints with pending writes for reliable recovery
- Checkpoint Lineage: Track related checkpoints forming execution threads with parent-child relationships
Core Concepts
1. Graph
A graph is the core structure of a workflow, consisting of nodes and edges:
Virtual Nodes:
- Start: Virtual start node, automatically connected through SetEntryPoint()
- End: Virtual end node, automatically connected through SetFinishPoint()
- These nodes don't need to be explicitly created; the system handles the connections automatically
2. Node
A node represents a processing step in the workflow:
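For orientation, a hedged sketch of declaring each node type on a StateGraph builder (method names follow this guide; exact signatures may vary by version):
```go
sg := graph.NewStateGraph(graph.MessagesStateSchema())
sg.AddNode("prepare", prepareFunc)                           // function node: custom logic
sg.AddLLMNode("ask", modelInstance, "You are helpful.", nil) // LLM node
sg.AddToolsNode("tools", toolsMap)                           // tools node
sg.AddAgentNode("assistant")                                 // agent node: delegates to a registered sub-agent
```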
3. State
State is a data container passed between nodes:
Built-in State Keys:
The Graph package provides some built-in state keys, mainly for internal system communication:
User-accessible Built-in Keys:
- StateKeyUserInput: User input (one-shot, cleared after consumption, persisted by LLM nodes)
- StateKeyOneShotMessages: One-shot messages (complete override for the current round, cleared after consumption)
- StateKeyLastResponse: Last response (used to set the final output; Executor reads this value as the result)
- StateKeyLastToolResponse: Last tool output (JSON string, set by Tools nodes)
- StateKeyLastResponseID: Last response ID (set by LLM nodes; may be empty when StateKeyLastResponse is produced by a non-model node)
- StateKeyMessages: Message history (durable, supports append + MessageOp patch operations)
- StateKeyNodeResponses: Per-node outputs map. Key is node ID, value is the node's final output. For LLM and Agent nodes this is the final textual response; for Tools nodes this is a JSON array string of tool outputs (each item contains tool_id, tool_name, and output).
- StateKeyMetadata: Metadata (general metadata storage available to users)
System Internal Keys (users should not use directly):
- StateKeySession: Session information (automatically set by GraphAgent)
- StateKeyExecContext: Execution context (automatically set by Executor)
- StateKeyToolCallbacks: Tool callbacks (automatically set by Executor)
- StateKeyModelCallbacks: Model callbacks (automatically set by Executor)
Users should use custom state keys to store business data, and only use user-accessible built-in state keys when necessary.
4. State Schema
State schema defines the structure and behavior of state:
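A hedged sketch of defining a schema: start from the built-in messages schema, then declare a custom business key with replace semantics (the AddField/StateField names are assumptions based on this guide):
```go
schema := graph.MessagesStateSchema() // predefines messages, user_input, last_response, ...
schema.AddField("parsed_time", graph.StateField{
	Type:    reflect.TypeOf(""),   // string value
	Reducer: graph.DefaultReducer, // replace semantics; supply a custom reducer for merges
})
```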
Usage Guide
Node I/O Conventions
Nodes communicate exclusively through the shared state. Each node returns a state delta which is merged into the graph state using the schema's reducers. Downstream nodes read whatever upstream nodes wrote.
- Common built‑in keys (user‑facing)
  - user_input: One‑shot input for the next LLM/Agent node. Cleared after consumption.
  - one_shot_messages: Full message override for the next LLM call. Cleared after consumption.
  - one_shot_messages_by_node: Targeted one‑shot override for a specific node ID (map[nodeID][]Message). Cleared per entry after consumption.
  - messages: Durable conversation history (LLM/Tools append here). Supports MessageOp patches.
  - last_response: The last textual assistant response.
  - last_response_id: The ID of the last model response that produced last_response (may be empty when last_response is set by a non-model node).
  - node_responses: map[nodeID]any — per‑node final response. Use last_response for the most recent.
- Function node
  - Input: the entire state
  - Output: return a graph.State delta with custom keys (declare them in the schema), e.g. {"parsed_time": "..."}
- LLM node
  - Input priority: one_shot_messages_by_node[<node_id>] → one_shot_messages → user_input → messages
  - Output:
    - Appends the assistant message to messages
    - Sets last_response
    - Sets last_response_id
    - Sets node_responses[<llm_node_id>]
- Tools node
  - Input: scans messages for the latest assistant message with tool_calls
  - Output: appends tool responses to messages
- Agent node (sub‑agent)
  - Input: state is injected into the sub‑agent's Invocation.RunOptions.RuntimeState.
    - Model/Tool callbacks can access it via agent.InvocationFromContext(ctx).
  - Output on finish:
    - Sets last_response
    - Sets node_responses[<agent_node_id>]
    - Clears user_input
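For illustration, a function node following these conventions might look like this (signature per graph.NodeFunc; the custom key parsed_time is a hypothetical schema field):
```go
func parseTime(ctx context.Context, state graph.State) (any, error) {
	input, _ := state[graph.StateKeyUserInput].(string)
	// ... real parsing logic would go here ...
	return graph.State{"parsed_time": strings.TrimSpace(input)}, nil // return a delta only
}
```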
Recommended patterns
- Add your own keys in the schema (e.g., parsed_time, final_payload) and write/read them in function nodes.
- To feed structured hints into an LLM node, write one_shot_messages in the previous node (e.g., prepend a system message with parsed context).
- Parallel branches: avoid writing one_shot_messages from multiple branches. Prefer one_shot_messages_by_node so each LLM node consumes only its own one‑shot input.
- To consume an upstream node's text, read last_response immediately downstream or fetch from node_responses[that_node_id] later.
- If the downstream node is an Agent node and you want it to consume an upstream node's output as its input message, you must write it back into user_input (for example, use WithSubgraphInputFromLastResponse() or a pre‑node callback).
One-shot messages scoped by node ID:
Preparing one-shot inputs for multiple nodes in a single upstream node:
In Go, a map assignment overwrites by key. Since
SetOneShotMessagesForNode(...) writes the same top-level key
(one_shot_messages_by_node) every time, you should avoid calling it multiple
times and then “merging” the returned graph.State values with plain
result[k] = v assignments (the last write wins).
Instead, build one map[nodeID][]model.Message and write it once:
Alternative (no helpers): write a raw state delta map (handy if you also need to update other keys in the same node):
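A hedged sketch of both variants; llm1NodeID/llm2NodeID and the message constructors are assumptions:
```go
func prepareOneShots(ctx context.Context, state graph.State) (any, error) {
	byNode := map[string][]model.Message{
		llm1NodeID: {model.NewSystemMessage("Summarize."), model.NewUserMessage("...")},
		llm2NodeID: {model.NewSystemMessage("Tag."), model.NewUserMessage("...")},
	}
	// Helper form: one top-level write, no "last write wins" problem.
	return graph.SetOneShotMessagesByNode(byNode), nil

	// Raw-delta form (useful when updating other keys in the same node):
	// return graph.State{
	//     graph.StateKeyOneShotMessagesByNode: byNode,
	//     "other_key":                         "value",
	// }, nil
}
```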
Notes:
- llm1NodeID / llm2NodeID must match the IDs you pass to AddLLMNode.
- Each LLM node consumes one_shot_messages_by_node[its_id] once, and clears only its own entry.
See examples:
- examples/graph/io_conventions — Function + LLM + Agent I/O
- examples/graph/io_conventions_tools — Adds a Tools node path and shows how to capture tool JSON
- examples/graph/oneshot_by_node — One-shot inputs scoped by LLM node ID
- examples/graph/oneshot_by_node_preprocess — One upstream node prepares one-shot inputs for multiple LLM nodes
- examples/graph/retry — Node-level retry/backoff demonstration
Constant references (import and keys)
- Import: import "trpc.group/trpc-go/trpc-agent-go/graph"
- Defined in: graph/state.go
- User‑facing keys
  - user_input → graph.StateKeyUserInput
  - one_shot_messages → graph.StateKeyOneShotMessages
  - one_shot_messages_by_node → graph.StateKeyOneShotMessagesByNode
  - messages → graph.StateKeyMessages
  - last_response → graph.StateKeyLastResponse
  - last_response_id → graph.StateKeyLastResponseID
  - node_responses → graph.StateKeyNodeResponses
- One-shot helpers
  - SetOneShotMessagesForNode(nodeID, msgs) → per-node one-shot update
  - SetOneShotMessagesByNode(byNode) → multi-node one-shot update
  - ClearOneShotMessagesForNode(nodeID) → clear one node entry
  - ClearOneShotMessagesByNode() → clear the entire map
  - GetOneShotMessagesForNode(state, nodeID) → read one node entry
- Other useful keys
  - session → graph.StateKeySession
  - metadata → graph.StateKeyMetadata
  - current_node_id → graph.StateKeyCurrentNodeID
  - exec_context → graph.StateKeyExecContext
  - tool_callbacks → graph.StateKeyToolCallbacks
  - model_callbacks → graph.StateKeyModelCallbacks
  - agent_callbacks → graph.StateKeyAgentCallbacks
  - parent_agent → graph.StateKeyParentAgent
Snippet:
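A hedged sketch of using the constants instead of raw strings inside a function node:
```go
func finish(ctx context.Context, state graph.State) (any, error) {
	input, _ := state[graph.StateKeyUserInput].(string)
	return graph.State{graph.StateKeyLastResponse: "done: " + input}, nil
}
```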
Event metadata keys (StateDelta)
- Import: import "trpc.group/trpc-go/trpc-agent-go/graph"
- Defined in: graph/events.go
- Model metadata: _model_metadata → graph.MetadataKeyModel (struct graph.ModelExecutionMetadata)
- Tool metadata: _tool_metadata → graph.MetadataKeyTool (struct graph.ToolExecutionMetadata)
- Node metadata: _node_metadata → graph.MetadataKeyNode (struct graph.NodeExecutionMetadata). Includes retry info: Attempt, MaxAttempts, NextDelay, Retrying, and timing fields.
Snippet:
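A hedged sketch of reading node metadata from an event's StateDelta (assuming StateDelta values are JSON-encoded bytes; the NodeID field name is an assumption):
```go
if raw, ok := evt.StateDelta[graph.MetadataKeyNode]; ok {
	var md graph.NodeExecutionMetadata
	if err := json.Unmarshal(raw, &md); err == nil && md.Retrying {
		fmt.Printf("retry %d/%d, next delay %v\n", md.Attempt, md.MaxAttempts, md.NextDelay)
	}
}
```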
Node Event Emitter (EventEmitter)
During NodeFunc execution, nodes can proactively emit custom events to the outside through EventEmitter, for real-time delivery of progress, intermediate results, or custom business data.
Getting EventEmitter
EventEmitter Interface
Usage Examples
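A hedged sketch inside a NodeFunc (method names follow the diagram below; exact signatures are assumptions):
```go
func process(ctx context.Context, state graph.State) (any, error) {
	emitter := graph.GetEventEmitter(state) // returns a no-op emitter if no exec context
	emitter.EmitProgress(0.25, "fetching data")
	emitter.EmitCustom("data.fetched", map[string]any{"rows": 42})
	emitter.EmitText("partial output...")
	return graph.State{"done": true}, nil
}
```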
Event Flow Diagram
sequenceDiagram
autonumber
participant NF as NodeFunc
participant EE as EventEmitter
participant EC as EventChan
participant EX as Executor
participant TR as AGUI Translator
participant FE as Frontend Client
Note over NF,FE: Event Emission Phase
NF->>EE: GetEventEmitter(state)
EE-->>NF: Return EventEmitter instance
alt Emit Custom Event
NF->>EE: EmitCustom(eventType, payload)
else Emit Progress Event
NF->>EE: EmitProgress(progress, message)
else Emit Text Event
NF->>EE: EmitText(text)
end
Note over EE: Build NodeCustomEventMetadata
EE->>EE: Inject NodeID, InvocationID, Timestamp
EE->>EC: Send Event to channel
Note over EC,FE: Event Consumption Phase
EC->>EX: Executor receives event
EX->>TR: Pass event to Translator
alt Custom Event
TR->>TR: Convert to AG-UI CustomEvent
else Progress Event
TR->>TR: Convert to AG-UI CustomEvent (with progress info)
else Text Event (in message context)
TR->>TR: Convert to TextMessageContentEvent
else Text Event (not in message context)
TR->>TR: Convert to AG-UI CustomEvent
end
TR->>FE: SSE push AG-UI event
FE->>FE: Process and update UI
AGUI Event Conversion
When using AGUI Server, events emitted by nodes are automatically converted to AG-UI protocol events:
| Node Event Type | AG-UI Event Type | Description |
|---|---|---|
| Custom | CustomEvent | Custom event, payload in value field |
| Progress | CustomEvent | Progress event with progress and message |
| Text (in message context) | TextMessageContentEvent | Streaming text appended to current message |
| Text (not in message context) | CustomEvent | Contains nodeId and content fields |
Notes
- Thread Safety: EventEmitter is thread-safe and can be used in concurrent environments
- Graceful Degradation: If State has no valid ExecutionContext or EventChan, GetEventEmitter returns a no-op emitter that silently succeeds for all operations
- Error Handling: Event emission failures do not interrupt node execution; it's recommended to only log warnings
- Custom Event Metadata: _node_custom_metadata → graph.MetadataKeyNodeCustom (struct graph.NodeCustomEventMetadata)
1. Creating GraphAgent and Runner
Users mainly use the Graph package by creating GraphAgent and then using it through Runner. This is the recommended usage pattern:
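The original listing is omitted here; below is a minimal hedged sketch of the full pattern (constructor and option names follow this guide; verify field access such as evt.Response against your version):
```go
package main

import (
	"context"
	"fmt"
	"log"

	"trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
	"trpc.group/trpc-go/trpc-agent-go/graph"
	"trpc.group/trpc-go/trpc-agent-go/model"
	"trpc.group/trpc-go/trpc-agent-go/model/openai"
	"trpc.group/trpc-go/trpc-agent-go/runner"
	"trpc.group/trpc-go/trpc-agent-go/session/inmemory"
)

func main() {
	// 1. Create Graph: schema + nodes + edges, then compile.
	sg := graph.NewStateGraph(graph.MessagesStateSchema())
	m := openai.New("gpt-4o-mini")
	sg.AddLLMNode("ask", m, "You are a helpful assistant.", nil)
	sg.SetEntryPoint("ask")
	sg.SetFinishPoint("ask")
	g, err := sg.Compile()
	if err != nil {
		log.Fatal(err)
	}

	// 2. Create GraphAgent: wrap the compiled graph as an Agent.
	ga, err := graphagent.New("my-workflow", g)
	if err != nil {
		log.Fatal(err)
	}

	// 3. Create Runner: session management + execution environment.
	r := runner.NewRunner("my-app", ga,
		runner.WithSessionService(inmemory.NewSessionService()))

	// 4. Execute: stream events and print incremental model output.
	events, err := r.Run(context.Background(), "user-1", "session-1",
		model.NewUserMessage("Hello!"))
	if err != nil {
		log.Fatal(err)
	}
	for evt := range events {
		if evt.Response == nil {
			continue
		}
		for _, c := range evt.Response.Choices {
			fmt.Print(c.Delta.Content)
		}
	}
}
```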
2. Using LLM Nodes
LLM nodes implement a fixed three-stage input rule without extra configuration:
- OneShot first:
  - If one_shot_messages_by_node[<node_id>] exists, use it as the input for this round.
  - Otherwise, if one_shot_messages exists, use it as the input for this round.
- UserInput next: Otherwise, if user_input exists, persist it once to history and use it.
- History default: Otherwise, use the durable messages history as input.
Important notes:
- System prompt is only used for this round and is not persisted to state.
- One-shot keys (user_input / one_shot_messages / one_shot_messages_by_node) are automatically cleared after successful execution.
- Parallel branches: if multiple branches need different one-shot inputs for different LLM nodes in the same step, write one_shot_messages_by_node instead of one_shot_messages. If one upstream node prepares inputs for multiple LLM nodes, prefer graph.SetOneShotMessagesByNode(...) to write all entries at once.
- All state updates are atomic.
- GraphAgent/Runner only sets user_input and no longer pre-populates messages with a user message. This allows any pre-LLM node to modify user_input and have it take effect in the same round.
- When Graph runs sub-agents, it preserves the parent run's RequestID so the current user input is included exactly once, even when it already exists in session history.
Event emission (streaming vs final)
When the Large Language Model (LLM) is called with streaming enabled, a single model call can produce multiple events:
- Streaming chunks: incremental text in choice.Delta.Content
- A final message: full text in choice.Message.Content
In graph workflows there are also two different "done" concepts:
- Model done: a single model call finishes (typically Response.Done = true)
- Workflow done: the whole graph run ends (use event.IsRunnerCompletion())
By default, graph LLM nodes only emit the streaming chunks. They do not emit
the final Done=true assistant message event. This keeps intermediate node
outputs from being treated as normal assistant replies (for example, being
persisted into the Session by Runner).
If you want graph LLM nodes to also emit the final Done=true assistant
message events, enable agent.WithGraphEmitFinalModelResponses(true) when
running via Runner. See runner.md for details and examples.
Tip: if you are using Runner and you mainly care about streaming Large Language
Model (LLM) messages, you can enable agent.WithStreamMode(...) (see
"Event Monitoring"). When agent.StreamModeMessages is selected, graph LLM
nodes enable final model responses automatically for that run.
Tip: parsing JSON / structured output from streaming
- Streaming chunks (choice.Delta.Content) are incremental and are not guaranteed to form valid JSON until the end of the model call.
- If you need to parse JSON (or any structured text), do not json.Unmarshal per chunk. Buffer and parse once you have the full string.
Common approaches:
- Inside the graph: parse in a downstream node from node_responses[nodeID] (or last_response in strictly serial flows), because those values are only set after the node finishes.
- Outside the graph (event consumer): accumulate Delta.Content and parse when you see a non-partial response that carries choice.Message.Content (or when the workflow finishes).
Three input paradigms
- OneShot (StateKeyOneShotMessages):
  - When present, only the provided []model.Message is used for this round, typically including a full system prompt and user prompt. Automatically cleared afterwards.
  - Use case: a dedicated pre-node constructs the full prompt and must fully override input.
  - Parallel branches: when multiple branches prepare one-shot inputs for different LLM nodes, prefer StateKeyOneShotMessagesByNode to avoid clobbering a shared global key.
  - If a single node prepares one-shot inputs for multiple LLM nodes, use graph.SetOneShotMessagesByNode(...) to write them in one return value.
- UserInput (StateKeyUserInput):
  - When non-empty, the LLM node uses durable messages plus this round's user input to call the model. After the call, it writes the user input and assistant reply to messages using MessageOp (e.g., AppendMessages, ReplaceLastUser) atomically, and clears user_input to avoid repeated appends.
  - Use case: conversational flows where pre-nodes may adjust user input.
- Messages only (just StateKeyMessages):
  - Common in tool-call loops. After the first round via user_input, routing to tools and back to the LLM, since user_input is cleared, the LLM uses only messages (history). The tail is often a tool response, enabling the model to continue reasoning based on tool outputs.
Atomic updates with Reducer and MessageOp
The Graph package supports MessageOp patch operations (e.g., ReplaceLastUser,
AppendMessages) on message state via MessageReducer to achieve atomic merges. Benefits:
- Pre-LLM nodes can modify user_input. The LLM node returns a single state delta with the needed patch operations (replace last user message, append assistant message) for one-shot, race-free persistence.
- Backwards compatible with appending []Message, while providing more expressive updates for complex cases.
Example: modify user_input in a pre-node before entering the LLM node.
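A hedged sketch of such a pre-node:
```go
func enrichInput(ctx context.Context, state graph.State) (any, error) {
	in, _ := state[graph.StateKeyUserInput].(string)
	// The LLM node later persists this rewritten input via MessageOp patches.
	return graph.State{
		graph.StateKeyUserInput: "[context: priority customer]\n" + in,
	}, nil
}
```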
Using RemoveAllMessages to Clear Message History
When chaining multiple LLM nodes, MessageReducer accumulates messages. If each
LLM node requires an isolated message context (without inheriting the previous
node's conversation history), use RemoveAllMessages to clear previous messages:
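A hedged sketch (type names per the notes below):
```go
func resetContext(ctx context.Context, state graph.State) (any, error) {
	return graph.State{
		// Clear the history first, then set fresh input for the next LLM node.
		graph.StateKeyMessages:  []graph.MessageOp{graph.RemoveAllMessages{}},
		graph.StateKeyUserInput: "New task, independent of prior conversation.",
	}, nil
}
```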
Use cases:
- Multiple independent LLM nodes within the same Graph, where each node doesn't need the previous node's conversation history
- Loop structures where each iteration requires a fresh message context
- Scenarios requiring complete message list reconstruction
Notes:
- RemoveAllMessages{} is a special MessageOp that MessageReducer recognizes and uses to clear the message list
- Must set RemoveAllMessages{} before setting StateKeyUserInput
- WithSubgraphIsolatedMessages(true) only works for AddSubgraphNode (agent nodes), not for AddLLMNode; use RemoveAllMessages to isolate messages between LLM nodes
- For agent nodes, WithSubgraphIsolatedMessages(true) disables seeding session history into the sub‑agent's request. This also hides tool call results, so it breaks multi‑turn tool calling. Use it only for single‑turn sub‑agents; otherwise, isolate via sub‑agent message filtering (see "Agent nodes: isolation vs multi‑turn tool calls").
3. GraphAgent Configuration Options
GraphAgent supports various configuration options:
Model/tool callbacks are configured per node, e.g. AddLLMNode(..., graph.WithModelCallbacks(...)) or AddToolsNode(..., graph.WithToolCallbacks(...)).
Callback Precedence: When both node-level and state-level callbacks are present:
- Node-configured callbacks (via WithModelCallbacks / WithToolCallbacks) take precedence.
- State-level callbacks (via StateKeyModelCallbacks / StateKeyToolCallbacks) are used as a fallback. This allows graph-level configuration to override runtime state when needed.
Session summary notes:
- WithAddSessionSummary(true) takes effect only when Session.Summaries contains a summary for the invocation's filter key. Summaries are typically produced by SessionService + SessionSummarizer, and Runner will auto‑enqueue summarization after persisting events.
- GraphAgent reads summaries only; it does not generate them. If you bypass Runner, call sessionService.CreateSessionSummary or EnqueueSummaryJob after appending events.
- Summary injection works only when TimelineFilterMode is TimelineFilterAll.
Summary Format Customization
By default, session summaries are formatted with context tags and a note about preferring current conversation information. You can customize the summary format using WithSummaryFormatter to better match your specific use cases or model requirements.
Default Format:
Custom Format Example:
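A hedged sketch (the formatter signature is an assumption based on the description below):
```go
ga, err := graphagent.New("assistant", g,
	graphagent.WithAddSessionSummary(true),
	graphagent.WithSummaryFormatter(func(summary string) string {
		// Simplified format: fewer tokens than the default context tags.
		return "[Conversation so far]\n" + summary
	}),
)
```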
Use Cases:
- Simplified Format: Reduce token usage by using concise headings and minimal context notes
- Language Localization: Translate context notes to target language (e.g., Chinese, Japanese)
- Role-Specific Formatting: Different formats for different agent roles
- Model Optimization: Tailor format for specific model preferences
Important Notes:
- The formatter function receives raw summary text from the session and returns the formatted string
- Custom formatters should ensure that the summary is clearly distinguishable from other messages
- The default format is designed to be compatible with most models and use cases
- When WithAddSessionSummary(false) is used, the formatter is never invoked
Concurrency considerations
When using Graph + GraphAgent in a concurrent environment (for example, serving many requests from a single long‑lived process), keep the following in mind:
- CheckpointSaver and Cache implementations must be concurrency‑safe
  The CheckpointSaver and Cache interfaces are intentionally storage‑agnostic. A single Executor/GraphAgent instance may call their methods from multiple goroutines when several invocations run in parallel. If you provide your own implementations, ensure:
  - All exported methods (Get/GetTuple/List/Put/PutWrites/PutFull/DeleteLineage for CheckpointSaver, and Get/Set/Clear for Cache) are safe for concurrent use.
  - Internal maps, connection pools, or in‑memory buffers are properly synchronized.
- NodeFunc, tools, and callbacks should treat state as per‑invocation, not global
  Each node receives an isolated copy of graph state for the current task. This copy is safe to mutate inside the node, but it is not safe to:
  - Store references to that state (or its internal maps/slices) in global variables and modify them from other goroutines later.
  - Access StateKeyExecContext (*graph.ExecutionContext) and bypass its internal locks when reading/writing execCtx.State or execCtx.pendingTasks.
  If you need shared mutable state across nodes or invocations, protect it with your own synchronization (for example, sync.Mutex or sync.RWMutex) or use external services (such as databases or caches).
- Do not share a single *agent.Invocation across goroutines
  The framework expects each Run call (GraphAgent, Runner, or other Agent types) to operate on its own *agent.Invocation. Reusing the same *agent.Invocation instance in multiple goroutines and calling Run concurrently can cause data races on fields like Branch, RunOptions, or callback state. Prefer:
  - Creating a fresh *agent.Invocation per request, or
  - Cloning from a parent invocation using invocation.Clone(...) when you need linkage.
- Parallel tools require tool implementations to be safe for concurrent use
  Tools in a Tools node can be executed in parallel when WithEnableParallelTools(true) is used on that node:
  - The framework guarantees that the tools map is only read during execution.
  - It also guarantees that the shared graph state passed to tools is only read; updates are written back by nodes, not by tools.
  However, each tool.Tool implementation and its tool.Callbacks may be invoked from multiple goroutines at the same time. Make sure:
  - Tool implementations do not mutate shared global state without proper locking.
  - Any internal caches, HTTP clients, or client pools inside tools are safe for concurrent use.
These constraints are especially important in long‑running services where a single Graph/Executor/GraphAgent instance is reused for many invocations.
Once sub-agents are registered you can delegate within the graph via agent nodes:
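For example (a hedged one-liner):
```go
sg.AddAgentNode("assistant") // delegates this step to the registered sub-agent named "assistant"
```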
The agent node uses its ID for the lookup, so keep AddAgentNode("assistant") aligned with subAgent.Info().Name == "assistant".
Agent nodes: passing data to the next agent
An agent node does not automatically "pipe" its output into the next agent node. Edges only control execution order; data flows through graph state.
By default, an agent node builds the child invocation message from
state[graph.StateKeyUserInput]. But user_input is one‑shot:
LLM/Agent nodes clear it after a successful run to avoid reusing the same input.
This is why a chain like A (agent) → B (agent) often looks like "A produced
output, but B got an empty input".
To make downstream agent nodes consume upstream outputs, explicitly map a state
field into user_input before the downstream agent runs:
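For example, using the built-in option (a hedged sketch):
```go
sg.AddAgentNode("nodeA")
sg.AddAgentNode("nodeB", graph.WithSubgraphInputFromLastResponse())
sg.AddEdge("nodeA", "nodeB") // B consumes A's last_response as its user_input
```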
Notes:
- WithSubgraphInputFromLastResponse() maps the current last_response into this agent node's user_input for this run, so nodeB consumes nodeA's output as its input.
- If you need to pass a specific node's output (not the most recent), use a pre‑node callback and read from node_responses[targetNodeID], then write it into user_input.
Agent nodes: combining original input with upstream output
Sometimes the downstream agent needs both:
- The upstream agent result (often
state[graph.StateKeyLastResponse]), and - The original user request for this run.
Because user_input is one‑shot and is cleared after LLM/Agent nodes, you
should persist the original user input under your own state key, then
compose the downstream user_input explicitly.
The simplest pattern is to add two function nodes:
- Capture the initial user_input once.
- Build the next user_input from original_user_input + last_response.
Important:
- Treat the graph.State passed into function nodes as read‑only.
- Return a delta state update (a small graph.State) instead of returning or mutating the full state. Returning the full state can accidentally overwrite internal keys (execution context, callbacks, session) and break the workflow.
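A hedged sketch of the two function nodes, assuming a custom key original_user_input declared in the schema:
```go
func captureInput(ctx context.Context, state graph.State) (any, error) {
	in, _ := state[graph.StateKeyUserInput].(string)
	return graph.State{"original_user_input": in}, nil // delta only
}

func composeInput(ctx context.Context, state graph.State) (any, error) {
	orig, _ := state["original_user_input"].(string)
	last, _ := state[graph.StateKeyLastResponse].(string)
	return graph.State{
		graph.StateKeyUserInput: "Original request: " + orig + "\nUpstream result: " + last,
	}, nil
}
```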
Agent nodes: state mappers (advanced)
Agent nodes support two mappers to control what data crosses the parent/child boundary:
- WithSubgraphInputMapper: project parent state → child runtime state (Invocation.RunOptions.RuntimeState).
- WithSubgraphOutputMapper: project child results → parent state updates.
Use cases:
- Let the child read structured data from state (without stuffing it into prompts): pass only selected keys to the child via WithSubgraphInputMapper. Runnable example: examples/graph/subagent_runtime_state.
- Copy structured outputs back to the parent graph: when the child is a GraphAgent, SubgraphResult.FinalState contains the child's final state snapshot and can be mapped into parent keys. Runnable example: examples/graph/agent_state_handoff.
- Store the child LLM's final text under your own keys: SubgraphResult always includes LastResponse, so output mappers work for both GraphAgent and non-graph agents.
Agent nodes: isolation vs multi‑turn tool calls
Tool calling is usually multi‑turn within a single run: the model returns a
tool call, the framework executes the tool, then the next model request must
include the tool result (a role=tool message) so the model can continue.
WithSubgraphIsolatedMessages(true) is a strong isolation switch: it
prevents the sub‑agent from reading any session history when building the next
model request (internally it sets include_contents="none" for the sub‑agent).
This makes the sub‑agent “black‑box” (it only sees the current user_input),
but it also means the sub‑agent will not see tool results, so it cannot do
multi‑turn tool calling.
If a sub‑agent needs tools and needs to continue after tools return:
- Do not enable WithSubgraphIsolatedMessages(true) on that agent node.
- Instead, keep session seeding enabled and isolate the sub‑agent by filtering its message history to only its own invocation. For LLMAgent, use llmagent.WithMessageFilterMode(llmagent.IsolatedInvocation).
Symptoms of the misconfiguration:
- The second model request looks the same as the first one (tool results never appear in the prompt).
- The agent repeats the first tool call or loops because it never “sees” the tool output.
Agent nodes: checkpoints and nested interrupts
If a sub-agent is a GraphAgent (a graph-based agent) and checkpointing is
enabled (via a CheckpointSaver), the child graph also needs its own checkpoint
namespace. Otherwise, the child graph can accidentally resume from a checkpoint
that belongs to the parent graph.
Default behavior for agent nodes that invoke a GraphAgent:
- The child GraphAgent runs under the child checkpoint namespace (the sub-agent name), even though the runtime state is cloned from the parent.
- The parent checkpoint identifier (ID) is not forwarded to the child unless you explicitly set it via a subgraph input mapper.
Nested Human-in-the-Loop (HITL) interrupt/resume:
- If the child GraphAgent calls graph.Interrupt, the parent graph also interrupts and checkpoints.
- Resume from the parent checkpoint as usual; the agent node resumes the child checkpoint automatically.
Runnable example: examples/graph/nested_interrupt.
4. Conditional Routing
4.1 Named Ends (Per‑node Ends)
When a node produces business outcomes (e.g., approve/reject/manual_review) and you want to route by those semantic labels, declare node‑local Named Ends (Ends).
Why this helps:
- Central, declarative mapping from labels to concrete targets at the node site.
- Compile‑time validation: Compile() verifies every end target exists (or is the special graph.End).
- Unified routing: reused by both Command.GoTo and conditional edges.
- Decoupling: nodes express outcomes in business terms; the mapping ties outcomes to graph structure.
API:
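A hedged sketch of declaring Ends on a node (option names per the compile-time checks below):
```go
sg.AddNode("review", reviewNode, graph.WithEndsMap(map[string]string{
	"approve":       "fulfill",
	"reject":        "notify_reject",
	"manual_review": graph.End, // labels may also map to the special End constant
}))
```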
Command‑style routing (Command.GoTo):
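The node can then return a Command whose GoTo is a business label (the Update field name is an assumption):
```go
func reviewNode(ctx context.Context, state graph.State) (any, error) {
	return &graph.Command{
		Update: graph.State{"decision": "approve"},
		GoTo:   "approve", // resolved through the node's Ends mapping
	}, nil
}
```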
Conditional edges can reuse Ends: when AddConditionalEdges(from, condition, pathMap) receives a nil pathMap or no match is found, the executor tries the node's Ends; if still no match, the return string is treated as a concrete node ID.
Resolution precedence:
1. Explicit mapping in the conditional edge's pathMap.
2. The node's Ends mapping (label → concrete target).
3. Treat the returned string as a concrete node ID.
Compile‑time checks:
- WithEndsMap / WithEnds targets are validated in Compile().
- Targets must exist in the graph or be the special constant graph.End.
Notes:
- Use the constant graph.End to terminate; do not use the string "END".
- With Command.GoTo, you don't need to add a static AddEdge(from, to) for the target; ensure the target exists and call SetFinishPoint(target) if it should end the graph.
Runnable example: examples/graph/multiends.
4.2 Multi‑conditional Fan‑out
Sometimes a single decision needs to spawn multiple branches in parallel for independent processing (e.g., route to both summarization and tagging).
API:
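A hedged sketch; the registration method name (AddMultiConditionalEdges) and the condition signature are assumptions based on this section:
```go
sg.AddMultiConditionalEdges("router",
	func(ctx context.Context, state graph.State) ([]string, error) {
		// Return several branch keys; duplicates are de-duplicated per step.
		return []string{"summarize", "tag"}, nil
	},
	nil, // resolve via the node's Ends, or treat keys as node IDs
)
```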
Notes:
- Like other routing, targets become runnable in the next BSP superstep (after the router node finishes). This can affect latency; see “BSP superstep barrier” below.
- Results are de‑duplicated before triggering; repeated keys do not trigger a target more than once in the same step.
- Resolution precedence for each branch key mirrors single‑conditional routing: 1) explicit pathMap; 2) the node's Ends; 3) treat the key as a node ID.
- Visualization: when pathMap is omitted, DOT falls back to the node's Ends mapping to render dashed conditional edges.
5. Tool Node Integration
Enable parallel tool execution for the Tools node (aligns with LLMAgent's option):
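```go
sg.AddToolsNode("tools", toolsMap,
	graph.WithEnableParallelTools(true), // tools in one batch run concurrently
)
```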
Tool-call pairing and second entry into LLM:
- Scan messages backward from the tail to find the most recent assistant(tool_calls) message; stop at user to ensure correct pairing.
- When returning from tools to the LLM node, since user_input is cleared, the LLM follows the "Messages only" branch and continues based on the tool response in history.
Placeholder Variables in LLM Instructions
LLM nodes support placeholder injection in their instruction string (same rules as LLMAgent). Both native {key} and Mustache {{key}} syntaxes are accepted (Mustache is normalized to the native form automatically):
- {key} / {{key}} → replaced with the string value corresponding to the key key in session state (write via sess.SetState("key", ...) or SessionService)
- {key?} / {{key?}} → optional; missing values become empty
- {user:subkey}, {app:subkey}, {temp:subkey} (and their Mustache forms) → access user/app/temp scopes (session services merge app/user state into session with these prefixes)
- {invocation:subkey} / {{invocation:subkey}} → replaced with invocation.state["subkey"] (set via invocation.SetState("subkey", v))
Notes:
- GraphAgent writes the current *session.Session into graph state under StateKeySession; the LLM node reads values from there
- {invocation:*} values are read from the current *agent.Invocation for this run
- Unprefixed keys (e.g., research_topics) must be present directly in session.State
Example:
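A hedged sketch (placeholder syntax per the list above):
```go
sg.AddLLMNode("answer", m,
	"You research {research_topics?} for user {user:name?}. Context: {temp:docs?}",
	nil)
```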
See the runnable example: examples/graph/placeholder.
Injecting retrieval output and user input
- Upstream nodes can place temporary values into the session's temp: namespace so the LLM instruction can read them with placeholders.
- Pattern (sketched below):
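A hedged sketch of the pattern (the SetState value type is an assumption; see the session package):
```go
func stashDocs(ctx context.Context, state graph.State) (any, error) {
	sess, _ := state[graph.StateKeySession].(*session.Session)
	if sess != nil {
		// Read back in the LLM instruction as {temp:docs}.
		sess.SetState("temp:docs", []byte("retrieved context..."))
	}
	return graph.State{}, nil
}
```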
Example: examples/graph/retrieval_placeholder.
Best practices for placeholders and session state
- Session-scoped vs persistent: write temporary values used to build prompts to temp:* on session state (often overwritten each turn via sess.SetState). Persistent configuration should go through SessionService with user:* / app:*.
- Why SetState is recommended: LLM nodes expand placeholders from the session object present in graph state; using sess.SetState avoids unsafe concurrent map access.
- Service guardrails: the in‑memory service intentionally disallows writing temp:* (and app:* via user updater); see session/inmemory/service.go.
- Concurrency: when multiple branches run in parallel, avoid multiple nodes mutating the same session.State keys. Prefer composing in a single node before the LLM, or store intermediate values in graph state then write once to temp:*.
- Observability: if you want parts of the prompt to appear in completion events, also store a compact summary in graph state (e.g., under metadata). The final event serializes non‑internal final state; see graph/events.go.
6. Node Retry & Backoff
Configure per‑node retry with exponential backoff and optional jitter. Failed attempts do not produce writes; only a successful attempt applies its state delta and routing.
- Per‑node policy via WithRetryPolicy (sketched below).
- Default policy via the Executor (applies when a node has none).
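A hedged sketch; the RetryPolicy field names and the executor option are assumptions based on the described behavior (exponential backoff plus jitter):
```go
// Per-node policy.
sg.AddNode("flaky", flakyNode, graph.WithRetryPolicy(&graph.RetryPolicy{
	MaxAttempts: 3,                      // field names are illustrative
	BaseDelay:   500 * time.Millisecond, // exponential backoff base
	MaxDelay:    5 * time.Second,
	Jitter:      true,
}))

// Executor-level default, used when a node declares no policy.
exec, err := graph.NewExecutor(g,
	graph.WithDefaultRetryPolicy(&graph.RetryPolicy{MaxAttempts: 2}))
```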
Notes
- Interrupts are never retried.
- Backoff delay is clamped by the current step deadline when set (WithStepTimeout).
- Events carry retry metadata (see graph.MetadataKeyNode above) so UIs/CLIs can display progress.
Example: examples/graph/retry shows an unstable node that retries before a final LLM answer.
7. Runner Configuration
Runner provides session management and execution environment:
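A hedged sketch:
```go
r := runner.NewRunner("graph-app", graphAgent,
	runner.WithSessionService(inmemory.NewSessionService()), // or session/redis in production
)
```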
8. Message State Schema
For conversational applications, you can use predefined message state schema:
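```go
schema := graph.MessagesStateSchema() // messages, user_input, last_response, node_responses, ...
sg := graph.NewStateGraph(schema)
```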
9. State Key Usage Scenarios
User-defined State Keys: Used to store business logic data.
Built-in State Keys: Used for system integration.
Advanced Features
1. Interrupt and Resume (Human-in-the-Loop)
The Graph package supports human-in-the-loop (HITL) workflows through interrupt and resume functionality. This enables workflows to pause execution, wait for human input or approval, and then resume from the exact point where they were interrupted.
Basic Usage
Turn the diagram into a runnable workflow:
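The original listing is omitted here; a hedged sketch of the interrupt-centric part (graph.Interrupt per the comparison in the next section; its return type is an assumption):
```go
sg := graph.NewStateGraph(graph.MessagesStateSchema())
sg.AddNode("plan", planNode)
sg.AddNode("approval", func(ctx context.Context, state graph.State) (any, error) {
	// Pauses the run, checkpoints, and waits for a resume value under key "approval".
	v, err := graph.Interrupt(ctx, state, "approval", "Approve the plan? (yes/no)")
	if err != nil {
		return nil, err // first pass: surfaces the interrupt to the caller
	}
	decision, _ := v.(string)
	return graph.State{"approved": decision == "yes"}, nil
})
sg.AddEdge("plan", "approval")
sg.SetEntryPoint("plan")
sg.SetFinishPoint("approval")
```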
The example shows how to declare nodes, connect edges, and run. Next, we'll cover execution with GraphAgent + Runner, then core concepts and common practices.
2. Static Interrupts (Debug Breakpoints)
Static interrupts are "breakpoints" that pause the graph before or
after specific nodes execute. They are mainly used for debugging, and they
do not require you to call graph.Interrupt(...) inside node logic.
Key differences from HITL interrupts:
- HITL interrupt: a node calls graph.Interrupt(ctx, state, key, prompt). Resuming requires providing a resume input for that key.
- Static interrupt: you attach interrupt options when declaring nodes. Resuming only requires the checkpoint coordinates (lineage_id + checkpoint_id).
Enable static interrupts:
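The exact option names vary by version; a hedged sketch using executor-level options (assumed names):
```go
exec, err := graph.NewExecutor(g,
	graph.WithCheckpointSaver(saver),        // static interrupts need checkpoints to resume
	graph.WithInterruptBeforeNodes("risky"), // assumed option: pause before "risky" runs
	graph.WithInterruptAfterNodes("audit"),  // assumed option: pause after "audit" completes
)
```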
When a static interrupt is triggered, the executor raises an
*graph.InterruptError with:
- Key prefixed by graph.StaticInterruptKeyPrefixBefore or graph.StaticInterruptKeyPrefixAfter
- Value set to graph.StaticInterruptPayload (phase, nodes, activeNodes)
Resuming:
- Run again with the same lineage_id and the checkpoint_id returned by the interrupt event.
- No resume input is required because no node called graph.Interrupt(...).
See examples/graph/static_interrupt for an end-to-end runnable demo.
Execution
- Wrap the compiled graph with graphagent.New (as a generic agent.Agent) and hand it to runner.Runner to manage sessions and streaming events.
Minimal GraphAgent + Runner:
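```go
// Hedged sketch; see the full example in "Creating GraphAgent and Runner".
ga, _ := graphagent.New("workflow", g)
r := runner.NewRunner("app", ga,
	runner.WithSessionService(inmemory.NewSessionService()))
events, _ := r.Run(ctx, "user-1", "session-1", model.NewUserMessage("hi"))
for evt := range events {
	_ = evt // consume streaming events; last_response arrives with the final event
}
```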
Session backends:
- In-memory: session/inmemory (used by examples)
- Redis: session/redis (more common in production)
GraphAgent Options
Core Concepts
State Management
GraphAgent uses a Schema + Reducer model to manage state. You first define the state shape and merge rules; later nodes have clear expectations about the origin and lifecycle of keys they read/write.
Built‑in Schema
Custom Schema
Reducers ensure fields are merged safely per predefined rules, which is critical under concurrent execution.
Tip: define constants for business keys to avoid scattered magic strings.
Node Types
GraphAgent provides four built‑in node types:
Function Node
The most basic node, for custom logic:
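```go
// Hedged sketch: a function node receives the state and returns a delta.
sg.AddNode("transform", func(ctx context.Context, state graph.State) (any, error) {
	n, _ := state["n"].(int)
	return graph.State{"n": n + 1}, nil
})
```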
LLM Node
Integrates an LLM and auto‑manages conversation history:
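```go
// Hedged sketch: the instruction is the per-round system prompt; tools are optional.
sg.AddLLMNode("chat", openai.New("gpt-4o-mini"),
	"You are a helpful assistant.",
	map[string]tool.Tool{"get_weather": weatherTool},
)
```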
Node Cache
Enable caching for pure function-like nodes to avoid repeated computation.
- Graph-level settings:
  - WithCache(cache Cache) sets the cache backend (an in-memory implementation is provided for testing)
  - WithCachePolicy(policy *CachePolicy) sets the default cache policy (key function + time to live, TTL)
- Node-level override: WithNodeCachePolicy(policy *CachePolicy)
- Clear by nodes: ClearCache(nodes ...string)
References:
- Graph accessors and setters: graph/graph.go
- Defaults and in-memory backend:
- Interface/policy + canonical JSON + SHA‑256: graph/cache.go
- In-memory cache with read-write lock and deep copy: graph/cache.go
- Executor:
- Try Get before executing a node; on hit, skip the node function and only run callbacks + writes: graph/executor.go
- Persist Set after successful execution: graph/executor.go
- Attach the _cache_hit flag on node.complete events: graph/executor.go
Minimal usage:
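A hedged sketch; where the cache options attach (builder vs. compiled graph) and the in-memory constructor name may differ by version:
```go
sg.WithCache(graph.NewInMemoryCache()) // in-memory backend, intended for testing
sg.WithCachePolicy(&graph.CachePolicy{TTL: 5 * time.Minute})
sg.AddNode("expensive", expensiveNode,
	graph.WithNodeCachePolicy(&graph.CachePolicy{TTL: time.Minute}),
	graph.WithCacheKeyFields("n", "user_id"),
)
```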
Advanced usage:
- Field-based keys (recommended): declare WithCacheKeyFields("n", "user_id") on a node; internally this maps the sanitized input to {n, user_id} before default canonicalization and hashing.
- Custom selector (for complex projections): WithCacheKeySelector(func(m map[string]any) any { return map[string]any{"n": m["n"], "uid": m["uid"]} })
- Versioned namespace (avoid stale cache across versions): WithGraphVersion("v2025.03") expands the namespace to __writes__:<version>:<node>, reducing stale cache collisions across code changes.
- Per-node TTL (Time To Live): override the default policy with WithNodeCachePolicy.
- Clear cache (per node): ClearCache("nodeID").
- Read the cache-hit marker (_cache_hit) from node.complete events.
Notes:
- Prefer caching only pure functions (no side effects)
- TTL=0 means no expiration; consider a persistent backend (Redis/SQLite) in production
- Key function sanitizes input to avoid volatile/non-serializable fields being part of the key: graph/cache_key.go
- Call ClearCache("nodeID") after code changes, or include a function identifier/version in the key
Runner + GraphAgent usage example:
- Interactive + Runner + GraphAgent: examples/graph/nodecache/main.go
Tools Node
Executes tool calls in sequence:
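For example (a hedged sketch; the tool-loop conditional helper name is an assumption):
```go
toolsMap := map[string]tool.Tool{"get_weather": weatherTool}
sg.AddLLMNode("ask", m, "Use tools when needed.", toolsMap)
sg.AddToolsNode("tools", toolsMap)
sg.AddToolsConditionalEdges("ask", "tools", "fallback") // tool_calls → tools, else fallback
sg.AddEdge("tools", "ask")                              // loop back with tool results
```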
Reading Tool Results into State
Tools nodes already expose their outputs via:
- graph.StateKeyLastToolResponse: JSON string of the last tool output in this node run
- graph.StateKeyNodeResponses[<tools_node_id>]: JSON array string of all tool outputs from this node run
After a tools node, add a function node to collect tool outputs from graph.StateKeyMessages and write a structured result into state:
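A hedged sketch of such a collector node, assuming a tools node with ID "tools":
```go
func collectTools(ctx context.Context, state graph.State) (any, error) {
	responses, _ := state[graph.StateKeyNodeResponses].(map[string]any)
	raw, _ := responses["tools"].(string) // JSON array: [{tool_id, tool_name, output}, ...]
	var outputs []map[string]any
	if err := json.Unmarshal([]byte(raw), &outputs); err != nil {
		return nil, err
	}
	return graph.State{"tool_outputs": outputs}, nil
}
```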
Reference example: examples/graph/io_conventions_tools.
Edges and Routing
Edges define control flow between nodes:
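For example (a hedged sketch; the condition signature and needsTools helper are assumptions):
```go
sg.AddEdge("prepare", "ask") // static: "ask" always follows "prepare"
sg.AddConditionalEdges("ask",
	func(ctx context.Context, state graph.State) (string, error) {
		if needsTools(state) {
			return "tools", nil
		}
		return "done", nil
	},
	map[string]string{"tools": "tools", "done": graph.End},
)
```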
Join edges (wait-all fan-in)
When multiple upstream branches run in parallel, a normal AddEdge(from, to)
triggers to whenever any upstream node updates its edge channel. That can
make to execute multiple times.
If you need classic “wait for all branches, then run once” fan-in semantics,
use AddJoinEdge:
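```go
sg.AddJoinEdge([]string{"branch_a", "branch_b", "branch_c"}, "aggregate")
// "aggregate" runs once, only after all three branches complete.
```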
AddJoinEdge creates an internal barrier channel and triggers nodeJoin only
after every from node has reported completion. The barrier resets after it
triggers, so the same join can be reached again in loops.
Reference example: examples/graph/join_edge.
BSP superstep barrier (why downstream waits)
Graph executes workflows in BSP supersteps (Planning → Execution → Update). In each superstep:
- Planning computes the runnable frontier from channels updated in the previous superstep.
- Execution runs all runnable nodes concurrently (up to WithMaxConcurrency).
- Update merges state updates and applies routing signals (channel writes).
The next superstep starts only after all nodes in the current superstep finish. Because routing signals are applied in Update, a node triggered by an upstream completion always becomes runnable in the next superstep.
This can look like “depth‑K nodes wait for depth‑(K‑1) nodes”, even across independent branches.
Example graph:
flowchart LR
S[split] --> B[branch_b]
B --> C[branch_b_next]
S --> E[branch_e]
S --> F[branch_f]
Runtime behavior:
- Superstep 0: split
- Superstep 1: branch_b, branch_e, branch_f run in parallel
- Superstep 2: branch_b_next runs (even though it only depends on branch_b)
Practical tips:
- Reduce supersteps: if X → Y is always sequential, consider collapsing it into one node so you don't pay an extra superstep.
- Avoid extra "prepare" nodes when you only need to enrich a single node's input: move the preparation into the node, or use graph.WithPreNodeCallback on that node.
- Use stable per-branch outputs in parallel flows: avoid reading a specific branch from last_response; use node_responses[nodeID] (or dedicated state keys) so fan-in logic does not depend on scheduling.
- Choose the right fan-in:
  - Use AddEdge(from, to) when to should react to incremental updates (it may run multiple times).
  - Use AddJoinEdge([...], to) when you need "wait for all, then run once".
Tip: setting entry and finish points implicitly connects to virtual Start/End nodes:
SetEntryPoint("first")is equivalent to Start → first.SetFinishPoint("last")is equivalent to last → End. There's no need to add these two edges explicitly.
Constants: graph.Start == "__start__", graph.End == "__end__".
Message Visibility Options
The Agent can dynamically manage the visibility of messages generated by other Agents and historical session messages based on different scenarios. This is configurable through relevant options. When interacting with the model, only the visible content is passed as input.
TIPS:
- Messages from different sessionIDs are never visible to each other under any circumstances. The following control strategies only apply to messages sharing the same sessionID.
- Invocation.Message is always visible regardless of the configuration.
- The related configuration only controls the initial value of State[graph.StateKeyMessages].
- The messages generated by the Agent node have a filterKey corresponding to the subAgent name. As a result, when using IsolatedRequest or IsolatedInvocation for filtering, these messages are not visible to the current GraphAgent.
- When the option is not configured, the default value is FullContext.
Config:
- graphagent.WithMessageFilterMode(MessageFilterMode):
- FullContext: Includes historical messages and messages generated in the current request, filtered by prefix matching with the filterKey.
- RequestContext: Only includes messages generated in the current request, filtered by prefix matching with the filterKey.
- IsolatedRequest: Only includes messages generated in the current request, filtered by exact matching with the filterKey.
- IsolatedInvocation: Only includes messages generated in the current Invocation context, filtered by exact matching with the filterKey.
Recommended Usage Examples (These examples are simplified configurations based on advanced usage):
Example 1: Message Visibility Control for graphAgent
Example 2: Message Visibility Control for LLM Agent node
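Hedged sketches of both (the graphagent-side mode constants mirror the list above; constructor signatures are abbreviated):
```go
// Example 1: visibility for the GraphAgent itself.
ga, _ := graphagent.New("orchestrator", g,
	graphagent.WithMessageFilterMode(graphagent.IsolatedRequest))

// Example 2: visibility for an LLM agent used as an agent node.
sub := llmagent.New("assistant",
	llmagent.WithMessageFilterMode(llmagent.IsolatedInvocation))
```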
Advanced Usage Examples:
You can independently control the visibility of historical messages and messages generated by other Agents for the current agent using WithMessageTimelineFilterMode and WithMessageBranchFilterMode.
When the current agent interacts with the model, only messages satisfying both conditions are input to the model. (invocation.Message is always visible in any scenario.)
- WithMessageTimelineFilterMode: Controls visibility from a temporal dimension
  - TimelineFilterAll: Includes historical messages and messages generated in the current request.
  - TimelineFilterCurrentRequest: Only includes messages generated in the current request (one runner.Run counts as one request).
  - TimelineFilterCurrentInvocation: Only includes messages generated in the current invocation context.
- WithMessageBranchFilterMode: Controls visibility from a branch dimension (used to manage visibility of messages generated by other agents).
  - BranchFilterModePrefix: Uses prefix matching between Event.FilterKey and Invocation.eventFilterKey.
  - BranchFilterModeAll: Includes messages from all agents.
  - BranchFilterModeExact: Only includes messages generated by the current agent.
Command Mode (Dynamic Routing / Fan‑out)
Nodes can return graph.State, or *graph.Command / []*graph.Command to update state and direct the next hop:
When using command‑based routing, you don't need static edges to GoTo targets; just ensure the target nodes exist and call SetFinishPoint where appropriate.
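A hedged sketch of both forms (the Update field name is an assumption):
```go
// Single hop: update state and route.
func route(ctx context.Context, state graph.State) (any, error) {
	return &graph.Command{Update: graph.State{"route": "a"}, GoTo: "worker_a"}, nil
}

// Fan-out: several commands trigger parallel branches in the next superstep.
func fanout(ctx context.Context, state graph.State) (any, error) {
	return []*graph.Command{{GoTo: "worker_a"}, {GoTo: "worker_b"}}, nil
}
```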
Architecture
Overall Architecture
GraphAgent's architecture manages complexity via clear layering. Each layer has a well‑defined responsibility and communicates through standard interfaces.
flowchart TB
subgraph "Runner Layer"
R[Runner]:::runnerClass
S[Session Service]:::sessionClass
end
subgraph "GraphAgent"
GA[GraphAgent Wrapper]:::agentClass
CB[Callbacks]:::callbackClass
end
subgraph "Graph Engine"
SG[StateGraph Builder]:::builderClass
G[Graph]:::graphClass
E[Executor]:::executorClass
end
subgraph "Execution Components"
P[Planning]:::phaseClass
EX[Execution]:::phaseClass
U[Update]:::phaseClass
end
subgraph "Storage"
CP[Checkpoint]:::storageClass
ST[State Store]:::storageClass
end
R --> GA
GA --> G
G --> E
E --> P
E --> EX
E --> U
E --> CP
classDef runnerClass fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef sessionClass fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef agentClass fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef callbackClass fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef builderClass fill:#fff8e1,stroke:#f57c00,stroke-width:2px
classDef graphClass fill:#f1f8e9,stroke:#689f38,stroke-width:2px
classDef executorClass fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phaseClass fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef storageClass fill:#efebe9,stroke:#5d4037,stroke-width:2px
Core Modules
Overview of core components:
graph/state_graph.go — StateGraph builder
Provides a fluent, declarative Go API to build graphs via method chaining (AddNode → AddEdge → Compile) covering nodes, edges, and conditional routing.
graph/graph.go — Compiled runtime
Implements channel‑based, event‑triggered execution. Node results merge into State; channels are used to drive routing and carry sentinel values (not business data).
graph/executor.go — BSP executor
Heart of the system, inspired by Google's Pregel. Implements BSP (Bulk Synchronous Parallel) supersteps: Planning → Execution → Update.
graph/checkpoint/* — Checkpoints and recovery
Optional checkpoint persistence (e.g., sqlite). Atomically saves state and pending writes; supports lineage/checkpoint‑based recovery.
agent/graphagent/graph_agent.go — Bridge between Graph and Agent
Adapts a compiled Graph into a generic Agent, reusing sessions, callbacks, and streaming.
Execution Model
GraphAgent adapts Pregel's BSP (Bulk Synchronous Parallel) to a single‑process runtime and adds checkpoints, HITL interrupts/resumes, and time travel:
sequenceDiagram
autonumber
participant R as Runner
participant GA as GraphAgent
participant EX as Executor
participant CK as Checkpoint Saver
participant DB as Storage
participant H as Human
R->>GA: Run(invocation)
GA->>EX: Execute(graph, state, options)
GA-->>R: Stream node/tool/model events
loop Each superstep (BSP)
EX->>EX: Planning — compute frontier
par Parallel node execution
EX->>EX: Run node i (shallow state copy)
EX-->>GA: node-start event (author=nodeID)
and
EX->>EX: Run node j (shallow state copy)
EX-->>GA: node-start event
end
alt Node triggers Interrupt(key,prompt)
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen,reason=interrupt)
CK->>DB: atomic commit
EX-->>GA: interrupt event(checkpoint_id,prompt)
GA-->>R: propagate + pause
R->>H: ask for input/approval
H-->>R: provide decision/value
R->>GA: Run(resume) runtime_state{
R->>GA: checkpoint_id,resume_map}
GA->>EX: ResumeFromCheckpoint(checkpoint_id,resume_map)
EX->>CK: Load checkpoint
CK->>EX: state/frontier/pending_writes/versions_seen
EX->>EX: rebuild frontier and apply resume values
else Normal
EX-->>GA: node-complete events (incl. tool/model)
EX->>EX: Update — merge via reducers
EX->>CK: Save checkpoint(state,frontier,
EX->>CK: pending_writes,versions_seen)
CK->>DB: atomic commit
end
end
Note over EX,CK: versions_seen prevents re-execution
Note over EX,CK: pending_writes rebuilds channels
Note over EX,CK: parent_id forms lineage for time travel
opt Time travel (rewind/branch)
R->>GA: Run(runtime_state{checkpoint_id})
GA->>EX: ResumeFromCheckpoint(checkpoint_id)
EX->>CK: Load checkpoint + lineage
CK->>EX: Restore state; may create new lineage_id
end
EX-->>GA: done event (last_response)
GA-->>R: final output
flowchart TB
%% Execution panorama (compact wiring)
subgraph Client
R[Runner]:::runner --> GA[GraphAgent]:::agent
end
subgraph Engine[Graph Engine]
GA --> EX[Executor]:::executor
subgraph BSP["BSP Superstep"]
P[Planning]:::phase --> X[Execution]:::phase --> U[Update]:::phase
end
end
N[Nodes: LLM / Tools / Function / Agent]:::process
CK[(Checkpoint)]:::storage
H[Human]:::human
EX --> BSP
EX --> N
EX -.-> CK
GA <--> H
GA --> R
classDef runner fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef agent fill:#e3f2fd,stroke:#1976d2,stroke-width:2px
classDef executor fill:#e0f2f1,stroke:#00796b,stroke-width:2px
classDef phase fill:#ede7f6,stroke:#512da8,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
classDef storage fill:#efebe9,stroke:#6d4c41,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
Key points:
- Planning: determine runnable nodes from the channel frontier.
- Execution: each node gets a shallow state copy (maps.Copy) and runs in parallel.
- Update: reducers merge updates safely for concurrency.
This design enables per‑step observability and safe interruption/recovery.
Runtime Isolation and Event Snapshots
- The Executor is reusable and concurrency‑safe. Per‑run state lives in ExecutionContext (channel versions, pending writes, last checkpoint, etc.).
- Each event's StateDelta is a deep‑copy snapshot containing only serializable and allowed keys; internal keys (execution context, callbacks, etc.) are filtered out for external telemetry and persistence.
Executor Configuration
Defaults and Notes
- Defaults (Executor)
  - ChannelBufferSize = 256, MaxSteps = 100, MaxConcurrency = GOMAXPROCS(0), CheckpointSaveTimeout = 10s
  - Per‑step/node timeouts are available on the Executor via WithStepTimeout / WithNodeTimeout (not exposed by GraphAgent options yet); see the sketch after these notes.
- Sessions
  - Prefer the Redis session backend in production; set TTLs and cleanup
  - Runner seeds multi‑turn graph.StateKeyMessages from session events automatically
- Checkpoints
  - Use stable namespace names (e.g., svc:prod:flowX); audit and clean up by lineage via CheckpointManager
- Events/backpressure
  - Tune WithChannelBufferSize; cap task parallelism via WithMaxConcurrency
  - Filter events by author/object to reduce noise
- Naming and keys
  - Use constants for node IDs, route labels, and state keys; define reducers for non‑trivial merges
- Governance
  - Insert HITL on critical paths; prefer storing sensitive details under graph.StateKeyMetadata rather than graph.StateKeyMessages
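A hedged sketch of overriding the defaults (option names per this section; the constructor is an assumption):
```go
exec, err := graph.NewExecutor(g,
	graph.WithChannelBufferSize(512),
	graph.WithMaxSteps(200),
	graph.WithMaxConcurrency(8),
	graph.WithStepTimeout(30*time.Second),
	graph.WithNodeTimeout(10*time.Second),
)
```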
Integrating with Multi‑Agent Systems
GraphAgent is designed to be part of the tRPC‑Agent‑Go multi‑agent ecosystem, not an island. It implements the standard Agent interface and collaborates with other agent types.
GraphAgent as an Agent
GraphAgent implements the standard Agent interface:
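An abridged view of the interface (the exact method set may differ slightly by version):
```go
type Agent interface {
	Run(ctx context.Context, invocation *agent.Invocation) (<-chan *event.Event, error)
	Tools() []tool.Tool
	Info() agent.Info
	SubAgents() []agent.Agent
	FindSubAgent(name string) agent.Agent
}
```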
Advanced Orchestration
End‑to‑end business flow: entry normalization → smart routing → multiple pods (Email, Weather, Research) → parallel fan‑out/aggregation → final composition and publish.
flowchart LR
%% Layout
subgraph UE["User & Entry"]
U((User)):::human --> IN["entry<br/>normalize"]:::process
end
subgraph FAB["Graph Orchestration"]
Rtr["where_to_go<br/>router"]:::router
Compose["compose<br/>LLM"]:::llm
end
IN --> Rtr
%% Email Agent (expanded)
subgraph EC["Email Agent"]
direction LR
CE["classifier<br/>LLM"]:::llm --> WE["writer<br/>LLM"]:::llm
end
%% Weather Agent (expanded)
subgraph WA["Weather Agent"]
direction LR
LE["locate<br/>LLM"]:::llm --> WT["weather tool"]:::tool
end
%% Routing from router to pods
Rtr -- email --> CE
Rtr -- weather --> LE
Rtr -- other --> REPLY["reply<br/>LLM"]:::llm
%% Fanout Pipeline (fanout → workers → aggregate)
subgraph FP["Fanout Pipeline"]
direction LR
Fan["plan_fanout"]:::process --> W1["worker A"]:::process
Fan --> W2["worker B"]:::process
Fan --> W3["worker C"]:::process
W1 --> Agg["aggregate"]:::process
W2 --> Agg
W3 --> Agg
end
Rtr -- research --> Fan
%% Human-in-the-loop (optional)
Compose -. review .- HG["human<br/>review"]:::human
%% Compose final (minimal wiring)
Agg --> Compose
WE --> Compose
WT --> Compose
REPLY --> Compose
Compose --> END([END]):::terminal
%% Styles
classDef router fill:#fff7e0,stroke:#f5a623,stroke-width:2px
classDef llm fill:#e3f2fd,stroke:#1e88e5,stroke-width:2px
classDef tool fill:#fff3e0,stroke:#fb8c00,stroke-width:2px
classDef process fill:#f3e5f5,stroke:#8e24aa,stroke-width:2px
classDef human fill:#e8f5e9,stroke:#43a047,stroke-width:2px
classDef terminal fill:#ffebee,stroke:#e53935,stroke-width:2px
Highlights:
- `where_to_go` can be LLM‑decided or function‑driven (conditional edges).
- Fanout Pipeline uses Command GoTo at runtime, then aggregates.
- Optional human review follows aggregation to gate critical output.
- Single checkpoint display at Compose balances clarity and recoverability.
Embedding Agents in a Graph
Inside a graph, you can call existing sub‑agents as nodes. The example below shows how to create sub‑agents, declare the corresponding nodes, and inject them when constructing the GraphAgent.
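A condensed sketch; the sub‑agent construction is elided, and the convention that the node ID matches the registered sub‑agent name is an assumption based on the `AddAgentNode` pattern:

```go
// Sketch: declare an agent node, then inject the matching sub-agent when
// constructing the GraphAgent.
sg := graph.NewStateGraph(graph.MessagesStateSchema())
sg.AddAgentNode("email") // node ID matches the sub-agent's name
sg.SetEntryPoint("email")
sg.SetFinishPoint("email")
g, err := sg.Compile()
if err != nil {
    log.Fatal(err)
}

emailAgent := buildEmailAgent() // hypothetical helper returning agent.Agent
ga, err := graphagent.New("orchestrator", g,
    graphagent.WithSubAgents([]agent.Agent{emailAgent}),
)
```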
Passing only results: map last_response to downstream user_input
Scenario: A → B → C as black boxes. Downstream should only consume upstream's result text as this turn's input, without pulling full session history.
- Approach 1 (dependency‑free, universally available): add a pre‑node callback to the target Agent node that assigns the parent's `last_response` to `user_input`. Optionally isolate messages.
- Approach 2 (enhanced option, more concise): use the dedicated agent‑node input option (see the sketch after the notes).
Notes: Both approaches ensure B only sees A's result, and C only sees B's. The option is more concise when available; the callback is zero‑dependency and works everywhere.
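A sketch of both approaches; the callback signature is an assumption, and the option name for Approach 2 is taken from the Q11 troubleshooting entry later in this guide:

```go
// Approach 1: a pre-node callback copies the parent's last_response into
// user_input, so agent B only sees agent A's result text.
sg.AddAgentNode("b",
    graph.WithPreNodeCallback(func(ctx context.Context, state graph.State) (any, error) {
        if last, ok := state[graph.StateKeyLastResponse].(string); ok {
            return graph.State{graph.StateKeyUserInput: last}, nil
        }
        return nil, nil
    }),
)

// Approach 2: the dedicated option, where available.
sg.AddAgentNode("c", graph.WithSubgraphInputFromLastResponse())
```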
Hybrid Pattern Example
Embed dynamic decision‑making within a structured flow:
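A sketch using the builder API from the cheat sheet below; the condition function and the `prepareFunc`/`handle*` helpers are assumptions:

```go
// Sketch: structured flow with one LLM-driven decision point.
sg := graph.NewStateGraph(graph.MessagesStateSchema())
sg.AddNode("prepare", prepareFunc) // deterministic preprocessing
sg.AddLLMNode("decide", chatModel,
    "Classify the request as 'simple' or 'complex'. Reply with one word.", nil)
sg.AddNode("simple_path", handleSimple)
sg.AddNode("complex_path", handleComplex)
sg.AddEdge("prepare", "decide")
sg.AddConditionalEdges("decide", routeByClassification, map[string]string{
    "simple":  "simple_path",
    "complex": "complex_path",
})
sg.SetEntryPoint("prepare")
sg.SetFinishPoint("simple_path")
sg.SetFinishPoint("complex_path")
```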
Core Mechanics in Depth
State Management: Schema + Reducer
State is a central challenge in graph workflows. We designed a Schema + Reducer mechanism that provides type safety and supports high‑concurrency atomic updates.
flowchart LR
subgraph "State Schema"
MS[messages: MessageList]:::schemaClass
UI[user_input: string]:::schemaClass
LR[last_response: string]:::schemaClass
NR[node_responses: Map]:::schemaClass
end
subgraph "Reducers"
R1[MessageReducer + MessageOp]:::reducerClass
R2["MergeReducer (Map)"]:::reducerClass
R3["ReplaceReducer (String)"]:::reducerClass
end
subgraph "Node Outputs"
N1[Node 1 Output]:::nodeOutputClass
N2[Node 2 Output]:::nodeOutputClass
N3[Node 3 Output]:::nodeOutputClass
end
N1 --> R1
N2 --> R2
N3 --> R3
R1 --> MS
R2 --> NR
R3 --> LR
classDef schemaClass fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
classDef reducerClass fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef nodeOutputClass fill:#fff8e1,stroke:#f57f17,stroke-width:2px
Graph state is a map[string]any with runtime validation provided by StateSchema. The reducer mechanism ensures safe merging and avoids conflicts under concurrent updates.
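For example, a custom field can carry its own merge semantics so parallel branches never clobber each other; the field‑declaration API here is an assumption (see graph/state.go for the real shape):

```go
// Sketch: extend the messages schema with a custom field whose reducer
// appends rather than overwrites under concurrent writes.
schema := graph.MessagesStateSchema()
schema.AddField("findings", graph.StateField{
    Type:    reflect.TypeOf([]string{}),
    Reducer: graph.StringSliceReducer, // append-merge for parallel branches
    Default: func() any { return []string{} },
})
```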
Common State Keys
- User‑visible: `graph.StateKeyUserInput`, `graph.StateKeyOneShotMessages`, `graph.StateKeyOneShotMessagesByNode`, `graph.StateKeyMessages`, `graph.StateKeyLastResponse`, `graph.StateKeyLastToolResponse`, `graph.StateKeyNodeResponses`, `graph.StateKeyMetadata`
- Internal: `session`, `exec_context`, `tool_callbacks`, `model_callbacks`, `agent_callbacks`, `current_node_id`, `parent_agent`
- Command/Resume: `__command__`, `__resume_map__`
Constants live in graph/state.go and graph/keys.go. Prefer referencing constants over hard‑coding strings.
Node‑level Callbacks, Tools & Generation Parameters
Per‑node options (see graph/state_graph.go):
- Callbacks: `graph.WithPreNodeCallback` / `graph.WithPostNodeCallback` / `graph.WithNodeErrorCallback`
- LLM nodes: `graph.WithGenerationConfig`, `graph.WithModelCallbacks`
- Tooling: `graph.WithToolCallbacks`, `graph.WithToolSets` (supply ToolSets in addition to `tools []tool.Tool`), `graph.WithRefreshToolSetsOnRun` (rebuild tools from ToolSets on each run for dynamic sources such as MCP)
- Agent nodes: `graph.WithAgentNodeEventCallback`
Invocation‑level Call Options (per‑run overrides)
graph.WithGenerationConfig(...) is a compile‑time configuration: you set
it when building the graph. In real services you often want runtime control:
use the same graph, but override sampling parameters for this request.
Graph supports this with call options, which are attached to
Invocation.RunOptions and automatically propagated into nested GraphAgent
subgraphs.
Common use cases:
- One request needs higher `temperature`, another needs lower.
- The same graph has multiple LLM nodes, and you want different parameters per node.
- A parent graph calls a subgraph (Agent node), and you want overrides only inside that subgraph.
API:
- `graph.WithCallOptions(...)` attaches call options to this run.
- `graph.WithCallGenerationConfigPatch(...)` overrides `model.GenerationConfig` fields for LLM nodes in the current graph scope.
- `graph.DesignateNode(nodeID, ...)` targets a specific node in the current graph.
  - For LLM nodes: affects that node's model call.
  - For Agent nodes (subgraphs): affects the child invocation, so it becomes the default for the nested graph.
- `graph.DesignateNodeWithPath(graph.NodePath{...}, ...)` targets a node inside nested subgraphs (path segments are node IDs).
Patch format:
- Use `model.GenerationConfigPatch` and set only the fields you want to override.
- Pointer fields use `nil` for "do not override", so you typically create pointers using helpers like `model.Float64Ptr`, `model.IntPtr`, etc.
- `Stop: nil` means "do not override"; an empty slice clears stop sequences.
Example:
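A sketch of the composition; how the patch and node designation nest, and how the option is threaded into the run, are assumptions (the runnable demo below is authoritative):

```go
// Sketch: lower the temperature of the "ask" LLM node for this run only.
patch := model.GenerationConfigPatch{
    Temperature: model.Float64Ptr(0.2), // set only what you override
}
events, err := r.Run(ctx, userID, sessionID, message,
    graph.WithCallOptions(
        graph.WithCallGenerationConfigPatch(patch,
            graph.DesignateNode("ask")), // composition assumed
    ),
)
```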
See examples/graph/call_options_generation_config for a runnable demo.
ToolSets in Graphs vs Agents
graph.WithToolSets is a per‑node, compile‑time configuration. It attaches one or more tool.ToolSet instances to a specific LLM node when you build the graph:
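A sketch; the exact argument shapes are assumptions, and `mcpToolSet` is a hypothetical `tool.ToolSet` (e.g., backed by an MCP server):

```go
// Sketch: attach ToolSets to a specific LLM node at build time.
sg.AddLLMNode("ask", chatModel, "Use the provided tools when helpful.", nil,
    graph.WithToolSets([]tool.ToolSet{mcpToolSet}),
    graph.WithRefreshToolSetsOnRun(true), // re-resolve tools on each run
)
```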
Key points:
- Graph structure (including node ToolSets) is immutable after `Compile()`. Changing ToolSets requires rebuilding the graph or providing a new `GraphAgent`.
- Runtime‑level ToolSet changes should be handled at the Agent level (for example, `llmagent.AddToolSet`, `llmagent.RemoveToolSet`, `llmagent.SetToolSets`) or by swapping the underlying Agent used by a graph Agent node.
Additionally, graph.WithName/graph.WithDescription add friendly labels; graph.WithDestinations declares potential dynamic destinations (for static checks/visualization only).
LLM Input Rules: Three‑Stage Design
The LLM input pipeline looks simple but solves common context‑management problems in AI apps.
Built‑in selection logic (no extra config):
- Prefer `graph.StateKeyOneShotMessages`: fully override inputs (system/user) for this turn; cleared after execution.
- Else use `graph.StateKeyUserInput`: append this turn's user message to `graph.StateKeyMessages`, then atomically write back user+assistant; finally clear `graph.StateKeyUserInput`.
- Else use `graph.StateKeyMessages` only: common on tool loops re‑entering the LLM (since `graph.StateKeyUserInput` has been cleared).
The benefit: preprocess nodes can rewrite graph.StateKeyUserInput and take effect in the same turn, while seamlessly integrating with the tool loop (tool_calls → tools → LLM).
Examples (showing the three paths):
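A sketch of the three paths; the message constructors are assumptions from the model package:

```go
// 1. One-shot override: exact system+user input for this turn only.
state[graph.StateKeyOneShotMessages] = []model.Message{
    model.NewSystemMessage("You are a strict JSON formatter."),
    model.NewUserMessage("Format this: " + raw),
}

// 2. User input: appended to messages, then cleared after the turn.
state[graph.StateKeyUserInput] = "Summarize today's alerts."

// 3. Neither key set: the LLM node consumes graph.StateKeyMessages as-is
// (typical when a tools node loops back into the LLM node).
```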
Instruction Placeholder Injection
AddLLMNode's instruction supports placeholders, same syntax as llmagent:
- `{key}` / `{key?}`: read from `session.State`; the optional `?` yields empty when missing.
- `{user:subkey}`, `{app:subkey}`, `{temp:subkey}`: read by namespace.
GraphAgent stores the current *session.Session into state (graph.StateKeySession) and expands placeholders before the LLM call.
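For instance, a minimal sketch:

```go
// Sketch: placeholders in an instruction, expanded from session state
// before each model call.
sg.AddLLMNode("ask", chatModel,
    "You are assisting {user:name?}. Deployment region: {app:region}.",
    nil)
```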
Tip: GraphAgent seeds graph.StateKeyMessages from prior session events for multi‑turn continuity. When resuming from a checkpoint, a plain "resume" message is not injected as graph.StateKeyUserInput, preserving the recovered state.
Concurrency and State Safety
When a node has multiple outgoing edges, parallel execution is triggered automatically:
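A minimal sketch using the builder API:

```go
// Sketch: two outgoing edges from "split" run both branches in parallel;
// they fan back in at "join", where reducers merge their state deltas.
sg.AddEdge("split", "left")
sg.AddEdge("split", "right")
sg.AddEdge("left", "join")
sg.AddEdge("right", "join")
```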
Internally, the executor constructs shallow copies (maps.Copy) per task and merges under a lock, with reducers ensuring safe concurrent updates.
Node I/O Conventions
Nodes communicate only via the shared State. Each node returns a state delta that merges via the Schema's reducers.
- Function nodes
  - Input: full `State` (read keys declared in your schema)
  - Output: write business keys only (e.g., `{"parsed_time":"..."}`); avoid internal keys
- LLM nodes
  - Input priority: `graph.StateKeyOneShotMessagesByNode[<node_id>]` → `graph.StateKeyOneShotMessages` → `graph.StateKeyUserInput` → `graph.StateKeyMessages`
  - Output: append to `graph.StateKeyMessages` atomically, set `graph.StateKeyLastResponse`, set `graph.StateKeyNodeResponses[<llm_node_id>]`
- Tools nodes
  - Read the latest assistant message with `tool_calls` for the current round and append tool responses to `graph.StateKeyMessages`
  - Output: set `graph.StateKeyLastToolResponse`, set `graph.StateKeyNodeResponses[<tools_node_id>]` (JSON array string)
  - Multiple tools execute in the order returned by the LLM
- Agent nodes
  - Receive graph `State` via `Invocation.RunOptions.RuntimeState`
  - Output: set `graph.StateKeyLastResponse` and `graph.StateKeyNodeResponses[<agent_node_id>]`; `graph.StateKeyUserInput` is cleared after execution
Good practice:
- Sequential reads: consume the immediate upstream text from `graph.StateKeyLastResponse`.
- Parallel/merge reads: read specific node outputs from `graph.StateKeyNodeResponses[<nodeID>]`.
- Declare business keys in your schema with suitable reducers to avoid data races.
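A function‑node sketch following these conventions; the `NodeFunc` signature is assumed, and `normalizeTime` is a hypothetical helper:

```go
// Sketch: read the upstream result, write a business key only.
func parseTime(ctx context.Context, state graph.State) (any, error) {
    input, _ := state[graph.StateKeyLastResponse].(string)
    parsed, err := normalizeTime(input)
    if err != nil {
        return nil, err
    }
    // Return a state delta; reducers merge it into shared state.
    return graph.State{"parsed_time": parsed}, nil
}
```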
API Cheat Sheet
- Build graph
  - `graph.NewStateGraph(schema)` → builder
  - `AddNode(id, func, ...opts)` / `AddLLMNode(id, model, instruction, tools, ...opts)`
  - `AddToolsNode(id, tools, ...opts)` / `AddAgentNode(id, ...opts)`
  - `AddEdge(from, to)` / `AddConditionalEdges(from, condition, pathMap)`
  - `AddToolsConditionalEdges(llmNode, toolsNode, fallback)`
  - `SetEntryPoint(nodeID)` / `SetFinishPoint(nodeID)` / `Compile()`
- State keys (user‑visible)
  - `graph.StateKeyUserInput`, `graph.StateKeyOneShotMessages`, `graph.StateKeyMessages`, `graph.StateKeyLastResponse`, `graph.StateKeyNodeResponses`, `graph.StateKeyMetadata`
- Per‑node options
  - LLM/tools: `graph.WithGenerationConfig`, `graph.WithModelCallbacks`, `graph.WithToolCallbacks`, `graph.WithToolSets`
  - Callbacks: `graph.WithPreNodeCallback`, `graph.WithPostNodeCallback`, `graph.WithNodeErrorCallback`
- Execution
  - `graphagent.New(name, compiledGraph, ...opts)` → `runner.NewRunner(app, agent)` → `Run(...)`
See examples under examples/graph for end‑to‑end patterns (basic/parallel/multi‑turn/interrupts/nested_interrupt/static_interrupt/tools/placeholder).
Visualization (DOT/Image)
Graph can export a Graphviz DOT description and render images by invoking the `dot` layout-engine executable.
- `WithDestinations` draws dotted gray edges for declared dynamic routes (visualization + static checks only; it does not affect runtime).
- Conditional edges render as dashed gray edges with branch labels.
- Regular edges render as solid lines.
- Virtual `Start`/`End` nodes can be shown or hidden via an option.
Example:
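A sketch; the option placement follows the API reference below, and rendering requires the Graphviz `dot` binary on PATH:

```go
// Sketch: export DOT, then render a PNG.
dot, err := g.DOT(graph.WithRankDir(graph.RankDirLR))
if err != nil {
    log.Fatal(err)
}
fmt.Println(dot)

if err := g.RenderImage(ctx, "png", "workflow.png",
    graph.WithIncludeStartEnd(true),
    graph.WithGraphLabel("Support Flow"),
); err != nil {
    log.Fatal(err)
}
```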
API reference:
- `g.DOT(...)` / `g.WriteDOT(w, ...)` on a compiled `*graph.Graph`
- `g.RenderImage(ctx, format, outputPath, ...)` (e.g., `png`/`svg`)
- Options: `WithRankDir(graph.RankDirLR|graph.RankDirTB)`, `WithIncludeDestinations(bool)`, `WithIncludeStartEnd(bool)`, `WithGraphLabel(string)`
Full example: examples/graph/visualization
Advanced Features
Checkpoints and Recovery
To support time‑travel and reliable recovery, configure a checkpoint saver on the Executor or GraphAgent. Below uses the SQLite saver to persist checkpoints and resume from a specific checkpoint.
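A sketch; the saver package path, constructor, and option names are assumptions (see examples/graph/checkpoint for the real wiring):

```go
// Sketch: persist checkpoints in SQLite and resume from a known checkpoint.
saver, err := sqlite.NewSaver("checkpoints.db") // hypothetical constructor
if err != nil {
    log.Fatal(err)
}
ga, err := graphagent.New("flow", compiledGraph,
    graphagent.WithCheckpointSaver(saver), // option name assumed
)
if err != nil {
    log.Fatal(err)
}

// Resume: point the run at a lineage and checkpoint via runtime state.
events, err := r.Run(ctx, userID, sessionID, model.NewUserMessage("resume"),
    agent.WithRuntimeState(map[string]any{
        graph.CfgKeyLineageID:    "svc:prod:flowX",
        graph.CfgKeyCheckpointID: checkpointID,
    }),
)
```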
Checkpoint Management
Use the manager to list, query, and delete checkpoints:
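A sketch; the manager constructor and method names are assumptions (consult `CheckpointManager` in the graph package for exact signatures):

```go
// Sketch: checkpoint housekeeping by lineage.
mgr := graph.NewCheckpointManager(saver)
cps, err := mgr.List(ctx, "svc:prod:flowX")
if err != nil {
    log.Fatal(err)
}
for _, cp := range cps {
    fmt.Println(cp.ID, cp.Source)
}
// Delete a whole lineage once it is no longer needed.
_ = mgr.DeleteLineage(ctx, "svc:prod:flowX")
```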
Use a stable business identifier for namespace in production (e.g., svc:prod:flowX) for clear auditing.
Time Travel: Read / Edit State
Resuming from a checkpoint gives you "time travel" (rewind to any checkpoint and continue). For HITL and debugging, you often want one extra step: edit the state at a checkpoint and keep running from there.
Important detail: on resume, the executor restores state from the checkpoint first. runtime_state does not override existing checkpoint keys; it only fills missing non-internal keys. If you need to change an existing key, you must write a new checkpoint.
Use graph.TimeTravel:
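A sketch; the constructor and method shapes are assumptions (see examples/graph/time_travel_edit_state for the runnable version):

```go
// Sketch: rewind to a checkpoint, edit one key, continue from the new one.
tt := graph.NewTimeTravel(saver)
newID, err := tt.EditState(ctx, lineageID, baseCheckpointID, graph.State{
    "approved": true, // existing keys change only via a new checkpoint
})
if err != nil {
    log.Fatal(err)
}
// Resume from newID via agent.WithRuntimeState as shown above.
```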
Notes:
- `EditState` writes a new checkpoint with `Source="update"` and `ParentCheckpointID=base`.
- Internal keys are blocked by default; use `graph.WithAllowInternalKeys()` only if you know what you're doing.
- Updated checkpoints include metadata keys: `graph.CheckpointMetaKeyBaseCheckpointID` and `graph.CheckpointMetaKeyUpdatedKeys`.
Runnable example: examples/graph/time_travel_edit_state.
Events at a Glance
- Authors
  - Node-level: node ID (fallback `graph.AuthorGraphNode`)
  - Pregel phases: `graph.AuthorGraphPregel`
  - Executor/system: `graph.AuthorGraphExecutor`
  - User input: `user` (no exported constant)
- Object types (subset)
  - Node: `graph.ObjectTypeGraphNodeStart` | `graph.ObjectTypeGraphNodeComplete` | `graph.ObjectTypeGraphNodeError`
  - Pregel: `graph.ObjectTypeGraphPregelPlanning` | `graph.ObjectTypeGraphPregelExecution` | `graph.ObjectTypeGraphPregelUpdate`
  - Channel/state: `graph.ObjectTypeGraphChannelUpdate` / `graph.ObjectTypeGraphStateUpdate`
  - Checkpoints: `graph.ObjectTypeGraphCheckpoint`, `graph.ObjectTypeGraphCheckpointCreated`, `graph.ObjectTypeGraphCheckpointCommitted`, `graph.ObjectTypeGraphCheckpointInterrupt`
See "Event Monitoring" for a full streaming example and metadata parsing.
Human‑in‑the‑Loop
Introduce human confirmation on critical paths. The example shows a basic interrupt → resume flow:
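A sketch; `graph.Interrupt`'s signature follows the prose below, while the return-value handling is an assumption:

```go
// Sketch: a node that pauses for human confirmation.
sg.AddNode("approve", func(ctx context.Context, state graph.State) (any, error) {
    answer, err := graph.Interrupt(ctx, state, "approval",
        "Approve this action? (yes/no)")
    if err != nil {
        return nil, err // first pass: execution pauses and checkpoints here
    }
    return graph.State{"approved": answer == "yes"}, nil
})
// Resume later by supplying ResumeMap{"approval": "yes"} (see below).
```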
Nested graphs (child GraphAgent interrupts)
If your parent graph delegates to a child GraphAgent via an agent node
(AddAgentNode / AddSubgraphNode), the child graph can interrupt with
graph.Interrupt and the parent graph will also interrupt.
Resume from the parent checkpoint the same way as a non-nested interrupt. When the agent node runs again, it will resume the child checkpoint automatically.
Runnable example: examples/graph/nested_interrupt.
It supports multi-level nesting via the -depth flag.
Key idea: graph.Interrupt(ctx, state, key, prompt) uses key as the routing
key for ResumeMap. When you resume, the map key must match that key.
You will see two different identifiers:
- Node Identifier (Node ID): where the current graph paused (in nested graphs, this is often the parent agent node).
- Task Identifier (Task ID): the interrupt key used for `ResumeMap` routing. For `graph.Interrupt`, the Task ID equals the `key` argument.
To resume without hard-coding the key, read the Task ID from the interrupted
checkpoint and use it as the ResumeMap key:
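A sketch; the checkpoint field names and the exact resume‑map wiring are assumptions (the nested_interrupt example is authoritative):

```go
// Sketch: read the Task ID from the interrupted checkpoint, then resume.
cp, err := mgr.Latest(ctx, lineageID) // method and fields assumed
if err != nil {
    log.Fatal(err)
}
events, err := r.Run(ctx, userID, sessionID, model.NewUserMessage("resume"),
    agent.WithRuntimeState(map[string]any{
        graph.CfgKeyLineageID:    lineageID,
        graph.CfgKeyCheckpointID: cp.ID,
        "__resume_map__":         map[string]any{cp.InterruptTaskID: "yes"},
    }),
)
```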
This works the same for multi-level nesting: resume from the parent checkpoint and the framework will resume each child checkpoint automatically.
Helpers:
You can also inject resume values at entry via a command (no need to jump to a specific node first). Pass it via Runner runtime state:
Event Monitoring
The event stream carries execution progress and incremental outputs. The example shows how to iterate events and distinguish graph events vs model deltas:
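A sketch; the event field names (`Object`, `Author`, `Response.Choices`) are assumptions consistent with the conventions below:

```go
// Sketch: separate graph lifecycle events from model token deltas.
events, err := r.Run(ctx, userID, sessionID, model.NewUserMessage(input))
if err != nil {
    log.Fatal(err)
}
for ev := range events {
    switch {
    case strings.HasPrefix(ev.Object, "graph."):
        fmt.Printf("[graph] %s by %s\n", ev.Object, ev.Author)
    case ev.Response != nil && len(ev.Response.Choices) > 0:
        fmt.Print(ev.Response.Choices[0].Delta.Content) // model delta
    }
}
```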
You can also filter by the event's Author field:
- Node‑level events (model, tools, node start/stop): `Author = <nodeID>` (or `graph-node` if unavailable)
- Pregel (planning/execution/update/errors): `Author = graph.AuthorGraphPregel`
- Executor‑level (state updates/checkpoints): `Author = graph.AuthorGraphExecutor`
- User input (Runner writes): `Author = user`
This convention lets you subscribe to a specific node's stream without passing streaming context through nodes (streaming travels via the event channel; state stays structured in a LangGraph‑like style).
Example: consume only node ask's streaming output and print the final message when done.
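A sketch of that filter, under the same assumed event fields:

```go
// Sketch: subscribe to node "ask" only, using the Author convention above.
for ev := range events {
    if ev.Author != "ask" {
        continue
    }
    if ev.Response != nil && len(ev.Response.Choices) > 0 {
        fmt.Print(ev.Response.Choices[0].Delta.Content) // streamed tokens
    }
    if ev.Object == graph.ObjectTypeGraphNodeComplete {
        fmt.Println("\n[ask] finished")
    }
}
```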
StreamMode
Runner can filter the event stream before it reaches your application code. This is useful when you only want a subset of events (for example, only model tokens for user interface streaming).
Use agent.WithStreamMode(...):
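For example, to forward only model token events:

```go
// Sketch: request only model output events for UI streaming.
events, err := r.Run(ctx, userID, sessionID, message,
    agent.WithStreamMode(agent.StreamModeMessages),
)
```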
Supported modes (graph workflows):
- `messages`: model output events (for example, `chat.completion.chunk`)
- `updates`: `graph.state.update` / `graph.channel.update` / `graph.execution`
- `checkpoints`: `graph.checkpoint.*`
- `tasks`: task lifecycle events (`graph.node.*`, `graph.pregel.*`)
- `debug`: same as `checkpoints` + `tasks`
- `custom`: node-emitted events (`graph.node.custom`)
Notes:
- When `agent.StreamModeMessages` is selected, graph-based LLM nodes enable final model response events automatically for that run. To override this, call `agent.WithGraphEmitFinalModelResponses(false)` after `agent.WithStreamMode(...)`.
- StreamMode only affects what Runner forwards to your `eventCh`. Runner still processes and persists events internally.
- For graph workflows, some event types (for example, `graph.checkpoint.*`) are emitted only when their corresponding mode is selected.
- Runner always emits a final `runner.completion` event.
Event Metadata (StateDelta)
Each event also carries StateDelta, which includes execution metadata for models/tools:
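A sketch of reading that metadata inside an event loop; the metadata key and struct names here are assumptions (see the event definitions in the graph package for the real ones):

```go
// Sketch: decode model execution metadata from StateDelta.
if raw, ok := ev.StateDelta[graph.MetadataKeyModel]; ok { // key name assumed
    var md graph.ModelExecutionMetadata // struct name assumed
    if err := json.Unmarshal(raw, &md); err == nil {
        fmt.Printf("node=%s duration=%s\n", md.NodeID, md.Duration)
    }
}
```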
Emit selected values from node callbacks
By default, mid‑run events like graph.state.update report which keys were updated (metadata‑only). Concrete values are not included to keep the stream lightweight and avoid exposing intermediate, potentially conflicting updates. The final graph.execution event's StateDelta carries the serialized final snapshot of allowed keys (see implementations in graph/executor.go:2001, graph/events.go:1276, graph/events.go:1330).
If you only need to surface a few values from the result of a specific node right after it completes, register an After‑node callback and emit a small custom event containing just those values:
Steps:
- Register `WithPostNodeCallback` on the target node.
- In the callback, read `result any`; when the node returns `graph.State`, this is the node's state delta.
- Pick the needed keys, serialize to JSON, and attach them to a new event's `StateDelta`.
- Send via `agent.EmitEvent`.
Example:
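A sketch following the steps above; the callback signature, event constructor, and `agent.EmitEvent` arguments are assumptions:

```go
sg.AddNode("score", scoreFunc,
    graph.WithPostNodeCallback(func(ctx context.Context, result any, nodeErr error) {
        delta, ok := result.(graph.State)
        if !ok || nodeErr != nil {
            return
        }
        payload, err := json.Marshal(map[string]any{"score": delta["score"]})
        if err != nil {
            return
        }
        ev := event.New("score", "graph.node.custom") // constructor assumed
        ev.StateDelta = map[string][]byte{"score": payload}
        agent.EmitEvent(ctx, ev) // emission signature assumed
    }),
)
```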
Recommendations:
- Emit only necessary keys to control bandwidth and avoid leaking sensitive data.
- Internal/volatile keys are filtered from final snapshots and should not be emitted (see graph/internal_keys.go:16).
- For textual intermediate outputs, prefer existing model streaming events (`choice.Delta.Content`).
You can also configure agent‑level callbacks:
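For instance, a sketch with assumed option names (see agent/graphagent for the actual set):

```go
// Sketch: agent-level callbacks applied across the whole graph run.
ga, err := graphagent.New("flow", compiledGraph,
    graphagent.WithModelCallbacks(modelCallbacks),
    graphagent.WithToolCallbacks(toolCallbacks),
    graphagent.WithAgentCallbacks(agentCallbacks),
)
```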
Troubleshooting
Q1: Graph has no entry point
- Error: "graph must have an entry point". Call `SetEntryPoint()` and ensure the node exists.
Q2: Edge target/source does not exist
- Error mentions missing node. Define nodes before wiring edges/condition maps.
Q3: Tools don't run after LLM
- Ensure the LLM actually returned `tool_calls` and you used `AddToolsConditionalEdges(ask, tools, fallback)`.
- Check that tool names in your map match the model's declared tool names.
- Pairing walks from the latest assistant message with `tool_calls` until a new user message; verify message ordering.
Q4: LLM returns tool_calls but tools never execute (always routes to fallback)
- Root cause: using `NewStateSchema()` instead of `MessagesStateSchema()`.
- `AddToolsConditionalEdges` internally checks `state[StateKeyMessages].([]model.Message)` to determine whether there are tool calls.
- If the schema lacks the `StateKeyMessages` field, `MessageReducer` won't be applied. The LLM node returns `[]graph.MessageOp` instead of `[]model.Message`, causing the type assertion to fail and always routing to `fallbackNode`.
- Solution: replace `graph.NewStateSchema()` with `graph.MessagesStateSchema()`, then add custom fields on top:
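A sketch; the field-declaration API is assumed, as in the schema example earlier:

```go
// Sketch: start from the messages schema, then layer custom fields on top.
schema := graph.MessagesStateSchema()
schema.AddField("audit_log", graph.StateField{
    Type:    reflect.TypeOf([]string{}),
    Reducer: graph.StringSliceReducer,
})
sg := graph.NewStateGraph(schema)
```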
Q5: No streaming events observed
- Increase `WithChannelBufferSize` and filter by `Author`/object types.
- Verify you're consuming events from `Runner.Run(...)` and not from direct `Executor` calls.
Q6: Resume did not continue where expected
- Pass `agent.WithRuntimeState(map[string]any{ graph.CfgKeyLineageID: "...", graph.CfgKeyCheckpointID: "..." })`.
- Provide `ResumeMap` for HITL continuation when needed. A plain "resume" message is not added to `graph.StateKeyUserInput`.
Q7: State conflicts in parallel
- Define reducers for lists/maps (e.g., `StringSliceReducer`, `MergeReducer`); avoid overwriting the same key from multiple branches without merge semantics.
Q8: What does the tools parameter in AddLLMNode do? When are tools actually called?
- tools parameter is declarative: the tools passed to `AddLLMNode` are placed in `model.Request.Tools`, telling the LLM what tools are available.
- LLM decides whether to call tools: based on the tools declaration and user input, the LLM decides whether to return `tool_calls` in its response.
- tools are NOT automatically executed: `AddLLMNode` only declares tools and sends requests to the LLM; it does not execute tools.
- Tool execution requires `AddToolsNode` + `AddToolsConditionalEdges` (see the sketch below).
- Execution timing: when `AddToolsConditionalEdges` detects `tool_calls` in the LLM response, it routes to `AddToolsNode`, which executes the actual tool calls.
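```go
// Sketch: the standard declare-then-execute wiring for tools.
sg.AddLLMNode("ask", chatModel, "Use tools when helpful.", tools) // declares
sg.AddToolsNode("tools", tools)                                   // executes
sg.AddToolsConditionalEdges("ask", "tools", "fallback")
sg.AddEdge("tools", "ask") // feed tool results back into the LLM
```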
Q9: Messages are duplicated in req.Messages when chaining multiple LLM nodes
- Symptom: the same user input appears multiple times in `req.Messages`.
- Root cause: when using `MessagesStateSchema()`, `MessageReducer` accumulates messages. Each LLM node execution appends a new user message if `StateKeyUserInput` is not empty. When there are loops (e.g., tool call loops) or multiple LLM nodes chained together, messages keep accumulating.
- Solution: use `RemoveAllMessages` to clear previous messages before setting a new `StateKeyUserInput` (see the sketch below).
- Note: `WithSubgraphIsolatedMessages(true)` only works for `AddSubgraphNode`, not for `AddLLMNode`.
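A sketch inside a function node; the `RemoveAllMessages` construction is an assumption:

```go
// Sketch: clear accumulated history before seeding the next stage's input.
func resetForStage2(ctx context.Context, state graph.State) (any, error) {
    return graph.State{
        graph.StateKeyMessages:  []graph.MessageOp{graph.RemoveAllMessages{}},
        graph.StateKeyUserInput: "Stage-2 input built from stage-1 output.",
    }, nil
}
```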
Q10: Downstream Agent node receives an empty input
- Symptom: in a chain like `A (agent) → B (agent)`, the downstream agent runs but `Invocation.Message.Content` is empty (or it behaves as if it didn't get A's output).
- Root cause: agent nodes consume `user_input` as their input message and clear it after a successful run. Edges do not automatically pipe outputs.
- Solution: explicitly write the desired upstream value into `user_input` before the downstream agent runs (see the sketch below). If you need a specific node's output, read it from `node_responses[targetNodeID]` and write that into `user_input`.
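A sketch; `agent_a`/`agent_b` are hypothetical node IDs:

```go
// Sketch: a bridge function node between two agent nodes.
sg.AddNode("bridge", func(ctx context.Context, state graph.State) (any, error) {
    outputs, _ := state[graph.StateKeyNodeResponses].(map[string]any)
    text, _ := outputs["agent_a"].(string)
    return graph.State{graph.StateKeyUserInput: text}, nil
})
sg.AddEdge("agent_a", "bridge")
sg.AddEdge("bridge", "agent_b")
```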
Q11: Downstream Agent needs both original input and upstream output
- Symptom: `WithSubgraphInputFromLastResponse()` makes the downstream agent consume `last_response` as its current input, but you also need the original user request for this run.
- Solution: persist the original `user_input` into your own state key (for example, `original_user_input`) and use a function node to compose the next `user_input` as `original + upstream` before the downstream agent runs. See "Agent nodes: combining original input with upstream output".
Real‑World Example
Approval Workflow
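A hedged sketch combining the patterns above (HITL interrupt + conditional routing); the condition-function signature and the `publishFunc`/`rejectFunc` helpers are assumptions:

```go
// Sketch: draft → human approval → publish/reject.
sg := graph.NewStateGraph(graph.MessagesStateSchema())
sg.AddLLMNode("draft", chatModel, "Draft a reply to the request.", nil)
sg.AddNode("approval", func(ctx context.Context, state graph.State) (any, error) {
    decision, err := graph.Interrupt(ctx, state, "approval",
        "Approve the draft? (approve/reject)")
    if err != nil {
        return nil, err // pauses and checkpoints on the first pass
    }
    return graph.State{"decision": decision}, nil
})
sg.AddNode("publish", publishFunc)
sg.AddNode("reject", rejectFunc)
sg.AddEdge("draft", "approval")
sg.AddConditionalEdges("approval",
    func(ctx context.Context, state graph.State) (string, error) {
        if state["decision"] == "approve" {
            return "publish", nil
        }
        return "reject", nil
    },
    map[string]string{"publish": "publish", "reject": "reject"},
)
sg.SetEntryPoint("draft")
sg.SetFinishPoint("publish")
sg.SetFinishPoint("reject")
```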
Summary
This guide introduced the core usage of the graph package and GraphAgent: declaring nodes and routes, safely merging state via Schema + Reducers, and leveraging events, checkpoints, and interrupts for observability and recovery. For structured flows (approvals, content moderation, stepwise processing), Graph provides stable, auditable execution. For intelligent decisions, extend with LLM nodes and sub‑agents.
References & Examples
- Repository: https://github.com/trpc-group/trpc-agent-go
- Graph examples: `examples/graph` (basic/parallel/multi‑turn/interrupts and recovery)
  - I/O conventions: `io_conventions`, `io_conventions_tools`
  - Parallel/fan‑out: `parallel`, `fanout`, `diamond`
  - Placeholders: `placeholder`
  - Checkpoints/interrupts: `checkpoint`, `interrupt`, `nested_interrupt`, `static_interrupt`
- Further reading: `graph/state_graph.go`, `graph/executor.go`, `agent/graphagent`