PersistentAI API Documentation / @persistentai/fireflow-executor / server / DBOSTaskQueue
Class: DBOSTaskQueue
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:42
DBOS-based implementation of ITaskQueue interface
This implementation uses DBOS Durable Queues with direct DBOS.startWorkflow() calls. It provides a compatible interface with the existing ITaskQueue so it can be used as a drop-in replacement in the system.
Key Features:
- Uses @DBOS.workflow() decorated class method directly
- No manual offset management (DBOS handles it)
- No consumer groups (DBOS handles distribution)
- consumeTasks() is a no-op (DBOS auto-consumes via workflow registration)
- Built-in exactly-once semantics through idempotency
Usage:
const taskQueue = new DBOSTaskQueue();
// Publish task - uses DBOS.startWorkflow() directly
await taskQueue.publishTask(task);
// Consumption is automatic - no need to call consumeTasks()Implements
Constructors
Constructor
new DBOSTaskQueue(
queueName?):DBOSTaskQueue
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:50
Create a DBOS task queue
Parameters
queueName?
string = QUEUE_NAME
Optional queue name (defaults to 'fireflow-executions')
Returns
DBOSTaskQueue
Methods
close()
close():
Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:136
Close the queue and cleanup resources
NOTE: This is handled by DBOSExecutionWorker.stop(). No cleanup needed at the queue level.
Returns
Promise<void>
Implementation of
consumeTasks()
consumeTasks(
_handler):Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:100
Consume tasks from the queue
NOTE: This is a no-op for DBOS implementation because DBOS automatically consumes from the queue through workflow registration. Workers don't need to manually subscribe - DBOS handles consumption internally.
The handler parameter is ignored because DBOS workflows are registered via the @DBOS.workflow() decorator.
Parameters
_handler
TaskHandler
Task handler (unused in DBOS implementation)
Returns
Promise<void>
Implementation of
getPendingCount()
getPendingCount():
Promise<number>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:125
Get the number of pending tasks in the queue
NOTE: This is optional in the ITaskQueue interface and not currently implemented for DBOS. DBOS workflows are tracked in system tables, but there's no simple API to count pending tasks.
Returns
Promise<number>
0 (not implemented)
Implementation of
publishTask()
publishTask(
task):Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:64
Publish an execution task to the DBOS queue
Uses DBOS.startWorkflow() directly with the ExecutionWorkflows class. The task will be durably stored in PostgreSQL and eventually processed by a worker. DBOS guarantees at-least-once execution.
Parameters
task
Execution task to publish
Returns
Promise<void>
Implementation of
stopConsuming()
stopConsuming():
Promise<void>
Defined in: packages/fireflow-executor/server/implementations/dbos/DBOSTaskQueue.ts:111
Stop consuming tasks
NOTE: This is handled by DBOSExecutionWorker.stop() which shuts down the entire DBOS runtime. Individual task queues don't need to stop.
Returns
Promise<void>