Merge pull request #111 from FlowiseAI/feature/ChildProcessing

This commit is contained in:
Ong Chung Yau 2023-05-18 18:29:32 +07:00 committed by GitHub
commit 0c16bcad74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 337 additions and 69 deletions

View File

@ -1,3 +1,4 @@
PORT=3000
# USERNAME=user
# PASSWORD=1234
# PASSWORD=1234
# EXECUTION_MODE=child or main

View File

@ -0,0 +1,148 @@
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
import { buildLangchain, constructGraphs, getEndingNode, getStartingNodes, resolveVariables } from './utils'
export class ChildProcess {
/**
* Stop child process when app is killed
*/
static async stopChildProcess() {
setTimeout(() => {
process.exit(0)
}, 50000)
}
/**
* Process prediction
* @param {IRunChatflowMessageValue} messageValue
* @return {Promise<void>}
*/
async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
process.on('SIGTERM', ChildProcess.stopChildProcess)
process.on('SIGINT', ChildProcess.stopChildProcess)
await sendToParentProcess('start', '_')
// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, incomingInput, componentNodes } = messageValue
let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (endingNodeData) {
nodeToExecuteData = endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
await sendToParentProcess(
'error',
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
return
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
componentNodes,
incomingInput.question,
incomingInput?.overrideConfig
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) {
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
return
}
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
addToChatFlowPool = {
chatflowid: chatflow.id,
nodeToExecuteData,
startingNodes,
overrideConfig: incomingInput?.overrideConfig
}
}
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
await sendToParentProcess('finish', { result, addToChatFlowPool })
}
}
/**
* Send data back to parent process
* @param {string} key Key of message
* @param {*} value Value of message
* @returns {Promise<void>}
*/
async function sendToParentProcess(key: string, value: any): Promise<void> {
// tslint:disable-line:no-any
return new Promise((resolve, reject) => {
process.send!(
{
key,
value
},
(error: Error) => {
if (error) {
return reject(error)
}
resolve()
}
)
})
}
const childProcess = new ChildProcess()
process.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'start') {
await childProcess.runChildProcess(message.value)
process.exit()
}
})

View File

@ -138,3 +138,15 @@ export interface IDatabaseExport {
chatflows: IChatFlow[]
apikeys: ICommonObject[]
}
export interface IRunChatflowMessageValue {
chatflow: IChatFlow
incomingInput: IncomingInput
componentNodes: IComponentNodes
endingNodeData?: INodeData
}
export interface IChildProcessMessage {
key: string
value?: any
}

View File

@ -6,7 +6,16 @@ import http from 'http'
import * as fs from 'fs'
import basicAuth from 'express-basic-auth'
import { IChatFlow, IncomingInput, IReactFlowNode, IReactFlowObject, INodeData, IDatabaseExport } from './Interface'
import {
IChatFlow,
IncomingInput,
IReactFlowNode,
IReactFlowObject,
INodeData,
IDatabaseExport,
IRunChatflowMessageValue,
IChildProcessMessage
} from './Interface'
import {
getNodeModulesPackagePath,
getStartingNodes,
@ -32,6 +41,7 @@ import { ChatFlow } from './entity/ChatFlow'
import { ChatMessage } from './entity/ChatMessage'
import { ChatflowPool } from './ChatflowPool'
import { ICommonObject } from 'flowise-components'
import { fork } from 'child_process'
export class App {
app: express.Application
@ -369,6 +379,12 @@ export class App {
})
}
/**
* Validate API Key
* @param {Request} req
* @param {Response} res
* @param {ChatFlow} chatflow
*/
async validateKey(req: Request, res: Response, chatflow: ChatFlow) {
const chatFlowApiKeyId = chatflow.apikeyid
const authorizationHeader = (req.headers['Authorization'] as string) ?? (req.headers['authorization'] as string) ?? ''
@ -383,6 +399,73 @@ export class App {
}
}
/**
* Start child process
* @param {ChatFlow} chatflow
* @param {IncomingInput} incomingInput
* @param {INodeData} endingNodeData
*/
async startChildProcess(chatflow: ChatFlow, incomingInput: IncomingInput, endingNodeData?: INodeData) {
try {
const controller = new AbortController()
const { signal } = controller
let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js')
if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts'
const childProcess = fork(childpath, [], { signal })
const value = {
chatflow,
incomingInput,
componentNodes: cloneDeep(this.nodesPool.componentNodes),
endingNodeData
} as IRunChatflowMessageValue
childProcess.send({ key: 'start', value } as IChildProcessMessage)
let childProcessTimeout: NodeJS.Timeout
return new Promise((resolve, reject) => {
childProcess.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'finish') {
const { result, addToChatFlowPool } = message.value as ICommonObject
if (childProcessTimeout) {
clearTimeout(childProcessTimeout)
}
if (Object.keys(addToChatFlowPool).length) {
const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig)
}
resolve(result)
}
if (message.key === 'start') {
if (process.env.EXECUTION_TIMEOUT) {
childProcessTimeout = setTimeout(async () => {
childProcess.kill()
resolve(undefined)
}, parseInt(process.env.EXECUTION_TIMEOUT, 10))
}
}
if (message.key === 'error') {
let errMessage = message.value as string
if (childProcessTimeout) {
clearTimeout(childProcessTimeout)
}
reject(errMessage)
}
})
})
} catch (err) {
console.error(err)
}
}
/**
* Process Prediction
* @param {Request} req
* @param {Response} res
* @param {boolean} isInternal
*/
async processPrediction(req: Request, res: Response, isInternal = false) {
try {
const chatflowid = req.params.id
@ -427,78 +510,102 @@ export class App {
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
this.chatflowPool.activeChatflows[chatflowid].inSync &&
isSameOverrideConfig(
isInternal,
this.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes)
) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
incomingInput?.overrideConfig
const isRebuildNeeded = () => {
return (
Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) &&
this.chatflowPool.activeChatflows[chatflowid].inSync &&
isSameOverrideConfig(
isInternal,
this.chatflowPool.activeChatflows[chatflowid].overrideConfig,
incomingInput.overrideConfig
) &&
!isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes)
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
if (process.env.EXECUTION_MODE === 'child') {
if (isRebuildNeeded()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
try {
const result = await this.startChildProcess(chatflow, incomingInput, nodeToExecuteData)
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
} else {
try {
const result = await this.startChildProcess(chatflow, incomingInput)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
}
} else {
if (isRebuildNeeded()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
return res.json(result)
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`)
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
incomingInput?.overrideConfig
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatHistory: incomingInput.history })
return res.json(result)
}
} catch (e: any) {
return res.status(500).send(e.message)
}