# Load Balancing and Scaling Patterns

Building horizontally scalable agent systems with intelligent load distribution.
As your agent systems grow, you need strategies to distribute load effectively and scale horizontally. This guide covers load balancing patterns, horizontal scaling strategies, and performance optimization techniques for AgenticGoKit applications.
## ⚖️ Load Balancing Strategies
### 1. Round Robin Load Balancing
```go
package balancer

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"

	"github.com/kunalkushwaha/agenticgokit/core"
)

// RoundRobinBalancer distributes requests evenly across a pool of agents.
type RoundRobinBalancer struct {
	agents  []core.AgentHandler
	counter int64
	mu      sync.RWMutex
}

func NewRoundRobinBalancer(agents []core.AgentHandler) *RoundRobinBalancer {
	return &RoundRobinBalancer{
		agents: agents,
	}
}

func (rb *RoundRobinBalancer) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	rb.mu.RLock()
	defer rb.mu.RUnlock()

	if len(rb.agents) == 0 {
		return nil, errors.New("no agents available")
	}

	// Get the next agent in round-robin fashion
	index := atomic.AddInt64(&rb.counter, 1) % int64(len(rb.agents))
	agent := rb.agents[index]

	return agent.Execute(ctx, event, state)
}

func (rb *RoundRobinBalancer) AddAgent(agent core.AgentHandler) {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	rb.agents = append(rb.agents, agent)
}

func (rb *RoundRobinBalancer) RemoveAgent(index int) {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	if index >= 0 && index < len(rb.agents) {
		rb.agents = append(rb.agents[:index], rb.agents[index+1:]...)
	}
}
```
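Because the balancer exposes the same `Execute(ctx, event, state)` method as an individual handler, it can usually stand in wherever a single agent handler is used today. A minimal sketch, assuming `workerA`, `workerB`, and `workerC` are existing `core.AgentHandler` implementations:

```go
// buildPool is an illustrative sketch: it places three identical workers
// behind one round-robin balancer. The worker handlers are assumed to exist.
func buildPool(workerA, workerB, workerC core.AgentHandler) *RoundRobinBalancer {
	return NewRoundRobinBalancer([]core.AgentHandler{workerA, workerB, workerC})
}
```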
### 2. Weighted Load Balancing
```go
// WeightedBalancer implements smooth weighted round-robin: agents with a
// higher weight receive proportionally more traffic.
type WeightedBalancer struct {
	agents      []WeightedAgent
	totalWeight int
	mu          sync.RWMutex
}

type WeightedAgent struct {
	Agent         core.AgentHandler
	Weight        int
	CurrentWeight int
}

func NewWeightedBalancer() *WeightedBalancer {
	return &WeightedBalancer{
		agents: make([]WeightedAgent, 0),
	}
}

func (wb *WeightedBalancer) AddAgent(agent core.AgentHandler, weight int) {
	wb.mu.Lock()
	defer wb.mu.Unlock()

	wb.agents = append(wb.agents, WeightedAgent{
		Agent:         agent,
		Weight:        weight,
		CurrentWeight: 0,
	})
	wb.totalWeight += weight
}

func (wb *WeightedBalancer) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	// Hold the lock only for selection so agent executions can run concurrently
	wb.mu.Lock()
	if len(wb.agents) == 0 {
		wb.mu.Unlock()
		return nil, errors.New("no agents available")
	}
	selected := wb.selectAgent()
	wb.mu.Unlock()

	if selected == nil {
		return nil, errors.New("no agent selected")
	}

	return selected.Execute(ctx, event, state)
}

// selectAgent applies smooth weighted round-robin: every agent's current
// weight grows by its static weight, the largest current weight wins, and the
// winner is penalized by the total weight.
func (wb *WeightedBalancer) selectAgent() core.AgentHandler {
	var selected *WeightedAgent
	for i := range wb.agents {
		agent := &wb.agents[i]
		agent.CurrentWeight += agent.Weight
		if selected == nil || agent.CurrentWeight > selected.CurrentWeight {
			selected = agent
		}
	}

	if selected != nil {
		selected.CurrentWeight -= wb.totalWeight
		return selected.Agent
	}
	return nil
}
```
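Smooth weighted round-robin interleaves traffic instead of sending long bursts to the heaviest agent. The standalone snippet below (plain Go, no AgenticGoKit types) traces the same algorithm for two targets with weights 3 and 1 and prints the repeating order `A A B A`:

```go
package main

import "fmt"

// Standalone trace of smooth weighted round-robin: "A" has weight 3, "B" has weight 1.
func main() {
	labels := []string{"A", "B"}
	weights := []int{3, 1}
	current := []int{0, 0}
	total := 4 // sum of weights

	for round := 0; round < 8; round++ {
		best := -1
		for i := range labels {
			current[i] += weights[i]
			if best == -1 || current[i] > current[best] {
				best = i
			}
		}
		current[best] -= total
		fmt.Print(labels[best], " ") // Output: A A B A A A B A
	}
	fmt.Println()
}
```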
### 3. Least Connections Load Balancing
```go
// LeastConnectionsBalancer routes each request to the agent with the fewest
// in-flight requests.
type LeastConnectionsBalancer struct {
	agents []ConnectionTrackingAgent
	mu     sync.RWMutex
}

type ConnectionTrackingAgent struct {
	Agent       core.AgentHandler
	Connections int64 // accessed atomically
}

func NewLeastConnectionsBalancer(agents []core.AgentHandler) *LeastConnectionsBalancer {
	trackingAgents := make([]ConnectionTrackingAgent, len(agents))
	for i, agent := range agents {
		trackingAgents[i] = ConnectionTrackingAgent{
			Agent: agent,
		}
	}

	return &LeastConnectionsBalancer{
		agents: trackingAgents,
	}
}

func (lcb *LeastConnectionsBalancer) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	agent := lcb.selectLeastConnectedAgent()
	if agent == nil {
		return nil, errors.New("no agents available")
	}

	// Track the in-flight request for the duration of the call
	atomic.AddInt64(&agent.Connections, 1)
	defer atomic.AddInt64(&agent.Connections, -1)

	return agent.Agent.Execute(ctx, event, state)
}

func (lcb *LeastConnectionsBalancer) selectLeastConnectedAgent() *ConnectionTrackingAgent {
	lcb.mu.RLock()
	defer lcb.mu.RUnlock()

	if len(lcb.agents) == 0 {
		return nil
	}

	var selected *ConnectionTrackingAgent
	minConnections := int64(^uint64(0) >> 1) // max int64

	for i := range lcb.agents {
		agent := &lcb.agents[i]
		connections := atomic.LoadInt64(&agent.Connections)
		if connections < minConnections {
			minConnections = connections
			selected = agent
		}
	}

	return selected
}
```
### 4. Performance-Based Load Balancing
```go
// PerformanceBasedBalancer favors agents with high success rates and low
// average response times.
type PerformanceBasedBalancer struct {
	agents []PerformanceTrackingAgent
	mu     sync.RWMutex
}

type PerformanceTrackingAgent struct {
	Agent         core.AgentHandler
	ResponseTimes []time.Duration
	SuccessRate   float64
	LastUpdate    time.Time
	mu            sync.RWMutex
}

func NewPerformanceBasedBalancer(agents []core.AgentHandler) *PerformanceBasedBalancer {
	trackingAgents := make([]PerformanceTrackingAgent, len(agents))
	for i, agent := range agents {
		trackingAgents[i] = PerformanceTrackingAgent{
			Agent:         agent,
			ResponseTimes: make([]time.Duration, 0, 100),
			SuccessRate:   1.0,
			LastUpdate:    time.Now(),
		}
	}

	return &PerformanceBasedBalancer{
		agents: trackingAgents,
	}
}

func (pbb *PerformanceBasedBalancer) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	agent := pbb.selectBestPerformingAgent()
	if agent == nil {
		return nil, errors.New("no agents available")
	}

	start := time.Now()
	result, err := agent.Agent.Execute(ctx, event, state)
	duration := time.Since(start)

	// Record performance metrics
	pbb.recordPerformance(agent, duration, err == nil)

	return result, err
}

func (pbb *PerformanceBasedBalancer) selectBestPerformingAgent() *PerformanceTrackingAgent {
	pbb.mu.RLock()
	defer pbb.mu.RUnlock()

	if len(pbb.agents) == 0 {
		return nil
	}

	var selected *PerformanceTrackingAgent
	bestScore := -1.0

	for i := range pbb.agents {
		agent := &pbb.agents[i]
		score := pbb.calculatePerformanceScore(agent)
		if score > bestScore {
			bestScore = score
			selected = agent
		}
	}

	return selected
}

func (pbb *PerformanceBasedBalancer) calculatePerformanceScore(agent *PerformanceTrackingAgent) float64 {
	agent.mu.RLock()
	defer agent.mu.RUnlock()

	// Calculate average response time
	avgResponseTime := time.Duration(0)
	if len(agent.ResponseTimes) > 0 {
		total := time.Duration(0)
		for _, rt := range agent.ResponseTimes {
			total += rt
		}
		avgResponseTime = total / time.Duration(len(agent.ResponseTimes))
	}

	// Normalize response time (lower is better)
	responseTimeScore := 1.0 / (1.0 + avgResponseTime.Seconds())

	// Combine success rate and response time
	return (agent.SuccessRate * 0.7) + (responseTimeScore * 0.3)
}

func (pbb *PerformanceBasedBalancer) recordPerformance(agent *PerformanceTrackingAgent, duration time.Duration, success bool) {
	agent.mu.Lock()
	defer agent.mu.Unlock()

	// Record response time, keeping only the last 100 samples
	agent.ResponseTimes = append(agent.ResponseTimes, duration)
	if len(agent.ResponseTimes) > 100 {
		agent.ResponseTimes = agent.ResponseTimes[1:]
	}

	// Update success rate (exponential moving average)
	alpha := 0.1
	if success {
		agent.SuccessRate = agent.SuccessRate*(1-alpha) + alpha
	} else {
		agent.SuccessRate = agent.SuccessRate * (1 - alpha)
	}

	agent.LastUpdate = time.Now()
}
```
## 🔄 Horizontal Scaling Patterns
### 1. Stateless Agent Design
```go
// StatelessAgent keeps no local state, so any number of instances can run
// behind a load balancer; all shared data lives in external services.
type StatelessAgent struct {
	name        string
	llmProvider core.ModelProvider
	cache       CacheProvider  // Shared cache
	memory      MemoryProvider // Shared memory
}

func NewStatelessAgent(name string, provider core.ModelProvider, cache CacheProvider, memory MemoryProvider) *StatelessAgent {
	return &StatelessAgent{
		name:        name,
		llmProvider: provider,
		cache:       cache,
		memory:      memory,
	}
}

func (sa *StatelessAgent) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	// All state is passed in or retrieved from shared resources, so nothing
	// local prevents horizontal scaling.

	// Get conversation context from shared memory
	memCtx := sa.memory.GetContext(state.SessionID)

	// Check the shared cache first
	cacheKey := sa.generateCacheKey(event)
	if cached := sa.cache.Get(cacheKey); cached != nil {
		if result, ok := cached.(*core.AgentResult); ok {
			return result, nil
		}
	}

	// Process the request
	result, err := sa.processRequest(ctx, event, memCtx)
	if err != nil {
		return nil, err
	}

	// Store the result in the shared cache
	sa.cache.Set(cacheKey, result, 5*time.Minute)

	return result, nil
}

func (sa *StatelessAgent) generateCacheKey(event core.Event) string {
	return fmt.Sprintf("%s:%s:%x", sa.name, event.Type,
		sha256.Sum256([]byte(fmt.Sprintf("%v", event.Data))))
}
```
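The `CacheProvider` and `MemoryProvider` dependencies above are left abstract. The sketch below shows one possible shape for them (an assumption of this guide, not an AgenticGoKit API), with a process-local cache that is only suitable for tests; in production these would typically wrap Redis and the shared memory store from the configuration. `GetContext` returning a string is likewise an illustrative assumption.

```go
// Assumed shapes for the shared-resource dependencies used by StatelessAgent.
type CacheProvider interface {
	Get(key string) interface{}
	Set(key string, value interface{}, ttl time.Duration)
}

type MemoryProvider interface {
	GetContext(sessionID string) string
}

// InMemoryCache is a minimal, process-local CacheProvider for local tests.
// It is NOT shared across instances, so it defeats the purpose in real deployments.
type InMemoryCache struct {
	mu      sync.RWMutex
	entries map[string]cacheEntry
}

type cacheEntry struct {
	value     interface{}
	expiresAt time.Time
}

func NewInMemoryCache() *InMemoryCache {
	return &InMemoryCache{entries: make(map[string]cacheEntry)}
}

func (c *InMemoryCache) Get(key string) interface{} {
	c.mu.RLock()
	defer c.mu.RUnlock()
	entry, ok := c.entries[key]
	if !ok || time.Now().After(entry.expiresAt) {
		return nil
	}
	return entry.value
}

func (c *InMemoryCache) Set(key string, value interface{}, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = cacheEntry{value: value, expiresAt: time.Now().Add(ttl)}
}
```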
### 2. Auto-Scaling Controller
```go
// AutoScaler grows or shrinks the agent pool based on resource utilization,
// with cooldowns to avoid flapping.
type AutoScaler struct {
	minInstances      int
	maxInstances      int
	currentInstances  int
	targetCPU         float64
	targetMemory      float64
	scaleUpCooldown   time.Duration
	scaleDownCooldown time.Duration
	lastScaleAction   time.Time
	metrics           MetricsProvider
	orchestrator      Orchestrator
	mu                sync.RWMutex
}

func NewAutoScaler(min, max int, metrics MetricsProvider, orchestrator Orchestrator) *AutoScaler {
	return &AutoScaler{
		minInstances:      min,
		maxInstances:      max,
		currentInstances:  min,
		targetCPU:         70.0, // 70% CPU utilization
		targetMemory:      80.0, // 80% memory utilization
		scaleUpCooldown:   2 * time.Minute,
		scaleDownCooldown: 5 * time.Minute,
		metrics:           metrics,
		orchestrator:      orchestrator,
	}
}

func (as *AutoScaler) Start(ctx context.Context) {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			as.evaluateScaling()
		case <-ctx.Done():
			return
		}
	}
}

func (as *AutoScaler) evaluateScaling() {
	as.mu.Lock()
	defer as.mu.Unlock()

	// Get current metrics
	cpuUsage := as.metrics.GetCPUUsage()
	memoryUsage := as.metrics.GetMemoryUsage()

	// Determine whether scaling is needed
	shouldScaleUp := (cpuUsage > as.targetCPU || memoryUsage > as.targetMemory) &&
		as.currentInstances < as.maxInstances &&
		time.Since(as.lastScaleAction) > as.scaleUpCooldown

	shouldScaleDown := cpuUsage < as.targetCPU*0.5 && memoryUsage < as.targetMemory*0.5 &&
		as.currentInstances > as.minInstances &&
		time.Since(as.lastScaleAction) > as.scaleDownCooldown

	if shouldScaleUp {
		as.scaleUp()
	} else if shouldScaleDown {
		as.scaleDown()
	}
}

func (as *AutoScaler) scaleUp() {
	newInstances := min(as.currentInstances+1, as.maxInstances) // min/max builtins require Go 1.21+
	if newInstances > as.currentInstances {
		if err := as.orchestrator.AddInstance(); err == nil {
			as.currentInstances = newInstances
			as.lastScaleAction = time.Now()
			log.Printf("Scaled up to %d instances", as.currentInstances)
		}
	}
}

func (as *AutoScaler) scaleDown() {
	newInstances := max(as.currentInstances-1, as.minInstances)
	if newInstances < as.currentInstances {
		if err := as.orchestrator.RemoveInstance(); err == nil {
			as.currentInstances = newInstances
			as.lastScaleAction = time.Now()
			log.Printf("Scaled down to %d instances", as.currentInstances)
		}
	}
}
```
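The `MetricsProvider` and `Orchestrator` dependencies are deliberately abstract. One plausible set of contracts (an assumption of this guide, not an AgenticGoKit API) looks like this; the orchestrator side would be implemented against Docker, Kubernetes, or your own process manager:

```go
// Assumed dependency contracts for the AutoScaler sketch above.
type MetricsProvider interface {
	GetCPUUsage() float64    // percent, 0-100
	GetMemoryUsage() float64 // percent, 0-100
	GetRequestRate() float64 // requests per second (an optional extra signal)
}

type Orchestrator interface {
	AddInstance() error    // start one more agent instance
	RemoveInstance() error // stop one agent instance
}
```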
### 3. Health-Aware Load Balancing
```go
// HealthAwareBalancer only routes requests to agents that pass periodic
// health checks.
type HealthAwareBalancer struct {
	agents      []HealthTrackingAgent
	healthCheck time.Duration
	mu          sync.RWMutex
}

type HealthTrackingAgent struct {
	Agent     core.AgentHandler
	Healthy   bool
	LastCheck time.Time
	Endpoint  string
}

func NewHealthAwareBalancer(agents []core.AgentHandler) *HealthAwareBalancer {
	trackingAgents := make([]HealthTrackingAgent, len(agents))
	for i, agent := range agents {
		trackingAgents[i] = HealthTrackingAgent{
			Agent:   agent,
			Healthy: true,
		}
	}

	hab := &HealthAwareBalancer{
		agents:      trackingAgents,
		healthCheck: 30 * time.Second,
	}

	// Start background health checking
	go hab.startHealthChecking()

	return hab
}

func (hab *HealthAwareBalancer) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	healthyAgents := hab.getHealthyAgents()
	if len(healthyAgents) == 0 {
		return nil, errors.New("no healthy agents available")
	}

	// Pick a random healthy agent to spread load
	index := rand.Intn(len(healthyAgents))
	return healthyAgents[index].Execute(ctx, event, state)
}

func (hab *HealthAwareBalancer) getHealthyAgents() []core.AgentHandler {
	hab.mu.RLock()
	defer hab.mu.RUnlock()

	var healthy []core.AgentHandler
	for _, agent := range hab.agents {
		if agent.Healthy {
			healthy = append(healthy, agent.Agent)
		}
	}
	return healthy
}

func (hab *HealthAwareBalancer) startHealthChecking() {
	ticker := time.NewTicker(hab.healthCheck)
	defer ticker.Stop()

	for range ticker.C {
		hab.checkHealth()
	}
}

func (hab *HealthAwareBalancer) checkHealth() {
	hab.mu.Lock()
	defer hab.mu.Unlock()

	for i := range hab.agents {
		agent := &hab.agents[i]

		// Perform a health check and log state transitions
		healthy := hab.performHealthCheck(agent)
		if agent.Healthy != healthy {
			agent.Healthy = healthy
			if healthy {
				log.Printf("Agent %d is now healthy", i)
			} else {
				log.Printf("Agent %d is now unhealthy", i)
			}
		}
		agent.LastCheck = time.Now()
	}
}

func (hab *HealthAwareBalancer) performHealthCheck(agent *HealthTrackingAgent) bool {
	// Simple health check - try to execute a test request
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	testEvent := core.NewEvent("health_check", "ping")
	testState := &core.State{}

	_, err := agent.Agent.Execute(ctx, testEvent, testState)
	return err == nil
}
```
## 🐳 Container Orchestration
### Docker Compose for Development
```yaml
# docker-compose.yml
version: '3.8'

services:
  agent-1:
    build: .
    environment:
      - INSTANCE_ID=1
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgres://user:pass@postgres:5432/agentflow
    depends_on:
      - redis
      - postgres
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 256M

  agent-2:
    build: .
    environment:
      - INSTANCE_ID=2
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgres://user:pass@postgres:5432/agentflow
    depends_on:
      - redis
      - postgres
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M

  agent-3:
    build: .
    environment:
      - INSTANCE_ID=3
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgres://user:pass@postgres:5432/agentflow
    depends_on:
      - redis
      - postgres

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - agent-1
      - agent-2
      - agent-3

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  postgres:
    image: pgvector/pgvector:pg15
    environment:
      POSTGRES_DB: agentflow
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  redis_data:
  postgres_data:
```
### Kubernetes Deployment
```yaml
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: agenticgokit-agents
spec:
  replicas: 3
  selector:
    matchLabels:
      app: agenticgokit-agents
  template:
    metadata:
      labels:
        app: agenticgokit-agents
    spec:
      containers:
        - name: agent
          image: agenticgokit:latest
          ports:
            - containerPort: 8080
          env:
            - name: REDIS_URL
              value: "redis://redis-service:6379"
            - name: POSTGRES_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: postgres-url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: agenticgokit-service
spec:
  selector:
    app: agenticgokit-agents
  ports:
    - port: 80
      targetPort: 8080
  type: LoadBalancer
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: agenticgokit-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: agenticgokit-agents
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```
## 📊 Performance Monitoring
### Load Balancer Metrics
```go
// LoadBalancerMetrics tracks per-agent request counts, response times, and errors.
type LoadBalancerMetrics struct {
	totalRequests        int64
	requestsPerAgent     map[string]int64
	responseTimesByAgent map[string][]time.Duration
	errorsByAgent        map[string]int64
	mu                   sync.RWMutex
}

func NewLoadBalancerMetrics() *LoadBalancerMetrics {
	return &LoadBalancerMetrics{
		requestsPerAgent:     make(map[string]int64),
		responseTimesByAgent: make(map[string][]time.Duration),
		errorsByAgent:        make(map[string]int64),
	}
}

func (lbm *LoadBalancerMetrics) RecordRequest(agentID string, duration time.Duration, success bool) {
	lbm.mu.Lock()
	defer lbm.mu.Unlock()

	lbm.totalRequests++
	lbm.requestsPerAgent[agentID]++

	// Record response time
	if lbm.responseTimesByAgent[agentID] == nil {
		lbm.responseTimesByAgent[agentID] = make([]time.Duration, 0, 100)
	}
	lbm.responseTimesByAgent[agentID] = append(lbm.responseTimesByAgent[agentID], duration)

	// Keep only recent response times
	if len(lbm.responseTimesByAgent[agentID]) > 100 {
		lbm.responseTimesByAgent[agentID] = lbm.responseTimesByAgent[agentID][1:]
	}

	// Record errors
	if !success {
		lbm.errorsByAgent[agentID]++
	}
}

func (lbm *LoadBalancerMetrics) GetStats() map[string]interface{} {
	lbm.mu.RLock()
	defer lbm.mu.RUnlock()

	agents := make(map[string]interface{})
	stats := map[string]interface{}{
		"total_requests": lbm.totalRequests,
		"agents":         agents,
	}

	for agentID, requests := range lbm.requestsPerAgent {
		agentStats := map[string]interface{}{
			"requests": requests,
			"errors":   lbm.errorsByAgent[agentID],
		}

		// Calculate average response time
		if responseTimes := lbm.responseTimesByAgent[agentID]; len(responseTimes) > 0 {
			total := time.Duration(0)
			for _, rt := range responseTimes {
				total += rt
			}
			agentStats["avg_response_time"] = total / time.Duration(len(responseTimes))
		}

		agents[agentID] = agentStats
	}

	return stats
}
```
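One low-friction way to populate these metrics is to decorate each pool member before handing it to a balancer. The sketch below assumes the `Execute` signature used throughout this guide satisfies `core.AgentHandler`, and labels agents by slice index purely for illustration:

```go
// measuredAgent wraps a single agent handler and reports its latency and
// success to a shared LoadBalancerMetrics collector.
type measuredAgent struct {
	id      string
	inner   core.AgentHandler
	metrics *LoadBalancerMetrics
}

func (m *measuredAgent) Execute(ctx context.Context, event core.Event, state *core.State) (*core.AgentResult, error) {
	start := time.Now()
	result, err := m.inner.Execute(ctx, event, state)
	m.metrics.RecordRequest(m.id, time.Since(start), err == nil)
	return result, err
}

// wrapWithMetrics decorates each pool member before it is handed to a balancer.
func wrapWithMetrics(agents []core.AgentHandler, metrics *LoadBalancerMetrics) []core.AgentHandler {
	wrapped := make([]core.AgentHandler, len(agents))
	for i, agent := range agents {
		wrapped[i] = &measuredAgent{
			id:      fmt.Sprintf("agent-%d", i),
			inner:   agent,
			metrics: metrics,
		}
	}
	return wrapped
}
```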
## 🎯 Best Practices
### 1. Configuration for Scaling
```toml
# agentflow.toml - Production scaling configuration

[runtime]
max_concurrent_agents = 50
timeout_seconds = 30
enable_metrics = true
metrics_port = 8080

[load_balancer]
strategy = "performance_based"  # round_robin, weighted, least_connections, performance_based
health_check_interval = "30s"
unhealthy_threshold = 3
healthy_threshold = 2

[scaling]
min_instances = 2
max_instances = 20
target_cpu_utilization = 70
target_memory_utilization = 80
scale_up_cooldown = "2m"
scale_down_cooldown = "5m"

[shared_resources]
redis_url = "${REDIS_URL}"
postgres_url = "${POSTGRES_URL}"
vector_db_url = "${VECTOR_DB_URL}"
```
### 2. Health Check Implementation
```go
func (a *ScalableAgent) HealthCheck() map[string]interface{} {
	health := map[string]interface{}{
		"status":    "healthy",
		"timestamp": time.Now().Format(time.RFC3339),
	}

	// Check LLM provider (fatal for this instance)
	if err := a.checkLLMProvider(); err != nil {
		health["status"] = "unhealthy"
		health["llm_provider_error"] = err.Error()
	}

	// Check shared cache (degraded, but never mask an unhealthy status)
	if err := a.checkCache(); err != nil {
		if health["status"] == "healthy" {
			health["status"] = "degraded"
		}
		health["cache_error"] = err.Error()
	}

	// Check memory system (degraded, but never mask an unhealthy status)
	if err := a.checkMemory(); err != nil {
		if health["status"] == "healthy" {
			health["status"] = "degraded"
		}
		health["memory_error"] = err.Error()
	}

	// Add performance metrics
	health["metrics"] = map[string]interface{}{
		"requests_per_minute": a.getRequestsPerMinute(),
		"avg_response_time":   a.getAverageResponseTime(),
		"error_rate":          a.getErrorRate(),
	}

	return health
}
```
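To back the `/health` and `/ready` probes in the Kubernetes manifest above, the health map can be exposed over HTTP. This is a minimal sketch using only the standard library (`net/http`, `encoding/json`); the `ScalableAgent` type and its `HealthCheck` method are the ones from the example above, and the readiness policy shown (only fully "healthy" counts as ready) is one possible choice:

```go
// serveHealthEndpoints exposes the agent's health for liveness/readiness probes.
// Listening on :8080 matches the containerPort in the Kubernetes manifest above.
func serveHealthEndpoints(a *ScalableAgent) error {
	mux := http.NewServeMux()

	mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		health := a.HealthCheck()
		w.Header().Set("Content-Type", "application/json")
		if health["status"] == "unhealthy" {
			w.WriteHeader(http.StatusServiceUnavailable)
		}
		json.NewEncoder(w).Encode(health)
	})

	mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
		// Ready only when fully healthy; a "degraded" instance keeps its liveness.
		if a.HealthCheck()["status"] != "healthy" {
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})

	return http.ListenAndServe(":8080", mux)
}
```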
### 3. Graceful Shutdown
```go
func (a *ScalableAgent) GracefulShutdown(ctx context.Context) error {
	log.Println("Starting graceful shutdown...")

	// Stop accepting new requests
	a.stopAcceptingRequests()

	// Wait for ongoing requests to complete
	done := make(chan struct{})
	go func() {
		a.waitForOngoingRequests()
		close(done)
	}()

	select {
	case <-done:
		log.Println("All requests completed")
	case <-ctx.Done():
		log.Println("Shutdown timeout reached, forcing shutdown")
	}

	// Clean up resources
	a.cleanup()

	log.Println("Graceful shutdown completed")
	return nil
}
```
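In a container, the shutdown path is usually driven by SIGTERM from the orchestrator. A common wiring with the standard library (`os/signal`, `syscall`) looks like the sketch below; the 30-second drain budget mirrors the default Kubernetes termination grace period and is an assumption you should tune:

```go
// runUntilSignalled blocks until SIGTERM/SIGINT, then gives the agent a
// bounded window to drain in-flight requests before the process exits.
func runUntilSignalled(agent *ScalableAgent) {
	// Wait for the orchestrator (or Ctrl+C locally) to ask us to stop.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()
	<-ctx.Done()

	// Allow up to 30 seconds for draining before forcing shutdown.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if err := agent.GracefulShutdown(shutdownCtx); err != nil {
		log.Printf("graceful shutdown error: %v", err)
	}
}
```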
Load balancing and scaling are essential for building production-ready agent systems that can handle varying loads efficiently and maintain high availability.
## 🚀 Next Steps
- Testing Strategies - Test your scaled agent systems
- Circuit Breaker Patterns - Add fault tolerance to scaled systems
- Production Monitoring - Monitor your scaled deployments
- Production Deployment - Deploy your scaled systems