From 393f9b57c69e1405731f7a9014a6b790713fd772 Mon Sep 17 00:00:00 2001 From: Henry Date: Sun, 28 Jan 2024 17:18:18 +0000 Subject: [PATCH] use singleton redis connection --- .../nodes/cache/RedisCache/RedisCache.ts | 43 ++++++++++++- .../cache/RedisCache/RedisEmbeddingsCache.ts | 43 ++++++++++++- .../RedisBackedChatMemory.ts | 61 ++++++++++++------- .../nodes/vectorstores/Qdrant/Qdrant.ts | 7 --- .../nodes/vectorstores/Redis/Redis.ts | 31 ++++++++-- .../vectorstores/Redis/RedisSearchBase.ts | 28 +++++++-- .../vectorstores/Redis/Redis_Existing.ts | 1 - .../nodes/vectorstores/Redis/Redis_Upsert.ts | 1 - 8 files changed, 169 insertions(+), 46 deletions(-) diff --git a/packages/components/nodes/cache/RedisCache/RedisCache.ts b/packages/components/nodes/cache/RedisCache/RedisCache.ts index 4e61c239e..c93adf58f 100644 --- a/packages/components/nodes/cache/RedisCache/RedisCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisCache.ts @@ -1,9 +1,46 @@ import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' import { RedisCache as LangchainRedisCache } from 'langchain/cache/ioredis' -import { Redis } from 'ioredis' +import { Redis, RedisOptions } from 'ioredis' +import { isEqual } from 'lodash' import { Generation, ChatGeneration, StoredGeneration, mapStoredMessageToChatMessage } from 'langchain/schema' import hash from 'object-hash' +let redisClientSingleton: Redis +let redisClientOption: RedisOptions +let redisClientUrl: string + +const getRedisClientbyOption = (option: RedisOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + +const getRedisClientbyUrl = (url: string) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } else if (redisClientSingleton && url !== redisClientUrl) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } + return redisClientSingleton +} + class RedisCache implements INode { label: string name: string @@ -60,7 +97,7 @@ class RedisCache implements INode { const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - client = new Redis({ + client = getRedisClientbyOption({ port: portStr ? parseInt(portStr) : 6379, host, username, @@ -68,7 +105,7 @@ class RedisCache implements INode { ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = getRedisClientbyUrl(redisUrl) } const redisClient = new LangchainRedisCache(client) diff --git a/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts b/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts index fe1b4df8a..b74413fe5 100644 --- a/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts @@ -1,9 +1,46 @@ import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' -import { Redis } from 'ioredis' +import { Redis, RedisOptions } from 'ioredis' +import { isEqual } from 'lodash' import { CacheBackedEmbeddings } from 'langchain/embeddings/cache_backed' import { RedisByteStore } from 'langchain/storage/ioredis' import { Embeddings } from 'langchain/embeddings/base' +let redisClientSingleton: Redis +let redisClientOption: RedisOptions +let redisClientUrl: string + +const getRedisClientbyOption = (option: RedisOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + +const getRedisClientbyUrl = (url: string) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } else if (redisClientSingleton && url !== redisClientUrl) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } + return redisClientSingleton +} + class RedisEmbeddingsCache implements INode { label: string name: string @@ -75,7 +112,7 @@ class RedisEmbeddingsCache implements INode { const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - client = new Redis({ + client = getRedisClientbyOption({ port: portStr ? parseInt(portStr) : 6379, host, username, @@ -83,7 +120,7 @@ class RedisEmbeddingsCache implements INode { ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = getRedisClientbyUrl(redisUrl) } ttl ??= '3600' diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index baf4ea6bb..c54e07b51 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -1,10 +1,47 @@ -import { Redis } from 'ioredis' +import { Redis, RedisOptions } from 'ioredis' +import { isEqual } from 'lodash' import { BufferMemory, BufferMemoryInput } from 'langchain/memory' import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis' import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from 'langchain/schema' import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface' import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' +let redisClientSingleton: Redis +let redisClientOption: RedisOptions +let redisClientUrl: string + +const getRedisClientbyOption = (option: RedisOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(option) + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + +const getRedisClientbyUrl = (url: string) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } else if (redisClientSingleton && url !== redisClientUrl) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = new Redis(url) + redisClientUrl = url + return redisClientSingleton + } + return redisClientSingleton +} + class RedisBackedChatMemory_Memory implements INode { label: string name: string @@ -95,7 +132,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} - client = new Redis({ + client = getRedisClientbyOption({ port: portStr ? parseInt(portStr) : 6379, host, username, @@ -103,7 +140,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = getRedisClientbyUrl(redisUrl) } let obj: RedisChatMessageHistoryInput = { @@ -120,24 +157,6 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom const redisChatMessageHistory = new RedisChatMessageHistory(obj) - /*redisChatMessageHistory.getMessages = async (): Promise => { - const rawStoredMessages = await client.lrange((redisChatMessageHistory as any).sessionId, windowSize ? -windowSize : 0, -1) - const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message)) - return orderedMessages.map(mapStoredMessageToChatMessage) - } - - redisChatMessageHistory.addMessage = async (message: BaseMessage): Promise => { - const messageToAdd = [message].map((msg) => msg.toDict()) - await client.lpush((redisChatMessageHistory as any).sessionId, JSON.stringify(messageToAdd[0])) - if (sessionTTL) { - await client.expire((redisChatMessageHistory as any).sessionId, sessionTTL) - } - } - - redisChatMessageHistory.clear = async (): Promise => { - await client.del((redisChatMessageHistory as any).sessionId) - }*/ - const memory = new BufferMemoryExtended({ memoryKey: memoryKey ?? 'chat_history', chatHistory: redisChatMessageHistory, diff --git a/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts b/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts index 1d5f77887..80899942f 100644 --- a/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts +++ b/packages/components/nodes/vectorstores/Qdrant/Qdrant.ts @@ -1,6 +1,5 @@ import { flatten } from 'lodash' import { QdrantClient } from '@qdrant/js-client-rest' -import type { Schemas as QdrantSchemas } from '@qdrant/js-client-rest' import { VectorStoreRetrieverInput } from 'langchain/vectorstores/base' import { Document } from 'langchain/document' import { QdrantVectorStore, QdrantLibArgs } from 'langchain/vectorstores/qdrant' @@ -9,12 +8,6 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' type RetrieverConfig = Partial> -type QdrantSearchResponse = QdrantSchemas['ScoredPoint'] & { - payload: { - metadata: object - content: string - } -} class Qdrant_VectorStores implements INode { label: string diff --git a/packages/components/nodes/vectorstores/Redis/Redis.ts b/packages/components/nodes/vectorstores/Redis/Redis.ts index 49f9e8ffc..0dddf782b 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis.ts @@ -1,5 +1,5 @@ -import { flatten } from 'lodash' -import { createClient, SearchOptions } from 'redis' +import { flatten, isEqual } from 'lodash' +import { createClient, SearchOptions, RedisClientOptions } from 'redis' import { Embeddings } from 'langchain/embeddings/base' import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis' import { Document } from 'langchain/document' @@ -7,6 +7,27 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { escapeAllStrings, escapeSpecialChars, unEscapeSpecialChars } from './utils' +let redisClientSingleton: ReturnType +let redisClientOption: RedisClientOptions + +const getRedisClient = async (option: RedisClientOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} + class Redis_VectorStores implements INode { label: string name: string @@ -149,8 +170,7 @@ class Redis_VectorStores implements INode { } try { - const redisClient = createClient({ url: redisUrl }) - await redisClient.connect() + const redisClient = await getRedisClient({ url: redisUrl }) const storeConfig: RedisVectorStoreConfig = { redisClient: redisClient, @@ -210,8 +230,7 @@ class Redis_VectorStores implements INode { redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr } - const redisClient = createClient({ url: redisUrl }) - await redisClient.connect() + const redisClient = await getRedisClient({ url: redisUrl }) const storeConfig: RedisVectorStoreConfig = { redisClient: redisClient, diff --git a/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts b/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts index b6aa6ebb0..e87b49f9c 100644 --- a/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts +++ b/packages/components/nodes/vectorstores/Redis/RedisSearchBase.ts @@ -7,13 +7,34 @@ import { INodeOutputsValue, INodeParams } from '../../../src' - import { Embeddings } from 'langchain/embeddings/base' import { VectorStore } from 'langchain/vectorstores/base' import { Document } from 'langchain/document' -import { createClient, SearchOptions } from 'redis' +import { createClient, SearchOptions, RedisClientOptions } from 'redis' import { RedisVectorStore } from 'langchain/vectorstores/redis' import { escapeSpecialChars, unEscapeSpecialChars } from './utils' +import { isEqual } from 'lodash' + +let redisClientSingleton: ReturnType +let redisClientOption: RedisClientOptions + +const getRedisClient = async (option: RedisClientOptions) => { + if (!redisClientSingleton) { + // if client doesn't exists + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } else if (redisClientSingleton && !isEqual(option, redisClientOption)) { + // if client exists but option changed + redisClientSingleton.quit() + redisClientSingleton = createClient(option) + await redisClientSingleton.connect() + redisClientOption = option + return redisClientSingleton + } + return redisClientSingleton +} export abstract class RedisSearchBase { label: string @@ -141,8 +162,7 @@ export abstract class RedisSearchBase { redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr } - this.redisClient = createClient({ url: redisUrl }) - await this.redisClient.connect() + this.redisClient = await getRedisClient({ url: redisUrl }) const vectorStore = await this.constructVectorStore(embeddings, indexName, replaceIndex, docs) if (!contentKey || contentKey === '') contentKey = 'content' diff --git a/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts b/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts index e8848d338..6f2ec9b32 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis_Existing.ts @@ -3,7 +3,6 @@ import { Embeddings } from 'langchain/embeddings/base' import { VectorStore } from 'langchain/vectorstores/base' import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis' import { Document } from 'langchain/document' - import { RedisSearchBase } from './RedisSearchBase' class RedisExisting_VectorStores extends RedisSearchBase implements INode { diff --git a/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts b/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts index 4da58eaff..c43948247 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis_Upsert.ts @@ -1,7 +1,6 @@ import { ICommonObject, INode, INodeData } from '../../../src/Interface' import { Embeddings } from 'langchain/embeddings/base' import { Document } from 'langchain/document' - import { flatten } from 'lodash' import { RedisSearchBase } from './RedisSearchBase' import { VectorStore } from 'langchain/vectorstores/base'