use singleton redis connection

This commit is contained in:
Henry 2024-01-28 17:18:18 +00:00
parent b382dd4c43
commit 393f9b57c6
8 changed files with 169 additions and 46 deletions

View File

@ -1,9 +1,46 @@
import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { RedisCache as LangchainRedisCache } from 'langchain/cache/ioredis' import { RedisCache as LangchainRedisCache } from 'langchain/cache/ioredis'
import { Redis } from 'ioredis' import { Redis, RedisOptions } from 'ioredis'
import { isEqual } from 'lodash'
import { Generation, ChatGeneration, StoredGeneration, mapStoredMessageToChatMessage } from 'langchain/schema' import { Generation, ChatGeneration, StoredGeneration, mapStoredMessageToChatMessage } from 'langchain/schema'
import hash from 'object-hash' import hash from 'object-hash'
let redisClientSingleton: Redis
let redisClientOption: RedisOptions
let redisClientUrl: string
const getRedisClientbyOption = (option: RedisOptions) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = new Redis(option)
redisClientOption = option
return redisClientSingleton
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = new Redis(option)
redisClientOption = option
return redisClientSingleton
}
return redisClientSingleton
}
const getRedisClientbyUrl = (url: string) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = new Redis(url)
redisClientUrl = url
return redisClientSingleton
} else if (redisClientSingleton && url !== redisClientUrl) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = new Redis(url)
redisClientUrl = url
return redisClientSingleton
}
return redisClientSingleton
}
class RedisCache implements INode { class RedisCache implements INode {
label: string label: string
name: string name: string
@ -60,7 +97,7 @@ class RedisCache implements INode {
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
client = new Redis({ client = getRedisClientbyOption({
port: portStr ? parseInt(portStr) : 6379, port: portStr ? parseInt(portStr) : 6379,
host, host,
username, username,
@ -68,7 +105,7 @@ class RedisCache implements INode {
...tlsOptions ...tlsOptions
}) })
} else { } else {
client = new Redis(redisUrl) client = getRedisClientbyUrl(redisUrl)
} }
const redisClient = new LangchainRedisCache(client) const redisClient = new LangchainRedisCache(client)

View File

@ -1,9 +1,46 @@
import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src' import { getBaseClasses, getCredentialData, getCredentialParam, ICommonObject, INode, INodeData, INodeParams } from '../../../src'
import { Redis } from 'ioredis' import { Redis, RedisOptions } from 'ioredis'
import { isEqual } from 'lodash'
import { CacheBackedEmbeddings } from 'langchain/embeddings/cache_backed' import { CacheBackedEmbeddings } from 'langchain/embeddings/cache_backed'
import { RedisByteStore } from 'langchain/storage/ioredis' import { RedisByteStore } from 'langchain/storage/ioredis'
import { Embeddings } from 'langchain/embeddings/base' import { Embeddings } from 'langchain/embeddings/base'
let redisClientSingleton: Redis
let redisClientOption: RedisOptions
let redisClientUrl: string
const getRedisClientbyOption = (option: RedisOptions) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = new Redis(option)
redisClientOption = option
return redisClientSingleton
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = new Redis(option)
redisClientOption = option
return redisClientSingleton
}
return redisClientSingleton
}
const getRedisClientbyUrl = (url: string) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = new Redis(url)
redisClientUrl = url
return redisClientSingleton
} else if (redisClientSingleton && url !== redisClientUrl) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = new Redis(url)
redisClientUrl = url
return redisClientSingleton
}
return redisClientSingleton
}
class RedisEmbeddingsCache implements INode { class RedisEmbeddingsCache implements INode {
label: string label: string
name: string name: string
@ -75,7 +112,7 @@ class RedisEmbeddingsCache implements INode {
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
client = new Redis({ client = getRedisClientbyOption({
port: portStr ? parseInt(portStr) : 6379, port: portStr ? parseInt(portStr) : 6379,
host, host,
username, username,
@ -83,7 +120,7 @@ class RedisEmbeddingsCache implements INode {
...tlsOptions ...tlsOptions
}) })
} else { } else {
client = new Redis(redisUrl) client = getRedisClientbyUrl(redisUrl)
} }
ttl ??= '3600' ttl ??= '3600'

View File

@ -1,10 +1,47 @@
import { Redis } from 'ioredis' import { Redis, RedisOptions } from 'ioredis'
import { isEqual } from 'lodash'
import { BufferMemory, BufferMemoryInput } from 'langchain/memory' import { BufferMemory, BufferMemoryInput } from 'langchain/memory'
import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis' import { RedisChatMessageHistory, RedisChatMessageHistoryInput } from 'langchain/stores/message/ioredis'
import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from 'langchain/schema' import { mapStoredMessageToChatMessage, BaseMessage, AIMessage, HumanMessage } from 'langchain/schema'
import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface' import { INode, INodeData, INodeParams, ICommonObject, MessageType, IMessage, MemoryMethods, FlowiseMemory } from '../../../src/Interface'
import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { convertBaseMessagetoIMessage, getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
let redisClientSingleton: Redis
let redisClientOption: RedisOptions
let redisClientUrl: string
const getRedisClientbyOption = (option: RedisOptions) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = new Redis(option)
redisClientOption = option
return redisClientSingleton
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = new Redis(option)
redisClientOption = option
return redisClientSingleton
}
return redisClientSingleton
}
const getRedisClientbyUrl = (url: string) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = new Redis(url)
redisClientUrl = url
return redisClientSingleton
} else if (redisClientSingleton && url !== redisClientUrl) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = new Redis(url)
redisClientUrl = url
return redisClientSingleton
}
return redisClientSingleton
}
class RedisBackedChatMemory_Memory implements INode { class RedisBackedChatMemory_Memory implements INode {
label: string label: string
name: string name: string
@ -95,7 +132,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {} const tlsOptions = sslEnabled === true ? { tls: { rejectUnauthorized: false } } : {}
client = new Redis({ client = getRedisClientbyOption({
port: portStr ? parseInt(portStr) : 6379, port: portStr ? parseInt(portStr) : 6379,
host, host,
username, username,
@ -103,7 +140,7 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
...tlsOptions ...tlsOptions
}) })
} else { } else {
client = new Redis(redisUrl) client = getRedisClientbyUrl(redisUrl)
} }
let obj: RedisChatMessageHistoryInput = { let obj: RedisChatMessageHistoryInput = {
@ -120,24 +157,6 @@ const initalizeRedis = async (nodeData: INodeData, options: ICommonObject): Prom
const redisChatMessageHistory = new RedisChatMessageHistory(obj) const redisChatMessageHistory = new RedisChatMessageHistory(obj)
/*redisChatMessageHistory.getMessages = async (): Promise<BaseMessage[]> => {
const rawStoredMessages = await client.lrange((redisChatMessageHistory as any).sessionId, windowSize ? -windowSize : 0, -1)
const orderedMessages = rawStoredMessages.reverse().map((message) => JSON.parse(message))
return orderedMessages.map(mapStoredMessageToChatMessage)
}
redisChatMessageHistory.addMessage = async (message: BaseMessage): Promise<void> => {
const messageToAdd = [message].map((msg) => msg.toDict())
await client.lpush((redisChatMessageHistory as any).sessionId, JSON.stringify(messageToAdd[0]))
if (sessionTTL) {
await client.expire((redisChatMessageHistory as any).sessionId, sessionTTL)
}
}
redisChatMessageHistory.clear = async (): Promise<void> => {
await client.del((redisChatMessageHistory as any).sessionId)
}*/
const memory = new BufferMemoryExtended({ const memory = new BufferMemoryExtended({
memoryKey: memoryKey ?? 'chat_history', memoryKey: memoryKey ?? 'chat_history',
chatHistory: redisChatMessageHistory, chatHistory: redisChatMessageHistory,

View File

@ -1,6 +1,5 @@
import { flatten } from 'lodash' import { flatten } from 'lodash'
import { QdrantClient } from '@qdrant/js-client-rest' import { QdrantClient } from '@qdrant/js-client-rest'
import type { Schemas as QdrantSchemas } from '@qdrant/js-client-rest'
import { VectorStoreRetrieverInput } from 'langchain/vectorstores/base' import { VectorStoreRetrieverInput } from 'langchain/vectorstores/base'
import { Document } from 'langchain/document' import { Document } from 'langchain/document'
import { QdrantVectorStore, QdrantLibArgs } from 'langchain/vectorstores/qdrant' import { QdrantVectorStore, QdrantLibArgs } from 'langchain/vectorstores/qdrant'
@ -9,12 +8,6 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
type RetrieverConfig = Partial<VectorStoreRetrieverInput<QdrantVectorStore>> type RetrieverConfig = Partial<VectorStoreRetrieverInput<QdrantVectorStore>>
type QdrantSearchResponse = QdrantSchemas['ScoredPoint'] & {
payload: {
metadata: object
content: string
}
}
class Qdrant_VectorStores implements INode { class Qdrant_VectorStores implements INode {
label: string label: string

View File

@ -1,5 +1,5 @@
import { flatten } from 'lodash' import { flatten, isEqual } from 'lodash'
import { createClient, SearchOptions } from 'redis' import { createClient, SearchOptions, RedisClientOptions } from 'redis'
import { Embeddings } from 'langchain/embeddings/base' import { Embeddings } from 'langchain/embeddings/base'
import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis' import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis'
import { Document } from 'langchain/document' import { Document } from 'langchain/document'
@ -7,6 +7,27 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'
import { escapeAllStrings, escapeSpecialChars, unEscapeSpecialChars } from './utils' import { escapeAllStrings, escapeSpecialChars, unEscapeSpecialChars } from './utils'
let redisClientSingleton: ReturnType<typeof createClient>
let redisClientOption: RedisClientOptions
const getRedisClient = async (option: RedisClientOptions) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = createClient(option)
await redisClientSingleton.connect()
redisClientOption = option
return redisClientSingleton
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = createClient(option)
await redisClientSingleton.connect()
redisClientOption = option
return redisClientSingleton
}
return redisClientSingleton
}
class Redis_VectorStores implements INode { class Redis_VectorStores implements INode {
label: string label: string
name: string name: string
@ -149,8 +170,7 @@ class Redis_VectorStores implements INode {
} }
try { try {
const redisClient = createClient({ url: redisUrl }) const redisClient = await getRedisClient({ url: redisUrl })
await redisClient.connect()
const storeConfig: RedisVectorStoreConfig = { const storeConfig: RedisVectorStoreConfig = {
redisClient: redisClient, redisClient: redisClient,
@ -210,8 +230,7 @@ class Redis_VectorStores implements INode {
redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr
} }
const redisClient = createClient({ url: redisUrl }) const redisClient = await getRedisClient({ url: redisUrl })
await redisClient.connect()
const storeConfig: RedisVectorStoreConfig = { const storeConfig: RedisVectorStoreConfig = {
redisClient: redisClient, redisClient: redisClient,

View File

@ -7,13 +7,34 @@ import {
INodeOutputsValue, INodeOutputsValue,
INodeParams INodeParams
} from '../../../src' } from '../../../src'
import { Embeddings } from 'langchain/embeddings/base' import { Embeddings } from 'langchain/embeddings/base'
import { VectorStore } from 'langchain/vectorstores/base' import { VectorStore } from 'langchain/vectorstores/base'
import { Document } from 'langchain/document' import { Document } from 'langchain/document'
import { createClient, SearchOptions } from 'redis' import { createClient, SearchOptions, RedisClientOptions } from 'redis'
import { RedisVectorStore } from 'langchain/vectorstores/redis' import { RedisVectorStore } from 'langchain/vectorstores/redis'
import { escapeSpecialChars, unEscapeSpecialChars } from './utils' import { escapeSpecialChars, unEscapeSpecialChars } from './utils'
import { isEqual } from 'lodash'
let redisClientSingleton: ReturnType<typeof createClient>
let redisClientOption: RedisClientOptions
const getRedisClient = async (option: RedisClientOptions) => {
if (!redisClientSingleton) {
// if client doesn't exists
redisClientSingleton = createClient(option)
await redisClientSingleton.connect()
redisClientOption = option
return redisClientSingleton
} else if (redisClientSingleton && !isEqual(option, redisClientOption)) {
// if client exists but option changed
redisClientSingleton.quit()
redisClientSingleton = createClient(option)
await redisClientSingleton.connect()
redisClientOption = option
return redisClientSingleton
}
return redisClientSingleton
}
export abstract class RedisSearchBase { export abstract class RedisSearchBase {
label: string label: string
@ -141,8 +162,7 @@ export abstract class RedisSearchBase {
redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr
} }
this.redisClient = createClient({ url: redisUrl }) this.redisClient = await getRedisClient({ url: redisUrl })
await this.redisClient.connect()
const vectorStore = await this.constructVectorStore(embeddings, indexName, replaceIndex, docs) const vectorStore = await this.constructVectorStore(embeddings, indexName, replaceIndex, docs)
if (!contentKey || contentKey === '') contentKey = 'content' if (!contentKey || contentKey === '') contentKey = 'content'

View File

@ -3,7 +3,6 @@ import { Embeddings } from 'langchain/embeddings/base'
import { VectorStore } from 'langchain/vectorstores/base' import { VectorStore } from 'langchain/vectorstores/base'
import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis' import { RedisVectorStore, RedisVectorStoreConfig } from 'langchain/vectorstores/redis'
import { Document } from 'langchain/document' import { Document } from 'langchain/document'
import { RedisSearchBase } from './RedisSearchBase' import { RedisSearchBase } from './RedisSearchBase'
class RedisExisting_VectorStores extends RedisSearchBase implements INode { class RedisExisting_VectorStores extends RedisSearchBase implements INode {

View File

@ -1,7 +1,6 @@
import { ICommonObject, INode, INodeData } from '../../../src/Interface' import { ICommonObject, INode, INodeData } from '../../../src/Interface'
import { Embeddings } from 'langchain/embeddings/base' import { Embeddings } from 'langchain/embeddings/base'
import { Document } from 'langchain/document' import { Document } from 'langchain/document'
import { flatten } from 'lodash' import { flatten } from 'lodash'
import { RedisSearchBase } from './RedisSearchBase' import { RedisSearchBase } from './RedisSearchBase'
import { VectorStore } from 'langchain/vectorstores/base' import { VectorStore } from 'langchain/vectorstores/base'