PersistentAI API Documentation / @persistentai/fireflow-executor / server / DBOSEventBus
Class: DBOSEventBus
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:31
DBOS-based implementation of IEventBus
Simplified wrapper around StreamBridge for execution event streaming. Uses generic DBOS stream infrastructure with MultiChannel pattern.
Features:
- Real-time streaming via PostgreSQL LISTEN/NOTIFY
- Automatic sharding across PGListener pool
- MultiChannel for efficient fan-out
- Backward compatible with IEventBus interface
Implements
Constructors
Constructor
new DBOSEventBus(
streamBridge):DBOSEventBus
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:35
Parameters
streamBridge
StreamBridge
Returns
DBOSEventBus
Methods
close()
close():
Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:151
Close the event bus
Returns
Promise<void>
Implementation of
closeStream()
closeStream(
executionId):Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:301
Close stream for specific execution (no-op)
DBOS automatically closes streams when workflow terminates
Parameters
executionId
string
Returns
Promise<void>
publishEvent()
publishEvent(
executionId,event):Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:49
Publish an execution event to DBOS stream
Note: This continues to use DBOS.writeStream() directly to maintain existing execution flow (no changes to ExecutionWorkflow)
Parameters
executionId
string
Execution ID (also workflow ID)
event
Event to publish
Returns
Promise<void>
Implementation of
subscribeToEvents()
subscribeToEvents(
executionId,fromIndex?,batchConfig?):AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:78
Subscribe to execution events
Parameters
executionId
string
Execution ID
fromIndex?
number = 0
Starting event index (0-based)
batchConfig?
EventBatchConfig
Optional batching configuration
Returns
AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>
Async iterable of event batches
Implementation of
subscribeToExecutionTree()
subscribeToExecutionTree(
rootWorkflowId,fromIndex?,batchConfig?):AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:169
Subscribe to execution tree events.
Reads events from the root workflow AND all branch subworkflows. Branch discovery is recursive — sub-branches of branches are also included.
When the root writes to its 'branches' stream, this method discovers those branches and subscribes to their 'events' streams as well. All events are merged into a single output iterable.
Falls back to regular subscribeToEvents if no branches exist (backwards compatible with old engine).
Parameters
rootWorkflowId
string
fromIndex?
number = 0
batchConfig?
EventBatchConfig
Returns
AsyncIterable<ExecutionEventImpl<ExecutionEventEnum>[]>
Implementation of
IEventBus.subscribeToExecutionTree
subscribeToStream()
subscribeToStream<
T>(workflowId,streamKey,fromOffset?,batchConfig?):AsyncIterable<T[]>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:113
Subscribe to an arbitrary DBOS stream by workflow ID and stream key. Used for port-level frontend streaming (STREAM_PUBLISHED events).
Type Parameters
T
T = any
Parameters
workflowId
string
streamKey
string
fromOffset?
number = 0
batchConfig?
EventBatchConfig
Returns
AsyncIterable<T[]>
Implementation of
unsubscribe()
unsubscribe(
executionId):Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSEventBus.ts:143
Unsubscribe from events
Parameters
executionId
string
Returns
Promise<void>