From df7a3995cddf44c44b4f3afdc4a199a42df0534f Mon Sep 17 00:00:00 2001 From: Henry Date: Wed, 19 Apr 2023 23:31:41 +0100 Subject: [PATCH] add chatpools data --- packages/server/src/ChatflowPool.ts | 6 ++- packages/server/src/Interface.ts | 1 + packages/server/src/index.ts | 77 +++++++++++++++-------------- packages/server/src/utils/index.ts | 6 +-- 4 files changed, 48 insertions(+), 42 deletions(-) diff --git a/packages/server/src/ChatflowPool.ts b/packages/server/src/ChatflowPool.ts index 125f7f57a..3b363a8b3 100644 --- a/packages/server/src/ChatflowPool.ts +++ b/packages/server/src/ChatflowPool.ts @@ -1,4 +1,4 @@ -import { IActiveChatflows, INodeData } from './Interface' +import { IActiveChatflows, INodeData, IReactFlowNode } from './Interface' /** * This pool is to keep track of active chatflow pools @@ -11,9 +11,11 @@ export class ChatflowPool { * Add to the pool * @param {string} chatflowid * @param {INodeData} endingNodeData + * @param {IReactFlowNode[]} startingNodes */ - add(chatflowid: string, endingNodeData: INodeData) { + add(chatflowid: string, endingNodeData: INodeData, startingNodes: IReactFlowNode[]) { this.activeChatflows[chatflowid] = { + startingNodes, endingNodeData, inSync: true } diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index cc83e1a88..822b6343c 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -117,6 +117,7 @@ export interface IncomingInput { export interface IActiveChatflows { [key: string]: { + startingNodes: IReactFlowNode[] endingNodeData: INodeData inSync: boolean } diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 7115a030b..7246e7c82 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -12,7 +12,7 @@ import { getEndingNode, constructGraphs, resolveVariables, - checkStartNodeDependOnInput + isStartNodeDependOnInput } from './utils' import { cloneDeep } from 'lodash' import { getDataSource } from './DataSource' @@ -203,39 +203,6 @@ export class App { let nodeToExecuteData: INodeData - /*** Get chatflows and prepare data ***/ - const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ - id: chatflowid - }) - if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) - - const flowData = chatflow.flowData - const parsedFlowData: IReactFlowObject = JSON.parse(flowData) - const nodes = parsedFlowData.nodes - const edges = parsedFlowData.edges - - /*** Get Ending Node with Directed Graph ***/ - const { graph, nodeDependencies } = constructGraphs(nodes, edges) - const directedGraph = graph - const endingNodeId = getEndingNode(nodeDependencies, directedGraph) - if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`) - - const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data - if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`) - - if (endingNodeData.outputs && !Object.values(endingNodeData.outputs).includes(endingNodeData.name)) { - return res - .status(500) - .send( - `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` - ) - } - - /*** Get Starting Nodes with Non-Directed Graph ***/ - const constructedObj = constructGraphs(nodes, edges, true) - const nonDirectedGraph = constructedObj.graph - const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) - /* Check if: * - Node Data already exists in pool * - Still in sync (i.e the flow has not been modified since) @@ -244,10 +211,47 @@ export class App { if ( Object.prototype.hasOwnProperty.call(this.chatflowPool.activeChatflows, chatflowid) && this.chatflowPool.activeChatflows[chatflowid].inSync && - !checkStartNodeDependOnInput(nodes, startingNodeIds) + !isStartNodeDependOnInput(this.chatflowPool.activeChatflows[chatflowid].startingNodes) ) { nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData } else { + /*** Get chatflows and prepare data ***/ + const chatflow = await this.AppDataSource.getRepository(ChatFlow).findOneBy({ + id: chatflowid + }) + if (!chatflow) return res.status(404).send(`Chatflow ${chatflowid} not found`) + + const flowData = chatflow.flowData + const parsedFlowData: IReactFlowObject = JSON.parse(flowData) + const nodes = parsedFlowData.nodes + const edges = parsedFlowData.edges + + /*** Get Ending Node with Directed Graph ***/ + const { graph, nodeDependencies } = constructGraphs(nodes, edges) + const directedGraph = graph + const endingNodeId = getEndingNode(nodeDependencies, directedGraph) + if (!endingNodeId) return res.status(500).send(`Ending node must be either a Chain or Agent`) + + const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data + if (!endingNodeData) return res.status(500).send(`Ending node must be either a Chain or Agent`) + + if ( + endingNodeData.outputs && + Object.keys(endingNodeData.outputs).length && + !Object.values(endingNodeData.outputs).includes(endingNodeData.name) + ) { + return res + .status(500) + .send( + `Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction` + ) + } + + /*** Get Starting Nodes with Non-Directed Graph ***/ + const constructedObj = constructGraphs(nodes, edges, true) + const nonDirectedGraph = constructedObj.graph + const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId) + /*** BFS to traverse from Starting Nodes to Ending Node ***/ const reactFlowNodes = await buildLangchain( startingNodeIds, @@ -264,7 +268,8 @@ export class App { const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question) nodeToExecuteData = reactFlowNodeData - this.chatflowPool.add(chatflowid, nodeToExecuteData) + const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id)) + this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes) } const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 8ec72328d..1e2a91e96 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -350,12 +350,10 @@ export const resolveVariables = (reactFlowNodeData: INodeData, reactFlowNodes: I /** * Rebuild flow if LLMChain has dependency on other chains * User Question => Prompt_0 => LLMChain_0 => Prompt-1 => LLMChain_1 - * @param {IReactFlowNode[]} nodes - * @param {string[]} startingNodeIds + * @param {IReactFlowNode[]} startingNodes * @returns {boolean} */ -export const checkStartNodeDependOnInput = (nodes: IReactFlowNode[], startingNodeIds: string[]) => { - const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id) && nd.id.toLowerCase().includes('prompttemplate')) +export const isStartNodeDependOnInput = (startingNodes: IReactFlowNode[]): boolean => { for (const node of startingNodes) { for (const inputName in node.data.inputs) { const inputVariables = getInputVariables(node.data.inputs[inputName])