The Graph package is a core component in trpc-agent-go for building and executing workflows. It provides a type-safe, extensible graph execution engine that supports complex AI workflow orchestration.
Overview
The Graph package allows you to model complex AI workflows as directed graphs, where nodes represent processing steps and edges represent data flow and control flow. It is particularly suitable for building AI applications that require conditional routing, state management, and multi-step processing.
Usage Pattern
The usage of the Graph package follows this pattern:
Create Graph: Use StateGraph builder to define workflow structure
Create GraphAgent: Wrap the compiled Graph as an Agent
Create Runner: Use Runner to manage sessions and execution environment
Execute Workflow: Execute workflow through Runner and handle results
This pattern provides:
Type Safety: Ensures data consistency through state schema
Session Management: Supports concurrent execution for multiple users and sessions
Event Stream: Real-time monitoring of workflow execution progress
Error Handling: Unified error handling and recovery mechanisms
Agent Integration
GraphAgent implements the agent.Agent interface and can:
Act as Independent Agent: Execute directly through Runner
Act as SubAgent: Be used as a sub-agent by other Agents (such as LLMAgent)
No SubAgent Support: GraphAgent itself does not support sub-agents, focusing on workflow execution
This design allows GraphAgent to flexibly integrate into complex multi-Agent systems.
Key Features
Type-safe state management: Use Schema to define state structure, support custom Reducers
Conditional routing: Dynamically select execution paths based on state
LLM node integration: Built-in support for large language models
Tool nodes: Support function calls and external tool integration
Streaming execution: Support real-time event streams and progress tracking
Concurrency safety: Thread-safe graph execution
Core Concepts
1. Graph
A graph is the core structure of a workflow, consisting of nodes and edges:
import (
    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// State is a key-value pair mapping.
type State map[string]any

// User-defined state keys.
const (
    StateKeyInput         = "input"          // Input data.
    StateKeyResult        = "result"         // Processing result.
    StateKeyProcessedData = "processed_data" // Processed data.
    StateKeyStatus        = "status"         // Processing status.
)
Built-in State Keys:
The Graph package provides some built-in state keys, mainly for internal system communication:
User-accessible Built-in Keys:
StateKeyUserInput: User input (one-shot, cleared after consumption, persisted by LLM nodes)
StateKeyOneShotMessages: One-shot messages (complete override for current round, cleared after consumption)
StateKeyLastResponse: Last response (used to set final output, Executor reads this value as result)
StateKeyMessages: Message history (durable, supports append + MessageOp patch operations)
StateKeyNodeResponses: Per-node responses map. Key is node ID, value is the
node's final textual response. Use StateKeyLastResponse for the final
serial output; when multiple parallel nodes converge, read each node's
output from StateKeyNodeResponses.
StateKeyMetadata: Metadata (general metadata storage available to users)
System Internal Keys (users should not use directly):
StateKeySession: Session information (automatically set by GraphAgent)
StateKeyExecContext: Execution context (automatically set by Executor)
StateKeyToolCallbacks: Tool callbacks (automatically set by Executor)
StateKeyModelCallbacks: Model callbacks (automatically set by Executor)
Users should use custom state keys to store business data, and only use user-accessible built-in state keys when necessary.
4. State Schema
State schema defines the structure and behavior of state:
import (
    "reflect"

    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// Create state schema.
schema := graph.NewStateSchema()

// Add field definitions.
schema.AddField("counter", graph.StateField{
    Type:    reflect.TypeOf(0),
    Reducer: graph.DefaultReducer,
    Default: func() any { return 0 },
})
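To make the role of a Reducer concrete, here is a minimal standard-library sketch of how per-key reducers could merge a node's returned state into the existing state. It is not the package's implementation: the `Reducer` signature, `overwrite`, `appendStrings`, and `applyUpdate` are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// State mirrors the document's definition: a string-keyed map of arbitrary values.
type State map[string]any

// Reducer merges an incoming update into the existing value of one key.
// Hypothetical signature, for illustration only.
type Reducer func(existing, update any) any

// overwrite replaces the old value with the new one.
func overwrite(_, update any) any { return update }

// appendStrings concatenates string slices instead of replacing them.
func appendStrings(existing, update any) any {
	old, _ := existing.([]string)
	add, _ := update.([]string)
	merged := make([]string, 0, len(old)+len(add))
	merged = append(merged, old...)
	return append(merged, add...)
}

// applyUpdate merges delta into state, using the reducer registered for a key
// and falling back to overwrite when none is registered.
func applyUpdate(state, delta State, reducers map[string]Reducer) {
	for key, value := range delta {
		reduce, ok := reducers[key]
		if !ok {
			reduce = overwrite
		}
		state[key] = reduce(state[key], value)
	}
}

func main() {
	reducers := map[string]Reducer{"results": appendStrings}
	state := State{"counter": 0}
	applyUpdate(state, State{"counter": 1, "results": []string{"A"}}, reducers)
	applyUpdate(state, State{"counter": 2, "results": []string{"B"}}, reducers)
	fmt.Println(state["counter"], state["results"])
}
```

In this sketch "counter" keeps only the latest value, while "results" accumulates across updates, which is the kind of difference a custom Reducer is meant to express.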
Usage Guide
1. Creating GraphAgent and Runner
Users mainly use the Graph package by creating GraphAgent and then using it through Runner. This is the recommended usage pattern:
package main

import (
    "context"
    "fmt"
    "time"

    "trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
    "trpc.group/trpc-go/trpc-agent-go/graph"
    "trpc.group/trpc-go/trpc-agent-go/model"
    "trpc.group/trpc-go/trpc-agent-go/runner"
    "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
)

// User-defined state keys used by the node functions below.
const (
    StateKeyProcessedData = "processed_data"
    StateKeyResult        = "result"
)

func main() {
    // 1. Create state schema.
    schema := graph.MessagesStateSchema()

    // 2. Create state graph builder.
    stateGraph := graph.NewStateGraph(schema)

    // 3. Add nodes.
    stateGraph.AddNode("start", startNodeFunc).
        AddNode("process", processNodeFunc)

    // 4. Set edges.
    stateGraph.AddEdge("start", "process")

    // 5. Set entry point and finish point.
    // SetEntryPoint automatically creates edge from virtual Start node to "start" node.
    // SetFinishPoint automatically creates edge from "process" node to virtual End node.
    stateGraph.SetEntryPoint("start").
        SetFinishPoint("process")

    // 6. Compile graph.
    compiledGraph, err := stateGraph.Compile()
    if err != nil {
        panic(err)
    }

    // 7. Create GraphAgent.
    graphAgent, err := graphagent.New("simple-workflow", compiledGraph,
        graphagent.WithDescription("Simple workflow example"),
        graphagent.WithInitialState(graph.State{}),
    )
    if err != nil {
        panic(err)
    }

    // 8. Create session service.
    sessionService := inmemory.NewSessionService()

    // 9. Create Runner.
    appRunner := runner.NewRunner(
        "simple-app",
        graphAgent,
        runner.WithSessionService(sessionService),
    )

    // 10. Execute workflow.
    ctx := context.Background()
    userID := "user"
    sessionID := fmt.Sprintf("session-%d", time.Now().Unix())

    // Create user message (Runner automatically puts message content into StateKeyUserInput).
    message := model.NewUserMessage("Hello World")

    // Execute through Runner.
    eventChan, err := appRunner.Run(ctx, userID, sessionID, message)
    if err != nil {
        panic(err)
    }

    // Handle event stream.
    for event := range eventChan {
        if event.Error != nil {
            fmt.Printf("Error: %s\n", event.Error.Message)
            continue
        }
        if len(event.Choices) > 0 {
            choice := event.Choices[0]
            if choice.Delta.Content != "" {
                fmt.Print(choice.Delta.Content)
            }
        }
        if event.Done {
            break
        }
    }
}

// Node function implementations.
func startNodeFunc(ctx context.Context, state graph.State) (any, error) {
    // Get user input from built-in StateKeyUserInput (automatically set by Runner).
    input, ok := state[graph.StateKeyUserInput].(string)
    if !ok {
        return nil, fmt.Errorf("user input not found or not a string")
    }
    return graph.State{
        StateKeyProcessedData: fmt.Sprintf("Processed: %s", input),
    }, nil
}

func processNodeFunc(ctx context.Context, state graph.State) (any, error) {
    processed, ok := state[StateKeyProcessedData].(string)
    if !ok {
        return nil, fmt.Errorf("processed data not found or not a string")
    }
    result := fmt.Sprintf("Result: %s", processed)
    return graph.State{
        StateKeyResult: result,
        // Use built-in StateKeyLastResponse to set final output.
        graph.StateKeyLastResponse: fmt.Sprintf("Final result: %s", result),
    }, nil
}
2. Using LLM Nodes
LLM nodes implement a fixed three-stage input rule without extra configuration:
OneShot first: If one_shot_messages exists, use it as the input for this round.
UserInput next: Otherwise, if user_input exists, use the durable messages plus this input, and persist the input to history once.
History default: Otherwise, use durable messages as input.
// Create LLM model. The variable is named modelInstance so it does not
// shadow the model package.
modelInstance := openai.New("gpt-4")

// Add LLM node.
stateGraph.AddLLMNode("analyze", modelInstance,
    `You are a document analysis expert. Analyze the provided document and:
1. Classify document type and complexity
2. Extract key themes
3. Evaluate content quality
Please provide structured analysis results.`,
    nil) // Tool mapping.
Important notes:
System prompt is only used for this round and is not persisted to state.
One-shot keys (user_input / one_shot_messages) are automatically cleared after successful execution.
All state updates are atomic.
GraphAgent/Runner only sets user_input and no longer pre-populates messages with a user message. This allows any pre-LLM node to modify user_input and have it take effect in the same round.
Three input paradigms
OneShot (StateKeyOneShotMessages):
When present, only the provided []model.Message is used for this round, typically including a full system prompt and user prompt. Automatically cleared afterwards.
Use case: a dedicated pre-node constructs the full prompt and must fully override input.
UserInput (StateKeyUserInput):
When non-empty, the LLM node uses durable messages plus this round's user input to call the model. After the call, it writes the user input and assistant reply to messages using MessageOp (e.g., AppendMessages, ReplaceLastUser) atomically, and clears user_input to avoid repeated appends.
Use case: conversational flows where pre-nodes may adjust user input.
Messages only (just StateKeyMessages):
Common in tool-call loops. After the first round via user_input, routing to tools and back to LLM, since user_input is cleared, the LLM uses only messages (history). The tail is often a tool response, enabling the model to continue reasoning based on tool outputs.
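The three paradigms amount to a fixed priority order. The following standard-library sketch models that selection step only; it is not the LLM node's actual code. `Message` is a simplified stand-in for model.Message, and `selectInput` and the branch labels are hypothetical names.

```go
package main

import "fmt"

// Message is a simplified stand-in for model.Message.
type Message struct {
	Role    string
	Content string
}

// State is a string-keyed map, as in the document.
type State map[string]any

// Key names follow the snake_case names quoted in the text.
const (
	keyOneShot   = "one_shot_messages"
	keyUserInput = "user_input"
	keyMessages  = "messages"
)

// selectInput returns the messages to send to the model and a label for the
// branch that was taken: one-shot first, then user input, then history only.
func selectInput(state State) ([]Message, string) {
	if oneShot, ok := state[keyOneShot].([]Message); ok && len(oneShot) > 0 {
		return oneShot, "one_shot"
	}
	history, _ := state[keyMessages].([]Message)
	if input, ok := state[keyUserInput].(string); ok && input != "" {
		// Copy the history so the durable slice is not modified here.
		out := append(append([]Message{}, history...), Message{Role: "user", Content: input})
		return out, "user_input"
	}
	return history, "messages_only"
}

func main() {
	history := []Message{
		{Role: "user", Content: "What is 2+2?"},
		{Role: "assistant", Content: "4"},
	}
	msgs, branch := selectInput(State{keyMessages: history, keyUserInput: "And 3+3?"})
	fmt.Println(branch, len(msgs))
}
```

Persisting the user input and clearing the one-shot keys happen after the model call and are deliberately left out of this sketch.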
Atomic updates with Reducer and MessageOp
The Graph package supports MessageOp patch operations (e.g., ReplaceLastUser,
AppendMessages) on message state via MessageReducer to achieve atomic merges. Benefits:
Pre-LLM nodes can modify user_input. The LLM node returns a single state delta with the needed patch operations (replace last user message, append assistant message) for one-shot, race-free persistence.
Backwards compatible with appending []Message, while providing more expressive updates for complex cases.
Example: modify user_input in a pre-node before entering the LLM node.
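stateGraph.
    AddNode("prepare_input", func(ctx context.Context, s graph.State) (any, error) {
        // Trim whitespace from the user input before the LLM node reads it.
        input, _ := s[graph.StateKeyUserInput].(string)
        cleaned := strings.TrimSpace(input)
        return graph.State{graph.StateKeyUserInput: cleaned}, nil
    }).
    AddLLMNode("ask", modelInstance,
        "You are a helpful assistant. Answer concisely.",
        nil).
    SetEntryPoint("prepare_input").
    SetFinishPoint("ask")

// Connect the pre-node to the LLM node so that "ask" is reachable.
stateGraph.AddEdge("prepare_input", "ask")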
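As a rough model of what patch operations on message state could look like, the sketch below defines local stand-ins for the two operations named above and applies them in one pass over a copy of the history. The `MessageOp` interface, the field names, and `reduceMessages` are assumptions made for illustration; the package's real types may differ.

```go
package main

import "fmt"

// Message is a simplified stand-in for model.Message.
type Message struct {
	Role    string
	Content string
}

// MessageOp is a patch applied to the message history (hypothetical interface).
type MessageOp interface {
	Apply(history []Message) []Message
}

// AppendMessages adds items at the tail of the history.
type AppendMessages struct{ Items []Message }

func (op AppendMessages) Apply(history []Message) []Message {
	return append(history, op.Items...)
}

// ReplaceLastUser rewrites the most recent user message. In this sketch it
// appends a user message when the history contains none (an assumption).
type ReplaceLastUser struct{ Content string }

func (op ReplaceLastUser) Apply(history []Message) []Message {
	for i := len(history) - 1; i >= 0; i-- {
		if history[i].Role == "user" {
			history[i].Content = op.Content
			return history
		}
	}
	return append(history, Message{Role: "user", Content: op.Content})
}

// reduceMessages applies all ops to a copy of history and returns the result,
// so a caller sees either the old history or the fully patched one.
func reduceMessages(history []Message, ops ...MessageOp) []Message {
	next := append([]Message{}, history...)
	for _, op := range ops {
		next = op.Apply(next)
	}
	return next
}

func main() {
	history := []Message{{Role: "user", Content: "  hello  "}}
	next := reduceMessages(history,
		ReplaceLastUser{Content: "hello"},
		AppendMessages{Items: []Message{{Role: "assistant", Content: "Hi there."}}},
	)
	fmt.Println(len(history), len(next), next[0].Content)
}
```

Working on a copy is one simple way to keep a multi-operation update all-or-nothing from the reader's point of view.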
3. GraphAgent Configuration Options
GraphAgent supports various configuration options:
// Multiple options can be used when creating GraphAgent.
graphAgent, err := graphagent.New(
    "workflow-name",
    compiledGraph,
    graphagent.WithDescription("Workflow description"),
    graphagent.WithInitialState(graph.State{
        "initial_data": "Initial data",
    }),
    graphagent.WithChannelBufferSize(1024),
    graphagent.WithTools([]tool.Tool{
        calculatorTool,
        searchTool,
    }),
    graphagent.WithModelCallbacks(&model.Callbacks{
        // Model callback configuration.
    }),
    graphagent.WithToolCallbacks(&tool.Callbacks{
        // Tool callback configuration.
    }),
)
// Create tools.
tools := map[string]tool.Tool{
    "calculator": calculatorTool,
    "search":     searchTool,
}

// Add tool node.
stateGraph.AddToolsNode("tools", tools)

// Add conditional routing from LLM to tools.
stateGraph.AddToolsConditionalEdges("llm_node", "tools", "fallback_node")
Tool-call pairing and second entry into LLM:
Messages are scanned backward from the tail to find the most recent assistant message that carries tool_calls; the scan stops at the first user message so that tool calls are paired with the current round only.
When returning from tools to the LLM node, since user_input is cleared, the LLM follows the “Messages only” branch and continues based on the tool response in history.
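The backward scan described above can be sketched with the standard library alone. This is an illustration of the pairing rule, not the package's code: `ToolCall`, `Message`, and `pendingToolCalls` are simplified, hypothetical definitions.

```go
package main

import "fmt"

// ToolCall is a simplified stand-in for a model tool call.
type ToolCall struct {
	ID   string
	Name string
}

// Message is a simplified stand-in for model.Message.
type Message struct {
	Role      string
	Content   string
	ToolCalls []ToolCall
}

// pendingToolCalls walks the history from the tail and returns the tool calls
// of the most recent assistant message that has any. It stops at the first
// user message it meets, so calls from earlier rounds are never picked up.
func pendingToolCalls(history []Message) []ToolCall {
	for i := len(history) - 1; i >= 0; i-- {
		msg := history[i]
		switch {
		case msg.Role == "assistant" && len(msg.ToolCalls) > 0:
			return msg.ToolCalls
		case msg.Role == "user":
			return nil
		}
	}
	return nil
}

func main() {
	history := []Message{
		{Role: "user", Content: "What is 6*7?"},
		{Role: "assistant", ToolCalls: []ToolCall{{ID: "call_1", Name: "calculator"}}},
	}
	fmt.Println(len(pendingToolCalls(history)))
}
```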
6. Runner Configuration
Runner provides session management and execution environment:
// Create session service.
sessionService := inmemory.NewSessionService()
// Or use Redis session service.
// sessionService, err := redis.NewService(redis.WithRedisClientURL("redis://localhost:6379"))

// Create Runner.
appRunner := runner.NewRunner(
    "app-name",
    graphAgent,
    runner.WithSessionService(sessionService),
    // Can add more configuration options.
)

// Use Runner to execute workflow.
// Runner only sets StateKeyUserInput; it no longer pre-populates StateKeyMessages.
message := model.NewUserMessage("User input")
eventChan, err := appRunner.Run(ctx, userID, sessionID, message)
7. Message State Schema
For conversational applications, you can use predefined message state schema:
// Use message state schema.
schema := graph.MessagesStateSchema()

// This schema includes:
// - messages: Conversation history (StateKeyMessages).
// - user_input: User input (StateKeyUserInput).
// - last_response: Last response (StateKeyLastResponse).
// - node_responses: Map of nodeID -> response (StateKeyNodeResponses).
// - metadata: Metadata (StateKeyMetadata).
8. State Key Usage Scenarios
User-defined State Keys: Used to store business logic data.
import (
    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// Recommended: Use custom state keys.
const (
    StateKeyDocumentLength  = "document_length"
    StateKeyComplexityLevel = "complexity_level"
    StateKeyProcessingStage = "processing_stage"
)

// Use in nodes.
return graph.State{
    StateKeyDocumentLength:  len(input),
    StateKeyComplexityLevel: "simple",
    StateKeyProcessingStage: "completed",
}, nil
import (
    "time"

    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// The fragments below are independent snippets, each taken from a node function.

// Get user input (automatically set by system).
userInput, _ := state[graph.StateKeyUserInput].(string)

// Set final output (system will read this value).
return graph.State{
    graph.StateKeyLastResponse: "Processing complete",
}, nil

// Read per-node responses when multiple nodes (e.g., parallel LLM nodes)
// produce outputs. Values are stored as a map[nodeID]any and merged across
// steps. Use LastResponse for the final serial output; use NodeResponses for
// converged parallel outputs.
responses, _ := state[graph.StateKeyNodeResponses].(map[string]any)
news, _ := responses["news"].(string)
dialog, _ := responses["dialog"].(string)

// Use them separately or combine into the final output.
return graph.State{
    "news_output":              news,
    "dialog_output":            dialog,
    graph.StateKeyLastResponse: news + "\n" + dialog,
}, nil

// Store metadata.
return graph.State{
    graph.StateKeyMetadata: map[string]any{
        "timestamp": time.Now(),
        "version":   "1.0",
    },
}, nil
import (
    "context"

    "trpc.group/trpc-go/trpc-agent-go/graph"
)

func routingNodeFunc(ctx context.Context, state graph.State) (any, error) {
    // Decide next step based on conditions.
    if shouldGoToA(state) {
        return &graph.Command{
            Update: graph.State{"status": "going_to_a"},
            GoTo:   "node_a",
        }, nil
    }
    return &graph.Command{
        Update: graph.State{"status": "going_to_b"},
        GoTo:   "node_b",
    }, nil
}
Fan-out and dynamic routing:
Return []*graph.Command from a node to create parallel branches that run in the next step.
Using Command{ GoTo: "target" } dynamically routes to target at runtime; no static edge is required for reachability checks. Ensure the target node exists, and use SetFinishPoint(target) if it is terminal.
stateGraph.AddNode("fanout", func(ctx context.Context, s graph.State) (any, error) {
    tasks := []*graph.Command{
        {Update: graph.State{"param": "A"}, GoTo: "worker"},
        {Update: graph.State{"param": "B"}, GoTo: "worker"},
        {Update: graph.State{"param": "C"}, GoTo: "worker"},
    }
    return tasks, nil
})

stateGraph.AddNode("worker", func(ctx context.Context, s graph.State) (any, error) {
    p, _ := s["param"].(string)
    if p == "" {
        return graph.State{}, nil
    }
    return graph.State{"results": []string{p}}, nil
})

// Entry/finish
stateGraph.SetEntryPoint("fanout")
stateGraph.SetFinishPoint("worker")

// No need to add a static edge fanout->worker; routing is driven by GoTo.
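To illustrate the fan-out idea independently of the package, the sketch below runs one worker per command in its own goroutine and merges the per-branch results. It is a simplified model under stated assumptions: `Command`, `NodeFunc`, `runFanOut`, and `worker` are local, hypothetical definitions, and the real executor may schedule and merge branches differently.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// State is a string-keyed map, as in the document.
type State map[string]any

// Command pairs a state update with a target node (stand-in for graph.Command).
type Command struct {
	Update State
	GoTo   string
}

// NodeFunc is a simplified node signature without context or error.
type NodeFunc func(State) State

// worker echoes its "param" value as a one-element result list.
func worker(s State) State {
	p, _ := s["param"].(string)
	if p == "" {
		return State{}
	}
	return State{"results": []string{p}}
}

// runFanOut runs the target node of every command in parallel. Each branch
// gets its own copy of base overlaid with the command's update, and the
// "results" slices are merged under a mutex.
func runFanOut(base State, cmds []Command, nodes map[string]NodeFunc) []string {
	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		merged []string
	)
	for _, cmd := range cmds {
		node, ok := nodes[cmd.GoTo]
		if !ok {
			continue // Unknown target: skipped in this sketch.
		}
		wg.Add(1)
		go func(cmd Command, node NodeFunc) {
			defer wg.Done()
			local := State{}
			for k, v := range base {
				local[k] = v
			}
			for k, v := range cmd.Update {
				local[k] = v
			}
			if items, ok := node(local)["results"].([]string); ok {
				mu.Lock()
				merged = append(merged, items...)
				mu.Unlock()
			}
		}(cmd, node)
	}
	wg.Wait()
	sort.Strings(merged) // Branch completion order is not deterministic.
	return merged
}

func main() {
	cmds := []Command{
		{Update: State{"param": "A"}, GoTo: "worker"},
		{Update: State{"param": "B"}, GoTo: "worker"},
		{Update: State{"param": "C"}, GoTo: "worker"},
	}
	fmt.Println(runFanOut(State{}, cmds, map[string]NodeFunc{"worker": worker}))
}
```

Because branches finish in arbitrary order, a merged list should either be order-insensitive or be sorted, as done here.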
import (
    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// Create executor with configuration.
executor, err := graph.NewExecutor(compiledGraph,
    graph.WithChannelBufferSize(1024),
    graph.WithMaxSteps(50),
)
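A step limit such as the one passed to WithMaxSteps exists to stop graphs that loop forever. The sketch below shows the general idea with the standard library only; `run`, `ErrMaxSteps`, and the successor-map representation are hypothetical, and the real executor's accounting of steps may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrMaxSteps is returned when the step budget is used up before the end node.
var ErrMaxSteps = errors.New("maximum execution steps exceeded")

const endNode = "__end__"

// run follows next[node] from start until it reaches the end node or has
// taken maxSteps transitions. It returns the number of transitions taken.
func run(next map[string]string, start string, maxSteps int) (int, error) {
	cur := start
	for step := 0; step < maxSteps; step++ {
		if cur == endNode {
			return step, nil
		}
		target, ok := next[cur]
		if !ok {
			return step, fmt.Errorf("node not found: %q", cur)
		}
		cur = target
	}
	if cur == endNode {
		return maxSteps, nil
	}
	return maxSteps, ErrMaxSteps
}

func main() {
	linear := map[string]string{"a": "b", "b": endNode}
	steps, err := run(linear, "a", 50)
	fmt.Println(steps, err)

	loop := map[string]string{"a": "b", "b": "a"}
	_, err = run(loop, "a", 50)
	fmt.Println(err)
}
```

A tool-call loop between an LLM node and a tools node is the typical case where a graph legitimately cycles, so the limit should leave room for the expected number of rounds.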
4. Virtual Nodes and Routing
The Graph package uses virtual nodes to simplify workflow entry and exit:
import (
    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// Special node identifiers.
const (
    Start = "__start__" // Virtual start node.
    End   = "__end__"   // Virtual end node.
)

// Set entry point (automatically creates edge from Start -> nodeID).
stateGraph.SetEntryPoint("first_node")

// Set finish point (automatically creates edge from nodeID -> End).
stateGraph.SetFinishPoint("last_node")

// No need to explicitly add these edges:
// stateGraph.AddEdge(Start, "first_node") // Not needed.
// stateGraph.AddEdge("last_node", End)    // Not needed.
This design keeps workflow definitions concise: developers only need to define the actual business nodes and the connections between them.
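One way to picture the virtual nodes is as ordinary entries in an adjacency list that the builder fills in for you. The sketch below is a toy builder, not the package's StateGraph: `Graph`, its methods, and `Unreachable` are hypothetical, and the reachability check follows static edges only, so nodes reached solely through Command.GoTo would need separate handling.

```go
package main

import (
	"fmt"
	"sort"
)

// Virtual node identifiers, using the values shown in the document.
const (
	Start = "__start__"
	End   = "__end__"
)

// Graph is a minimal adjacency-list graph used only for this illustration.
type Graph struct {
	nodes map[string]bool
	edges map[string][]string
}

func NewGraph() *Graph {
	return &Graph{nodes: map[string]bool{}, edges: map[string][]string{}}
}

func (g *Graph) AddNode(id string) *Graph { g.nodes[id] = true; return g }

func (g *Graph) AddEdge(from, to string) *Graph {
	g.edges[from] = append(g.edges[from], to)
	return g
}

// SetEntryPoint adds the edge Start -> id on the caller's behalf.
func (g *Graph) SetEntryPoint(id string) *Graph { return g.AddEdge(Start, id) }

// SetFinishPoint adds the edge id -> End on the caller's behalf.
func (g *Graph) SetFinishPoint(id string) *Graph { return g.AddEdge(id, End) }

// Unreachable returns, sorted, the nodes that a breadth-first search from
// Start cannot reach over static edges.
func (g *Graph) Unreachable() []string {
	seen := map[string]bool{Start: true}
	queue := []string{Start}
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		for _, next := range g.edges[cur] {
			if !seen[next] {
				seen[next] = true
				queue = append(queue, next)
			}
		}
	}
	var missing []string
	for id := range g.nodes {
		if !seen[id] {
			missing = append(missing, id)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	g := NewGraph()
	g.AddNode("first_node").AddNode("last_node").AddNode("orphan")
	g.AddEdge("first_node", "last_node")
	g.SetEntryPoint("first_node").SetFinishPoint("last_node")
	fmt.Println(g.Unreachable())
}
```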
Best Practices
1. State Management
Use constants to define state keys, avoid hardcoded strings
Create helper functions for reading and validating complex state
Use Schema to validate state structure
Distinguish between built-in state keys and user-defined state keys
import (
    "errors"

    "trpc.group/trpc-go/trpc-agent-go/graph"
)

// Define user-defined state key constants.
const (
    StateKeyInput         = "input"          // User business data.
    StateKeyResult        = "result"         // Processing result.
    StateKeyProcessedData = "processed_data" // Processed data.
    StateKeyStatus        = "status"         // Processing status.
)

// User-accessible built-in state keys (use with caution).
// StateKeyUserInput    - User input (automatically set by GraphAgent).
// StateKeyLastResponse - Last response (read by Executor as final result).
// StateKeyMessages     - Message history (automatically updated by LLM nodes).
// StateKeyMetadata     - Metadata (general storage available to users).

// System internal state keys (users should not use directly).
// StateKeySession        - Session information (automatically set by GraphAgent).
// StateKeyExecContext    - Execution context (automatically set by Executor).
// StateKeyToolCallbacks  - Tool callbacks (automatically set by Executor).
// StateKeyModelCallbacks - Model callbacks (automatically set by Executor).

// Create state Helper.
type StateHelper struct {
    state graph.State
}

func (h *StateHelper) GetInput() (string, error) {
    if input, ok := h.state[StateKeyInput].(string); ok {
        return input, nil
    }
    return "", errors.New("input not found")
}

func (h *StateHelper) GetUserInput() (string, error) {
    if input, ok := h.state[graph.StateKeyUserInput].(string); ok {
        return input, nil
    }
    return "", errors.New("user_input not found")
}
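import (
    "context"
    "fmt"

    "trpc.group/trpc-go/trpc-agent-go/graph"
)

func safeNodeFunc(ctx context.Context, state graph.State) (any, error) {
    // Use the comma-ok form so a missing or mistyped value does not panic.
    input, ok := state["input"].(string)
    if !ok {
        return nil, fmt.Errorf("input field not found or wrong type")
    }
    if input == "" {
        return nil, fmt.Errorf("input cannot be empty")
    }

    // Processing logic...
    result := graph.State{"result": input} // Placeholder for the real result.
    return result, nil
}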
package main

import (
    "context"
    "fmt"
    "strings"
    "time"

    "trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
    "trpc.group/trpc-go/trpc-agent-go/event"
    "trpc.group/trpc-go/trpc-agent-go/graph"
    "trpc.group/trpc-go/trpc-agent-go/model"
    "trpc.group/trpc-go/trpc-agent-go/model/openai"
    "trpc.group/trpc-go/trpc-agent-go/runner"
    "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
    "trpc.group/trpc-go/trpc-agent-go/tool"
    "trpc.group/trpc-go/trpc-agent-go/tool/function"
)

// User-defined state keys.
const (
    StateKeyDocumentLength  = "document_length"
    StateKeyWordCount       = "word_count"
    StateKeyComplexityLevel = "complexity_level"
    StateKeyProcessingStage = "processing_stage"
)

type documentWorkflow struct {
    modelName string
    runner    runner.Runner
    userID    string
    sessionID string
}

func (w *documentWorkflow) setup() error {
    // 1. Create document processing graph.
    workflowGraph, err := w.createDocumentProcessingGraph()
    if err != nil {
        return fmt.Errorf("failed to create graph: %w", err)
    }

    // 2. Create GraphAgent.
    graphAgent, err := graphagent.New("document-processor", workflowGraph,
        graphagent.WithDescription("Comprehensive document processing workflow"),
        graphagent.WithInitialState(graph.State{}),
    )
    if err != nil {
        return fmt.Errorf("failed to create graph agent: %w", err)
    }

    // 3. Create session service.
    sessionService := inmemory.NewSessionService()

    // 4. Create Runner.
    w.runner = runner.NewRunner(
        "document-workflow",
        graphAgent,
        runner.WithSessionService(sessionService),
    )

    // 5. Set identifiers.
    w.userID = "user"
    w.sessionID = fmt.Sprintf("workflow-session-%d", time.Now().Unix())
    return nil
}

func (w *documentWorkflow) createDocumentProcessingGraph() (*graph.Graph, error) {
    // Create state schema.
    schema := graph.MessagesStateSchema()

    // Create model instance.
    modelInstance := openai.New(w.modelName)

    // Create analysis tool.
    complexityTool := function.NewFunctionTool(
        w.analyzeComplexity,
        function.WithName("analyze_complexity"),
        function.WithDescription("Analyze document complexity level"),
    )

    // Create state graph.
    stateGraph := graph.NewStateGraph(schema)
    tools := map[string]tool.Tool{
        "analyze_complexity": complexityTool,
    }

    // Build workflow graph.
    stateGraph.
        AddNode("preprocess", w.preprocessDocument).
        AddLLMNode("analyze", modelInstance,
            `You are a document analysis expert. Analyze the provided document and:
1. Classify document type and complexity (simple, moderate, complex)
2. Extract key themes
3. Evaluate content quality
Use the analyze_complexity tool for detailed analysis.
Only return complexity level: "simple" or "complex".`,
            tools).
        AddToolsNode("tools", tools).
        AddNode("route_complexity", w.routeComplexity).
        AddLLMNode("summarize", modelInstance,
            `You are a document summarization expert. Create a comprehensive and concise summary of the document.
Focus on:
1. Key points and main arguments
2. Important details and insights
3. Logical structure and flow
4. Conclusions and implications
Provide a well-structured summary that preserves important information.
Remember: Only output the final result itself, no other text.`,
            map[string]tool.Tool{}).
        AddLLMNode("enhance", modelInstance,
            `You are a content enhancement expert. Improve the provided content by:
1. Improving clarity and readability
2. Improving structure and organization
3. Adding relevant details where appropriate
4. Ensuring consistency and coherence
Focus on making content more engaging and professional while maintaining the original meaning.
Remember: Only output the final result itself, no other text.`,
            map[string]tool.Tool{}).
        AddNode("format_output", w.formatOutput).
        SetEntryPoint("preprocess").
        SetFinishPoint("format_output")

    // Add workflow edges.
    stateGraph.AddEdge("preprocess", "analyze")
    stateGraph.AddToolsConditionalEdges("analyze", "tools", "route_complexity")
    stateGraph.AddEdge("tools", "analyze")

    // Add complexity conditional routing.
    stateGraph.AddConditionalEdges("route_complexity", w.complexityCondition, map[string]string{
        "simple":  "enhance",
        "complex": "summarize",
    })
    stateGraph.AddEdge("enhance", "format_output")
    stateGraph.AddEdge("summarize", "format_output")

    // SetEntryPoint and SetFinishPoint automatically handle connections with virtual Start/End nodes.
    return stateGraph.Compile()
}

// Node function implementations.
func (w *documentWorkflow) preprocessDocument(ctx context.Context, state graph.State) (any, error) {
    var input string
    if userInput, ok := state[graph.StateKeyUserInput].(string); ok {
        input = userInput
    }
    if input == "" {
        return nil, fmt.Errorf("no input document found")
    }

    input = strings.TrimSpace(input)
    if len(input) < 10 {
        return nil, fmt.Errorf("document too short for processing (minimum 10 characters)")
    }

    return graph.State{
        StateKeyDocumentLength:  len(input),
        StateKeyWordCount:       len(strings.Fields(input)),
        graph.StateKeyUserInput: input,
        StateKeyProcessingStage: "preprocessing",
    }, nil
}

func (w *documentWorkflow) routeComplexity(ctx context.Context, state graph.State) (any, error) {
    return graph.State{
        StateKeyProcessingStage: "complexity_routing",
    }, nil
}

func (w *documentWorkflow) complexityCondition(ctx context.Context, state graph.State) (string, error) {
    if msgs, ok := state[graph.StateKeyMessages].([]model.Message); ok {
        if len(msgs) > 0 {
            lastMsg := msgs[len(msgs)-1]
            if strings.Contains(strings.ToLower(lastMsg.Content), "simple") {
                return "simple", nil
            }
        }
    }
    return "complex", nil
}

func (w *documentWorkflow) formatOutput(ctx context.Context, state graph.State) (any, error) {
    var result string
    if lastResponse, ok := state[graph.StateKeyLastResponse].(string); ok {
        result = lastResponse
    }

    finalOutput := fmt.Sprintf(`DOCUMENT PROCESSING RESULTS
========================
Processing Stage: %s
Document Length: %d characters
Word Count: %d words
Complexity Level: %s
Processed Content:
%s`,
        state[StateKeyProcessingStage],
        state[StateKeyDocumentLength],
        state[StateKeyWordCount],
        state[StateKeyComplexityLevel],
        result,
    )

    return graph.State{
        graph.StateKeyLastResponse: finalOutput,
    }, nil
}

// Tool function.
func (w *documentWorkflow) analyzeComplexity(ctx context.Context, args map[string]any) (any, error) {
    text, ok := args["text"].(string)
    if !ok {
        return nil, fmt.Errorf("text argument is required")
    }

    wordCount := len(strings.Fields(text))
    sentenceCount := len(strings.Split(text, "."))

    var level string
    var score float64
    if wordCount < 100 {
        level = "simple"
        score = 0.3
    } else if wordCount < 500 {
        level = "moderate"
        score = 0.6
    } else {
        level = "complex"
        score = 0.9
    }

    return map[string]any{
        "level":          level,
        "score":          score,
        "word_count":     wordCount,
        "sentence_count": sentenceCount,
    }, nil
}

// Execute workflow.
func (w *documentWorkflow) processDocument(ctx context.Context, content string) error {
    message := model.NewUserMessage(content)
    eventChan, err := w.runner.Run(ctx, w.userID, w.sessionID, message)
    if err != nil {
        return fmt.Errorf("failed to run workflow: %w", err)
    }
    return w.processStreamingResponse(eventChan)
}

func (w *documentWorkflow) processStreamingResponse(eventChan <-chan *event.Event) error {
    var workflowStarted bool
    var finalResult string

    for event := range eventChan {
        if event.Error != nil {
            fmt.Printf("❌ Error: %s\n", event.Error.Message)
            continue
        }

        if len(event.Choices) > 0 {
            choice := event.Choices[0]
            if choice.Delta.Content != "" {
                if !workflowStarted {
                    fmt.Print("🤖 Workflow: ")
                    workflowStarted = true
                }
                fmt.Print(choice.Delta.Content)
            }
            if choice.Message.Content != "" && event.Done {
                finalResult = choice.Message.Content
            }
        }

        if event.Done {
            if finalResult != "" && strings.Contains(finalResult, "DOCUMENT PROCESSING RESULTS") {
                fmt.Printf("\n\n%s\n", finalResult)
            }
            break
        }
    }
    return nil
}
import (
    "trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
    "trpc.group/trpc-go/trpc-agent-go/graph"
    "trpc.group/trpc-go/trpc-agent-go/model/openai"
    "trpc.group/trpc-go/trpc-agent-go/runner"
    "trpc.group/trpc-go/trpc-agent-go/session/inmemory"
    "trpc.group/trpc-go/trpc-agent-go/tool"
)

// Create chat bot.
// The return type is runner.Runner, matching the value returned by runner.NewRunner.
// calculatorTool and searchTool are assumed to be defined elsewhere.
func createChatBot(modelName string) (runner.Runner, error) {
    // Create state graph.
    stateGraph := graph.NewStateGraph(graph.MessagesStateSchema())

    // Create model and tools.
    modelInstance := openai.New(modelName)
    tools := map[string]tool.Tool{
        "calculator": calculatorTool,
        "search":     searchTool,
    }

    // Build conversation graph.
    stateGraph.
        AddLLMNode("chat", modelInstance,
            `You are a helpful AI assistant. Provide help based on user questions and use tools when needed.`,
            tools).
        AddToolsNode("tools", tools).
        AddToolsConditionalEdges("chat", "tools", "chat").
        SetEntryPoint("chat").
        SetFinishPoint("chat")

    // Compile graph.
    compiledGraph, err := stateGraph.Compile()
    if err != nil {
        return nil, err
    }

    // Create GraphAgent.
    graphAgent, err := graphagent.New("chat-bot", compiledGraph,
        graphagent.WithDescription("Intelligent chat bot"),
        graphagent.WithInitialState(graph.State{}),
    )
    if err != nil {
        return nil, err
    }

    // Create Runner.
    sessionService := inmemory.NewSessionService()
    appRunner := runner.NewRunner(
        "chat-bot-app",
        graphAgent,
        runner.WithSessionService(sessionService),
    )
    return appRunner, nil
}
import (
    "context"
    "log"

    "trpc.group/trpc-go/trpc-agent-go/agent"
    "trpc.group/trpc-go/trpc-agent-go/agent/graphagent"
    "trpc.group/trpc-go/trpc-agent-go/agent/llmagent"
    "trpc.group/trpc-go/trpc-agent-go/graph"
    "trpc.group/trpc-go/trpc-agent-go/model"
    "trpc.group/trpc-go/trpc-agent-go/runner"
    "trpc.group/trpc-go/trpc-agent-go/tool"
)

// modelInstance, analysisPrompt, tools, calculatorTool, preprocessDocument and
// formatOutput are assumed to be defined elsewhere.

// Create document processing GraphAgent.
func createDocumentProcessor() (agent.Agent, error) {
    // Create document processing graph.
    stateGraph := graph.NewStateGraph(graph.MessagesStateSchema())

    // Add document processing nodes.
    stateGraph.
        AddNode("preprocess", preprocessDocument).
        AddLLMNode("analyze", modelInstance, analysisPrompt, tools).
        AddNode("format", formatOutput).
        SetEntryPoint("preprocess").
        SetFinishPoint("format")

    // Compile graph.
    compiledGraph, err := stateGraph.Compile()
    if err != nil {
        return nil, err
    }

    // Create GraphAgent.
    return graphagent.New("document-processor", compiledGraph,
        graphagent.WithDescription("Professional document processing workflow"),
    )
}

// Create coordinator Agent, using GraphAgent as sub-agent.
func createCoordinatorAgent() (agent.Agent, error) {
    // Create document processing GraphAgent.
    documentProcessor, err := createDocumentProcessor()
    if err != nil {
        return nil, err
    }

    // Create other sub-agents.
    mathAgent := llmagent.New("math-agent",
        llmagent.WithModel(modelInstance),
        llmagent.WithDescription("Mathematical calculation expert"),
        llmagent.WithTools([]tool.Tool{calculatorTool}),
    )

    // Create coordinator Agent.
    coordinator := llmagent.New("coordinator",
        llmagent.WithModel(modelInstance),
        llmagent.WithDescription("Task coordinator, can delegate to professional sub-agents"),
        llmagent.WithInstruction(`You are a coordinator that can delegate tasks to professional sub-agents:
- document-processor: Document processing and analysis
- math-agent: Mathematical calculations and formula processing
Choose appropriate sub-agents based on user needs to handle tasks.`),
        llmagent.WithSubAgents([]agent.Agent{
            documentProcessor, // GraphAgent as sub-agent.
            mathAgent,
        }),
    )
    return coordinator, nil
}

// Usage example.
func main() {
    // Create coordinator Agent.
    coordinator, err := createCoordinatorAgent()
    if err != nil {
        log.Fatal(err)
    }

    // Create Runner. The variable is named appRunner so it does not shadow
    // the runner package.
    appRunner := runner.NewRunner("coordinator-app", coordinator)

    // Identifiers below are placeholder values for this example.
    ctx := context.Background()
    userID := "user"
    sessionID := "session"

    // Execute task (coordinator will automatically choose appropriate sub-agent).
    message := model.NewUserMessage("Please analyze this document and calculate the statistical data in it")
    eventChan, err := appRunner.Run(ctx, userID, sessionID, message)
    // ...
}
Key Features:
GraphAgent implements the agent.Agent interface and can be used as a sub-agent by other Agents
Coordinator Agents can delegate tasks to GraphAgent through the transfer_to_agent tool
GraphAgent focuses on workflow execution and does not support its own sub-agents
This design enables seamless integration of complex workflows with multi-Agent systems
Troubleshooting
Common Errors
"node not found": Check if node ID is correct
"invalid graph": Ensure graph has entry point and all nodes are reachable
"maximum execution steps exceeded": Check for loops or increase maximum steps
"state validation failed": Check state schema definition
Debugging Tips
Use event streams to monitor execution process
Add logs in node functions
Validate state schema definitions
Check condition function logic
Summary
The Graph package provides a powerful and flexible workflow orchestration system, particularly suitable for building complex AI applications. Through the combined use of GraphAgent and Runner, you can create efficient and maintainable workflow applications.
Key Points
Workflow Creation:
Use StateGraph builder to create graph structure
Define clear state schemas and data flow
Use conditional routing and tool nodes where the workflow actually needs them
Application Integration:
Wrap workflow graphs through GraphAgent
Use Runner to manage sessions and execution environment
This pattern makes the Graph package particularly suitable for building enterprise-level AI workflow applications, providing good scalability, maintainability, and user experience.