Workflow API (vNext) โ
Multi-agent orchestration with sequential, parallel, DAG, and loop modes
The vNext workflow engine wraps multiple agents into an executable graph. It streams intermediate events, records step-level metrics, and shares memory between steps when configured.
๐ Workflow Interface โ
type Workflow interface {
Run(ctx context.Context, input string) (*WorkflowResult, error)
RunStream(ctx context.Context, input string, opts ...StreamOption) (Stream, error)
AddStep(step WorkflowStep) error
SetMemory(memory Memory)
GetConfig() *WorkflowConfig
Initialize(ctx context.Context) error
Shutdown(ctx context.Context) error
}WorkflowConfig controls mode, timeout, max iterations, optional shared memory config, and named agent list for declarative setups.
๐งฑ Steps โ
type WorkflowStep struct {
Name string
Agent Agent
Condition func(context.Context, *WorkflowContext) bool
Dependencies []string
Transform func(string) string
Metadata map[string]interface{}
}Conditionโ skip steps dynamically (returnsfalseto skip)Dependenciesโ DAG orderingTransformโ mutate input before the step runs
๐ Constructing Workflows โ
workflow, err := vnext.NewSequentialWorkflow(&vnext.WorkflowConfig{Timeout: 2 * time.Minute})
if err != nil {
log.Fatal(err)
}
workflow.SetMemory(memory)
workflow.AddStep(vnext.WorkflowStep{Name: "gather", Agent: gatherAgent})
workflow.AddStep(vnext.WorkflowStep{Name: "summarise", Agent: summariseAgent})
result, err := workflow.Run(ctx, "Analyse customer tickets")
if err != nil {
log.Fatal(err)
}
fmt.Println("Final output:", result.FinalOutput)Alternate constructors:
NewParallelWorkflow(config)NewDAGWorkflow(config)NewLoopWorkflow(config)NewWorkflow(config)โ respectsconfig.Mode
๐ WorkflowResult โ
type WorkflowResult struct {
Success bool
FinalOutput string
StepResults []StepResult
Duration time.Duration
TotalTokens int
ExecutionPath []string
Metadata map[string]interface{}
Error string
}StepResult captures per-step output, duration, token usage, and errors. ExecutionPath lists the order of successfully executed steps.
๐ Modes โ
- Sequential: executes steps in order, piping output to the next step
- Parallel: fires all steps concurrently, aggregating outputs
- DAG: respects dependency graph (detects deadlocks if dependencies never resolve)
- Loop: replays the step list until
MaxIterationsor a context flag (loop_continue) stops the cycle
Example loop exit condition inside an agent:
workflowCtx.Set("loop_continue", false) // stored via WorkflowContext in custom logic๐ง WorkflowContext โ
type WorkflowContext struct {
WorkflowID string
SharedMemory Memory
StepResults map[string]*StepResult
Variables map[string]interface{}
}Accessors:
prev, ok := ctx.GetStepResult("summarise")
ctx.Set("loop_continue", prev.Success)
value, ok := ctx.Get("customer_id")The context is shared across steps and exposes thread-safe getters/setters.
๐ Streaming Workflows โ
โ Production Ready: Workflow streaming is fully implemented with reliable context management and real-time token display.
// Create multi-agent workflow
workflow, err := vnext.NewSequentialWorkflow(&vnext.WorkflowConfig{
Mode: vnext.Sequential,
Timeout: 300 * time.Second,
})
workflow.AddStep(vnext.WorkflowStep{Name: "research", Agent: researchAgent})
workflow.AddStep(vnext.WorkflowStep{Name: "analyze", Agent: analyzeAgent})
// Stream execution with real-time output
stream, err := workflow.RunStream(ctx, "Research project status")
if err != nil {
log.Fatal(err)
}
for chunk := range stream.Chunks() {
switch chunk.Type {
case vnext.ChunkTypeMetadata:
if stepName, ok := chunk.Metadata["step_name"].(string); ok {
log.Printf("[%s] %s", stepName, chunk.Content)
}
case vnext.ChunkTypeDelta:
fmt.Print(chunk.Delta) // Real-time token streaming
case vnext.ChunkTypeDone:
fmt.Println("\nโ
Step completed")
}
}
final, _ := stream.Wait()
fmt.Printf("\n๐ Workflow complete: %s\n", final.Content)Performance: Workflow streaming adds ~36-160% overhead vs direct agents (measured) but provides multi-agent orchestration and real-time feedback. Streaming is reliable and context-cancel bugs have been resolved.
๐งฎ Shared Memory โ
Call workflow.SetMemory(memory) to give steps a shared provider. Steps can read/write via the agentโs builder configuration or manual memory usage inside custom handlers. Input/output snippets are stored with content types workflow_step_input and workflow_step_output when memory is present.
๐งฉ Dependency Graph Helpers โ
buildInputFromDependencies automatically aggregates outputs from dependency steps for DAG mode. Supply dependency names in WorkflowStep.Dependencies to signal ordering:
workflow.AddStep(vnext.WorkflowStep{Name: "fetch", Agent: fetchAgent})
workflow.AddStep(vnext.WorkflowStep{Name: "analyse", Agent: analyseAgent, Dependencies: []string{"fetch"}})
workflow.AddStep(vnext.WorkflowStep{Name: "report", Agent: reportAgent, Dependencies: []string{"analyse"}})๐ Plugins โ
Override the default workflow engine by registering a factory:
vnext.SetWorkflowFactory(func(cfg *vnext.WorkflowConfig) (vnext.Workflow, error) {
return newCustomWorkflow(cfg), nil
})โ Best Practices โ
- Set
WorkflowConfig.Timeoutto cap the entire orchestration duration - Use
MaxIterationsto avoid runaway loops - Attach shared memory to allow later steps to inspect earlier outputs
- Stream for long-running workflows to give users live feedback
- Initialise/Shutdown workflows when managing long-lived agent instances (call once per lifecycle)
๐ Related Docs โ
- agent.md for agent execution details
- streaming.md for stream configuration
- memory.md to configure shared RAG for workflows
- tools.md when combining tool-enabled agents inside flows