add more loggings around queue
This commit is contained in:
parent
0ff0ee1c12
commit
5907d28cdf
|
|
@ -57,22 +57,63 @@ export abstract class BaseQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
|
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
|
||||||
this.worker = new Worker(
|
logger.info(`[BaseQueue] Creating worker for queue "${this.queue.name}" with concurrency: ${concurrency}`)
|
||||||
this.queue.name,
|
|
||||||
async (job: Job) => {
|
try {
|
||||||
const start = new Date().getTime()
|
this.worker = new Worker(
|
||||||
logger.info(`Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`)
|
this.queue.name,
|
||||||
const result = await this.processJob(job.data)
|
async (job: Job) => {
|
||||||
const end = new Date().getTime()
|
const start = new Date().getTime()
|
||||||
logger.info(`Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`)
|
logger.info(`[BaseQueue] Processing job ${job.id} in ${this.queue.name} at ${new Date().toISOString()}`)
|
||||||
return result
|
try {
|
||||||
},
|
const result = await this.processJob(job.data)
|
||||||
{
|
const end = new Date().getTime()
|
||||||
connection: this.connection,
|
logger.info(
|
||||||
concurrency
|
`[BaseQueue] Completed job ${job.id} in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms)`
|
||||||
}
|
)
|
||||||
)
|
return result
|
||||||
return this.worker
|
} catch (error) {
|
||||||
|
const end = new Date().getTime()
|
||||||
|
logger.error(
|
||||||
|
`[BaseQueue] Job ${job.id} failed in ${this.queue.name} at ${new Date().toISOString()} (${end - start}ms):`,
|
||||||
|
{ error }
|
||||||
|
)
|
||||||
|
throw error
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
connection: this.connection,
|
||||||
|
concurrency
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// Add error listeners to the worker
|
||||||
|
this.worker.on('error', (err) => {
|
||||||
|
logger.error(`[BaseQueue] Worker error for queue "${this.queue.name}":`, { error: err })
|
||||||
|
})
|
||||||
|
|
||||||
|
this.worker.on('ready', () => {
|
||||||
|
logger.info(`[BaseQueue] Worker ready for queue "${this.queue.name}"`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.worker.on('closing', () => {
|
||||||
|
logger.info(`[BaseQueue] Worker closing for queue "${this.queue.name}"`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.worker.on('closed', () => {
|
||||||
|
logger.info(`[BaseQueue] Worker closed for queue "${this.queue.name}"`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.worker.on('failed', (job, err) => {
|
||||||
|
logger.error(`[BaseQueue] Worker job ${job?.id} failed in queue "${this.queue.name}":`, { error: err })
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.info(`[BaseQueue] Worker created successfully for queue "${this.queue.name}"`)
|
||||||
|
return this.worker
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[BaseQueue] Failed to create worker for queue "${this.queue.name}":`, { error })
|
||||||
|
throw error
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async getJobs(): Promise<Job[]> {
|
public async getJobs(): Promise<Job[]> {
|
||||||
|
|
|
||||||
|
|
@ -100,6 +100,13 @@ export class PredictionQueue extends BaseQueue {
|
||||||
data.signal = signal
|
data.signal = signal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this.redisPublisher) {
|
||||||
|
logger.info(
|
||||||
|
`[PredictionQueue] RedisPublisher is connected [orgId:${data.orgId}/flowId:${data.chatflow.id}/chatId:${data.chatId}]`,
|
||||||
|
this.redisPublisher.isConnected()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
return await executeFlow(data)
|
return await executeFlow(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,38 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
: undefined
|
: undefined
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.setupEventListeners()
|
||||||
|
}
|
||||||
|
|
||||||
|
private setupEventListeners() {
|
||||||
|
this.redisPublisher.on('connect', () => {
|
||||||
|
logger.info(`[RedisEventPublisher] Redis client connecting...`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisPublisher.on('ready', () => {
|
||||||
|
logger.info(`[RedisEventPublisher] Redis client ready and connected`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisPublisher.on('error', (err) => {
|
||||||
|
logger.error(`[RedisEventPublisher] Redis client error:`, {
|
||||||
|
error: err,
|
||||||
|
isReady: this.redisPublisher.isReady,
|
||||||
|
isOpen: this.redisPublisher.isOpen
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisPublisher.on('end', () => {
|
||||||
|
logger.warn(`[RedisEventPublisher] Redis client connection ended`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisPublisher.on('reconnecting', () => {
|
||||||
|
logger.info(`[RedisEventPublisher] Redis client reconnecting...`)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
isConnected() {
|
||||||
|
return this.redisPublisher.isReady
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,35 @@ export class RedisEventSubscriber {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
this.sseStreamer = sseStreamer
|
this.sseStreamer = sseStreamer
|
||||||
|
|
||||||
|
this.setupEventListeners()
|
||||||
|
}
|
||||||
|
|
||||||
|
private setupEventListeners() {
|
||||||
|
this.redisSubscriber.on('connect', () => {
|
||||||
|
logger.info(`[RedisEventSubscriber] Redis client connecting...`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisSubscriber.on('ready', () => {
|
||||||
|
logger.info(`[RedisEventSubscriber] Redis client ready and connected`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisSubscriber.on('error', (err) => {
|
||||||
|
logger.error(`[RedisEventSubscriber] Redis client error:`, {
|
||||||
|
error: err,
|
||||||
|
isReady: this.redisSubscriber.isReady,
|
||||||
|
isOpen: this.redisSubscriber.isOpen,
|
||||||
|
subscribedChannelsCount: this.subscribedChannels.size
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisSubscriber.on('end', () => {
|
||||||
|
logger.warn(`[RedisEventSubscriber] Redis client connection ended`)
|
||||||
|
})
|
||||||
|
|
||||||
|
this.redisSubscriber.on('reconnecting', () => {
|
||||||
|
logger.info(`[RedisEventSubscriber] Redis client reconnecting...`)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue