Skip to main content
Sign In
Features

Workflows

Build durable, replayable run loops in Rivet Actors with steps, queue waits, timers, and rollback.

Use workflows for durable, multi-step execution with replay safety.

What are workflows?

A workflow is a durable, replayable run handler for a Rivet Actor.

  • Survives restarts: workflow progress is saved automatically.
  • Re-runs safely: replay follows the same recorded steps.
  • Event-driven: workflows can pause for queue messages, then continue.

Getting started

Simple workflow

Use this when you need a short multi-step sequence.

Loops

This is the recommended workflow shape for most actor workloads.

  • Use a queue wait inside the loop to receive the next unit of work.
  • Keep actor state changes in a single workflow loop.
  • This gives you one durable workflow that manages all actor progress.

Setup & teardown

Use this when the workflow should initialize resources, process queued commands, then clean up.

Features

Queue

Use this for fire-and-forget commands where the client does not need a reply.

Use the Loops example above as the baseline pattern.

Request/response (using queue)

Use this when the caller needs a response from queued processing.

Timers

Use queue messages as the trigger source, then sleep durably inside the workflow.

Join

Use join when several independent tasks can run in parallel.

Race

Use race when you need first-winner behavior.

Timeouts

Use step timeouts and retries for slow or flaky dependencies.

import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

async function chargeCard(orderId: string): Promise<string> {
  return `charge-${orderId}`;
}

export const timeoutActor = actor({
  state: {
    lastChargeId: null as string | null,
  },
  queues: {
    charge: queue<{ orderId: string }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "charge-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-charge", {
          names: ["charge"],
        });

        if (!message) return Loop.continue(undefined);

        const chargeId = await loopCtx.step<string>({
          name: "charge-card",
          timeout: 5_000,
          maxRetries: 5,
          retryBackoffBase: 200,
          retryBackoffMax: 2_000,
          run: async () => await chargeCard(message.body.orderId),
        });

        await loopCtx.step("save-charge", async () => {
          loopCtx.state.lastChargeId = chargeId;
        });

        return Loop.continue(undefined);
      },
    });
  }),
});

export const registry = setup({ use: { timeoutActor } });

Rollback

Use rollback checkpoints before steps that have compensating actions.

import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

async function reserveInventory(orderId: string): Promise<string> {
  return `reservation-${orderId}`;
}

async function releaseInventory(_reservationId: string): Promise<void> {}

async function chargeCard(orderId: string): Promise<string> {
  return `charge-${orderId}`;
}

async function refundCharge(_chargeId: string): Promise<void> {}

export const checkoutActor = actor({
  state: {
    lastOrderId: null as string | null,
    lastReservationId: null as string | null,
    lastChargeId: null as string | null,
  },
  queues: {
    checkout: queue<{ orderId: string }>(),
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "checkout-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-checkout", {
          names: ["checkout"],
        });

        if (!message) return Loop.continue(undefined);

        await loopCtx.rollbackCheckpoint("checkout-checkpoint");

        const reservationId = await loopCtx.step<string>({
          name: "reserve-inventory",
          run: async () => await reserveInventory(message.body.orderId),
          rollback: async (_rollbackCtx, output) => {
            await releaseInventory(output as string);
          },
        });

        const chargeId = await loopCtx.step<string>({
          name: "charge-card",
          run: async () => await chargeCard(message.body.orderId),
          rollback: async (_rollbackCtx, output) => {
            await refundCharge(output as string);
          },
        });

        await loopCtx.step("mark-complete", async () => {
          loopCtx.state.lastOrderId = message.body.orderId;
          loopCtx.state.lastReservationId = reservationId;
          loopCtx.state.lastChargeId = chargeId;
        });

        return Loop.continue(undefined);
      },
    });
  }),
});

export const registry = setup({ use: { checkoutActor } });

Patterns

Store workflow progress in state + broadcast

Store progress in state so replay and recovery always restore it. Broadcast state changes so clients can render progress in realtime.

