From 4326cbe6b594d5141a5c2bc61c2182ebc1419e28 Mon Sep 17 00:00:00 2001 From: nikitas-novatix Date: Tue, 27 May 2025 20:54:17 +0200 Subject: [PATCH] Fix: Patching redis socket crash fix (#4431) * redis keepalive mechanism for all redis * removed offline queue commands * Simplified changes for consistency. Added REDIS_KEEP_ALIVE env variable. * update redis socket alive env variable * lint fix * put pingInterval back again because it's needed * removed comment * linting --------- Co-authored-by: Henry Co-authored-by: Henry Heng --- .../components/nodes/vectorstores/Redis/Redis.ts | 16 ++++++++++++---- packages/server/src/queue/RedisEventPublisher.ts | 12 ++++++++++-- .../server/src/queue/RedisEventSubscriber.ts | 12 ++++++++++-- 3 files changed, 32 insertions(+), 8 deletions(-) diff --git a/packages/components/nodes/vectorstores/Redis/Redis.ts b/packages/components/nodes/vectorstores/Redis/Redis.ts index d4fbcf49e..23f124139 100644 --- a/packages/components/nodes/vectorstores/Redis/Redis.ts +++ b/packages/components/nodes/vectorstores/Redis/Redis.ts @@ -153,8 +153,12 @@ class Redis_VectorStores implements INode { keepAlive: process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined // milliseconds - } + : undefined + }, + pingInterval: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined // Add Redis protocol-level pings }) await redisClient.connect() @@ -226,8 +230,12 @@ class Redis_VectorStores implements INode { keepAlive: process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined // milliseconds - } + : undefined + }, + pingInterval: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined // Add Redis protocol-level pings }) const storeConfig: RedisVectorStoreConfig = { diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index c0fce60c5..c305757ad 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -13,7 +13,11 @@ export class RedisEventPublisher implements IServerSideEventStreamer { process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) : undefined - } + }, + pingInterval: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined }) } else { this.redisPublisher = createClient({ @@ -30,7 +34,11 @@ export class RedisEventPublisher implements IServerSideEventStreamer { process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) : undefined - } + }, + pingInterval: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined }) } } diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index 5b0331a72..49c4cb9ef 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -15,7 +15,11 @@ export class RedisEventSubscriber { process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) : undefined - } + }, + pingInterval: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined }) } else { this.redisSubscriber = createClient({ @@ -32,7 +36,11 @@ export class RedisEventSubscriber { process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) : undefined - } + }, + pingInterval: + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : undefined }) } this.sseStreamer = sseStreamer