Skip to content

PersistentAI API Documentation / @persistent-ai/fireflow-types / IDBOSContextService

Interface: IDBOSContextService

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:54

DBOS Context Service Interface

Provides access to DBOS primitives (steps, workflows, messaging, streams) within node execute() methods.

Key Design Decisions:

  • Step names are auto-prefixed with nodeId to ensure uniqueness
  • Stream operations do NOT auto-prefix (allows multi-node shared streams)
  • currentNodeId comes from AsyncLocalStorage (parallel-safe)
  • Subflow limits are enforced to prevent resource exhaustion

Example

typescript
async execute(context: ExecutionContext): Promise<NodeExecutionResult> {
  const { dbos } = context.services!;

  // Run a durable step
  const result = await dbos.runStep(async () => {
    return await fetch('https://api.example.com/data');
  }, { name: 'fetch-api-data' });

  // Execute a subflow and wait for result
  // NOTE: Pass context to ensure proper hierarchy tracking (rootExecutionId, options, integrations)
  const subflowResult = await dbos.executeSubflow(context, {
    flowId: 'my-calculator-flow',
    eventName: 'calculate',
    inputs: { x: 10, y: 20 },
  });

  this.output = subflowResult.outputs;
  return {};
}

Properties

closeStream()

closeStream: (key) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:222

Close a DBOS stream. Signals to readers that no more data will be written.

IMPORTANT: Must be called at WORKFLOW level, not from within steps. Only writeStream() is allowed from steps.

Parameters

key

string

Stream key to close

Returns

Promise<void>


currentNodeId

readonly currentNodeId: string | undefined

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:275

The current node ID (from AsyncLocalStorage, parallel-safe). Undefined if called outside of node execution context.


executeSubflow()

executeSubflow: (context, params) => Promise<ISubflowResult>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:113

Execute a subflow synchronously (wait for completion).

This is the primary method for flow composition. The calling node blocks until the subflow completes.

Parameters

context

ExecutionContext

The current execution context (provides rootExecutionId, options, integrations)

params

ISubflowParams

Subflow execution parameters

Returns

Promise<ISubflowResult>

The subflow's result including outputs from FlowOutputNode

Throws

Error if subflow limits (depth, breadth, total) are exceeded

Example

typescript
const result = await dbos.executeSubflow(context, {
  flowId: 'calculator-flow',
  eventName: 'add',
  inputs: { a: 10, b: 20 },
});

if (result.status === 'completed') {
  this.output = result.outputs.sum;
}

executeSubflowsParallel()

executeSubflowsParallel: (context, subflows) => Promise<ISubflowResult[]>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:146

Execute multiple subflows in parallel and wait for all to complete.

Parameters

context

ExecutionContext

The current execution context (provides rootExecutionId, options, integrations)

subflows

ISubflowParams[]

Array of subflow parameters

Returns

Promise<ISubflowResult[]>

Array of results (in same order as input)

Throws

Error if breadth limit is exceeded

Example

typescript
const results = await dbos.executeSubflowsParallel(context, [
  { flowId: 'flow-a', inputs: { x: 1 } },
  { flowId: 'flow-b', inputs: { x: 2 } },
  { flowId: 'flow-c', inputs: { x: 3 } },
]);

executionId

readonly executionId: string

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:269

The current execution ID


getEvent()

getEvent: <T>(workflowId, key, timeoutSeconds?) => Promise<T | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:196

Get an event value from a workflow.

Type Parameters

T

T

Parameters

workflowId

string

Target workflow ID

key

string

Event key

timeoutSeconds?

number

How long to wait if not yet set

Returns

Promise<T | null>

The event value, or null if timeout


getEventNonDurable()

getEventNonDurable: <T>(workflowId, key, timeoutSeconds?) => Promise<T | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:321

Get an event value from a workflow without DBOS durability.

Uses external DBOSClient.getEvent() instead of DBOS.getEvent(). The result is NOT checkpointed -- if the workflow replays, this call will execute again (not return cached value).

Type Parameters

T

T

Parameters

workflowId

string

key

string

timeoutSeconds?

number

Returns

Promise<T | null>

Throws

Error if non-durable dependencies are not provided


getWorkflowStatus()

getWorkflowStatus: (workflowId) => Promise<IWorkflowStatus | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:283

Get the status of a workflow.

Parameters

workflowId

string

Target workflow ID

Returns

Promise<IWorkflowStatus | null>

Status info, or null if not found


now()

now: () => Promise<number>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:260

Get the current time in milliseconds since epoch.

This is a durable timestamp - if the workflow restarts, DBOS returns the same value as before.

Returns

Promise<number>


readStream()

readStream: <T>(workflowId, key, fromOffset?) => AsyncIterable<T>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:238

Read from a DBOS stream as an async iterable. Yields values until the stream is closed by the writer via closeStream().

Used for cross-workflow streaming bridges — reads data written by another workflow's writeStream() calls in real-time.

