Workflows
Workflows provide structured ways to define complex AI-powered processes
Workflows
Workflows let you define complex sequences of tasks using clear, structured steps rather than relying on the reasoning of a single agent.
Topics
- Overview - Introduction to workflows
- Workflow State - Manage state across steps
- Control Flow - Branching, loops, and parallel execution
- Agents & Tools - Use agents and tools in workflows
- Snapshots - Save and restore workflow state
- Suspend & Resume - Pause and continue workflows
- Human-in-the-loop - Include human oversight
- Time Travel - Replay from any point
- Error Handling - Handle failures gracefully
Workflow Overview
Workflows provide a structured way to define complex AI-powered processes.
When to Use Workflows
- Complex multi-step processes
- Tasks requiring consistent execution order
- Processes that need human oversight at certain steps
- Reproducible, auditable AI operations
Creating a Workflow
import { Workflow } from '@mastra/core';
const myWorkflow = new Workflow({
name: 'myWorkflow',
trigger: { schema: z.object({ input: z.string() }) },
});
myWorkflow.step(stepOne).then(stepTwo).then(stepThree);
Key Concepts
- Steps - Individual units of work
- Triggers - What starts the workflow
- State - Data passed between steps
- Control Flow - Branching, loops, parallel execution
Workflow State
State holds data that persists across workflow steps.
State Schema
const workflowState = z.object({
userId: z.string(),
input: z.string(),
result: z.string().optional(),
history: z.array(z.string()).default([]),
});
Accessing State
myWorkflow.step('process', async ({ state }) => {
return {
result: processData(state.input),
timestamp: new Date().toISOString(),
};
});
State Updates
State updates are immutable - each step returns new state:
myWorkflow.step('step1', async ({ state }) => {
return { ...state, step1Done: true };
});
Control Flow
Control flow constructs let you create complex workflow patterns.
Branching
// Conditional branching
myWorkflow.step('check').then((state) => {
return state.input.length > 10 ? 'long' : 'short';
}).branch({
long: longPath,
short: shortPath,
});
Loops
// Do while
myWorkflow.doWhile({
condition: (state) => state.attempts < 3,
steps: [retryStep],
});
// Do until
myWorkflow.doUntil({
condition: (state) => state.success,
steps: [processStep],
});
Parallel Execution
// Run steps in parallel
myWorkflow.parallel([
fetchUserData,
fetchProductData,
fetchRecommendations,
]).then(combineResults);
Foreach
// Process array items
myWorkflow.foreach({
items: (state) => state.itemIds,
body: processItem,
});
Agents & Tools in Workflows
Use agents and tools within workflow steps.
Agent Steps
import { agentStep } from '@mastra/core/workflows';
myWorkflow.step(
agentStep({
agent: mastra.getAgent('researcher'),
name: 'research',
instructions: 'Research the given topic and summarize findings',
})
);
Tool Steps
import { toolStep } from '@mastra/core/workflows';
myWorkflow.step(
toolStep({
tool: myTool,
name: 'process',
})
);
Combining Agents and Tools
myWorkflow
.step(fetchDataTool)
.then(
agentStep({
agent: analyzerAgent,
instructions: 'Analyze the data and provide insights',
})
)
.then(formatOutputTool);
Workflow Snapshots
Snapshots save the complete state of a workflow at a point in time.
Saving Snapshots
const run = await myWorkflow.createRun();
await run.start({ input: 'test' });
// Save snapshot
const snapshotId = await run.saveSnapshot();
Restoring from Snapshot
const run = await myWorkflow.createRun({
snapshotId: 'snap_123',
});
// Continue from snapshot
await run.resume();
Snapshot Use Cases
- Resume workflows after system failure
- Audit workflow execution
- Debug workflow issues
- Create workflow templates
Suspend & Resume
Suspend workflows to pause execution and resume later.
Suspending
const run = await myWorkflow.createRun();
await run.start({ input: 'test' });
// Suspend at a step
myWorkflow.step('awaitApproval').suspend();
// Workflow pauses here
await run.suspend();
Resuming
// Resume with additional data
await run.resume({
approval: 'granted',
approvedBy: 'admin@example.com',
});
Use Cases
- Human approval workflows
- Waiting for external events
- Rate limiting / backoff
- Async processing
Human-in-the-loop
Include human oversight in workflow execution.
Defining Approval Points
myWorkflow.step('process').then(
step('approval', async ({ state }) => {
return {
pendingApproval: true,
action: state.action,
};
}).requiresApproval({
approvers: ['admin@example.com'],
timeout: '24h',
})
);
Approval Flow
- Workflow reaches approval step
- Approver receives notification
- Approver reviews and approves/rejects
- Workflow continues or aborts
Providing Feedback
await run.provideFeedback({
step: 'review',
feedback: 'Please refine the summary',
rating: 3,
});
Time Travel
Replay workflow execution from any previous point.
How It Works
- Every step is recorded with its result
- You can replay from any step
- State is restored to that point
- Execution continues with potentially different data
Time Travel API
const run = await myWorkflow.createRun();
await run.start({ input: 'test' });
// Get available checkpoints
const checkpoints = await run.getCheckpoints();
// Travel back to a checkpoint
await run.timeTravel(checkpoints[0].id);
// Or travel to a specific step
await run.timeTravelToStep('step_3');
Use Cases
- Fix errors without restarting
- Try different paths
- Debug complex workflows
- Regression testing
Error Handling
Handle errors gracefully in workflows.
Try-Catch
myWorkflow.step(
step('riskyOperation', async ({ state }) => {
try {
return { result: await riskyCall() };
} catch (error) {
return { error: error.message, recovered: true };
}
})
);
Error Branch
myWorkflow.step('main').error((error, state) => {
return { errorHandled: true, originalError: error };
}).then(errorRecovery);
Retry Configuration
myWorkflow.step('unreliable', async ({ state }) => {
return { data: await unreliableCall() };
}).retry({
maxAttempts: 3,
backoff: 'exponential',
retryOn: ['NetworkError', 'TimeoutError'],
});
Timeout
myWorkflow.step('slow', async ({ state }) => {
return { result: await slowOperation() };
}).timeout('30s');