PersistentAI API Documentation / @persistentai/fireflow-types / IDBOSContextService
Interface: IDBOSContextService
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:53
DBOS Context Service Interface
Provides access to DBOS primitives (steps, workflows, messaging, streams) within node execute() methods.
Key Design Decisions:
- Step names are auto-prefixed with nodeId to ensure uniqueness
- Stream operations do NOT auto-prefix (allows multi-node shared streams)
- currentNodeId comes from AsyncLocalStorage (parallel-safe)
- Subflow limits are enforced to prevent resource exhaustion
Example
async execute(context: ExecutionContext): Promise<NodeExecutionResult> {
const { dbos } = context.services!;
// Run a durable step
const result = await dbos.runStep(async () => {
return await fetch('https://api.example.com/data');
}, { name: 'fetch-api-data' });
// Execute a subflow and wait for result
// NOTE: Pass context to ensure proper hierarchy tracking (rootExecutionId, options, integrations)
const subflowResult = await dbos.executeSubflow(context, {
flowId: 'my-calculator-flow',
eventName: 'calculate',
inputs: { x: 10, y: 20 },
});
this.output = subflowResult.outputs;
return {};
}Properties
closeStream()
closeStream: (
key) =>Promise<void>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:221
Close a DBOS stream. Signals to readers that no more data will be written.
IMPORTANT: Must be called at WORKFLOW level, not from within steps. Only writeStream() is allowed from steps.
Parameters
key
string
Stream key to close
Returns
Promise<void>
currentNodeId
readonlycurrentNodeId:string|undefined
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:274
The current node ID (from AsyncLocalStorage, parallel-safe). Undefined if called outside of node execution context.
executeSubflow()
executeSubflow: (
context,params) =>Promise<ISubflowResult>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:112
Execute a subflow synchronously (wait for completion).
This is the primary method for flow composition. The calling node blocks until the subflow completes.
Parameters
context
The current execution context (provides rootExecutionId, options, integrations)
params
Subflow execution parameters
Returns
Promise<ISubflowResult>
The subflow's result including outputs from FlowOutputNode
Throws
Error if subflow limits (depth, breadth, total) are exceeded
Example
const result = await dbos.executeSubflow(context, {
flowId: 'calculator-flow',
eventName: 'add',
inputs: { a: 10, b: 20 },
});
if (result.status === 'completed') {
this.output = result.outputs.sum;
}executeSubflowsParallel()
executeSubflowsParallel: (
context,subflows) =>Promise<ISubflowResult[]>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:145
Execute multiple subflows in parallel and wait for all to complete.
Parameters
context
The current execution context (provides rootExecutionId, options, integrations)
subflows
Array of subflow parameters
Returns
Promise<ISubflowResult[]>
Array of results (in same order as input)
Throws
Error if breadth limit is exceeded
Example
const results = await dbos.executeSubflowsParallel(context, [
{ flowId: 'flow-a', inputs: { x: 1 } },
{ flowId: 'flow-b', inputs: { x: 2 } },
{ flowId: 'flow-c', inputs: { x: 3 } },
]);executionId
readonlyexecutionId:string
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:268
The current execution ID
getEvent()
getEvent: <
T>(workflowId,key,timeoutSeconds?) =>Promise<T|null>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:195
Get an event value from a workflow.
Type Parameters
T
T
Parameters
workflowId
string
Target workflow ID
key
string
Event key
timeoutSeconds?
number
How long to wait if not yet set
Returns
Promise<T | null>
The event value, or null if timeout
getWorkflowStatus()
getWorkflowStatus: (
workflowId) =>Promise<IWorkflowStatus|null>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:282
Get the status of a workflow.
Parameters
workflowId
string
Target workflow ID
Returns
Promise<IWorkflowStatus | null>
Status info, or null if not found
now()
now: () =>
Promise<number>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:259
Get the current time in milliseconds since epoch.
This is a durable timestamp - if the workflow restarts, DBOS returns the same value as before.
Returns
Promise<number>
readStream()
readStream: <
T>(workflowId,key,fromOffset?) =>AsyncIterable<T>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:237
Read from a DBOS stream as an async iterable. Yields values until the stream is closed by the writer via closeStream().
Used for cross-workflow streaming bridges — reads data written by another workflow's writeStream() calls in real-time.
IMPORTANT: Must be called at WORKFLOW level, not from within steps.
Type Parameters
T
T
Parameters
workflowId
string
The workflow that owns the stream
key
string
Stream key to read from
fromOffset?
number
Start reading from this offset (default: 0)
Returns
AsyncIterable<T>
Async iterable that yields stream values until stream closes
recv()
recv: <
T>(topic?,timeoutSeconds?) =>Promise<T|null>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:169
Receive a message sent to this workflow.
Blocks until a message is received or timeout expires.
Type Parameters
T
T
Parameters
topic?
string
Optional topic to filter messages
timeoutSeconds?
number
How long to wait (default: 60)
Returns
Promise<T | null>
The message, or null if timeout
runStep()
runStep: <
T>(fn,options?) =>Promise<T>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:82
Run a function as a durable DBOS step.
Steps are checkpointed in PostgreSQL. If the workflow restarts, DBOS replays the step result from the checkpoint instead of re-executing.
Step names are auto-prefixed with {nodeId}: to ensure uniqueness.
Type Parameters
T
T
Parameters
fn
() => Promise<T>
The function to execute
options?
Step configuration options
Returns
Promise<T>
The function's return value
Example
// Named step (becomes "myNodeId:fetch-user-data")
const user = await dbos.runStep(
() => fetchUser(userId),
{ name: 'fetch-user-data' }
);
// Auto-named step (becomes "myNodeId:step-1")
const data = await dbos.runStep(() => processData());send()
send: <
T>(destinationWorkflowId,message,topic?) =>Promise<void>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:158
Send a message to another workflow.
Type Parameters
T
T
Parameters
destinationWorkflowId
string
Target workflow ID
message
T
Message payload
topic?
string
Optional message topic for filtering
Returns
Promise<void>
setEvent()
setEvent: <
T>(key,value) =>Promise<void>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:185
Set an event value for this workflow.
Events are different from messages:
- Events persist and can be read multiple times
- Messages are consumed when received
Type Parameters
T
T
Parameters
key
string
Event key
value
T
Event value
Returns
Promise<void>
sleep()
sleep: (
durationMs) =>Promise<void>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:251
Sleep for a specified duration.
This is a durable sleep - if the workflow restarts, DBOS resumes from where it left off.
Parameters
durationMs
number
Duration to sleep in milliseconds
Returns
Promise<void>
startSubflow()
startSubflow: (
context,params) =>Promise<ISubflowHandle>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:126
Start a subflow asynchronously (don't wait for completion).
Returns a handle that can be used to:
- Wait for the result later
- Check status
- Send messages to the child workflow
Parameters
context
The current execution context (provides rootExecutionId, options, integrations)
params
Subflow execution parameters
Returns
Promise<ISubflowHandle>
A handle to the running subflow
writeStream()
writeStream: <
T>(key,value) =>Promise<void>
Defined in: packages/fireflow-types/src/execution/services/dbos-context-service.ts:210
Write to a DBOS stream for real-time event publishing.
IMPORTANT: Stream keys are NOT auto-prefixed with nodeId. This allows multiple nodes to write to shared streams.
Type Parameters
T
T
Parameters
key
string
Stream key (e.g., 'events', 'progress')
value
T
Value to write
Returns
Promise<void>