import express, { Request, Response } from 'express' import multer from 'multer' import path from 'path' import cors from 'cors' import http from 'http' import * as fs from 'fs' import basicAuth from 'express-basic-auth' import { Server } from 'socket.io' import logger from './utils/logger' import { expressRequestLogger } from './utils/logger' import { IChatFlow, IncomingInput, IReactFlowNode, IReactFlowObject, INodeData, IDatabaseExport, IRunChatflowMessageValue, IChildProcessMessage, ICredentialReturnResponse } from './Interface' import { getNodeModulesPackagePath, getStartingNodes, buildLangchain, getEndingNode, constructGraphs, resolveVariables, isStartNodeDependOnInput, getAPIKeys, addAPIKey, updateAPIKey, deleteAPIKey, compareKeys, mapMimeTypeToInputField, findAvailableConfigs, isSameOverrideConfig, replaceAllAPIKeys, isFlowValidForStream, isVectorStoreFaiss, databaseEntities, getApiKey, transformToCredentialEntity, decryptCredentialData, clearSessionMemory, replaceInputsWithConfig } from './utils' import { cloneDeep, omit } from 'lodash' import { getDataSource } from './DataSource' import { NodesPool } from './NodesPool' import { ChatFlow } from './entity/ChatFlow' import { ChatMessage } from './entity/ChatMessage' import { Credential } from './entity/Credential' import { Tool } from './entity/Tool' import { ChatflowPool } from './ChatflowPool' import { ICommonObject, INodeOptionsValue } from 'flowise-components' import { fork } from 'child_process' export class App { app: express.Application nodesPool: NodesPool chatflowPool: ChatflowPool AppDataSource = getDataSource() constructor() { this.app = express() } async initDatabase() { // Initialize database this.AppDataSource.initialize() .then(async () => { logger.info('📦 [server]: Data Source has been initialized!') // Initialize nodes pool this.nodesPool = new NodesPool() await this.nodesPool.initialize() // Initialize chatflow pool this.chatflowPool = new ChatflowPool() // Initialize API keys await getAPIKeys() }) .catch((err) => { logger.error('❌ [server]: Error during Data Source initialization:', err) }) } async config(socketIO?: Server) { // Limit is needed to allow sending/receiving base64 encoded string this.app.use(express.json({ limit: '50mb' })) this.app.use(express.urlencoded({ limit: '50mb', extended: true })) // Allow access from * this.app.use(cors()) // Add the expressRequestLogger middleware to log all requests this.app.use(expressRequestLogger) if (process.env.FLOWISE_USERNAME && process.env.FLOWISE_PASSWORD) { const username = process.env.FLOWISE_USERNAME const password = process.env.FLOWISE_PASSWORD const basicAuthMiddleware = basicAuth({ users: { [username]: password } }) const whitelistURLs = [ '/api/v1/verify/apikey/', '/api/v1/chatflows/apikey/', '/api/v1/public-chatflows', '/api/v1/prediction/', '/api/v1/node-icon/', '/api/v1/components-credentials-icon/', '/api/v1/chatflows-streaming' ] this.app.use((req, res, next) => { if (req.url.includes('/api/v1/')) { whitelistURLs.some((url) => req.url.includes(url)) ? next() : basicAuthMiddleware(req, res, next) } else next() }) } const upload = multer({ dest: `${path.join(__dirname, '..', 'uploads')}/` }) // ---------------------------------------- // Components // ---------------------------------------- // Get all component nodes this.app.get('/api/v1/nodes', (req: Request, res: Response) => { const returnData = [] for (const nodeName in this.nodesPool.componentNodes) { const clonedNode = cloneDeep(this.nodesPool.componentNodes[nodeName]) returnData.push(clonedNode) } return res.json(returnData) }) // Get all component credentials this.app.get('/api/v1/components-credentials', async (req: Request, res: Response) => { const returnData = [] for (const credName in this.nodesPool.componentCredentials) { const clonedCred = cloneDeep(this.nodesPool.componentCredentials[credName]) returnData.push(clonedCred) } return res.json(returnData) }) // Get specific component node via name this.app.get('/api/v1/nodes/:name', (req: Request, res: Response) => { if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) { return res.json(this.nodesPool.componentNodes[req.params.name]) } else { throw new Error(`Node ${req.params.name} not found`) } }) // Get component credential via name this.app.get('/api/v1/components-credentials/:name', (req: Request, res: Response) => { if (!req.params.name.includes('&')) { if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentCredentials, req.params.name)) { return res.json(this.nodesPool.componentCredentials[req.params.name]) } else { throw new Error(`Credential ${req.params.name} not found`) } } else { const returnResponse = [] for (const name of req.params.name.split('&')) { if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentCredentials, name)) { returnResponse.push(this.nodesPool.componentCredentials[name]) } else { throw new Error(`Credential ${name} not found`) } } return res.json(returnResponse) } }) // Returns specific component node icon via name this.app.get('/api/v1/node-icon/:name', (req: Request, res: Response) => { if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) { const nodeInstance = this.nodesPool.componentNodes[req.params.name] if (nodeInstance.icon === undefined) { throw new Error(`Node ${req.params.name} icon not found`) } if (nodeInstance.icon.endsWith('.svg') || nodeInstance.icon.endsWith('.png') || nodeInstance.icon.endsWith('.jpg')) { const filepath = nodeInstance.icon res.sendFile(filepath) } else { throw new Error(`Node ${req.params.name} icon is missing icon`) } } else { throw new Error(`Node ${req.params.name} not found`) } }) // Returns specific component credential icon via name this.app.get('/api/v1/components-credentials-icon/:name', (req: Request, res: Response) => { if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentCredentials, req.params.name)) { const credInstance = this.nodesPool.componentCredentials[req.params.name] if (credInstance.icon === undefined) { throw new Error(`Credential ${req.params.name} icon not found`) } if (credInstance.icon.endsWith('.svg') || credInstance.icon.endsWith('.png') || credInstance.icon.endsWith('.jpg')) { const filepath = credInstance.icon res.sendFile(filepath) } else { throw new Error(`Credential ${req.params.name} icon is missing icon`) } } else { throw new Error(`Credential ${req.params.name} not found`) } }) // load async options this.app.post('/api/v1/node-load-method/:name', async (req: Request, res: Response) => { const nodeData: INodeData = req.body if (Object.prototype.hasOwnProperty.call(this.nodesPool.componentNodes, req.params.name)) { try { const nodeInstance = this.nodesPool.componentNodes[req.params.name] const methodName = nodeData.loadMethod || '' const returnOptions: INodeOptionsValue[] = await nodeInstance.loadMethods![methodName]!.call(nodeInstance, nodeData, { appDataSource: this.AppDataSource, databaseEntities: databaseEntities }) return res.json(returnOptions) } catch (error) { return res.json([]) } } else { res.status(404).send(`Node ${req.params.name} not found`) return } }) // ---------------------------------------- // Chatflows // ---------------------------------------- // Get all chatflows this.app.get('/api/v1/chatflows', async (req: Request, res: Response) => { const chatflows: IChatFlow[] = await this.AppDataSource.getRepository(ChatFlow).find() return res.json(chatflows) }) // Get specific chatflow via api key this.app.get('/api/v1/chatflows/apikey/:apiKey', async (req: Request, res: Response) => { try { const apiKey = await getApiKey(req.params.apiKey) if (!apiKey) return res.status(401).send('Unauthorized') const chatflows = await this.AppDataSource.getRepository(ChatFlow) .createQueryBuilder('cf') .where('cf.apikeyid = :apikeyid', { apikeyid: apiKey.id }) .orWhere('cf.apikeyid IS NULL') .orWhere('cf.apikeyid = ""') .orderBy('cf.name', 'ASC') .getMany() if (chatflows.length >= 1) return res.status(200).send(chatflows) return res.status(404).send('Chatflow not found') } catch (err: any) { return res.status(500).send(err?.message) } }) // Get specific chatflow via id this.app.get('/api/v1/chatflows/:id', async (req: Request, res: Response) => { const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: req.params.id }) if (chatflow) return res.json(chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`) }) // Get specific chatflow via id (PUBLIC endpoint, used when sharing chatbot link) this.app.get('/api/v1/public-chatflows/:id', async (req: Request, res: Response) => { const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: req.params.id }) if (chatflow && chatflow.isPublic) return res.json(chatflow) else if (chatflow && !chatflow.isPublic) return res.status(401).send(`Unauthorized`) return res.status(404).send(`Chatflow ${req.params.id} not found`) }) // Save chatflow this.app.post('/api/v1/chatflows', async (req: Request, res: Response) => { const body = req.body const newChatFlow = new ChatFlow() Object.assign(newChatFlow, body) const chatflow = this.AppDataSource.getRepository(ChatFlow).create(newChatFlow) const results = await this.AppDataSource.getRepository(ChatFlow).save(chatflow) return res.json(results) }) // Update chatflow this.app.put('/api/v1/chatflows/:id', async (req: Request, res: Response) => { const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: req.params.id }) if (!chatflow) { res.status(404).send(`Chatflow ${req.params.id} not found`) return } const body = req.body const updateChatFlow = new ChatFlow() Object.assign(updateChatFlow, body) this.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow) const result = await this.AppDataSource.getRepository(ChatFlow).save(chatflow) // Update chatflowpool inSync to false, to build Langchain again because data has been changed this.chatflowPool.updateInSync(chatflow.id, false) return res.json(result) }) // Delete chatflow via id this.app.delete('/api/v1/chatflows/:id', async (req: Request, res: Response) => { const results = await this.AppDataSource.getRepository(ChatFlow).delete({ id: req.params.id }) return res.json(results) }) // Check if chatflow valid for streaming this.app.get('/api/v1/chatflows-streaming/:id', async (req: Request, res: Response) => { const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: req.params.id }) if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`) /*** Get Ending Node with Directed Graph ***/ const flowData = chatflow.flowData const parsedFlowData: IReactFlowObject = JSON.parse(flowData) const nodes = parsedFlowData.nodes const edges = parsedFlowData.edges const { graph, nodeDependencies } = constructGraphs(nodes, edges) const endingNodeId = getEndingNode(nodeDependencies, graph) if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`) const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`) if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') { return res.status(500).send(`Ending node must be either a Chain or Agent`) } const obj = { isStreaming: isFlowValidForStream(nodes, endingNodeData) } return res.json(obj) }) // ---------------------------------------- // ChatMessage // ---------------------------------------- // Get all chatmessages from chatflowid this.app.get('/api/v1/chatmessage/:id', async (req: Request, res: Response) => { const chatmessages = await this.AppDataSource.getRepository(ChatMessage).find({ where: { chatflowid: req.params.id }, order: { createdDate: 'ASC' } }) return res.json(chatmessages) }) // Add chatmessages for chatflowid this.app.post('/api/v1/chatmessage/:id', async (req: Request, res: Response) => { const body = req.body const newChatMessage = new ChatMessage() Object.assign(newChatMessage, body) const chatmessage = this.AppDataSource.getRepository(ChatMessage).create(newChatMessage) const results = await this.AppDataSource.getRepository(ChatMessage).save(chatmessage) return res.json(results) }) // Delete all chatmessages from chatflowid this.app.delete('/api/v1/chatmessage/:id', async (req: Request, res: Response) => { const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: req.params.id }) if (!chatflow) { res.status(404).send(`Chatflow ${req.params.id} not found`) return } const flowData = chatflow.flowData const parsedFlowData: IReactFlowObject = JSON.parse(flowData) const nodes = parsedFlowData.nodes let chatId = await getChatId(chatflow.id) if (!chatId) chatId = chatflow.id clearSessionMemory(nodes, this.nodesPool.componentNodes, chatId, req.query.sessionId as string) const results = await this.AppDataSource.getRepository(ChatMessage).delete({ chatflowid: req.params.id }) return res.json(results) }) // ---------------------------------------- // Credentials // ---------------------------------------- // Create new credential this.app.post('/api/v1/credentials', async (req: Request, res: Response) => { const body = req.body const newCredential = await transformToCredentialEntity(body) const credential = this.AppDataSource.getRepository(Credential).create(newCredential) const results = await this.AppDataSource.getRepository(Credential).save(credential) return res.json(results) }) // Get all credentials this.app.get('/api/v1/credentials', async (req: Request, res: Response) => { if (req.query.credentialName) { let returnCredentials = [] if (Array.isArray(req.query.credentialName)) { for (let i = 0; i < req.query.credentialName.length; i += 1) { const name = req.query.credentialName[i] as string const credentials = await this.AppDataSource.getRepository(Credential).findBy({ credentialName: name }) returnCredentials.push(...credentials) } } else { const credentials = await this.AppDataSource.getRepository(Credential).findBy({ credentialName: req.query.credentialName as string }) returnCredentials = [...credentials] } return res.json(returnCredentials) } else { const credentials = await this.AppDataSource.getRepository(Credential).find() const returnCredentials = [] for (const credential of credentials) { returnCredentials.push(omit(credential, ['encryptedData'])) } return res.json(returnCredentials) } }) // Get specific credential this.app.get('/api/v1/credentials/:id', async (req: Request, res: Response) => { const credential = await this.AppDataSource.getRepository(Credential).findOneBy({ id: req.params.id }) if (!credential) return res.status(404).send(`Credential ${req.params.id} not found`) // Decrpyt credentialData const decryptedCredentialData = await decryptCredentialData( credential.encryptedData, credential.credentialName, this.nodesPool.componentCredentials ) const returnCredential: ICredentialReturnResponse = { ...credential, plainDataObj: decryptedCredentialData } return res.json(omit(returnCredential, ['encryptedData'])) }) // Update credential this.app.put('/api/v1/credentials/:id', async (req: Request, res: Response) => { const credential = await this.AppDataSource.getRepository(Credential).findOneBy({ id: req.params.id }) if (!credential) return res.status(404).send(`Credential ${req.params.id} not found`) const body = req.body const updateCredential = await transformToCredentialEntity(body) this.AppDataSource.getRepository(Credential).merge(credential, updateCredential) const result = await this.AppDataSource.getRepository(Credential).save(credential) return res.json(result) }) // Delete all chatmessages from chatflowid this.app.delete('/api/v1/credentials/:id', async (req: Request, res: Response) => { const results = await this.AppDataSource.getRepository(Credential).delete({ id: req.params.id }) return res.json(results) }) // ---------------------------------------- // Tools // ---------------------------------------- // Get all tools this.app.get('/api/v1/tools', async (req: Request, res: Response) => { const tools = await this.AppDataSource.getRepository(Tool).find() return res.json(tools) }) // Get specific tool this.app.get('/api/v1/tools/:id', async (req: Request, res: Response) => { const tool = await this.AppDataSource.getRepository(Tool).findOneBy({ id: req.params.id }) return res.json(tool) }) // Add tool this.app.post('/api/v1/tools', async (req: Request, res: Response) => { const body = req.body const newTool = new Tool() Object.assign(newTool, body) const tool = this.AppDataSource.getRepository(Tool).create(newTool) const results = await this.AppDataSource.getRepository(Tool).save(tool) return res.json(results) }) // Update tool this.app.put('/api/v1/tools/:id', async (req: Request, res: Response) => { const tool = await this.AppDataSource.getRepository(Tool).findOneBy({ id: req.params.id }) if (!tool) { res.status(404).send(`Tool ${req.params.id} not found`) return } const body = req.body const updateTool = new Tool() Object.assign(updateTool, body) this.AppDataSource.getRepository(Tool).merge(tool, updateTool) const result = await this.AppDataSource.getRepository(Tool).save(tool) return res.json(result) }) // Delete tool this.app.delete('/api/v1/tools/:id', async (req: Request, res: Response) => { const results = await this.AppDataSource.getRepository(Tool).delete({ id: req.params.id }) return res.json(results) }) // ---------------------------------------- // Configuration // ---------------------------------------- this.app.get('/api/v1/flow-config/:id', async (req: Request, res: Response) => { const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: req.params.id }) if (!chatflow) return res.status(404).send(`Chatflow ${req.params.id} not found`) const flowData = chatflow.flowData const parsedFlowData: IReactFlowObject = JSON.parse(flowData) const nodes = parsedFlowData.nodes const availableConfigs = findAvailableConfigs(nodes, this.nodesPool.componentCredentials) return res.json(availableConfigs) }) // ---------------------------------------- // Export Load Chatflow & ChatMessage & Apikeys // ---------------------------------------- this.app.get('/api/v1/database/export', async (req: Request, res: Response) => { const chatmessages = await this.AppDataSource.getRepository(ChatMessage).find() const chatflows = await this.AppDataSource.getRepository(ChatFlow).find() const apikeys = await getAPIKeys() const result: IDatabaseExport = { chatmessages, chatflows, apikeys } return res.json(result) }) this.app.post('/api/v1/database/load', async (req: Request, res: Response) => { const databaseItems: IDatabaseExport = req.body await this.AppDataSource.getRepository(ChatFlow).delete({}) await this.AppDataSource.getRepository(ChatMessage).delete({}) let error = '' // Get a new query runner instance const queryRunner = this.AppDataSource.createQueryRunner() // Start a new transaction await queryRunner.startTransaction() try { const chatflows: ChatFlow[] = databaseItems.chatflows const chatmessages: ChatMessage[] = databaseItems.chatmessages await queryRunner.manager.insert(ChatFlow, chatflows) await queryRunner.manager.insert(ChatMessage, chatmessages) await queryRunner.commitTransaction() } catch (err: any) { error = err?.message ?? 'Error loading database' await queryRunner.rollbackTransaction() } finally { await queryRunner.release() } await replaceAllAPIKeys(databaseItems.apikeys) if (error) return res.status(500).send(error) return res.status(201).send('OK') }) // ---------------------------------------- // Prediction // ---------------------------------------- // Send input message and get prediction result (External) this.app.post('/api/v1/prediction/:id', upload.array('files'), async (req: Request, res: Response) => { await this.processPrediction(req, res, socketIO) }) // Send input message and get prediction result (Internal) this.app.post('/api/v1/internal-prediction/:id', async (req: Request, res: Response) => { await this.processPrediction(req, res, socketIO, true) }) // ---------------------------------------- // Marketplaces // ---------------------------------------- // Get all chatflows for marketplaces this.app.get('/api/v1/marketplaces/chatflows', async (req: Request, res: Response) => { const marketplaceDir = path.join(__dirname, '..', 'marketplaces', 'chatflows') const jsonsInDir = fs.readdirSync(marketplaceDir).filter((file) => path.extname(file) === '.json') const templates: any[] = [] jsonsInDir.forEach((file, index) => { const filePath = path.join(__dirname, '..', 'marketplaces', 'chatflows', file) const fileData = fs.readFileSync(filePath) const fileDataObj = JSON.parse(fileData.toString()) const template = { id: index, name: file.split('.json')[0], flowData: fileData.toString(), description: fileDataObj?.description || '' } templates.push(template) }) const FlowiseDocsQnA = templates.find((tmp) => tmp.name === 'Flowise Docs QnA') if (FlowiseDocsQnA) templates.unshift(FlowiseDocsQnA) return res.json(templates) }) // Get all tools for marketplaces this.app.get('/api/v1/marketplaces/tools', async (req: Request, res: Response) => { const marketplaceDir = path.join(__dirname, '..', 'marketplaces', 'tools') const jsonsInDir = fs.readdirSync(marketplaceDir).filter((file) => path.extname(file) === '.json') const templates: any[] = [] jsonsInDir.forEach((file, index) => { const filePath = path.join(__dirname, '..', 'marketplaces', 'tools', file) const fileData = fs.readFileSync(filePath) const fileDataObj = JSON.parse(fileData.toString()) const template = { ...fileDataObj, id: index, templateName: file.split('.json')[0] } templates.push(template) }) return res.json(templates) }) // ---------------------------------------- // API Keys // ---------------------------------------- // Get api keys this.app.get('/api/v1/apikey', async (req: Request, res: Response) => { const keys = await getAPIKeys() return res.json(keys) }) // Add new api key this.app.post('/api/v1/apikey', async (req: Request, res: Response) => { const keys = await addAPIKey(req.body.keyName) return res.json(keys) }) // Update api key this.app.put('/api/v1/apikey/:id', async (req: Request, res: Response) => { const keys = await updateAPIKey(req.params.id, req.body.keyName) return res.json(keys) }) // Delete new api key this.app.delete('/api/v1/apikey/:id', async (req: Request, res: Response) => { const keys = await deleteAPIKey(req.params.id) return res.json(keys) }) // Verify api key this.app.get('/api/v1/verify/apikey/:apiKey', async (req: Request, res: Response) => { try { const apiKey = await getApiKey(req.params.apiKey) if (!apiKey) return res.status(401).send('Unauthorized') return res.status(200).send('OK') } catch (err: any) { return res.status(500).send(err?.message) } }) // ---------------------------------------- // Serve UI static // ---------------------------------------- const packagePath = getNodeModulesPackagePath('flowise-ui') const uiBuildPath = path.join(packagePath, 'build') const uiHtmlPath = path.join(packagePath, 'build', 'index.html') this.app.use('/', express.static(uiBuildPath)) // All other requests not handled will return React app this.app.use((req, res) => { res.sendFile(uiHtmlPath) }) } /** * Validate API Key * @param {Request} req * @param {Response} res * @param {ChatFlow} chatflow */ async validateKey(req: Request, res: Response, chatflow: ChatFlow) { const chatFlowApiKeyId = chatflow.apikeyid const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? '' if (chatFlowApiKeyId && !authorizationHeader) return res.status(401).send(`Unauthorized`) const suppliedKey = authorizationHeader.split(`Bearer `).pop() if (chatFlowApiKeyId && suppliedKey) { const keys = await getAPIKeys() const apiSecret = keys.find((key) => key.id === chatFlowApiKeyId)?.apiSecret if (!compareKeys(apiSecret, suppliedKey)) return res.status(401).send(`Unauthorized`) } } /** * Start child process * @param {ChatFlow} chatflow * @param {IncomingInput} incomingInput * @param {INodeData} endingNodeData */ async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) { try { const controller = new AbortController() const { signal } = controller let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js') if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts' const childProcess = fork(childpath, [], { signal }) const value = { chatflow, chatId, incomingInput, componentNodes: cloneDeep(this.nodesPool.componentNodes), endingNodeData } as IRunChatflowMessageValue childProcess.send({ key: 'start', value } as IChildProcessMessage) let childProcessTimeout: NodeJS.Timeout return new Promise((resolve, reject) => { childProcess.on('message', async (message: IChildProcessMessage) => { if (message.key === 'finish') { const { result, addToChatFlowPool } = message.value as ICommonObject if (childProcessTimeout) { clearTimeout(childProcessTimeout) } if (Object.keys(addToChatFlowPool).length) { const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig) } resolve(result) } if (message.key === 'start') { if (process.env.EXECUTION_TIMEOUT) { childProcessTimeout = setTimeout(async () => { childProcess.kill() resolve(undefined) }, parseInt(process.env.EXECUTION_TIMEOUT, 10)) } } if (message.key === 'error') { let errMessage = message.value as string if (childProcessTimeout) { clearTimeout(childProcessTimeout) } reject(errMessage) } }) }) } catch (err) { logger.error('[server] [mode:child]: Error:', err) } } /** * Process Prediction * @param {Request} req * @param {Response} res * @param {Server} socketIO * @param {boolean} isInternal */ async processPrediction(req: Request, res: Response, socketIO?: Server, isInternal = false) { try { const chatflowid = req.params.id let incomingInput: IncomingInput = req.body let nodeToExecuteData: INodeData const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowid }) if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) let chatId = await getChatId(chatflow.id) if (!chatId) chatId = chatflowid if (!isInternal) { await this.validateKey(req, res, chatflow) } let isStreamValid = false const files = (req.files as any[]) || [] if (files.length) { const overrideConfig: ICommonObject = { ...req.body } for (const file of files) { const fileData = fs.readFileSync(file.path, { encoding: 'base64' }) const dataBase64String = `data:${file.mimetype};base64,${fileData},filename:${file.filename}` const fileInputField = mapMimeTypeToInputField(file.mimetype) if (overrideConfig[fileInputField]) { overrideConfig[fileInputField] = JSON.stringify([...JSON.parse(overrideConfig[fileInputField]), dataBase64String]) } else { overrideConfig[fileInputField] = JSON.stringify([dataBase64String]) } } incomingInput = { question: req.body.question ?? 'hello', overrideConfig, history: [] } } /* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation) when all these conditions met: * - Node Data already exists in pool * - Still in sync (i.e the flow has not been modified since) * - Existing overrideConfig and new overrideConfig are the same * - Flow doesn't start with nodes that depend on incomingInput.question ***/ const isFlowReusable = () => { return ( Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) && this.chatflowPool.activeChatflows[chatflowid].inSync && isSameOverrideConfig( isInternal, this.chatflowPool.activeChatflows[chatflowid].overrideConfig, incomingInput.overrideConfig ) && !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes) ) } if (process.env.EXECUTION_MODE === 'child') { if (isFlowReusable()) { nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData logger.debug( `[server] [mode:child]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})` ) try { const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData) return res.json(result) } catch (error) { return res.status(500).send(error) } } else { try { const result = await this.startChildProcess(chatflow, chatId, incomingInput) return res.json(result) } catch (error) { return res.status(500).send(error) } } } else { /*** Get chatflows and prepare data ***/ const flowData = chatflow.flowData const parsedFlowData: IReactFlowObject = JSON.parse(flowData) const nodes = parsedFlowData.nodes const edges = parsedFlowData.edges if (isFlowReusable()) { nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData) logger.debug( `[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})` ) } else { /*** Get Ending Node with Directed Graph ***/ const { graph, nodeDependencies } = constructGraphs(nodes, edges) const directedGraph = graph const endingNodeId = getEndingNode(nodeDependencies, directedGraph) if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`) const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`) if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') { return res.status(500).send(`Ending node must be either a Chain or Agent`) } if ( endingNodeData.outputs && Object.keys(endingNodeData.outputs).length && !Object.values(endingNodeData.outputs).includes(endingNodeData.name) ) { return res .status(500) .send( `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` ) } isStreamValid = isFlowValidForStream(nodes, endingNodeData) /*** Get Starting Nodes with Non-Directed Graph ***/ const constructedObj = constructGraphs(nodes, edges, true) const nonDirectedGraph = constructedObj.graph const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) logger.debug(`[server]: Start building chatflow ${chatflowid}`) /*** BFS to traverse from Starting Nodes to Ending Node ***/ const reactFlowNodes = await buildLangchain( startingNodeIds, nodes, graph, depthQueue, this.nodesPool.componentNodes, incomingInput.question, chatId, this.AppDataSource, incomingInput?.overrideConfig ) const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId) if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`) if (incomingInput.overrideConfig) nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig) const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) nodeToExecuteData = reactFlowNodeData const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) } const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string const nodeModule = await import(nodeInstanceFilePath) const nodeInstance = new nodeModule.nodeClass() isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData) logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) const result = isStreamValid ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history, socketIO, socketIOClientId: incomingInput.socketIOClientId, logger }) : await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history, logger }) logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) return res.json(result) } } catch (e: any) { logger.error('[server]: Error:', e) return res.status(500).send(e.message) } } async stopApp() { try { const removePromises: any[] = [] await Promise.all(removePromises) } catch (e) { logger.error(`❌[server]: Flowise Server shut down error: ${e}`) } } } /** * Get first chat message id * @param {string} chatflowid * @returns {string} */ export async function getChatId(chatflowid: string) { // first chatmessage id as the unique chat id const firstChatMessage = await getDataSource() .getRepository(ChatMessage) .createQueryBuilder('cm') .select('cm.id') .where('chatflowid = :chatflowid', { chatflowid }) .orderBy('cm.createdDate', 'ASC') .getOne() return firstChatMessage ? firstChatMessage.id : '' } let serverApp: App | undefined export async function start(): Promise { serverApp = new App() const port = parseInt(process.env.PORT || '', 10) || 3000 const server = http.createServer(serverApp.app) const io = new Server(server, { cors: { origin: '*' } }) await serverApp.initDatabase() await serverApp.config(io) server.listen(port, () => { logger.info(`⚡️ [server]: Flowise Server is listening at ${port}`) }) } export function getInstance(): App | undefined { return serverApp }