import { Request } from 'express' import { IFileUpload, convertSpeechToText, ICommonObject, addSingleFileToStorage, addArrayFilesToStorage } from 'flowise-components' import { StatusCodes } from 'http-status-codes' import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage, IChatFlow, IReactFlowEdge } from '../Interface' import { InternalFlowiseError } from '../errors/internalFlowiseError' import { ChatFlow } from '../database/entities/ChatFlow' import { Server } from 'socket.io' import { getRunningExpressApp } from '../utils/getRunningExpressApp' import { mapMimeTypeToInputField, isFlowValidForStream, buildFlow, getTelemetryFlowObj, getAppVersion, resolveVariables, getSessionChatHistory, findMemoryNode, replaceInputsWithConfig, getStartingNodes, isStartNodeDependOnInput, getMemorySessionId, isSameOverrideConfig, getEndingNodes, constructGraphs } from '../utils' import { utilValidateKey } from './validateKey' import { databaseEntities } from '.' import { v4 as uuidv4 } from 'uuid' import { omit } from 'lodash' import * as fs from 'fs' import logger from './logger' import { utilAddChatMessage } from './addChatMesage' import { buildAgentGraph } from './buildAgentGraph' import { getErrorMessage } from '../errors/utils' /** * Build Chatflow * @param {Request} req * @param {Server} socketIO * @param {boolean} isInternal */ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInternal: boolean = false): Promise => { try { const appServer = getRunningExpressApp() const chatflowid = req.params.id const httpProtocol = req.get('x-forwarded-proto') || req.protocol const baseURL = `${httpProtocol}://${req.get('host')}` let incomingInput: IncomingInput = req.body let nodeToExecuteData: INodeData const chatflow = await appServer.AppDataSource.getRepository(ChatFlow).findOneBy({ id: chatflowid }) if (!chatflow) { throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Chatflow ${chatflowid} not found`) } const chatId = incomingInput.chatId ?? incomingInput.overrideConfig?.sessionId ?? uuidv4() const userMessageDateTime = new Date() if (!isInternal) { const isKeyValidated = await utilValidateKey(req, chatflow) if (!isKeyValidated) { throw new InternalFlowiseError(StatusCodes.UNAUTHORIZED, `Unauthorized`) } } let fileUploads: IFileUpload[] = [] if (incomingInput.uploads) { fileUploads = incomingInput.uploads for (let i = 0; i < fileUploads.length; i += 1) { const upload = fileUploads[i] if ((upload.type === 'file' || upload.type === 'audio') && upload.data) { const filename = upload.name const splitDataURI = upload.data.split(',') const bf = Buffer.from(splitDataURI.pop() || '', 'base64') const mime = splitDataURI[0].split(':')[1].split(';')[0] await addSingleFileToStorage(mime, bf, filename, chatflowid, chatId) upload.type = 'stored-file' // Omit upload.data since we don't store the content in database fileUploads[i] = omit(upload, ['data']) } // Run Speech to Text conversion if (upload.mime === 'audio/webm' || upload.mime === 'audio/mp4' || upload.mime === 'audio/ogg') { logger.debug(`Attempting a speech to text conversion...`) let speechToTextConfig: ICommonObject = {} if (chatflow.speechToText) { const speechToTextProviders = JSON.parse(chatflow.speechToText) for (const provider in speechToTextProviders) { const providerObj = speechToTextProviders[provider] if (providerObj.status) { speechToTextConfig = providerObj speechToTextConfig['name'] = provider break } } } if (speechToTextConfig) { const options: ICommonObject = { chatId, chatflowid, appDataSource: appServer.AppDataSource, databaseEntities: databaseEntities } const speechToTextResult = await convertSpeechToText(upload, speechToTextConfig, options) logger.debug(`Speech to text result: ${speechToTextResult}`) if (speechToTextResult) { incomingInput.question = speechToTextResult } } } } } let isStreamValid = false const files = (req.files as Express.Multer.File[]) || [] if (files.length) { const overrideConfig: ICommonObject = { ...req.body } const fileNames: string[] = [] for (const file of files) { const fileBuffer = fs.readFileSync(file.path) const storagePath = await addArrayFilesToStorage(file.mimetype, fileBuffer, file.originalname, fileNames, chatflowid) const fileInputField = mapMimeTypeToInputField(file.mimetype) overrideConfig[fileInputField] = storagePath fs.unlinkSync(file.path) } incomingInput = { question: req.body.question ?? 'hello', overrideConfig, socketIOClientId: req.body.socketIOClientId } } /*** Get chatflows and prepare data ***/ const flowData = chatflow.flowData const parsedFlowData: IReactFlowObject = JSON.parse(flowData) const nodes = parsedFlowData.nodes const edges = parsedFlowData.edges /*** Get session ID ***/ const memoryNode = findMemoryNode(nodes, edges) const memoryType = memoryNode?.data.label let sessionId = getMemorySessionId(memoryNode, incomingInput, chatId, isInternal) /*** Get Ending Node with Directed Graph ***/ const { graph, nodeDependencies } = constructGraphs(nodes, edges) const directedGraph = graph const endingNodes = getEndingNodes(nodeDependencies, directedGraph, nodes) /*** If the graph is an agent graph, build the agent response ***/ if (endingNodes.filter((node) => node.data.category === 'Multi Agents').length) { return await utilBuildAgentResponse( chatflow, isInternal, chatId, memoryType ?? '', sessionId, userMessageDateTime, fileUploads, incomingInput, nodes, edges, socketIO, baseURL ) } // Get prepend messages const prependMessages = incomingInput.history /* Reuse the flow without having to rebuild (to avoid duplicated upsert, recomputation, reinitialization of memory) when all these conditions met: * - Reuse of flows is not disabled * - Node Data already exists in pool * - Still in sync (i.e the flow has not been modified since) * - Existing overrideConfig and new overrideConfig are the same * - Flow doesn't start with/contain nodes that depend on incomingInput.question ***/ const isFlowReusable = () => { return ( process.env.DISABLE_CHATFLOW_REUSE !== 'true' && Object.prototype.hasOwnProperty.call(appServer.chatflowPool.activeChatflows, chatflowid) && appServer.chatflowPool.activeChatflows[chatflowid].inSync && appServer.chatflowPool.activeChatflows[chatflowid].endingNodeData && isSameOverrideConfig( isInternal, appServer.chatflowPool.activeChatflows[chatflowid].overrideConfig, incomingInput.overrideConfig ) && !isStartNodeDependOnInput(appServer.chatflowPool.activeChatflows[chatflowid].startingNodes, nodes) ) } if (isFlowReusable()) { nodeToExecuteData = appServer.chatflowPool.activeChatflows[chatflowid].endingNodeData as INodeData isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData) logger.debug( `[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})` ) } else { const isCustomFunctionEndingNode = endingNodes.some((node) => node.data?.outputs?.output === 'EndingNode') for (const endingNode of endingNodes) { const endingNodeData = endingNode.data const isEndingNode = endingNodeData?.outputs?.output === 'EndingNode' // Once custom function ending node exists, no need to do follow-up checks. if (isEndingNode) continue if ( endingNodeData.outputs && Object.keys(endingNodeData.outputs).length && !Object.values(endingNodeData.outputs ?? {}).includes(endingNodeData.name) ) { throw new InternalFlowiseError( StatusCodes.INTERNAL_SERVER_ERROR, `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` ) } isStreamValid = isFlowValidForStream(nodes, endingNodeData) } // Once custom function ending node exists, flow is always unavailable to stream isStreamValid = isCustomFunctionEndingNode ? false : isStreamValid let chatHistory: IMessage[] = [] // When {{chat_history}} is used in Format Prompt Value, fetch the chat conversations from memory node for (const endingNode of endingNodes) { const endingNodeData = endingNode.data if (!endingNodeData.inputs?.memory) continue const memoryNodeId = endingNodeData.inputs?.memory.split('.')[0].replace('{{', '') const memoryNode = nodes.find((node) => node.data.id === memoryNodeId) if (!memoryNode) continue chatHistory = await getSessionChatHistory( chatflowid, getMemorySessionId(memoryNode, incomingInput, chatId, isInternal), memoryNode, appServer.nodesPool.componentNodes, appServer.AppDataSource, databaseEntities, logger, prependMessages ) } /*** Get Starting Nodes with Reversed Graph ***/ const constructedObj = constructGraphs(nodes, edges, { isReversed: true }) const nonDirectedGraph = constructedObj.graph let startingNodeIds: string[] = [] let depthQueue: IDepthQueue = {} const endingNodeIds = endingNodes.map((n) => n.id) for (const endingNodeId of endingNodeIds) { const resx = getStartingNodes(nonDirectedGraph, endingNodeId) startingNodeIds.push(...resx.startingNodeIds) depthQueue = Object.assign(depthQueue, resx.depthQueue) } startingNodeIds = [...new Set(startingNodeIds)] const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) logger.debug(`[server]: Start building chatflow ${chatflowid}`) /*** BFS to traverse from Starting Nodes to Ending Node ***/ const reactFlowNodes = await buildFlow( startingNodeIds, nodes, edges, graph, depthQueue, appServer.nodesPool.componentNodes, incomingInput.question, chatHistory, chatId, sessionId ?? '', chatflowid, appServer.AppDataSource, incomingInput?.overrideConfig, appServer.cachePool, false, undefined, incomingInput.uploads, baseURL, socketIO, incomingInput.socketIOClientId ) const nodeToExecute = endingNodeIds.length === 1 ? reactFlowNodes.find((node: IReactFlowNode) => endingNodeIds[0] === node.id) : reactFlowNodes[reactFlowNodes.length - 1] if (!nodeToExecute) { throw new InternalFlowiseError(StatusCodes.NOT_FOUND, `Node not found`) } if (incomingInput.overrideConfig) { nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig) } const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question, chatHistory) nodeToExecuteData = reactFlowNodeData appServer.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) } logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) const nodeInstanceFilePath = appServer.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string const nodeModule = await import(nodeInstanceFilePath) const nodeInstance = new nodeModule.nodeClass({ sessionId }) let result = isStreamValid ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatId, chatflowid, logger, appDataSource: appServer.AppDataSource, databaseEntities, analytic: chatflow.analytic, uploads: incomingInput.uploads, socketIO, socketIOClientId: incomingInput.socketIOClientId, prependMessages }) : await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatId, chatflowid, logger, appDataSource: appServer.AppDataSource, databaseEntities, analytic: chatflow.analytic, uploads: incomingInput.uploads, prependMessages }) result = typeof result === 'string' ? { text: result } : result // Retrieve threadId from assistant if exists if (typeof result === 'object' && result.assistant) { sessionId = result.assistant.threadId } const userMessage: Omit = { role: 'userMessage', content: incomingInput.question, chatflowid, chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, chatId, memoryType, sessionId, createdDate: userMessageDateTime, fileUploads: incomingInput.uploads ? JSON.stringify(fileUploads) : undefined, leadEmail: incomingInput.leadEmail } await utilAddChatMessage(userMessage) let resultText = '' if (result.text) resultText = result.text else if (result.json) resultText = '```json\n' + JSON.stringify(result.json, null, 2) else resultText = JSON.stringify(result, null, 2) const apiMessage: Omit = { role: 'apiMessage', content: resultText, chatflowid, chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, chatId, memoryType, sessionId } if (result?.sourceDocuments) apiMessage.sourceDocuments = JSON.stringify(result.sourceDocuments) if (result?.usedTools) apiMessage.usedTools = JSON.stringify(result.usedTools) if (result?.fileAnnotations) apiMessage.fileAnnotations = JSON.stringify(result.fileAnnotations) const chatMessage = await utilAddChatMessage(apiMessage) logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) await appServer.telemetry.sendTelemetry('prediction_sent', { version: await getAppVersion(), chatflowId: chatflowid, chatId, type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, flowGraph: getTelemetryFlowObj(nodes, edges) }) // Prepare response // return the question in the response // this is used when input text is empty but question is in audio format result.question = incomingInput.question result.chatId = chatId result.chatMessageId = chatMessage?.id if (sessionId) result.sessionId = sessionId if (memoryType) result.memoryType = memoryType return result } catch (e) { logger.error('[server]: Error:', e) throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, getErrorMessage(e)) } } const utilBuildAgentResponse = async ( chatflow: IChatFlow, isInternal: boolean, chatId: string, memoryType: string, sessionId: string, userMessageDateTime: Date, fileUploads: IFileUpload[], incomingInput: ICommonObject, nodes: IReactFlowNode[], edges: IReactFlowEdge[], socketIO?: Server, baseURL?: string ) => { try { const appServer = getRunningExpressApp() const streamResults = await buildAgentGraph(chatflow, chatId, sessionId, incomingInput, baseURL, socketIO) if (streamResults) { const { finalResult, agentReasoning } = streamResults const userMessage: Omit = { role: 'userMessage', content: incomingInput.question, chatflowid: chatflow.id, chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, chatId, memoryType, sessionId, createdDate: userMessageDateTime, fileUploads: incomingInput.uploads ? JSON.stringify(fileUploads) : undefined, leadEmail: incomingInput.leadEmail } await utilAddChatMessage(userMessage) const apiMessage: Omit = { role: 'apiMessage', content: finalResult, chatflowid: chatflow.id, chatType: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, chatId, memoryType, sessionId } if (agentReasoning.length) apiMessage.agentReasoning = JSON.stringify(agentReasoning) const chatMessage = await utilAddChatMessage(apiMessage) await appServer.telemetry.sendTelemetry('prediction_sent', { version: await getAppVersion(), chatlowId: chatflow.id, chatId, type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, flowGraph: getTelemetryFlowObj(nodes, edges) }) // Prepare response let result: ICommonObject = {} result.text = finalResult result.question = incomingInput.question result.chatId = chatId result.chatMessageId = chatMessage?.id if (sessionId) result.sessionId = sessionId if (memoryType) result.memoryType = memoryType if (agentReasoning.length) result.agentReasoning = agentReasoning await appServer.telemetry.sendTelemetry('graph_compiled', { version: await getAppVersion(), graphId: chatflow.id, type: isInternal ? chatType.INTERNAL : chatType.EXTERNAL, flowGraph: getTelemetryFlowObj(nodes, edges) }) return result } return undefined } catch (e) { logger.error('[server]: Error:', e) throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, getErrorMessage(e)) } }