
Streaming API (vNext)

Real-time agent responses with chunked token streams

vNext provides comprehensive streaming for real-time agent interactions: streams deliver tokens as the LLM generates them, giving users immediate feedback instead of a single response after the full generation completes.

🔑 Core Types

Stream Interface

go
type Stream interface {
    Chunks() <-chan *StreamChunk
    Wait() (*Result, error)
    Cancel()
    Metadata() *StreamMetadata
    AsReader() io.Reader
}

StreamChunk

go
type StreamChunk struct {
    Type      ChunkType              `json:"type"`
    Content   string                 `json:"content,omitempty"`
    Delta     string                 `json:"delta,omitempty"`
    ToolName  string                 `json:"tool_name,omitempty"`
    ToolArgs  map[string]interface{} `json:"tool_args,omitempty"`
    ToolID    string                 `json:"tool_id,omitempty"`
    Metadata  map[string]interface{} `json:"metadata,omitempty"`
    Error     error                  `json:"error,omitempty"`
    Timestamp time.Time              `json:"timestamp"`
    Index     int                    `json:"index"`
}

Chunk Types

go
const (
    ChunkTypeText     ChunkType = "text"        // Complete text content
    ChunkTypeDelta    ChunkType = "delta"       // Incremental token update
    ChunkTypeThought  ChunkType = "thought"     // Agent reasoning
    ChunkTypeToolCall ChunkType = "tool_call"   // Tool invocation
    ChunkTypeToolRes  ChunkType = "tool_result" // Tool result
    ChunkTypeMetadata ChunkType = "metadata"    // Stream metadata
    ChunkTypeError    ChunkType = "error"       // Error condition
    ChunkTypeDone     ChunkType = "done"        // Stream completion
)

Key Chunk Types:

  • ChunkTypeDelta: Primary type for real-time streaming; carries individual tokens
  • ChunkTypeDone: Signals stream completion
  • ChunkTypeError: Contains error information
  • ChunkTypeThought: Agent reasoning (when enabled)
  • ChunkTypeToolCall/ChunkTypeToolRes: Tool interactions (see the sketch below)
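
None of the later examples consume tool chunks, so here is a minimal sketch of a consumer that surfaces tool activity. It assumes the stream was opened with WithToolCalls() and that a tool's output text arrives in Content; the print formatting is illustrative, only the fields come from StreamChunk above:

go
stream, err := agent.RunStream(ctx, input, vnext.WithToolCalls())
if err != nil {
    log.Fatal(err)
}

for chunk := range stream.Chunks() {
    switch chunk.Type {
    case vnext.ChunkTypeDelta:
        fmt.Print(chunk.Delta)
    case vnext.ChunkTypeToolCall:
        // ToolName, ToolArgs, and ToolID describe the invocation
        fmt.Printf("\n[🔧 calling %s(%v) id=%s]\n", chunk.ToolName, chunk.ToolArgs, chunk.ToolID)
    case vnext.ChunkTypeToolRes:
        // assumes the tool's result text arrives in Content
        fmt.Printf("[🔧 %s returned: %s]\n", chunk.ToolName, chunk.Content)
    }
}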

StreamMetadata

go
type StreamMetadata struct {
    AgentName string                 `json:"agent_name"`
    SessionID string                 `json:"session_id,omitempty"`
    TraceID   string                 `json:"trace_id,omitempty"`
    StartTime time.Time              `json:"start_time"`
    Model     string                 `json:"model,omitempty"`
    Extra     map[string]interface{} `json:"extra,omitempty"`
}
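
The Stream interface exposes this struct via Metadata(). A short sketch of logging it before consuming chunks, assuming Metadata() is populated as soon as RunStream returns:

go
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
    log.Fatal(err)
}

// Metadata() comes from the Stream interface above
meta := stream.Metadata()
log.Printf("agent=%s model=%s trace=%s started=%s",
    meta.AgentName, meta.Model, meta.TraceID, meta.StartTime.Format(time.RFC3339))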

⚙️ Stream Options

go
type StreamOptions struct {
    BufferSize       int                    // Channel buffer size (default: 100)
    Handler          StreamHandler          // Optional callback handler
    IncludeThoughts  bool                   // Include reasoning chunks
    IncludeToolCalls bool                   // Include tool call chunks
    IncludeMetadata  bool                   // Include metadata chunks
    TextOnly         bool                   // Only emit text/delta chunks
    FlushInterval    time.Duration          // Buffer flush frequency
    Timeout          time.Duration          // Stream timeout
    Metadata         map[string]interface{} // Additional metadata
}

type StreamHandler func(chunk *StreamChunk) bool // Return false to cancel

Option Builders

go
// Default options (recommended for most use cases)
opts := vnext.DefaultStreamOptions()

// Common option builders
vnext.WithBufferSize(200)              // Larger buffer for performance
vnext.WithStreamHandler(handler)       // Callback-based processing
vnext.WithThoughts()                   // Enable agent reasoning
vnext.WithToolCalls()                  // Enable tool interaction streaming
vnext.WithStreamMetadata()             // Include metadata chunks
vnext.WithTextOnly()                   // Text/delta only (faster)
vnext.WithStreamTimeout(30*time.Second) // Custom timeout
vnext.WithFlushInterval(50*time.Millisecond) // Faster chunk delivery

Default Options:

  • Buffer size: 100 chunks
  • Includes thoughts and tool calls
  • 5-minute timeout
  • 100ms flush interval

🚀 Basic Usage

Simple Streaming

go
// Create agent
agent, err := vnext.QuickChatAgent("gpt-4o-mini")
if err != nil {
    log.Fatal(err)
}

// Start streaming
stream, err := agent.RunStream(ctx, "Explain quantum computing")
if err != nil {
    log.Fatal(err)
}

// Process tokens in real-time
for chunk := range stream.Chunks() {
    switch chunk.Type {
    case vnext.ChunkTypeDelta:
        fmt.Print(chunk.Delta)  // Print tokens as they arrive
    case vnext.ChunkTypeDone:
        fmt.Println("\n✅ Complete")
    case vnext.ChunkTypeError:
        fmt.Printf("❌ Error: %v\n", chunk.Error)
    }
}

// Get final result
result, err := stream.Wait()

Streaming with Options

go
// Advanced streaming with configuration
stream, err := agent.RunStream(ctx, "Hello",
    vnext.WithThoughts(),                    // Show reasoning
    vnext.WithBufferSize(200),               // Larger buffer
    vnext.WithStreamTimeout(30*time.Second), // Custom timeout
)
if err != nil {
    log.Fatal(err)
}

loop:
for chunk := range stream.Chunks() {
    switch chunk.Type {
    case vnext.ChunkTypeDelta:
        fmt.Print(chunk.Delta)
    case vnext.ChunkTypeThought:
        fmt.Printf("\n[💭 %s]\n", chunk.Content)
    case vnext.ChunkTypeDone:
        break loop // a plain break would only exit the switch
    }
}

Handler-Based Streaming

go
// Process chunks with a callback
handler := func(chunk *vnext.StreamChunk) bool {
    if chunk.Type == vnext.ChunkTypeDelta {
        fmt.Print(chunk.Delta)
    }
    return true // Continue streaming (false = stop)
}

stream, err := agent.RunStream(ctx, input, vnext.WithStreamHandler(handler))
result, err := stream.Wait()
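
Because the handler's return value controls the stream (returning false cancels it), you can stop early without calling Cancel(). A minimal sketch that caps output at a fixed chunk budget; maxDeltas is an illustrative constant, not part of the API:

go
const maxDeltas = 50 // illustrative budget

count := 0
handler := func(chunk *vnext.StreamChunk) bool {
    if chunk.Type == vnext.ChunkTypeDelta {
        fmt.Print(chunk.Delta)
        count++
    }
    return count < maxDeltas // returning false stops the stream
}

stream, err := agent.RunStream(ctx, input, vnext.WithStreamHandler(handler))
if err != nil {
    log.Fatal(err)
}
result, err := stream.Wait()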

🎛️ Advanced Usage

RunStreamWithOptions

go
// Combine run options with stream options
runOpts := &vnext.RunOptions{
    MaxTokens:   500,
    Temperature: 0.7,
    StopWords:   []string{"STOP"},
}

stream, err := agent.RunStreamWithOptions(ctx, input, runOpts,
    vnext.WithThoughts(),
    vnext.WithToolCalls(),
)

⚠️ Note: RunStreamWithOptions has known limitations in the current implementation. Prefer RunStream for reliable streaming.

Text-Only Streaming

go
// Simplified streaming (text/delta chunks only)
stream, err := agent.RunStream(ctx, "Hello", vnext.WithTextOnly())
for chunk := range stream.Chunks() {
    if chunk.Type == vnext.ChunkTypeDelta {
        fmt.Print(chunk.Delta)
    }
}

Reader Interface

go
// Stream as io.Reader (text content only)
stream, err := agent.RunStream(ctx, "Generate code sample")
if err != nil {
    log.Fatal(err)
}
reader := stream.AsReader()
if _, err := io.Copy(os.Stdout, reader); err != nil {
    log.Printf("copy failed: %v", err)
}
result, err := stream.Wait()

Interactive Streaming

go
// Real-time conversation
scanner := bufio.NewScanner(os.Stdin)
for {
    fmt.Print("You: ")
    if !scanner.Scan() {
        break
    }
    input := strings.TrimSpace(scanner.Text())
    
    if input == "quit" {
        break
    }
    
    fmt.Print("Agent: ")
    stream, err := agent.RunStream(ctx, input)
    for chunk := range stream.Chunks() {
        if chunk.Type == vnext.ChunkTypeDelta {
            fmt.Print(chunk.Delta)
        }
    }
    fmt.Println()
}

🧰 Utility Functions

CollectStream

go
// Collect all streaming text into a single string
stream, err := agent.RunStream(ctx, "Hello")
output, result, err := vnext.CollectStream(stream)
fmt.Println("Complete response:", output)

StreamToChannel

go
// Convert to simple text channel
stream, err := agent.RunStream(ctx, "Hello")
textChan := vnext.StreamToChannel(stream)
for text := range textChan {
    fmt.Print(text)
}

PrintStream

go
// Print stream to stdout (demo/testing helper)
stream, err := agent.RunStream(ctx, "Hello")
result, err := vnext.PrintStream(stream)

🏗️ Stream Builder

go
// Create custom streams
stream, writer := vnext.NewStreamBuilder().
    WithAgentName("custom-agent").
    WithSessionID("sess-123").
    WithModel("gpt-4o-mini").
    WithOption(vnext.WithThoughts()).
    Build(ctx)

// Write custom chunks
writer.Write(&vnext.StreamChunk{
    Type:  vnext.ChunkTypeDelta,
    Delta: "Hello",
})
writer.Close()
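
The builder returns a regular Stream, so the consumer side is unchanged. A sketch pairing a writer goroutine with a normal chunk loop, assuming writer.Write and writer.Close behave as shown above:

go
stream, writer := vnext.NewStreamBuilder().
    WithAgentName("custom-agent").
    Build(ctx)

// produce chunks in the background; Close ends the stream
go func() {
    defer writer.Close()
    for _, word := range []string{"Hello", ", ", "world"} {
        writer.Write(&vnext.StreamChunk{Type: vnext.ChunkTypeDelta, Delta: word})
    }
}()

// consume exactly like an agent-produced stream
for chunk := range stream.Chunks() {
    if chunk.Type == vnext.ChunkTypeDelta {
        fmt.Print(chunk.Delta)
    }
}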

🔄 Agent Integration

Available Methods

go
type Agent interface {
    // Basic streaming
    RunStream(ctx context.Context, input string, opts ...StreamOption) (Stream, error)
    
    // Advanced streaming (experimental)
    RunStreamWithOptions(ctx context.Context, input string, 
        runOpts *RunOptions, streamOpts ...StreamOption) (Stream, error)
}

Agent Creation for Streaming

go
// Quick agent creation
agent, err := vnext.QuickChatAgent("gpt-4o-mini")

// Agent with configuration
agent, err := vnext.QuickChatAgentWithConfig("gemma2:2b", &vnext.Config{
    Name:         "streaming-agent",
    SystemPrompt: "You are a helpful assistant.",
    Timeout:      30 * time.Second,
    LLM: vnext.LLMConfig{
        Provider:    "ollama",
        Model:       "gemma3:1b",
        Temperature: 0.7,
        MaxTokens:   200,
        BaseURL:     "http://localhost:11434",
    },
})

🛡️ Error Handling & Lifecycle

Error Handling

go
stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
    log.Fatal("Failed to start streaming:", err)
}

loop:
for chunk := range stream.Chunks() {
    if chunk.Error != nil {
        fmt.Printf("Stream error: %v\n", chunk.Error)
        break
    }

    switch chunk.Type {
    case vnext.ChunkTypeDelta:
        fmt.Print(chunk.Delta)
    case vnext.ChunkTypeDone:
        break loop // a plain break would only exit the switch
    }
}

// Always check final result
result, err := stream.Wait()
if err != nil {
    log.Printf("Stream completed with error: %v", err)
}

Cancellation & Timeouts

go
// Manual cancellation
stream, err := agent.RunStream(ctx, "Long task...")
go func() {
    time.Sleep(5 * time.Second)
    stream.Cancel() // Stop streaming
}()

// Context-based timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := agent.RunStream(ctx, "Hello")

// Option-based timeout
stream, err := agent.RunStream(ctx, "Hello", 
    vnext.WithStreamTimeout(10*time.Second))

Stream Lifecycle

go
stream, err := agent.RunStream(ctx, input)

// 1. Stream starts, chunks begin flowing
for chunk := range stream.Chunks() {
    // 2. Process chunks as they arrive
    if chunk.Type == vnext.ChunkTypeDone {
        break // 3. Stream completion signal
    }
}

// 4. Get final result and cleanup
result, err := stream.Wait()

🎯 Performance Tips

Buffer Sizing

go
// Small buffer (low memory, higher latency)
vnext.WithBufferSize(10)

// Large buffer (higher memory, lower latency) 
vnext.WithBufferSize(500)

// Default (balanced)
vnext.WithBufferSize(100)

Flush Intervals

go
// Faster delivery (more CPU)
vnext.WithFlushInterval(10 * time.Millisecond)

// Slower delivery (less CPU)
vnext.WithFlushInterval(200 * time.Millisecond)

// Default (balanced)
vnext.WithFlushInterval(100 * time.Millisecond)

Text-Only Mode

go
// Skip non-text chunks for better performance
stream, err := agent.RunStream(ctx, input, vnext.WithTextOnly())

🔧 Provider Support

Supported Providers

  • Ollama: Full streaming support with local models
  • OpenAI: Real-time streaming via API
  • Azure OpenAI: Streaming through Azure endpoints

Provider-Specific Configuration

go
// Ollama (local)
agent, err := vnext.QuickChatAgentWithConfig("ollama-agent", &vnext.Config{
    LLM: vnext.LLMConfig{
        Provider: "ollama",
        Model:    "gemma3:1b",
        BaseURL:  "http://localhost:11434",
    },
})

// OpenAI (cloud)
agent, err := vnext.QuickChatAgent("gpt-4o-mini")

// Azure OpenAI (cloud)
agent, err := vnext.QuickChatAgentWithConfig("azure-agent", &vnext.Config{
    LLM: vnext.LLMConfig{
        Provider: "azure",
        Model:    "gpt-4o-mini",
        Endpoint: os.Getenv("AZURE_OPENAI_ENDPOINT"),
        APIKey:   os.Getenv("AZURE_OPENAI_API_KEY"),
    },
})

📚 Examples

Simple Streaming

bash
# Run the basic streaming example
cd examples/vnext/simple-streaming
go run main.go

Comprehensive Demo

bash
# Run multi-mode streaming demo
cd examples/vnext/streaming-demo  
go run main.go

🐛 Troubleshooting

Common Issues

Stream Never Starts

go
// Check context and agent configuration
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

agent, err := vnext.QuickChatAgent("gpt-4o-mini")
if err != nil {
    log.Fatal("Agent creation failed:", err)
}

stream, err := agent.RunStream(ctx, "Hello")
if err != nil {
    log.Fatal("Stream start failed:", err)
}

No Chunks Received

go
// Ensure proper chunk type handling
for chunk := range stream.Chunks() {
    fmt.Printf("Chunk type: %s, Delta: %s\n", chunk.Type, chunk.Delta)
    
    if chunk.Type == vnext.ChunkTypeDelta {
        fmt.Print(chunk.Delta)
    }
}

RunStreamWithOptions Returns 0 Chunks

go
// Known limitation - use RunStream instead
stream, err := agent.RunStream(ctx, input,
    vnext.WithStreamTimeout(30*time.Second),
    vnext.WithThoughts(),
)

Interactive Input Truncated

go
// Use bufio.Scanner for multi-word input
scanner := bufio.NewScanner(os.Stdin)
fmt.Print("Input: ")
if scanner.Scan() {
    input := strings.TrimSpace(scanner.Text()) // Full line
}

Performance Issues

Slow Streaming

go
// Try smaller models or adjust buffer size
vnext.WithBufferSize(200),              // Larger buffer
vnext.WithFlushInterval(50*time.Millisecond), // Faster flush

High Memory Usage

go
// Use text-only mode
vnext.WithTextOnly(),
vnext.WithBufferSize(50), // Smaller buffer

🌟 Best Practices

  1. Always handle ChunkTypeDelta - Primary streaming content
  2. Check for ChunkTypeError - Handle streaming errors
  3. Call stream.Wait() - Get final result and cleanup
  4. Use appropriate buffer sizes - Balance memory vs latency
  5. Handle context cancellation - Proper timeout management
  6. Prefer RunStream over RunStreamWithOptions - More reliable
  7. Use bufio.Scanner for interactive input - Multi-word support
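
Putting these together, a minimal sketch of a robust streaming loop that follows practices 1–5, using only the API shown above:

go
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

stream, err := agent.RunStream(ctx, input, vnext.WithBufferSize(100))
if err != nil {
    log.Fatal(err)
}

loop:
for chunk := range stream.Chunks() {
    switch chunk.Type {
    case vnext.ChunkTypeDelta:
        fmt.Print(chunk.Delta) // 1. primary streaming content
    case vnext.ChunkTypeError:
        log.Printf("stream error: %v", chunk.Error) // 2. surface errors
    case vnext.ChunkTypeDone:
        break loop
    }
}

// 3. always collect the final result and clean up
if _, err := stream.Wait(); err != nil {
    log.Printf("stream finished with error: %v", err)
}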

📈 Performance Metrics

Track streaming performance:

go
startTime := time.Now()
deltaCount := 0 // counts delta chunks; a rough proxy for tokens

for chunk := range stream.Chunks() {
    if chunk.Type == vnext.ChunkTypeDelta {
        deltaCount++
        fmt.Print(chunk.Delta)
    }
}

duration := time.Since(startTime)
tokensPerSecond := float64(deltaCount) / duration.Seconds()
fmt.Printf("\nTokens/sec (approx): %.1f\n", tokensPerSecond)