Cron (queue-driven)

Rivet scheduling triggers actions. For cron-like workflows, use a small scheduled action as a bridge that enqueues work, then process that work in the workflow loop.

import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

function nextMinute(timestamp: number): number {
  const minuteMs = 60_000;
  return Math.floor(timestamp / minuteMs) * minuteMs + minuteMs;
}

export const cronActor = actor({
  state: {
    runs: 0,
    lastRunAt: null as number | null,
  },
  queues: {
    "cron-tick": queue<{ scheduledAt: number }>(),
  },
  onCreate: async (c) => {
    const firstTickAt = nextMinute(Date.now());
    await c.schedule.at(firstTickAt, "enqueueCronTick", firstTickAt);
  },
  actions: {
    enqueueCronTick: async (c, scheduledAt: number) => {
      await c.queue.send("cron-tick", { scheduledAt });

      const nextTickAt = nextMinute(scheduledAt + 1);
      await c.schedule.at(nextTickAt, "enqueueCronTick", nextTickAt);
    },
    getState: (c) => c.state,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "cron-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-cron-tick", {
          names: ["cron-tick"],
        });

        if (!message) return Loop.continue(undefined);

        await loopCtx.step("run-cron-job", async () => {
          loopCtx.state.runs += 1;
          loopCtx.state.lastRunAt = message.body.scheduledAt;
        });

        return Loop.continue(undefined);
      },
    });
  }),
});

export const registry = setup({ use: { cronActor } });

These are common workflow shapes used in production systems.

Queue-driven worker

Use this when external systems enqueue work and the actor should process each item durably.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type Job = { id: string; amount: number };

