Workflows
Build durable, replayable run loops in Rivet Actors with steps, queue waits, timers, and rollback.
Use workflows for durable, multi-step execution with replay safety.
What are workflows?
A workflow is a durable, replayable run handler for a Rivet Actor.
- Survives restarts: workflow progress is saved automatically.
- Re-runs safely: replay follows the same recorded steps.
- Event-driven: workflows can pause for queue messages, then continue.
Getting started
Simple workflow
Use this when you need a short multi-step sequence.
import { actor, setup } from "rivetkit";
import { workflow } from "rivetkit/workflow";
export const invoiceActor = actor({
state: {
subtotal: 0,
tax: 0,
total: 0,
status: "idle" as "idle" | "complete",
},
run: workflow(async (ctx) => {
const subtotal = await ctx.step("load-subtotal", async () => 100);
const tax = await ctx.step("calculate-tax", async () => {
return Math.round(subtotal * 0.1);
});
await ctx.step("save-invoice", async () => {
ctx.state.subtotal = subtotal;
ctx.state.tax = tax;
ctx.state.total = subtotal + tax;
ctx.state.status = "complete";
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { invoiceActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.invoiceActor.getOrCreate(["main"]);
const state = await handle.getState();
console.log(state.status, state.total);
Loops
This is the recommended workflow shape for most actor workloads.
- Use a queue wait inside the loop to receive the next unit of work.
- Keep actor state changes in a single workflow loop.
- This gives you one durable workflow that manages all actor progress.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
export const workflowCounter = actor({
state: {
value: 0,
processed: 0,
},
queues: {
counter: queue<{ delta: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "counter-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-counter-command");
if (!message) return Loop.continue(undefined);
await loopCtx.step("apply-counter-command", async () => {
loopCtx.state.value += message.body.delta;
loopCtx.state.processed += 1;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { workflowCounter } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.workflowCounter.getOrCreate(["main"]);
await handle.send("counter", { delta: 1 });
await handle.send("counter", { delta: 2 });
const state = await handle.getState();
console.log(state.value, state.processed);
Setup & teardown
Use this when the workflow should initialize resources, process queued commands, then clean up.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type WorkMessage = { amount: number };
type ControlMessage = { type: "stop"; reason: string };
export const setupRunTeardownActor = actor({
state: {
phase: "idle" as "idle" | "running" | "stopped",
total: 0,
processed: 0,
stopReason: null as string | null,
},
queues: {
work: queue<WorkMessage>(),
control: queue<ControlMessage>(),
},
run: workflow(async (ctx) => {
await ctx.step("setup", async () => {
ctx.state.phase = "running";
ctx.state.stopReason = null;
});
const stopReason = await ctx.loop({
name: "worker-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-command", {
names: ["work", "control"],
});
if (!message) return Loop.continue(undefined);
if (message.name === "work") {
const work = message.body as WorkMessage;
await loopCtx.step("apply-work", async () => {
loopCtx.state.total += work.amount;
loopCtx.state.processed += 1;
});
return Loop.continue(undefined);
}
const control = message.body as ControlMessage;
if (control.type === "stop") {
return Loop.break(control.reason);
}
return Loop.continue(undefined);
},
});
await ctx.step("teardown", async () => {
ctx.state.phase = "stopped";
ctx.state.stopReason = stopReason;
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { setupRunTeardownActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.setupRunTeardownActor.getOrCreate(["main"]);
await handle.send("work", { amount: 5 });
await handle.send("work", { amount: 3 });
await handle.send("control", { type: "stop", reason: "maintenance" });
const state = await handle.getState();
console.log(state.phase, state.total, state.stopReason);
Features
Queue
Use this for fire-and-forget commands where the client does not need a reply.
Use the Loops example above as the baseline pattern.
Request/response (using queue)
Use this when the caller needs a response from queued processing.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
export const requestResponseActor = actor({
state: {
handled: 0,
},
queues: {
requests: queue<{ value: number }, { doubled: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "request-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-request", {
names: ["requests"],
completable: true,
});
if (!message || !message.complete) return Loop.continue(undefined);
const doubled = await loopCtx.step("handle-request", async () => {
loopCtx.state.handled += 1;
return message.body.value * 2;
});
await message.complete({ doubled });
return Loop.continue(undefined);
},
});
}),
});
export const registry = setup({ use: { requestResponseActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.requestResponseActor.getOrCreate(["main"]);
const result = await handle.send(
"requests",
{ value: 21 },
{ wait: true, timeout: 1_000 },
);
if (result.status === "completed") {
const response = result.response as { doubled: number };
console.log(response.doubled);
}
Timers
Use queue messages as the trigger source, then sleep durably inside the workflow.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type Reminder = {
text: string;
at: number;
};
export const reminderActor = actor({
state: {
fired: [] as string[],
},
queues: {
reminders: queue<Reminder>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "reminder-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-reminder", {
names: ["reminders"],
});
if (!message) return Loop.continue(undefined);
const runAt = Math.max(Date.now(), message.body.at);
await loopCtx.sleepUntil("wait-until-reminder", runAt);
await loopCtx.step("record-reminder", async () => {
loopCtx.state.fired.push(message.body.text);
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { reminderActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.reminderActor.getOrCreate(["main"]);
await handle.send("reminders", {
text: "send weekly report",
at: Date.now() + 1_000,
});
await new Promise((resolve) => setTimeout(resolve, 1_300));
console.log(await handle.getState());
Join
Use join when several independent tasks can run in parallel.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
export const dashboardActor = actor({
state: {
summary: null as null | {
users: number;
orders: number;
revenue: number;
},
},
queues: {
refresh: queue<Record<string, never>>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "dashboard-loop",
run: async (loopCtx) => {
await loopCtx.queue.next("wait-refresh", {
names: ["refresh"],
});
const summary = await loopCtx.join("fetch-summary", {
users: {
run: async (branchCtx) => {
return await branchCtx.step("fetch-users", async () => 42);
},
},
orders: {
run: async (branchCtx) => {
return await branchCtx.step("fetch-orders", async () => 12);
},
},
revenue: {
run: async (branchCtx) => {
return await branchCtx.step("fetch-revenue", async () => 9_900);
},
},
});
await loopCtx.step("save-summary", async () => {
loopCtx.state.summary = {
users: summary.users,
orders: summary.orders,
revenue: summary.revenue,
};
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { dashboardActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.dashboardActor.getOrCreate(["main"]);
await handle.send("refresh", {});
console.log(await handle.getState());
Race
Use race when you need first-winner behavior.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
export const raceActor = actor({
state: {
lastWinner: null as null | string,
lastValue: null as null | string,
},
queues: {
start: queue<Record<string, never>>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "race-loop",
run: async (loopCtx) => {
await loopCtx.queue.next("wait-start", {
names: ["start"],
});
const { winner, value } = await loopCtx.race("work-vs-timeout", [
{
name: "work",
run: async (branchCtx) => {
await branchCtx.sleep("work-delay", 75);
return await branchCtx.step("finish-work", async () => "work-complete");
},
},
{
name: "timeout",
run: async (branchCtx) => {
await branchCtx.sleep("timeout-delay", 500);
return "timed-out";
},
},
]);
await loopCtx.step("record-result", async () => {
loopCtx.state.lastWinner = winner;
loopCtx.state.lastValue = value;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { raceActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.raceActor.getOrCreate(["main"]);
await handle.send("start", {});
console.log(await handle.getState());
Timeouts
Use step timeouts and retries for slow or flaky dependencies.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
async function chargeCard(orderId: string): Promise<string> {
return `charge-${orderId}`;
}
export const timeoutActor = actor({
state: {
lastChargeId: null as string | null,
},
queues: {
charge: queue<{ orderId: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "charge-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-charge", {
names: ["charge"],
});
if (!message) return Loop.continue(undefined);
const chargeId = await loopCtx.step<string>({
name: "charge-card",
timeout: 5_000,
maxRetries: 5,
retryBackoffBase: 200,
retryBackoffMax: 2_000,
run: async () => await chargeCard(message.body.orderId),
});
await loopCtx.step("save-charge", async () => {
loopCtx.state.lastChargeId = chargeId;
});
return Loop.continue(undefined);
},
});
}),
});
export const registry = setup({ use: { timeoutActor } });
Rollback
Use rollback checkpoints before steps that have compensating actions.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
async function reserveInventory(orderId: string): Promise<string> {
return `reservation-${orderId}`;
}
async function releaseInventory(_reservationId: string): Promise<void> {}
async function chargeCard(orderId: string): Promise<string> {
return `charge-${orderId}`;
}
async function refundCharge(_chargeId: string): Promise<void> {}
export const checkoutActor = actor({
state: {
lastOrderId: null as string | null,
lastReservationId: null as string | null,
lastChargeId: null as string | null,
},
queues: {
checkout: queue<{ orderId: string }>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "checkout-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-checkout", {
names: ["checkout"],
});
if (!message) return Loop.continue(undefined);
await loopCtx.rollbackCheckpoint("checkout-checkpoint");
const reservationId = await loopCtx.step<string>({
name: "reserve-inventory",
run: async () => await reserveInventory(message.body.orderId),
rollback: async (_rollbackCtx, output) => {
await releaseInventory(output as string);
},
});
const chargeId = await loopCtx.step<string>({
name: "charge-card",
run: async () => await chargeCard(message.body.orderId),
rollback: async (_rollbackCtx, output) => {
await refundCharge(output as string);
},
});
await loopCtx.step("mark-complete", async () => {
loopCtx.state.lastOrderId = message.body.orderId;
loopCtx.state.lastReservationId = reservationId;
loopCtx.state.lastChargeId = chargeId;
});
return Loop.continue(undefined);
},
});
}),
});
export const registry = setup({ use: { checkoutActor } });
Patterns
Store workflow progress in state + broadcast
Store progress in state so replay and recovery always restore it. Broadcast state changes so clients can render progress in realtime.
import { actor, event, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type Progress = {
stage: "idle" | "running" | "completed";
completed: number;
total: number;
};
export const progressActor = actor({
state: {
progress: {
stage: "idle",
completed: 0,
total: 0,
} as Progress,
sum: 0,
},
events: {
progressUpdated: event<Progress>(),
},
queues: {
jobs: queue<{ value: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "progress-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-job", {
names: ["jobs"],
});
if (!message) return Loop.continue(undefined);
await loopCtx.step("mark-running", async () => {
loopCtx.state.progress = {
stage: "running",
completed: loopCtx.state.progress.completed,
total: loopCtx.state.progress.total + 1,
};
loopCtx.broadcast("progressUpdated", loopCtx.state.progress);
});
await loopCtx.step("apply-job", async () => {
loopCtx.state.sum += message.body.value;
loopCtx.state.progress = {
stage: "completed",
completed: loopCtx.state.progress.completed + 1,
total: loopCtx.state.progress.total,
};
loopCtx.broadcast("progressUpdated", loopCtx.state.progress);
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { progressActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.progressActor.getOrCreate(["main"]);
const conn = handle.connect();
conn.on("progressUpdated", (progress) => {
console.log("progress", progress);
});
await handle.send("jobs", { value: 5 });
await handle.send("jobs", { value: 7 });
console.log(await handle.getState());
Cron (queue-driven)
Rivet scheduling triggers actions. For cron-like workflows, use a small scheduled action as a bridge that enqueues work, then process that work in the workflow loop.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
function nextMinute(timestamp: number): number {
const minuteMs = 60_000;
return Math.floor(timestamp / minuteMs) * minuteMs + minuteMs;
}
export const cronActor = actor({
state: {
runs: 0,
lastRunAt: null as number | null,
},
queues: {
"cron-tick": queue<{ scheduledAt: number }>(),
},
onCreate: async (c) => {
const firstTickAt = nextMinute(Date.now());
await c.schedule.at(firstTickAt, "enqueueCronTick", firstTickAt);
},
actions: {
enqueueCronTick: async (c, scheduledAt: number) => {
await c.queue.send("cron-tick", { scheduledAt });
const nextTickAt = nextMinute(scheduledAt + 1);
await c.schedule.at(nextTickAt, "enqueueCronTick", nextTickAt);
},
getState: (c) => c.state,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "cron-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-cron-tick", {
names: ["cron-tick"],
});
if (!message) return Loop.continue(undefined);
await loopCtx.step("run-cron-job", async () => {
loopCtx.state.runs += 1;
loopCtx.state.lastRunAt = message.body.scheduledAt;
});
return Loop.continue(undefined);
},
});
}),
});
export const registry = setup({ use: { cronActor } });
These are common workflow shapes used in production systems.
Queue-driven worker
Use this when external systems enqueue work and the actor should process each item durably.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type Job = { id: string; amount: number };
export const queueWorkerActor = actor({
state: {
processed: 0,
totalAmount: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "worker-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-job", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const job = message.body as Job;
await loopCtx.step("process-job", async () => {
loopCtx.state.processed += 1;
loopCtx.state.totalAmount += job.amount;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { queueWorkerActor } });
Setup & teardown
Use this when you need one-time initialization before a long-lived loop, plus cleanup when the actor stops sleeping or is destroyed.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
function openResource(): string {
return "connected";
}
function closeResource(_resource: string): void {}
export const setupRunTeardownActor = actor({
vars: {
resource: null as string | null,
},
state: {
initialized: false,
ticks: 0,
},
onWake: (c) => {
c.vars.resource = openResource();
},
onSleep: (c) => {
if (!c.vars.resource) return;
closeResource(c.vars.resource);
c.vars.resource = null;
},
run: workflow(async (ctx) => {
await ctx.step("setup", async () => {
if (!ctx.vars.resource) ctx.vars.resource = openResource();
ctx.state.initialized = true;
});
await ctx.loop({
name: "main-loop",
run: async (loopCtx) => {
await loopCtx.sleep("tick", 1_000);
await loopCtx.step("tick-step", async () => {
loopCtx.state.ticks += 1;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { setupRunTeardownActor } });
Human approval gate
Use this when an operation must pause for a user or system decision before continuing.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type ApprovalMessage = {
requestId: string;
approved: boolean;
};
export const approvalGateActor = actor({
state: {
pendingRequestId: null as string | null,
lastDecision: null as "approved" | "rejected" | null,
},
run: workflow(async (ctx) => {
await ctx.step("request-approval", async () => {
ctx.state.pendingRequestId = "req-1";
ctx.state.lastDecision = null;
});
await ctx.loop({
name: "approval-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-approval", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const approval = message.body as ApprovalMessage;
await loopCtx.step("apply-decision", async () => {
loopCtx.state.pendingRequestId = approval.requestId;
loopCtx.state.lastDecision = approval.approved
? "approved"
: "rejected";
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { approvalGateActor } });
Fan-out / fan-in (join)
Use this when independent work items can run in parallel and you need a single merged result.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
async function loadUsers(): Promise<number> {
return 12;
}
async function loadOrders(): Promise<number> {
return 34;
}
async function loadInvoices(): Promise<number> {
return 56;
}
export const fanInOutActor = actor({
state: {
total: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "join-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-refresh", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const joined = await loopCtx.join("parallel-work", {
users: {
run: async () => await loadUsers(),
},
orders: {
run: async () => await loadOrders(),
},
invoices: {
run: async () => await loadInvoices(),
},
});
await loopCtx.step("merge-results", async () => {
loopCtx.state.total =
joined.users + joined.orders + joined.invoices;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { fanInOutActor } });
Batch drainer
Use this when throughput matters and handling one message at a time is too expensive.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type MetricMessage = { value: number };
export const batchDrainerActor = actor({
state: {
pending: [] as number[],
flushedBatches: 0,
lastBatchTotal: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "drain-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-metric", {
timeout: 5_000,
});
if (message) {
const metric = message.body as MetricMessage;
await loopCtx.step("buffer-message", async () => {
loopCtx.state.pending.push(metric.value);
});
}
if (loopCtx.state.pending.length < 5) return Loop.continue(undefined);
await loopCtx.step("flush-batch", async () => {
const total = loopCtx.state.pending.reduce((sum, value) => sum + value, 0);
loopCtx.state.lastBatchTotal = total;
loopCtx.state.flushedBatches += 1;
loopCtx.state.pending = [];
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { batchDrainerActor } });
Coordinator -> worker RPC
Use this when one actor orchestrates work by calling actions on other actors.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type TaskMessage = {
taskId: string;
workerId: string;
value: number;
};
export const workerActor = actor({
actions: {
runTask: async (_c, value: number) => value * 2,
},
});
export const coordinatorActor = actor({
state: {
lastTaskId: null as string | null,
lastResult: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "orchestrator-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-task", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const task = message.body as TaskMessage;
const result = await loopCtx.step("dispatch-rpc", async () => {
const client = loopCtx.client<typeof registry>();
const worker = client.workerActor.getOrCreate([task.workerId]);
return worker.runTask(task.value);
});
await loopCtx.step("record-result", async () => {
loopCtx.state.lastTaskId = task.taskId;
loopCtx.state.lastResult = result as number;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { coordinatorActor, workerActor } });
Request/response over queue (async RPC)
Use this when you want decoupled actor-to-actor communication with durable waits and explicit completion.
import { actor, queue, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type RequestMessage = { value: number };
export const requestResponseActor = actor({
state: {
handled: 0,
},
queues: {
requests: queue<RequestMessage, { doubled: number }>(),
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "request-response-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-request", {
names: ["requests"],
completable: true,
});
if (!message || !message.complete) return Loop.continue(undefined);
const doubled = await loopCtx.step("handle-request", async () => {
loopCtx.state.handled += 1;
return message.body.value * 2;
});
await message.complete({ doubled });
return Loop.continue(undefined);
},
});
}),
});
export const registry = setup({ use: { requestResponseActor } });
import { createClient } from "rivetkit/client";
import type { registry } from "./actors";
const client = createClient<typeof registry>();
const handle = client.requestResponseActor.getOrCreate(["main"]);
const result = await handle.send("requests", { value: 21 }, { wait: true });
if (result.status === "completed") {
const response = result.response as { doubled: number };
console.log(response.doubled);
}
Scatter-gather across actors
Use this when multiple actors can process independent parts of a request in parallel, then return a merged response.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type ScatterMessage = { input: number };
export const shardActor = actor({
actions: {
compute: async (_c, input: number) => input * 10,
},
});
export const scatterGatherActor = actor({
state: {
lastSum: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "scatter-gather-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-scatter", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const scatter = message.body as ScatterMessage;
const gathered = await loopCtx.join("gather", {
shardA: {
run: async (joinCtx) =>
await joinCtx.step("call-shard-a", async () => {
const client = joinCtx.client<typeof registry>();
const handle = client.shardActor.getOrCreate(["a"]);
return handle.compute(scatter.input);
}),
},
shardB: {
run: async (joinCtx) =>
await joinCtx.step("call-shard-b", async () => {
const client = joinCtx.client<typeof registry>();
const handle = client.shardActor.getOrCreate(["b"]);
return handle.compute(scatter.input);
}),
},
shardC: {
run: async (joinCtx) =>
await joinCtx.step("call-shard-c", async () => {
const client = joinCtx.client<typeof registry>();
const handle = client.shardActor.getOrCreate(["c"]);
return handle.compute(scatter.input);
}),
},
});
await loopCtx.step("aggregate", async () => {
loopCtx.state.lastSum = gathered.shardA + gathered.shardB + gathered.shardC;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { scatterGatherActor, shardActor } });
Timeout + fallback actor
Use this when a primary actor call might be slow or unavailable and you need a deterministic fallback path.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
export const primaryServiceActor = actor({
actions: {
fetchValue: async () => {
await new Promise((resolve) => setTimeout(resolve, 500));
return "primary";
},
},
});
export const fallbackServiceActor = actor({
actions: {
fetchValue: async () => "fallback",
},
});
export const timeoutFallbackActor = actor({
state: {
lastSource: "none" as "none" | "primary" | "fallback",
lastValue: "",
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "timeout-loop",
run: async (loopCtx) => {
await loopCtx.queue.next("wait-request", {
timeout: 30_000,
});
const winner = await loopCtx.race("primary-vs-timeout", [
{
name: "primary",
run: async (raceCtx) =>
await raceCtx.step("call-primary", async () => {
const client = raceCtx.client<typeof registry>();
const primary = client.primaryServiceActor.getOrCreate(["main"]);
return primary.fetchValue();
}),
},
{
name: "timeout",
run: async (raceCtx) => {
await raceCtx.sleep("primary-timeout", 200);
return "timeout";
},
},
]);
let value = winner.value as string;
let source: "primary" | "fallback" = "primary";
if (winner.winner === "timeout") {
value = (await loopCtx.step("fallback-call", async () => {
const client = loopCtx.client<typeof registry>();
const fallback = client.fallbackServiceActor.getOrCreate(["main"]);
return fallback.fetchValue();
})) as string;
source = "fallback";
}
await loopCtx.step("record-choice", async () => {
loopCtx.state.lastSource = source;
loopCtx.state.lastValue = value;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({
use: { timeoutFallbackActor, primaryServiceActor, fallbackServiceActor },
});
Cross-actor saga (compensating actions)
Use this when a workflow spans multiple actors and each side effect may need compensation.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type CheckoutMessage = {
orderId: string;
amount: number;
};
export const inventoryActor = actor({
actions: {
reserve: async (_c, orderId: string) => `reserve-${orderId}`,
release: async (_c, reservationId: string) => reservationId,
},
});
export const billingActor = actor({
actions: {
charge: async (_c, amount: number) => `charge-${amount}`,
refund: async (_c, chargeId: string) => chargeId,
},
});
export const checkoutSagaActor = actor({
state: {
completedOrders: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "checkout-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-order", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const checkout = message.body as CheckoutMessage;
await loopCtx.rollbackCheckpoint("checkout-saga");
const reservationId = (await loopCtx.step({
name: "reserve-inventory",
run: async () => {
const client = loopCtx.client<typeof registry>();
const inventory = client.inventoryActor.getOrCreate(["main"]);
return inventory.reserve(checkout.orderId);
},
rollback: async (_rollbackCtx, output) => {
const client = loopCtx.client<typeof registry>();
const inventory = client.inventoryActor.getOrCreate(["main"]);
await inventory.release(output as string);
},
})) as string;
const chargeId = (await loopCtx.step({
name: "charge-card",
run: async () => {
const client = loopCtx.client<typeof registry>();
const billing = client.billingActor.getOrCreate(["main"]);
return billing.charge(checkout.amount);
},
rollback: async (_rollbackCtx, output) => {
const client = loopCtx.client<typeof registry>();
const billing = client.billingActor.getOrCreate(["main"]);
await billing.refund(output as string);
},
})) as string;
await loopCtx.step("mark-complete", async () => {
loopCtx.state.completedOrders += 1;
void reservationId;
void chargeId;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({
use: { checkoutSagaActor, inventoryActor, billingActor },
});
Signal-driven control loop
Use this when workflow progress should be triggered by commands/events instead of fixed polling intervals.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type ControlSignal = { kind: "pause" | "resume" | "stop" };
export const controlLoopActor = actor({
state: {
mode: "running" as "running" | "paused" | "stopped",
handledSignals: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "control-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-signal", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const signal = message.body as ControlSignal;
await loopCtx.step("apply-signal", async () => {
loopCtx.state.handledSignals += 1;
if (signal.kind === "pause") loopCtx.state.mode = "paused";
if (signal.kind === "resume") loopCtx.state.mode = "running";
if (signal.kind === "stop") loopCtx.state.mode = "stopped";
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { controlLoopActor } });
Poll + backoff loop
Use this when an external dependency has variable availability and retries should slow down after failures.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
async function pollExternal(attempt: number): Promise<boolean> {
return attempt % 3 === 0;
}
export const pollBackoffActor = actor({
state: {
attempts: 0,
backoffMs: 100,
status: "unknown" as "unknown" | "healthy" | "retrying",
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "poll-loop",
run: async (loopCtx) => {
const success = await loopCtx.step("poll-target", async () => {
loopCtx.state.attempts += 1;
return pollExternal(loopCtx.state.attempts);
});
if (success) {
await loopCtx.step("reset-backoff", async () => {
loopCtx.state.status = "healthy";
loopCtx.state.backoffMs = 100;
});
await loopCtx.sleep("healthy-interval", 1_000);
return Loop.continue(undefined);
}
await loopCtx.step("grow-backoff", async () => {
loopCtx.state.status = "retrying";
loopCtx.state.backoffMs = Math.min(loopCtx.state.backoffMs * 2, 5_000);
});
await loopCtx.sleep("retry-delay", loopCtx.state.backoffMs);
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { pollBackoffActor } });
Child worker orchestration
Use this when one workflow coordinates many child workers (actors or worker workflows) and manages their lifecycle.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type BatchMessage = { payload: number };
export const childWorkerActor = actor({
actions: {
process: async (_c, payload: number) => payload * 3,
},
});
export const orchestratorActor = actor({
state: {
lastTotal: 0,
},
run: workflow(async (ctx) => {
await ctx.step("start-children", async () => {
const client = ctx.client<typeof registry>();
await client.childWorkerActor.getOrCreate(["child-a"]).process(0);
await client.childWorkerActor.getOrCreate(["child-b"]).process(0);
await client.childWorkerActor.getOrCreate(["child-c"]).process(0);
});
await ctx.loop({
name: "orchestrate-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-batch", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const batch = message.body as BatchMessage;
const results = await loopCtx.join("collect-updates", {
a: {
run: async (joinCtx) =>
await joinCtx.step("run-child-a", async () => {
const client = joinCtx.client<typeof registry>();
return client.childWorkerActor.getOrCreate(["child-a"]).process(batch.payload);
}),
},
b: {
run: async (joinCtx) =>
await joinCtx.step("run-child-b", async () => {
const client = joinCtx.client<typeof registry>();
return client.childWorkerActor.getOrCreate(["child-b"]).process(batch.payload);
}),
},
c: {
run: async (joinCtx) =>
await joinCtx.step("run-child-c", async () => {
const client = joinCtx.client<typeof registry>();
return client.childWorkerActor.getOrCreate(["child-c"]).process(batch.payload);
}),
},
});
await loopCtx.step("reconcile", async () => {
loopCtx.state.lastTotal = results.a + results.b + results.c;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { orchestratorActor, childWorkerActor } });
Bounded drain + concurrency cap
Use this when inbound work can spike and you need predictable per-iteration limits.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type WorkMessage = { id: string; value: number };
const MAX_PER_ITERATION = 10;
const CONCURRENCY_LIMIT = 3;
async function processWork(value: number): Promise<number> {
return value * 2;
}
async function runWithLimit<T>(
limit: number,
items: T[],
fn: (item: T) => Promise<void>,
): Promise<void> {
let nextIndex = 0;
const workers = Array.from({ length: limit }, async () => {
while (nextIndex < items.length) {
const current = items[nextIndex];
nextIndex += 1;
await fn(current);
}
});
await Promise.all(workers);
}
export const boundedDrainActor = actor({
state: {
processed: 0,
lastWindowSize: 0,
lastWindowTotal: 0,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "bounded-drain-loop",
run: async (loopCtx) => {
const window: WorkMessage[] = [];
for (let i = 0; i < MAX_PER_ITERATION; i += 1) {
const [message] = await loopCtx.queue.next("wait-work", {
timeout: i === 0 ? 30_000 : 10,
});
if (!message) break;
window.push(message.body as WorkMessage);
}
if (window.length === 0) return Loop.continue(undefined);
await loopCtx.step("process-window", async () => {
let windowTotal = 0;
await runWithLimit(CONCURRENCY_LIMIT, window, async (work) => {
const result = await processWork(work.value);
windowTotal += result;
});
loopCtx.state.processed += window.length;
loopCtx.state.lastWindowSize = window.length;
loopCtx.state.lastWindowTotal = windowTotal;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { boundedDrainActor } });
Versioned workflow evolution
Use this when workflow structure changes across deployments and old histories must still replay.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
export const versionedWorkflowActor = actor({
state: {
runs: 0,
},
run: workflow(async (ctx) => {
await ctx.step("validate-v2", async () => {
ctx.state.runs += 1;
});
await ctx.removed("validate-v1", "step");
await ctx.loop({
name: "main-loop-v2",
run: async (loopCtx) => {
await loopCtx.sleep("idle", 500);
await loopCtx.step("heartbeat-v2", async () => {
loopCtx.state.runs += 1;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { versionedWorkflowActor } });
Checkpoint-friendly loop design
Use this when you need reliable replay and resume semantics across crashes and restarts.
import { actor, setup } from "rivetkit";
import { Loop, workflow } from "rivetkit/workflow";
type PaymentMessage = { id: string; amount: number };
export const checkpointFriendlyActor = actor({
state: {
appliedCount: 0,
totalAmount: 0,
lastPaymentId: null as string | null,
},
run: workflow(async (ctx) => {
await ctx.loop({
name: "payment-loop",
run: async (loopCtx) => {
const [message] = await loopCtx.queue.next("wait-payment", {
timeout: 30_000,
});
if (!message) return Loop.continue(undefined);
const payment = message.body as PaymentMessage;
await loopCtx.rollbackCheckpoint("apply-payment-checkpoint");
const plan = (await loopCtx.step("build-plan", async () => {
return {
paymentId: payment.id,
amount: payment.amount,
};
})) as { paymentId: string; amount: number };
await loopCtx.step("apply-side-effects", async () => {
loopCtx.state.appliedCount += 1;
loopCtx.state.totalAmount += plan.amount;
loopCtx.state.lastPaymentId = plan.paymentId;
});
return Loop.continue(undefined);
},
});
}),
actions: {
getState: (c) => c.state,
},
});
export const registry = setup({ use: { checkpointFriendlyActor } });
Migrations
- Keep workflow entry names stable once deployed.
- If an old entry was removed or renamed, call
ctx.removed(name, originalType). - This keeps replay compatible across deployments.
Step-only access to actor APIs
state, vars, db, client(), and connection/event APIs are only valid inside ctx.step(...) callbacks.
Use non-step workflow code for orchestration only: queue waits, sleeps, loops, joins, races, and rollback boundaries. Keep actor-local side effects in steps.
Debugging
GET /inspector/workflow-historyreturns workflow history status for an actor.- Response includes
isWorkflowEnabledandhistory. - In non-dev mode, inspector endpoints require authorization.
Recommendations
- Prefer queue-driven loops for long-lived workflows.
- Structure long-lived workflows with setup and teardown around the main loop.
- Keep actor state changes and side effects inside steps.
- Store workflow progress in
stateand broadcast updates as progress changes. - Use timeouts and rollback for external side effects.