# Streaming Guide
Real-time streaming is a core feature of AgenticGoKit, allowing you to receive responses as they're generated. This guide covers streaming patterns, chunk types, and best practices.
## 🎯 Why Streaming?
Streaming provides several advantages:
- Live UI updates - Display responses as they're generated
- Better UX - Show progress without waiting for completion
- Token-by-token output - See LLM responses in real-time
- Tool execution visibility - Track tool calls as they happen
- Thought process transparency - Observe agent reasoning
- Early cancellation - Stop processing when needed
## 🚀 Quick Start

### Basic Stream Interface

```go
package main
import (
"context"
"fmt"
"log"
agenticgokit "github.com/agenticgokit/agenticgokit/v1beta"
)
func main() {
// Create agent
agent, err := agenticgokit.NewBuilder("StreamingAgent").
WithPreset(agenticgokit.ChatAgent).
Build()
if err != nil {
log.Fatal(err)
}
// Start streaming
stream, err := agent.RunStream(context.Background(), "Tell me a story")
if err != nil {
log.Fatal(err)
}
// Process chunks as they arrive
for chunk := range stream.Chunks() {
switch chunk.Type {
case agenticgokit.ChunkTypeDelta:
fmt.Print(chunk.Delta)
case agenticgokit.ChunkTypeText:
fmt.Print(chunk.Content)
case agenticgokit.ChunkTypeThought:
fmt.Printf("\n[Thinking: %s]\n", chunk.Content)
case agenticgokit.ChunkTypeDone:
fmt.Println("\n✓ Complete")
case agenticgokit.ChunkTypeError:
fmt.Println("\n✗ Error:", chunk.Error)
}
}
// Get final result
result, err := stream.Wait()
if err != nil {
log.Fatal(err)
}
fmt.Println("\nFinal output:", result.FinalOutput)
}
```

### Stream with Context Cancellation

```go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := agent.RunStream(ctx, "Explain quantum computing")
if err != nil {
log.Fatal(err)
}
for chunk := range stream.Chunks() {
if chunk.Type == agenticgokit.ChunkTypeDelta {
fmt.Print(chunk.Delta)
}
}
result, err := stream.Wait()
if errors.Is(err, context.DeadlineExceeded) {
fmt.Println("\nStream timed out")
}
```

## 📦 Chunk Types
AgenticGoKit provides 13 chunk types for different streaming scenarios:
### Core Content Types

#### ChunkTypeText

Complete text content (paragraphs or full messages).

```go
if chunk.Type == agenticgokit.ChunkTypeText {
fmt.Println(chunk.Content) // "This is a complete paragraph."
}
```

#### ChunkTypeDelta

Incremental text changes (token-by-token).

```go
if chunk.Type == agenticgokit.ChunkTypeDelta {
fmt.Print(chunk.Delta) // "This", " is", " incremental"
}
```

Use when:
- Typewriter effect in UI
- Real-time token streaming
- Maximum responsiveness
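
For the typewriter case, here is a minimal sketch (assuming a `stream` obtained from `agent.RunStream`, as in the Quick Start) that prints each delta the moment it arrives while also accumulating the full response for later use:

```go
// Sketch: typewriter-style rendering that also keeps the complete text.
var full strings.Builder
for chunk := range stream.Chunks() {
	if chunk.Type == v1beta.ChunkTypeDelta {
		fmt.Print(chunk.Delta)        // show each token immediately
		full.WriteString(chunk.Delta) // accumulate the full response
	}
}
fmt.Println("\nReceived", full.Len(), "characters")
```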
### Agent Reasoning

#### ChunkTypeThought

Agent's internal reasoning process.

```go
if chunk.Type == agenticgokit.ChunkTypeThought {
fmt.Printf("[Thinking: %s]\n", chunk.Content)
// "Analyzing the question to determine the best approach..."
}
```

Use when:
- Debugging agent logic
- Showing "thinking" indicators
- Understanding agent decisions
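
For example, a small sketch (same stream setup as above) that keeps reasoning separate from the answer by routing thoughts to stderr while printing the user-facing text to stdout:

```go
// Sketch: route reasoning to stderr, answer text to stdout.
for chunk := range stream.Chunks() {
	switch chunk.Type {
	case v1beta.ChunkTypeThought:
		fmt.Fprintf(os.Stderr, "[thought] %s\n", chunk.Content) // agent reasoning, for debugging
	case v1beta.ChunkTypeDelta:
		fmt.Print(chunk.Delta) // user-facing answer
	}
}
```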
### Tool Integration

#### ChunkTypeToolCall

Tool execution request.

```go
if chunk.Type == agenticgokit.ChunkTypeToolCall {
fmt.Printf("Calling tool: %s(%v)\n", chunk.ToolName, chunk.ToolArgs)
}
```

#### ChunkTypeToolRes

Tool execution result.

```go
if chunk.Type == agenticgokit.ChunkTypeToolRes {
fmt.Printf("Tool result: %s\n", chunk.Content)
}
```
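
ToolCall and ToolRes chunks usually arrive as a pair, so a simple way to trace tool usage is to remember the most recent call and attach its result when it arrives. A minimal sketch, assuming each result follows its call in order on the same stream:

```go
// Sketch: pair each tool result with the call that preceded it.
var lastTool string
for chunk := range stream.Chunks() {
	switch chunk.Type {
	case v1beta.ChunkTypeToolCall:
		lastTool = chunk.ToolName
		fmt.Printf("→ calling %s(%v)\n", chunk.ToolName, chunk.ToolArgs)
	case v1beta.ChunkTypeToolRes:
		fmt.Printf("← %s returned: %s\n", lastTool, chunk.Content)
	}
}
```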
### Workflow Lifecycle

#### ChunkTypeAgentStart

Emitted when an agent or workflow step begins execution.

```go
if chunk.Type == agenticgokit.ChunkTypeAgentStart {
agentName := chunk.Metadata["agent_name"].(string)
fmt.Printf("▶ Starting: %s\n", agentName)
}
```

Use when:
- Progress tracking in workflows
- Showing step indicators
- Logging workflow execution
#### ChunkTypeAgentComplete

Emitted when an agent or workflow step completes execution.

```go
if chunk.Type == agenticgokit.ChunkTypeAgentComplete {
agentName := chunk.Metadata["agent_name"].(string)
duration := chunk.Metadata["duration"].(time.Duration)
fmt.Printf("✓ Completed: %s (%v)\n", agentName, duration)
}
```
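
Together, the start and complete chunks make a simple progress tracker. The sketch below records start times itself and uses comma-ok assertions so a missing or differently typed metadata key cannot panic (the `agent_name` key follows the examples above):

```go
// Sketch: time each agent using AgentStart/AgentComplete chunks.
started := make(map[string]time.Time)
for chunk := range stream.Chunks() {
	name, _ := chunk.Metadata["agent_name"].(string) // comma-ok: empty string if absent
	switch chunk.Type {
	case v1beta.ChunkTypeAgentStart:
		started[name] = time.Now()
		fmt.Printf("▶ %s started\n", name)
	case v1beta.ChunkTypeAgentComplete:
		if t0, ok := started[name]; ok {
			fmt.Printf("✓ %s finished in %v\n", name, time.Since(t0))
		}
	}
}
```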
### Multimodal Content

#### ChunkTypeImage

Image content in the stream.

```go
if chunk.Type == agenticgokit.ChunkTypeImage {
fmt.Printf("Image: %s\n", chunk.ImageData.URL)
}
```

#### ChunkTypeAudio

Audio content in the stream.

```go
if chunk.Type == agenticgokit.ChunkTypeAudio {
fmt.Printf("Audio: %s (%s)\n", chunk.AudioData.URL, chunk.AudioData.Format)
}
```

#### ChunkTypeVideo

Video content in the stream.

```go
if chunk.Type == agenticgokit.ChunkTypeVideo {
fmt.Printf("Video: %s (%s)\n", chunk.VideoData.URL, chunk.VideoData.Format)
}
```

### Stream Control

#### ChunkTypeMetadata

Additional information (timestamps, token counts, etc.).

```go
if chunk.Type == agenticgokit.ChunkTypeMetadata {
if tokens, ok := chunk.Metadata["tokens"].(int); ok {
fmt.Printf("Tokens used: %d\n", tokens)
}
}
```

#### ChunkTypeError

Error information during streaming.

```go
if chunk.Type == agenticgokit.ChunkTypeError {
fmt.Println("Error:", chunk.Error)
// Handle error gracefully
}
```

#### ChunkTypeDone

Stream completion marker.

```go
if chunk.Type == agenticgokit.ChunkTypeDone {
fmt.Println("✓ Stream complete")
// Chunk may contain final metadata
if result, ok := chunk.Metadata["final_result"]; ok {
fmt.Println("Final:", result)
}
}
```

## 🎨 Streaming Patterns
### Pattern 1: Stream Interface (Recommended)

Most flexible pattern with full control:

```go
// Start streaming
stream, err := agent.RunStream(ctx, query)
if err != nil {
log.Fatal(err)
}
// Process chunks
for chunk := range stream.Chunks() {
switch chunk.Type {
case agenticgokit.ChunkTypeText:
	fmt.Print(chunk.Content)
case agenticgokit.ChunkTypeDelta:
	fmt.Print(chunk.Delta)
case agenticgokit.ChunkTypeToolCall:
handleToolCall(chunk)
case agenticgokit.ChunkTypeAgentStart:
fmt.Printf("\n▶ %s\n", chunk.Metadata["agent_name"])
case agenticgokit.ChunkTypeAgentComplete:
fmt.Printf("✓ %s complete\n", chunk.Metadata["agent_name"])
case agenticgokit.ChunkTypeDone:
fmt.Println("\nDone!")
}
}
// Get final result
result, err := stream.Wait()
if err != nil {
log.Println("Error:", err)
}
```

Pros:
- Clean and simple API
- Automatic channel management
- Built-in result aggregation
- Context cancellation support
Cons:
- Less control over buffering
- Fixed chunk types
### Pattern 2: Filtered Streaming

Process only specific chunk types:

```go
stream, err := agent.RunStream(ctx, query)
if err != nil {
log.Fatal(err)
}
// Filter for text content only
var response strings.Builder
for chunk := range stream.Chunks() {
if chunk.Type == v1beta.ChunkTypeDelta {
response.WriteString(chunk.Delta)
}
}
fmt.Println(response.String())
```

Pros:
- Simple filtering
- Focused processing
- Clean text accumulation
Cons:
- May miss important metadata
- No tool/thought visibility
### Pattern 3: Stream Cancellation

Cancel stream based on conditions:

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, _ := agent.RunStream(ctx, query)
// Simple text accumulation with cancellation
var response strings.Builder
for chunk := range stream.Chunks() {
if chunk.Type == v1beta.ChunkTypeDelta {
response.WriteString(chunk.Delta)
// Cancel if response too long
if response.Len() > 1000 {
stream.Cancel()
break
}
}
}
fmt.Println(response.String())
```

Pros:
- Early termination support
- Resource control
- User-initiated stop
Cons:
- May lose partial results
- Needs cleanup handling (see the sketch below)
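
One way to handle that cleanup is to treat Cancel() as a request: keep draining Chunks() until it closes, then call Wait() so the partial result and any error are still collected. A sketch, assuming Chunks() closes shortly after cancellation:

```go
// Sketch: cancel early, but still drain the stream and collect the partial result.
stream, err := agent.RunStream(ctx, query)
if err != nil {
	log.Fatal(err)
}

var response strings.Builder
for chunk := range stream.Chunks() {
	if chunk.Type == v1beta.ChunkTypeDelta {
		response.WriteString(chunk.Delta)
		if response.Len() > 1000 {
			stream.Cancel() // request stop; keep draining until Chunks() closes
		}
	}
}

// Wait() still reports whatever was produced before cancellation (or an error).
if result, err := stream.Wait(); err != nil {
	log.Println("stream ended with error:", err)
} else {
	fmt.Println("partial output:", result.FinalOutput)
}
```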
## ⚙️ Stream Interface Methods

### Stream Methods

```go
type Stream interface {
// Chunks returns channel of stream chunks
Chunks() <-chan *StreamChunk
// Wait blocks until stream completes and returns final result
Wait() (*Result, error)
// Cancel cancels the stream
Cancel()
// Metadata returns stream metadata
Metadata() map[string]interface{}
// AsReader returns io.Reader interface for text streaming
AsReader() io.Reader
}
```

### Using Stream Methods

```go
stream, _ := agent.RunStream(ctx, "Query")
// Get metadata during streaming
metadata := stream.Metadata()
fmt.Println("Stream ID:", metadata["stream_id"])
// Process chunks
for chunk := range stream.Chunks() {
fmt.Print(chunk.Delta)
}
// Get final aggregated result
result, err := stream.Wait()
fmt.Println("Final:", result.FinalOutput)AsReader() for io.Reader Interface
stream, _ := agent.RunStream(ctx, "Query")
// Use as io.Reader
reader := stream.AsReader()
io.Copy(os.Stdout, reader) // Stream directly to stdout
```
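
Because AsReader() exposes a plain io.Reader, the stream composes with the standard library. For example, a sketch that processes the streamed text line by line with bufio.Scanner (assuming the reader carries only the textual content of the stream):

```go
// Sketch: line-oriented processing via the io.Reader view of the stream.
stream, err := agent.RunStream(ctx, "List three facts about Go")
if err != nil {
	log.Fatal(err)
}

scanner := bufio.NewScanner(stream.AsReader())
for lineNo := 1; scanner.Scan(); lineNo++ {
	fmt.Printf("%3d | %s\n", lineNo, scanner.Text()) // handle one complete line at a time
}
if err := scanner.Err(); err != nil {
	log.Println("read error:", err)
}
```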
## 🎯 Best Practices

Guidelines:
- Always handle all chunk types for robustness
- Use context for timeouts and cancellation
- Process chunks quickly to avoid blocking (see the worker sketch after this list)
- Aggregate text for final output when needed
- Handle errors gracefully
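
If per-chunk work is slow (writing to a database, rendering, network calls), hand chunks off to a buffered worker so the receive loop never blocks the stream. A minimal sketch of that hand-off; `slowProcess` is a hypothetical placeholder for your own handling:

```go
// Sketch: decouple slow chunk processing from the receive loop with a buffered worker.
work := make(chan *v1beta.StreamChunk, 64)
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	for chunk := range work {
		slowProcess(chunk) // hypothetical: persist, render, forward, etc.
	}
}()

for chunk := range stream.Chunks() {
	work <- chunk // fast hand-off keeps the stream from backing up
}
close(work)
wg.Wait()
```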
### Context Usage

Use context for control:

```go
// Timeout after 30 seconds
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := agent.RunStream(ctx, query)
if err != nil {
log.Fatal(err)
}
for chunk := range stream.Chunks() {
// Process chunks
}
result, err := stream.Wait()
if errors.Is(err, context.DeadlineExceeded) {
log.Println("Streaming timed out")
}
```

### User Cancellation

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start streaming
stream, _ := agent.RunStream(ctx, query)
// Cancel from another goroutine
go func() {
<-userCancelSignal
stream.Cancel() // Stop streaming immediately
}()
for chunk := range stream.Chunks() {
fmt.Print(chunk.Delta)
}
```
---
## 🔄 Workflow Streaming
Workflows support streaming to track multi-agent execution:
```go
// Create workflow
config := &v1beta.WorkflowConfig{
Mode: v1beta.Sequential,
Timeout: 120 * time.Second,
}
workflow, _ := v1beta.NewSequentialWorkflow(config)
workflow.AddStep(v1beta.WorkflowStep{Name: "extract", Agent: extractAgent})
workflow.AddStep(v1beta.WorkflowStep{Name: "transform", Agent: transformAgent})
workflow.AddStep(v1beta.WorkflowStep{Name: "load", Agent: loadAgent})
// Stream workflow execution
stream, err := workflow.RunStream(context.Background(), "Process data")
if err != nil {
log.Fatal(err)
}
for chunk := range stream.Chunks() {
switch chunk.Type {
case v1beta.ChunkTypeMetadata:
if stepName, ok := chunk.Metadata["step_name"].(string); ok {
fmt.Printf("→ Executing step: %s\n", stepName)
}
case v1beta.ChunkTypeDelta:
fmt.Print(chunk.Delta)
case v1beta.ChunkTypeDone:
fmt.Println("\n✓ Workflow complete")
}
}
result, _ := stream.Wait()
fmt.Println("Final result:", result.FinalOutput)🌐 Integration Examples
WebSocket Integration
func streamToWebSocket(ws *websocket.Conn, agent v1beta.Agent, query string) {
stream, err := agent.RunStream(context.Background(), query)
if err != nil {
ws.WriteJSON(map[string]interface{}{"type": "error", "error": err.Error()})
return
}
for chunk := range stream.Chunks() {
if chunk.Type == v1beta.ChunkTypeDelta {
ws.WriteJSON(map[string]interface{}{
"type": "content",
"content": chunk.Delta,
})
} else if chunk.Type == v1beta.ChunkTypeDone {
ws.WriteJSON(map[string]interface{}{
"type": "done",
})
}
}
}
```

### Server-Sent Events (SSE)

```go
func streamToSSE(w http.ResponseWriter, agent v1beta.Agent, query string) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
flusher, ok := w.(http.Flusher)
if !ok {
	http.Error(w, "streaming unsupported", http.StatusInternalServerError)
	return
}
stream, err := agent.RunStream(context.Background(), query)
if err != nil {
fmt.Fprintf(w, "data: {\"error\": \"%s\"}\n\n", err.Error())
flusher.Flush()
return
}
for chunk := range stream.Chunks() {
if chunk.Type == v1beta.ChunkTypeDelta {
fmt.Fprintf(w, "data: %s\n\n", chunk.Delta)
flusher.Flush()
} else if chunk.Type == v1beta.ChunkTypeDone {
fmt.Fprintf(w, "data: [DONE]\n\n")
flusher.Flush()
break
}
}
}
```

### CLI Progress Bar

```go
import "github.com/schollz/progressbar/v3"
func streamWithProgress(agent v1beta.Agent, query string) {
bar := progressbar.NewOptions(-1,
progressbar.OptionSetDescription("Processing..."),
progressbar.OptionSpinnerType(14),
)
stream, err := agent.RunStream(context.Background(), query)
if err != nil {
log.Fatal(err)
}
var response strings.Builder
for chunk := range stream.Chunks() {
bar.Add(1)
if chunk.Type == v1beta.ChunkTypeDelta {
response.WriteString(chunk.Delta)
}
}
bar.Finish()
fmt.Println("\n", response.String())
}
```
---
## 🎯 Best Practices
### 1. Always Use Context
```go
// ✅ Good - with timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
stream, _ := agent.RunStream(ctx, query)
```

### 2. Handle All Chunk Types

```go
// ✅ Good - handle all types
stream, _ := agent.RunStream(ctx, query)
for chunk := range stream.Chunks() {
switch chunk.Type {
case v1beta.ChunkTypeText, v1beta.ChunkTypeDelta:
handleText(chunk)
case v1beta.ChunkTypeError:
handleError(chunk)
case v1beta.ChunkTypeDone:
handleDone(chunk)
default:
// Log or ignore unknown types
}
}
```

### 3. Use Stream.Wait() for Final Result

```go
// ✅ Good - get aggregated result
stream, _ := agent.RunStream(ctx, query)
for chunk := range stream.Chunks() {
processChunk(chunk)
}
result, err := stream.Wait()
if err != nil {
log.Fatal(err)
}
fmt.Println("Final:", result.FinalOutput)4. Set Appropriate Timeouts
// ✅ Good - reasonable timeout
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
stream, _ := agent.RunStream(ctx, query)
```

### 5. Handle Errors Gracefully

```go
// ✅ Good - error handling
stream, _ := agent.RunStream(ctx, query)
for chunk := range stream.Chunks() {
if chunk.Type == v1beta.ChunkTypeError {
log.Printf("Stream error: %v", chunk.Error)
// Show user-friendly message
// Attempt recovery or cleanup
break
}
}
result, err := stream.Wait()
if err != nil {
// Handle final error
}
```

## 🐛 Troubleshooting
### Issue: Stream Never Completes

Cause: Not calling stream.Wait() or not draining Chunks()

Solution: Always consume all chunks and call Wait()

```go
stream, _ := agent.RunStream(ctx, query)
// Drain chunks
for chunk := range stream.Chunks() {
processChunk(chunk)
}
// Get final result
result, err := stream.Wait()
```

### Issue: Context Canceled Error
Cause: Context timeout or cancellation
Solution: Check context deadline and increase if needed

```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
stream, err := agent.RunStream(ctx, query)
if errors.Is(err, context.DeadlineExceeded) {
fmt.Println("Increase timeout or optimize query")
}
```

### Issue: Missing Chunks
Cause: Not processing all chunk types
Solution: Handle all ChunkType values

```go
for chunk := range stream.Chunks() {
switch chunk.Type {
case v1beta.ChunkTypeDelta:
// Handle delta
case v1beta.ChunkTypeText:
// Handle text content
case v1beta.ChunkTypeMetadata:
// Handle metadata
case v1beta.ChunkTypeDone:
// Handle completion
}
}
```

Solution: Always check for Done chunk

```go
for chunk := range stream.Chunks() {
if chunk.Type == v1beta.ChunkTypeDone {
break // Exit loop
}
}
```

### Issue: Memory Leak
Cause: Goroutine leak or unclosed channels
Solution: Ensure proper cleanup
```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel() // Always clean up the context

stream, err := agent.RunStream(ctx, query)
if err != nil {
	log.Fatal(err)
}

// Drain all chunks and call Wait() so the stream's internal goroutines can exit
for range stream.Chunks() {
}
if _, err := stream.Wait(); err != nil {
	log.Println("stream error:", err)
}
```

## 📚 Examples
See complete streaming examples:
## 🔗 Related Topics
- Core Concepts - Understanding agents and handlers
- Workflows - Multi-agent streaming
- Performance Guide - Optimize streaming performance
Ready for workflows? Continue to Workflows Guide →