diff --git a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts index 42686ae0f..c2d0e782f 100644 --- a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts +++ b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts @@ -265,7 +265,7 @@ class OpenAIAssistant_Agents implements INode { // Start tool analytics const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds) - const toolOutput = await tool.call(actions[i].toolInput) + const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, threadId) // End tool analytics await analyticHandlers.onToolEnd(toolIds, toolOutput) diff --git a/packages/components/nodes/agents/OpenAIFunctionAgent/OpenAIFunctionAgent.ts b/packages/components/nodes/agents/OpenAIFunctionAgent/OpenAIFunctionAgent.ts index c019ca5a9..c0095cee1 100644 --- a/packages/components/nodes/agents/OpenAIFunctionAgent/OpenAIFunctionAgent.ts +++ b/packages/components/nodes/agents/OpenAIFunctionAgent/OpenAIFunctionAgent.ts @@ -1,10 +1,17 @@ -import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' -import { initializeAgentExecutorWithOptions, AgentExecutor } from 'langchain/agents' -import { getBaseClasses, mapChatHistory } from '../../../src/utils' -import { BaseLanguageModel } from 'langchain/base_language' +import { FlowiseMemory, ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' +import { AgentExecutor as LCAgentExecutor, AgentExecutorInput } from 'langchain/agents' +import { ChainValues, AgentStep, AgentFinish, AgentAction, BaseMessage, FunctionMessage, AIMessage } from 'langchain/schema' +import { OutputParserException } from 'langchain/schema/output_parser' +import { CallbackManagerForChainRun } from 'langchain/callbacks' +import { formatToOpenAIFunction } from 'langchain/tools' +import { ToolInputParsingException, Tool } from '@langchain/core/tools' +import { getBaseClasses } from '../../../src/utils' import { flatten } from 'lodash' -import { BaseChatMemory } from 'langchain/memory' +import { RunnableSequence } from 'langchain/schema/runnable' import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler' +import { ChatPromptTemplate, MessagesPlaceholder } from 'langchain/prompts' +import { ChatOpenAI } from 'langchain/chat_models/openai' +import { OpenAIFunctionsAgentOutputParser } from 'langchain/agents/openai/output_parser' class OpenAIFunctionAgent_Agents implements INode { label: string @@ -16,8 +23,9 @@ class OpenAIFunctionAgent_Agents implements INode { category: string baseClasses: string[] inputs: INodeParams[] + sessionId?: string - constructor() { + constructor(fields: { sessionId?: string }) { this.label = 'OpenAI Function Agent' this.name = 'openAIFunctionAgent' this.version = 3.0 @@ -25,7 +33,7 @@ class OpenAIFunctionAgent_Agents implements INode { this.category = 'Agents' this.icon = 'function.svg' this.description = `An agent that uses Function Calling to pick the tool and args to call` - this.baseClasses = [this.type, ...getBaseClasses(AgentExecutor)] + this.baseClasses = [this.type, ...getBaseClasses(LCAgentExecutor)] this.inputs = [ { label: 'Allowed Tools', @@ -52,54 +60,324 @@ class OpenAIFunctionAgent_Agents implements INode { additionalParams: true } ] + this.sessionId = fields?.sessionId } async init(nodeData: INodeData): Promise { - const model = nodeData.inputs?.model as BaseLanguageModel - const memory = nodeData.inputs?.memory as BaseChatMemory - const systemMessage = nodeData.inputs?.systemMessage as string + const memory = nodeData.inputs?.memory as FlowiseMemory - let tools = nodeData.inputs?.tools - tools = flatten(tools) - - const executor = await initializeAgentExecutorWithOptions(tools, model, { - agentType: 'openai-functions', - verbose: process.env.DEBUG === 'true' ? true : false, - agentArgs: { - prefix: systemMessage ?? `You are a helpful AI assistant.` - } - }) + const executor = prepareAgent(nodeData, this.sessionId) if (memory) executor.memory = memory return executor } async run(nodeData: INodeData, input: string, options: ICommonObject): Promise { - const executor = nodeData.instance as AgentExecutor - const memory = nodeData.inputs?.memory as BaseChatMemory + const memory = nodeData.inputs?.memory as FlowiseMemory - if (options && options.chatHistory) { - const chatHistoryClassName = memory.chatHistory.constructor.name - // Only replace when its In-Memory - if (chatHistoryClassName && chatHistoryClassName === 'ChatMessageHistory') { - memory.chatHistory = mapChatHistory(options) - executor.memory = memory - } - } - - ;(executor.memory as any).returnMessages = true // Return true for BaseChatModel + const executor = prepareAgent(nodeData, this.sessionId) const loggerHandler = new ConsoleCallbackHandler(options.logger) const callbacks = await additionalCallbacks(nodeData, options) + let res: ChainValues = {} + if (options.socketIO && options.socketIOClientId) { const handler = new CustomChainHandler(options.socketIO, options.socketIOClientId) - const result = await executor.run(input, [loggerHandler, handler, ...callbacks]) - return result + res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] }) } else { - const result = await executor.run(input, [loggerHandler, ...callbacks]) - return result + res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] }) } + + await memory.addChatMessages( + [ + { + text: input, + type: 'userMessage' + }, + { + text: res?.output, + type: 'apiMessage' + } + ], + this.sessionId + ) + + return res?.output + } +} + +const formatAgentSteps = (steps: AgentStep[]): BaseMessage[] => + steps.flatMap(({ action, observation }) => { + if ('messageLog' in action && action.messageLog !== undefined) { + const log = action.messageLog as BaseMessage[] + return log.concat(new FunctionMessage(observation, action.tool)) + } else { + return [new AIMessage(action.log)] + } + }) + +const prepareAgent = (nodeData: INodeData, sessionId?: string) => { + const model = nodeData.inputs?.model as ChatOpenAI + const memory = nodeData.inputs?.memory as FlowiseMemory + const systemMessage = nodeData.inputs?.systemMessage as string + let tools = nodeData.inputs?.tools + tools = flatten(tools) + const memoryKey = memory.memoryKey ? memory.memoryKey : 'chat_history' + const inputKey = memory.inputKey ? memory.inputKey : 'input' + + const prompt = ChatPromptTemplate.fromMessages([ + ['ai', systemMessage ? systemMessage : `You are a helpful AI assistant.`], + new MessagesPlaceholder(memoryKey), + ['human', `{${inputKey}}`], + new MessagesPlaceholder('agent_scratchpad') + ]) + + const modelWithFunctions = model.bind({ + functions: [...tools.map((tool: any) => formatToOpenAIFunction(tool))] + }) + + const runnableAgent = RunnableSequence.from([ + { + [inputKey]: (i: { input: string; steps: AgentStep[] }) => i.input, + agent_scratchpad: (i: { input: string; steps: AgentStep[] }) => formatAgentSteps(i.steps), + [memoryKey]: async (_: { input: string; steps: AgentStep[] }) => { + const messages = (await memory.getChatMessages(sessionId, true)) as BaseMessage[] + return messages ?? [] + } + }, + prompt, + modelWithFunctions, + new OpenAIFunctionsAgentOutputParser() + ]) + + const executor = AgentExecutor.fromAgentAndTools({ + agent: runnableAgent, + tools, + sessionId + }) + + return executor +} + +type AgentExecutorOutput = ChainValues + +class AgentExecutor extends LCAgentExecutor { + sessionId?: string + + static fromAgentAndTools(fields: AgentExecutorInput & { sessionId?: string }): AgentExecutor { + const newInstance = new AgentExecutor(fields) + if (fields.sessionId) newInstance.sessionId = fields.sessionId + return newInstance + } + + shouldContinueIteration(iterations: number): boolean { + return this.maxIterations === undefined || iterations < this.maxIterations + } + + async _call(inputs: ChainValues, runManager?: CallbackManagerForChainRun): Promise { + const toolsByName = Object.fromEntries(this.tools.map((t) => [t.name.toLowerCase(), t])) + + const steps: AgentStep[] = [] + let iterations = 0 + + const getOutput = async (finishStep: AgentFinish): Promise => { + const { returnValues } = finishStep + const additional = await this.agent.prepareForOutput(returnValues, steps) + + if (this.returnIntermediateSteps) { + return { ...returnValues, intermediateSteps: steps, ...additional } + } + await runManager?.handleAgentEnd(finishStep) + return { ...returnValues, ...additional } + } + + while (this.shouldContinueIteration(iterations)) { + let output + try { + output = await this.agent.plan(steps, inputs, runManager?.getChild()) + } catch (e) { + if (e instanceof OutputParserException) { + let observation + let text = e.message + if (this.handleParsingErrors === true) { + if (e.sendToLLM) { + observation = e.observation + text = e.llmOutput ?? '' + } else { + observation = 'Invalid or incomplete response' + } + } else if (typeof this.handleParsingErrors === 'string') { + observation = this.handleParsingErrors + } else if (typeof this.handleParsingErrors === 'function') { + observation = this.handleParsingErrors(e) + } else { + throw e + } + output = { + tool: '_Exception', + toolInput: observation, + log: text + } as AgentAction + } else { + throw e + } + } + // Check if the agent has finished + if ('returnValues' in output) { + return getOutput(output) + } + + let actions: AgentAction[] + if (Array.isArray(output)) { + actions = output as AgentAction[] + } else { + actions = [output as AgentAction] + } + + const newSteps = await Promise.all( + actions.map(async (action) => { + await runManager?.handleAgentAction(action) + const tool = action.tool === '_Exception' ? new ExceptionTool() : toolsByName[action.tool?.toLowerCase()] + let observation + try { + // here we need to override Tool call method to include sessionId as parameter + observation = tool + ? // @ts-ignore + await tool.call(action.toolInput, runManager?.getChild(), undefined, this.sessionId) + : `${action.tool} is not a valid tool, try another one.` + } catch (e) { + if (e instanceof ToolInputParsingException) { + if (this.handleParsingErrors === true) { + observation = 'Invalid or incomplete tool input. Please try again.' + } else if (typeof this.handleParsingErrors === 'string') { + observation = this.handleParsingErrors + } else if (typeof this.handleParsingErrors === 'function') { + observation = this.handleParsingErrors(e) + } else { + throw e + } + observation = await new ExceptionTool().call(observation, runManager?.getChild()) + return { action, observation: observation ?? '' } + } + } + return { action, observation: observation ?? '' } + }) + ) + + steps.push(...newSteps) + + const lastStep = steps[steps.length - 1] + const lastTool = toolsByName[lastStep.action.tool?.toLowerCase()] + + if (lastTool?.returnDirect) { + return getOutput({ + returnValues: { [this.agent.returnValues[0]]: lastStep.observation }, + log: '' + }) + } + + iterations += 1 + } + + const finish = await this.agent.returnStoppedResponse(this.earlyStoppingMethod, steps, inputs) + + return getOutput(finish) + } + + async _takeNextStep( + nameToolMap: Record, + inputs: ChainValues, + intermediateSteps: AgentStep[], + runManager?: CallbackManagerForChainRun + ): Promise { + let output + try { + output = await this.agent.plan(intermediateSteps, inputs, runManager?.getChild()) + } catch (e) { + if (e instanceof OutputParserException) { + let observation + let text = e.message + if (this.handleParsingErrors === true) { + if (e.sendToLLM) { + observation = e.observation + text = e.llmOutput ?? '' + } else { + observation = 'Invalid or incomplete response' + } + } else if (typeof this.handleParsingErrors === 'string') { + observation = this.handleParsingErrors + } else if (typeof this.handleParsingErrors === 'function') { + observation = this.handleParsingErrors(e) + } else { + throw e + } + output = { + tool: '_Exception', + toolInput: observation, + log: text + } as AgentAction + } else { + throw e + } + } + + if ('returnValues' in output) { + return output + } + + let actions: AgentAction[] + if (Array.isArray(output)) { + actions = output as AgentAction[] + } else { + actions = [output as AgentAction] + } + + const result: AgentStep[] = [] + for (const agentAction of actions) { + let observation = '' + if (runManager) { + await runManager?.handleAgentAction(agentAction) + } + if (agentAction.tool in nameToolMap) { + const tool = nameToolMap[agentAction.tool] + try { + // here we need to override Tool call method to include sessionId as parameter + // @ts-ignore + observation = await tool.call(agentAction.toolInput, runManager?.getChild(), undefined, this.sessionId) + } catch (e) { + if (e instanceof ToolInputParsingException) { + if (this.handleParsingErrors === true) { + observation = 'Invalid or incomplete tool input. Please try again.' + } else if (typeof this.handleParsingErrors === 'string') { + observation = this.handleParsingErrors + } else if (typeof this.handleParsingErrors === 'function') { + observation = this.handleParsingErrors(e) + } else { + throw e + } + observation = await new ExceptionTool().call(observation, runManager?.getChild()) + } + } + } else { + observation = `${agentAction.tool} is not a valid tool, try another available tool: ${Object.keys(nameToolMap).join(', ')}` + } + result.push({ + action: agentAction, + observation + }) + } + return result + } +} + +class ExceptionTool extends Tool { + name = '_Exception' + + description = 'Exception tool' + + async _call(query: string) { + return query } } diff --git a/packages/components/nodes/memory/BufferMemory/BufferMemory.ts b/packages/components/nodes/memory/BufferMemory/BufferMemory.ts index 7793d96d4..0ad8adec9 100644 --- a/packages/components/nodes/memory/BufferMemory/BufferMemory.ts +++ b/packages/components/nodes/memory/BufferMemory/BufferMemory.ts @@ -1,6 +1,7 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' -import { BufferMemory } from 'langchain/memory' +import { FlowiseMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class BufferMemory_Memory implements INode { label: string @@ -41,7 +42,7 @@ class BufferMemory_Memory implements INode { async init(nodeData: INodeData): Promise { const memoryKey = nodeData.inputs?.memoryKey as string const inputKey = nodeData.inputs?.inputKey as string - return new BufferMemory({ + return new BufferMemoryExtended({ returnMessages: true, memoryKey, inputKey @@ -49,4 +50,41 @@ class BufferMemory_Memory implements INode { } } +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { + constructor(fields: BufferMemoryInput) { + super(fields) + } + + async getChatMessages(_?: string, returnBaseMessages = false): Promise { + const memoryResult = await this.loadMemoryVariables({}) + const baseMessages = memoryResult[this.memoryKey ?? 'chat_history'] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise { + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues) + } + + async clearChatMessages(): Promise { + await this.clear() + } + + async resumeMessages(messages: IMessage[]): Promise { + // Clear existing chatHistory to avoid duplication + if (messages.length) await this.clear() + + // Insert into chatHistory + for (const msg of messages) { + if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message) + else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message) + } + } +} + module.exports = { nodeClass: BufferMemory_Memory } diff --git a/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts b/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts index 84e607e54..ca8d0ddfd 100644 --- a/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts +++ b/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts @@ -1,6 +1,7 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' +import { FlowiseWindowMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils' import { BufferWindowMemory, BufferWindowMemoryInput } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class BufferWindowMemory_Memory implements INode { label: string @@ -57,7 +58,44 @@ class BufferWindowMemory_Memory implements INode { k: parseInt(k, 10) } - return new BufferWindowMemory(obj) + return new BufferWindowMemoryExtended(obj) + } +} + +class BufferWindowMemoryExtended extends FlowiseWindowMemory implements MemoryMethods { + constructor(fields: BufferWindowMemoryInput) { + super(fields) + } + + async getChatMessages(_?: string, returnBaseMessages = false): Promise { + const memoryResult = await this.loadMemoryVariables({}) + const baseMessages = memoryResult[this.memoryKey ?? 'chat_history'] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise { + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues) + } + + async clearChatMessages(): Promise { + await this.clear() + } + + async resumeMessages(messages: IMessage[]): Promise { + // Clear existing chatHistory to avoid duplication + if (messages.length) await this.clear() + + // Insert into chatHistory + for (const msg of messages) { + if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message) + else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message) + } } } diff --git a/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts b/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts index 332d73aa9..107ab7db9 100644 --- a/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts +++ b/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts @@ -1,7 +1,8 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' +import { FlowiseSummaryMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils' import { ConversationSummaryMemory, ConversationSummaryMemoryInput } from 'langchain/memory' import { BaseLanguageModel } from 'langchain/base_language' +import { BaseMessage } from 'langchain/schema' class ConversationSummaryMemory_Memory implements INode { label: string @@ -56,7 +57,48 @@ class ConversationSummaryMemory_Memory implements INode { inputKey } - return new ConversationSummaryMemory(obj) + return new ConversationSummaryMemoryExtended(obj) + } +} + +class ConversationSummaryMemoryExtended extends FlowiseSummaryMemory implements MemoryMethods { + constructor(fields: ConversationSummaryMemoryInput) { + super(fields) + } + + async getChatMessages(_?: string, returnBaseMessages = false): Promise { + const memoryResult = await this.loadMemoryVariables({}) + const baseMessages = memoryResult[this.memoryKey ?? 'chat_history'] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise { + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues) + } + + async clearChatMessages(): Promise { + await this.clear() + } + + async resumeMessages(messages: IMessage[]): Promise { + // Clear existing chatHistory to avoid duplication + if (messages.length) await this.clear() + + // Insert into chatHistory + for (const msg of messages) { + if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message) + else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message) + } + + // Replace buffer + const chatMessages = await this.chatHistory.getMessages() + this.buffer = await this.predictNewSummary(chatMessages.slice(-2), this.buffer) } } diff --git a/packages/components/nodes/memory/DynamoDb/DynamoDb.ts b/packages/components/nodes/memory/DynamoDb/DynamoDb.ts index 8ca6cf9e5..872ec0b51 100644 --- a/packages/components/nodes/memory/DynamoDb/DynamoDb.ts +++ b/packages/components/nodes/memory/DynamoDb/DynamoDb.ts @@ -1,15 +1,25 @@ import { - ICommonObject, - INode, - INodeData, - INodeParams, + DynamoDBClient, + DynamoDBClientConfig, + GetItemCommand, + GetItemCommandInput, + UpdateItemCommand, + UpdateItemCommandInput, + DeleteItemCommand, + DeleteItemCommandInput, + AttributeValue +} from '@aws-sdk/client-dynamodb' +import { DynamoDBChatMessageHistory } from 'langchain/stores/message/dynamodb' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, StoredMessage, BaseMessage } from 'langchain/schema' +import { + convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory -} from '../../../src' -import { DynamoDBChatMessageHistory } from 'langchain/stores/message/dynamodb' -import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +} from '../../../src/utils' +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' class DynamoDb_Memory implements INode { label: string @@ -102,49 +112,203 @@ class DynamoDb_Memory implements INode { const initalizeDynamoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { const tableName = nodeData.inputs?.tableName as string const partitionKey = nodeData.inputs?.partitionKey as string - const sessionId = nodeData.inputs?.sessionId as string const region = nodeData.inputs?.region as string const memoryKey = nodeData.inputs?.memoryKey as string const chatId = options.chatId let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const accessKeyId = getCredentialParam('accessKey', credentialData, nodeData) const secretAccessKey = getCredentialParam('secretAccessKey', credentialData, nodeData) + const config: DynamoDBClientConfig = { + region, + credentials: { + accessKeyId, + secretAccessKey + } + } + + const client = new DynamoDBClient(config ?? {}) + const dynamoDb = new DynamoDBChatMessageHistory({ tableName, partitionKey, - sessionId: sessionId ? sessionId : chatId, - config: { - region, - credentials: { - accessKeyId, - secretAccessKey - } - } + sessionId, + config }) const memory = new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: dynamoDb, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + dynamodbClient: client }) return memory } interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + dynamodbClient: DynamoDBClient + sessionId: string } -class BufferMemoryExtended extends BufferMemory { - isSessionIdUsingChatMessageId? = false +interface DynamoDBSerializedChatMessage { + M: { + type: { + S: string + } + text: { + S: string + } + role?: { + S: string + } + } +} - constructor(fields: BufferMemoryInput & Partial) { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { + isSessionIdUsingChatMessageId = false + sessionId = '' + dynamodbClient: DynamoDBClient + + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) - this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.dynamodbClient = fields.dynamodbClient + } + + overrideDynamoKey(overrideSessionId = '') { + const existingDynamoKey = (this as any).dynamoKey + const partitionKey = (this as any).partitionKey + + let newDynamoKey: Record = {} + + if (Object.keys(existingDynamoKey).includes(partitionKey)) { + newDynamoKey[partitionKey] = { S: overrideSessionId } + } + + return Object.keys(newDynamoKey).length ? newDynamoKey : existingDynamoKey + } + + async addNewMessage( + messages: StoredMessage[], + client: DynamoDBClient, + tableName = '', + dynamoKey: Record = {}, + messageAttributeName = 'messages' + ) { + const params: UpdateItemCommandInput = { + TableName: tableName, + Key: dynamoKey, + ExpressionAttributeNames: { + '#m': messageAttributeName + }, + ExpressionAttributeValues: { + ':empty_list': { + L: [] + }, + ':m': { + L: messages.map((message) => { + const dynamoSerializedMessage: DynamoDBSerializedChatMessage = { + M: { + type: { + S: message.type + }, + text: { + S: message.data.content + } + } + } + if (message.data.role) { + dynamoSerializedMessage.M.role = { S: message.data.role } + } + return dynamoSerializedMessage + }) + } + }, + UpdateExpression: 'SET #m = list_append(if_not_exists(#m, :empty_list), :m)' + } + + await client.send(new UpdateItemCommand(params)) + } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + if (!this.dynamodbClient) return [] + + const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey + const tableName = (this as any).tableName + const messageAttributeName = (this as any).messageAttributeName + + const params: GetItemCommandInput = { + TableName: tableName, + Key: dynamoKey + } + + const response = await this.dynamodbClient.send(new GetItemCommand(params)) + const items = response.Item ? response.Item[messageAttributeName]?.L ?? [] : [] + const messages = items + .map((item) => ({ + type: item.M?.type.S, + data: { + role: item.M?.role?.S, + content: item.M?.text.S + } + })) + .filter((x): x is StoredMessage => x.type !== undefined && x.data.content !== undefined) + const baseMessages = messages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.dynamodbClient) return + + const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey + const tableName = (this as any).tableName + const messageAttributeName = (this as any).messageAttributeName + + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.addNewMessage(messageToAdd, this.dynamodbClient, tableName, dynamoKey, messageAttributeName) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.addNewMessage(messageToAdd, this.dynamodbClient, tableName, dynamoKey, messageAttributeName) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.dynamodbClient) return + + const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey + const tableName = (this as any).tableName + + const params: DeleteItemCommandInput = { + TableName: tableName, + Key: dynamoKey + } + await this.dynamodbClient.send(new DeleteItemCommand(params)) + await this.clear() + } + + async resumeMessages(): Promise { + return } } diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index b654a5b20..b422921e6 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -1,17 +1,15 @@ +import { MongoClient, Collection, Document } from 'mongodb' +import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' import { + convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam, - ICommonObject, - INode, - INodeData, - INodeParams, serializeChatHistory -} from '../../../src' -import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' -import { BufferMemory, BufferMemoryInput } from 'langchain/memory' -import { BaseMessage, mapStoredMessageToChatMessage } from 'langchain/schema' -import { MongoClient } from 'mongodb' +} from '../../../src/utils' +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' class MongoDB_Memory implements INode { label: string @@ -99,23 +97,30 @@ class MongoDB_Memory implements INode { const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { const databaseName = nodeData.inputs?.databaseName as string const collectionName = nodeData.inputs?.collectionName as string - const sessionId = nodeData.inputs?.sessionId as string const memoryKey = nodeData.inputs?.memoryKey as string const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) - let mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) + const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) const client = new MongoClient(mongoDBConnectUrl) await client.connect() + const collection = client.db(databaseName).collection(collectionName) const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({ collection, - sessionId: sessionId ? sessionId : chatId + sessionId }) mongoDBChatMessageHistory.getMessages = async (): Promise => { @@ -144,20 +149,81 @@ const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): P return new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: mongoDBChatMessageHistory, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + collection }) } interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + collection: Collection + sessionId: string } -class BufferMemoryExtended extends BufferMemory { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { + sessionId = '' + collection: Collection isSessionIdUsingChatMessageId? = false - constructor(fields: BufferMemoryInput & Partial) { + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) - this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.collection = fields.collection + } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + if (!this.collection) return [] + + const id = overrideSessionId ?? this.sessionId + const document = await this.collection.findOne({ sessionId: id }) + const messages = document?.messages || [] + const baseMessages = messages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.collection) return + + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.collection.updateOne( + { sessionId: id }, + { + $push: { messages: { $each: messageToAdd } } + }, + { upsert: true } + ) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.collection.updateOne( + { sessionId: id }, + { + $push: { messages: { $each: messageToAdd } } + }, + { upsert: true } + ) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.collection) return + + const id = overrideSessionId ?? this.sessionId + await this.collection.deleteOne({ sessionId: id }) + await this.clear() + } + + async resumeMessages(): Promise { + return } } diff --git a/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts b/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts index fc4a06dcc..938cc8731 100644 --- a/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts +++ b/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts @@ -1,9 +1,9 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +import { IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ICommonObject } from '../../../src' -import { MotorheadMemory, MotorheadMemoryInput } from 'langchain/memory' +import { MotorheadMemory, MotorheadMemoryInput, InputValues, MemoryVariables, OutputValues, getBufferString } from 'langchain/memory' import fetch from 'node-fetch' -import { getBufferString } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class MotorMemory_Memory implements INode { label: string @@ -88,19 +88,26 @@ class MotorMemory_Memory implements INode { const initalizeMotorhead = async (nodeData: INodeData, options: ICommonObject): Promise => { const memoryKey = nodeData.inputs?.memoryKey as string const baseURL = nodeData.inputs?.baseURL as string - const sessionId = nodeData.inputs?.sessionId as string const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const apiKey = getCredentialParam('apiKey', credentialData, nodeData) const clientId = getCredentialParam('clientId', credentialData, nodeData) - let obj: MotorheadMemoryInput & Partial = { + let obj: MotorheadMemoryInput & MotorheadMemoryExtendedInput = { returnMessages: true, - sessionId: sessionId ? sessionId : chatId, + isSessionIdUsingChatMessageId, + sessionId, memoryKey } @@ -117,8 +124,6 @@ const initalizeMotorhead = async (nodeData: INodeData, options: ICommonObject): } } - if (isSessionIdUsingChatMessageId) obj.isSessionIdUsingChatMessageId = true - const motorheadMemory = new MotorheadMemoryExtended(obj) // Get messages from sessionId @@ -131,15 +136,32 @@ interface MotorheadMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean } -class MotorheadMemoryExtended extends MotorheadMemory { +class MotorheadMemoryExtended extends MotorheadMemory implements MemoryMethods { isSessionIdUsingChatMessageId? = false - constructor(fields: MotorheadMemoryInput & Partial) { + constructor(fields: MotorheadMemoryInput & MotorheadMemoryExtendedInput) { super(fields) this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId } - async clear(): Promise { + async loadMemoryVariables(values: InputValues, overrideSessionId = ''): Promise { + if (overrideSessionId) { + this.sessionId = overrideSessionId + } + return super.loadMemoryVariables({ values }) + } + + async saveContext(inputValues: InputValues, outputValues: OutputValues, overrideSessionId = ''): Promise { + if (overrideSessionId) { + this.sessionId = overrideSessionId + } + return super.saveContext(inputValues, outputValues) + } + + async clear(overrideSessionId = ''): Promise { + if (overrideSessionId) { + this.sessionId = overrideSessionId + } try { await this.caller.call(fetch, `${this.url}/sessions/${this.sessionId}/memory`, { //@ts-ignore @@ -155,6 +177,28 @@ class MotorheadMemoryExtended extends MotorheadMemory { await this.chatHistory.clear() await super.clear() } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + const id = overrideSessionId ?? this.sessionId + const memoryVariables = await this.loadMemoryVariables({}, id) + const baseMessages = memoryVariables[this.memoryKey] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues, id) + } + + async clearChatMessages(overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + await this.clear(id) + } } module.exports = { nodeClass: MotorMemory_Memory } diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index d6ec9a114..a02df3ea2 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -1,8 +1,14 @@ -import { INode, INodeData, INodeParams, ICommonObject } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory } from '../../../src/utils' +import { INode, INodeData, INodeParams, ICommonObject, IMessage, MessageType, FlowiseMemory, MemoryMethods } from '../../../src/Interface' +import { + convertBaseMessagetoIMessage, + getBaseClasses, + getCredentialData, + getCredentialParam, + serializeChatHistory +} from '../../../src/utils' import { BufferMemory, BufferMemoryInput } from 'langchain/memory' import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis' -import { mapStoredMessageToChatMessage, BaseMessage } from 'langchain/schema' +import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from 'langchain/schema' import { Redis } from 'ioredis' class RedisBackedChatMemory_Memory implements INode { @@ -94,14 +100,20 @@ class RedisBackedChatMemory_Memory implements INode { } const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Promise => { - const sessionId = nodeData.inputs?.sessionId as string const sessionTTL = nodeData.inputs?.sessionTTL as number const memoryKey = nodeData.inputs?.memoryKey as string const windowSize = nodeData.inputs?.windowSize as number const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const redisUrl = getCredentialParam('redisUrl', credentialData, nodeData) @@ -128,7 +140,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom } let obj: RedisChatMessageHistoryInput = { - sessionId: sessionId ? sessionId : chatId, + sessionId, client } @@ -162,21 +174,71 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom const memory = new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: redisChatMessageHistory, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + redisClient: client }) return memory } interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + redisClient: Redis + sessionId: string } -class BufferMemoryExtended extends BufferMemory { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { isSessionIdUsingChatMessageId? = false + sessionId = '' + redisClient: Redis - constructor(fields: BufferMemoryInput & Partial) { + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.redisClient = fields.redisClient + } + + async getChatMessages(overrideSessionId = '', returnBaseMessage = false): Promise { + if (!this.redisClient) return [] + + const id = overrideSessionId ?? this.sessionId + const rawStoredMessages = await this.redisClient.lrange(id, 0, -1) + const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message)) + const baseMessages = orderedMessages.map(mapStoredMessageToChatMessage) + return returnBaseMessage ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.redisClient) return + + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.redisClient) return + + const id = overrideSessionId ?? this.sessionId + await this.redisClient.del(id) + await this.clear() + } + + async resumeMessages(): Promise { + return } } diff --git a/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts b/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts index 8bca04404..c3f971231 100644 --- a/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts @@ -1,8 +1,16 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory } from '../../../src/utils' -import { ICommonObject } from '../../../src' +import { Redis } from '@upstash/redis' import { BufferMemory, BufferMemoryInput } from 'langchain/memory' import { UpstashRedisChatMessageHistory } from 'langchain/stores/message/upstash_redis' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, StoredMessage, BaseMessage } from 'langchain/schema' +import { FlowiseMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { + convertBaseMessagetoIMessage, + getBaseClasses, + getCredentialData, + getCredentialParam, + serializeChatHistory +} from '../../../src/utils' +import { ICommonObject } from '../../../src/Interface' class UpstashRedisBackedChatMemory_Memory implements INode { label: string @@ -84,29 +92,39 @@ class UpstashRedisBackedChatMemory_Memory implements INode { const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject): Promise => { const baseURL = nodeData.inputs?.baseURL as string - const sessionId = nodeData.inputs?.sessionId as string const sessionTTL = nodeData.inputs?.sessionTTL as string const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const upstashRestToken = getCredentialParam('upstashRestToken', credentialData, nodeData) + const client = new Redis({ + url: baseURL, + token: upstashRestToken + }) + const redisChatMessageHistory = new UpstashRedisChatMessageHistory({ - sessionId: sessionId ? sessionId : chatId, + sessionId, sessionTTL: sessionTTL ? parseInt(sessionTTL, 10) : undefined, - config: { - url: baseURL, - token: upstashRestToken - } + client }) const memory = new BufferMemoryExtended({ memoryKey: 'chat_history', chatHistory: redisChatMessageHistory, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + redisClient: client }) return memory @@ -114,14 +132,63 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + redisClient: Redis + sessionId: string } -class BufferMemoryExtended extends BufferMemory { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { isSessionIdUsingChatMessageId? = false + sessionId = '' + redisClient: Redis - constructor(fields: BufferMemoryInput & Partial) { + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.redisClient = fields.redisClient + } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + if (!this.redisClient) return [] + + const id = overrideSessionId ?? this.sessionId + const rawStoredMessages: StoredMessage[] = await this.redisClient.lrange(id, 0, -1) + const orderedMessages = rawStoredMessages.reverse() + const previousMessages = orderedMessages.filter((x): x is StoredMessage => x.type !== undefined && x.data.content !== undefined) + const baseMessages = previousMessages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.redisClient) return + + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.redisClient) return + + const id = overrideSessionId ?? this.sessionId + await this.redisClient.del(id) + await this.clear() + } + + async resumeMessages(): Promise { + return } } diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts index ea52cb0b3..4dda76df1 100644 --- a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -1,8 +1,9 @@ +import { IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep' -import { getBufferString, InputValues, MemoryVariables, OutputValues } from 'langchain/memory' -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ICommonObject } from '../../../src' +import { InputValues, MemoryVariables, OutputValues, getBufferString } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class ZepMemory_Memory implements INode { label: string @@ -147,7 +148,7 @@ const initalizeZep = async (nodeData: INodeData, options: ICommonObject): Promis const obj: ZepMemoryInput & ZepMemoryExtendedInput = { baseURL, - sessionId: sessionId ? sessionId : chatId, + sessionId, aiPrefix, humanPrefix, returnMessages: true, @@ -166,7 +167,7 @@ interface ZepMemoryExtendedInput { k?: number } -class ZepMemoryExtended extends ZepMemory { +class ZepMemoryExtended extends ZepMemory implements MemoryMethods { isSessionIdUsingChatMessageId? = false lastN?: number @@ -196,6 +197,28 @@ class ZepMemoryExtended extends ZepMemory { } return super.clear() } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + const id = overrideSessionId ?? this.sessionId + const memoryVariables = await this.loadMemoryVariables({}, id) + const baseMessages = memoryVariables[this.memoryKey] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues, id) + } + + async clearChatMessages(overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + await this.clear(id) + } } module.exports = { nodeClass: ZepMemory_Memory } diff --git a/packages/components/nodes/tools/CustomTool/CustomTool.ts b/packages/components/nodes/tools/CustomTool/CustomTool.ts index 541edcf07..6ffcc0e21 100644 --- a/packages/components/nodes/tools/CustomTool/CustomTool.ts +++ b/packages/components/nodes/tools/CustomTool/CustomTool.ts @@ -60,7 +60,7 @@ class CustomTool_Tools implements INode { } } - async init(nodeData: INodeData, _: string, options: ICommonObject): Promise { + async init(nodeData: INodeData, input: string, options: ICommonObject): Promise { const selectedToolId = nodeData.inputs?.selectedTool as string const customToolFunc = nodeData.inputs?.customToolFunc as string @@ -80,7 +80,36 @@ class CustomTool_Tools implements INode { code: tool.func } if (customToolFunc) obj.code = customToolFunc - return new DynamicStructuredTool(obj) + + const variables = await appDataSource.getRepository(databaseEntities['Variable']).find() + + // override variables defined in overrideConfig + // nodeData.inputs.variables is an Object, check each property and override the variable + if (nodeData?.inputs?.vars) { + for (const propertyName of Object.getOwnPropertyNames(nodeData.inputs.vars)) { + const foundVar = variables.find((v) => v.name === propertyName) + if (foundVar) { + // even if the variable was defined as runtime, we override it with static value + foundVar.type = 'static' + foundVar.value = nodeData.inputs.vars[propertyName] + } else { + // add it the variables, if not found locally in the db + variables.push({ name: propertyName, type: 'static', value: nodeData.inputs.vars[propertyName] }) + } + } + } + + const flow = { + chatId: options.chatId, // id is uppercase (I) + chatflowId: options.chatflowid, // id is lowercase (i) + input + } + + let dynamicStructuredTool = new DynamicStructuredTool(obj) + dynamicStructuredTool.setVariables(variables) + dynamicStructuredTool.setFlowObject(flow) + + return dynamicStructuredTool } catch (e) { throw new Error(e) } diff --git a/packages/components/nodes/tools/CustomTool/core.ts b/packages/components/nodes/tools/CustomTool/core.ts index 2aa06b547..338b0ae9a 100644 --- a/packages/components/nodes/tools/CustomTool/core.ts +++ b/packages/components/nodes/tools/CustomTool/core.ts @@ -1,8 +1,18 @@ import { z } from 'zod' -import { CallbackManagerForToolRun } from 'langchain/callbacks' -import { StructuredTool, ToolParams } from 'langchain/tools' import { NodeVM } from 'vm2' import { availableDependencies } from '../../../src/utils' +import { RunnableConfig } from '@langchain/core/runnables' +import { StructuredTool, ToolParams } from '@langchain/core/tools' +import { CallbackManagerForToolRun, Callbacks, CallbackManager, parseCallbackConfigArg } from '@langchain/core/callbacks/manager' + +class ToolInputParsingException extends Error { + output?: string + + constructor(message: string, output?: string) { + super(message) + this.output = output + } +} export interface BaseDynamicToolInput extends ToolParams { name: string @@ -32,6 +42,8 @@ export class DynamicStructuredTool< func: DynamicStructuredToolInput['func'] schema: T + private variables: any[] + private flowObj: any constructor(fields: DynamicStructuredToolInput) { super(fields) @@ -43,7 +55,47 @@ export class DynamicStructuredTool< this.schema = fields.schema } - protected async _call(arg: z.output): Promise { + async call(arg: z.output, configArg?: RunnableConfig | Callbacks, tags?: string[], overrideSessionId?: string): Promise { + const config = parseCallbackConfigArg(configArg) + if (config.runName === undefined) { + config.runName = this.name + } + let parsed + try { + parsed = await this.schema.parseAsync(arg) + } catch (e) { + throw new ToolInputParsingException(`Received tool input did not match expected schema`, JSON.stringify(arg)) + } + const callbackManager_ = await CallbackManager.configure( + config.callbacks, + this.callbacks, + config.tags || tags, + this.tags, + config.metadata, + this.metadata, + { verbose: this.verbose } + ) + const runManager = await callbackManager_?.handleToolStart( + this.toJSON(), + typeof parsed === 'string' ? parsed : JSON.stringify(parsed), + undefined, + undefined, + undefined, + undefined, + config.runName + ) + let result + try { + result = await this._call(parsed, runManager, overrideSessionId) + } catch (e) { + await runManager?.handleToolError(e) + throw e + } + await runManager?.handleToolEnd(result) + return result + } + + protected async _call(arg: z.output, _?: CallbackManagerForToolRun, overrideSessionId?: string): Promise { let sandbox: any = {} if (typeof arg === 'object' && Object.keys(arg).length) { for (const item in arg) { @@ -51,6 +103,32 @@ export class DynamicStructuredTool< } } + // inject variables + let vars = {} + if (this.variables) { + for (const item of this.variables) { + let value = item.value + + // read from .env file + if (item.type === 'runtime') { + value = process.env[item.name] + } + + Object.defineProperty(vars, item.name, { + enumerable: true, + configurable: true, + writable: true, + value: value + }) + } + } + sandbox['$vars'] = vars + + // inject flow properties + if (this.flowObj) { + sandbox['$flow'] = { ...this.flowObj, sessionId: overrideSessionId } + } + const defaultAllowBuiltInDep = [ 'assert', 'buffer', @@ -87,4 +165,12 @@ export class DynamicStructuredTool< return response } + + setVariables(variables: any[]) { + this.variables = variables + } + + setFlowObject(flow: any) { + this.flowObj = flow + } } diff --git a/packages/components/src/Interface.ts b/packages/components/src/Interface.ts index 6752f9440..2a625ff6a 100644 --- a/packages/components/src/Interface.ts +++ b/packages/components/src/Interface.ts @@ -73,6 +73,7 @@ export interface INodeParams { additionalParams?: boolean loadMethod?: string hidden?: boolean + variables?: ICommonObject[] } export interface INodeExecutionData { @@ -195,3 +196,37 @@ export class VectorStoreRetriever { this.vectorStore = fields.vectorStore } } + +/** + * Implement abstract classes and interface for memory + */ +import { BaseMessage } from 'langchain/schema' +import { BufferMemory, BufferWindowMemory, ConversationSummaryMemory } from 'langchain/memory' + +export interface MemoryMethods { + getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + clearChatMessages(overrideSessionId?: string): Promise + resumeMessages?(messages: IMessage[]): Promise +} + +export abstract class FlowiseMemory extends BufferMemory implements MemoryMethods { + abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + abstract clearChatMessages(overrideSessionId?: string): Promise + abstract resumeMessages(messages: IMessage[]): Promise +} + +export abstract class FlowiseWindowMemory extends BufferWindowMemory implements MemoryMethods { + abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + abstract clearChatMessages(overrideSessionId?: string): Promise + abstract resumeMessages(messages: IMessage[]): Promise +} + +export abstract class FlowiseSummaryMemory extends ConversationSummaryMemory implements MemoryMethods { + abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + abstract clearChatMessages(overrideSessionId?: string): Promise + abstract resumeMessages(messages: IMessage[]): Promise +} diff --git a/packages/components/src/utils.ts b/packages/components/src/utils.ts index 757ecc659..22fa6f4a9 100644 --- a/packages/components/src/utils.ts +++ b/packages/components/src/utils.ts @@ -8,7 +8,7 @@ import { DataSource } from 'typeorm' import { ICommonObject, IDatabaseEntity, IMessage, INodeData } from './Interface' import { AES, enc } from 'crypto-js' import { ChatMessageHistory } from 'langchain/memory' -import { AIMessage, HumanMessage } from 'langchain/schema' +import { AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' export const numberOrExpressionRegex = '^(\\d+\\.?\\d*|{{.*}})$' //return true if string consists only numbers OR expression {{}} export const notEmptyRegex = '(.|\\s)*\\S(.|\\s)*' //return true if string is not empty or blank @@ -645,3 +645,31 @@ export const convertSchemaToZod = (schema: string | object): ICommonObject => { throw new Error(e) } } + +/** + * Convert BaseMessage to IMessage + * @param {BaseMessage[]} messages + * @returns {IMessage[]} + */ +export const convertBaseMessagetoIMessage = (messages: BaseMessage[]): IMessage[] => { + const formatmessages: IMessage[] = [] + for (const m of messages) { + if (m._getType() === 'human') { + formatmessages.push({ + message: m.content as string, + type: 'userMessage' + }) + } else if (m._getType() === 'ai') { + formatmessages.push({ + message: m.content as string, + type: 'apiMessage' + }) + } else if (m._getType() === 'system') { + formatmessages.push({ + message: m.content as string, + type: 'apiMessage' + }) + } + } + return formatmessages +} diff --git a/packages/server/src/Interface.ts b/packages/server/src/Interface.ts index f82c66902..126aac38d 100644 --- a/packages/server/src/Interface.ts +++ b/packages/server/src/Interface.ts @@ -68,6 +68,15 @@ export interface ICredential { createdDate: Date } +export interface IVariable { + id: string + name: string + value: string + type: string + updatedDate: Date + createdDate: Date +} + export interface IComponentNodes { [key: string]: INode } diff --git a/packages/server/src/database/entities/Variable.ts b/packages/server/src/database/entities/Variable.ts new file mode 100644 index 000000000..88e0587d1 --- /dev/null +++ b/packages/server/src/database/entities/Variable.ts @@ -0,0 +1,25 @@ +/* eslint-disable */ +import { Entity, Column, CreateDateColumn, UpdateDateColumn, PrimaryGeneratedColumn } from 'typeorm' +import { IVariable } from "../../Interface"; + +@Entity() +export class Variable implements IVariable{ + @PrimaryGeneratedColumn('uuid') + id: string + + @Column() + name: string + + @Column({ nullable: true, type: 'text' }) + value: string + + @Column({default: 'string', type: 'text'}) + type: string + + + @CreateDateColumn() + createdDate: Date + + @UpdateDateColumn() + updatedDate: Date +} diff --git a/packages/server/src/database/entities/index.ts b/packages/server/src/database/entities/index.ts index 58447a1f5..af5c559f5 100644 --- a/packages/server/src/database/entities/index.ts +++ b/packages/server/src/database/entities/index.ts @@ -3,11 +3,13 @@ import { ChatMessage } from './ChatMessage' import { Credential } from './Credential' import { Tool } from './Tool' import { Assistant } from './Assistant' +import { Variable } from './Variable' export const entities = { ChatFlow, ChatMessage, Credential, Tool, - Assistant + Assistant, + Variable } diff --git a/packages/server/src/database/migrations/mysql/1702200925471-AddVariableEntity.ts b/packages/server/src/database/migrations/mysql/1702200925471-AddVariableEntity.ts new file mode 100644 index 000000000..a6c818874 --- /dev/null +++ b/packages/server/src/database/migrations/mysql/1702200925471-AddVariableEntity.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddVariableEntity1699325775451 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS \`variable\` ( + \`id\` varchar(36) NOT NULL, + \`name\` varchar(255) NOT NULL, + \`value\` text NOT NULL, + \`type\` varchar(255) DEFAULT NULL, + \`createdDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6), + \`updatedDate\` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6), + PRIMARY KEY (\`id\`) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE variable`) + } +} diff --git a/packages/server/src/database/migrations/mysql/index.ts b/packages/server/src/database/migrations/mysql/index.ts index 8f9824a86..a5220ad88 100644 --- a/packages/server/src/database/migrations/mysql/index.ts +++ b/packages/server/src/database/migrations/mysql/index.ts @@ -10,6 +10,7 @@ import { AddAssistantEntity1699325775451 } from './1699325775451-AddAssistantEnt import { AddUsedToolsToChatMessage1699481607341 } from './1699481607341-AddUsedToolsToChatMessage' import { AddCategoryToChatFlow1699900910291 } from './1699900910291-AddCategoryToChatFlow' import { AddFileAnnotationsToChatMessage1700271021237 } from './1700271021237-AddFileAnnotationsToChatMessage' +import { AddVariableEntity1699325775451 } from './1702200925471-AddVariableEntity' export const mysqlMigrations = [ Init1693840429259, @@ -23,5 +24,6 @@ export const mysqlMigrations = [ AddAssistantEntity1699325775451, AddUsedToolsToChatMessage1699481607341, AddCategoryToChatFlow1699900910291, - AddFileAnnotationsToChatMessage1700271021237 + AddFileAnnotationsToChatMessage1700271021237, + AddVariableEntity1699325775451 ] diff --git a/packages/server/src/database/migrations/postgres/1702200925471-AddVariableEntity.ts b/packages/server/src/database/migrations/postgres/1702200925471-AddVariableEntity.ts new file mode 100644 index 000000000..d4a1d7be7 --- /dev/null +++ b/packages/server/src/database/migrations/postgres/1702200925471-AddVariableEntity.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddVariableEntity1699325775451 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS variable ( + id uuid NOT NULL DEFAULT uuid_generate_v4(), + "name" varchar NOT NULL, + "value" text NOT NULL, + "type" text NULL, + "createdDate" timestamp NOT NULL DEFAULT now(), + "updatedDate" timestamp NOT NULL DEFAULT now(), + CONSTRAINT "PK_3c7cea7a044ac4c92764576cdbf" PRIMARY KEY (id) + );` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE variable`) + } +} diff --git a/packages/server/src/database/migrations/postgres/index.ts b/packages/server/src/database/migrations/postgres/index.ts index d196fbc16..3c3fa3966 100644 --- a/packages/server/src/database/migrations/postgres/index.ts +++ b/packages/server/src/database/migrations/postgres/index.ts @@ -10,6 +10,7 @@ import { AddAssistantEntity1699325775451 } from './1699325775451-AddAssistantEnt import { AddUsedToolsToChatMessage1699481607341 } from './1699481607341-AddUsedToolsToChatMessage' import { AddCategoryToChatFlow1699900910291 } from './1699900910291-AddCategoryToChatFlow' import { AddFileAnnotationsToChatMessage1700271021237 } from './1700271021237-AddFileAnnotationsToChatMessage' +import { AddVariableEntity1699325775451 } from './1702200925471-AddVariableEntity' export const postgresMigrations = [ Init1693891895163, @@ -23,5 +24,6 @@ export const postgresMigrations = [ AddAssistantEntity1699325775451, AddUsedToolsToChatMessage1699481607341, AddCategoryToChatFlow1699900910291, - AddFileAnnotationsToChatMessage1700271021237 + AddFileAnnotationsToChatMessage1700271021237, + AddVariableEntity1699325775451 ] diff --git a/packages/server/src/database/migrations/sqlite/1702200925471-AddVariableEntity.ts b/packages/server/src/database/migrations/sqlite/1702200925471-AddVariableEntity.ts new file mode 100644 index 000000000..63ec709fa --- /dev/null +++ b/packages/server/src/database/migrations/sqlite/1702200925471-AddVariableEntity.ts @@ -0,0 +1,13 @@ +import { MigrationInterface, QueryRunner } from 'typeorm' + +export class AddVariableEntity1699325775451 implements MigrationInterface { + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE IF NOT EXISTS "variable" ("id" varchar PRIMARY KEY NOT NULL, "name" text NOT NULL, "value" text NOT NULL, "type" varchar, "createdDate" datetime NOT NULL DEFAULT (datetime('now')), "updatedDate" datetime NOT NULL DEFAULT (datetime('now')));` + ) + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE variable`) + } +} diff --git a/packages/server/src/database/migrations/sqlite/index.ts b/packages/server/src/database/migrations/sqlite/index.ts index fdd83064a..c0ade080d 100644 --- a/packages/server/src/database/migrations/sqlite/index.ts +++ b/packages/server/src/database/migrations/sqlite/index.ts @@ -10,6 +10,7 @@ import { AddAssistantEntity1699325775451 } from './1699325775451-AddAssistantEnt import { AddUsedToolsToChatMessage1699481607341 } from './1699481607341-AddUsedToolsToChatMessage' import { AddCategoryToChatFlow1699900910291 } from './1699900910291-AddCategoryToChatFlow' import { AddFileAnnotationsToChatMessage1700271021237 } from './1700271021237-AddFileAnnotationsToChatMessage' +import { AddVariableEntity1699325775451 } from './1702200925471-AddVariableEntity' export const sqliteMigrations = [ Init1693835579790, @@ -23,5 +24,6 @@ export const sqliteMigrations = [ AddAssistantEntity1699325775451, AddUsedToolsToChatMessage1699481607341, AddCategoryToChatFlow1699900910291, - AddFileAnnotationsToChatMessage1700271021237 + AddFileAnnotationsToChatMessage1700271021237, + AddVariableEntity1699325775451 ] diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 135e14511..1ad4c7955 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -65,6 +65,7 @@ import { sanitizeMiddleware } from './utils/XSS' import axios from 'axios' import { Client } from 'langchainhub' import { parsePrompt } from './utils/hub' +import { Variable } from './database/entities/Variable' export class App { app: express.Application @@ -1199,6 +1200,47 @@ export class App { return res.json(templates) }) + // ---------------------------------------- + // Variables + // ---------------------------------------- + this.app.get('/api/v1/variables', async (req: Request, res: Response) => { + const variables = await getDataSource().getRepository(Variable).find() + return res.json(variables) + }) + + // Create new variable + this.app.post('/api/v1/variables', async (req: Request, res: Response) => { + const body = req.body + const newVariable = new Variable() + Object.assign(newVariable, body) + const variable = this.AppDataSource.getRepository(Variable).create(newVariable) + const results = await this.AppDataSource.getRepository(Variable).save(variable) + return res.json(results) + }) + + // Update variable + this.app.put('/api/v1/variables/:id', async (req: Request, res: Response) => { + const variable = await this.AppDataSource.getRepository(Variable).findOneBy({ + id: req.params.id + }) + + if (!variable) return res.status(404).send(`Variable ${req.params.id} not found`) + + const body = req.body + const updateVariable = new Variable() + Object.assign(updateVariable, body) + this.AppDataSource.getRepository(Variable).merge(variable, updateVariable) + const result = await this.AppDataSource.getRepository(Variable).save(variable) + + return res.json(result) + }) + + // Delete variable via id + this.app.delete('/api/v1/variables/:id', async (req: Request, res: Response) => { + const results = await this.AppDataSource.getRepository(Variable).delete({ id: req.params.id }) + return res.json(results) + }) + // ---------------------------------------- // API Keys // ---------------------------------------- @@ -1669,10 +1711,6 @@ export class App { this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig) } - const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string - const nodeModule = await import(nodeInstanceFilePath) - const nodeInstance = new nodeModule.nodeClass() - logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`) let sessionId = undefined @@ -1686,6 +1724,10 @@ export class App { chatHistory = await replaceChatHistory(memoryNode, incomingInput, this.AppDataSource, databaseEntities, logger) } + const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string + const nodeModule = await import(nodeInstanceFilePath) + const nodeInstance = new nodeModule.nodeClass({ sessionId }) + let result = isStreamValid ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { chatflowid, diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 39bd0854e..0bc288617 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -38,6 +38,7 @@ import { Tool } from '../database/entities/Tool' import { Assistant } from '../database/entities/Assistant' import { DataSource } from 'typeorm' import { CachePool } from '../CachePool' +import { Variable } from '../database/entities/Variable' const QUESTION_VAR_PREFIX = 'question' const CHAT_HISTORY_VAR_PREFIX = 'chat_history' @@ -48,7 +49,8 @@ export const databaseEntities: IDatabaseEntity = { ChatMessage: ChatMessage, Tool: Tool, Credential: Credential, - Assistant: Assistant + Assistant: Assistant, + Variable: Variable } /** diff --git a/packages/ui/src/api/variables.js b/packages/ui/src/api/variables.js new file mode 100644 index 000000000..944b83198 --- /dev/null +++ b/packages/ui/src/api/variables.js @@ -0,0 +1,16 @@ +import client from './client' + +const getAllVariables = () => client.get('/variables') + +const createVariable = (body) => client.post(`/variables`, body) + +const updateVariable = (id, body) => client.put(`/variables/${id}`, body) + +const deleteVariable = (id) => client.delete(`/variables/${id}`) + +export default { + getAllVariables, + createVariable, + updateVariable, + deleteVariable +} diff --git a/packages/ui/src/assets/images/variables_empty.svg b/packages/ui/src/assets/images/variables_empty.svg new file mode 100644 index 000000000..eb461e39f --- /dev/null +++ b/packages/ui/src/assets/images/variables_empty.svg @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/packages/ui/src/menu-items/dashboard.js b/packages/ui/src/menu-items/dashboard.js index 8bf5b3924..793bc290c 100644 --- a/packages/ui/src/menu-items/dashboard.js +++ b/packages/ui/src/menu-items/dashboard.js @@ -1,8 +1,8 @@ // assets -import { IconHierarchy, IconBuildingStore, IconKey, IconTool, IconLock, IconRobot } from '@tabler/icons' +import { IconHierarchy, IconBuildingStore, IconKey, IconTool, IconLock, IconRobot, IconVariable } from '@tabler/icons' // constant -const icons = { IconHierarchy, IconBuildingStore, IconKey, IconTool, IconLock, IconRobot } +const icons = { IconHierarchy, IconBuildingStore, IconKey, IconTool, IconLock, IconRobot, IconVariable } // ==============================|| DASHBOARD MENU ITEMS ||============================== // @@ -51,6 +51,14 @@ const dashboard = { icon: icons.IconLock, breadcrumbs: true }, + { + id: 'variables', + title: 'Variables', + type: 'item', + url: '/variables', + icon: icons.IconVariable, + breadcrumbs: true + }, { id: 'apikey', title: 'API Keys', diff --git a/packages/ui/src/routes/MainRoutes.js b/packages/ui/src/routes/MainRoutes.js index bce0de137..08dd721dd 100644 --- a/packages/ui/src/routes/MainRoutes.js +++ b/packages/ui/src/routes/MainRoutes.js @@ -22,6 +22,9 @@ const Assistants = Loadable(lazy(() => import('views/assistants'))) // credentials routing const Credentials = Loadable(lazy(() => import('views/credentials'))) +// variables routing +const Variables = Loadable(lazy(() => import('views/variables'))) + // ==============================|| MAIN ROUTING ||============================== // const MainRoutes = { @@ -55,6 +58,10 @@ const MainRoutes = { { path: '/credentials', element: + }, + { + path: '/variables', + element: } ] } diff --git a/packages/ui/src/views/canvas/NodeInputHandler.js b/packages/ui/src/views/canvas/NodeInputHandler.js index 33e997362..617d1066c 100644 --- a/packages/ui/src/views/canvas/NodeInputHandler.js +++ b/packages/ui/src/views/canvas/NodeInputHandler.js @@ -369,7 +369,12 @@ const NodeInputHandler = ({ inputAnchor, inputParam, data, disabled = false, isA {inputParam?.acceptVariable && ( <> {dialogProps.type !== 'TEMPLATE' && ( + ) + } + }) + onConfirm(createResp.data.id) + } + } catch (err) { + const errorData = typeof err === 'string' ? err : err.response?.data || `${err.response?.status}: ${err.response?.statusText}` + enqueueSnackbar({ + message: `Failed to add new Variable: ${errorData}`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'error', + persist: true, + action: (key) => ( + + ) + } + }) + onCancel() + } + } + + const saveVariable = async () => { + try { + const saveObj = { + name: variableName, + value: variableValue, + type: variableType + } + + const saveResp = await variablesApi.updateVariable(variable.id, saveObj) + if (saveResp.data) { + enqueueSnackbar({ + message: 'Variable saved', + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (key) => ( + + ) + } + }) + onConfirm(saveResp.data.id) + } + } catch (error) { + const errorData = error.response?.data || `${error.response?.status}: ${error.response?.statusText}` + enqueueSnackbar({ + message: `Failed to save Variable: ${errorData}`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'error', + persist: true, + action: (key) => ( + + ) + } + }) + onCancel() + } + } + + const component = show ? ( + + +
+
+ +
+ {dialogProps.type === 'ADD' ? 'Add Variable' : 'Edit Variable'} +
+
+ + +
+ + Variable Name * + + +
+
+ setVariableName(e.target.value)} + value={variableName ?? ''} + /> +
+ +
+ + Type * + +
+
+ setVariableType(newValue)} + value={variableType ?? 'choose an option'} + /> +
+ {variableType === 'static' && ( + +
+ + Value * + +
+
+ setVariableValue(e.target.value)} + value={variableValue ?? ''} + /> +
+ )} +
+ + (dialogType === 'ADD' ? addNewVariable() : saveVariable())} + > + {dialogProps.confirmButtonName} + + + +
+ ) : null + + return createPortal(component, portalElement) +} + +AddEditVariableDialog.propTypes = { + show: PropTypes.bool, + dialogProps: PropTypes.object, + onCancel: PropTypes.func, + onConfirm: PropTypes.func +} + +export default AddEditVariableDialog diff --git a/packages/ui/src/views/variables/HowToUseVariablesDialog.js b/packages/ui/src/views/variables/HowToUseVariablesDialog.js new file mode 100644 index 000000000..f328f226c --- /dev/null +++ b/packages/ui/src/views/variables/HowToUseVariablesDialog.js @@ -0,0 +1,72 @@ +import { createPortal } from 'react-dom' +import PropTypes from 'prop-types' +import { Dialog, DialogContent, DialogTitle } from '@mui/material' +import { CodeEditor } from 'ui-component/editor/CodeEditor' + +const overrideConfig = `{ + overrideConfig: { + vars: { + var1: 'abc' + } + } +}` + +const HowToUseVariablesDialog = ({ show, onCancel }) => { + const portalElement = document.getElementById('portal') + + const component = show ? ( + + + How To Use Variables + + +

Variables can be used in Custom Tool Function with the $ prefix.

+ `} + height={'50px'} + theme={'dark'} + lang={'js'} + basicSetup={{ highlightActiveLine: false, highlightActiveLineGutter: false }} + /> +

+ If variable type is Static, the value will be retrieved as it is. If variable type is Runtime, the value will be + retrieved from .env file. +

+

+ You can also override variable values in API overrideConfig using vars: +

+ +

+ Read more from{' '} + + docs + +

+
+
+ ) : null + + return createPortal(component, portalElement) +} + +HowToUseVariablesDialog.propTypes = { + show: PropTypes.bool, + onCancel: PropTypes.func +} + +export default HowToUseVariablesDialog diff --git a/packages/ui/src/views/variables/index.js b/packages/ui/src/views/variables/index.js new file mode 100644 index 000000000..9d0b2e3fe --- /dev/null +++ b/packages/ui/src/views/variables/index.js @@ -0,0 +1,314 @@ +import { useEffect, useState } from 'react' +import { useDispatch, useSelector } from 'react-redux' +import { enqueueSnackbar as enqueueSnackbarAction, closeSnackbar as closeSnackbarAction } from 'store/actions' +import moment from 'moment' + +// material-ui +import { + Button, + Box, + Stack, + Table, + TableBody, + TableCell, + TableContainer, + TableHead, + TableRow, + Paper, + IconButton, + Toolbar, + TextField, + InputAdornment, + ButtonGroup, + Chip +} from '@mui/material' +import { useTheme } from '@mui/material/styles' + +// project imports +import MainCard from 'ui-component/cards/MainCard' +import { StyledButton } from 'ui-component/button/StyledButton' +import ConfirmDialog from 'ui-component/dialog/ConfirmDialog' + +// API +import variablesApi from 'api/variables' + +// Hooks +import useApi from 'hooks/useApi' +import useConfirm from 'hooks/useConfirm' + +// utils +import useNotifier from 'utils/useNotifier' + +// Icons +import { IconTrash, IconEdit, IconX, IconPlus, IconSearch, IconVariable } from '@tabler/icons' +import VariablesEmptySVG from 'assets/images/variables_empty.svg' + +// const +import AddEditVariableDialog from './AddEditVariableDialog' +import HowToUseVariablesDialog from './HowToUseVariablesDialog' + +// ==============================|| Credentials ||============================== // + +const Variables = () => { + const theme = useTheme() + const customization = useSelector((state) => state.customization) + + const dispatch = useDispatch() + useNotifier() + + const enqueueSnackbar = (...args) => dispatch(enqueueSnackbarAction(...args)) + const closeSnackbar = (...args) => dispatch(closeSnackbarAction(...args)) + + const [showVariableDialog, setShowVariableDialog] = useState(false) + const [variableDialogProps, setVariableDialogProps] = useState({}) + const [variables, setVariables] = useState([]) + const [showHowToDialog, setShowHowToDialog] = useState(false) + + const { confirm } = useConfirm() + + const getAllVariables = useApi(variablesApi.getAllVariables) + + const [search, setSearch] = useState('') + const onSearchChange = (event) => { + setSearch(event.target.value) + } + function filterVariables(data) { + return data.name.toLowerCase().indexOf(search.toLowerCase()) > -1 + } + + const addNew = () => { + const dialogProp = { + type: 'ADD', + cancelButtonName: 'Cancel', + confirmButtonName: 'Add', + data: {} + } + setVariableDialogProps(dialogProp) + setShowVariableDialog(true) + } + + const edit = (variable) => { + const dialogProp = { + type: 'EDIT', + cancelButtonName: 'Cancel', + confirmButtonName: 'Save', + data: variable + } + setVariableDialogProps(dialogProp) + setShowVariableDialog(true) + } + + const deleteVariable = async (variable) => { + const confirmPayload = { + title: `Delete`, + description: `Delete variable ${variable.name}?`, + confirmButtonName: 'Delete', + cancelButtonName: 'Cancel' + } + const isConfirmed = await confirm(confirmPayload) + + if (isConfirmed) { + try { + const deleteResp = await variablesApi.deleteVariable(variable.id) + if (deleteResp.data) { + enqueueSnackbar({ + message: 'Variable deleted', + options: { + key: new Date().getTime() + Math.random(), + variant: 'success', + action: (key) => ( + + ) + } + }) + onConfirm() + } + } catch (error) { + const errorData = error.response?.data || `${error.response?.status}: ${error.response?.statusText}` + enqueueSnackbar({ + message: `Failed to delete Variable: ${errorData}`, + options: { + key: new Date().getTime() + Math.random(), + variant: 'error', + persist: true, + action: (key) => ( + + ) + } + }) + } + } + } + + const onConfirm = () => { + setShowVariableDialog(false) + getAllVariables.request() + } + + useEffect(() => { + getAllVariables.request() + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) + + useEffect(() => { + if (getAllVariables.data) { + setVariables(getAllVariables.data) + } + }, [getAllVariables.data]) + + return ( + <> + + + + +

Variables 

+ + + + ) + }} + /> + + + + + } + > + Add Variable + + + +
+
+
+ {variables.length === 0 && ( + + + VariablesEmptySVG + +
No Variables Yet
+
+ )} + {variables.length > 0 && ( + + + + + Name + Value + Type + Last Updated + Created + + + + + + {variables.filter(filterVariables).map((variable, index) => ( + + +
+
+ +
+ {variable.name} +
+
+ {variable.value} + + + + {moment(variable.updatedDate).format('DD-MMM-YY')} + {moment(variable.createdDate).format('DD-MMM-YY')} + + edit(variable)}> + + + + + deleteVariable(variable)}> + + + +
+ ))} +
+
+
+ )} +
+ setShowVariableDialog(false)} + onConfirm={onConfirm} + > + setShowHowToDialog(false)}> + + + ) +} + +export default Variables