Message Passing and Event Flow in AgenticGoKit β
Overview β
At the heart of AgenticGoKit is an event-driven architecture that enables flexible communication between agents. This tutorial explains how messages flow through the system, how the Runner orchestrates this flow, and how you can leverage these patterns in your own applications.
Understanding message passing is crucial because it's the foundation of how agents communicate, share data, and coordinate their work in AgenticGoKit.
Prerequisites β
- Basic understanding of Go programming
- Familiarity with interfaces and goroutines
- Completed the 5-minute quickstart
Core Concepts β
Events: The Communication Currency β
In AgenticGoKit, all communication happens through Events. An Event is more than just a message - it's a structured packet of information with routing metadata.
// The Event interface - the core communication unit
type Event interface {
GetID() string // Unique identifier
GetTimestamp() time.Time // When event was created
GetTargetAgentID() string // Destination agent
GetSourceAgentID() string // Source agent
GetData() EventData // Actual payload data
GetMetadata() map[string]string // Routing metadata
GetSessionID() string // Session identifier
// ... other methods for setting values
}
Events carry:
- Data: The actual payload (questions, responses, etc.)
- Metadata: Routing information, session IDs, etc.
- Source/Target: Which agent sent it and where it's going
- Timestamps: When the event was created
EventData: The Payload β
// EventData holds the payload of an event
type EventData map[string]any
// Example usage
data := core.EventData{
"message": "What's the weather today?",
"user_id": "user-123",
"priority": "high",
}
Runners: The Traffic Controllers β
The Runner routes events to agents and manages flow. Create it from configuration for reliability and flexibility.
// Build a runner from config (agentflow.toml)
runner, err := core.NewRunnerFromConfig("agentflow.toml")
if err != nil { log.Fatal(err) }
// Register agents referenced in the config
_ = runner.RegisterAgent("assistant", assistantHandler)
// Start β Emit β Stop
ctx := context.Background()
_ = runner.Start(ctx)
defer runner.Stop()
_ = runner.Emit(core.NewEvent("assistant", core.EventData{"message": "hi"}, map[string]string{"session_id": "s1"}))
The Runner:
- Receives events via
Emit()
- Uses the configured orchestrator (route, sequential, collaborative, loop, mixed)
- Delivers events to agent(s)
- Collects results and routes follow-up events when applicable
How Message Passing Works β
1. Creating Events β
Events are typically created using the NewEvent()
function:
// Create a new event
event := core.NewEvent(
"weather-agent", // Target agent ID
core.EventData{ // Payload data
"message": "What's the weather in Paris?",
"location": "Paris",
},
map[string]string{ // Metadata
"session_id": "user-123",
"priority": "normal",
},
)
2. Emitting Events β
Events are sent using the Runner's Emit()
method:
// Send the event into the system (non-blocking)
if err := runner.Emit(event); err != nil {
log.Fatalf("emit failed: %v", err)
}
This is an asynchronous operationβprocessing happens in the background.
3. Event Processing Flow β
Behind the scenes, the Runner follows this flow:
βββββββββββ ββββββββββββ βββββββββββ
β Client ββββββΆβ Runner ββββββΆβ Agent A β
βββββββββββ ββββββββββββ βββββββββββ
β β
β βΌ
β βββββββββββ
βββββββββββββ Result β
β βββββββββββ
βΌ
βββββββββββ
β Agent B β
βββββββββββ
- Queues the event for processing
- Routes it to the target agent(s) via the Orchestrator
- Collects the results
- Forwards results to the next agent or back to the caller
4. Handling Results β
Results are typically handled through callbacks:
// Register a callback for when an agent completes processing
runner.RegisterCallback(core.HookAfterAgentRun, "my-callback",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
fmt.Printf("Agent %s completed\n", args.AgentID)
return args.State, nil
},
)
Under the Hood: The Runner Implementation (Internals) β
The Runner uses channels and goroutines to manage event flow. The internal loop processes events and invokes callbacks. Public code should use Start/Emit/Stop; internal helpers like processEvent are not part of the public API.
// Simplified internal sketch (do not call directly)
func (r *RunnerImpl) loop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-r.stopChan:
return
case event := <-r.queue:
// route + dispatch via orchestrator; invoke callbacks
}
}
}
Practical Example: Building a Conversational Agent β
Let's see how this works in practice with a simple conversational agent:
package main
import (
"context"
"fmt"
"log"
"github.com/kunalkushwaha/agenticgokit/core"
)
func main() {
// 1) Build runner from config (route orchestrator)
runner, err := core.NewRunnerFromConfig("agentflow.toml")
if err != nil { log.Fatal(err) }
// 2) Register a simple agent handler
_ = runner.RegisterAgent("assistant", core.AgentHandlerFunc(func(ctx context.Context, ev core.Event, st core.State) (core.AgentResult, error) {
out := st.Clone()
out.Set("response", "AgenticGoKit helps you build config-driven multi-agent systems in Go.")
return core.AgentResult{OutputState: out}, nil
}))
// 3) Start β Emit β Stop
ctx := context.Background()
if err := runner.Start(ctx); err != nil { log.Fatal(err) }
defer runner.Stop()
event := core.NewEvent("assistant", core.EventData{"message": "Tell me about AgenticGoKit"}, map[string]string{"session_id": "user-123"})
if err := runner.Emit(event); err != nil { log.Fatal(err) }
fmt.Println("Event emitted; check logs/callbacks for responses")
}
Tip: In examples, use a local LLM like Ollama with model gemma3:1b in your agentflow.toml
under [llm]
to avoid external API keys.
Advanced Message Passing Patterns β
1. Session Management β
Events can carry session IDs to maintain conversation context:
event := core.NewEvent(
"assistant",
core.EventData{"message": "What was my last question?"},
map[string]string{"session_id": "user-123"},
)
2. Event Chaining β
You can create chains of events where each agent's output becomes the next agent's input:
runner.RegisterCallback(core.HookAfterAgentRun, "chain-handler",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
if args.AgentID == "researcher" && args.Error == nil {
// Create a new event for the analyzer
nextEvent := core.NewEvent(
"analyzer",
core.EventData{"research_data": args.AgentResult.OutputState},
map[string]string{
"session_id": args.Event.GetSessionID(),
"route": "analyzer",
},
)
runner.Emit(nextEvent)
}
return args.State, nil
},
)
3. Broadcast Events β
Send the same event to multiple agents simultaneously using collaborative orchestration:
// This requires using a collaborative orchestrator
// which we'll cover in the orchestration tutorial
event := core.NewEvent(
"", // Empty target for broadcast
core.EventData{"message": "New data available"},
map[string]string{"broadcast": "true"},
)
4. Priority Handling β
Use metadata to implement priority queues:
event := core.NewEvent(
"processor",
core.EventData{"task": "urgent_analysis"},
map[string]string{
"priority": "high",
"deadline": time.Now().Add(5*time.Minute).Format(time.RFC3339),
},
)
Event Lifecycle and Hooks β
AgenticGoKit provides several hooks where you can intercept and modify the event processing flow:
// Available hooks
const (
HookBeforeEventHandling // Before any processing
HookBeforeAgentRun // Before agent execution
HookAfterAgentRun // After successful agent execution
HookAgentError // When agent execution fails
HookAfterEventHandling // After all processing
)
Example: Adding Logging and Metrics β
// Add comprehensive logging
runner.RegisterCallback(core.HookBeforeEventHandling, "logger",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
fmt.Printf("[%s] Processing event %s\n",
time.Now().Format(time.RFC3339),
args.Event.GetID(),
)
return args.State, nil
},
)
runner.RegisterCallback(core.HookAfterAgentRun, "metrics",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
duration := time.Since(args.Event.GetTimestamp())
fmt.Printf("Agent %s completed in %v\n", args.AgentID, duration)
// Record metrics
// metrics.RecordAgentDuration(args.AgentID, duration)
return args.State, nil
},
)
Error Handling in Message Passing β
When agents fail, AgenticGoKit provides sophisticated error routing:
// Register error handler
runner.RegisterCallback(core.HookAgentError, "error-handler",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
fmt.Printf("Agent %s failed: %v\n", args.AgentID, args.Error)
// Optionally emit a recovery event
recoveryEvent := core.NewEvent(
"error-recovery-agent",
core.EventData{
"original_event": args.Event,
"error": args.Error.Error(),
"failed_agent": args.AgentID,
},
map[string]string{
"session_id": args.Event.GetSessionID(),
"route": "error-recovery-agent",
},
)
runner.Emit(recoveryEvent)
return args.State, nil
},
)
Common Pitfalls and Solutions β
1. Deadlocks β
Problem: Waiting for results in the same goroutine that processes events.
Solution: Use callbacks or separate goroutines for waiting on results.
// Bad - blocks the event loop
result := <-resultChannel
// Good - use callbacks
runner.RegisterCallback(core.HookAfterAgentRun, "handler",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
// Handle result here
return args.State, nil
},
)
2. Event Loops β
Problem: Creating circular event chains that never terminate.
Solution: Implement loop detection or maximum hop counts in your metadata.
// Add hop count to prevent infinite loops
metadata := map[string]string{
"session_id": "user-123",
"hop_count": "1",
"max_hops": "5",
}
3. Lost Events β
Problem: Events that never get processed because the target agent doesn't exist.
Solution: Implement error routing and default handlers for unknown targets.
// The orchestrator will automatically handle unknown targets
// and emit error events that you can catch with error callbacks
4. Memory Leaks β
Problem: Events accumulating in queues without being processed.
Solution: Monitor queue sizes and implement proper shutdown procedures.
// Always stop the runner when done
defer runner.Stop()
// Monitor queue health
runner.RegisterCallback(core.HookBeforeEventHandling, "monitor",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
// Check queue sizes, memory usage, etc.
return args.State, nil
},
)
Performance Considerations β
1. Queue Sizing β
Prefer configuring queue size in agentflow.toml
when supported by your runner plugin. If not available, use the default from NewRunnerFromConfig
.
2. Event Batching β
// For high-volume scenarios, consider batching events
batchEvent := core.NewEvent(
"batch-processor",
core.EventData{
"events": []core.Event{event1, event2, event3},
},
metadata,
)
3. Async Processing β
// Use goroutines for non-blocking operations
runner.RegisterCallback(core.HookAfterAgentRun, "async-handler",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
go func() {
// Long-running operation
processResult(args.AgentResult)
}()
return args.State, nil
},
)
Debugging Message Flow β
1. Event Tracing β
// Enable detailed logging
runner.RegisterCallback(core.HookBeforeEventHandling, "tracer",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
fmt.Printf("Event %s: %+v\n", args.Event.GetID(), args.Event.GetData())
return args.State, nil
},
)
2. State Inspection β
// Inspect state at each step
runner.RegisterCallback(core.HookAfterAgentRun, "state-inspector",
func(ctx context.Context, args core.CallbackArgs) (core.State, error) {
if args.AgentResult.OutputState != nil {
fmt.Printf("State after %s: %+v\n",
args.AgentID,
args.AgentResult.OutputState.GetAll(),
)
}
return args.State, nil
},
)
### Troubleshooting
- Error: "orchestrator not configured" or "factory not registered": add a plugins file with blank imports for orchestrators/runner, for example:
- _ "github.com/kunalkushwaha/agenticgokit/plugins/orchestrator/route"
- _ "github.com/kunalkushwaha/agenticgokit/plugins/runner/default"
- LLM provider not registered: import the provider plugin, e.g. _ "github.com/kunalkushwaha/agenticgokit/plugins/llm/ollama" and set `[llm] type = "ollama", model = "gemma3:1b"` in `agentflow.toml`.
Best Practices β
- Always set session IDs for tracking related events
- Use meaningful event IDs for debugging
- Include sufficient metadata for routing and processing
- Handle errors gracefully with error callbacks
- Monitor queue health and processing times
- Use callbacks instead of blocking waits
- Implement proper shutdown procedures
- Add comprehensive logging for production systems
Conclusion β
The event-driven architecture of AgenticGoKit provides a flexible foundation for building complex agent systems. By understanding how events flow through the Runner to agents and back, you can create sophisticated communication patterns between your agents.
Key takeaways:
- Events are the primary communication mechanism
- Runners manage event flow and routing
- Callbacks provide hooks for customization
- Proper error handling is crucial
- Async patterns prevent blocking
Next Steps β
- Orchestration Patterns - Learn how different orchestration modes build on message passing
- State Management - Understand how data flows between agents
- Error Handling - Master robust error management patterns
- Debugging Guide - Learn to trace and debug event flows