From 5907d28cdf9718e9d73e9599176a205ea294ecc0 Mon Sep 17 00:00:00 2001 From: Henry Date: Wed, 30 Jul 2025 14:37:15 +0100 Subject: [PATCH] add more loggings around queue --- packages/server/src/queue/BaseQueue.ts | 73 +++++++++++++++---- packages/server/src/queue/PredictionQueue.ts | 7 ++ .../server/src/queue/RedisEventPublisher.ts | 32 ++++++++ .../server/src/queue/RedisEventSubscriber.ts | 29 ++++++++ 4 files changed, 125 insertions(+), 16 deletions(-) diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index d3bf18d29..49e634535 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -57,22 +57,63 @@ export abstract class BaseQueue { } public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker { - this.worker = new Worker( - this.queue.name, - async (job: Job) => { - const start = new Date().getTime() - logger.info(`Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`) - const result = await this.processJob(job.data) - const end = new Date().getTime() - logger.info(`Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`) - return result - }, - { - connection: this.connection, - concurrency - } - ) - return this.worker + logger.info(`[BaseQueue] Creating worker for queue "${this.queue.name}" with concurrency: ${concurrency}`) + + try { + this.worker = new Worker( + this.queue.name, + async (job: Job) => { + const start = new Date().getTime() + logger.info(`[BaseQueue] Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`) + try { + const result = await this.processJob(job.data) + const end = new Date().getTime() + logger.info( + `[BaseQueue] Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)` + ) + return result + } catch (error) { + const end = new Date().getTime() + logger.error( + `[BaseQueue] Job ${job.id} failed in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms):`, + { error } + ) + throw error + } + }, + { + connection: this.connection, + concurrency + } + ) + + // Add error listeners to the worker + this.worker.on('error', (err) => { + logger.error(`[BaseQueue] Worker error for queue "${this.queue.name}":`, { error: err }) + }) + + this.worker.on('ready', () => { + logger.info(`[BaseQueue] Worker ready for queue "${this.queue.name}"`) + }) + + this.worker.on('closing', () => { + logger.info(`[BaseQueue] Worker closing for queue "${this.queue.name}"`) + }) + + this.worker.on('closed', () => { + logger.info(`[BaseQueue] Worker closed for queue "${this.queue.name}"`) + }) + + this.worker.on('failed', (job, err) => { + logger.error(`[BaseQueue] Worker job ${job?.id} failed in queue "${this.queue.name}":`, { error: err }) + }) + + logger.info(`[BaseQueue] Worker created successfully for queue "${this.queue.name}"`) + return this.worker + } catch (error) { + logger.error(`[BaseQueue] Failed to create worker for queue "${this.queue.name}":`, { error }) + throw error + } } public async getJobs(): Promise { diff --git a/packages/server/src/queue/PredictionQueue.ts b/packages/server/src/queue/PredictionQueue.ts index 10cc125f7..af0c9d8c7 100644 --- a/packages/server/src/queue/PredictionQueue.ts +++ b/packages/server/src/queue/PredictionQueue.ts @@ -100,6 +100,13 @@ export class PredictionQueue extends BaseQueue { data.signal = signal } + if (this.redisPublisher) { + logger.info( + `[PredictionQueue] RedisPublisher is connected [orgId:${data.orgId}/flowId:${data.chatflow.id}/chatId:${data.chatId}]`, + this.redisPublisher.isConnected() + ) + } + return await executeFlow(data) } } diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index 0e874832e..f2b601508 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -42,6 +42,38 @@ export class RedisEventPublisher implements IServerSideEventStreamer { : undefined }) } + + this.setupEventListeners() + } + + private setupEventListeners() { + this.redisPublisher.on('connect', () => { + logger.info(`[RedisEventPublisher] Redis client connecting...`) + }) + + this.redisPublisher.on('ready', () => { + logger.info(`[RedisEventPublisher] Redis client ready and connected`) + }) + + this.redisPublisher.on('error', (err) => { + logger.error(`[RedisEventPublisher] Redis client error:`, { + error: err, + isReady: this.redisPublisher.isReady, + isOpen: this.redisPublisher.isOpen + }) + }) + + this.redisPublisher.on('end', () => { + logger.warn(`[RedisEventPublisher] Redis client connection ended`) + }) + + this.redisPublisher.on('reconnecting', () => { + logger.info(`[RedisEventPublisher] Redis client reconnecting...`) + }) + } + + isConnected() { + return this.redisPublisher.isReady } async connect() { diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index b68bab08e..a202bdc65 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -45,6 +45,35 @@ export class RedisEventSubscriber { }) } this.sseStreamer = sseStreamer + + this.setupEventListeners() + } + + private setupEventListeners() { + this.redisSubscriber.on('connect', () => { + logger.info(`[RedisEventSubscriber] Redis client connecting...`) + }) + + this.redisSubscriber.on('ready', () => { + logger.info(`[RedisEventSubscriber] Redis client ready and connected`) + }) + + this.redisSubscriber.on('error', (err) => { + logger.error(`[RedisEventSubscriber] Redis client error:`, { + error: err, + isReady: this.redisSubscriber.isReady, + isOpen: this.redisSubscriber.isOpen, + subscribedChannelsCount: this.subscribedChannels.size + }) + }) + + this.redisSubscriber.on('end', () => { + logger.warn(`[RedisEventSubscriber] Redis client connection ended`) + }) + + this.redisSubscriber.on('reconnecting', () => { + logger.info(`[RedisEventSubscriber] Redis client reconnecting...`) + }) } async connect() {