# Streaming API (vNext)

*Real-time agent responses with chunked token streams.*

vNext provides comprehensive streaming capabilities for real-time agent interactions. Streams deliver tokens as they're generated by the LLM, providing immediate feedback and an improved user experience.
## 🔑 Core Types
### Stream Interface
```go
type Stream interface {
	Chunks() <-chan *StreamChunk // Channel of streaming chunks
	Wait() (*Result, error)      // Block until completion; returns the final result
	Cancel()                     // Stop the stream early
	Metadata() *StreamMetadata   // Stream-level metadata
	AsReader() io.Reader         // Text content as an io.Reader
}
```

### StreamChunk
```go
type StreamChunk struct {
	Type      ChunkType              `json:"type"`                // Chunk type (see below)
	Content   string                 `json:"content,omitempty"`   // Complete text (text/thought chunks)
	Delta     string                 `json:"delta,omitempty"`     // Incremental token text (delta chunks)
	ToolName  string                 `json:"tool_name,omitempty"` // Tool being invoked
	ToolArgs  map[string]interface{} `json:"tool_args,omitempty"` // Tool invocation arguments
	ToolID    string                 `json:"tool_id,omitempty"`   // Tool call identifier
	Metadata  map[string]interface{} `json:"metadata,omitempty"`  // Per-chunk metadata
	Error     error                  `json:"error,omitempty"`     // Error details (error chunks)
	Timestamp time.Time              `json:"timestamp"`           // When the chunk was emitted
	Index     int                    `json:"index"`               // Position of the chunk in the stream
}
```

### Chunk Types
```go
const (
	ChunkTypeText     ChunkType = "text"        // Complete text content
	ChunkTypeDelta    ChunkType = "delta"       // Incremental token update
	ChunkTypeThought  ChunkType = "thought"     // Agent reasoning
	ChunkTypeToolCall ChunkType = "tool_call"   // Tool invocation
	ChunkTypeToolRes  ChunkType = "tool_result" // Tool result
	ChunkTypeMetadata ChunkType = "metadata"    // Stream metadata
	ChunkTypeError    ChunkType = "error"       // Error condition
	ChunkTypeDone     ChunkType = "done"        // Stream completion
)
```

**Key Chunk Types:**
- `ChunkTypeDelta`: the primary type for real-time streaming; carries individual tokens
- `ChunkTypeDone`: signals stream completion
- `ChunkTypeError`: carries error information
- `ChunkTypeThought`: agent reasoning (when enabled)
- `ChunkTypeToolCall` / `ChunkTypeToolRes`: tool interactions (see the sketch below)
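In practice a single loop can route all of these. A minimal sketch, assuming a started `stream` and the chunk types above (whether a tool result's text arrives in `Content` is an assumption here):

```go
// Route every chunk type in one switch (sketch; assumes an active stream).
for chunk := range stream.Chunks() {
	switch chunk.Type {
	case vnext.ChunkTypeDelta:
		fmt.Print(chunk.Delta) // incremental tokens
	case vnext.ChunkTypeThought:
		fmt.Printf("\n[thought] %s\n", chunk.Content)
	case vnext.ChunkTypeToolCall:
		fmt.Printf("\n[tool call] %s args=%v\n", chunk.ToolName, chunk.ToolArgs)
	case vnext.ChunkTypeToolRes:
		fmt.Printf("\n[tool result] %s: %s\n", chunk.ToolName, chunk.Content) // assumes result text in Content
	case vnext.ChunkTypeError:
		fmt.Printf("\n[error] %v\n", chunk.Error)
	case vnext.ChunkTypeDone:
		fmt.Println("\n[done]")
	}
}
```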
### StreamMetadata
```go
type StreamMetadata struct {
	AgentName string                 `json:"agent_name"`
	SessionID string                 `json:"session_id,omitempty"`
	TraceID   string                 `json:"trace_id,omitempty"`
	StartTime time.Time              `json:"start_time"`
	Model     string                 `json:"model,omitempty"`
	Extra     map[string]interface{} `json:"extra,omitempty"`
}
```
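The `Stream.Metadata()` accessor shown earlier returns this struct; a quick sketch of reading it:

```go
// Inspect stream-level metadata once a stream has started.
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
	log.Fatal(err)
}
meta := stream.Metadata()
fmt.Printf("agent=%s model=%s session=%s started=%s\n",
	meta.AgentName, meta.Model, meta.SessionID, meta.StartTime.Format(time.RFC3339))
```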
## ⚙️ Stream Options

```go
type StreamOptions struct {
	BufferSize       int                    // Channel buffer size (default: 100)
	Handler          StreamHandler          // Optional callback handler
	IncludeThoughts  bool                   // Include reasoning chunks
	IncludeToolCalls bool                   // Include tool call chunks
	IncludeMetadata  bool                   // Include metadata chunks
	TextOnly         bool                   // Only emit text/delta chunks
	FlushInterval    time.Duration          // Buffer flush frequency
	Timeout          time.Duration          // Stream timeout
	Metadata         map[string]interface{} // Additional metadata
}

type StreamHandler func(chunk *StreamChunk) bool // Return false to cancel
```

### Option Builders
```go
// Default options (recommended for most use cases)
opts := vnext.DefaultStreamOptions()

// Common option builders
vnext.WithBufferSize(200)                      // Larger buffer for performance
vnext.WithStreamHandler(handler)               // Callback-based processing
vnext.WithThoughts()                           // Enable agent reasoning
vnext.WithToolCalls()                          // Enable tool interaction streaming
vnext.WithStreamMetadata()                     // Include metadata chunks
vnext.WithTextOnly()                           // Text/delta only (faster)
vnext.WithStreamTimeout(30 * time.Second)      // Custom timeout
vnext.WithFlushInterval(50 * time.Millisecond) // Faster chunk delivery
```

**Default Options:**
- Buffer size: 100 chunks
- Includes thoughts and tool calls
- 5-minute timeout
- 100ms flush interval
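Builders compose as variadic arguments; a small sketch overriding two of these defaults (values illustrative):

```go
// Override the default buffer size and timeout; unspecified options keep
// the defaults listed above.
stream, err := agent.RunStream(ctx, "Summarize this document",
	vnext.WithBufferSize(50),                // below the default of 100
	vnext.WithStreamTimeout(60*time.Second), // well under the 5-minute default
)
if err != nil {
	log.Fatal(err)
}
```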
## 🚀 Basic Usage
### Simple Streaming
```go
// Create an agent
agent, err := vnext.QuickChatAgent("gpt-4o-mini")
if err != nil {
	log.Fatal(err)
}

// Start streaming
stream, err := agent.RunStream(ctx, "Explain quantum computing")
if err != nil {
	log.Fatal(err)
}

// Process tokens in real time
for chunk := range stream.Chunks() {
	switch chunk.Type {
	case vnext.ChunkTypeDelta:
		fmt.Print(chunk.Delta) // Print tokens as they arrive
	case vnext.ChunkTypeDone:
		fmt.Println("\n✅ Complete")
	case vnext.ChunkTypeError:
		fmt.Printf("❌ Error: %v\n", chunk.Error)
	}
}

// Get the final result
result, err := stream.Wait()
```

### Streaming with Options
```go
// Advanced streaming with configuration
stream, err := agent.RunStream(ctx, "Hello",
	vnext.WithThoughts(),                    // Show reasoning
	vnext.WithBufferSize(200),               // Larger buffer
	vnext.WithStreamTimeout(30*time.Second), // Custom timeout
)
if err != nil {
	log.Fatal(err)
}

for chunk := range stream.Chunks() {
	// Check for completion before the switch: a bare break inside a
	// switch exits the switch, not the loop.
	if chunk.Type == vnext.ChunkTypeDone {
		break
	}
	switch chunk.Type {
	case vnext.ChunkTypeDelta:
		fmt.Print(chunk.Delta)
	case vnext.ChunkTypeThought:
		fmt.Printf("\n[💭 %s]\n", chunk.Content)
	}
}
```

### Handler-Based Streaming
```go
// Process chunks with a callback
handler := func(chunk *vnext.StreamChunk) bool {
	if chunk.Type == vnext.ChunkTypeDelta {
		fmt.Print(chunk.Delta)
	}
	return true // Continue streaming (return false to stop)
}

stream, err := agent.RunStream(ctx, input, vnext.WithStreamHandler(handler))
if err != nil {
	log.Fatal(err)
}
result, err := stream.Wait()
```

## 🎛️ Advanced Usage
### RunStreamWithOptions
```go
// Combine run options with stream options
runOpts := &vnext.RunOptions{
	MaxTokens:   500,
	Temperature: 0.7,
	StopWords:   []string{"STOP"},
}

stream, err := agent.RunStreamWithOptions(ctx, input, runOpts,
	vnext.WithThoughts(),
	vnext.WithToolCalls(),
)
```

⚠️ **Note:** `RunStreamWithOptions` has known limitations in the current implementation. Use `RunStream` for reliable streaming.
### Text-Only Streaming
```go
// Simplified streaming (text/delta chunks only)
stream, err := agent.RunStream(ctx, "Hello", vnext.WithTextOnly())
if err != nil {
	log.Fatal(err)
}
for chunk := range stream.Chunks() {
	if chunk.Type == vnext.ChunkTypeDelta {
		fmt.Print(chunk.Delta)
	}
}
```

### Reader Interface
```go
// Stream as an io.Reader (text content only)
stream, err := agent.RunStream(ctx, "Generate code sample")
if err != nil {
	log.Fatal(err)
}
reader := stream.AsReader()
io.Copy(os.Stdout, reader)
result, err := stream.Wait()
```

### Interactive Streaming
```go
// Real-time conversation loop
scanner := bufio.NewScanner(os.Stdin)
for {
	fmt.Print("You: ")
	if !scanner.Scan() {
		break
	}
	input := strings.TrimSpace(scanner.Text())
	if input == "quit" {
		break
	}

	fmt.Print("Agent: ")
	stream, err := agent.RunStream(ctx, input)
	if err != nil {
		log.Println(err)
		continue
	}
	for chunk := range stream.Chunks() {
		if chunk.Type == vnext.ChunkTypeDelta {
			fmt.Print(chunk.Delta)
		}
	}
	fmt.Println()
}
```

## 🧰 Utility Functions
### CollectStream
```go
// Collect all streaming text into a single string
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
	log.Fatal(err)
}
output, result, err := vnext.CollectStream(stream)
fmt.Println("Complete response:", output)
```

### StreamToChannel
```go
// Convert to a simple text channel
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
	log.Fatal(err)
}
textChan := vnext.StreamToChannel(stream)
for text := range textChan {
	fmt.Print(text)
}
```

### PrintStream
```go
// Print a stream to stdout (demo/testing helper)
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
	log.Fatal(err)
}
result, err := vnext.PrintStream(stream)
```

## 🏗️ Stream Builder
```go
// Create a custom stream
stream, writer := vnext.NewStreamBuilder().
	WithAgentName("custom-agent").
	WithSessionID("sess-123").
	WithModel("gpt-4o-mini").
	WithOption(vnext.WithThoughts()).
	Build(ctx)

// Write custom chunks
writer.Write(&vnext.StreamChunk{
	Type:  vnext.ChunkTypeDelta,
	Delta: "Hello",
})
writer.Close()
```
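The builder returns an ordinary `Stream`, so it can be consumed like any agent stream. A sketch of a concurrent producer/consumer (the goroutine and token slice are illustrative; writing from a separate goroutine avoids blocking if the channel buffer fills):

```go
// Produce chunks from a goroutine and consume them like any other stream.
go func() {
	defer writer.Close()
	for _, token := range []string{"Hello", ", ", "world!"} {
		writer.Write(&vnext.StreamChunk{Type: vnext.ChunkTypeDelta, Delta: token})
	}
}()

for chunk := range stream.Chunks() {
	if chunk.Type == vnext.ChunkTypeDelta {
		fmt.Print(chunk.Delta)
	}
}
```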
## 🔄 Agent Integration

### Available Methods
```go
type Agent interface {
	// Basic streaming
	RunStream(ctx context.Context, input string, opts ...StreamOption) (Stream, error)

	// Advanced streaming (experimental)
	RunStreamWithOptions(ctx context.Context, input string,
		runOpts *RunOptions, streamOpts ...StreamOption) (Stream, error)
}
```

### Agent Creation for Streaming
```go
// Quick agent creation
agent, err := vnext.QuickChatAgent("gpt-4o-mini")

// Agent with configuration
agent, err := vnext.QuickChatAgentWithConfig("streaming-agent", &vnext.Config{
	Name:         "streaming-agent",
	SystemPrompt: "You are a helpful assistant.",
	Timeout:      30 * time.Second,
	LLM: vnext.LLMConfig{
		Provider:    "ollama",
		Model:       "gemma3:1b",
		Temperature: 0.7,
		MaxTokens:   200,
		BaseURL:     "http://localhost:11434",
	},
})
```

## 🛡️ Error Handling & Lifecycle
### Error Handling
```go
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
	log.Fatal("Failed to start streaming:", err)
}

for chunk := range stream.Chunks() {
	if chunk.Error != nil {
		fmt.Printf("Stream error: %v\n", chunk.Error)
		break
	}
	// Check completion before processing: a bare break inside a
	// switch would only exit the switch, not the loop.
	if chunk.Type == vnext.ChunkTypeDone {
		break
	}
	if chunk.Type == vnext.ChunkTypeDelta {
		fmt.Print(chunk.Delta)
	}
}

// Always check the final result
result, err := stream.Wait()
if err != nil {
	log.Printf("Stream completed with error: %v", err)
}
```

### Cancellation & Timeouts
```go
// Manual cancellation
stream, err := agent.RunStream(ctx, "Long task...")
go func() {
	time.Sleep(5 * time.Second)
	stream.Cancel() // Stop streaming
}()

// Context-based timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := agent.RunStream(ctx, "Hello")

// Option-based timeout
stream, err := agent.RunStream(ctx, "Hello",
	vnext.WithStreamTimeout(10*time.Second))
```

### Stream Lifecycle
```go
stream, err := agent.RunStream(ctx, input)
// 1. Stream starts; chunks begin flowing
for chunk := range stream.Chunks() {
	// 2. Process chunks as they arrive
	if chunk.Type == vnext.ChunkTypeDone {
		break // 3. Stream completion signal
	}
}
// 4. Get the final result and clean up
result, err := stream.Wait()
```

## 🎯 Performance Tips
### Buffer Sizing
```go
// Small buffer (low memory, higher latency)
vnext.WithBufferSize(10)

// Large buffer (higher memory, lower latency)
vnext.WithBufferSize(500)

// Default (balanced)
vnext.WithBufferSize(100)
```

### Flush Intervals
```go
// Faster delivery (more CPU)
vnext.WithFlushInterval(10 * time.Millisecond)

// Slower delivery (less CPU)
vnext.WithFlushInterval(200 * time.Millisecond)

// Default (balanced)
vnext.WithFlushInterval(100 * time.Millisecond)
```

### Text-Only Mode
```go
// Skip non-text chunks for better performance
stream, err := agent.RunStream(ctx, input, vnext.WithTextOnly())
```

## 🔧 Provider Support
### Supported Providers
- Ollama: Full streaming support with local models
- OpenAI: Real-time streaming via API
- Azure OpenAI: Streaming through Azure endpoints
### Provider-Specific Configuration
```go
// Ollama (local)
agent, err := vnext.QuickChatAgentWithConfig("ollama-agent", &vnext.Config{
	LLM: vnext.LLMConfig{
		Provider: "ollama",
		Model:    "gemma3:1b",
		BaseURL:  "http://localhost:11434",
	},
})

// OpenAI (cloud)
agent, err := vnext.QuickChatAgent("gpt-4o-mini")

// Azure OpenAI (cloud)
agent, err := vnext.QuickChatAgentWithConfig("azure-agent", &vnext.Config{
	LLM: vnext.LLMConfig{
		Provider: "azure",
		Model:    "gpt-4o-mini",
		Endpoint: os.Getenv("AZURE_OPENAI_ENDPOINT"),
		APIKey:   os.Getenv("AZURE_OPENAI_API_KEY"),
	},
})
```

## 🔗 Related Documentation
- Agent API - Agent creation and basic methods
- Configuration - LLM provider configuration
- Tools - Tool integration with streaming
- Memory - Stateful streaming conversations
- Builder - Agent builder patterns
## 📚 Examples
### Simple Streaming
```bash
# Run the basic streaming example
cd examples/vnext/simple-streaming
go run main.go
```

### Comprehensive Demo
```bash
# Run the multi-mode streaming demo
cd examples/vnext/streaming-demo
go run main.go
```

## 🐛 Troubleshooting
### Common Issues
#### Stream Never Starts
```go
// Check context and agent configuration
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

agent, err := vnext.QuickChatAgent("gpt-4o-mini")
if err != nil {
	log.Fatal("Agent creation failed:", err)
}

stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
	log.Fatal("Stream start failed:", err)
}
```

#### No Chunks Received
```go
// Log every chunk to verify the types you expect are arriving
for chunk := range stream.Chunks() {
	fmt.Printf("Chunk type: %s, Delta: %s\n", chunk.Type, chunk.Delta)
	if chunk.Type == vnext.ChunkTypeDelta {
		fmt.Print(chunk.Delta)
	}
}
```

#### RunStreamWithOptions Returns 0 Chunks
```go
// Known limitation: use RunStream instead
stream, err := agent.RunStream(ctx, input,
	vnext.WithStreamTimeout(30*time.Second),
	vnext.WithThoughts(),
)
```

#### Interactive Input Truncated
```go
// Use bufio.Scanner to read full lines of multi-word input
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("Input: ")
if scanner.Scan() {
	input := strings.TrimSpace(scanner.Text()) // Full line, not just the first word
	fmt.Println("Read:", input)
}
```

### Performance Issues
#### Slow Streaming
```go
// Try smaller models, or adjust buffering
vnext.WithBufferSize(200),                    // Larger buffer
vnext.WithFlushInterval(50*time.Millisecond), // Faster flush
```

#### High Memory Usage
```go
// Use text-only mode
vnext.WithTextOnly(),
vnext.WithBufferSize(50), // Smaller buffer
```

## 🌟 Best Practices
- Always handle `ChunkTypeDelta` - the primary streaming content
- Check for `ChunkTypeError` - handle streaming errors
- Call `stream.Wait()` - get the final result and clean up
- Use appropriate buffer sizes - balance memory vs. latency
- Handle context cancellation - proper timeout management
- Prefer `RunStream` over `RunStreamWithOptions` - more reliable
- Use `bufio.Scanner` for interactive input - multi-word support
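A minimal sketch that pulls these practices together, using only the API shown throughout this page:

```go
// Consolidated streaming loop following the practices above.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() // handle context cancellation

stream, err := agent.RunStream(ctx, input)
if err != nil {
	log.Fatal(err)
}

for chunk := range stream.Chunks() {
	if chunk.Type == vnext.ChunkTypeError { // check for errors
		log.Printf("stream error: %v", chunk.Error)
		break
	}
	if chunk.Type == vnext.ChunkTypeDone {
		break
	}
	if chunk.Type == vnext.ChunkTypeDelta { // primary streaming content
		fmt.Print(chunk.Delta)
	}
}

// Always call Wait() for the final result and cleanup.
if _, err := stream.Wait(); err != nil {
	log.Printf("stream finished with error: %v", err)
}
```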
## 📈 Performance Metrics
Track streaming performance:
```go
startTime := time.Now()
tokenCount := 0 // approximate: assumes one token per delta chunk

for chunk := range stream.Chunks() {
	if chunk.Type == vnext.ChunkTypeDelta {
		tokenCount++
		fmt.Print(chunk.Delta)
	}
}

duration := time.Since(startTime)
tokensPerSecond := float64(tokenCount) / duration.Seconds()
fmt.Printf("\nTokens/sec: %.1f\n", tokensPerSecond)
```