diff --git a/packages/components/nodes/memory/BufferMemory/BufferMemory.ts b/packages/components/nodes/memory/BufferMemory/BufferMemory.ts index 7793d96d4..0ad8adec9 100644 --- a/packages/components/nodes/memory/BufferMemory/BufferMemory.ts +++ b/packages/components/nodes/memory/BufferMemory/BufferMemory.ts @@ -1,6 +1,7 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' -import { BufferMemory } from 'langchain/memory' +import { FlowiseMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class BufferMemory_Memory implements INode { label: string @@ -41,7 +42,7 @@ class BufferMemory_Memory implements INode { async init(nodeData: INodeData): Promise { const memoryKey = nodeData.inputs?.memoryKey as string const inputKey = nodeData.inputs?.inputKey as string - return new BufferMemory({ + return new BufferMemoryExtended({ returnMessages: true, memoryKey, inputKey @@ -49,4 +50,41 @@ class BufferMemory_Memory implements INode { } } +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { + constructor(fields: BufferMemoryInput) { + super(fields) + } + + async getChatMessages(_?: string, returnBaseMessages = false): Promise { + const memoryResult = await this.loadMemoryVariables({}) + const baseMessages = memoryResult[this.memoryKey ?? 'chat_history'] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise { + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues) + } + + async clearChatMessages(): Promise { + await this.clear() + } + + async resumeMessages(messages: IMessage[]): Promise { + // Clear existing chatHistory to avoid duplication + if (messages.length) await this.clear() + + // Insert into chatHistory + for (const msg of messages) { + if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message) + else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message) + } + } +} + module.exports = { nodeClass: BufferMemory_Memory } diff --git a/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts b/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts index 84e607e54..ca8d0ddfd 100644 --- a/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts +++ b/packages/components/nodes/memory/BufferWindowMemory/BufferWindowMemory.ts @@ -1,6 +1,7 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' +import { FlowiseWindowMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils' import { BufferWindowMemory, BufferWindowMemoryInput } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class BufferWindowMemory_Memory implements INode { label: string @@ -57,7 +58,44 @@ class BufferWindowMemory_Memory implements INode { k: parseInt(k, 10) } - return new BufferWindowMemory(obj) + return new BufferWindowMemoryExtended(obj) + } +} + +class BufferWindowMemoryExtended extends FlowiseWindowMemory implements MemoryMethods { + constructor(fields: BufferWindowMemoryInput) { + super(fields) + } + + async getChatMessages(_?: string, returnBaseMessages = false): Promise { + const memoryResult = await this.loadMemoryVariables({}) + const baseMessages = memoryResult[this.memoryKey ?? 'chat_history'] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise { + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues) + } + + async clearChatMessages(): Promise { + await this.clear() + } + + async resumeMessages(messages: IMessage[]): Promise { + // Clear existing chatHistory to avoid duplication + if (messages.length) await this.clear() + + // Insert into chatHistory + for (const msg of messages) { + if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message) + else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message) + } } } diff --git a/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts b/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts index 332d73aa9..107ab7db9 100644 --- a/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts +++ b/packages/components/nodes/memory/ConversationSummaryMemory/ConversationSummaryMemory.ts @@ -1,7 +1,8 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' +import { FlowiseSummaryMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses } from '../../../src/utils' import { ConversationSummaryMemory, ConversationSummaryMemoryInput } from 'langchain/memory' import { BaseLanguageModel } from 'langchain/base_language' +import { BaseMessage } from 'langchain/schema' class ConversationSummaryMemory_Memory implements INode { label: string @@ -56,7 +57,48 @@ class ConversationSummaryMemory_Memory implements INode { inputKey } - return new ConversationSummaryMemory(obj) + return new ConversationSummaryMemoryExtended(obj) + } +} + +class ConversationSummaryMemoryExtended extends FlowiseSummaryMemory implements MemoryMethods { + constructor(fields: ConversationSummaryMemoryInput) { + super(fields) + } + + async getChatMessages(_?: string, returnBaseMessages = false): Promise { + const memoryResult = await this.loadMemoryVariables({}) + const baseMessages = memoryResult[this.memoryKey ?? 'chat_history'] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[]): Promise { + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues) + } + + async clearChatMessages(): Promise { + await this.clear() + } + + async resumeMessages(messages: IMessage[]): Promise { + // Clear existing chatHistory to avoid duplication + if (messages.length) await this.clear() + + // Insert into chatHistory + for (const msg of messages) { + if (msg.type === 'userMessage') await this.chatHistory.addUserMessage(msg.message) + else if (msg.type === 'apiMessage') await this.chatHistory.addAIChatMessage(msg.message) + } + + // Replace buffer + const chatMessages = await this.chatHistory.getMessages() + this.buffer = await this.predictNewSummary(chatMessages.slice(-2), this.buffer) } } diff --git a/packages/components/nodes/memory/DynamoDb/DynamoDb.ts b/packages/components/nodes/memory/DynamoDb/DynamoDb.ts index 8ca6cf9e5..15b00d335 100644 --- a/packages/components/nodes/memory/DynamoDb/DynamoDb.ts +++ b/packages/components/nodes/memory/DynamoDb/DynamoDb.ts @@ -1,15 +1,25 @@ import { - ICommonObject, - INode, - INodeData, - INodeParams, + DynamoDBClient, + DynamoDBClientConfig, + GetItemCommand, + GetItemCommandInput, + UpdateItemCommand, + UpdateItemCommandInput, + DeleteItemCommand, + DeleteItemCommandInput, + AttributeValue +} from '@aws-sdk/client-dynamodb' +import { DynamoDBChatMessageHistory } from 'langchain/stores/message/dynamodb' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, StoredMessage, BaseMessage } from 'langchain/schema' +import { + convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory -} from '../../../src' -import { DynamoDBChatMessageHistory } from 'langchain/stores/message/dynamodb' -import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +} from '../../../src/utils' +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' class DynamoDb_Memory implements INode { label: string @@ -102,49 +112,199 @@ class DynamoDb_Memory implements INode { const initalizeDynamoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { const tableName = nodeData.inputs?.tableName as string const partitionKey = nodeData.inputs?.partitionKey as string - const sessionId = nodeData.inputs?.sessionId as string const region = nodeData.inputs?.region as string const memoryKey = nodeData.inputs?.memoryKey as string const chatId = options.chatId let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const accessKeyId = getCredentialParam('accessKey', credentialData, nodeData) const secretAccessKey = getCredentialParam('secretAccessKey', credentialData, nodeData) + const config: DynamoDBClientConfig = { + region, + credentials: { + accessKeyId, + secretAccessKey + } + } + + const client = new DynamoDBClient(config ?? {}) + const dynamoDb = new DynamoDBChatMessageHistory({ tableName, partitionKey, - sessionId: sessionId ? sessionId : chatId, - config: { - region, - credentials: { - accessKeyId, - secretAccessKey - } - } + sessionId, + config }) const memory = new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: dynamoDb, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + dynamodbClient: client }) return memory } interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + dynamodbClient: DynamoDBClient + sessionId: string } -class BufferMemoryExtended extends BufferMemory { - isSessionIdUsingChatMessageId? = false +interface DynamoDBSerializedChatMessage { + M: { + type: { + S: string + } + text: { + S: string + } + role?: { + S: string + } + } +} - constructor(fields: BufferMemoryInput & Partial) { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { + isSessionIdUsingChatMessageId = false + sessionId = '' + dynamodbClient: DynamoDBClient + + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) - this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.dynamodbClient = fields.dynamodbClient + } + + overrideDynamoKey(overrideSessionId = '') { + const existingDynamoKey = (this as any).dynamoKey + const partitionKey = (this as any).partitionKey + + let newDynamoKey: Record = {} + + if (Object.keys(existingDynamoKey).includes(partitionKey)) { + newDynamoKey[partitionKey] = { S: overrideSessionId } + } + + return Object.keys(newDynamoKey).length ? newDynamoKey : existingDynamoKey + } + + async addNewMessage( + messages: StoredMessage[], + client: DynamoDBClient, + tableName = '', + dynamoKey: Record = {}, + messageAttributeName = 'messages' + ) { + const params: UpdateItemCommandInput = { + TableName: tableName, + Key: dynamoKey, + ExpressionAttributeNames: { + '#m': messageAttributeName + }, + ExpressionAttributeValues: { + ':empty_list': { + L: [] + }, + ':m': { + L: messages.map((message) => { + const dynamoSerializedMessage: DynamoDBSerializedChatMessage = { + M: { + type: { + S: message.type + }, + text: { + S: message.data.content + } + } + } + if (message.data.role) { + dynamoSerializedMessage.M.role = { S: message.data.role } + } + return dynamoSerializedMessage + }) + } + }, + UpdateExpression: 'SET #m = list_append(if_not_exists(#m, :empty_list), :m)' + } + + await client.send(new UpdateItemCommand(params)) + } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + if (!this.dynamodbClient) return [] + + const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey + const tableName = (this as any).tableName + const messageAttributeName = (this as any).messageAttributeName + + const params: GetItemCommandInput = { + TableName: tableName, + Key: dynamoKey + } + + const response = await this.dynamodbClient.send(new GetItemCommand(params)) + const items = response.Item ? response.Item[messageAttributeName]?.L ?? [] : [] + const messages = items + .map((item) => ({ + type: item.M?.type.S, + data: { + role: item.M?.role?.S, + content: item.M?.text.S + } + })) + .filter((x): x is StoredMessage => x.type !== undefined && x.data.content !== undefined) + const baseMessages = messages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.dynamodbClient) return + + const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey + const tableName = (this as any).tableName + const messageAttributeName = (this as any).messageAttributeName + + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.addNewMessage(messageToAdd, this.dynamodbClient, tableName, dynamoKey, messageAttributeName) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.addNewMessage(messageToAdd, this.dynamodbClient, tableName, dynamoKey, messageAttributeName) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.dynamodbClient) return + + const dynamoKey = overrideSessionId ? this.overrideDynamoKey(overrideSessionId) : (this as any).dynamoKey + const tableName = (this as any).tableName + + const params: DeleteItemCommandInput = { + TableName: tableName, + Key: dynamoKey + } + await this.dynamodbClient.send(new DeleteItemCommand(params)) + await this.clear() } } diff --git a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts index b654a5b20..681e9042c 100644 --- a/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts +++ b/packages/components/nodes/memory/MongoDBMemory/MongoDBMemory.ts @@ -1,17 +1,15 @@ +import { MongoClient, Collection, Document } from 'mongodb' +import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' +import { BufferMemory, BufferMemoryInput } from 'langchain/memory' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, BaseMessage } from 'langchain/schema' import { + convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam, - ICommonObject, - INode, - INodeData, - INodeParams, serializeChatHistory -} from '../../../src' -import { MongoDBChatMessageHistory } from 'langchain/stores/message/mongodb' -import { BufferMemory, BufferMemoryInput } from 'langchain/memory' -import { BaseMessage, mapStoredMessageToChatMessage } from 'langchain/schema' -import { MongoClient } from 'mongodb' +} from '../../../src/utils' +import { FlowiseMemory, ICommonObject, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' class MongoDB_Memory implements INode { label: string @@ -99,23 +97,30 @@ class MongoDB_Memory implements INode { const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): Promise => { const databaseName = nodeData.inputs?.databaseName as string const collectionName = nodeData.inputs?.collectionName as string - const sessionId = nodeData.inputs?.sessionId as string const memoryKey = nodeData.inputs?.memoryKey as string const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) - let mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) + const mongoDBConnectUrl = getCredentialParam('mongoDBConnectUrl', credentialData, nodeData) const client = new MongoClient(mongoDBConnectUrl) await client.connect() + const collection = client.db(databaseName).collection(collectionName) const mongoDBChatMessageHistory = new MongoDBChatMessageHistory({ collection, - sessionId: sessionId ? sessionId : chatId + sessionId }) mongoDBChatMessageHistory.getMessages = async (): Promise => { @@ -144,20 +149,77 @@ const initializeMongoDB = async (nodeData: INodeData, options: ICommonObject): P return new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: mongoDBChatMessageHistory, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + collection }) } interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + collection: Collection + sessionId: string } -class BufferMemoryExtended extends BufferMemory { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { + sessionId = '' + collection: Collection isSessionIdUsingChatMessageId? = false - constructor(fields: BufferMemoryInput & Partial) { + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) - this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.collection = fields.collection + } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + if (!this.collection) return [] + + const id = overrideSessionId ?? this.sessionId + const document = await this.collection.findOne({ sessionId: id }) + const messages = document?.messages || [] + const baseMessages = messages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.collection) return + + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.collection.updateOne( + { sessionId: id }, + { + $push: { messages: { $each: messageToAdd } } + }, + { upsert: true } + ) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.collection.updateOne( + { sessionId: id }, + { + $push: { messages: { $each: messageToAdd } } + }, + { upsert: true } + ) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.collection) return + + const id = overrideSessionId ?? this.sessionId + await this.collection.deleteOne({ sessionId: id }) + await this.clear() } } diff --git a/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts b/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts index fc4a06dcc..97f25ba3f 100644 --- a/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts +++ b/packages/components/nodes/memory/MotorheadMemory/MotorheadMemory.ts @@ -1,9 +1,9 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +import { IMessage, INode, INodeData, INodeParams, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ICommonObject } from '../../../src' -import { MotorheadMemory, MotorheadMemoryInput } from 'langchain/memory' +import { MotorheadMemory, MotorheadMemoryInput, InputValues, MemoryVariables, OutputValues, getBufferString } from 'langchain/memory' import fetch from 'node-fetch' -import { getBufferString } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class MotorMemory_Memory implements INode { label: string @@ -88,19 +88,26 @@ class MotorMemory_Memory implements INode { const initalizeMotorhead = async (nodeData: INodeData, options: ICommonObject): Promise => { const memoryKey = nodeData.inputs?.memoryKey as string const baseURL = nodeData.inputs?.baseURL as string - const sessionId = nodeData.inputs?.sessionId as string const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const apiKey = getCredentialParam('apiKey', credentialData, nodeData) const clientId = getCredentialParam('clientId', credentialData, nodeData) - let obj: MotorheadMemoryInput & Partial = { + let obj: MotorheadMemoryInput & MotorheadMemoryExtendedInput = { returnMessages: true, - sessionId: sessionId ? sessionId : chatId, + isSessionIdUsingChatMessageId, + sessionId, memoryKey } @@ -117,8 +124,6 @@ const initalizeMotorhead = async (nodeData: INodeData, options: ICommonObject): } } - if (isSessionIdUsingChatMessageId) obj.isSessionIdUsingChatMessageId = true - const motorheadMemory = new MotorheadMemoryExtended(obj) // Get messages from sessionId @@ -134,12 +139,29 @@ interface MotorheadMemoryExtendedInput { class MotorheadMemoryExtended extends MotorheadMemory { isSessionIdUsingChatMessageId? = false - constructor(fields: MotorheadMemoryInput & Partial) { + constructor(fields: MotorheadMemoryInput & MotorheadMemoryExtendedInput) { super(fields) this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId } - async clear(): Promise { + async loadMemoryVariables(values: InputValues, overrideSessionId = ''): Promise { + if (overrideSessionId) { + this.sessionId = overrideSessionId + } + return super.loadMemoryVariables({ values }) + } + + async saveContext(inputValues: InputValues, outputValues: OutputValues, overrideSessionId = ''): Promise { + if (overrideSessionId) { + this.sessionId = overrideSessionId + } + return super.saveContext(inputValues, outputValues) + } + + async clear(overrideSessionId = ''): Promise { + if (overrideSessionId) { + this.sessionId = overrideSessionId + } try { await this.caller.call(fetch, `${this.url}/sessions/${this.sessionId}/memory`, { //@ts-ignore @@ -155,6 +177,28 @@ class MotorheadMemoryExtended extends MotorheadMemory { await this.chatHistory.clear() await super.clear() } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + const id = overrideSessionId ?? this.sessionId + const memoryVariables = await this.loadMemoryVariables({}, id) + const baseMessages = memoryVariables[this.memoryKey] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues, id) + } + + async clearChatMessages(overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + await this.clear(id) + } } module.exports = { nodeClass: MotorMemory_Memory } diff --git a/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts b/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts index 8bca04404..3ff20a882 100644 --- a/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/UpstashRedisBackedChatMemory/UpstashRedisBackedChatMemory.ts @@ -1,8 +1,16 @@ -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam, serializeChatHistory } from '../../../src/utils' -import { ICommonObject } from '../../../src' +import { Redis } from '@upstash/redis' import { BufferMemory, BufferMemoryInput } from 'langchain/memory' import { UpstashRedisChatMessageHistory } from 'langchain/stores/message/upstash_redis' +import { mapStoredMessageToChatMessage, AIMessage, HumanMessage, StoredMessage, BaseMessage } from 'langchain/schema' +import { FlowiseMemory, IMessage, INode, INodeData, INodeParams, MemoryMethods, MessageType } from '../../../src/Interface' +import { + convertBaseMessagetoIMessage, + getBaseClasses, + getCredentialData, + getCredentialParam, + serializeChatHistory +} from '../../../src/utils' +import { ICommonObject } from '../../../src/Interface' class UpstashRedisBackedChatMemory_Memory implements INode { label: string @@ -84,29 +92,39 @@ class UpstashRedisBackedChatMemory_Memory implements INode { const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject): Promise => { const baseURL = nodeData.inputs?.baseURL as string - const sessionId = nodeData.inputs?.sessionId as string const sessionTTL = nodeData.inputs?.sessionTTL as string const chatId = options?.chatId as string let isSessionIdUsingChatMessageId = false - if (!sessionId && chatId) isSessionIdUsingChatMessageId = true + let sessionId = '' + + if (!nodeData.inputs?.sessionId && chatId) { + isSessionIdUsingChatMessageId = true + sessionId = chatId + } else { + sessionId = nodeData.inputs?.sessionId + } const credentialData = await getCredentialData(nodeData.credential ?? '', options) const upstashRestToken = getCredentialParam('upstashRestToken', credentialData, nodeData) + const client = new Redis({ + url: baseURL, + token: upstashRestToken + }) + const redisChatMessageHistory = new UpstashRedisChatMessageHistory({ - sessionId: sessionId ? sessionId : chatId, + sessionId, sessionTTL: sessionTTL ? parseInt(sessionTTL, 10) : undefined, - config: { - url: baseURL, - token: upstashRestToken - } + client }) const memory = new BufferMemoryExtended({ memoryKey: 'chat_history', chatHistory: redisChatMessageHistory, - isSessionIdUsingChatMessageId + isSessionIdUsingChatMessageId, + sessionId, + redisClient: client }) return memory @@ -114,14 +132,59 @@ const initalizeUpstashRedis = async (nodeData: INodeData, options: ICommonObject interface BufferMemoryExtendedInput { isSessionIdUsingChatMessageId: boolean + redisClient: Redis + sessionId: string } -class BufferMemoryExtended extends BufferMemory { +class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { isSessionIdUsingChatMessageId? = false + sessionId = '' + redisClient: Redis - constructor(fields: BufferMemoryInput & Partial) { + constructor(fields: BufferMemoryInput & BufferMemoryExtendedInput) { super(fields) this.isSessionIdUsingChatMessageId = fields.isSessionIdUsingChatMessageId + this.sessionId = fields.sessionId + this.redisClient = fields.redisClient + } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + if (!this.redisClient) return [] + + const id = overrideSessionId ?? this.sessionId + const rawStoredMessages: StoredMessage[] = await this.redisClient.lrange(id, 0, -1) + const orderedMessages = rawStoredMessages.reverse() + const previousMessages = orderedMessages.filter((x): x is StoredMessage => x.type !== undefined && x.data.content !== undefined) + const baseMessages = previousMessages.map(mapStoredMessageToChatMessage) + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + if (!this.redisClient) return + + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + + if (input) { + const newInputMessage = new HumanMessage(input.text) + const messageToAdd = [newInputMessage].map((msg) => msg.toDict()) + await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + } + + if (output) { + const newOutputMessage = new AIMessage(output.text) + const messageToAdd = [newOutputMessage].map((msg) => msg.toDict()) + await this.redisClient.lpush(id, JSON.stringify(messageToAdd[0])) + } + } + + async clearChatMessages(overrideSessionId = ''): Promise { + if (!this.redisClient) return + + const id = overrideSessionId ?? this.sessionId + await this.redisClient.del(id) + await this.clear() } } diff --git a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts index ea52cb0b3..3da35db27 100644 --- a/packages/components/nodes/memory/ZepMemory/ZepMemory.ts +++ b/packages/components/nodes/memory/ZepMemory/ZepMemory.ts @@ -1,8 +1,9 @@ +import { IMessage, INode, INodeData, INodeParams, MessageType } from '../../../src/Interface' +import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ZepMemory, ZepMemoryInput } from 'langchain/memory/zep' -import { getBufferString, InputValues, MemoryVariables, OutputValues } from 'langchain/memory' -import { INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ICommonObject } from '../../../src' +import { InputValues, MemoryVariables, OutputValues, getBufferString } from 'langchain/memory' +import { BaseMessage } from 'langchain/schema' class ZepMemory_Memory implements INode { label: string @@ -147,7 +148,7 @@ const initalizeZep = async (nodeData: INodeData, options: ICommonObject): Promis const obj: ZepMemoryInput & ZepMemoryExtendedInput = { baseURL, - sessionId: sessionId ? sessionId : chatId, + sessionId, aiPrefix, humanPrefix, returnMessages: true, @@ -196,6 +197,28 @@ class ZepMemoryExtended extends ZepMemory { } return super.clear() } + + async getChatMessages(overrideSessionId = '', returnBaseMessages = false): Promise { + const id = overrideSessionId ?? this.sessionId + const memoryVariables = await this.loadMemoryVariables({}, id) + const baseMessages = memoryVariables[this.memoryKey] + return returnBaseMessages ? baseMessages : convertBaseMessagetoIMessage(baseMessages) + } + + async addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + const input = msgArray.find((msg) => msg.type === 'userMessage') + const output = msgArray.find((msg) => msg.type === 'apiMessage') + const inputValues = { [this.inputKey ?? 'input']: input?.text } + const outputValues = { output: output?.text } + + await this.saveContext(inputValues, outputValues, id) + } + + async clearChatMessages(overrideSessionId = ''): Promise { + const id = overrideSessionId ?? this.sessionId + await this.clear(id) + } } module.exports = { nodeClass: ZepMemory_Memory } diff --git a/packages/components/src/Interface.ts b/packages/components/src/Interface.ts index bc50155c3..e508ebeee 100644 --- a/packages/components/src/Interface.ts +++ b/packages/components/src/Interface.ts @@ -196,3 +196,34 @@ export class VectorStoreRetriever { this.vectorStore = fields.vectorStore } } + +/** + * Implement abstract classes and interface for memory + */ +import { BaseMessage } from 'langchain/schema' +import { BufferMemory, BufferWindowMemory, ConversationSummaryMemory } from 'langchain/memory' + +export interface MemoryMethods { + getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + clearChatMessages(overrideSessionId?: string): Promise + resumeMessages?(messages: IMessage[]): Promise +} + +export abstract class FlowiseMemory extends BufferMemory implements MemoryMethods { + abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + abstract clearChatMessages(overrideSessionId?: string): Promise +} + +export abstract class FlowiseWindowMemory extends BufferWindowMemory implements MemoryMethods { + abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + abstract clearChatMessages(overrideSessionId?: string): Promise +} + +export abstract class FlowiseSummaryMemory extends ConversationSummaryMemory implements MemoryMethods { + abstract getChatMessages(overrideSessionId?: string, returnBaseMessages?: boolean): Promise + abstract addChatMessages(msgArray: { text: string; type: MessageType }[], overrideSessionId?: string): Promise + abstract clearChatMessages(overrideSessionId?: string): Promise +}