diff --git a/docker/.env.example b/docker/.env.example index bff5ef8f9..44df6f53c 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -86,6 +86,8 @@ BLOB_STORAGE_PATH=/root/.flowise/storage # QUEUE_NAME=flowise-queue # QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000 # WORKER_CONCURRENCY=100000 +# REMOVE_ON_AGE=86400 +# REMOVE_ON_COUNT=10000 # REDIS_URL= # REDIS_HOST=localhost # REDIS_PORT=6379 diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a9bda9e27..3806af1d9 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -38,6 +38,8 @@ services: - WORKER_CONCURRENCY=${WORKER_CONCURRENCY} - QUEUE_NAME=${QUEUE_NAME} - QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN} + - REMOVE_ON_AGE=${REMOVE_ON_AGE} + - REMOVE_ON_COUNT=${REMOVE_ON_COUNT} - REDIS_URL=${REDIS_URL} - REDIS_HOST=${REDIS_HOST} - REDIS_PORT=${REDIS_PORT} @@ -50,11 +52,11 @@ services: ports: - '${PORT}:${PORT}' healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:${PORT}/api/v1/ping"] - interval: 10s - timeout: 5s - retries: 5 - start_period: 30s + test: ['CMD', 'curl', '-f', 'http://localhost:${PORT}/api/v1/ping'] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s volumes: - ~/.flowise:/root/.flowise entrypoint: /bin/sh -c "sleep 3; flowise start" diff --git a/docker/worker/docker-compose.yml b/docker/worker/docker-compose.yml index 88a8631d0..818bbb35b 100644 --- a/docker/worker/docker-compose.yml +++ b/docker/worker/docker-compose.yml @@ -38,6 +38,8 @@ services: - WORKER_CONCURRENCY=${WORKER_CONCURRENCY} - QUEUE_NAME=${QUEUE_NAME} - QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN} + - REMOVE_ON_AGE=${REMOVE_ON_AGE} + - REMOVE_ON_COUNT=${REMOVE_ON_COUNT} - REDIS_URL=${REDIS_URL} - REDIS_HOST=${REDIS_HOST} - REDIS_PORT=${REDIS_PORT} diff --git a/packages/server/.env.example b/packages/server/.env.example index dabae5a02..927967976 100644 --- a/packages/server/.env.example +++ b/packages/server/.env.example @@ -83,6 +83,8 @@ PORT=3000 # QUEUE_NAME=flowise-queue # QUEUE_REDIS_EVENT_STREAM_MAX_LEN=100000 # WORKER_CONCURRENCY=100000 +# REMOVE_ON_AGE=86400 +# REMOVE_ON_COUNT=10000 # REDIS_URL= # REDIS_HOST=localhost # REDIS_PORT=6379 diff --git a/packages/server/src/commands/base.ts b/packages/server/src/commands/base.ts index 5bed81e56..37329719b 100644 --- a/packages/server/src/commands/base.ts +++ b/packages/server/src/commands/base.ts @@ -61,6 +61,8 @@ export abstract class BaseCommand extends Command { WORKER_CONCURRENCY: Flags.string(), QUEUE_NAME: Flags.string(), QUEUE_REDIS_EVENT_STREAM_MAX_LEN: Flags.string(), + REMOVE_ON_AGE: Flags.string(), + REMOVE_ON_COUNT: Flags.string(), REDIS_URL: Flags.string(), REDIS_HOST: Flags.string(), REDIS_PORT: Flags.string(), @@ -196,6 +198,8 @@ export abstract class BaseCommand extends Command { if (flags.REDIS_CA) process.env.REDIS_CA = flags.REDIS_CA if (flags.WORKER_CONCURRENCY) process.env.WORKER_CONCURRENCY = flags.WORKER_CONCURRENCY if (flags.QUEUE_NAME) process.env.QUEUE_NAME = flags.QUEUE_NAME - if (flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN = flags.QUEUE_REDIS_EVENT_STREAM + if (flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN = flags.QUEUE_REDIS_EVENT_STREAM_MAX_LEN + if (flags.REMOVE_ON_AGE) process.env.REMOVE_ON_AGE = flags.REMOVE_ON_AGE + if (flags.REMOVE_ON_COUNT) process.env.REMOVE_ON_COUNT = flags.REMOVE_ON_COUNT } } diff --git a/packages/server/src/queue/BaseQueue.ts b/packages/server/src/queue/BaseQueue.ts index 0c3003ea6..d3bf18d29 100644 --- a/packages/server/src/queue/BaseQueue.ts +++ b/packages/server/src/queue/BaseQueue.ts @@ -1,4 +1,4 @@ -import { Queue, Worker, Job, QueueEvents, RedisOptions } from 'bullmq' +import { Queue, Worker, Job, QueueEvents, RedisOptions, KeepJobs } from 'bullmq' import { v4 as uuidv4 } from 'uuid' import logger from '../utils/logger' @@ -6,6 +6,8 @@ const QUEUE_REDIS_EVENT_STREAM_MAX_LEN = process.env.QUEUE_REDIS_EVENT_STREAM_MA ? parseInt(process.env.QUEUE_REDIS_EVENT_STREAM_MAX_LEN) : 10000 const WORKER_CONCURRENCY = process.env.WORKER_CONCURRENCY ? parseInt(process.env.WORKER_CONCURRENCY) : 100000 +const REMOVE_ON_AGE = process.env.REMOVE_ON_AGE ? parseInt(process.env.REMOVE_ON_AGE) : -1 +const REMOVE_ON_COUNT = process.env.REMOVE_ON_COUNT ? parseInt(process.env.REMOVE_ON_COUNT) : -1 export abstract class BaseQueue { protected queue: Queue @@ -34,7 +36,24 @@ export abstract class BaseQueue { public async addJob(jobData: any): Promise { const jobId = jobData.id || uuidv4() - return await this.queue.add(jobId, jobData, { removeOnFail: true }) + + let removeOnFail: number | boolean | KeepJobs | undefined = true + let removeOnComplete: number | boolean | KeepJobs | undefined = undefined + + // Only override removal options if age or count is specified + if (REMOVE_ON_AGE !== -1 || REMOVE_ON_COUNT !== -1) { + const keepJobObj: KeepJobs = {} + if (REMOVE_ON_AGE !== -1) { + keepJobObj.age = REMOVE_ON_AGE + } + if (REMOVE_ON_COUNT !== -1) { + keepJobObj.count = REMOVE_ON_COUNT + } + removeOnFail = keepJobObj + removeOnComplete = keepJobObj + } + + return await this.queue.add(jobId, jobData, { removeOnFail, removeOnComplete }) } public createWorker(concurrency: number = WORKER_CONCURRENCY): Worker {