Feat: Support Google Cloud Storage (#4061)

* support Google Cloud Storage

* update example env and docs for Google Cloud Storage support

* restore the indentation of pnpm-lock.yaml

* propagate logs to Google Cloud Logging

* normalize GCS storage paths

---------

Co-authored-by: Ilango <rajagopalilango@gmail.com>
Co-authored-by: Henry <hzj94@hotmail.com>
allen 2025-04-14 07:52:54 -07:00 committed by GitHub
parent d53b1b657f
commit c318fc57e9
11 changed files with 529 additions and 160 deletions


@ -121,7 +121,7 @@ Flowise has 3 different modules in a single mono repository.
Flowise supports different environment variables to configure your instance. You can specify the following variables in the `.env` file inside the `packages/server` folder. Read [more](https://docs.flowiseai.com/environment-variables)
| Variable | Description | Type | Default |
| ---------------------------- | -------------------------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- |
|------------------------------------|----------------------------------------------------------------------------------|--------------------------------------------------|-------------------------------------|
| PORT | The HTTP port Flowise runs on | Number | 3000 |
| CORS_ORIGINS | The allowed origins for all cross-origin HTTP calls | String | |
| IFRAME_ORIGINS | The allowed origins for iframe src embedding | String | |
@ -149,7 +149,7 @@ Flowise support different environment variables to configure your instance. You
| FLOWISE_SECRETKEY_OVERWRITE | Encryption key to be used instead of the key stored in SECRETKEY_PATH | String | |
| DISABLE_FLOWISE_TELEMETRY | Turn off telemetry | Boolean | |
| MODEL_LIST_CONFIG_JSON | File path to load list of models from your local config file | String | `/your_model_list_config_file_path` |
| STORAGE_TYPE | Type of storage for uploaded files. default is `local` | Enum String: `s3`, `local` | `local` |
| STORAGE_TYPE | Type of storage for uploaded files. default is `local` | Enum String: `s3`, `local`, `gcs` | `local` |
| BLOB_STORAGE_PATH | Local folder path where uploaded files are stored when `STORAGE_TYPE` is `local` | String | `your-home-dir/.flowise/storage` |
| S3_STORAGE_BUCKET_NAME | Bucket name to hold the uploaded files when `STORAGE_TYPE` is `s3` | String | |
| S3_STORAGE_ACCESS_KEY_ID | AWS Access Key | String | |
@ -157,6 +157,10 @@ Flowise support different environment variables to configure your instance. You
| S3_STORAGE_REGION | Region for S3 bucket | String | |
| S3_ENDPOINT_URL | Custom Endpoint for S3 | String | |
| S3_FORCE_PATH_STYLE | Set this to true to force the request to use path-style addressing | Boolean | false |
| GOOGLE_CLOUD_STORAGE_PROJ_ID | The GCP project id for cloud storage & logging when `STORAGE_TYPE` is `gcs` | String | |
| GOOGLE_CLOUD_STORAGE_CREDENTIAL | The credential key file path when `STORAGE_TYPE` is `gcs` | String | |
| GOOGLE_CLOUD_STORAGE_BUCKET_NAME | Bucket name to hold the uploaded files when `STORAGE_TYPE` is `gcs` | String | |
| GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS | Enable uniform bucket level access when `STORAGE_TYPE` is `gcs` | Boolean | true |
| SHOW_COMMUNITY_NODES | Show nodes created by community | Boolean | |
| DISABLED_NODES | Hide nodes from UI (comma separated list of node names) | String | |
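
The four new GCS variables above are all that GCS storage needs. As a quick sanity check of those values before starting Flowise, a small standalone script along these lines could be used (a sketch only; this script is not part of the commit):

```ts
import { Storage } from '@google-cloud/storage'

// Reads the same variables documented in the table above; values come from your .env.
const storage = new Storage({
    projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
    keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL
})

async function checkGcsConfig(): Promise<void> {
    const bucketName = process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME
    if (!bucketName) throw new Error('GOOGLE_CLOUD_STORAGE_BUCKET_NAME is not set')
    const [exists] = await storage.bucket(bucketName).exists()
    console.log(`Bucket ${bucketName} reachable: ${exists}`)
}

checkGcsConfig().catch((err) => {
    console.error(err)
    process.exit(1)
})
```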


@ -48,7 +48,7 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
# see https://raw.githubusercontent.com/FlowiseAI/Flowise/main/packages/components/models.json for the format
# MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path
# STORAGE_TYPE=local (local | s3)
# STORAGE_TYPE=local (local | s3 | gcs)
# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage
# S3_STORAGE_BUCKET_NAME=flowise
# S3_STORAGE_ACCESS_KEY_ID=<your-access-key>
@ -56,6 +56,10 @@ BLOB_STORAGE_PATH=/root/.flowise/storage
# S3_STORAGE_REGION=us-west-2
# S3_ENDPOINT_URL=<custom-s3-endpoint-url>
# S3_FORCE_PATH_STYLE=false
# GOOGLE_CLOUD_STORAGE_CREDENTIAL=/the/keyfilename/path
# GOOGLE_CLOUD_STORAGE_PROJ_ID=<your-gcp-project-id>
# GOOGLE_CLOUD_STORAGE_BUCKET_NAME=<the-bucket-name>
# GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS=true
# SHOW_COMMUNITY_NODES=true
# DISABLED_NODES=bufferMemory,chatOpenAI (comma separated list of node names to disable)


@ -35,6 +35,7 @@
"@gomomento/sdk": "^1.51.1",
"@gomomento/sdk-core": "^1.51.1",
"@google-ai/generativelanguage": "^2.5.0",
"@google-cloud/storage": "^7.15.2",
"@google/generative-ai": "^0.15.0",
"@huggingface/inference": "^2.6.1",
"@langchain/anthropic": "0.3.14",


@ -8,6 +8,7 @@ import {
S3Client,
S3ClientConfig
} from '@aws-sdk/client-s3'
import { Storage } from '@google-cloud/storage'
import { Readable } from 'node:stream'
import { getUserHome } from './utils'
import sanitize from 'sanitize-filename'
@ -34,6 +35,25 @@ export const addBase64FilesToStorage = async (fileBase64: string, chatflowid: st
})
await s3Client.send(putObjCmd)
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const splitDataURI = fileBase64.split(',')
const filename = splitDataURI.pop()?.split(':')[1] ?? ''
const bf = Buffer.from(splitDataURI.pop() || '', 'base64')
const mime = splitDataURI[0].split(':')[1].split(';')[0]
const sanitizedFilename = _sanitizeFilename(filename)
const normalizedChatflowid = chatflowid.replace(/\\/g, '/')
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
const filePath = `${normalizedChatflowid}/${normalizedFilename}`
const file = bucket.file(filePath)
await new Promise<void>((resolve, reject) => {
file.createWriteStream({ contentType: mime, metadata: { contentEncoding: 'base64' } })
.on('error', (err) => reject(err))
.on('finish', () => resolve())
.end(bf)
})
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else {
@ -76,6 +96,20 @@ export const addArrayFilesToStorage = async (mime: string, bf: Buffer, fileName:
await s3Client.send(putObjCmd)
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const normalizedPaths = paths.map((p) => p.replace(/\\/g, '/'))
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
const filePath = [...normalizedPaths, normalizedFilename].join('/')
const file = bucket.file(filePath)
await new Promise<void>((resolve, reject) => {
file.createWriteStream()
.on('error', (err) => reject(err))
.on('finish', () => resolve())
.end(bf)
})
fileNames.push(sanitizedFilename)
return 'FILE-STORAGE::' + JSON.stringify(fileNames)
} else {
const dir = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
if (!fs.existsSync(dir)) {
@ -109,6 +143,19 @@ export const addSingleFileToStorage = async (mime: string, bf: Buffer, fileName:
})
await s3Client.send(putObjCmd)
return 'FILE-STORAGE::' + sanitizedFilename
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const normalizedPaths = paths.map((p) => p.replace(/\\/g, '/'))
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
const filePath = [...normalizedPaths, normalizedFilename].join('/')
const file = bucket.file(filePath)
await new Promise<void>((resolve, reject) => {
file.createWriteStream({ contentType: mime, metadata: { contentEncoding: 'base64' } })
.on('error', (err) => reject(err))
.on('finish', () => resolve())
.end(bf)
})
return 'FILE-STORAGE::' + sanitizedFilename
} else {
const dir = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
if (!fs.existsSync(dir)) {
@ -146,6 +193,11 @@ export const getFileFromUpload = async (filePath: string): Promise<Buffer> => {
// @ts-ignore
const buffer = Buffer.concat(response.Body.toArray())
return buffer
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const file = bucket.file(filePath)
const [buffer] = await file.download()
return buffer
} else {
return fs.readFileSync(filePath)
}
@ -179,6 +231,14 @@ export const getFileFromStorage = async (file: string, ...paths: string[]): Prom
// @ts-ignore
const buffer = Buffer.concat(response.Body.toArray())
return buffer
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const normalizedPaths = paths.map((p) => p.replace(/\\/g, '/'))
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
const filePath = [...normalizedPaths, normalizedFilename].join('/')
const file = bucket.file(filePath)
const [buffer] = await file.download()
return buffer
} else {
const fileInStorage = path.join(getStoragePath(), ...paths.map(_sanitizeFilename), sanitizedFilename)
return fs.readFileSync(fileInStorage)
@ -208,6 +268,10 @@ export const removeFilesFromStorage = async (...paths: string[]) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const normalizedPath = paths.map((p) => p.replace(/\\/g, '/')).join('/')
await bucket.deleteFiles({ prefix: `${normalizedPath}/` })
} else {
const directory = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
_deleteLocalFolderRecursive(directory)
@ -223,6 +287,9 @@ export const removeSpecificFileFromUpload = async (filePath: string) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
await bucket.file(filePath).delete()
} else {
fs.unlinkSync(filePath)
}
@ -237,6 +304,15 @@ export const removeSpecificFileFromStorage = async (...paths: string[]) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const fileName = paths.pop()
if (fileName) {
const sanitizedFilename = _sanitizeFilename(fileName)
paths.push(sanitizedFilename)
}
const normalizedPath = paths.map((p) => p.replace(/\\/g, '/')).join('/')
await bucket.file(normalizedPath).delete()
} else {
const fileName = paths.pop()
if (fileName) {
@ -257,6 +333,10 @@ export const removeFolderFromStorage = async (...paths: string[]) => {
Key = Key.substring(1)
}
await _deleteS3Folder(Key)
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const normalizedPath = paths.map((p) => p.replace(/\\/g, '/')).join('/')
await bucket.deleteFiles({ prefix: `${normalizedPath}/` })
} else {
const directory = path.join(getStoragePath(), ...paths.map(_sanitizeFilename))
_deleteLocalFolderRecursive(directory, true)
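
GCS has no real directories, so the prefix-based branches above delete a "folder" by removing every object under a prefix; the trailing slash keeps `chatflow1` from also matching `chatflow10`. A self-contained sketch of the same idea, constructing the bucket handle the way `getGcsClient` does below:

```ts
import { Storage } from '@google-cloud/storage'

const storage = new Storage({ keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL })
const bucket = storage.bucket(process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME as string)

// Deletes every object whose name starts with `${prefix}/`, mimicking folder removal.
async function removeGcsFolder(...segments: string[]): Promise<void> {
    const prefix = segments.map((s) => s.replace(/\\/g, '/')).join('/')
    await bucket.deleteFiles({ prefix: `${prefix}/` })
}
```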
@ -355,6 +435,14 @@ export const streamStorageFile = async (
const blob = await body.transformToByteArray()
return Buffer.from(blob)
}
} else if (storageType === 'gcs') {
const { bucket } = getGcsClient()
const normalizedChatflowId = chatflowId.replace(/\\/g, '/')
const normalizedChatId = chatId.replace(/\\/g, '/')
const normalizedFilename = sanitizedFilename.replace(/\\/g, '/')
const filePath = `${normalizedChatflowId}/${normalizedChatId}/${normalizedFilename}`
const [buffer] = await bucket.file(filePath).download()
return buffer
} else {
const filePath = path.join(getStoragePath(), chatflowId, chatId, sanitizedFilename)
//raise error if file path is not absolute
@ -372,6 +460,28 @@ export const streamStorageFile = async (
}
}
export const getGcsClient = () => {
const pathToGcsCredential = process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL
const projectId = process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID
const bucketName = process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME
if (!pathToGcsCredential) {
throw new Error('GOOGLE_CLOUD_STORAGE_CREDENTIAL env variable is required')
}
if (!bucketName) {
throw new Error('GOOGLE_CLOUD_STORAGE_BUCKET_NAME env variable is required')
}
const storageConfig = {
keyFilename: pathToGcsCredential,
...(projectId ? { projectId } : {})
}
const storage = new Storage(storageConfig)
const bucket = storage.bucket(bucketName)
return { storage, bucket }
}
export const getS3Config = () => {
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY

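Every GCS branch in this file reduces to one of two operations on the bucket returned by `getGcsClient()`: streaming a Buffer into an object, or downloading an object back into a Buffer. A condensed sketch of both patterns (helper names are illustrative, not from the commit):

```ts
import { Storage } from '@google-cloud/storage'

const storage = new Storage({
    projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
    keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL
})
const bucket = storage.bucket(process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME as string)

// Write: pipe a Buffer through createWriteStream, as the add*FilesToStorage branches do.
async function writeObject(filePath: string, data: Buffer, contentType: string): Promise<void> {
    await new Promise<void>((resolve, reject) => {
        bucket.file(filePath)
            .createWriteStream({ contentType })
            .on('error', reject)
            .on('finish', () => resolve())
            .end(data)
    })
}

// Read: download() resolves to a one-element array holding the object's contents.
async function readObject(filePath: string): Promise<Buffer> {
    const [buffer] = await bucket.file(filePath).download()
    return buffer
}
```
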

@ -54,6 +54,10 @@ PORT=3000
# S3_STORAGE_REGION=us-west-2
# S3_ENDPOINT_URL=<custom-s3-endpoint-url>
# S3_FORCE_PATH_STYLE=false
# GOOGLE_CLOUD_STORAGE_CREDENTIAL=/the/keyfilename/path
# GOOGLE_CLOUD_STORAGE_PROJ_ID=<your-gcp-project-id>
# GOOGLE_CLOUD_STORAGE_BUCKET_NAME=<the-bucket-name>
# GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS=true
# SHOW_COMMUNITY_NODES=true
# DISABLED_NODES=bufferMemory,chatOpenAI (comma separated list of node names to disable)


@ -57,6 +57,7 @@
"license": "SEE LICENSE IN LICENSE.md",
"dependencies": {
"@aws-sdk/client-secrets-manager": "^3.699.0",
"@google-cloud/logging-winston": "^6.0.0",
"@oclif/core": "4.0.7",
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/auto-instrumentations-node": "^0.52.0",
@ -95,6 +96,7 @@
"moment": "^2.29.3",
"moment-timezone": "^0.5.34",
"multer": "^1.4.5-lts.1",
"multer-cloud-storage": "^4.0.0",
"multer-s3": "^3.0.1",
"mysql2": "^3.11.3",
"flowise-nim-container-manager": "^1.0.11",


@ -49,6 +49,10 @@ export abstract class BaseCommand extends Command {
S3_STORAGE_REGION: Flags.string(),
S3_ENDPOINT_URL: Flags.string(),
S3_FORCE_PATH_STYLE: Flags.string(),
GOOGLE_CLOUD_STORAGE_CREDENTIAL: Flags.string(),
GOOGLE_CLOUD_STORAGE_PROJ_ID: Flags.string(),
GOOGLE_CLOUD_STORAGE_BUCKET_NAME: Flags.string(),
GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS: Flags.string(),
SHOW_COMMUNITY_NODES: Flags.string(),
SECRETKEY_STORAGE_TYPE: Flags.string(),
SECRETKEY_PATH: Flags.string(),
@ -184,6 +188,11 @@ export abstract class BaseCommand extends Command {
if (flags.S3_STORAGE_REGION) process.env.S3_STORAGE_REGION = flags.S3_STORAGE_REGION
if (flags.S3_ENDPOINT_URL) process.env.S3_ENDPOINT_URL = flags.S3_ENDPOINT_URL
if (flags.S3_FORCE_PATH_STYLE) process.env.S3_FORCE_PATH_STYLE = flags.S3_FORCE_PATH_STYLE
if (flags.GOOGLE_CLOUD_STORAGE_CREDENTIAL) process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL = flags.GOOGLE_CLOUD_STORAGE_CREDENTIAL
if (flags.GOOGLE_CLOUD_STORAGE_PROJ_ID) process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID = flags.GOOGLE_CLOUD_STORAGE_PROJ_ID
if (flags.GOOGLE_CLOUD_STORAGE_BUCKET_NAME) process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME = flags.GOOGLE_CLOUD_STORAGE_BUCKET_NAME
if (flags.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS)
process.env.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS = flags.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS
// Queue
if (flags.MODE) process.env.MODE = flags.MODE


@ -43,6 +43,7 @@ import { randomBytes } from 'crypto'
import { AES, enc } from 'crypto-js'
import multer from 'multer'
import multerS3 from 'multer-s3'
import MulterGoogleCloudStorage from 'multer-cloud-storage'
import { ChatFlow } from '../database/entities/ChatFlow'
import { ChatMessage } from '../database/entities/ChatMessage'
import { Credential } from '../database/entities/Credential'
@ -1799,6 +1800,16 @@ export const getMulterStorage = () => {
})
})
return upload
} else if (storageType === 'gcs') {
return multer({
storage: new MulterGoogleCloudStorage({
projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
bucket: process.env.GOOGLE_CLOUD_STORAGE_BUCKET_NAME,
keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
uniformBucketLevelAccess: process.env.GOOGLE_CLOUD_UNIFORM_BUCKET_ACCESS !== 'false', // env vars are strings, so Boolean('false') would be true; default to true unless explicitly disabled
destination: `uploads/${getOrgId()}`
})
})
} else {
return multer({ dest: getUploadPath() })
}
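
`getMulterStorage()` now returns a multer instance backed by GCS when `STORAGE_TYPE` is `gcs`; wiring it into an Express route looks the same regardless of backend. A usage sketch (route path, field name, and import path are illustrative):

```ts
import express from 'express'
import { getMulterStorage } from './utils' // helper patched above; import path assumed

const app = express()

// With the GCS engine, files land under uploads/<orgId>/ in the configured bucket.
app.post('/api/v1/attachments', getMulterStorage().array('files'), (req, res) => {
    const files = (req.files as Express.Multer.File[]) ?? []
    res.json({ uploaded: files.length })
})
```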


@ -5,6 +5,7 @@ import config from './config' // should be replaced by node-config or similar
import { createLogger, transports, format } from 'winston'
import { NextFunction, Request, Response } from 'express'
import { S3ClientConfig } from '@aws-sdk/client-s3'
import { LoggingWinston } from '@google-cloud/logging-winston'
const { S3StreamLogger } = require('s3-streamlogger')
@ -13,6 +14,11 @@ const { combine, timestamp, printf, errors } = format
let s3ServerStream: any
let s3ErrorStream: any
let s3ServerReqStream: any
let gcsServerStream: any
let gcsErrorStream: any
let gcsServerReqStream: any
if (process.env.STORAGE_TYPE === 's3') {
const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID
const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY
@ -60,6 +66,29 @@ if (process.env.STORAGE_TYPE === 's3') {
})
}
if (process.env.STORAGE_TYPE === 'gcs') {
const config = {
projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
defaultCallback: (err: any) => {
if (err) {
console.error('Error logging to GCS: ' + err)
}
}
}
gcsServerStream = new LoggingWinston({
...config,
logName: 'server'
})
gcsErrorStream = new LoggingWinston({
...config,
logName: 'error'
})
gcsServerReqStream = new LoggingWinston({
...config,
logName: 'requests'
})
}
// expect the log dir to be relative to the project's root
const logDir = config.logging.dir
@ -101,7 +130,8 @@ const logger = createLogger({
stream: s3ServerStream
})
]
: [])
: []),
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsServerStream] : [])
],
exceptionHandlers: [
...(!process.env.STORAGE_TYPE || process.env.STORAGE_TYPE === 'local'
@ -117,7 +147,8 @@ const logger = createLogger({
stream: s3ErrorStream
})
]
: [])
: []),
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsErrorStream] : [])
],
rejectionHandlers: [
...(!process.env.STORAGE_TYPE || process.env.STORAGE_TYPE === 'local'
@ -133,7 +164,8 @@ const logger = createLogger({
stream: s3ErrorStream
})
]
: [])
: []),
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsErrorStream] : [])
]
})
@ -168,7 +200,8 @@ export function expressRequestLogger(req: Request, res: Response, next: NextFunc
stream: s3ServerReqStream
})
]
: [])
: []),
...(process.env.STORAGE_TYPE === 'gcs' ? [gcsServerReqStream] : [])
]
})
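
The `gcs*Stream` objects above are ordinary winston transports, so any logger can forward to Cloud Logging the same way. A minimal sketch (log name and format choices are illustrative):

```ts
import { createLogger, transports, format } from 'winston'
import { LoggingWinston } from '@google-cloud/logging-winston'

// Uses the same credential/project env vars as the GCS storage backend.
const cloudLogging = new LoggingWinston({
    projectId: process.env.GOOGLE_CLOUD_STORAGE_PROJ_ID,
    keyFilename: process.env.GOOGLE_CLOUD_STORAGE_CREDENTIAL,
    logName: 'server'
})

const logger = createLogger({
    format: format.combine(format.timestamp(), format.json()),
    transports: [new transports.Console(), cloudLogging]
})

logger.info('Flowise server started with STORAGE_TYPE=gcs')
```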

File diff suppressed because one or more lines are too long