Bugfix/pass execute custom function to worker (#4432)
pass execute custom function to worker
This commit is contained in:
parent
a6e64230b4
commit
0a4570ecda
|
|
@ -10,6 +10,7 @@ import { RedisOptions } from 'bullmq'
|
||||||
import logger from '../utils/logger'
|
import logger from '../utils/logger'
|
||||||
import { generateAgentflowv2 as generateAgentflowv2_json } from 'flowise-components'
|
import { generateAgentflowv2 as generateAgentflowv2_json } from 'flowise-components'
|
||||||
import { databaseEntities } from '../utils'
|
import { databaseEntities } from '../utils'
|
||||||
|
import { executeCustomNodeFunction } from '../utils/executeCustomNodeFunction'
|
||||||
|
|
||||||
interface PredictionQueueOptions {
|
interface PredictionQueueOptions {
|
||||||
appDataSource: DataSource
|
appDataSource: DataSource
|
||||||
|
|
@ -65,7 +66,7 @@ export class PredictionQueue extends BaseQueue {
|
||||||
if (this.redisPublisher) data.sseStreamer = this.redisPublisher
|
if (this.redisPublisher) data.sseStreamer = this.redisPublisher
|
||||||
|
|
||||||
if (Object.prototype.hasOwnProperty.call(data, 'isAgentFlowGenerator')) {
|
if (Object.prototype.hasOwnProperty.call(data, 'isAgentFlowGenerator')) {
|
||||||
logger.info('Generating Agentflow...')
|
logger.info(`Generating Agentflow...`)
|
||||||
const { prompt, componentNodes, toolNodes, selectedChatModel, question } = data as IGenerateAgentflowv2Params
|
const { prompt, componentNodes, toolNodes, selectedChatModel, question } = data as IGenerateAgentflowv2Params
|
||||||
const options: Record<string, any> = {
|
const options: Record<string, any> = {
|
||||||
appDataSource: this.appDataSource,
|
appDataSource: this.appDataSource,
|
||||||
|
|
@ -75,6 +76,15 @@ export class PredictionQueue extends BaseQueue {
|
||||||
return await generateAgentflowv2_json({ prompt, componentNodes, toolNodes, selectedChatModel }, question, options)
|
return await generateAgentflowv2_json({ prompt, componentNodes, toolNodes, selectedChatModel }, question, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (Object.prototype.hasOwnProperty.call(data, 'isExecuteCustomFunction')) {
|
||||||
|
logger.info(`Executing Custom Function...`)
|
||||||
|
return await executeCustomNodeFunction({
|
||||||
|
appDataSource: this.appDataSource,
|
||||||
|
componentNodes: this.componentNodes,
|
||||||
|
data
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
if (this.abortControllerPool) {
|
if (this.abortControllerPool) {
|
||||||
const abortControllerId = `${data.chatflow.id}_${data.chatId}`
|
const abortControllerId = `${data.chatflow.id}_${data.chatId}`
|
||||||
const signal = new AbortController()
|
const signal = new AbortController()
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,14 @@
|
||||||
import { cloneDeep } from 'lodash'
|
import { cloneDeep, omit } from 'lodash'
|
||||||
import { StatusCodes } from 'http-status-codes'
|
import { StatusCodes } from 'http-status-codes'
|
||||||
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
|
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
|
||||||
import { INodeData } from '../../Interface'
|
import { INodeData, MODE } from '../../Interface'
|
||||||
import { INodeOptionsValue, ICommonObject, handleEscapeCharacters } from 'flowise-components'
|
import { INodeOptionsValue } from 'flowise-components'
|
||||||
import { databaseEntities } from '../../utils'
|
import { databaseEntities } from '../../utils'
|
||||||
import logger from '../../utils/logger'
|
import logger from '../../utils/logger'
|
||||||
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
|
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
|
||||||
import { getErrorMessage } from '../../errors/utils'
|
import { getErrorMessage } from '../../errors/utils'
|
||||||
|
import { OMIT_QUEUE_JOB_DATA } from '../../utils/constants'
|
||||||
|
import { executeCustomNodeFunction } from '../../utils/executeCustomNodeFunction'
|
||||||
|
|
||||||
// Get all component nodes
|
// Get all component nodes
|
||||||
const getAllNodes = async () => {
|
const getAllNodes = async () => {
|
||||||
|
|
@ -120,47 +122,29 @@ const getSingleNodeAsyncOptions = async (nodeName: string, requestBody: any): Pr
|
||||||
|
|
||||||
// execute custom function node
|
// execute custom function node
|
||||||
const executeCustomFunction = async (requestBody: any) => {
|
const executeCustomFunction = async (requestBody: any) => {
|
||||||
try {
|
const appServer = getRunningExpressApp()
|
||||||
const appServer = getRunningExpressApp()
|
const executeData = {
|
||||||
const body = requestBody
|
appDataSource: appServer.AppDataSource,
|
||||||
const functionInputVariables = Object.fromEntries(
|
componentNodes: appServer.nodesPool.componentNodes,
|
||||||
[...(body?.javascriptFunction ?? '').matchAll(/\$([a-zA-Z0-9_]+)/g)].map((g) => [g[1], undefined])
|
data: requestBody,
|
||||||
)
|
isExecuteCustomFunction: true
|
||||||
if (functionInputVariables && Object.keys(functionInputVariables).length) {
|
}
|
||||||
for (const key in functionInputVariables) {
|
|
||||||
if (key.includes('vars')) {
|
if (process.env.MODE === MODE.QUEUE) {
|
||||||
delete functionInputVariables[key]
|
const predictionQueue = appServer.queueManager.getQueue('prediction')
|
||||||
}
|
|
||||||
}
|
const job = await predictionQueue.addJob(omit(executeData, OMIT_QUEUE_JOB_DATA))
|
||||||
|
logger.debug(`[server]: Execute Custom Function Job added to queue: ${job.id}`)
|
||||||
|
|
||||||
|
const queueEvents = predictionQueue.getQueueEvents()
|
||||||
|
const result = await job.waitUntilFinished(queueEvents)
|
||||||
|
if (!result) {
|
||||||
|
throw new Error('Failed to execute custom function')
|
||||||
}
|
}
|
||||||
const nodeData = { inputs: { functionInputVariables, ...body } }
|
|
||||||
if (Object.prototype.hasOwnProperty.call(appServer.nodesPool.componentNodes, 'customFunction')) {
|
|
||||||
try {
|
|
||||||
const nodeInstanceFilePath = appServer.nodesPool.componentNodes['customFunction'].filePath as string
|
|
||||||
const nodeModule = await import(nodeInstanceFilePath)
|
|
||||||
const newNodeInstance = new nodeModule.nodeClass()
|
|
||||||
|
|
||||||
const options: ICommonObject = {
|
return result
|
||||||
appDataSource: appServer.AppDataSource,
|
} else {
|
||||||
databaseEntities,
|
return await executeCustomNodeFunction(executeData)
|
||||||
logger
|
|
||||||
}
|
|
||||||
|
|
||||||
const returnData = await newNodeInstance.init(nodeData, '', options)
|
|
||||||
const dbResponse = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
|
|
||||||
|
|
||||||
return dbResponse
|
|
||||||
} catch (error) {
|
|
||||||
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error running custom function: ${error}`)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Node customFunction not found`)
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
throw new InternalFlowiseError(
|
|
||||||
StatusCodes.INTERNAL_SERVER_ERROR,
|
|
||||||
`Error: nodesService.executeCustomFunction - ${getErrorMessage(error)}`
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,58 @@
|
||||||
|
import { handleEscapeCharacters, ICommonObject } from 'flowise-components'
|
||||||
|
import { databaseEntities } from '.'
|
||||||
|
import { InternalFlowiseError } from '../errors/internalFlowiseError'
|
||||||
|
import { StatusCodes } from 'http-status-codes'
|
||||||
|
import { getErrorMessage } from '../errors/utils'
|
||||||
|
import { DataSource } from 'typeorm'
|
||||||
|
import { IComponentNodes } from '../Interface'
|
||||||
|
|
||||||
|
export const executeCustomNodeFunction = async ({
|
||||||
|
appDataSource,
|
||||||
|
componentNodes,
|
||||||
|
data
|
||||||
|
}: {
|
||||||
|
appDataSource: DataSource
|
||||||
|
componentNodes: IComponentNodes
|
||||||
|
data: any
|
||||||
|
}) => {
|
||||||
|
try {
|
||||||
|
const body = data
|
||||||
|
const functionInputVariables = Object.fromEntries(
|
||||||
|
[...(body?.javascriptFunction ?? '').matchAll(/\$([a-zA-Z0-9_]+)/g)].map((g) => [g[1], undefined])
|
||||||
|
)
|
||||||
|
if (functionInputVariables && Object.keys(functionInputVariables).length) {
|
||||||
|
for (const key in functionInputVariables) {
|
||||||
|
if (key.includes('vars')) {
|
||||||
|
delete functionInputVariables[key]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const nodeData = { inputs: { functionInputVariables, ...body } }
|
||||||
|
if (Object.prototype.hasOwnProperty.call(componentNodes, 'customFunction')) {
|
||||||
|
try {
|
||||||
|
const nodeInstanceFilePath = componentNodes['customFunction'].filePath as string
|
||||||
|
const nodeModule = await import(nodeInstanceFilePath)
|
||||||
|
const newNodeInstance = new nodeModule.nodeClass()
|
||||||
|
|
||||||
|
const options: ICommonObject = {
|
||||||
|
appDataSource,
|
||||||
|
databaseEntities
|
||||||
|
}
|
||||||
|
|
||||||
|
const returnData = await newNodeInstance.init(nodeData, '', options)
|
||||||
|
const dbResponse = typeof returnData === 'string' ? handleEscapeCharacters(returnData, true) : returnData
|
||||||
|
|
||||||
|
return dbResponse
|
||||||
|
} catch (error) {
|
||||||
|
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error running custom function: ${error}`)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Node customFunction not found`)
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
throw new InternalFlowiseError(
|
||||||
|
StatusCodes.INTERNAL_SERVER_ERROR,
|
||||||
|
`Error: nodesService.executeCustomFunction - ${getErrorMessage(error)}`
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue