From 0a4570ecda32f487ef880d28ad405b0c5e2f68bd Mon Sep 17 00:00:00 2001 From: Henry Heng Date: Fri, 16 May 2025 01:13:25 +0800 Subject: [PATCH] Bugfix/pass execute custom function to worker (#4432) pass execute custom function to worker --- packages/server/src/queue/PredictionQueue.ts | 12 +++- packages/server/src/services/nodes/index.ts | 68 +++++++------------ .../src/utils/executeCustomNodeFunction.ts | 58 ++++++++++++++++ 3 files changed, 95 insertions(+), 43 deletions(-) create mode 100644 packages/server/src/utils/executeCustomNodeFunction.ts diff --git a/packages/server/src/queue/PredictionQueue.ts b/packages/server/src/queue/PredictionQueue.ts index 97927eb11..46092d657 100644 --- a/packages/server/src/queue/PredictionQueue.ts +++ b/packages/server/src/queue/PredictionQueue.ts @@ -10,6 +10,7 @@ import { RedisOptions } from 'bullmq' import logger from '../utils/logger' import { generateAgentflowv2 as generateAgentflowv2_json } from 'flowise-components' import { databaseEntities } from '../utils' +import { executeCustomNodeFunction } from '../utils/executeCustomNodeFunction' interface PredictionQueueOptions { appDataSource: DataSource @@ -65,7 +66,7 @@ export class PredictionQueue extends BaseQueue { if (this.redisPublisher) data.sseStreamer = this.redisPublisher if (Object.prototype.hasOwnProperty.call(data, 'isAgentFlowGenerator')) { - logger.info('Generating Agentflow...') + logger.info(`Generating Agentflow...`) const { prompt, componentNodes, toolNodes, selectedChatModel, question } = data as IGenerateAgentflowv2Params const options: Record = { appDataSource: this.appDataSource, @@ -75,6 +76,15 @@ export class PredictionQueue extends BaseQueue { return await generateAgentflowv2_json({ prompt, componentNodes, toolNodes, selectedChatModel }, question, options) } + if (Object.prototype.hasOwnProperty.call(data, 'isExecuteCustomFunction')) { + logger.info(`Executing Custom Function...`) + return await executeCustomNodeFunction({ + appDataSource: this.appDataSource, + componentNodes: this.componentNodes, + data + }) + } + if (this.abortControllerPool) { const abortControllerId = `${data.chatflow.id}_${data.chatId}` const signal = new AbortController() diff --git a/packages/server/src/services/nodes/index.ts b/packages/server/src/services/nodes/index.ts index a55932d76..f0e8b3f53 100644 --- a/packages/server/src/services/nodes/index.ts +++ b/packages/server/src/services/nodes/index.ts @@ -1,12 +1,14 @@ -import { cloneDeep } from 'lodash' +import { cloneDeep, omit } from 'lodash' import { StatusCodes } from 'http-status-codes' import { getRunningExpressApp } from '../../utils/getRunningExpressApp' -import { INodeData } from '../../Interface' -import { INodeOptionsValue, ICommonObject, handleEscapeCharacters } from 'flowise-components' +import { INodeData, MODE } from '../../Interface' +import { INodeOptionsValue } from 'flowise-components' import { databaseEntities } from '../../utils' import logger from '../../utils/logger' import { InternalFlowiseError } from '../../errors/internalFlowiseError' import { getErrorMessage } from '../../errors/utils' +import { OMIT_QUEUE_JOB_DATA } from '../../utils/constants' +import { executeCustomNodeFunction } from '../../utils/executeCustomNodeFunction' // Get all component nodes const getAllNodes = async () => { @@ -120,47 +122,29 @@ const getSingleNodeAsyncOptions = async (nodeName: string, requestBody: any): Pr // execute custom function node const executeCustomFunction = async (requestBody: any) => { - try { - const appServer = getRunningExpressApp() - const body = requestBody - const functionInputVariables = Object.fromEntries( - [...(body?.javascriptFunction ?? '').matchAll(/\$([a-zA-Z0-9_]+)/g)].map((g) => [g[1], undefined]) - ) - if (functionInputVariables && Object.keys(functionInputVariables).length) { - for (const key in functionInputVariables) { - if (key.includes('vars')) { - delete functionInputVariables[key] - } - } + const appServer = getRunningExpressApp() + const executeData = { + appDataSource: appServer.AppDataSource, + componentNodes: appServer.nodesPool.componentNodes, + data: requestBody, + isExecuteCustomFunction: true + } + + if (process.env.MODE === MODE.QUEUE) { + const predictionQueue = appServer.queueManager.getQueue('prediction') + + const job = await predictionQueue.addJob(omit(executeData, OMIT_QUEUE_JOB_DATA)) + logger.debug(`[server]: Execute Custom Function Job added to queue: ${job.id}`) + + const queueEvents = predictionQueue.getQueueEvents() + const result = await job.waitUntilFinished(queueEvents) + if (!result) { + throw new Error('Failed to execute custom function') } - const nodeData = { inputs: { functionInputVariables, ...body } } - if (Object.prototype.hasOwnProperty.call(appServer.nodesPool.componentNodes, 'customFunction')) { - try { - const nodeInstanceFilePath = appServer.nodesPool.componentNodes['customFunction'].filePath as string - const nodeModule = await import(nodeInstanceFilePath) - const newNodeInstance = new nodeModule.nodeClass() - const options: ICommonObject = { - appDataSource: appServer.AppDataSource, - databaseEntities, - logger - } - - const returnData = await newNodeInstance.init(nodeData, '', options) - const dbResponse = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData - - return dbResponse - } catch (error) { - throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error running custom function: ${error}`) - } - } else { - throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Node customFunction not found`) - } - } catch (error) { - throw new InternalFlowiseError( - StatusCodes.INTERNAL_SERVER_ERROR, - `Error: nodesService.executeCustomFunction - ${getErrorMessage(error)}` - ) + return result + } else { + return await executeCustomNodeFunction(executeData) } } diff --git a/packages/server/src/utils/executeCustomNodeFunction.ts b/packages/server/src/utils/executeCustomNodeFunction.ts new file mode 100644 index 000000000..1695cc17c --- /dev/null +++ b/packages/server/src/utils/executeCustomNodeFunction.ts @@ -0,0 +1,58 @@ +import { handleEscapeCharacters, ICommonObject } from 'flowise-components' +import { databaseEntities } from '.' +import { InternalFlowiseError } from '../errors/internalFlowiseError' +import { StatusCodes } from 'http-status-codes' +import { getErrorMessage } from '../errors/utils' +import { DataSource } from 'typeorm' +import { IComponentNodes } from '../Interface' + +export const executeCustomNodeFunction = async ({ + appDataSource, + componentNodes, + data +}: { + appDataSource: DataSource + componentNodes: IComponentNodes + data: any +}) => { + try { + const body = data + const functionInputVariables = Object.fromEntries( + [...(body?.javascriptFunction ?? '').matchAll(/\$([a-zA-Z0-9_]+)/g)].map((g) => [g[1], undefined]) + ) + if (functionInputVariables && Object.keys(functionInputVariables).length) { + for (const key in functionInputVariables) { + if (key.includes('vars')) { + delete functionInputVariables[key] + } + } + } + const nodeData = { inputs: { functionInputVariables, ...body } } + if (Object.prototype.hasOwnProperty.call(componentNodes, 'customFunction')) { + try { + const nodeInstanceFilePath = componentNodes['customFunction'].filePath as string + const nodeModule = await import(nodeInstanceFilePath) + const newNodeInstance = new nodeModule.nodeClass() + + const options: ICommonObject = { + appDataSource, + databaseEntities + } + + const returnData = await newNodeInstance.init(nodeData, '', options) + const dbResponse = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData + + return dbResponse + } catch (error) { + throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error running custom function: ${error}`) + } + } else { + throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Node customFunction not found`) + } + } catch (error) { + throw new InternalFlowiseError( + StatusCodes.INTERNAL_SERVER_ERROR, + `Error: nodesService.executeCustomFunction - ${getErrorMessage(error)}` + ) + } +}