mastra/Core Concepts

Workflows

Workflows provide structured ways to define complex AI-powered processes

workflowsstepscontrol-flowstate

Workflows

Workflows let you define complex sequences of tasks using clear, structured steps rather than relying on the reasoning of a single agent.

Topics

Workflow Overview

Workflows provide a structured way to define complex AI-powered processes.

When to Use Workflows

  • Complex multi-step processes
  • Tasks requiring consistent execution order
  • Processes that need human oversight at certain steps
  • Reproducible, auditable AI operations

Creating a Workflow

import { Workflow } from '@mastra/core';

const myWorkflow = new Workflow({
  name: 'myWorkflow',
  trigger: { schema: z.object({ input: z.string() }) },
});

myWorkflow.step(stepOne).then(stepTwo).then(stepThree);

Key Concepts

  • Steps - Individual units of work
  • Triggers - What starts the workflow
  • State - Data passed between steps
  • Control Flow - Branching, loops, parallel execution

Workflow State

State holds data that persists across workflow steps.

State Schema

const workflowState = z.object({
  userId: z.string(),
  input: z.string(),
  result: z.string().optional(),
  history: z.array(z.string()).default([]),
});

Accessing State

myWorkflow.step('process', async ({ state }) => {
  return {
    result: processData(state.input),
    timestamp: new Date().toISOString(),
  };
});

State Updates

State updates are immutable - each step returns new state:

myWorkflow.step('step1', async ({ state }) => {
  return { ...state, step1Done: true };
});

Control Flow

Control flow constructs let you create complex workflow patterns.

Branching

// Conditional branching
myWorkflow.step('check').then((state) => {
  return state.input.length > 10 ? 'long' : 'short';
}).branch({
  long: longPath,
  short: shortPath,
});

Loops

// Do while
myWorkflow.doWhile({
  condition: (state) => state.attempts < 3,
  steps: [retryStep],
});

// Do until
myWorkflow.doUntil({
  condition: (state) => state.success,
  steps: [processStep],
});

Parallel Execution

// Run steps in parallel
myWorkflow.parallel([
  fetchUserData,
  fetchProductData,
  fetchRecommendations,
]).then(combineResults);

Foreach

// Process array items
myWorkflow.foreach({
  items: (state) => state.itemIds,
  body: processItem,
});

Agents & Tools in Workflows

Use agents and tools within workflow steps.

Agent Steps

import { agentStep } from '@mastra/core/workflows';

myWorkflow.step(
  agentStep({
    agent: mastra.getAgent('researcher'),
    name: 'research',
    instructions: 'Research the given topic and summarize findings',
  })
);

Tool Steps

import { toolStep } from '@mastra/core/workflows';

myWorkflow.step(
  toolStep({
    tool: myTool,
    name: 'process',
  })
);

Combining Agents and Tools

myWorkflow
  .step(fetchDataTool)
  .then(
    agentStep({
      agent: analyzerAgent,
      instructions: 'Analyze the data and provide insights',
    })
  )
  .then(formatOutputTool);

Workflow Snapshots

Snapshots save the complete state of a workflow at a point in time.

Saving Snapshots

const run = await myWorkflow.createRun();

await run.start({ input: 'test' });

// Save snapshot
const snapshotId = await run.saveSnapshot();

Restoring from Snapshot

const run = await myWorkflow.createRun({
  snapshotId: 'snap_123',
});

// Continue from snapshot
await run.resume();

Snapshot Use Cases

  • Resume workflows after system failure
  • Audit workflow execution
  • Debug workflow issues
  • Create workflow templates

Suspend & Resume

Suspend workflows to pause execution and resume later.

Suspending

const run = await myWorkflow.createRun();
await run.start({ input: 'test' });

// Suspend at a step
myWorkflow.step('awaitApproval').suspend();

// Workflow pauses here
await run.suspend();

Resuming

// Resume with additional data
await run.resume({
  approval: 'granted',
  approvedBy: 'admin@example.com',
});

Use Cases

  • Human approval workflows
  • Waiting for external events
  • Rate limiting / backoff
  • Async processing

Human-in-the-loop

Include human oversight in workflow execution.

Defining Approval Points

myWorkflow.step('process').then(
  step('approval', async ({ state }) => {
    return {
      pendingApproval: true,
      action: state.action,
    };
  }).requiresApproval({
    approvers: ['admin@example.com'],
    timeout: '24h',
  })
);

Approval Flow

  1. Workflow reaches approval step
  2. Approver receives notification
  3. Approver reviews and approves/rejects
  4. Workflow continues or aborts

Providing Feedback

await run.provideFeedback({
  step: 'review',
  feedback: 'Please refine the summary',
  rating: 3,
});

Time Travel

Replay workflow execution from any previous point.

How It Works

  1. Every step is recorded with its result
  2. You can replay from any step
  3. State is restored to that point
  4. Execution continues with potentially different data

Time Travel API

const run = await myWorkflow.createRun();
await run.start({ input: 'test' });

// Get available checkpoints
const checkpoints = await run.getCheckpoints();

// Travel back to a checkpoint
await run.timeTravel(checkpoints[0].id);

// Or travel to a specific step
await run.timeTravelToStep('step_3');

Use Cases

  • Fix errors without restarting
  • Try different paths
  • Debug complex workflows
  • Regression testing

Error Handling

Handle errors gracefully in workflows.

Try-Catch

myWorkflow.step(
  step('riskyOperation', async ({ state }) => {
    try {
      return { result: await riskyCall() };
    } catch (error) {
      return { error: error.message, recovered: true };
    }
  })
);

Error Branch

myWorkflow.step('main').error((error, state) => {
  return { errorHandled: true, originalError: error };
}).then(errorRecovery);

Retry Configuration

myWorkflow.step('unreliable', async ({ state }) => {
  return { data: await unreliableCall() };
}).retry({
  maxAttempts: 3,
  backoff: 'exponential',
  retryOn: ['NetworkError', 'TimeoutError'],
});

Timeout

myWorkflow.step('slow', async ({ state }) => {
  return { result: await slowOperation() };
}).timeout('30s');