@persistentai/fireflow-trpc
A type-safe tRPC layer for the PersistentAI flow-based programming framework. This package provides end-to-end type safety between client and server, real-time subscriptions via WebSockets, and a robust API for flow management, node registration, and execution control.
Overview
@persistentai/fireflow-trpc serves as the communication backbone of PersistentAI, enabling:
- Type-Safe APIs: Complete end-to-end type safety using tRPC and SuperJSON
- Real-Time Updates: WebSocket-based subscriptions for flow and execution events
- Flow Management: Create, retrieve, update, and delete flows
- Node Operations: Register, discover, and instantiate computational nodes
- Execution Control: Start, stop, pause, and monitor flow executions
- Storage Integration: Support for both in-memory and PostgreSQL persistence
- Debugging Tools: Breakpoints, stepping, and execution monitoring
Installation
# Before installing, make sure you have set up authentication for GitHub Packages
npm install @persistentai/fireflow-trpc
# or
yarn add @persistentai/fireflow-trpc
# or
pnpm add @persistentai/fireflow-trpc

Authentication for GitHub Packages
To use this package, you need to configure npm to authenticate with GitHub Packages:
- Create a personal access token (PAT) with the read:packages scope on GitHub.
- Add the following to your project's .npmrc file or to your global ~/.npmrc file:
@persistentai:registry=https://npm.pkg.github.com
//npm.pkg.github.com/:_authToken=YOUR_GITHUB_PAT
Replace YOUR_GITHUB_PAT with your actual GitHub personal access token.
Usage
Client Setup
import { trpcReact, queryClient } from '@persistentai/fireflow-trpc/client'
import { QueryClientProvider } from '@tanstack/react-query'
import { createWSClient, wsLink } from '@trpc/client'
import { useState } from 'react'

function App() {
  // Create the tRPC client once, connected over WebSocket
  const [trpc] = useState(() => trpcReact.createClient({
    links: [
      wsLink({
        client: createWSClient({
          url: 'ws://localhost:3001',
        }),
      }),
    ],
  }))

  return (
    <trpcReact.Provider client={trpc} queryClient={queryClient}>
      <QueryClientProvider client={queryClient}>
        <YourApplication />
      </QueryClientProvider>
    </trpcReact.Provider>
  )
}

Server Setup
import { init, applyWSSHandler, appRouter, createContext } from '@persistentai/fireflow-trpc/server'
import { WebSocketServer } from 'ws'

// Initialize tRPC context and stores
await init()

// Create WebSocket server
const wss = new WebSocketServer({ port: 3001 })
const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
})

console.log('WebSocket Server listening on ws://localhost:3001')

Working with Flows
// Client-side example
import { trpcClient } from '@persistentai/fireflow-trpc/client'

// Create a new flow
const createFlow = async () => {
  const flow = await trpcClient.flow.create.mutate({
    name: 'My Flow',
    description: 'A flow for processing data',
    tags: ['demo', 'processing'],
  })
  return flow
}

// Add a node to a flow
const addNode = async (flowId, nodeType) => {
  const node = await trpcClient.flow.addNode.mutate({
    flowId,
    nodeType,
    position: { x: 100, y: 100 },
  })
  return node
}

// Connect node ports
const connectPorts = async (flowId, sourceNodeId, targetNodeId) => {
  const edge = await trpcClient.flow.connectPorts.mutate({
    flowId,
    sourceNodeId,
    sourcePortId: 'output',
    targetNodeId,
    targetPortId: 'input',
  })
  return edge
}

// Subscribe to flow events; call subscription.unsubscribe() to stop listening
const subscribeToFlowEvents = (flowId) => {
  const subscription = trpcClient.flow.subscribeToEvents.subscribe(
    { flowId },
    {
      onData: (event) => console.log('Flow event:', event),
      onError: (err) => console.error('Subscription error:', err),
    }
  )
  return subscription
}

Executing Flows
import { trpcClient } from '@persistentai/fireflow-trpc/client'

// Create execution instance
const createExecution = async (flowId) => {
  const execution = await trpcClient.execution.create.mutate({
    flowId,
    options: {
      debug: true, // Enable debugging features
    },
  })
  return execution
}

// Start execution
const startExecution = async (executionId) => {
  await trpcClient.execution.start.mutate({ executionId })
}

// Subscribe to execution events; call subscription.unsubscribe() to stop listening
const subscribeToExecutionEvents = (executionId) => {
  const subscription = trpcClient.execution.subscribeToEvents.subscribe(
    { executionId },
    {
      onData: (event) => console.log('Execution event:', event),
      onError: (err) => console.error('Subscription error:', err),
    }
  )
  return subscription
}

// Add breakpoint for debugging
const addBreakpoint = async (executionId, nodeId) => {
  await trpcClient.execution.debug.addBreakpoint.mutate({
    executionId,
    nodeId,
  })
}

// Step through execution (when paused at breakpoint)
const stepExecution = async (executionId) => {
  await trpcClient.execution.debug.step.mutate({ executionId })
}

Exploring Available Nodes
import { trpcClient } from '@persistentai/fireflow-trpc/client'

// Get all nodes grouped by category
const getCategorizedNodes = async () => {
  const categories = await trpcClient.nodeRegistry.getCategorizedNodes.query()
  return categories
}

// Search for nodes
const searchNodes = async (query) => {
  const results = await trpcClient.nodeRegistry.searchNodes.query(query)
  return results
}

// Get nodes for a specific category
const getNodesByCategory = async (categoryId) => {
  const category = await trpcClient.nodeRegistry.getNodesByCategory.query(categoryId)
  return category
}

API Reference
The package exports two main entry points:
Client API (@persistentai/fireflow-trpc/client)
- trpcReact: React hooks for tRPC queries and mutations
- trpcClient: Direct client for non-React environments
- queryClient: Configured TanStack Query client
- Types: RouterInputs, RouterOutputs, and more
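The client setup example hard-codes the WebSocket URL as ws://localhost:3001. As a minimal sketch, assuming the tRPC server is served from the same host as the page, the URL could instead be derived from the page origin. The names buildWsUrl and OriginLike are hypothetical and not part of this package.

```typescript
// Hypothetical helper, not exported by @persistentai/fireflow-trpc.
// OriginLike matches the relevant fields of the browser's `window.location`.
interface OriginLike {
  protocol: string // e.g. 'http:' or 'https:'
  host: string // e.g. 'example.com:8080'
}

function buildWsUrl(origin: OriginLike, path: string = '/'): string {
  // Pages served over HTTPS need wss://; browsers block plain ws:// there.
  const scheme = origin.protocol === 'https:' ? 'wss:' : 'ws:'
  // Accept paths given with or without a leading slash.
  const normalizedPath = path.charAt(0) === '/' ? path : '/' + path
  return scheme + '//' + origin.host + normalizedPath
}
```

In a browser, the result could be passed to createWSClient, for example `createWSClient({ url: buildWsUrl(window.location) })`.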
Server API (@persistentai/fireflow-trpc/server)
- appRouter: Main tRPC router with all procedures
- createContext: Context factory for tRPC requests
- init: Initialize backend systems (node registry, stores, etc.)
- applyWSSHandler: Set up the WebSocket handler for tRPC
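The server setup example keeps the value returned by applyWSSHandler but never uses it. A minimal sketch of a graceful shutdown follows, assuming this package's applyWSSHandler behaves like the one in tRPC's WebSocket adapter, whose handler exposes broadcastReconnectNotification(). The helper createShutdown and both interfaces are hypothetical stand-ins so the block does not depend on the real packages.

```typescript
// Structural stand-ins for the real objects (assumptions, see lead-in).
interface WssHandlerLike {
  // Tells connected clients to reconnect; present on tRPC's WSS handler.
  broadcastReconnectNotification(): void
}

interface WebSocketServerLike {
  close(): void
}

// Hypothetical helper: returns a shutdown function that is safe to call twice.
function createShutdown(handler: WssHandlerLike, wss: WebSocketServerLike): () => void {
  let closed = false
  return () => {
    if (closed) return // ignore repeated signals
    closed = true
    handler.broadcastReconnectNotification() // notify clients before closing
    wss.close() // stop accepting new connections
  }
}
```

On the server this could be wired to a process signal, for example `process.on('SIGTERM', createShutdown(handler, wss))`.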
Key Components
Flow Procedures
- create: Create a new flow
- get: Get a flow by ID
- list: List all flows
- delete: Delete a flow
- addNode: Add a node to a flow
- removeNode: Remove a node from a flow
- connectPorts: Connect ports between nodes
- removeEdge: Remove an edge
- updateNodePosition: Update node position
- updatePortValue: Update port value
- subscribeToEvents: Subscribe to flow events
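The create, addNode, and connectPorts procedures are typically used together. The sketch below chains them to build a two-node flow. It is written against a minimal FlowApi interface rather than the real client, and it assumes that each mutation resolves to an object with an id field; the actual result shapes are not documented here, so treat HasId, FlowApi, PipelineIds, and buildPipeline as hypothetical.

```typescript
// Assumption: every mutation result carries an `id`. Verify against RouterOutputs.
interface HasId {
  id: string
}

// Minimal stand-in for the parts of `trpcClient.flow` used below.
interface FlowApi {
  create: { mutate(input: { name: string; description?: string; tags?: string[] }): Promise<HasId> }
  addNode: {
    mutate(input: { flowId: string; nodeType: string; position: { x: number; y: number } }): Promise<HasId>
  }
  connectPorts: {
    mutate(input: {
      flowId: string
      sourceNodeId: string
      sourcePortId: string
      targetNodeId: string
      targetPortId: string
    }): Promise<HasId>
  }
}

interface PipelineIds {
  flowId: string
  nodeIds: string[]
  edgeId: string
}

async function buildPipeline(
  flowApi: FlowApi,
  name: string,
  sourceType: string,
  targetType: string,
): Promise<PipelineIds> {
  const flow = await flowApi.create.mutate({ name })
  // Nodes are added one at a time so each mutation sees the previous result.
  const source = await flowApi.addNode.mutate({ flowId: flow.id, nodeType: sourceType, position: { x: 100, y: 100 } })
  const target = await flowApi.addNode.mutate({ flowId: flow.id, nodeType: targetType, position: { x: 400, y: 100 } })
  // Port IDs 'output' and 'input' follow the usage example; real nodes may differ.
  const edge = await flowApi.connectPorts.mutate({
    flowId: flow.id,
    sourceNodeId: source.id,
    sourcePortId: 'output',
    targetNodeId: target.id,
    targetPortId: 'input',
  })
  return { flowId: flow.id, nodeIds: [source.id, target.id], edgeId: edge.id }
}
```

If the real client matches these shapes, it could be passed directly as `buildPipeline(trpcClient.flow, 'My Flow', sourceType, targetType)`.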
Execution Procedures
- create: Create an execution instance
- start: Start execution
- stop: Stop execution
- pause: Pause execution
- resume: Resume execution
- getState: Get execution state
- subscribeToEvents: Subscribe to execution events
- Debug operations:
  - addBreakpoint: Add a breakpoint
  - removeBreakpoint: Remove a breakpoint
  - step: Step execution
  - getBreakpoints: Get all breakpoints
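Callers that only need to know when a run has finished can wrap subscribeToEvents in a promise. The following is a sketch under stated assumptions: events are assumed to carry a string type field, and the terminal event names in TERMINAL_EVENT_TYPES are placeholders, since the real event payloads are not listed in this document. ExecutionEventsApi is a structural stand-in for trpcClient.execution.subscribeToEvents.

```typescript
// Assumed event shape: the real payload type is not shown in this README.
interface ExecutionEventLike {
  type: string
}

interface SubscriptionLike {
  unsubscribe(): void
}

// Structural stand-in for `trpcClient.execution.subscribeToEvents`.
interface ExecutionEventsApi {
  subscribe(
    input: { executionId: string },
    handlers: {
      onData(event: ExecutionEventLike): void
      onError(err: unknown): void
    },
  ): SubscriptionLike
}

// Hypothetical terminal event names: replace with the real ones.
const TERMINAL_EVENT_TYPES = ['FLOW_COMPLETED', 'FLOW_FAILED', 'FLOW_CANCELLED']

function waitForCompletion(events: ExecutionEventsApi, executionId: string): Promise<ExecutionEventLike> {
  return new Promise<ExecutionEventLike>((resolve, reject) => {
    const state: { settled: boolean; subscription?: SubscriptionLike } = { settled: false }

    // Settle at most once and release the subscription if it already exists.
    const settle = (finish: () => void) => {
      if (state.settled) return
      state.settled = true
      if (state.subscription) state.subscription.unsubscribe()
      finish()
    }

    state.subscription = events.subscribe(
      { executionId },
      {
        onData: (event) => {
          if (TERMINAL_EVENT_TYPES.indexOf(event.type) !== -1) settle(() => resolve(event))
        },
        onError: (err) => settle(() => reject(err)),
      },
    )

    // Covers a terminal event delivered synchronously during subscribe().
    if (state.settled) state.subscription.unsubscribe()
  })
}
```

The promise resolves with the terminal event and rejects on a subscription error; in both cases the subscription is released exactly once.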
Node Registry Procedures
- getCategorizedNodes: Get all nodes grouped by category
- searchNodes: Search nodes by query
- getNodesByCategory: Get nodes for a specific category
- getCategories: Get all categories
- getNodeType: Get information about a specific node type
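Outside React, where TanStack Query is not caching results, repeated lookups such as getNodeType can be memoized on the caller's side. This is a generic sketch, not something the package provides; memoizeQuery is a hypothetical name, and it assumes the registry contents do not change while the cache is alive and that the query takes a plain string key.

```typescript
// Hypothetical helper: caches one in-flight or resolved promise per key.
function memoizeQuery<V>(query: (key: string) => Promise<V>): (key: string) => Promise<V> {
  const cache = new Map<string, Promise<V>>()
  return (key: string) => {
    const cached = cache.get(key)
    if (cached) return cached // reuse the pending or resolved request
    const pending = query(key).catch((err) => {
      cache.delete(key) // do not cache failures, so a retry can succeed
      throw err
    })
    cache.set(key, pending)
    return pending
  }
}
```

A possible use, assuming getNodeType accepts the node type as a string: `const getNodeType = memoizeQuery((type) => trpcClient.nodeRegistry.getNodeType.query(type))`.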
Database Support
PersistentAI tRPC supports two storage mechanisms:
- In-Memory Store: Default storage option, suitable for development
- PostgreSQL Store: Persistent storage for production environments
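The in-memory store is the default, and the PostgreSQL store is enabled through the DATABASE_URL environment variable. The sketch below illustrates that selection rule in isolation. It is not the package's actual initialization code; resolveStorage and StorageConfig are hypothetical names, and the URL scheme check is an added assumption.

```typescript
// Hypothetical types and helper, for illustration only.
type StorageConfig =
  | { kind: 'memory' }
  | { kind: 'postgres'; connectionString: string }

function resolveStorage(env: { DATABASE_URL?: string }): StorageConfig {
  const url = (env.DATABASE_URL ?? '').trim()
  // No URL configured: fall back to the default in-memory store.
  if (url === '') return { kind: 'memory' }
  // Assumption: only postgres:// and postgresql:// URLs are meaningful here.
  if (!/^postgres(ql)?:\/\//.test(url)) {
    throw new Error('DATABASE_URL must start with postgres:// or postgresql://')
  }
  return { kind: 'postgres', connectionString: url }
}
```

In Node.js this could be called as `resolveStorage(process.env)`.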
To use PostgreSQL, set the DATABASE_URL environment variable:
DATABASE_URL=postgres://username:password@localhost:5432/dbname
Run migrations to set up the database schema:
pnpm run migrate

License
BUSL-1.1 - Business Source License
Related Packages
- @persistentai/fireflow-types: Core type definitions and decorators
- @persistentai/fireflow-frontend: Frontend components for visual flow programming
- @persistentai/fireflow-backend: Backend services for flow execution
- @persistentai/fireflow-nodes: Collection of pre-built nodes