Features - TypeScript SDK feature guide

The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.

How to develop with Signals

A Signal is a message sent to a running Workflow Execution.

Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.

How to define a Signal

A Signal has a name and can have arguments.

  • The name, also called a Signal type, is a string.
  • The arguments must be serializable.

defineSignal

import { defineSignal } from '@temporalio/workflow';

interface JoinInput {
userId: string;
groupId: string;
}

export const joinSignal = defineSignal<[JoinInput]>('join');

How to handle a Signal

Workflows listen for Signals by the Signal's name.

setHandler

import { setHandler } from '@temporalio/workflow';

export async function yourWorkflow() {
const groups = new Map<string, Set<string>>();

setHandler(joinSignal, ({ userId, groupId }: JoinInput) => {
const group = groups.get(groupId);
if (group) {
group.add(userId);
} else {
groups.set(groupId, new Set([userId]));
}
});
}

How to send a Signal from a Temporal Client

When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.

WorkflowHandle.signal

import { Client } from '@temporalio/client';
import { joinSignal } from './workflows';

const client = new Client();

const handle = client.workflow.getHandle('workflow-id-123');

await handle.signal(joinSignal, { userId: 'user-1', groupId: 'group-1' });

How to send a Signal from a Workflow

A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.

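When an External Signal is sent, a SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History, and a WorkflowExecutionSignaled Event appears in the recipient's Event History.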

getExternalWorkflowHandle

import { getExternalWorkflowHandle } from '@temporalio/workflow';
import { joinSignal } from './other-workflow';

export async function yourWorkflowThatSignals() {
const handle = getExternalWorkflowHandle('workflow-id-123');
await handle.signal(joinSignal, { userId: 'user-1', groupId: 'group-1' });
}

How to Signal-With-Start

Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.

If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.

WorkflowClient.signalWithStart

import { Client } from '@temporalio/client';
import { joinSignal, yourWorkflow } from './workflows';

const client = new Client();

await client.workflow.signalWithStart(yourWorkflow, {
workflowId: 'workflow-id-123',
taskQueue: 'my-taskqueue',
args: [{ foo: 1 }],
signal: joinSignal,
signalArgs: [{ userId: 'user-1', groupId: 'group-1' }],
});
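The Signal-With-Start behavior described above can be modeled in a few lines. The following is only a sketch of the semantics, using an in-memory map as a stand-in for the Temporal service; signalWithStartSketch, ExecutionSketch, and SignalSketch are hypothetical names and are not part of the SDK.

```typescript
// Hypothetical in-memory model of Signal-With-Start; not the Temporal SDK API.
interface SignalSketch {
  name: string;
  args: unknown[];
}

interface ExecutionSketch {
  workflowId: string;
  args: unknown[];
  signals: SignalSketch[];
}

// Stand-in for the set of currently running Workflow Executions.
const running = new Map<string, ExecutionSketch>();

function signalWithStartSketch(
  workflowId: string,
  workflowArgs: unknown[],
  signal: SignalSketch,
): { started: boolean } {
  let execution = running.get(workflowId);
  const started = execution === undefined;
  if (execution === undefined) {
    // No running Workflow with this Id: start one first.
    execution = { workflowId, args: workflowArgs, signals: [] };
    running.set(workflowId, execution);
  }
  // In both cases the Signal is delivered.
  execution.signals.push(signal);
  return { started };
}

const firstCall = signalWithStartSketch('workflow-id-123', [{ foo: 1 }], {
  name: 'join',
  args: [{ userId: 'user-1', groupId: 'group-1' }],
});
const secondCall = signalWithStartSketch('workflow-id-123', [{ foo: 2 }], {
  name: 'join',
  args: [{ userId: 'user-2', groupId: 'group-1' }],
});
console.log(firstCall.started, secondCall.started);
```

In this model the second call only signals: the Workflow arguments passed to it are ignored because an Execution with that Workflow Id is already running.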

How to develop with Queries

A Query is a synchronous operation that is used to get the state of a Workflow Execution.

How to define a Query

A Query has a name and can have arguments.

  • The name, also called a Query type, is a string.
  • The arguments must be serializable.

Use defineQuery to define the name, parameters, and return value of a Query.

state/src/workflows.ts

import { defineQuery } from '@temporalio/workflow';

export const getValueQuery = defineQuery<number | undefined, [string]>(
'getValue',
);

How to handle a Query

Queries are handled by your Workflow.

Don’t include any logic that causes Command generation within a Query handler (such as executing Activities). Including such logic causes unexpected behavior.

Use setHandler to handle Queries inside a Workflow.

You make a Query with handle.query(query, ...args). A Query needs a return value, but can also take arguments.

state/src/workflows.ts

export async function trackState(): Promise<void> {
const state = new Map<string, number>();
setHandler(setValueSignal, (key, value) => void state.set(key, value));
setHandler(getValueQuery, (key) => state.get(key));
await CancellationScope.current().cancelRequested;
}

How to send a Query

Queries are sent from a Temporal Client.

Use WorkflowHandle.query to query a running or completed Workflow.

state/src/query-workflow.ts

import { Client } from '@temporalio/client';
import { getValueQuery } from './workflows';

async function run(): Promise<void> {
const client = new Client();
const handle = client.workflow.getHandle('state-id-0');
const meaning = await handle.query(getValueQuery, 'meaning-of-life');
console.log({ meaning });
}

How to define Signals and Queries statically or dynamically

  • Handlers for both Signals and Queries can take arguments, which can be used inside setHandler logic.
  • Only Signal Handlers can mutate state, and only Query Handlers can return values.

Define Signals and Queries statically

If you know the name of your Signals and Queries upfront, we recommend declaring them outside the Workflow Definition.

signals-queries/src/workflows.ts

import * as wf from '@temporalio/workflow';

export const unblockSignal = wf.defineSignal('unblock');
export const isBlockedQuery = wf.defineQuery<boolean>('isBlocked');

export async function unblockOrCancel(): Promise<void> {
let isBlocked = true;
wf.setHandler(unblockSignal, () => void (isBlocked = false));
wf.setHandler(isBlockedQuery, () => isBlocked);
wf.log.info('Blocked');
try {
await wf.condition(() => !isBlocked);
wf.log.info('Unblocked');
} catch (err) {
if (err instanceof wf.CancelledFailure) {
wf.log.info('Cancelled');
}
throw err;
}
}

This technique helps provide type safety because you can export the type signature of the Signal or Query to be called by the Client.

Define Signals and Queries dynamically

For more flexible use cases, you might want a dynamic Signal (such as a generated ID). You can handle it in two ways:

  • Avoid making it dynamic by collapsing all Signals into one handler and move the ID to the payload.
  • Actually make the Signal name dynamic by inlining the Signal definition per handler.

import * as wf from '@temporalio/workflow';

// "fat handler" solution
wf.setHandler(wf.defineSignal<[{ taskId: string }]>('genericSignal'), (payload) => {
switch (payload.taskId) {
case taskAId:
// do task A things
break;
case taskBId:
// do task B things
break;
default:
throw new Error('Unexpected task.');
}
});

// "inline definition" solution
wf.setHandler(wf.defineSignal(`task-${taskAId}`), (payload) => {
/* do task A things */
});
wf.setHandler(wf.defineSignal(`task-${taskBId}`), (payload) => {
/* do task B things */
});

// utility "inline definition" helper
const inlineSignal = (signalName, handler) =>
wf.setHandler(wf.defineSignal(signalName), handler);
inlineSignal(`task-${taskBId}`, (payload) => {
/* do task B things */
});

API Design FAQs

Why not "new Signal" and "new Query"?

The semantics of defineSignal and defineQuery are intentional. They return Signal and Query definitions, not unique instances of Signals and Queries themselves. The following is their entire source code:

/**
* Define a signal method for a Workflow.
*/
export function defineSignal<Args extends any[] = []>(
name: string,
): SignalDefinition<Args> {
return {
type: 'signal',
name,
};
}

/**
* Define a query method for a Workflow.
*/
export function defineQuery<Ret, Args extends any[] = []>(
name: string,
): QueryDefinition<Ret, Args> {
return {
type: 'query',
name,
};
}

Signals and Queries are instantiated only in setHandler and are specific to particular Workflow Executions.

These distinctions might seem minor, but they model how Temporal works under the hood, because Signals and Queries are messages identified by "just strings" and don't have meaning independent of the Workflow having a listener to handle them. This will be clearer if you refer to the Client-side APIs.

Why setHandler and not OTHER_API?

We named it setHandler instead of subscribe because a Signal or Query can have only one "handler" at a time, whereas subscribe could imply an Observable with multiple consumers and is a higher-level construct.

wf.setHandler(MySignal, handlerFn1);
wf.setHandler(MySignal, handlerFn2); // replaces handlerFn1

If you are familiar with RxJS, you are free to wrap your Signals and Queries into Observables if you want, or you could dynamically reassign the listener based on your business logic or Workflow state.

Workflow timeouts

Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.

Workflow timeouts are set when starting the Workflow Execution.

Create an instance of WorkflowOptions from the Client and set your Workflow Timeout.

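Available timeouts are workflowExecutionTimeout, workflowRunTimeout, and workflowTaskTimeout, each shown in a snippet below: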

snippets/src/client.ts

await client.workflow.start(example, {
taskQueue,
workflowId,
workflowExecutionTimeout: '1 day',
});

snippets/src/client.ts

await client.workflow.start(example, {
taskQueue,
workflowId,
workflowRunTimeout: '1 minute',
});

snippets/src/client.ts

await client.workflow.start(example, {
taskQueue,
workflowId,
workflowTaskTimeout: '1 minute',
});

Workflow retries

A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Use a Retry Policy to retry a Workflow Execution in the event of a failure.

Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.

Create an instance of the Retry Policy, known as retry in TypeScript, from the WorkflowOptions of the Client interface.

snippets/src/client.ts

const handle = await client.workflow.start(example, {
taskQueue,
workflowId,
retry: {
maximumAttempts: 3,
},
});

How to set Activity timeouts

Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.

The following timeouts are available in the Activity Options: Schedule-To-Close, Start-To-Close, and Schedule-To-Start.

An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.
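The rule above can be expressed as a small check. This is a minimal sketch in plain TypeScript; TimeoutOptionsSketch and hasRequiredTimeout are hypothetical names used for illustration and do not come from the SDK.

```typescript
// Hypothetical shape mirroring the timeout fields named in this section.
type DurationSketch = string | number;

interface TimeoutOptionsSketch {
  startToCloseTimeout?: DurationSketch;
  scheduleToCloseTimeout?: DurationSketch;
  scheduleToStartTimeout?: DurationSketch;
}

// True when at least one of the two required timeouts is present.
function hasRequiredTimeout(options: TimeoutOptionsSketch): boolean {
  return (
    options.startToCloseTimeout !== undefined ||
    options.scheduleToCloseTimeout !== undefined
  );
}

console.log(hasRequiredTimeout({ startToCloseTimeout: '30s' }));
console.log(hasRequiredTimeout({ scheduleToStartTimeout: '60s' }));
```

Setting only scheduleToStartTimeout does not satisfy the rule, which is why the second call reports false.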

When you call proxyActivities in a Workflow Function, you can set a range of ActivityOptions.

Available timeouts are scheduleToCloseTimeout, startToCloseTimeout, and scheduleToStartTimeout:

// Sample of typical options you can set
const { greet } = proxyActivities<typeof activities>({
scheduleToCloseTimeout: '5m',
// startToCloseTimeout: "30s", // recommended
// scheduleToStartTimeout: "60s",

retry: {
// default retry policy if not specified
initialInterval: '1s',
backoffCoefficient: 2,
maximumAttempts: Infinity,
maximumInterval: '100s', // 100 * initialInterval
nonRetryableErrorTypes: [],
},
});

How to set an Activity Retry Policy

A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.

Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.

To set Activity Retry Policies in TypeScript, pass ActivityOptions.retry to proxyActivities.

// Sample of typical options you can set
const { yourActivity } = proxyActivities<typeof activities>({
// ...
retry: {
// default retry policy if not specified
initialInterval: '1s',
backoffCoefficient: 2,
maximumAttempts: Infinity,
maximumInterval: '100s', // 100 * initialInterval
nonRetryableErrorTypes: [],
},
});
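To see how initialInterval, backoffCoefficient, and maximumInterval interact, the following sketch computes the wait before each retry under the default values shown above. It assumes plain exponential backoff capped at the maximum interval and ignores any other server-side detail; RetryPolicySketch and retryDelayMs are hypothetical names, not SDK APIs.

```typescript
// Hypothetical numeric form of the retry fields shown above (milliseconds).
interface RetryPolicySketch {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
}

const defaultPolicySketch: RetryPolicySketch = {
  initialIntervalMs: 1000, // '1s'
  backoffCoefficient: 2,
  maximumIntervalMs: 100000, // 100 * initialInterval
};

// Wait before retry number `retry` (1 = first retry after the initial attempt fails).
function retryDelayMs(
  retry: number,
  policy: RetryPolicySketch = defaultPolicySketch,
): number {
  const uncapped =
    policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry - 1);
  return Math.min(uncapped, policy.maximumIntervalMs);
}

const firstTenDelays = Array.from({ length: 10 }, (_, i) => retryDelayMs(i + 1));
console.log(firstTenDelays);
// [1000, 2000, 4000, 8000, 16000, 32000, 64000, 100000, 100000, 100000]
```

Under these assumptions the delay doubles until the eighth retry, where the uncapped value of 128 seconds is clamped to the 100-second maximum.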

How to Heartbeat an Activity

An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster. Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed. If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.

Heartbeats may not always be sent to the Cluster—they may be throttled by the Worker.

Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.

Heartbeats can contain a details field describing the Activity's current progress. If an Activity gets retried, the Activity can access the details from the last Heartbeat that was sent to the Cluster.

Long-running Activities should Heartbeat their progress back to the Workflow for earlier detection of stalled Activities (with Heartbeat Timeout) and resuming stalled Activities from checkpoints (with Heartbeat details).

To set Activity Heartbeat, use Context.current().heartbeat() in your Activity implementation, and set heartbeatTimeout in your Workflow.

import { Context } from '@temporalio/activity';

// activity implementation
export async function example(sleepIntervalMs = 1000): Promise<void> {
for (let progress = 1; progress <= 1000; ++progress) {
await Context.current().sleep(sleepIntervalMs);
// record activity heartbeat
Context.current().heartbeat();
}
}

// ...

// workflow code calling activity
const { example } = proxyActivities<typeof activities>({
startToCloseTimeout: '1 hour',
heartbeatTimeout: '10s',
});

In the previous example, setting the Heartbeat informs the Temporal Server of the Activity's progress at regular intervals. If the Activity stalls or the Activity Worker becomes unavailable, the absence of Heartbeats prompts the Temporal Server to retry the Activity immediately, without waiting for startToCloseTimeout to complete.

You can also add heartbeatDetails as a checkpoint to collect data about failures during the execution, and use it to resume the Activity from that point.

The following example extends the previous sample to include a heartbeatDetails checkpoint.

export async function example(sleepIntervalMs = 1000): Promise<void> {
const startingPoint = Context.current().info.heartbeatDetails || 1; // allow for resuming from heartbeat
for (let progress = startingPoint; progress <= 100; ++progress) {
await Context.current().sleep(sleepIntervalMs);
Context.current().heartbeat(progress);
}
}

In this example, when the heartbeatTimeout is reached and the Activity is retried, the Activity Worker picks up the execution from where the previous attempt left off.
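The resume behavior can be traced without a Worker. The sketch below simulates two attempts of the loop above in plain TypeScript: the value passed to the second attempt plays the role of info.heartbeatDetails. runAttempt and AttemptSketch are hypothetical names, and the "crash" is simulated by returning early.

```typescript
// Outcome of one simulated Activity attempt.
interface AttemptSketch {
  processed: number[]; // steps executed during this attempt
  lastHeartbeat: number | undefined; // last checkpoint recorded
  completed: boolean;
}

function runAttempt(
  totalSteps: number,
  previousHeartbeat: number | undefined,
  crashAfterStep?: number,
): AttemptSketch {
  // Same idea as `heartbeatDetails || 1` in the sample above.
  const startingPoint = previousHeartbeat ?? 1;
  const processed: number[] = [];
  let lastHeartbeat = previousHeartbeat;
  for (let progress = startingPoint; progress <= totalSteps; ++progress) {
    processed.push(progress); // do the work for this step
    lastHeartbeat = progress; // record the checkpoint
    if (progress === crashAfterStep) {
      // Simulate the Worker becoming unavailable after this step.
      return { processed, lastHeartbeat, completed: false };
    }
  }
  return { processed, lastHeartbeat, completed: true };
}

const firstAttempt = runAttempt(5, undefined, 3);
const secondAttempt = runAttempt(5, firstAttempt.lastHeartbeat);
console.log(firstAttempt.processed, secondAttempt.processed);
```

Because the loop restarts at the last recorded value, step 3 runs in both attempts, so each step should be safe to repeat.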

How to set a Heartbeat Timeout

A Heartbeat Timeout works in conjunction with Activity Heartbeats.

To set a Heartbeat Timeout, use ActivityOptions.heartbeatTimeout. If the Activity takes longer than that between heartbeats, the Activity is failed.

// Creating a proxy for the activity.
const { longRunningActivity } = proxyActivities<typeof activities>({
// translates to 300000 ms
scheduleToCloseTimeout: '5m',
// translates to 30000 ms
startToCloseTimeout: '30s',
// equivalent to '10 seconds'
heartbeatTimeout: 10000,
});

How to asynchronously complete an Activity

Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.

There are three steps to follow:

  1. The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token, or a combination of Namespace, Workflow Id, and Activity Id.
  2. The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
  3. The Temporal Client is used to Heartbeat and complete the Activity.

To asynchronously complete an Activity, call AsyncCompletionClient.complete.

activities-examples/src/activities/async-completion.ts

import { CompleteAsyncError, Context } from '@temporalio/activity';
import { AsyncCompletionClient } from '@temporalio/client';

export async function doSomethingAsync(): Promise<string> {
const taskToken = Context.current().info.taskToken;
setTimeout(() => doSomeWork(taskToken), 1000);
throw new CompleteAsyncError();
}

// this work could be done in a different process or on a different machine
async function doSomeWork(taskToken: Uint8Array): Promise<void> {
const client = new AsyncCompletionClient();
// does some work...
await client.complete(taskToken, 'Job\'s done!');
}

Local Activities

To call Local Activities in TypeScript, use proxyLocalActivities.

import * as workflow from '@temporalio/workflow';

const { getEnvVar } = workflow.proxyLocalActivities({
startToCloseTimeout: '2 seconds',
});

export async function yourWorkflow(): Promise<void> {
const someSetting = await getEnvVar('SOME_SETTING');
// ...
}

Local Activities must be registered with the Worker the same way non-local Activities are.

Cancel an Activity from a Workflow

Canceling an Activity from within a Workflow requires that the Activity Execution sends Heartbeats and sets a Heartbeat Timeout. If the Heartbeat is not invoked, the Activity cannot receive a cancellation request. When any non-immediate Activity is executed, the Activity Execution should send Heartbeats and set a Heartbeat Timeout to ensure that the server knows it is still working.

When an Activity is canceled, an error is raised in the Activity at the next available opportunity. If cleanup logic needs to be performed, it can be done in a finally clause or inside a caught cancel error. However, for the Activity to appear canceled the exception needs to be re-raised.

note

Unlike regular Activities, Local Activities can be canceled even if they don't send Heartbeats. Local Activities are handled locally, and all the information needed to handle the cancellation logic is available in the same Worker process.

How to start a Child Workflow Execution

A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.

When using a Child Workflow API, Child Workflow–related Events (such as StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, and ChildWorkflowExecutionCompleted) are logged in the Event History of the Parent Workflow Execution.

Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started. After that, Child Workflow Executions can be abandoned by using the Abandon Parent Close Policy set in the Child Workflow Options.

To be sure that the Child Workflow Execution has started, first call the Child Workflow Execution method on the instance of Child Workflow future, which returns a different future.

Then get the value of an object that acts as a proxy for a result that is initially unknown, which is what waits until the Child Workflow Execution has spawned.

To start a Child Workflow Execution and return a handle to it, use startChild.

import { startChild } from '@temporalio/workflow';

// childWorkflow is assumed to be another exported Workflow function
export async function parentWorkflow(name: string) {
  const childHandle = await startChild(childWorkflow, {
    args: [name],
    // workflowId, // add business-meaningful workflow id here
    // // regular workflow options apply here, with two additions (defaults shown):
    // cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    // parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
  });
  // you can use childHandle to signal or get the result here
  await childHandle.signal('anySignal');
  const result = await childHandle.result();
  return result;
}

To start a Child Workflow Execution and await its completion, use executeChild.

By default, a child is scheduled on the same Task Queue as the parent.

child-workflows/src/workflows.ts

import { executeChild } from '@temporalio/workflow';

export async function parentWorkflow(...names: string[]): Promise<string> {
const responseArray = await Promise.all(
names.map((name) =>
executeChild(childWorkflow, {
args: [name],
// workflowId, // add business-meaningful workflow id here
// // regular workflow options apply here, with two additions (defaults shown):
// cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
// parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
})
),
);
return responseArray.join('\n');
}

To control any running Workflow from inside a Workflow, use getExternalWorkflowHandle(workflowId).

import { getExternalWorkflowHandle, workflowInfo } from '@temporalio/workflow';

export async function terminateWorkflow() {
const { workflowId } = workflowInfo(); // no await needed
const handle = getExternalWorkflowHandle(workflowId); // sync function, not async
await handle.cancel();
}

If the Child Workflow options aren't explicitly set, they inherit their values from the Parent Workflow options. Two advanced options are unique to Child Workflows:

  • cancellationType: Controls when to throw the CancelledFailure exception when a Child Workflow is canceled.
  • parentClosePolicy: Explained in the next section.

If you need to cancel a Child Workflow Execution, use cancellation scopes. A Child Workflow Execution is automatically cancelled when its containing scope is cancelled.

How to set a Parent Close Policy

A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).

The default Parent Close Policy option is set to terminate the Child Workflow Execution.

To specify how a Child Workflow reacts to a Parent Workflow reaching a Closed state, use the parentClosePolicy option.

child-workflows/src/workflows.ts

import { executeChild } from '@temporalio/workflow';

export async function parentWorkflow(...names: string[]): Promise<string> {
const responseArray = await Promise.all(
names.map((name) =>
executeChild(childWorkflow, {
args: [name],
// workflowId, // add business-meaningful workflow id here
// // regular workflow options apply here, with two additions (defaults shown):
// cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
// parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
})
),
);
return responseArray.join('\n');
}

How to Continue-As-New

Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.

To cause a Workflow Execution to Continue-As-New, the Workflow function should return the result of continueAsNew.

continue-as-new/src/workflows.ts

import { continueAsNew, log, sleep } from '@temporalio/workflow';

export async function loopingWorkflow(iteration = 0): Promise<void> {
if (iteration === 10) {
return;
}
log.info('Running Workflow iteration', { iteration });
await sleep(1000);
// Must match the arguments expected by `loopingWorkflow`
await continueAsNew<typeof loopingWorkflow>(iteration + 1);
// Unreachable code, continueAsNew is like `process.exit` and will stop execution once called.
}

Single-entity design pattern in TypeScript

The following is a simple pattern that represents a single entity. It tracks the number of iterations regardless of frequency, and calls continueAsNew while properly handling pending updates from Signals.

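import { CancellationScope, condition, continueAsNew, defineSignal, isCancellation, setHandler } from '@temporalio/workflow';

interface Input {
/* Define your Workflow input type here */
}
interface Update {
/* Define your Workflow update type here */
}

// Signal that delivers updates to the entity (the name 'update' is illustrative)
const updateSignal = defineSignal<[Update]>('update');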

const MAX_ITERATIONS = 1;

export async function entityWorkflow(
input: Input,
isNew = true,
): Promise<void> {
try {
const pendingUpdates = Array<Update>();
setHandler(updateSignal, (updateCommand) => {
pendingUpdates.push(updateCommand);
});

if (isNew) {
await setup(input);
}

for (let iteration = 1; iteration <= MAX_ITERATIONS; ++iteration) {
// Ensure that we don't block the Workflow Execution forever waiting
// for updates, which means that it will eventually Continue-As-New
// even if it does not receive updates.
await condition(() => pendingUpdates.length > 0, '1 day');

while (pendingUpdates.length) {
const update = pendingUpdates.shift();
await runAnActivityOrChildWorkflow(update);
}
}
} catch (err) {
if (isCancellation(err)) {
await CancellationScope.nonCancellable(async () => {
await cleanup();
});
}
throw err;
}
await continueAsNew<typeof entityWorkflow>(input, false);
}

How to Schedule a Workflow

Scheduling Workflows is a crucial aspect of any automation process, especially when dealing with time-sensitive tasks. By scheduling a Workflow, you can automate repetitive tasks, reduce the need for manual intervention, and ensure timely execution of your business processes.

Use any of the following actions to help Schedule a Workflow Execution and take control over your automation process.

How to Create a Scheduled Workflow

The create action enables you to create a new Schedule. When you create a new Schedule, a unique Schedule ID is generated, which you can use to reference the Schedule in other Schedule commands.

How to Backfill a Scheduled Workflow

The backfill action executes Actions ahead of their specified time range. This command is useful when you need to execute a missed or delayed Action, or when you want to test the Workflow before its scheduled time.

How to Delete a Scheduled Workflow

The delete action enables you to delete a Schedule. When you delete a Schedule, it does not affect any Workflows that were started by the Schedule.

How to Describe a Scheduled Workflow

The describe action shows the current Schedule configuration, including information about past, current, and future Workflow Runs. This command is helpful when you want to get a detailed view of the Schedule and its associated Workflow Runs.

How to List a Scheduled Workflow

The list action lists all the available Schedules. This command is useful when you want to view a list of all the Schedules and their respective Schedule IDs.

How to Pause a Scheduled Workflow

The pause action enables you to pause and unpause a Schedule. When you pause a Schedule, all the future Workflow Runs associated with the Schedule are temporarily stopped. This command is useful when you want to temporarily halt a Workflow due to maintenance or any other reason.
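As an illustration of the maintenance use case, the sketch below wraps a piece of work so that a Schedule is paused before it and unpaused afterwards, even if the work throws. It assumes a handle object that exposes pause and unpause methods accepting an optional note; PausableSketch is a hypothetical structural subset written for this example, and withSchedulePaused is not an SDK function.

```typescript
// Hypothetical structural subset of a Schedule handle assumed by this sketch.
interface PausableSketch {
  pause(note?: string): Promise<void>;
  unpause(note?: string): Promise<void>;
}

// Pause the Schedule, run `work`, then always unpause.
async function withSchedulePaused<T>(
  schedule: PausableSketch,
  reason: string,
  work: () => Promise<T>,
): Promise<T> {
  await schedule.pause(reason);
  try {
    return await work();
  } finally {
    await schedule.unpause(`done: ${reason}`);
  }
}
```

Any object with matching pause and unpause methods can be passed in, which also makes the helper easy to exercise with a fake in unit tests.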

How to Trigger a Scheduled Workflow

The trigger action triggers an immediate action with a given Schedule. By default, this action is subject to the Overlap Policy of the Schedule. This command is helpful when you want to execute a Workflow outside of its scheduled time.

How to Update a Scheduled Workflow

The update action enables you to update an existing Schedule. This command is useful when you need to modify the Schedule's configuration, such as changing the start time, end time, or interval.

What is a Timer?

A Workflow can set a durable timer for a fixed time period. In some SDKs, the function is called sleep(), and in others, it's called timer().

A Workflow can sleep for months. Timers are persisted, so even if your Worker or Temporal Cluster is down when the time period completes, as soon as your Worker and Cluster are back up, the sleep() call will resolve and your code will continue executing.

Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.

Asynchronous design patterns in TypeScript

The real value of sleep and condition is in knowing how to use them to model asynchronous business logic. Here are some examples we use the most; we welcome more if you can think of them!

Racing Timers

Use Promise.race with Timers to dynamically adjust delays.

export async function processOrderWorkflow({
orderProcessingMS,
sendDelayedEmailTimeoutMS,
}: ProcessOrderOptions): Promise<void> {
let processing = true;
const processOrderPromise = processOrder(orderProcessingMS).then(() => {
processing = false;
});

await Promise.race([processOrderPromise, sleep(sendDelayedEmailTimeoutMS)]);

if (processing) {
await sendNotificationEmail();
await processOrderPromise;
}
}

Racing Signals

Use Promise.race with Signals and Triggers to have a promise resolve at the earlier of either system time or human intervention.

import { defineSignal, setHandler, sleep, Trigger } from '@temporalio/workflow';

const userInteraction = new Trigger<boolean>();
const completeUserInteraction = defineSignal('completeUserInteraction');

export async function yourWorkflow(userId: string) {
setHandler(completeUserInteraction, () => userInteraction.resolve(true)); // programmatic resolve
const userInteracted = await Promise.race([
userInteraction,
sleep('30 days'),
]);
if (!userInteracted) {
await sendReminderEmail(userId);
}
}

You can invert this to create a reminder pattern where the promise resolves if no Signal is received.

Antipattern: Racing sleep.then

Be careful when racing a chained sleep. This might cause bugs because the chained .then will still continue to execute.

await Promise.race([
sleep('5s').then(() => (status = 'timed_out')),
somethingElse.then(() => (status = 'processed')),
]);

if (status === 'processed') await complete(); // takes more than 5 seconds
// status = timed_out
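The effect can be reproduced with ordinary promises. The following sketch uses a setTimeout-based delay helper as a stand-in for Temporal's sleep (an assumption made so the example runs outside a Workflow) and contrasts the antipattern with a variant in which each branch resolves to a value instead of mutating shared state. All function names here are hypothetical.

```typescript
type StatusSketch = 'pending' | 'timed_out' | 'processed';

// Stand-in for a timer; inside a Workflow you would use sleep instead.
function delay(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Antipattern: both branches write to shared state, and the losing
// branch's .then still runs after the race has settled.
async function raceWithSideEffects(): Promise<StatusSketch[]> {
  const observed: StatusSketch[] = [];
  let status: StatusSketch = 'pending';
  const timer = delay(20).then(() => {
    status = 'timed_out';
  });
  const work = delay(5).then(() => {
    status = 'processed';
  });
  await Promise.race([timer, work]);
  observed.push(status); // value right after the race
  await timer; // later, the timer branch fires anyway
  observed.push(status); // value after the losing branch ran
  return observed;
}

// Safer: each branch resolves to a value, and only the winner is used.
async function raceWithValues(): Promise<StatusSketch> {
  return Promise.race([
    delay(20).then((): StatusSketch => 'timed_out'),
    delay(5).then((): StatusSketch => 'processed'),
  ]);
}
```

In the first function the status observed right after the race is later overwritten by the losing branch; in the second, the result of the race is a value that nothing else can change.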

Updatable Timer

Here is how you can build an updatable Timer with condition:

import * as wf from '@temporalio/workflow';

// usage
export async function countdownWorkflow(): Promise<void> {
const target = Date.now() + 24 * 60 * 60 * 1000; // 1 day!!!
const timer = new UpdatableTimer(target);
console.log('timer set for: ' + new Date(target).toString());
wf.setHandler(setDeadlineSignal, (deadline) => {
// send in new deadlines via Signal
timer.deadline = deadline;
console.log('timer now set for: ' + new Date(deadline).toString());
});
wf.setHandler(timeLeftQuery, () => timer.deadline - Date.now());
await timer; // if you send in a signal with a new time, this timer will resolve earlier!
console.log('countdown done!');
}

This is available in the third-party package temporal-time-utils, where you can also see the implementation:

// implementation
export class UpdatableTimer implements PromiseLike<void> {
deadlineUpdated = false;
#deadline: number;

constructor(deadline: number) {
this.#deadline = deadline;
}

private async run(): Promise<void> {
/* eslint-disable no-constant-condition */
while (true) {
this.deadlineUpdated = false;
if (
!(await wf.condition(
() => this.deadlineUpdated,
this.#deadline - Date.now(),
))
) {
break;
}
}
}

then<TResult1 = void, TResult2 = never>(
onfulfilled?: (value: void) => TResult1 | PromiseLike<TResult1>,
onrejected?: (reason: any) => TResult2 | PromiseLike<TResult2>,
): PromiseLike<TResult1 | TResult2> {
return this.run().then(onfulfilled, onrejected);
}

set deadline(value: number) {
this.#deadline = value;
this.deadlineUpdated = true;
}

get deadline(): number {
return this.#deadline;
}
}

How to use Temporal Cron Jobs

A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.

A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.

You can set each Workflow to repeat on a schedule with the cronSchedule option:

const handle = await client.workflow.start(scheduledWorkflow, {
// ...
cronSchedule: '* * * * *', // start every minute
});

How to create and manage Namespaces

You can create, update, deprecate or delete your Namespaces using either tctl or SDK APIs.

Use Namespaces to isolate your Workflow Executions according to your needs. For example, you can use Namespaces to match the development lifecycle by having separate dev and prod Namespaces. You could also use them to ensure Workflow Executions between different teams never communicate - such as ensuring that the teamA Namespace never impacts the teamB Namespace.

On Temporal Cloud, use the Temporal Cloud UI or tcld commands to create and manage Namespaces.

On self-hosted Temporal Cluster, you can register and manage your Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.

Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.

You must register a Namespace with the Temporal Cluster before setting it in the Temporal Client.

How to register Namespaces

Registering a Namespace creates a Namespace on the Temporal Cluster or Temporal Cloud.

On Temporal Cloud, use the Temporal Cloud UI or tcld commands to create Namespaces.

On a self-hosted Temporal Cluster, you can register your Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands do not work with Temporal Cloud.

How to manage Namespaces

You can get details for your Namespaces, update Namespace configuration, and deprecate or delete your Namespaces.

On Temporal Cloud, use the Temporal Cloud UI or tcld commands to manage Namespaces.

On a self-hosted Temporal Cluster, you can manage your registered Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands do not work with Temporal Cloud.

How to use a custom payload converter in TypeScript

Temporal SDKs provide a Payload Converter that can be customized to convert a custom data type to a Payload and back.

Implementing custom Payload conversion is optional. It is needed only if the default Data Converter does not support your custom values.

To support custom Payload conversion, create a custom Payload Converter and configure the Data Converter to use it in your Client options.

The order in which your encoding Payload Converters are applied depends on the order given to the Data Converter. You can set multiple encoding Payload Converters to run your conversions. When the Data Converter receives a value for conversion, the value passes through each Payload Converter in sequence until it reaches a converter that handles its data type, and that converter does the conversion.
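
The ordering rule can be sketched without the SDK. The types and converters below are simplified local stand-ins (string data instead of bytes, hypothetical encoding labels), not the SDK's classes. Each converter returns undefined when it does not handle a value, and the first converter that returns a Payload wins.

```typescript
// Simplified stand-ins for illustration; the real types live in '@temporalio/common'.
interface SketchPayload {
  metadata: { encoding: string };
  data: string;
}

interface SketchConverter {
  // Returns undefined when this converter does not handle the value.
  toPayload(value: unknown): SketchPayload | undefined;
}

const undefinedConverter: SketchConverter = {
  toPayload: (value) =>
    value === undefined ? { metadata: { encoding: 'binary/null' }, data: '' } : undefined,
};

const dateConverter: SketchConverter = {
  toPayload: (value) =>
    value instanceof Date
      ? { metadata: { encoding: 'sketch/iso-date' }, data: value.toISOString() }
      : undefined,
};

// Catch-all: accepts anything JSON.stringify can handle.
const jsonConverter: SketchConverter = {
  toPayload: (value) => ({ metadata: { encoding: 'json/plain' }, data: JSON.stringify(value) }),
};

// Tries each converter in the order given; the first non-undefined result wins.
function toPayloadInOrder(converters: SketchConverter[], value: unknown): SketchPayload {
  for (const converter of converters) {
    const payload = converter.toPayload(value);
    if (payload !== undefined) return payload;
  }
  throw new Error('No converter handled the value');
}

const ordered = [undefinedConverter, dateConverter, jsonConverter];
console.log(toPayloadInOrder(ordered, new Date(0)).metadata.encoding); // sketch/iso-date
console.log(toPayloadInOrder(ordered, { a: 1 }).metadata.encoding); // json/plain
```

If jsonConverter were listed before dateConverter in this sketch, the Date would be claimed by the JSON converter and arrive as a plain string, which is why the catch-all converter goes last.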

To send values that are not JSON-serializable, such as a BigInt or a Date, provide a custom Data Converter to both the Client and the Worker.

A Data Converter has two parts:

  • Payload Converter: Sync methods that sometimes run inside the Workflow isolate (and are thus limited).
  • Payload Codec: Async methods that run outside the isolate.
interface DataConverter {
payloadConverterPath?: string;
payloadCodecs?: PayloadCodec[];
}

Payload Converter

API documentation: PayloadConverter

interface PayloadConverter {
/**
* Converts a value to a {@link Payload}.
* @param value The value to convert. Example values include the Workflow args sent by the client and the values returned by a Workflow or Activity.
*/
toPayload<T>(value: T): Payload;

/**
* Converts a {@link Payload} back to a value.
*/
fromPayload<T>(payload: Payload): T;
}
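
As a minimal sketch of what an implementation of this interface can look like, the converter below round-trips a Date through an ISO string. The Payload and PayloadConverter declarations are local stand-ins that mirror the shapes shown on this page, and the 'sketch/iso-date' encoding label is made up for this example. In a real project you would import the types from '@temporalio/common'.

```typescript
// Local stand-ins mirroring the shapes on this page (assumption: metadata
// values and data are byte arrays, as in the examples that follow).
interface Payload {
  metadata?: Record<string, Uint8Array>;
  data?: Uint8Array;
}

interface PayloadConverter {
  toPayload<T>(value: T): Payload;
  fromPayload<T>(payload: Payload): T;
}

const encoder = new TextEncoder();
const decoder = new TextDecoder();

// Hypothetical encoding label for this sketch.
const DATE_ENCODING = 'sketch/iso-date';

class IsoDatePayloadConverter implements PayloadConverter {
  toPayload<T>(value: T): Payload {
    if (!(value instanceof Date)) {
      throw new TypeError('IsoDatePayloadConverter only handles Date values');
    }
    return {
      metadata: { encoding: encoder.encode(DATE_ENCODING) },
      data: encoder.encode(value.toISOString()),
    };
  }

  fromPayload<T>(payload: Payload): T {
    const encoding = payload.metadata?.encoding;
    if (!encoding || decoder.decode(encoding) !== DATE_ENCODING || !payload.data) {
      throw new TypeError('Payload was not produced by IsoDatePayloadConverter');
    }
    return new Date(decoder.decode(payload.data)) as unknown as T;
  }
}

const converter = new IsoDatePayloadConverter();
const original = new Date('2024-01-02T03:04:05.000Z');
const restored = converter.fromPayload<Date>(converter.toPayload(original));
console.log(restored.toISOString()); // 2024-01-02T03:04:05.000Z
```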

Custom implementation

Some example implementations are in the SDK itself:

The sample project samples-typescript/ejson creates an EJSON custom PayloadConverter. It implements PayloadConverterWithEncoding instead of PayloadConverter so that it can be used with CompositePayloadConverter:

ejson/src/ejson-payload-converter.ts

import {
EncodingType,
METADATA_ENCODING_KEY,
Payload,
PayloadConverterError,
PayloadConverterWithEncoding,
} from '@temporalio/common';
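import { decode, encode } from '@temporalio/common/lib/encoding';
// errorMessage is used below; this import path is an assumption based on the SDK's type helpers
import { errorMessage } from '@temporalio/common/lib/type-helpers';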
import EJSON from 'ejson';

/**
* Converts between values and [EJSON](https://docs.meteor.com/api/ejson.html) Payloads.
*/
export class EjsonPayloadConverter implements PayloadConverterWithEncoding {
// Use 'json/plain' so that Payloads are displayed in the UI
public encodingType = 'json/plain' as EncodingType;

public toPayload(value: unknown): Payload | undefined {
if (value === undefined) return undefined;
let ejson;
try {
ejson = EJSON.stringify(value);
} catch (e) {
throw new UnsupportedEjsonTypeError(
`Can't run EJSON.stringify on this value: ${value}. Either convert it (or its properties) to EJSON-serializable values (see https://docs.meteor.com/api/ejson.html ), or create a custom data converter. EJSON.stringify error message: ${
errorMessage(
e,
)
}`,
e as Error,
);
}

return {
metadata: {
[METADATA_ENCODING_KEY]: encode('json/plain'),
// Include an additional metadata field to indicate that this is an EJSON payload
format: encode('extended'),
},
data: encode(ejson),
};
}

public fromPayload<T>(content: Payload): T {
return content.data ? EJSON.parse(decode(content.data)) : content.data;
}
}

export class UnsupportedEjsonTypeError extends PayloadConverterError {
public readonly name: string = 'UnsupportedEjsonTypeError';

constructor(message: string | undefined, public readonly cause?: Error) {
super(message ?? undefined);
}
}

Then we instantiate one and export it:

ejson/src/payload-converter.ts

import {
CompositePayloadConverter,
UndefinedPayloadConverter,
} from '@temporalio/common';
import { EjsonPayloadConverter } from './ejson-payload-converter';

export const payloadConverter = new CompositePayloadConverter(
new UndefinedPayloadConverter(),
new EjsonPayloadConverter(),
);

We provide it to the Worker and Client:

ejson/src/worker.ts

const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'ejson',
dataConverter: {
payloadConverterPath: require.resolve('./payload-converter'),
},
});

ejson/src/client.ts

const client = new Client({
dataConverter: {
payloadConverterPath: require.resolve('./payload-converter'),
},
});

Then we can use supported data types in arguments:

ejson/src/client.ts

const user: User = {
id: uuid(),
// age: 1000n, BigInt isn't supported
hp: Infinity,
matcher: /.*Stormblessed/,
token: Uint8Array.from([1, 2, 3]),
createdAt: new Date(),
};

const handle = await client.workflow.start(example, {
args: [user],
taskQueue: 'ejson',
workflowId: `example-user-${user.id}`,
});

And they get parsed correctly for the Workflow:

ejson/src/workflows.ts

import type { Result, User } from './types';

export async function example(user: User): Promise<Result> {
const success = user.createdAt.getTime() < Date.now()
&& user.hp > 50
&& user.matcher.test('Kaladin Stormblessed')
&& user.token instanceof Uint8Array;
return { success, at: new Date() };
}

Protobufs

To serialize values as Protocol Buffers (protobufs):

  • Use protobufjs.

  • Use runtime-loaded messages (not generated classes) and MessageClass.create (not new MessageClass()).

  • Generate json-module.js with a command like the following:

    pbjs -t json-module -w commonjs -o protos/json-module.js protos/*.proto
  • Patch json-module.js:

protobufs/protos/root.js

const { patchProtobufRoot } = require('@temporalio/common/lib/protobufs');
const unpatchedRoot = require('./json-module');
module.exports = patchProtobufRoot(unpatchedRoot);

protobufs/src/payload-converter.ts

import { DefaultPayloadConverterWithProtobufs } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';

export const payloadConverter = new DefaultPayloadConverterWithProtobufs({
protobufRoot: root,
});

Alternatively, we can use Protobuf Payload Converters directly, or with other converters. If we know that we only use Protobuf objects, and we want them binary encoded (which saves space over proto3 JSON, but can't be viewed in the Web UI), we could do the following:

import { ProtobufBinaryPayloadConverter } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';

export const payloadConverter = new ProtobufBinaryPayloadConverter(root);

Similarly, if we wanted binary-encoded Protobufs in addition to the other default types, we could do the following:

import {
BinaryPayloadConverter,
CompositePayloadConverter,
JsonPayloadConverter,
UndefinedPayloadConverter,
} from '@temporalio/common';
import { ProtobufBinaryPayloadConverter } from '@temporalio/common/lib/protobufs';
import root from '../protos/root';

export const payloadConverter = new CompositePayloadConverter(
new UndefinedPayloadConverter(),
new BinaryPayloadConverter(),
new ProtobufBinaryPayloadConverter(root),
new JsonPayloadConverter(),
);
  • Provide it to the Worker:

protobufs/src/worker.ts

const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
activities,
taskQueue: 'protobufs',
dataConverter: {
payloadConverterPath: require.resolve('./payload-converter'),
},
});

WorkerOptions.dataConverter

  • Provide it to the Client:

protobufs/src/client.ts

import { Client } from '@temporalio/client';
import { v4 as uuid } from 'uuid';
import { foo, ProtoResult } from '../protos/root';
import { example } from './workflows';

async function run() {
const client = new Client({
dataConverter: {
payloadConverterPath: require.resolve('./payload-converter'),
},
});

const handle = await client.workflow.start(example, {
args: [foo.bar.ProtoInput.create({ name: 'Proto', age: 2 })],
// can't do:
// args: [new foo.bar.ProtoInput({ name: 'Proto', age: 2 })],
taskQueue: 'protobufs',
workflowId: 'my-business-id-' + uuid(),
});

console.log(`Started workflow ${handle.workflowId}`);

const result: ProtoResult = await handle.result();
console.log(result.toJSON());
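}

// Invoke the entry point and exit with a non-zero code on failure
run().catch((err) => {
console.error(err);
process.exit(1);
});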
  • Use protobufs in your Workflows and Activities:

protobufs/src/workflows.ts

import { proxyActivities } from '@temporalio/workflow';
import { foo, ProtoResult } from '../protos/root';
import type * as activities from './activities';

const { protoActivity } = proxyActivities<typeof activities>({
startToCloseTimeout: '1 minute',
});

export async function example(input: foo.bar.ProtoInput): Promise<ProtoResult> {
const result = await protoActivity(input);
return result;
}

protobufs/src/activities.ts

import { foo, ProtoResult } from '../protos/root';

export async function protoActivity(
input: foo.bar.ProtoInput,
): Promise<ProtoResult> {
return ProtoResult.create({
sentence: `${input.name} is ${input.age} years old.`,
});
}

Payload Codec

API documentation: PayloadCodec

The default PayloadCodec does nothing. To create a custom one, we implement the following interface:

interface PayloadCodec {
/**
* Encode an array of {@link Payload}s for sending over the wire.
* @param payloads May have length 0.
*/
encode(payloads: Payload[]): Promise<Payload[]>;

/**
* Decode an array of {@link Payload}s received from the wire.
*/
decode(payloads: Payload[]): Promise<Payload[]>;
}
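
Before the encryption example, here is a smaller sketch of a codec that compresses Payload data with gzip. It is an illustration under stated assumptions, not an SDK class: the Payload and PayloadCodec declarations are local stand-ins for the ones in '@temporalio/common', and the 'binary/gzip' label and the 'sketch-original-encoding' metadata key are hypothetical. It uses the synchronous zlib functions for brevity, which block the thread while compressing.

```typescript
import { gunzipSync, gzipSync } from 'node:zlib';

// Local stand-ins mirroring the interface above.
interface Payload {
  metadata?: Record<string, Uint8Array>;
  data?: Uint8Array;
}

interface PayloadCodec {
  encode(payloads: Payload[]): Promise<Payload[]>;
  decode(payloads: Payload[]): Promise<Payload[]>;
}

const text = new TextEncoder();
const bytesToString = (bytes: Uint8Array): string => new TextDecoder().decode(bytes);

// Hypothetical labels chosen for this sketch.
const GZIP_ENCODING = 'binary/gzip';
const ORIGINAL_ENCODING_KEY = 'sketch-original-encoding';

class GzipCodec implements PayloadCodec {
  async encode(payloads: Payload[]): Promise<Payload[]> {
    return payloads.map((payload) => ({
      metadata: {
        ...payload.metadata,
        encoding: text.encode(GZIP_ENCODING),
        // Remember the converter's encoding so decode() can restore it.
        [ORIGINAL_ENCODING_KEY]: payload.metadata?.encoding ?? new Uint8Array(),
      },
      data: gzipSync(payload.data ?? new Uint8Array()),
    }));
  }

  async decode(payloads: Payload[]): Promise<Payload[]> {
    return payloads.map((payload) => {
      const metadata = payload.metadata;
      const encoding = metadata?.encoding;
      // Pass through anything this codec did not encode.
      if (!metadata || !encoding || bytesToString(encoding) !== GZIP_ENCODING) {
        return payload;
      }
      const { [ORIGINAL_ENCODING_KEY]: original, ...rest } = metadata;
      return {
        metadata: { ...rest, encoding: original ?? new Uint8Array() },
        data: gunzipSync(payload.data ?? new Uint8Array()),
      };
    });
  }
}

async function gzipDemo(): Promise<string> {
  const codec = new GzipCodec();
  const input: Payload = {
    metadata: { encoding: text.encode('json/plain') },
    data: text.encode('"hello"'),
  };
  const [decoded] = await codec.decode(await codec.encode([input]));
  return bytesToString(decoded.data ?? new Uint8Array());
}

gzipDemo().then((value) => console.log(value)); // "hello"
```

As in the encryption codec below, decode leaves Payloads it does not recognize untouched, so Payloads written before the codec was introduced can still be read.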

Encryption

Background: Encryption

The following is an example class that implements the PayloadCodec interface:

encryption/src/encryption-codec.ts

import {
METADATA_ENCODING_KEY,
Payload,
PayloadCodec,
ValueError,
} from '@temporalio/common';
import { decode, encode } from '@temporalio/common/lib/encoding';
import { temporal } from '@temporalio/proto';
import { webcrypto as crypto } from 'node:crypto';
import { decrypt, encrypt } from './crypto';

const ENCODING = 'binary/encrypted';
const METADATA_ENCRYPTION_KEY_ID = 'encryption-key-id';

export class EncryptionCodec implements PayloadCodec {
constructor(
protected readonly keys: Map<string, crypto.CryptoKey>,
protected readonly defaultKeyId: string,
) {}

static async create(keyId: string): Promise<EncryptionCodec> {
const keys = new Map<string, crypto.CryptoKey>();
keys.set(keyId, await fetchKey(keyId));
return new this(keys, keyId);
}

async encode(payloads: Payload[]): Promise<Payload[]> {
return Promise.all(
payloads.map(async (payload) => ({
metadata: {
[METADATA_ENCODING_KEY]: encode(ENCODING),
[METADATA_ENCRYPTION_KEY_ID]: encode(this.defaultKeyId),
},
// Encrypt entire payload, preserving metadata
data: await encrypt(
temporal.api.common.v1.Payload.encode(payload).finish(),
this.keys.get(this.defaultKeyId)!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
),
})),
);
}

async decode(payloads: Payload[]): Promise<Payload[]> {
return Promise.all(
payloads.map(async (payload) => {
if (
!payload.metadata
|| decode(payload.metadata[METADATA_ENCODING_KEY]) !== ENCODING
) {
return payload;
}
if (!payload.data) {
throw new ValueError('Payload data is missing');
}

const keyIdBytes = payload.metadata[METADATA_ENCRYPTION_KEY_ID];
if (!keyIdBytes) {
throw new ValueError(
'Unable to decrypt Payload without encryption key id',
);
}

const keyId = decode(keyIdBytes);
let key = this.keys.get(keyId);
if (!key) {
key = await fetchKey(keyId);
this.keys.set(keyId, key);
}
const decryptedPayloadBytes = await decrypt(payload.data, key);
console.log('Decrypting payload.data:', payload.data);
return temporal.api.common.v1.Payload.decode(decryptedPayloadBytes);
}),
);
}
}

async function fetchKey(_keyId: string): Promise<crypto.CryptoKey> {
// In production, fetch key from a key management system (KMS). You may want to memoize requests if you'll be decoding
// Payloads that were encrypted using keys other than defaultKeyId.
const key = Buffer.from('test-key-test-key-test-key-test!');
const cryptoKey = await crypto.subtle.importKey(
'raw',
key,
{
name: 'AES-GCM',
},
true,
['encrypt', 'decrypt'],
);

return cryptoKey;
}

The encryption and decryption code is in src/crypto.ts. Because encryption is CPU intensive, and doing AES with the crypto module built into Node.js blocks the main thread, we use @ronomon/crypto-async, which uses the Node.js thread pool.
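
The contents of src/crypto.ts are not reproduced on this page. As a rough idea of what such helpers can look like, here is a sketch that uses the Web Crypto API from node:crypto (the module the codec above imports for CryptoKey) rather than @ronomon/crypto-async. It is an assumption for illustration, not the sample's actual file: it generates a random 12-byte IV per call and prepends it to the ciphertext.

```typescript
import { webcrypto as crypto } from 'node:crypto';

const IV_LENGTH_BYTES = 12; // 96-bit IV, the usual choice for AES-GCM

// Encrypts data and returns IV || ciphertext (the ciphertext includes the auth tag).
async function encrypt(data: Uint8Array, key: crypto.CryptoKey): Promise<Uint8Array> {
  const iv = crypto.getRandomValues(new Uint8Array(IV_LENGTH_BYTES));
  const ciphertext = await crypto.subtle.encrypt({ name: 'AES-GCM', iv }, key, data);
  const result = new Uint8Array(iv.byteLength + ciphertext.byteLength);
  result.set(iv, 0);
  result.set(new Uint8Array(ciphertext), iv.byteLength);
  return result;
}

// Splits the IV back off and decrypts; rejects if the data or key is wrong.
async function decrypt(encrypted: Uint8Array, key: crypto.CryptoKey): Promise<Uint8Array> {
  const iv = encrypted.subarray(0, IV_LENGTH_BYTES);
  const ciphertext = encrypted.subarray(IV_LENGTH_BYTES);
  const plaintext = await crypto.subtle.decrypt({ name: 'AES-GCM', iv }, key, ciphertext);
  return new Uint8Array(plaintext);
}

// Round-trip demo with the same hard-coded test key used by fetchKey above.
async function roundTrip(message: string): Promise<string> {
  const key = await crypto.subtle.importKey(
    'raw',
    new TextEncoder().encode('test-key-test-key-test-key-test!'),
    { name: 'AES-GCM' },
    false,
    ['encrypt', 'decrypt'],
  );
  const encrypted = await encrypt(new TextEncoder().encode(message), key);
  return new TextDecoder().decode(await decrypt(encrypted, key));
}

roundTrip('Alice: Private message for Bob.').then((value) => console.log(value));
```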

As before, we provide a custom Data Converter to the Client and Worker:

encryption/src/client.ts

const client = new Client({
dataConverter: await getDataConverter(),
});

const handle = await client.workflow.start(example, {
args: ['Alice: Private message for Bob.'],
taskQueue: 'encryption',
workflowId: `my-business-id-${uuid()}`,
});

console.log(`Started workflow ${handle.workflowId}`);
console.log(await handle.result());

encryption/src/worker.ts

const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
taskQueue: 'encryption',
dataConverter: await getDataConverter(),
});

When the Client sends 'Alice: Private message for Bob.' to the Workflow, it gets encrypted on the Client and decrypted in the Worker. The Workflow receives the decrypted message and appends another message. When it returns that longer string, the string gets encrypted by the Worker and decrypted by the Client.

encryption/src/workflows.ts

export async function example(message: string): Promise<string> {
return `${message}\nBob: Hi Alice, I'm Workflow Bob.`;
}

How to implement interceptors in TypeScript

Interceptors are a mechanism for modifying inbound and outbound SDK calls. Interceptors are commonly used to add tracing and authorization to the scheduling and execution of Workflows and Activities. You can compare these to "middleware" in other frameworks.

The TypeScript SDK comes with an optional interceptor package that adds tracing with OpenTelemetry. See how to use it in the interceptors-opentelemetry code sample.

Interceptor types

How interceptors work

Interceptors are run in a chain, and all interceptors work similarly. They accept two arguments: input and next, where next calls the next interceptor in the chain. All interceptor methods are optional—it's up to the implementor to choose which methods to intercept.
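
The chaining described above can be sketched in plain TypeScript. The types and the composeInterceptors helper below are hypothetical and simplified; they are not the SDK's Next type or its internal composition logic, only an illustration of how each interceptor receives input and a next function that leads to the rest of the chain.

```typescript
// Generic sketch of an interceptor chain; names are hypothetical, not SDK types.
type NextFn<I, O> = (input: I) => Promise<O>;
type Interceptor<I, O> = (input: I, next: NextFn<I, O>) => Promise<O>;

// Builds a single function in which each interceptor wraps the ones after it,
// with `terminal` (the real operation) at the end of the chain.
function composeInterceptors<I, O>(
  interceptors: Interceptor<I, O>[],
  terminal: NextFn<I, O>,
): NextFn<I, O> {
  return interceptors.reduceRight<NextFn<I, O>>(
    (next, interceptor) => (input) => interceptor(input, next),
    terminal,
  );
}

const calls: string[] = [];

// Records when the call starts and ends, whether it succeeds or throws.
const logging: Interceptor<string, string> = async (input, next) => {
  calls.push(`log:start:${input}`);
  try {
    return await next(input);
  } finally {
    calls.push('log:end');
  }
};

// Modifies the input before handing it to the rest of the chain.
const upperCase: Interceptor<string, string> = async (input, next) =>
  next(input.toUpperCase());

const runChain = composeInterceptors(
  [logging, upperCase],
  async (input) => `ran ${input}`,
);

runChain('activity').then((result) => {
  console.log(result); // ran ACTIVITY
});
```

In this sketch the first interceptor in the array is the outermost one, so it sees the original input and the final result.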

Interceptor examples

Log start and completion of Activities

import {
ActivityInput,
Next,
WorkflowOutboundCallsInterceptor,
} from '@temporalio/workflow';

export class ActivityLogInterceptor
implements WorkflowOutboundCallsInterceptor
{
constructor(public readonly workflowType: string) {}

async scheduleActivity(
input: ActivityInput,
next: Next<WorkflowOutboundCallsInterceptor, 'scheduleActivity'>,
): Promise<unknown> {
console.log('Starting activity', { activityType: input.activityType });
try {
return await next(input);
} finally {
console.log('Completed activity', {
workflow: this.workflowType,
activityType: input.activityType,
});
}
}
}

Authorization

import {
defaultDataConverter,
Next,
WorkflowInboundCallsInterceptor,
WorkflowInput,
} from '@temporalio/workflow';

/**
* WARNING: This demo is meant as a simple auth example.
* Do not use this for actual authorization logic.
* Auth headers should be encrypted and credentials
* stored outside of the codebase.
*/
export class DumbWorkflowAuthInterceptor
implements WorkflowInboundCallsInterceptor
{
public async execute(
input: WorkflowInput,
next: Next<WorkflowInboundCallsInterceptor, 'execute'>,
): Promise<unknown> {
const authHeader = input.headers.auth;
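// Fall back to empty credentials when the header is missing, so that
// the destructuring does not throw on undefined.
const { user, password } = authHeader
? await defaultDataConverter.fromPayload(authHeader)
: { user: undefined, password: undefined };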

if (!(user === 'admin' && password === 'admin')) {
throw new Error('Unauthorized');
}
return await next(input);
}
}

To properly do authorization from Workflow code, the Workflow would need to access encryption keys and possibly authenticate against an external user database, which requires the Workflow to break isolation. Please contact us if you need to discuss this further.

Interceptor registration

Activity and client interceptors registration

Workflow interceptors registration

Workflow interceptors are registered differently from the other interceptors because they run in the Workflow isolate. To register Workflow interceptors, export an interceptors function from a file located in the workflows directory and provide the name of that file to the Worker on creation via WorkerOptions.

At the time of construction, the Workflow context is already initialized for the current Workflow. Use workflowInfo to add Workflow-specific information in the interceptor.

src/workflows/your-interceptors.ts

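import { workflowInfo } from '@temporalio/workflow';
// ActivityLogInterceptor is the class from the "Log start and completion of Activities"
// example above; import it from wherever you define it.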

export const interceptors = () => ({
outbound: [new ActivityLogInterceptor(workflowInfo().workflowType)],
inbound: [],
});

src/worker/index.ts

const worker = await Worker.create({
workflowsPath: require.resolve('./workflows'),
interceptors: {
workflowModules: [require.resolve('./workflows/your-interceptors')],
},
});