export const queueWorkerActor = actor({
  state: {
    processed: 0,
    totalAmount: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "worker-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-job", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const job = message.body as Job;

        await loopCtx.step("process-job", async () => {
          loopCtx.state.processed += 1;
          loopCtx.state.totalAmount += job.amount;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { queueWorkerActor } });

Setup & teardown

Use this when you need one-time initialization before a long-lived loop, plus cleanup when the actor stops sleeping or is destroyed.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

function openResource(): string {
  return "connected";
}

function closeResource(_resource: string): void {}

export const setupRunTeardownActor = actor({
  vars: {
    resource: null as string | null,
  },
  state: {
    initialized: false,
    ticks: 0,
  },
  onWake: (c) => {
    c.vars.resource = openResource();
  },
  onSleep: (c) => {
    if (!c.vars.resource) return;
    closeResource(c.vars.resource);
    c.vars.resource = null;
  },
  run: workflow(async (ctx) => {
    await ctx.step("setup", async () => {
      if (!ctx.vars.resource) ctx.vars.resource = openResource();
      ctx.state.initialized = true;
    });

    await ctx.loop({
      name: "main-loop",
      run: async (loopCtx) => {
        await loopCtx.sleep("tick", 1_000);
        await loopCtx.step("tick-step", async () => {
          loopCtx.state.ticks += 1;
        });
        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { setupRunTeardownActor } });

Human approval gate

Use this when an operation must pause for a user or system decision before continuing.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type ApprovalMessage = {
  requestId: string;
  approved: boolean;
};

export const approvalGateActor = actor({
  state: {
    pendingRequestId: null as string | null,
    lastDecision: null as "approved" | "rejected" | null,
  },
  run: workflow(async (ctx) => {
    await ctx.step("request-approval", async () => {
      ctx.state.pendingRequestId = "req-1";
      ctx.state.lastDecision = null;
    });

    await ctx.loop({
      name: "approval-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-approval", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const approval = message.body as ApprovalMessage;

        await loopCtx.step("apply-decision", async () => {
          loopCtx.state.pendingRequestId = approval.requestId;
          loopCtx.state.lastDecision = approval.approved
            ? "approved"
            : "rejected";
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { approvalGateActor } });

Fan-out / fan-in (join)

Use this when independent work items can run in parallel and you need a single merged result.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

async function loadUsers(): Promise<number> {
  return 12;
}

async function loadOrders(): Promise<number> {
  return 34;
}

async function loadInvoices(): Promise<number> {
  return 56;
}

export const fanInOutActor = actor({
  state: {
    total: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "join-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-refresh", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);

        const joined = await loopCtx.join("parallel-work", {
          users: {
            run: async () => await loadUsers(),
          },
          orders: {
            run: async () => await loadOrders(),
          },
          invoices: {
            run: async () => await loadInvoices(),
          },
        });

        await loopCtx.step("merge-results", async () => {
          loopCtx.state.total =
            joined.users + joined.orders + joined.invoices;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { fanInOutActor } });

Batch drainer

Use this when throughput matters and handling one message at a time is too expensive.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type MetricMessage = { value: number };

export const batchDrainerActor = actor({
  state: {
    pending: [] as number[],
    flushedBatches: 0,
    lastBatchTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "drain-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-metric", {
          timeout: 5_000,
        });

        if (message) {
          const metric = message.body as MetricMessage;
          await loopCtx.step("buffer-message", async () => {
            loopCtx.state.pending.push(metric.value);
          });
        }

        if (loopCtx.state.pending.length < 5) return Loop.continue(undefined);

        await loopCtx.step("flush-batch", async () => {
          const total = loopCtx.state.pending.reduce((sum, value) => sum + value, 0);
          loopCtx.state.lastBatchTotal = total;
          loopCtx.state.flushedBatches += 1;
          loopCtx.state.pending = [];
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { batchDrainerActor } });

Coordinator -> worker RPC

Use this when one actor orchestrates work by calling actions on other actors.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type TaskMessage = {
  taskId: string;
  workerId: string;
  value: number;
};

export const workerActor = actor({
  actions: {
    runTask: async (_c, value: number) => value * 2,
  },
});

export const coordinatorActor = actor({
  state: {
    lastTaskId: null as string | null,
    lastResult: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "orchestrator-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-task", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const task = message.body as TaskMessage;

        const result = await loopCtx.step("dispatch-rpc", async () => {
          const client = loopCtx.client<typeof registry>();
          const worker = client.workerActor.getOrCreate([task.workerId]);
          return worker.runTask(task.value);
        });

        await loopCtx.step("record-result", async () => {
          loopCtx.state.lastTaskId = task.taskId;
          loopCtx.state.lastResult = result as number;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { coordinatorActor, workerActor } });

Request/response over queue (async RPC)

Use this when you want decoupled actor-to-actor communication with durable waits and explicit completion.

Scatter-gather across actors

Use this when multiple actors can process independent parts of a request in parallel, then return a merged response.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type ScatterMessage = { input: number };

export const shardActor = actor({
  actions: {
    compute: async (_c, input: number) => input * 10,
  },
});

export const scatterGatherActor = actor({
  state: {
    lastSum: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "scatter-gather-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-scatter", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const scatter = message.body as ScatterMessage;

        const gathered = await loopCtx.join("gather", {
          shardA: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-a", async () => {
                const client = joinCtx.client<typeof registry>();
                const handle = client.shardActor.getOrCreate(["a"]);
                return handle.compute(scatter.input);
              }),
          },
          shardB: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-b", async () => {
                const client = joinCtx.client<typeof registry>();
                const handle = client.shardActor.getOrCreate(["b"]);
                return handle.compute(scatter.input);
              }),
          },
          shardC: {
            run: async (joinCtx) =>
              await joinCtx.step("call-shard-c", async () => {
                const client = joinCtx.client<typeof registry>();
                const handle = client.shardActor.getOrCreate(["c"]);
                return handle.compute(scatter.input);
              }),
          },
        });

        await loopCtx.step("aggregate", async () => {
          loopCtx.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { scatterGatherActor, shardActor } });

Timeout + fallback actor

Use this when a primary actor call might be slow or unavailable and you need a deterministic fallback path.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

export const primaryServiceActor = actor({
  actions: {
    fetchValue: async () => {
      await new Promise((resolve) => setTimeout(resolve, 500));
      return "primary";
    },
  },
});

export const fallbackServiceActor = actor({
  actions: {
    fetchValue: async () => "fallback",
  },
});

export const timeoutFallbackActor = actor({
  state: {
    lastSource: "none" as "none" | "primary" | "fallback",
    lastValue: "",
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "timeout-loop",
      run: async (loopCtx) => {
        await loopCtx.queue.next("wait-request", {
          timeout: 30_000,
        });

        const winner = await loopCtx.race("primary-vs-timeout", [
          {
            name: "primary",
            run: async (raceCtx) =>
              await raceCtx.step("call-primary", async () => {
                const client = raceCtx.client<typeof registry>();
                const primary = client.primaryServiceActor.getOrCreate(["main"]);
                return primary.fetchValue();
              }),
          },
          {
            name: "timeout",
            run: async (raceCtx) => {
              await raceCtx.sleep("primary-timeout", 200);
              return "timeout";
            },
          },
        ]);

        let value = winner.value as string;
        let source: "primary" | "fallback" = "primary";

        if (winner.winner === "timeout") {
          value = (await loopCtx.step("fallback-call", async () => {
            const client = loopCtx.client<typeof registry>();
            const fallback = client.fallbackServiceActor.getOrCreate(["main"]);
            return fallback.fetchValue();
          })) as string;
          source = "fallback";
        }

        await loopCtx.step("record-choice", async () => {
          loopCtx.state.lastSource = source;
          loopCtx.state.lastValue = value;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({
  use: { timeoutFallbackActor, primaryServiceActor, fallbackServiceActor },
});

Cross-actor saga (compensating actions)

Use this when a workflow spans multiple actors and each side effect may need compensation.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type CheckoutMessage = {
  orderId: string;
  amount: number;
};

export const inventoryActor = actor({
  actions: {
    reserve: async (_c, orderId: string) => `reserve-${orderId}`,
    release: async (_c, reservationId: string) => reservationId,
  },
});

export const billingActor = actor({
  actions: {
    charge: async (_c, amount: number) => `charge-${amount}`,
    refund: async (_c, chargeId: string) => chargeId,
  },
});

export const checkoutSagaActor = actor({
  state: {
    completedOrders: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "checkout-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-order", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const checkout = message.body as CheckoutMessage;

        await loopCtx.rollbackCheckpoint("checkout-saga");

        const reservationId = (await loopCtx.step({
          name: "reserve-inventory",
          run: async () => {
            const client = loopCtx.client<typeof registry>();
            const inventory = client.inventoryActor.getOrCreate(["main"]);
            return inventory.reserve(checkout.orderId);
          },
          rollback: async (_rollbackCtx, output) => {
            const client = loopCtx.client<typeof registry>();
            const inventory = client.inventoryActor.getOrCreate(["main"]);
            await inventory.release(output as string);
          },
        })) as string;

        const chargeId = (await loopCtx.step({
          name: "charge-card",
          run: async () => {
            const client = loopCtx.client<typeof registry>();
            const billing = client.billingActor.getOrCreate(["main"]);
            return billing.charge(checkout.amount);
          },
          rollback: async (_rollbackCtx, output) => {
            const client = loopCtx.client<typeof registry>();
            const billing = client.billingActor.getOrCreate(["main"]);
            await billing.refund(output as string);
          },
        })) as string;

        await loopCtx.step("mark-complete", async () => {
          loopCtx.state.completedOrders += 1;
          void reservationId;
          void chargeId;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({
  use: { checkoutSagaActor, inventoryActor, billingActor },
});

Signal-driven control loop

Use this when workflow progress should be triggered by commands/events instead of fixed polling intervals.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type ControlSignal = { kind: "pause" | "resume" | "stop" };

export const controlLoopActor = actor({
  state: {
    mode: "running" as "running" | "paused" | "stopped",
    handledSignals: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "control-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-signal", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const signal = message.body as ControlSignal;

        await loopCtx.step("apply-signal", async () => {
          loopCtx.state.handledSignals += 1;
          if (signal.kind === "pause") loopCtx.state.mode = "paused";
          if (signal.kind === "resume") loopCtx.state.mode = "running";
          if (signal.kind === "stop") loopCtx.state.mode = "stopped";
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { controlLoopActor } });

Poll + backoff loop

Use this when an external dependency has variable availability and retries should slow down after failures.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

async function pollExternal(attempt: number): Promise<boolean> {
  return attempt % 3 === 0;
}

export const pollBackoffActor = actor({
  state: {
    attempts: 0,
    backoffMs: 100,
    status: "unknown" as "unknown" | "healthy" | "retrying",
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "poll-loop",
      run: async (loopCtx) => {
        const success = await loopCtx.step("poll-target", async () => {
          loopCtx.state.attempts += 1;
          return pollExternal(loopCtx.state.attempts);
        });

        if (success) {
          await loopCtx.step("reset-backoff", async () => {
            loopCtx.state.status = "healthy";
            loopCtx.state.backoffMs = 100;
          });
          await loopCtx.sleep("healthy-interval", 1_000);
          return Loop.continue(undefined);
        }

        await loopCtx.step("grow-backoff", async () => {
          loopCtx.state.status = "retrying";
          loopCtx.state.backoffMs = Math.min(loopCtx.state.backoffMs * 2, 5_000);
        });

        await loopCtx.sleep("retry-delay", loopCtx.state.backoffMs);
        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { pollBackoffActor } });

Child worker orchestration

Use this when one workflow coordinates many child workers (actors or worker workflows) and manages their lifecycle.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type BatchMessage = { payload: number };

export const childWorkerActor = actor({
  actions: {
    process: async (_c, payload: number) => payload * 3,
  },
});

export const orchestratorActor = actor({
  state: {
    lastTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.step("start-children", async () => {
      const client = ctx.client<typeof registry>();
      await client.childWorkerActor.getOrCreate(["child-a"]).process(0);
      await client.childWorkerActor.getOrCreate(["child-b"]).process(0);
      await client.childWorkerActor.getOrCreate(["child-c"]).process(0);
    });

    await ctx.loop({
      name: "orchestrate-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-batch", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const batch = message.body as BatchMessage;

        const results = await loopCtx.join("collect-updates", {
          a: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-a", async () => {
                const client = joinCtx.client<typeof registry>();
                return client.childWorkerActor.getOrCreate(["child-a"]).process(batch.payload);
              }),
          },
          b: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-b", async () => {
                const client = joinCtx.client<typeof registry>();
                return client.childWorkerActor.getOrCreate(["child-b"]).process(batch.payload);
              }),
          },
          c: {
            run: async (joinCtx) =>
              await joinCtx.step("run-child-c", async () => {
                const client = joinCtx.client<typeof registry>();
                return client.childWorkerActor.getOrCreate(["child-c"]).process(batch.payload);
              }),
          },
        });

        await loopCtx.step("reconcile", async () => {
          loopCtx.state.lastTotal = results.a + results.b + results.c;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { orchestratorActor, childWorkerActor } });

Bounded drain + concurrency cap

Use this when inbound work can spike and you need predictable per-iteration limits.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type WorkMessage = { id: string; value: number };

const MAX_PER_ITERATION = 10;
const CONCURRENCY_LIMIT = 3;

async function processWork(value: number): Promise<number> {
  return value * 2;
}

async function runWithLimit<T>(
  limit: number,
  items: T[],
  fn: (item: T) => Promise<void>,
): Promise<void> {
  let nextIndex = 0;
  const workers = Array.from({ length: limit }, async () => {
    while (nextIndex < items.length) {
      const current = items[nextIndex];
      nextIndex += 1;
      await fn(current);
    }
  });
  await Promise.all(workers);
}

export const boundedDrainActor = actor({
  state: {
    processed: 0,
    lastWindowSize: 0,
    lastWindowTotal: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "bounded-drain-loop",
      run: async (loopCtx) => {
        const window: WorkMessage[] = [];

        for (let i = 0; i < MAX_PER_ITERATION; i += 1) {
          const [message] = await loopCtx.queue.next("wait-work", {
            timeout: i === 0 ? 30_000 : 10,
          });
          if (!message) break;
          window.push(message.body as WorkMessage);
        }

        if (window.length === 0) return Loop.continue(undefined);

        await loopCtx.step("process-window", async () => {
          let windowTotal = 0;
          await runWithLimit(CONCURRENCY_LIMIT, window, async (work) => {
            const result = await processWork(work.value);
            windowTotal += result;
          });

          loopCtx.state.processed += window.length;
          loopCtx.state.lastWindowSize = window.length;
          loopCtx.state.lastWindowTotal = windowTotal;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { boundedDrainActor } });

Versioned workflow evolution

Use this when workflow structure changes across deployments and old histories must still replay.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

export const versionedWorkflowActor = actor({
  state: {
    runs: 0,
  },
  run: workflow(async (ctx) => {
    await ctx.step("validate-v2", async () => {
      ctx.state.runs += 1;
    });

    await ctx.removed("validate-v1", "step");

    await ctx.loop({
      name: "main-loop-v2",
      run: async (loopCtx) => {
        await loopCtx.sleep("idle", 500);
        await loopCtx.step("heartbeat-v2", async () => {
          loopCtx.state.runs += 1;
        });
        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { versionedWorkflowActor } });

Checkpoint-friendly loop design

Use this when you need reliable replay and resume semantics across crashes and restarts.

import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";

type PaymentMessage = { id: string; amount: number };

export const checkpointFriendlyActor = actor({
  state: {
    appliedCount: 0,
    totalAmount: 0,
    lastPaymentId: null as string | null,
  },
  run: workflow(async (ctx) => {
    await ctx.loop({
      name: "payment-loop",
      run: async (loopCtx) => {
        const [message] = await loopCtx.queue.next("wait-payment", {
          timeout: 30_000,
        });

        if (!message) return Loop.continue(undefined);
        const payment = message.body as PaymentMessage;

        await loopCtx.rollbackCheckpoint("apply-payment-checkpoint");

        const plan = (await loopCtx.step("build-plan", async () => {
          return {
            paymentId: payment.id,
            amount: payment.amount,
          };
        })) as { paymentId: string; amount: number };

        await loopCtx.step("apply-side-effects", async () => {
          loopCtx.state.appliedCount += 1;
          loopCtx.state.totalAmount += plan.amount;
          loopCtx.state.lastPaymentId = plan.paymentId;
        });

        return Loop.continue(undefined);
      },
    });
  }),
  actions: {
    getState: (c) => c.state,
  },
});

export const registry = setup({ use: { checkpointFriendlyActor } });

Migrations

  • Keep workflow entry names stable once deployed.
  • If an old entry was removed or renamed, call ctx.removed(name, originalType).
  • This keeps replay compatible across deployments.

Step-only access to actor APIs

state, vars, db, client(), and connection/event APIs are only valid inside ctx.step(...) callbacks.

Use non-step workflow code for orchestration only: queue waits, sleeps, loops, joins, races, and rollback boundaries. Keep actor-local side effects in steps.

Debugging

  • GET /inspector/workflow-history returns workflow history status for an actor.
  • Response includes isWorkflowEnabled and history.
  • In non-dev mode, inspector endpoints require authorization.

Recommendations

  • Prefer queue-driven loops for long-lived workflows.
  • Structure long-lived workflows with setup and teardown around the main loop.
  • Keep actor state changes and side effects inside steps.
  • Store workflow progress in state and broadcast updates as progress changes.
  • Use timeouts and rollback for external side effects.