IMPORTANT: Must be called at WORKFLOW level, not from within steps.

Type Parameters

T

T

Parameters

workflowId

string

The workflow that owns the stream

key

string

Stream key to read from

fromOffset?

number

Start reading from this offset (default: 0)

Returns

AsyncIterable<T>

Async iterable that yields stream values until stream closes


readStreamNonDurable()

readStreamNonDurable: <T>(workflowId, streamKey, fromOffset?, batchConfig?) => AsyncIterable<T[]>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:331

Read from a stream without DBOS durability.

Uses EventBus/StreamBridge (PGListener-based) instead of DBOS.readStream(). Returns batched async iterable for efficient consumption.

Type Parameters

T

T

Parameters

workflowId

string

streamKey

string

fromOffset?

number

batchConfig?

IStreamBatchConfig

Returns

AsyncIterable<T[]>

Throws

Error if non-durable dependencies are not provided


recv()

recv: <T>(topic?, timeoutSeconds?) => Promise<T | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:170

Receive a message sent to this workflow.

Blocks until a message is received or timeout expires.

Type Parameters

T

T

Parameters

topic?

string

Optional topic to filter messages

timeoutSeconds?

number

How long to wait (default: 60)

Returns

Promise<T | null>

The message, or null if timeout


runStep()

runStep: <T>(fn, options?) => Promise<T>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:83

Run a function as a durable DBOS step.

Steps are checkpointed in PostgreSQL. If the workflow restarts, DBOS replays the step result from the checkpoint instead of re-executing.

Step names are auto-prefixed with {nodeId}: to ensure uniqueness.

Type Parameters

T

T

Parameters

fn

() => Promise<T>

The function to execute

options?

IStepOptions

Step configuration options

Returns

Promise<T>

The function's return value

Example

typescript
// Named step (becomes "myNodeId:fetch-user-data")
const user = await dbos.runStep(
  () => fetchUser(userId),
  { name: 'fetch-user-data' }
);

// Auto-named step (becomes "myNodeId:step-1")
const data = await dbos.runStep(() => processData());

send()

send: <T>(destinationWorkflowId, message, topic?) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:159

Send a message to another workflow.

Type Parameters

T

T

Parameters

destinationWorkflowId

string

Target workflow ID

message

T

Message payload

topic?

string

Optional message topic for filtering

Returns

Promise<void>


sendNonDurable()

sendNonDurable: <T>(destinationWorkflowId, message, topic?) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:347

Send a message to another workflow without DBOS durability.

Uses external DBOSClient.send() instead of DBOS.send(). The send is NOT checkpointed -- if the workflow replays, this call will execute again (potentially sending duplicate messages).

Type Parameters

T

T

Parameters

destinationWorkflowId

string

message

T

topic?

string

Returns

Promise<void>

Throws

Error if non-durable dependencies are not provided


setEvent()

setEvent: <T>(key, value) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:186

Set an event value for this workflow.

Events are different from messages:

  • Events persist and can be read multiple times
  • Messages are consumed when received

Type Parameters

T

T

Parameters

key

string

Event key

value

T

Event value

Returns

Promise<void>


sleep()

sleep: (durationMs) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:252

Sleep for a specified duration.

This is a durable sleep - if the workflow restarts, DBOS resumes from where it left off.

Parameters

durationMs

number

Duration to sleep in milliseconds

Returns

Promise<void>


startSubflow()

startSubflow: (context, params) => Promise<ISubflowHandle>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:127

Start a subflow asynchronously (don't wait for completion).

Returns a handle that can be used to:

  • Wait for the result later
  • Check status
  • Send messages to the child workflow

Parameters

context

ExecutionContext

The current execution context (provides rootExecutionId, options, integrations)

params

ISubflowParams

Subflow execution parameters

Returns

Promise<ISubflowHandle>

A handle to the running subflow


startSubflowNonDurable()

startSubflowNonDurable: (context, params) => Promise<ISubflowHandle>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:310

Start a subflow without DBOS durability guarantees.

Uses an external DBOSClient to:

  • Generate execution ID directly (no DBOS.runStep wrapper)
  • Create execution row directly (no DBOS.runStep wrapper)
  • Enqueue via DBOSClient.enqueue() instead of DBOS.startWorkflow()

The returned handle uses DBOSClient methods for getEvent, send, retrieveWorkflow.

Parameters

context

ExecutionContext

params

ISubflowParams

Returns

Promise<ISubflowHandle>

Throws

Error if non-durable dependencies are not provided


writeStream()

writeStream: <T>(key, value) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:211

Write to a DBOS stream for real-time event publishing.

IMPORTANT: Stream keys are NOT auto-prefixed with nodeId. This allows multiple nodes to write to shared streams.

Type Parameters

T

T

Parameters

key

string

Stream key (e.g., 'events', 'progress')

value

T

Value to write

Returns

Promise<void>

Licensed under BUSL-1.1