diff --git a/docker/.env.example b/docker/.env.example index 3efdd7f03..5e824f07b 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -101,4 +101,5 @@ BLOB_STORAGE_PATH=/root/.flowise/storage # REDIS_CERT= # REDIS_KEY= # REDIS_CA= -# ENABLE_BULLMQ_DASHBOARD= +# REDIS_KEEP_ALIVE= +# ENABLE_BULLMQ_DASHBOARD= \ No newline at end of file diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index db689935b..5fe47166c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -49,6 +49,7 @@ services: - REDIS_CERT=${REDIS_CERT} - REDIS_KEY=${REDIS_KEY} - REDIS_CA=${REDIS_CA} + - REDIS_KEEP_ALIVE=${REDIS_KEEP_ALIVE} - ENABLE_BULLMQ_DASHBOARD=${ENABLE_BULLMQ_DASHBOARD} ports: - '${PORT}:${PORT}' diff --git a/docker/worker/docker-compose.yml b/docker/worker/docker-compose.yml index 6dbd20711..fda3b8177 100644 --- a/docker/worker/docker-compose.yml +++ b/docker/worker/docker-compose.yml @@ -49,6 +49,7 @@ services: - REDIS_CERT=${REDIS_CERT} - REDIS_KEY=${REDIS_KEY} - REDIS_CA=${REDIS_CA} + - REDIS_KEEP_ALIVE=${REDIS_KEEP_ALIVE} - ENABLE_BULLMQ_DASHBOARD=${ENABLE_BULLMQ_DASHBOARD} ports: - '${PORT}:${PORT}' diff --git a/packages/components/nodes/cache/RedisCache/RedisCache.ts b/packages/components/nodes/cache/RedisCache/RedisCache.ts index 6646575f8..3821822a6 100644 --- a/packages/components/nodes/cache/RedisCache/RedisCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisCache.ts @@ -126,10 +126,19 @@ const getRedisClient = async (nodeData: INodeData, options: ICommonObject) => { host, username, password, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined, ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = new Redis(redisUrl, { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + }) } return client diff --git a/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts b/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts index 1e4ed86c8..c46a9921b 100644 --- a/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts +++ b/packages/components/nodes/cache/RedisCache/RedisEmbeddingsCache.ts @@ -83,10 +83,19 @@ class RedisEmbeddingsCache implements INode { host, username, password, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined, ...tlsOptions }) } else { - client = new Redis(redisUrl) + client = new Redis(redisUrl, { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + }) } ttl ??= '3600' diff --git a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts index 2f4f3ca07..e1813fae7 100644 --- a/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts +++ b/packages/components/nodes/memory/RedisBackedChatMemory/RedisBackedChatMemory.ts @@ -132,7 +132,21 @@ class BufferMemoryExtended extends FlowiseMemory implements MemoryMethods { } private async withRedisClient(fn: (client: Redis) => Promise): Promise { - const client = typeof this.redisOptions === 'string' ? new Redis(this.redisOptions) : new Redis(this.redisOptions) + const client = + typeof this.redisOptions === 'string' + ? new Redis(this.redisOptions, { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + }) + : new Redis({ + ...this.redisOptions, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + }) try { return await fn(client) } finally { diff --git a/packages/components/nodes/vectorstores/Redis/Redis.ts b/packages/components/nodes/vectorstores/Redis/Redis.ts index 3c9fd773c..d4fbcf49e 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis.ts @@ -147,7 +147,15 @@ class Redis_VectorStores implements INode { } try { - const redisClient = createClient({ url: redisUrl }) + const redisClient = createClient({ + url: redisUrl, + socket: { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined // milliseconds + } + }) await redisClient.connect() const storeConfig: RedisVectorStoreConfig = { @@ -212,7 +220,15 @@ class Redis_VectorStores implements INode { redisUrl = 'redis://' + username + ':' + password + '@' + host + ':' + portStr } - const redisClient = createClient({ url: redisUrl }) + const redisClient = createClient({ + url: redisUrl, + socket: { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined // milliseconds + } + }) const storeConfig: RedisVectorStoreConfig = { redisClient: redisClient, diff --git a/packages/server/.env.example b/packages/server/.env.example index 6ff339610..58b2f8f81 100644 --- a/packages/server/.env.example +++ b/packages/server/.env.example @@ -99,4 +99,5 @@ PORT=3000 # REDIS_CERT= # REDIS_KEY= # REDIS_CA= -# ENABLE_BULLMQ_DASHBOARD= +# REDIS_KEEP_ALIVE= +# ENABLE_BULLMQ_DASHBOARD= \ No newline at end of file diff --git a/packages/server/src/CachePool.ts b/packages/server/src/CachePool.ts index b8662a8e9..e978d89de 100644 --- a/packages/server/src/CachePool.ts +++ b/packages/server/src/CachePool.ts @@ -12,7 +12,12 @@ export class CachePool { constructor() { if (process.env.MODE === MODE.QUEUE) { if (process.env.REDIS_URL) { - this.redisClient = new Redis(process.env.REDIS_URL) + this.redisClient = new Redis(process.env.REDIS_URL, { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + }) } else { this.redisClient = new Redis({ host: process.env.REDIS_HOST || 'localhost', @@ -26,6 +31,10 @@ export class CachePool { key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined } + : undefined, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) : undefined }) } diff --git a/packages/server/src/commands/base.ts b/packages/server/src/commands/base.ts index 8fe5dbfba..733e825cf 100644 --- a/packages/server/src/commands/base.ts +++ b/packages/server/src/commands/base.ts @@ -76,6 +76,7 @@ export abstract class BaseCommand extends Command { REDIS_CERT: Flags.string(), REDIS_KEY: Flags.string(), REDIS_CA: Flags.string(), + REDIS_KEEP_ALIVE: Flags.string(), ENABLE_BULLMQ_DASHBOARD: Flags.string() } @@ -211,6 +212,7 @@ export abstract class BaseCommand extends Command { if (flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN = flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN if (flags.REMOVE_ON_AGE) process.env.REMOVE_ON_AGE = flags.REMOVE_ON_AGE if (flags.REMOVE_ON_COUNT) process.env.REMOVE_ON_COUNT = flags.REMOVE_ON_COUNT + if (flags.REDIS_KEEP_ALIVE) process.env.REDIS_KEEP_ALIVE = flags.REDIS_KEEP_ALIVE if (flags.ENABLE_BULLMQ_DASHBOARD) process.env.ENABLE_BULLMQ_DASHBOARD = flags.ENABLE_BULLMQ_DASHBOARD } } diff --git a/packages/server/src/queue/QueueManager.ts b/packages/server/src/queue/QueueManager.ts index 166a4125d..abd657ac6 100644 --- a/packages/server/src/queue/QueueManager.ts +++ b/packages/server/src/queue/QueueManager.ts @@ -41,7 +41,12 @@ export class QueueManager { port: parseInt(process.env.REDIS_PORT || '6379'), username: process.env.REDIS_USERNAME || undefined, password: process.env.REDIS_PASSWORD || undefined, - tls: tlsOpts + tls: tlsOpts, + enableReadyCheck: true, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined } } diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index 2e87596d9..c0fce60c5 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -7,7 +7,13 @@ export class RedisEventPublisher implements IServerSideEventStreamer { constructor() { if (process.env.REDIS_URL) { this.redisPublisher = createClient({ - url: process.env.REDIS_URL + url: process.env.REDIS_URL, + socket: { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + } }) } else { this.redisPublisher = createClient({ @@ -19,7 +25,11 @@ export class RedisEventPublisher implements IServerSideEventStreamer { tls: process.env.REDIS_TLS === 'true', cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined } }) } diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index af78d280e..5b0331a72 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -9,7 +9,13 @@ export class RedisEventSubscriber { constructor(sseStreamer: SSEStreamer) { if (process.env.REDIS_URL) { this.redisSubscriber = createClient({ - url: process.env.REDIS_URL + url: process.env.REDIS_URL, + socket: { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + } }) } else { this.redisSubscriber = createClient({ @@ -21,7 +27,11 @@ export class RedisEventSubscriber { tls: process.env.REDIS_TLS === 'true', cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined } }) } diff --git a/packages/server/src/utils/rateLimit.ts b/packages/server/src/utils/rateLimit.ts index 4e8e9db4b..d4dd168a6 100644 --- a/packages/server/src/utils/rateLimit.ts +++ b/packages/server/src/utils/rateLimit.ts @@ -24,7 +24,12 @@ export class RateLimiterManager { constructor() { if (process.env.MODE === MODE.QUEUE) { if (process.env.REDIS_URL) { - this.redisClient = new Redis(process.env.REDIS_URL) + this.redisClient = new Redis(process.env.REDIS_URL, { + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined + }) } else { this.redisClient = new Redis({ host: process.env.REDIS_HOST || 'localhost', @@ -38,6 +43,10 @@ export class RateLimiterManager { key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined } + : undefined, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) : undefined }) } @@ -65,7 +74,13 @@ export class RateLimiterManager { port: parseInt(process.env.REDIS_PORT || '6379'), username: process.env.REDIS_USERNAME || undefined, password: process.env.REDIS_PASSWORD || undefined, - tls: tlsOpts + tls: tlsOpts, + maxRetriesPerRequest: null, + enableReadyCheck: true, + keepAlive: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined } }