@persistentai/fireflow-executor

Durable execution engine for PersistentAI's flow-based programming framework. Orchestrates flow executions with exactly-once semantics via DBOS, real-time event streaming over PostgreSQL LISTEN/NOTIFY, and distributed worker coordination.

Overview

This package is the execution core of PersistentAI. It takes a flow definition (a computational graph of nodes and edges) and executes it durably -- meaning that if a worker crashes mid-execution, DBOS automatically resumes from the last checkpoint with no data loss.

Three export paths serve different consumers:

| Export | Consumer | Purpose |
| --- | --- | --- |
| ./server | fireflow-execution-api, fireflow-execution-worker | DBOS workflows, services, tRPC router, stores |
| ./client | fireflow-frontend | React tRPC hooks, WebSocket client |
| ./types | All consumers | ExecutionStatus, ExecutionTask, shared interfaces |

Key properties:

  • Exactly-once execution via DBOS workflow IDs and deduplication
  • Automatic crash recovery via PostgreSQL-backed checkpoints
  • PostgreSQL-only (no Kafka dependency in DBOS mode)
  • Signal pattern prevents race conditions between subscription and execution start
  • Child execution spawning from Event Emitter nodes (up to depth 100)

Architecture

System Diagram

┌─────────────────────────┐
│  Frontend (React)       │
│  @.../executor/client   │
│                         │
│  WebSocket subscription─┼───────────┐
│  tRPC mutations ────────┼──────┐    │
└─────────────────────────┘      │    │
                                 │    │
                                 ▼    │
┌─────────────────────────────────────┴─────────┐
│  Execution API (tRPC Server)                   │
│  @.../executor/server  (API mode)              │
│                                                │
│  - createServicesForAPI() -- NO DBOS runtime   │
│  - APITaskQueue enqueues via DBOSClient        │
│  - DBOSClient.send() for START_SIGNAL          │
│  - StreamBridge for event subscriptions        │
└───────────────────┬────────────────────────────┘
                    │ PostgreSQL (DBOS durable queue)

┌────────────────────────────────────────────────┐
│  Execution Worker (DBOS Runtime)               │
│  @.../executor/server  (Worker mode)           │
│                                                │
│  - createServicesForWorker() -- full DBOS      │
│  - ExecutionWorkflows.executeFireFlow()        │
│  - DBOS.writeStream() → PostgreSQL             │
│  - Spawns child executions                     │
└───────────────────┬────────────────────────────┘


┌────────────────────────────────────────────────┐
│  PostgreSQL                                    │
│  - DBOS system tables (workflow checkpoints)   │
│  - fireflow_executions (execution records)     │
│  - DBOS streams (real-time event delivery)     │
│  - LISTEN/NOTIFY (push notifications)          │
└────────────────────────────────────────────────┘

Execution Lifecycle (3-Phase Workflow)

The core workflow lives in server/dbos/workflows/ExecutionWorkflows.ts. Every flow execution follows three phases:

┌─────────────────────────────────────────────────────────────┐
│ Phase 1: Stream Initialization                              │
│                                                             │
│  1. Load execution row from database                        │
│  2. Write EXECUTION_CREATED event (index -1)                │
│     └─ Creates the DBOS stream immediately                  │
│  3. Wait for START_SIGNAL via DBOS.recv()                   │
│     ├─ Parent executions: 5-minute timeout                  │
│     └─ Child executions: skip wait (auto-start)             │
├─────────────────────────────────────────────────────────────┤
│ Phase 2: Flow Execution                                     │
│                                                             │
│  1. updateToRunning() ─── durable checkpoint                │
│  2. Load flow via FlowCachedLoader (30min TTL)              │
│  3. Create services: DBOSContextService, VFS, VFSWrite      │
│  4. Create ExecutionContext with services                    │
│  5. Create SPTreeExecutionEngine                             │
│  6. Subscribe engine events → buffer → DBOS.writeStream()   │
│  7. engine.execute() ─── runs at WORKFLOW level              │
│  8. Flush event buffer, collect child tasks                  │
├─────────────────────────────────────────────────────────────┤
│ Phase 3: Child Spawning                                     │
│                                                             │
│  1. Collect emitted events from context.emittedEvents       │
│  2. Generate child execution IDs (wrapped in steps)         │
│  3. Create child execution rows in database                 │
│  4. Parallel spawn via DBOS.startWorkflow() + allSettled    │
│  5. Await all child results with Promise.allSettled()        │
│  6. updateToCompleted() ─── final durable checkpoint        │
└─────────────────────────────────────────────────────────────┘

Why execution runs at workflow level (not inside a step): Nodes need access to DBOS primitives via context.services.dbos.runStep(). Running inside a step would prevent nodes from creating their own durable checkpoints.
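
A heavily simplified sketch of the three phases, using the DBOS primitives named above. This is not the actual ExecutionWorkflows implementation -- the task shape and the runEngineAndCollectChildren helper are hypothetical stand-ins:

typescript
import { DBOS } from '@dbos-inc/dbos-sdk'

// Hypothetical task shape, reduced to what the sketch needs
interface TaskSketch { executionId: string; isChild: boolean }

class ExecutionWorkflowSketch {
  @DBOS.workflow()
  static async executeFireFlow(task: TaskSketch) {
    // Phase 1: create the stream first, then wait for the start signal
    await DBOS.writeStream('events', { type: 'EXECUTION_CREATED', index: -1 })
    if (!task.isChild) {
      await DBOS.recv('START_SIGNAL', 300)   // parent executions: 5-minute timeout
    }

    // Phase 2: run the engine at workflow level so nodes can create their own durable steps
    const childTasks: TaskSketch[] = await runEngineAndCollectChildren(task)

    // Phase 3: spawn children in parallel, then await all results without failing fast
    const handles = await Promise.all(
      childTasks.map((child) =>
        DBOS.startWorkflow(ExecutionWorkflowSketch).executeFireFlow(child),
      ),
    )
    await Promise.allSettled(handles.map((handle) => handle.getResult()))
  }
}

// Hypothetical stand-in for Phase 2 (engine execution + child task collection)
async function runEngineAndCollectChildren(task: TaskSketch): Promise<TaskSketch[]> {
  return []
}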

Signal Pattern

Solves the race condition where a client subscribes to events before the execution stream exists:

1. create()     → Workflow starts → writes EXECUTION_CREATED → stream exists
2. subscribe()  → Stream already exists → gets EXECUTION_CREATED immediately
3. start()      → Sends START_SIGNAL → workflow continues to Phase 2

Without this pattern, the client could subscribe to a nonexistent stream and miss the initial event.
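
A client-side sketch of that ordering. The procedure names come from the tRPC API Reference below; the createTRPCClient option name, the procedure nesting, and the returned execution shape are assumptions:

typescript
import { createTRPCClient } from '@persistentai/fireflow-executor/client'

const client = createTRPCClient({ url: 'ws://localhost:3021' })   // option name is an assumption

async function runFlow(flowId: string) {
  // 1. create(): workflow starts and writes EXECUTION_CREATED, so the stream exists
  const execution = await client.create.mutate({ flowId })

  // 2. subscribe(): safe now -- EXECUTION_CREATED is delivered immediately
  client.subscribeToExecutionEvents.subscribe(
    { executionId: execution.id },                                 // returned shape is an assumption
    { onData: (events) => console.log(events) },
  )

  // 3. start(): sends START_SIGNAL and the workflow proceeds to Phase 2
  await client.start.mutate({ executionId: execution.id })
}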

Event Streaming Pipeline

ExecutionEngine
  │  (synchronous events)

In-memory buffer (pendingEvents[])
  │  (async drain by long-running DBOS step)

DBOS.writeStream('events', {...})
  │  (persisted in PostgreSQL)

PostgreSQL NOTIFY on channel 'ds_{workflowId}_{streamKey}'


PGListenerPool (10 pg-listen connections)
  │  Hash-based routing: FNV-1a(streamId) % 10
  │  Fallback: least-loaded if listener at capacity

StreamBridge → DBOSEventBus.subscribeToEvents()
  │  Batching: max 100 events, 25ms timeout

tRPC subscription (async generator)


WebSocket → Frontend

Pool configuration (from server/implementations/dbos/streaming/types.ts):

| Parameter | Value | Description |
| --- | --- | --- |
| Pool size | 10 | PGListener instances |
| Max streams/listener | 1000 | Before routing to next listener |
| Strategy | hash | Consistent routing via FNV-1a |
| Health check | 30s | Connection verification interval |
| Batch size | 100 | Max events per batch to consumers |
| Batch timeout | 25ms | Max wait before flushing batch |
| Channel prefix | ds_ | Fits PostgreSQL 63-byte NAMEDATALEN |
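
A minimal sketch of the hash-based routing above: FNV-1a over the stream ID, modulo the pool size. The least-loaded fallback is omitted:

typescript
function fnv1a(input: string): number {
  let hash = 0x811c9dc5                      // FNV offset basis (32-bit)
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)       // FNV prime (32-bit)
  }
  return hash >>> 0                          // force unsigned
}

function pickListener(streamId: string, poolSize = 10): number {
  return fnv1a(streamId) % poolSize          // index of the PGListener to route this stream to
}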

Package Exports

./server

Exports from server/index.ts:

DBOS Integration:

  • ExecutionWorkflows -- DBOS workflow class with executeFireFlow()
  • DBOSExecutionWorker -- Worker lifecycle management
  • executionQueue, QUEUE_NAME -- Module-level workflow queue
  • initializeDBOS, shutdownDBOS -- DBOS lifecycle
  • initializeUpdateStatusSteps -- Inject store into durable steps
  • updateToRunning, updateToCompleted, updateToFailed -- Durable status steps
  • CommandController (type) -- Shared command state interface

Implementations:

  • DBOSEventBus -- DBOS stream-based event pub/sub
  • DBOSTaskQueue -- Worker-mode task queue (direct DBOS.startWorkflow())
  • APITaskQueue -- API-mode task queue (enqueue via DBOSClient)
  • InMemoryEventBus, InMemoryTaskQueue -- Local development implementations

Services:

  • ExecutionService -- Main execution orchestrator
  • RecoveryService -- Failed execution recovery
  • createServices() -- Auto-detect initialization
  • createServicesForAPI() -- API-only mode (no DBOS runtime)
  • createServicesForWorker() -- Worker mode (full DBOS runtime)
  • closeServices(), getServices() -- Lifecycle management
  • ServiceInstances (type) -- Service container interface

Stores:

  • PostgresExecutionStore -- PostgreSQL execution persistence
  • getExecutionStore(), getFlowStore(), loadFlow() -- Store accessors
  • IExecutionStore (type) -- Store interface
  • executionsTable, executionClaimsTable, executionRecoveryTable -- Drizzle schemas
  • ExecutionRow, ExecutionClaimRow, ExecutionRecoveryRow (types) -- Row types

tRPC:

  • executionRouter -- All execution procedures
  • createTRPCContext -- Auth + services injection
  • ExecutionRouter, TRPCContext, ExecutorContext (types)

Utilities:

  • config -- Environment configuration object
  • createLogger() -- Pino logger factory
  • getDatabaseMain(), getDatabaseExecutions() -- PostgreSQL pools
  • closeDatabaseMain(), closeDatabaseExecutions() -- Pool cleanup
  • createWSServer() -- WebSocket server for tRPC
  • ExecutionMode (type) -- 'local' | 'dbos'

./client

Exports from client/index.ts:

| Export | Type | Description |
| --- | --- | --- |
| trpcReact | TRPCReact | tRPC React hooks instance |
| createTRPCClient(opts) | Function | WebSocket client (keep-alive 5s, pong timeout 3s) |
| TRPCProvider | React Component | Context provider wrapping tRPC + QueryClient |
| useTRPC() | Hook | Access tRPC proxy in components |
| useTRPCClient() | Hook | Access raw tRPC client |
| getExecutorQueryClient() | Function | TanStack Query client singleton (staleTime: 1000ms) |
| ReactQueryOptions | Type | Inferred React Query options |
| RouterInputs | Type | Inferred router input types |
| RouterOutputs | Type | Inferred router output types |
| TRPCClient | Type | tRPC client type |
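
A hypothetical wiring of these exports in a .tsx component. The TRPCProvider prop names, the client options, and the procedure paths are assumptions -- check client/trpc.ts for the real shapes:

typescript
import {
  TRPCProvider,
  createTRPCClient,
  getExecutorQueryClient,
  useTRPCClient,
} from '@persistentai/fireflow-executor/client'

const trpcClient = createTRPCClient({ url: 'ws://localhost:3021' })   // assumed option name
const queryClient = getExecutorQueryClient()

export function App() {
  return (
    <TRPCProvider trpcClient={trpcClient} queryClient={queryClient}>
      <RunFlowButton flowId="example-flow" />
    </TRPCProvider>
  )
}

function RunFlowButton({ flowId }: { flowId: string }) {
  const client = useTRPCClient()
  const onClick = async () => {
    const execution = await client.create.mutate({ flowId })          // assumed procedure path
    await client.start.mutate({ executionId: execution.id })
  }
  return <button onClick={onClick}>Run flow</button>
}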

./types

Exports from types/:

ExecutionStatus (enum): Idle | Creating | Created | Running | Paused | Completed | Failed | Stopped

ExecutionCommandType (enum): CREATE | START | STOP | PAUSE | STEP | RESUME | HEARTBEAT

Interfaces:

  • ExecutionTask -- Task payload for queue (executionId, flowId, debug, retry config)
  • ExecutionCommand -- Idempotent lifecycle command with payload
  • ExecutionInstance -- Runtime state (task, row, context, flow, engine)
  • ExecutionClaim -- Worker claim with heartbeat tracking
  • ExecutionTreeNode -- Hierarchical execution node (id, parentId, level)
  • RootExecution -- Root execution with nested depth/count
  • ExecutionError -- Error with optional nodeId and stack
  • ExecutionEventMessage -- Event with executionId, timestamp, workerId
  • RetryHistoryEntry -- Retry attempt tracking

Service Factory & Initialization

Three initialization paths in server/services/ServiceFactory.ts:

| Function | Use Case | DBOS Runtime | Task Queue | Event Bus |
| --- | --- | --- | --- | --- |
| createServicesForAPI() | Execution API server | No | APITaskQueue (DBOSClient) | DBOSEventBus |
| createServicesForWorker() | Execution worker | Yes (full) | DBOSTaskQueue | DBOSEventBus |
| createServices() | General (auto-detect) | If ENABLE_DBOS_EXECUTION=true | Auto | Auto |

All three factories are memoized singletons -- calling any of them more than once returns the same ServiceInstances container.

ServiceInstances Interface

typescript
interface ServiceInstances {
  eventBus: IEventBus
  taskQueue: ITaskQueue
  executionStore: IExecutionStore
  executionService: ExecutionService
  flowStore: IFlowStore
  authService: AuthService
  ownershipResolver: IOwnershipResolver
  dbosWorker?: DBOSExecutionWorker  // Only in worker mode
  dbosClient?: DBOSClient           // Only in API mode
}

Initialization Order (Critical)

For worker mode, the queue MUST be created at module level BEFORE DBOS.launch():

1. Import queue.ts → WorkflowQueue created at module level
2. Import ExecutionWorkflows → @DBOS.workflow() registered
3. Call initializeDBOS() → DBOS.launch() picks up queue + workflow
4. Create StreamBridge → 10 PGListeners for event streaming
5. Create DBOSEventBus → Wraps StreamBridge
6. initializeUpdateStatusSteps() → Inject store into durable steps
7. Create remaining services

If the queue is created AFTER DBOS.launch(), the worker can enqueue but cannot dequeue -- tasks will pile up indefinitely.
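
A condensed worker bootstrap sketch following that order. It assumes createServicesForWorker() is async and performs steps 1-7 internally; the shutdown wiring is illustrative:

typescript
import {
  createServicesForWorker,
  closeServices,
} from '@persistentai/fireflow-executor/server'

async function main() {
  // Importing the server entry pulls in queue.ts and ExecutionWorkflows at module
  // level (steps 1-2); the factory then launches DBOS and builds the streaming
  // stack, durable steps, and remaining services (steps 3-7).
  const services = await createServicesForWorker()
  console.log('worker ready', Object.keys(services))

  process.on('SIGTERM', async () => {
    await closeServices()
    process.exit(0)
  })
}

main().catch((error) => {
  console.error('worker failed to start', error)
  process.exit(1)
})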

tRPC API Reference

All procedures are defined in server/trpc/router.ts.

Execution Lifecycle

| Procedure | Type | Auth | Input | Description |
| --- | --- | --- | --- | --- |
| create | Mutation | authed | { flowId, options?, integration?, events? } | Creates execution row + starts DBOS workflow (waits for signal) |
| start | Mutation | executionContext | { executionId } | Sends START_SIGNAL to waiting workflow |
| stop | Mutation | executionContext | { executionId, reason? } | Cancels DBOS workflow via cancelWorkflow() |
| pause | Mutation | executionContext | { executionId, reason? } | Sends PAUSE command via DBOS messaging (debug mode) |
| resume | Mutation | executionContext | { executionId } | Sends RESUME command via DBOS messaging (debug mode) |

Queries

| Procedure | Type | Auth | Input | Description |
| --- | --- | --- | --- | --- |
| getExecutionDetails | Query | executionContext | { executionId } | Returns full ExecutionRow |
| getExecutionsTree | Query | executionContext | { executionId } | Returns parent + all child executions |
| getRootExecutions | Query | authed | { flowId, limit, after? } | Paginated root executions for a flow |

Subscriptions (WebSocket)

| Procedure | Type | Auth | Input | Description |
| --- | --- | --- | --- | --- |
| subscribeToExecutionEvents | Subscription | executionContext | { executionId, fromIndex?, eventTypes?, batchSize?, batchTimeoutMs? } | Real-time execution event stream |
| subscribeToPortStream | Subscription | executionContext | { executionId, workflowId, streamKey, fromOffset?, batchSize?, batchTimeoutMs? } | Port-level data stream |

DBOS Bridge (Mini-App)

Flow-scoped access to DBOS primitives for external mini-apps:

| Procedure | Type | Auth | Input | Description |
| --- | --- | --- | --- | --- |
| getEvent | Query | flowContext | { flowId, workflowId, key, timeoutSeconds? } | Get DBOS event value |
| sendMessage | Mutation | flowContext | { flowId, workflowId, message, topic? } | Send message to workflow |
| getWorkflowStatus | Query | flowContext | { flowId, workflowId } | Get DBOS workflow status |
| subscribeToStream | Subscription | flowContext | { flowId, workflowId, streamKey, fromOffset?, batchSize?, batchTimeoutMs? } | Subscribe to any DBOS stream |

Auth Middleware

  • authedProcedure -- Validates JWT token (skipped if auth disabled or dev mode)
  • flowContextProcedure -- Validates flowId access (extends authed, admin bypass)
  • executionContextProcedure -- Validates executionId access via flow ownership (extends authed, admin bypass)

Context Services

Nodes access DBOS primitives at runtime via context.services. These are injected by the workflow during Phase 2.

DBOS Context Service (context.services.dbos)

Defined in server/services/context/DBOSContextService.ts:

| Method | Description |
| --- | --- |
| runStep(fn, options?) | Execute durable step (name auto-prefixed with nodeId:step-N) |
| startSubflow(context, params) | Start child flow, returns handle with getResult(), waitForOutput(), send() |
| executeSubflow(context, params) | Start and await child flow result |
| executeSubflowsParallel(context, subflows) | Start multiple child flows in parallel |
| send(workflowId, message, topic?) | Send message to another workflow |
| recv(topic?, timeoutSeconds?) | Receive message (WORKFLOW level only, not steps) |
| setEvent(key, value) | Set a durable event value |
| getEvent(workflowId, key, timeoutSeconds?) | Get an event value from any workflow |
| writeStream(key, value) | Write to DBOS stream (allowed from steps) |
| readStream(workflowId, key) | Read from DBOS stream |
| closeStream(key) | Close a DBOS stream |
| sleep(durationMs) | Durable sleep (survives replay) |
| now() | Durable timestamp (returns same value on replay) |
| getWorkflowStatus(workflowId) | Query workflow status |

Subflow limits (enforced by DBOSContextService):

  • Max depth: 100 levels
  • Max breadth: 10 concurrent subflows per parent
  • Max total: 1000 subflows per execution
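
A sketch of how a node might use these services during Phase 2. The node signature, the ExecutionContext typing, the runStep option name, and the fetchPage helper are simplified assumptions:

typescript
// Hypothetical node body; `context` stands in for the real ExecutionContext
async function executeNode(context: any, input: { url: string }) {
  const dbos = context.services.dbos

  // Durable step, auto-named "<nodeId>:fetch-page" (option name assumed)
  const page = await dbos.runStep(() => fetchPage(input.url), { name: 'fetch-page' })

  // writeStream is the one primitive also allowed from inside steps
  await dbos.writeStream('port:out', { bytes: page.length })

  // Child flow, counted against the depth/breadth/total limits above
  const result = await dbos.executeSubflow(context, { flowId: 'post-process', input: { page } })

  await dbos.sleep(1_000)   // durable sleep: survives replay
  return result
}

async function fetchPage(url: string): Promise<string> {
  const response = await fetch(url)
  return response.text()
}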

VFS Context Services

  • context.services.vfs (VFSContextService) -- Read-only virtual file system access
  • context.services.vfsWrite (VFSWriteContextService) -- Write virtual file system access

Both use the flow owner's userId for permission checks.

Database Schema

Defined in server/stores/postgres/schema.ts. All tables use the fireflow_ prefix.

fireflow_executions

| Column | Type | Description |
| --- | --- | --- |
| id | text PK | Execution ID (nanoid) |
| flow_id | text | Flow being executed |
| owner_id | text | Flow owner ID |
| root_execution_id | text | Root of execution tree |
| parent_execution_id | text | Parent execution (null for root) |
| status | text enum | created, running, paused, completed, failed, stopped |
| created_at | timestamp | Creation time |
| updated_at | timestamp | Last update time |
| started_at | timestamp | When execution began |
| completed_at | timestamp | When execution finished |
| error_message | text | Error message (if failed) |
| error_node_id | text | Node that caused failure |
| execution_depth | integer | Nesting level (0 = root) |
| options | jsonb | ExecutionOptions |
| integration | jsonb | IntegrationContext |
| external_events | jsonb | ExecutionExternalEvent[] |
| failure_count | integer | Number of failures (for recovery) |
| last_failure_reason | text | Most recent failure reason |
| last_failure_at | timestamp | Most recent failure time |
| processing_started_at | timestamp | When worker claimed execution |
| processing_worker_id | text | Worker currently processing |

Indexes (10 total):

  • executions_flow_depth_created_idx -- Flow + depth + created (tree queries)
  • executions_root_execution_id_idx -- Tree traversal
  • executions_parent_execution_id_idx -- Child lookups
  • executions_flow_id_idx -- Flow-scoped queries
  • executions_status_idx -- Status filtering
  • executions_started_at_idx -- Time-range queries
  • executions_failure_count_idx -- Recovery scanning
  • executions_processing_worker_idx -- Worker lookups
  • executions_status_failure_idx -- Recovery: status + failure count
  • executions_last_failure_at_idx -- Recovery timing
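
A hypothetical Drizzle query over the exported schema, here fetching the direct children of an execution. It assumes getDatabaseExecutions() returns a node-postgres pool that can be handed to Drizzle, and that the schema exposes camelCase column properties:

typescript
import { drizzle } from 'drizzle-orm/node-postgres'
import { eq } from 'drizzle-orm'
import {
  executionsTable,
  getDatabaseExecutions,
} from '@persistentai/fireflow-executor/server'

async function getChildren(parentExecutionId: string) {
  // Assumption: the executions pool can be wrapped directly by Drizzle
  const db = drizzle(getDatabaseExecutions())
  return db
    .select()
    .from(executionsTable)
    .where(eq(executionsTable.parentExecutionId, parentExecutionId))   // assumed property name
}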

fireflow_execution_claims

| Column | Type | Description |
| --- | --- | --- |
| execution_id | text PK | Claimed execution |
| worker_id | text | Claiming worker |
| claimed_at | timestamp | Claim time |
| expires_at | timestamp | Claim expiration |
| heartbeat_at | timestamp | Last heartbeat |
| status | text | active, released, expired |

fireflow_execution_recovery

| Column | Type | Description |
| --- | --- | --- |
| id | serial PK | Auto-increment ID |
| execution_id | text FK | Recovered execution |
| recovered_at | timestamp | Recovery time |
| recovered_by_worker | text | Worker that recovered |
| recovery_reason | text | expired_claim, no_claim, stuck_running, retry_after_failure |
| previous_status | text | Status before recovery |
| previous_worker_id | text | Worker before recovery |

Migration Commands

bash
# Push schema directly (development)
pnpm --filter @persistentai/fireflow-executor migrate:push

# Generate migration files
pnpm --filter @persistentai/fireflow-executor migrate:generate

# Run migrations
pnpm --filter @persistentai/fireflow-executor migrate:run

Configuration Reference

All environment variables from server/utils/config.ts:

Execution Mode

| Variable | Default | Description |
| --- | --- | --- |
| EXECUTION_MODE | local | 'local' or 'dbos' |

Database

| Variable | Default | Description |
| --- | --- | --- |
| DATABASE_URL | postgres://postgres@localhost:5432/fireflow | Main database |
| DATABASE_URL_EXECUTIONS | Falls back to DATABASE_URL | Execution-specific database |

DBOS

| Variable | Default | Description |
| --- | --- | --- |
| ENABLE_DBOS_EXECUTION | false | Enable DBOS durable execution |
| DBOS_APPLICATION_NAME | fireflow-executor | App name for DBOS identification |
| DBOS_QUEUE_CONCURRENCY | 100 | Global concurrency limit across all workers |
| DBOS_WORKER_CONCURRENCY | 5 | Per-worker concurrency limit |
| DBOS_SYSTEM_DATABASE_POOL_SIZE | 10 | DBOS system database connection pool |
| DBOS_ADMIN_ENABLED | true | Enable DBOS admin UI |
| DBOS_ADMIN_PORT | 3022 | Admin UI port |
| DBOS_CONDUCTOR_URL | (empty) | Remote DBOS Conductor URL (optional) |
| DBOS_CONDUCTOR_KEY | (empty) | Conductor authentication key (optional) |

Note: DBOS_APPLICATION_VERSION is hardcoded in server/dbos/version.ts (not configurable via env var). This ensures API and Worker always use the same version, preventing DBOS replay mismatches.

Worker

| Variable | Default | Description |
| --- | --- | --- |
| WORKER_ID | HOSTNAME or random | Unique worker identifier |
| WORKER_CONCURRENCY | 10 | Local execution concurrency |
| WORKER_CLAIM_TIMEOUT_MS | 30000 | Claim duration before expiration |
| WORKER_HEARTBEAT_INTERVAL_MS | 5000 | Heartbeat frequency |
| WORKER_CLAIM_EXPIRATION_CHECK_MS | 10000 | Check interval for expired claims |

Metrics

| Variable | Default | Description |
| --- | --- | --- |
| ENABLE_METRICS | false | Enable metrics collection |
| METRICS_LOG_LEVEL | debug | Metrics log level (debug, info, warn) |
| METRICS_SAMPLING_ENABLED | false | Enable sampling |
| METRICS_SAMPLING_RATE | 1.0 | Sampling rate (0.0 - 1.0) |
| METRICS_BATCH_SIZE | 1 | Events per batch before logging |
| METRICS_FLUSH_INTERVAL | 1000 | Batch flush interval (ms) |
| METRICS_INCLUDE_MEMORY | false | Include memory snapshots in metrics |

Recovery

| Variable | Default | Description |
| --- | --- | --- |
| ENABLE_RECOVERY | true | Enable automatic recovery service |
| RECOVERY_SCAN_INTERVAL_MS | 30000 | How often to scan for stuck executions |
| RECOVERY_MAX_FAILURE_COUNT | 5 | Max failures before permanent failure |

Logging

| Variable | Default | Description |
| --- | --- | --- |
| LOG_LEVEL | info | Pino log level |
| NODE_ENV | development | production enables compact logging |

Execution Engine

| Variable | Default | Description |
| --- | --- | --- |
| ENABLE_UNIFIED_EXECUTION | false | Enable SP-tree execution engine (required) |
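
An illustrative environment for running a worker in DBOS mode, using only variables documented in the tables above (values are examples, not recommendations):

bash
export EXECUTION_MODE=dbos
export ENABLE_DBOS_EXECUTION=true
export ENABLE_UNIFIED_EXECUTION=true
export DATABASE_URL=postgres://postgres@localhost:5432/fireflow
export DBOS_QUEUE_CONCURRENCY=100
export DBOS_WORKER_CONCURRENCY=5
export LOG_LEVEL=info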

DBOS Constraints & Gotchas

Workflow vs Step Restrictions

WORKFLOW context: send(), recv(), startWorkflow(), writeStream(),
                  readStream(), runStep(), setEvent(), getEvent()  ── ALL allowed

STEP context:     writeStream()  ── ONLY this one is allowed

Everything else (send, recv, startWorkflow, etc.) throws if called from a step. The codebase works around this by:

  • Collecting child tasks during execution, spawning them at workflow level
  • Using CommandController (shared mutable state) to pass commands from workflow to step
  • Writing events via DBOS.writeStream() from within the long-running step
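
A sketch of the CommandController idea from the list above: the workflow owns recv(), while the step only reads shared mutable state. The field names and topic are assumptions, not the actual interface:

typescript
import { DBOS } from '@dbos-inc/dbos-sdk'

interface CommandControllerSketch {
  paused: boolean
  stopped: boolean
}

// Workflow level (recv is allowed): poll for debug commands and flip flags
async function pollCommandsOnce(controller: CommandControllerSketch) {
  const command = await DBOS.recv<string>('command', 1)   // 1-second timeout per poll
  if (command === 'PAUSE') controller.paused = true
  if (command === 'RESUME') controller.paused = false
  if (command === 'STOP') controller.stopped = true
}

// Step level (recv is NOT allowed): the engine just checks the shared flags
function shouldHalt(controller: CommandControllerSketch): boolean {
  return controller.stopped
}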

Queue Creation Order

The workflow queue in server/dbos/queue.ts is created at module level (during import). This is intentional -- it MUST exist before DBOS.launch() for the worker to dequeue tasks. The file includes a runtime check:

typescript
const dbosAlreadyInitialized = DBOS.isInitialized()
if (dbosAlreadyInitialized) {
  logger.error('CRITICAL: DBOS is already initialized! Queue will NOT be used for dequeue!')
}
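
A sketch of module-level queue creation in the spirit of server/dbos/queue.ts. The queue name and the WorkflowQueue option names shown are assumptions -- check the DBOS SDK version in use:

typescript
import { WorkflowQueue } from '@dbos-inc/dbos-sdk'

export const QUEUE_NAME = 'fireflow-execution-queue'   // hypothetical name

// Created at import time, i.e. before DBOS.launch() is ever called
export const executionQueue = new WorkflowQueue(QUEUE_NAME, {
  concurrency: 100,        // maps to DBOS_QUEUE_CONCURRENCY (assumed option name)
  workerConcurrency: 5,    // maps to DBOS_WORKER_CONCURRENCY (assumed option name)
})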

Step Name Determinism

DBOS replays steps by name. If step names differ between original execution and replay, cached results won't match. DBOSContextService ensures determinism by auto-prefixing:

stepName = `${nodeId}:${userProvidedName || `step-${counter}`}`

The counter is per-node via AsyncLocalStorage, ensuring parallel node execution doesn't cause name collisions.
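
A simplified sketch of that naming scheme: a per-node counter kept in AsyncLocalStorage so parallel nodes never share a sequence. This is not the actual DBOSContextService code:

typescript
import { AsyncLocalStorage } from 'node:async_hooks'

const nodeScope = new AsyncLocalStorage<{ nodeId: string; counter: number }>()

function nextStepName(userProvidedName?: string): string {
  const scope = nodeScope.getStore()
  if (!scope) throw new Error('no node execution scope')
  scope.counter += 1
  return `${scope.nodeId}:${userProvidedName ?? `step-${scope.counter}`}`
}

// Example: each node gets its own deterministic sequence, even when nodes run in parallel
nodeScope.run({ nodeId: 'node-a', counter: 0 }, () => {
  console.log(nextStepName())          // "node-a:step-1"
  console.log(nextStepName('fetch'))   // "node-a:fetch"
})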

Application Version

DBOS_APPLICATION_VERSION is hardcoded in server/dbos/version.ts (currently '1.0.0'). This is deliberate -- using an environment variable would risk version drift between API and Worker deployments, causing DBOS to route tasks to the wrong worker version.

Debug Mode

Command polling (PAUSE/RESUME/STEP/STOP via DBOS.recv()) is only enabled when task.debug=true. In production mode (debug=false), there is zero polling overhead -- no recv() calls are made during execution.

Recovery Service

server/services/RecoveryService.ts handles stuck and failed executions:

  • Uses PostgreSQL advisory locks so only one worker runs recovery at a time
  • Scans for: expired claims, unclaimed created-status executions, retry-after-failure
  • Max failure count (default 5) before marking execution as permanently failed
  • Records all recovery actions in fireflow_execution_recovery table for audit
  • Configurable scan interval (default 30 seconds)
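
A sketch of the advisory-lock guard described above: only the worker that wins pg_try_advisory_lock runs a scan. The lock key constant and the pool handling are assumptions:

typescript
import { Pool } from 'pg'

const RECOVERY_LOCK_KEY = 1589542465   // hypothetical constant; any stable integer key works

async function runRecoveryScanIfLeader(pool: Pool, scan: () => Promise<void>) {
  const client = await pool.connect()
  try {
    const { rows } = await client.query(
      'SELECT pg_try_advisory_lock($1) AS locked',
      [RECOVERY_LOCK_KEY],
    )
    if (!rows[0].locked) return        // another worker is already scanning
    try {
      await scan()
    } finally {
      await client.query('SELECT pg_advisory_unlock($1)', [RECOVERY_LOCK_KEY])
    }
  } finally {
    client.release()
  }
}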

Directory Structure

fireflow-executor/
├── client/
│   ├── index.ts                     # tRPCReact hooks, TRPCProvider, type helpers
│   └── trpc.ts                      # WebSocket client setup, SuperJSON, keep-alive
├── server/
│   ├── index.ts                     # All server exports
│   ├── dbos/
│   │   ├── config.ts                # DBOS initialization (launch/shutdown)
│   │   ├── DBOSExecutionWorker.ts   # Worker lifecycle management
│   │   ├── queue.ts                 # Module-level WorkflowQueue (BEFORE DBOS.launch)
│   │   ├── version.ts              # Hardcoded DBOS_APPLICATION_VERSION
│   │   ├── types.ts                # ExecutionResult, DBOSQueueOptions
│   │   ├── workflows/
│   │   │   └── ExecutionWorkflows.ts # @DBOS.workflow() 3-phase orchestration
│   │   ├── steps/
│   │   │   └── UpdateStatusStep.ts  # @DBOS.step() durable status updates
│   │   └── utils/
│   │       └── FlowCachedLoader.ts  # Version-aware flow caching (30min TTL)
│   ├── implementations/
│   │   ├── dbos/
│   │   │   ├── APITaskQueue.ts      # Enqueue via DBOSClient (API-only)
│   │   │   ├── DBOSEventBus.ts      # DBOS stream event pub/sub
│   │   │   ├── DBOSTaskQueue.ts     # Direct DBOS.startWorkflow() (worker)
│   │   │   ├── migrations/
│   │   │   │   └── PostgreSQLMigrations.ts  # LISTEN/NOTIFY setup
│   │   │   └── streaming/
│   │   │       ├── DBOSStreamSubscriber.ts  # Stream consumer
│   │   │       ├── PGListener.ts            # Single LISTEN connection
│   │   │       ├── PGListenerPool.ts        # 10-listener pool
│   │   │       ├── StreamBridge.ts          # DBOS → event bus bridge
│   │   │       └── types.ts                 # Stream constants & config
│   │   └── local/
│   │       ├── InMemoryEventBus.ts  # Dev-only event bus
│   │       └── InMemoryTaskQueue.ts # Dev-only task queue
│   ├── interfaces/
│   │   ├── IEventBus.ts             # Event pub/sub interface
│   │   └── ITaskQueue.ts            # Task queue interface
│   ├── services/
│   │   ├── ExecutionService.ts      # Main execution orchestrator
│   │   ├── IExecutionService.ts     # Service interface
│   │   ├── RecoveryService.ts       # Failed execution recovery
│   │   ├── ServiceFactory.ts        # DI container (3 init paths)
│   │   └── context/
│   │       ├── DBOSContextService.ts    # DBOS primitives for nodes
│   │       ├── VFSContextService.ts     # VFS read access
│   │       ├── VFSWriteContextService.ts # VFS write access
│   │       └── node-execution-scope.ts  # AsyncLocalStorage scope
│   ├── stores/
│   │   ├── execution-store.ts       # Store factory
│   │   ├── flow-store.ts            # Flow persistence (from fireflow-trpc)
│   │   ├── user-store.ts            # User auth (from fireflow-trpc)
│   │   ├── interfaces/
│   │   │   └── IExecutionStore.ts   # Store interface
│   │   └── postgres/
│   │       ├── schema.ts            # 3 Drizzle tables + 18 indexes
│   │       └── postgres-execution-store.ts # PostgreSQL implementation
│   ├── trpc/
│   │   ├── context.ts               # Auth + services injection
│   │   └── router.ts                # All execution procedures
│   ├── metrics/
│   │   ├── MetricsTracker.ts        # Collection & aggregation
│   │   ├── helpers.ts               # Metric helpers
│   │   └── types.ts                 # Metric type definitions
│   ├── utils/
│   │   ├── config.ts                # Environment configuration
│   │   ├── db.ts                    # Database pool management
│   │   ├── logger.ts                # Pino logger setup
│   │   └── serialization.ts         # SuperJSON setup
│   ├── ws-server.ts                 # WebSocket server for tRPC
│   └── drizzle.config.ts            # Drizzle Kit configuration
├── types/
│   ├── index.ts                     # Re-exports
│   ├── execution.ts                 # ExecutionStatus, ExecutionInstance, etc.
│   └── messages.ts                  # ExecutionCommand, ExecutionTask, etc.
├── package.json
├── tsconfig.json
└── .env.example

Development

Prerequisites

  • PostgreSQL running (Docker or local)
  • pnpm installed
  • Node.js v24+ or Bun for development

Commands

bash
# Build
pnpm --filter @persistentai/fireflow-executor build

# Type check
pnpm --filter @persistentai/fireflow-executor typecheck

# Development (watch mode)
pnpm --filter @persistentai/fireflow-executor dev

# Run worker in development
pnpm --filter @persistentai/fireflow-executor dev:worker

# Run event stream server
pnpm --filter @persistentai/fireflow-executor dev:stream

# Production start
pnpm --filter @persistentai/fireflow-executor start:worker
pnpm --filter @persistentai/fireflow-executor start:stream

# Database migrations
pnpm --filter @persistentai/fireflow-executor migrate:push
pnpm --filter @persistentai/fireflow-executor migrate:generate
pnpm --filter @persistentai/fireflow-executor migrate:run

Key Dependencies

| Package | Version | Purpose |
| --- | --- | --- |
| @dbos-inc/dbos-sdk | ^4.7.9 | Durable execution framework |
| @trpc/server | ^11.7.2 | Type-safe API procedures |
| @trpc/client | ^11.7.2 | tRPC client with WebSocket link |
| @trpc/react-query | ^11.7.2 | React hooks for tRPC |
| drizzle-orm | ^0.44.5 | Type-safe PostgreSQL toolkit |
| pg | ^8.16.3 | PostgreSQL driver |
| pg-listen | ^1.7.0 | PostgreSQL LISTEN/NOTIFY |
| pino | ^9.12.0 | Structured logging |
| ws | ^8.18.3 | WebSocket server |
| zod | ^3.25.76 | Runtime schema validation |
| superjson | peer | Serialization (Date, Map, Set, BigInt) |

Workspace dependencies:

  • @persistentai/fireflow-types -- Execution engine, node types, decorators
  • @persistentai/fireflow-nodes -- Pre-built node implementations
  • @persistentai/fireflow-trpc -- Flow store, auth service, Drizzle schemas
  • @persistentai/fireflow-vfs -- Virtual file system for node execution

Troubleshooting

| Symptom | Likely Cause | Solution |
| --- | --- | --- |
| "Queue will NOT be used for dequeue" in logs | queue.ts imported after DBOS.launch() | Ensure queue.ts is imported before initializeDBOS() in ServiceFactory |
| Events not streaming to frontend | ENABLE_DBOS_EXECUTION not set | Set ENABLE_DBOS_EXECUTION=true in environment |
| Child executions not starting | Depth/breadth/total limit exceeded | Check execution depth (max 100), breadth (max 10), total (max 1000) |
| Execution stuck in created status | START_SIGNAL not sent within 5 minutes | Verify client calls start() after create() |
| Step replay returns wrong results | Non-deterministic step names | Use context.services.dbos.runStep() (auto-prefixes with nodeId) |
| "Services not initialized" error | createServices*() not called before tRPC | Call factory function during server startup before creating tRPC context |
| PostgreSQL connection exhaustion | Too many PGListener connections | Check DBOS_SYSTEM_DATABASE_POOL_SIZE (default 10) and PGListenerPool (10 connections) |
| "Failed to initialize execution engine" | ENABLE_UNIFIED_EXECUTION not set | Set ENABLE_UNIFIED_EXECUTION=true -- only SPTreeExecutionEngine is supported |
| Workflow version mismatch errors | API and Worker on different versions | Ensure both are built from the same commit (version is hardcoded in version.ts) |
| "Execution start timeout" | Client never called start() | Signal pattern requires create() → subscribe() → start() |

License

Business Source License 1.1 (BUSL-1.1) -- see LICENSE.txt
