Interface: IDBOSContextService

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:53

DBOS Context Service Interface

Provides access to DBOS primitives (steps, workflows, messaging, streams) within node execute() methods.

Key Design Decisions:

  • Step names are auto-prefixed with nodeId to ensure uniqueness
  • Stream operations do NOT auto-prefix (allows multi-node shared streams)
  • currentNodeId comes from AsyncLocalStorage (parallel-safe)
  • Subflow limits are enforced to prevent resource exhaustion

Example

typescript
async execute(context: ExecutionContext): Promise<NodeExecutionResult> {
  const { dbos } = context.services!;

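  // Run a durable step; return serializable data so the result can be checkpointed
  const result = await dbos.runStep(async () => {
    const response = await fetch('https://api.example.com/data');
    return await response.json();
  }, { name: 'fetch-api-data' });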

  // Execute a subflow and wait for result
  // NOTE: Pass context to ensure proper hierarchy tracking (rootExecutionId, options, integrations)
  const subflowResult = await dbos.executeSubflow(context, {
    flowId: 'my-calculator-flow',
    eventName: 'calculate',
    inputs: { x: 10, y: 20 },
  });

  this.output = subflowResult.outputs;
  return {};
}

Properties

closeStream()

closeStream: (key) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:221

Close a DBOS stream. Signals to readers that no more data will be written.

IMPORTANT: Must be called at WORKFLOW level, not from within steps. Only writeStream() is allowed from steps.

Parameters

key

string

Stream key to close

Returns

Promise<void>
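
Because readers keep waiting until the stream is closed, a writer will often want the close to happen even when a write fails. The following is a minimal sketch of that pattern, not part of this package: StreamWriter is a local stand-in that mirrors the documented writeStream() and closeStream() signatures, and publishAll is a hypothetical helper meant to run at workflow level.

```typescript
// Local stand-in mirroring the documented writeStream/closeStream signatures.
interface StreamWriter {
  writeStream<T>(key: string, value: T): Promise<void>;
  closeStream(key: string): Promise<void>;
}

// Hypothetical helper, intended to run at workflow level (not inside runStep).
async function publishAll<T>(
  dbos: StreamWriter,
  key: string,
  items: readonly T[],
): Promise<void> {
  try {
    for (const item of items) {
      await dbos.writeStream(key, item);
    }
  } finally {
    // Always signal end-of-stream so readers do not wait indefinitely.
    await dbos.closeStream(key);
  }
}
```

The try/finally keeps the close next to the writes, so an error in the loop still propagates to the caller after the stream has been closed.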


currentNodeId

readonly currentNodeId: string | undefined

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:274

The current node ID (from AsyncLocalStorage, parallel-safe). Undefined when accessed outside a node execution context.
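
The parallel-safety claim can be illustrated with Node's AsyncLocalStorage directly. This is a sketch of the general technique, not the package's implementation: nodeContext, currentNodeId(), prefixedStepName and runAsNode are hypothetical names, and returning the bare name outside a node context is an assumption.

```typescript
import { AsyncLocalStorage } from 'node:async_hooks';

// Hypothetical store; the real service keeps its own internal storage.
const nodeContext = new AsyncLocalStorage<{ nodeId: string }>();

// Stand-in for the documented currentNodeId property.
function currentNodeId(): string | undefined {
  return nodeContext.getStore()?.nodeId;
}

// Mirrors the documented "{nodeId}:" prefix convention for step names.
// Assumption: outside a node context the name is returned unchanged.
function prefixedStepName(name: string): string {
  const nodeId = currentNodeId();
  return nodeId === undefined ? name : `${nodeId}:${name}`;
}

// Simulates a node that does async work before naming a step.
async function runAsNode(nodeId: string, delayMs: number): Promise<string> {
  return nodeContext.run({ nodeId }, async () => {
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
    return prefixedStepName('fetch');
  });
}
```

Running runAsNode('node-a', 20) and runAsNode('node-b', 5) concurrently with Promise.all leaves each call with its own node ID, because the store follows the async call chain rather than living in a shared variable.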


executeSubflow()

executeSubflow: (context, params) => Promise<ISubflowResult>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:112

Execute a subflow synchronously (wait for completion).

This is the primary method for flow composition. The calling node blocks until the subflow completes.

Parameters

context

ExecutionContext

The current execution context (provides rootExecutionId, options, integrations)

params

ISubflowParams

Subflow execution parameters

Returns

Promise<ISubflowResult>

The subflow's result including outputs from FlowOutputNode

Throws

Error if subflow limits (depth, breadth, total) are exceeded

Example

typescript
const result = await dbos.executeSubflow(context, {
  flowId: 'calculator-flow',
  eventName: 'add',
  inputs: { a: 10, b: 20 },
});

if (result.status === 'completed') {
  this.output = result.outputs.sum;
}

executeSubflowsParallel()

executeSubflowsParallel: (context, subflows) => Promise<ISubflowResult[]>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:145

Execute multiple subflows in parallel and wait for all to complete.

Parameters

context

ExecutionContext

The current execution context (provides rootExecutionId, options, integrations)

subflows

ISubflowParams[]

Array of subflow parameters

Returns

Promise<ISubflowResult[]>

Array of results (in same order as input)

Throws

Error if breadth limit is exceeded

Example

typescript
const results = await dbos.executeSubflowsParallel(context, [
  { flowId: 'flow-a', inputs: { x: 1 } },
  { flowId: 'flow-b', inputs: { x: 2 } },
  { flowId: 'flow-c', inputs: { x: 3 } },
]);
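
Since results come back in the same order as the input array, they can be matched to their subflows by index. The sketch below assumes only the fields that appear in the examples on this page (flowId, inputs, status, outputs); SubflowParamsLike, SubflowResultLike and resultsByFlowId are hypothetical local names, not exports of this package.

```typescript
// Minimal local stand-ins; only fields used in this page's examples are modeled.
interface SubflowParamsLike {
  flowId: string;
  inputs: Record<string, unknown>;
}
interface SubflowResultLike {
  status: string;
  outputs: Record<string, unknown>;
}

// Hypothetical helper: index results by flowId, relying on the documented
// guarantee that results are returned in input order.
function resultsByFlowId(
  subflows: readonly SubflowParamsLike[],
  results: readonly SubflowResultLike[],
): Map<string, SubflowResultLike> {
  if (subflows.length !== results.length) {
    throw new Error('subflows and results must have the same length');
  }
  const byId = new Map<string, SubflowResultLike>();
  subflows.forEach((subflow, index) => {
    const result = results[index];
    if (result !== undefined) {
      byId.set(subflow.flowId, result);
    }
  });
  return byId;
}
```

If the same flowId appears more than once in the input, the later result overwrites the earlier one in this sketch, so keying by index is safer in that case.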

executionId

readonly executionId: string

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:268

The current execution ID


getEvent()

getEvent: <T>(workflowId, key, timeoutSeconds?) => Promise<T | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:195

Get an event value from a workflow.

Type Parameters

T

T

Parameters

workflowId

string

Target workflow ID

key

string

Event key

timeoutSeconds?

number

How long to wait, in seconds, if the event has not been set yet

Returns

Promise<T | null>

The event value, or null if the timeout expires


getWorkflowStatus()

getWorkflowStatus: (workflowId) => Promise<IWorkflowStatus | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:282

Get the status of a workflow.

Parameters

workflowId

string

Target workflow ID

Returns

Promise<IWorkflowStatus | null>

Status info, or null if not found


now()

now: () => Promise<number>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:259

Get the current time in milliseconds since epoch.

This is a durable timestamp: if the workflow restarts, DBOS returns the value recorded on the first run instead of reading the clock again.

Returns

Promise<number>
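
Because the recorded value is returned again after a restart, a branch that depends on now() takes the same path on replay, which a direct Date.now() call would not guarantee. A minimal sketch of a deadline check built on that property follows; DurableClock is a local stand-in for the documented now() signature and isPastDeadline is a hypothetical helper.

```typescript
// Local stand-in mirroring the documented now() signature.
interface DurableClock {
  now(): Promise<number>;
}

// Hypothetical helper: true once at least budgetMs milliseconds have elapsed
// since startedAtMs, measured with the durable clock.
async function isPastDeadline(
  dbos: DurableClock,
  startedAtMs: number,
  budgetMs: number,
): Promise<boolean> {
  const nowMs = await dbos.now();
  return nowMs - startedAtMs >= budgetMs;
}
```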


readStream()

readStream: <T>(workflowId, key, fromOffset?) => AsyncIterable<T>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:237

Read from a DBOS stream as an async iterable. Yields values until the stream is closed by the writer via closeStream().

Used for cross-workflow streaming bridges: reads data written by another workflow's writeStream() calls in real time.

IMPORTANT: Must be called at WORKFLOW level, not from within steps.

Type Parameters

T

T

Parameters

workflowId

string

The workflow that owns the stream

key

string

Stream key to read from

fromOffset?

number

Start reading from this offset (default: 0)

Returns

AsyncIterable<T>

Async iterable that yields stream values until stream closes
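
Consuming the returned iterable is typically a for await loop that ends when the writer closes the stream. The sketch below assumes a bounded stream that fits in memory; StreamReader is a local stand-in for the documented readStream() signature and collectStream is a hypothetical helper.

```typescript
// Local stand-in mirroring the documented readStream signature.
interface StreamReader {
  readStream<T>(
    workflowId: string,
    key: string,
    fromOffset?: number,
  ): AsyncIterable<T>;
}

// Hypothetical helper: drain a stream into an array, optionally resuming
// from a given offset. Intended to run at workflow level.
async function collectStream<T>(
  dbos: StreamReader,
  workflowId: string,
  key: string,
  fromOffset = 0,
): Promise<T[]> {
  const values: T[] = [];
  // The loop ends once the writer has called closeStream() on this key.
  for await (const value of dbos.readStream<T>(workflowId, key, fromOffset)) {
    values.push(value);
  }
  return values;
}
```

For long-running streams, handling each value inside the loop instead of collecting them avoids unbounded memory growth.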


recv()

recv: <T>(topic?, timeoutSeconds?) => Promise<T | null>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:169

Receive a message sent to this workflow.

Blocks until a message is received or timeout expires.

Type Parameters

T

T

Parameters

topic?

string

Optional topic to filter messages

timeoutSeconds?

number

How long to wait, in seconds (default: 60)

Returns

Promise<T | null>

The message, or null if the timeout expires
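
Callers need to handle the null case explicitly, since it is the only signal that the wait timed out. A minimal sketch follows, in which MessageReceiver is a local stand-in for the documented recv() signature, and the Approval payload, the 'approval' topic and waitForApproval are hypothetical.

```typescript
// Local stand-in mirroring the documented recv signature.
interface MessageReceiver {
  recv<T>(topic?: string, timeoutSeconds?: number): Promise<T | null>;
}

// Hypothetical payload shape used only for this illustration.
interface Approval {
  approved: boolean;
}

// Hypothetical helper: wait for an approval message and treat a timeout
// as a rejection.
async function waitForApproval(
  dbos: MessageReceiver,
  timeoutSeconds = 60,
): Promise<boolean> {
  const message = await dbos.recv<Approval>('approval', timeoutSeconds);
  if (message === null) {
    // null means the timeout expired before any message arrived.
    return false;
  }
  return message.approved;
}
```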


runStep()

runStep: <T>(fn, options?) => Promise<T>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:82

Run a function as a durable DBOS step.

Steps are checkpointed in PostgreSQL. If the workflow restarts, DBOS replays the step result from the checkpoint instead of re-executing.

Step names are auto-prefixed with {nodeId}: to ensure uniqueness.

Type Parameters

T

T

Parameters

fn

() => Promise<T>

The function to execute

options?

IStepOptions

Step configuration options

Returns

Promise<T>

The function's return value

Example

typescript
// Named step (becomes "myNodeId:fetch-user-data")
const user = await dbos.runStep(
  () => fetchUser(userId),
  { name: 'fetch-user-data' }
);

// Auto-named step (becomes "myNodeId:step-1")
const data = await dbos.runStep(() => processData());

send()

send: <T>(destinationWorkflowId, message, topic?) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:158

Send a message to another workflow.

Type Parameters

T

T

Parameters

destinationWorkflowId

string

Target workflow ID

message

T

Message payload

topic?

string

Optional message topic for filtering

Returns

Promise<void>


setEvent()

setEvent: <T>(key, value) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:185

Set an event value for this workflow.

Events are different from messages:

  • Events persist and can be read multiple times
  • Messages are consumed when received

Type Parameters

T

T

Parameters

key

string

Event key

value

T

Event value

Returns

Promise<void>
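
The difference between events and messages can be shown with a small in-memory model. This is an illustration of the semantics only, not how DBOS stores anything: InMemoryWorkflowChannel is a hypothetical class, its methods are synchronous for brevity, and timeouts and topics are left out.

```typescript
// In-memory model for illustration only; the real service is durable and async.
class InMemoryWorkflowChannel {
  private readonly events = new Map<string, unknown>();
  private readonly messages: unknown[] = [];

  setEvent<T>(key: string, value: T): void {
    this.events.set(key, value);
  }

  // Reading an event leaves it in place, so later reads see the same value.
  getEvent<T>(key: string): T | null {
    return this.events.has(key) ? (this.events.get(key) as T) : null;
  }

  send<T>(message: T): void {
    this.messages.push(message);
  }

  // Receiving removes the message, so it is delivered only once.
  recv<T>(): T | null {
    return this.messages.length > 0 ? (this.messages.shift() as T) : null;
  }
}
```

In this model, two consecutive getEvent() calls for the same key return the same value, while a second recv() after a single send() returns null.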


sleep()

sleep: (durationMs) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:251

Sleep for a specified duration.

This is a durable sleep: if the workflow restarts, DBOS resumes the sleep from where it left off instead of starting over.

Parameters

durationMs

number

Duration to sleep in milliseconds

Returns

Promise<void>
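
A common use of a durable sleep is spacing out retries or polls. The sketch below shows exponential backoff on top of the documented sleep() signature; DurableSleeper is a local stand-in, pollWithBackoff is a hypothetical helper, and the check callback is assumed to be safe to replay (for example by being wrapped in runStep()).

```typescript
// Local stand-in mirroring the documented sleep signature.
interface DurableSleeper {
  sleep(durationMs: number): Promise<void>;
}

// Hypothetical helper: call check() up to maxAttempts times, sleeping
// baseDelayMs, 2 * baseDelayMs, 4 * baseDelayMs, ... between attempts.
async function pollWithBackoff(
  dbos: DurableSleeper,
  check: () => Promise<boolean>,
  maxAttempts = 5,
  baseDelayMs = 1000,
): Promise<boolean> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    if (await check()) {
      return true;
    }
    // No sleep after the final attempt; there is nothing left to wait for.
    if (attempt < maxAttempts - 1) {
      await dbos.sleep(baseDelayMs * 2 ** attempt);
    }
  }
  return false;
}
```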


startSubflow()

startSubflow: (context, params) => Promise<ISubflowHandle>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:126

Start a subflow asynchronously (don't wait for completion).

Returns a handle that can be used to:

  • Wait for the result later
  • Check status
  • Send messages to the child workflow

Parameters

context

ExecutionContext

The current execution context (provides rootExecutionId, options, integrations)

params

ISubflowParams

Subflow execution parameters

Returns

Promise<ISubflowHandle>

A handle to the running subflow


writeStream()

writeStream: <T>(key, value) => Promise<void>

Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:210

Write to a DBOS stream for real-time event publishing.

IMPORTANT: Stream keys are NOT auto-prefixed with nodeId. This allows multiple nodes to write to shared streams.

Type Parameters

T

T

Parameters

key

string

Stream key (e.g., 'events', 'progress')

value

T

Value to write

Returns

Promise<void>
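
Since keys are shared rather than prefixed, values written by different nodes land in the same stream, and it can help to tag each value with its origin. A minimal sketch follows: StreamPublisher is a local stand-in combining the documented currentNodeId and writeStream() members, while ProgressEvent, reportProgress and the 'unknown' fallback are hypothetical.

```typescript
// Local stand-in combining two documented members of the service.
interface StreamPublisher {
  readonly currentNodeId: string | undefined;
  writeStream<T>(key: string, value: T): Promise<void>;
}

// Hypothetical event envelope that records which node produced the value.
interface ProgressEvent {
  nodeId: string;
  percent: number;
}

// Hypothetical helper: publish progress to the shared 'progress' stream.
async function reportProgress(
  dbos: StreamPublisher,
  percent: number,
): Promise<void> {
  const event: ProgressEvent = {
    // Assumption: fall back to 'unknown' outside a node execution context.
    nodeId: dbos.currentNodeId ?? 'unknown',
    percent,
  };
  await dbos.writeStream('progress', event);
}
```

A reader of the 'progress' stream can then separate updates per node by filtering on nodeId.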

Licensed under BUSL-1.1