Compare commits
1 Commits
main
...
chore/Remo
| Author | SHA1 | Date |
|---|---|---|
|
|
acbdd0fcd5 |
|
|
@ -57,8 +57,6 @@ export abstract class BaseQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
|
public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {
|
||||||
logger.info(`[BaseQueue] Creating worker for queue "${this.queue.name}" with concurrency: ${concurrency}`)
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.worker = new Worker(
|
this.worker = new Worker(
|
||||||
this.queue.name,
|
this.queue.name,
|
||||||
|
|
@ -92,14 +90,6 @@ export abstract class BaseQueue {
|
||||||
logger.error(`[BaseQueue] Worker error for queue "${this.queue.name}":`, { error: err })
|
logger.error(`[BaseQueue] Worker error for queue "${this.queue.name}":`, { error: err })
|
||||||
})
|
})
|
||||||
|
|
||||||
this.worker.on('ready', () => {
|
|
||||||
logger.info(`[BaseQueue] Worker ready for queue "${this.queue.name}"`)
|
|
||||||
})
|
|
||||||
|
|
||||||
this.worker.on('closing', () => {
|
|
||||||
logger.info(`[BaseQueue] Worker closing for queue "${this.queue.name}"`)
|
|
||||||
})
|
|
||||||
|
|
||||||
this.worker.on('closed', () => {
|
this.worker.on('closed', () => {
|
||||||
logger.info(`[BaseQueue] Worker closed for queue "${this.queue.name}"`)
|
logger.info(`[BaseQueue] Worker closed for queue "${this.queue.name}"`)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -100,13 +100,6 @@ export class PredictionQueue extends BaseQueue {
|
||||||
data.signal = signal
|
data.signal = signal
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.redisPublisher) {
|
|
||||||
logger.info(
|
|
||||||
`[PredictionQueue] RedisPublisher is connected [orgId:${data.orgId}/flowId:${data.chatflow.id}/chatId:${data.chatId}]`,
|
|
||||||
this.redisPublisher.isConnected()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
return await executeFlow(data)
|
return await executeFlow(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'
|
||||||
import { Express } from 'express'
|
import { Express } from 'express'
|
||||||
import { UsageCacheManager } from '../UsageCacheManager'
|
import { UsageCacheManager } from '../UsageCacheManager'
|
||||||
import { ExpressAdapter } from '@bull-board/express'
|
import { ExpressAdapter } from '@bull-board/express'
|
||||||
import logger from '../utils/logger'
|
|
||||||
|
|
||||||
const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'
|
const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'
|
||||||
|
|
||||||
|
|
@ -48,9 +47,6 @@ export class QueueManager {
|
||||||
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
|
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
|
||||||
: undefined
|
: undefined
|
||||||
}
|
}
|
||||||
logger.info(
|
|
||||||
`[QueueManager] Connecting to Redis using URL: ${process.env.REDIS_URL.replace(/\/\/[^:]+:[^@]+@/, '//[CREDENTIALS]@')}`
|
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
let tlsOpts = undefined
|
let tlsOpts = undefined
|
||||||
if (process.env.REDIS_TLS === 'true') {
|
if (process.env.REDIS_TLS === 'true') {
|
||||||
|
|
@ -72,11 +68,6 @@ export class QueueManager {
|
||||||
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
|
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
|
||||||
: undefined
|
: undefined
|
||||||
}
|
}
|
||||||
logger.info(
|
|
||||||
`[QueueManager] Connecting to Redis using host:port: ${process.env.REDIS_HOST || 'localhost'}:${
|
|
||||||
process.env.REDIS_PORT || '6379'
|
|
||||||
}`
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -149,15 +140,6 @@ export class QueueManager {
|
||||||
})
|
})
|
||||||
this.registerQueue('prediction', predictionQueue)
|
this.registerQueue('prediction', predictionQueue)
|
||||||
|
|
||||||
// Add connection event logging for prediction queue
|
|
||||||
if (predictionQueue.getQueue().opts.connection) {
|
|
||||||
const connInfo = predictionQueue.getQueue().opts.connection || {}
|
|
||||||
const connInfoString = JSON.stringify(connInfo)
|
|
||||||
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
|
|
||||||
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
|
|
||||||
logger.info(`[QueueManager] Prediction queue connected to Redis: ${connInfoString}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
this.predictionQueueEventsProducer = new QueueEventsProducer(predictionQueue.getQueueName(), {
|
this.predictionQueueEventsProducer = new QueueEventsProducer(predictionQueue.getQueueName(), {
|
||||||
connection: this.connection
|
connection: this.connection
|
||||||
})
|
})
|
||||||
|
|
@ -172,15 +154,6 @@ export class QueueManager {
|
||||||
})
|
})
|
||||||
this.registerQueue('upsert', upsertionQueue)
|
this.registerQueue('upsert', upsertionQueue)
|
||||||
|
|
||||||
// Add connection event logging for upsert queue
|
|
||||||
if (upsertionQueue.getQueue().opts.connection) {
|
|
||||||
const connInfo = upsertionQueue.getQueue().opts.connection || {}
|
|
||||||
const connInfoString = JSON.stringify(connInfo)
|
|
||||||
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
|
|
||||||
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
|
|
||||||
logger.info(`[QueueManager] Upsert queue connected to Redis: ${connInfoString}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (serverAdapter) {
|
if (serverAdapter) {
|
||||||
createBullBoard({
|
createBullBoard({
|
||||||
queues: [new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())],
|
queues: [new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())],
|
||||||
|
|
|
||||||
|
|
@ -77,20 +77,7 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
logger.info(`[RedisEventPublisher] Connecting to Redis...`)
|
|
||||||
await this.redisPublisher.connect()
|
await this.redisPublisher.connect()
|
||||||
|
|
||||||
// Log connection details after successful connection
|
|
||||||
const connInfo = this.redisPublisher.options?.socket
|
|
||||||
const connInfoString = JSON.stringify(connInfo)
|
|
||||||
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
|
|
||||||
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
|
|
||||||
logger.info(`[RedisEventPublisher] Connected to Redis: ${connInfoString}`)
|
|
||||||
|
|
||||||
// Add error event listener
|
|
||||||
this.redisPublisher.on('error', (err) => {
|
|
||||||
logger.error(`[RedisEventPublisher] Redis connection error`, { error: err })
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
||||||
|
|
|
||||||
|
|
@ -77,20 +77,7 @@ export class RedisEventSubscriber {
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
logger.info(`[RedisEventSubscriber] Connecting to Redis...`)
|
|
||||||
await this.redisSubscriber.connect()
|
await this.redisSubscriber.connect()
|
||||||
|
|
||||||
// Log connection details after successful connection
|
|
||||||
const connInfo = this.redisSubscriber.options?.socket
|
|
||||||
const connInfoString = JSON.stringify(connInfo)
|
|
||||||
.replace(/"username":"[^"]*"/g, '"username":"[REDACTED]"')
|
|
||||||
.replace(/"password":"[^"]*"/g, '"password":"[REDACTED]"')
|
|
||||||
logger.info(`[RedisEventSubscriber] Connected to Redis: ${connInfoString}`)
|
|
||||||
|
|
||||||
// Add error event listener
|
|
||||||
this.redisSubscriber.on('error', (err) => {
|
|
||||||
logger.error(`[RedisEventSubscriber] Redis connection error`, { error: err })
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe(channel: string) {
|
subscribe(channel: string) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue