From 7006d64de0c5b2132a65dd7311046d884a0d5b99 Mon Sep 17 00:00:00 2001 From: Vinod Kiran Date: Tue, 23 Apr 2024 16:05:38 +0530 Subject: [PATCH] Feature/s3 storage (#2226) * centralizing file writing.... * allowing s3 as storage option * allowing s3 as storage option * update s3 storage --------- Co-authored-by: Henry --- CONTRIBUTING-ZH.md | 53 ++-- CONTRIBUTING.md | 61 ++-- docker/.env.example | 21 +- .../nodes/agents/CSVAgent/CSVAgent.ts | 7 +- .../ConversationalAgent.ts | 2 +- .../agents/ReActAgentChat/ReActAgentChat.ts | 2 +- .../nodes/agents/ToolAgent/ToolAgent.ts | 10 +- .../nodes/chains/ApiChain/OpenAPIChain.ts | 7 +- .../ConversationChain/ConversationChain.ts | 6 +- .../nodes/chains/LLMChain/LLMChain.ts | 2 +- .../nodes/documentloaders/Csv/Csv.ts | 7 +- .../nodes/documentloaders/Docx/Docx.ts | 7 +- .../nodes/documentloaders/Json/Json.ts | 7 +- .../documentloaders/Jsonlines/Jsonlines.ts | 7 +- .../nodes/documentloaders/Pdf/Pdf.ts | 7 +- .../nodes/documentloaders/Text/Text.ts | 7 +- .../tools/OpenAPIToolkit/OpenAPIToolkit.ts | 9 +- .../nodes/vectorstores/Vectara/Vectara.ts | 7 +- .../vectorstores/Vectara/Vectara_Upload.ts | 7 +- packages/components/src/index.ts | 1 + packages/components/src/multiModalUtils.ts | 12 +- packages/components/src/speechToText.ts | 10 +- packages/components/src/storageUtils.ts | 275 ++++++++++++++++++ packages/components/src/utils.ts | 7 - packages/server/.env.example | 26 +- packages/server/src/commands/start.ts | 20 +- .../src/controllers/get-upload-file/index.ts | 28 +- .../src/routes/get-upload-file/index.ts | 2 +- .../src/services/chat-messages/index.ts | 9 +- .../server/src/services/chatflows/index.ts | 19 +- packages/server/src/utils/buildChatflow.ts | 14 +- packages/server/src/utils/fileRepository.ts | 26 +- packages/server/src/utils/index.ts | 28 -- .../ui/src/views/chatmessage/ChatMessage.jsx | 2 +- 34 files changed, 458 insertions(+), 257 deletions(-) create mode 100644 packages/components/src/storageUtils.ts diff --git 
a/CONTRIBUTING-ZH.md b/CONTRIBUTING-ZH.md index 92b897185..742eb2f79 100644 --- a/CONTRIBUTING-ZH.md +++ b/CONTRIBUTING-ZH.md @@ -118,30 +118,35 @@ Flowise 在一个单一的单体存储库中有 3 个不同的模块。 Flowise 支持不同的环境变量来配置您的实例。您可以在 `packages/server` 文件夹中的 `.env` 文件中指定以下变量。阅读[更多信息](https://docs.flowiseai.com/environment-variables) -| 变量名 | 描述 | 类型 | 默认值 | -| --------------------------- | ------------------------------------------------------ | ----------------------------------------------- | ----------------------------------- | -| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 | -| FLOWISE_USERNAME | 登录用户名 | 字符串 | | -| FLOWISE_PASSWORD | 登录密码 | 字符串 | | -| FLOWISE_FILE_SIZE_LIMIT | 上传文件大小限制 | 字符串 | 50mb | -| DEBUG | 打印组件的日志 | 布尔值 | | -| BLOB_STORAGE_PATH | 存储位置 | 字符串 | `your-home-dir/.flowise/storage` | -| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` | -| LOG_LEVEL | 日志的不同级别 | 枚举字符串: `error`, `info`, `verbose`, `debug` | `info` | -| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` | -| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | | -| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | | -| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串: `sqlite`, `mysql`, `postgres` | `sqlite` | -| DATABASE_PATH | 数据库保存的位置(当 DATABASE_TYPE 是 sqlite 时) | 字符串 | `your-home-dir/.flowise` | -| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | -| SECRETKEY_PATH | 保存加密密钥(用于加密/解密凭据)的位置 | 字符串 | `your-path/Flowise/packages/server` | -| FLOWISE_SECRETKEY_OVERWRITE | 加密密钥用于替代存储在 SECRETKEY_PATH 中的密钥 | 字符串 | -| DISABLE_FLOWISE_TELEMETRY | 关闭遥测 | 字符串 | -| MODEL_LIST_CONFIG_JSON | 加载模型的位置 | 字符 | `/your_model_list_config_file_path` | +| 变量名 | 描述 | 类型 | 默认值 | +| ---------------------------- | 
------------------------------------------------------- | ----------------------------------------------- | ----------------------------------- | +| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 | +| FLOWISE_USERNAME | 登录用户名 | 字符串 | | +| FLOWISE_PASSWORD | 登录密码 | 字符串 | | +| FLOWISE_FILE_SIZE_LIMIT | 上传文件大小限制 | 字符串 | 50mb | +| DEBUG | 打印组件的日志 | 布尔值 | | +| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` | +| LOG_LEVEL | 日志的不同级别 | 枚举字符串: `error`, `info`, `verbose`, `debug` | `info` | +| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` | +| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | | +| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | | +| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串: `sqlite`, `mysql`, `postgres` | `sqlite` | +| DATABASE_PATH | 数据库保存的位置(当 DATABASE_TYPE 是 sqlite 时) | 字符串 | `your-home-dir/.flowise` | +| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_USER | 数据库用户名(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | | +| SECRETKEY_PATH | 保存加密密钥(用于加密/解密凭据)的位置 | 字符串 | `your-path/Flowise/packages/server` | +| FLOWISE_SECRETKEY_OVERWRITE | 加密密钥用于替代存储在 SECRETKEY_PATH 中的密钥 | 字符串 | | +| DISABLE_FLOWISE_TELEMETRY | 关闭遥测 | 字符串 | | +| MODEL_LIST_CONFIG_JSON | 加载模型的位置 | 字符串 | `/your_model_list_config_file_path` | +| STORAGE_TYPE | 上传文件的存储类型 | 枚举字符串: `local`, `s3` | `local` | +| BLOB_STORAGE_PATH | 上传文件存储的本地文件夹路径, 当`STORAGE_TYPE`是`local` | 字符串 | `your-home-dir/.flowise/storage` | +| S3_STORAGE_BUCKET_NAME | S3 存储桶名称, 当`STORAGE_TYPE`是`s3` | 字符串 | | +| S3_STORAGE_ACCESS_KEY_ID | AWS 访问密钥 (Access Key) | 字符串 | | +| S3_STORAGE_SECRET_ACCESS_KEY | AWS 密钥 (Secret Key) | 字符串 | | +| S3_STORAGE_REGION | S3 存储地区 | 字符串 | | 您也可以在使用 `npx` 时指定环境变量。例如: diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index a17d89880..55b93e306 100644 ---
a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -120,34 +120,39 @@ Flowise has 3 different modules in a single mono repository. Flowise support different environment variables to configure your instance. You can specify the following variables in the `.env` file inside `packages/server` folder. Read [more](https://docs.flowiseai.com/environment-variables) -| Variable | Description | Type | Default | -| --------------------------- | ---------------------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- | -| PORT | The HTTP port Flowise runs on | Number | 3000 | -| CORS_ORIGINS | The allowed origins for all cross-origin HTTP calls | String | | -| IFRAME_ORIGINS | The allowed origins for iframe src embedding | String | | -| FLOWISE_USERNAME | Username to login | String | | -| FLOWISE_PASSWORD | Password to login | String | | -| FLOWISE_FILE_SIZE_LIMIT | Upload File Size Limit | String | 50mb | -| DEBUG | Print logs from components | Boolean | | -| BLOB_STORAGE_PATH | Location where uploaded files are stored | String | `your-home-dir/.flowise/storage` | -| LOG_PATH | Location where log files are stored | String | `your-path/Flowise/logs` | -| LOG_LEVEL | Different levels of logs | Enum String: `error`, `info`, `verbose`, `debug` | `info` | -| APIKEY_PATH | Location where api keys are saved | String | `your-path/Flowise/packages/server` | -| TOOL_FUNCTION_BUILTIN_DEP | NodeJS built-in modules to be used for Tool Function | String | | -| TOOL_FUNCTION_EXTERNAL_DEP | External modules to be used for Tool Function | String | | -| DATABASE_TYPE | Type of database to store the flowise data | Enum String: `sqlite`, `mysql`, `postgres` | `sqlite` | -| DATABASE_PATH | Location where database is saved (When DATABASE_TYPE is sqlite) | String | `your-home-dir/.flowise` | -| DATABASE_HOST | Host URL or IP address (When DATABASE_TYPE is not sqlite) | String | | -| DATABASE_PORT | Database port 
(When DATABASE_TYPE is not sqlite) | String | | -| DATABASE_USER | Database username (When DATABASE_TYPE is not sqlite) | String | | -| DATABASE_PASSWORD | Database password (When DATABASE_TYPE is not sqlite) | String | | -| DATABASE_NAME | Database name (When DATABASE_TYPE is not sqlite) | String | | -| DATABASE_SSL_KEY_BASE64 | Database SSL client cert in base64 (takes priority over DATABASE_SSL) | Boolean | false | -| DATABASE_SSL | Database connection overssl (When DATABASE_TYPE is postgre) | Boolean | false | -| SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` | -| FLOWISE_SECRETKEY_OVERWRITE | Encryption key to be used instead of the key stored in SECRETKEY_PATH | String | -| DISABLE_FLOWISE_TELEMETRY | Turn off telemetry | Boolean | -| MODEL_LIST_CONFIG_JSON | File path to load list of models from your local config file | String | `/your_model_list_config_file_path` | +| Variable | Description | Type | Default | +| ---------------------------- | -------------------------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- | +| PORT | The HTTP port Flowise runs on | Number | 3000 | +| CORS_ORIGINS | The allowed origins for all cross-origin HTTP calls | String | | +| IFRAME_ORIGINS | The allowed origins for iframe src embedding | String | | +| FLOWISE_USERNAME | Username to login | String | | +| FLOWISE_PASSWORD | Password to login | String | | +| FLOWISE_FILE_SIZE_LIMIT | Upload File Size Limit | String | 50mb | +| DEBUG | Print logs from components | Boolean | | +| LOG_PATH | Location where log files are stored | String | `your-path/Flowise/logs` | +| LOG_LEVEL | Different levels of logs | Enum String: `error`, `info`, `verbose`, `debug` | `info` | +| APIKEY_PATH | Location where api keys are saved | String | `your-path/Flowise/packages/server` | +| TOOL_FUNCTION_BUILTIN_DEP | 
NodeJS built-in modules to be used for Tool Function | String | | +| TOOL_FUNCTION_EXTERNAL_DEP | External modules to be used for Tool Function | String | | +| DATABASE_TYPE | Type of database to store the flowise data | Enum String: `sqlite`, `mysql`, `postgres` | `sqlite` | +| DATABASE_PATH | Location where database is saved (When DATABASE_TYPE is sqlite) | String | `your-home-dir/.flowise` | +| DATABASE_HOST | Host URL or IP address (When DATABASE_TYPE is not sqlite) | String | | +| DATABASE_PORT | Database port (When DATABASE_TYPE is not sqlite) | String | | +| DATABASE_USER | Database username (When DATABASE_TYPE is not sqlite) | String | | +| DATABASE_PASSWORD | Database password (When DATABASE_TYPE is not sqlite) | String | | +| DATABASE_NAME | Database name (When DATABASE_TYPE is not sqlite) | String | | +| DATABASE_SSL_KEY_BASE64 | Database SSL client cert in base64 (takes priority over DATABASE_SSL) | Boolean | false | +| DATABASE_SSL | Database connection over SSL (When DATABASE_TYPE is postgres) | Boolean | false | +| SECRETKEY_PATH | Location where encryption key (used to encrypt/decrypt credentials) is saved | String | `your-path/Flowise/packages/server` | +| FLOWISE_SECRETKEY_OVERWRITE | Encryption key to be used instead of the key stored in SECRETKEY_PATH | String | | +| DISABLE_FLOWISE_TELEMETRY | Turn off telemetry | Boolean | | +| MODEL_LIST_CONFIG_JSON | File path to load list of models from your local config file | String | `/your_model_list_config_file_path` | +| STORAGE_TYPE | Type of storage for uploaded files.
default is `local` | Enum String: `s3`, `local` | `local` | +| BLOB_STORAGE_PATH | Local folder path where uploaded files are stored when `STORAGE_TYPE` is `local` | String | `your-home-dir/.flowise/storage` | +| S3_STORAGE_BUCKET_NAME | Bucket name to hold the uploaded files when `STORAGE_TYPE` is `s3` | String | | +| S3_STORAGE_ACCESS_KEY_ID | AWS Access Key | String | | +| S3_STORAGE_SECRET_ACCESS_KEY | AWS Secret Key | String | | +| S3_STORAGE_REGION | Region for S3 bucket | String | | You can also specify the env variables when using `npx`. For example: diff --git a/docker/.env.example b/docker/.env.example index dfed336c5..d685140a9 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -5,17 +5,16 @@ SECRETKEY_PATH=/root/.flowise LOG_PATH=/root/.flowise/logs BLOB_STORAGE_PATH=/root/.flowise/storage +# NUMBER_OF_PROXIES= 1 # CORS_ORIGINS="*" # IFRAME_ORIGINS="*" -# NUMBER_OF_PROXIES= 1 - # DATABASE_TYPE=postgres -# DATABASE_PORT="" +# DATABASE_PORT=5432 # DATABASE_HOST="" -# DATABASE_NAME="flowise" -# DATABASE_USER="" -# DATABASE_PASSWORD="" +# DATABASE_NAME=flowise +# DATABASE_USER=root +# DATABASE_PASSWORD=mypassword # DATABASE_SSL=true # DATABASE_SSL_KEY_BASE64= @@ -23,6 +22,7 @@ BLOB_STORAGE_PATH=/root/.flowise/storage # FLOWISE_PASSWORD=1234 # FLOWISE_SECRETKEY_OVERWRITE=myencryptionkey # FLOWISE_FILE_SIZE_LIMIT=50mb + # DEBUG=true # LOG_LEVEL=debug (error | warn | info | verbose | debug) # TOOL_FUNCTION_BUILTIN_DEP=crypto,fs @@ -37,4 +37,11 @@ BLOB_STORAGE_PATH=/root/.flowise/storage # Uncomment the following line to enable model list config, load the list of models from your local config file # see https://raw.githubusercontent.com/FlowiseAI/Flowise/main/packages/components/models.json for the format -# MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path \ No newline at end of file +# MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path + +# STORAGE_TYPE=local (local | s3) +# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage +# 
S3_STORAGE_BUCKET_NAME=flowise +# S3_STORAGE_ACCESS_KEY_ID= +# S3_STORAGE_SECRET_ACCESS_KEY= +# S3_STORAGE_REGION=us-west-2 \ No newline at end of file diff --git a/packages/components/nodes/agents/CSVAgent/CSVAgent.ts b/packages/components/nodes/agents/CSVAgent/CSVAgent.ts index df3f39ab1..4478f9a46 100644 --- a/packages/components/nodes/agents/CSVAgent/CSVAgent.ts +++ b/packages/components/nodes/agents/CSVAgent/CSVAgent.ts @@ -7,9 +7,7 @@ import { getBaseClasses } from '../../../src/utils' import { LoadPyodide, finalSystemPrompt, systemPrompt } from './core' import { checkInputs, Moderation } from '../../moderation/Moderation' import { formatResponse } from '../../outputparsers/OutputParserHelpers' -import path from 'path' -import { getStoragePath } from '../../../src' -import fs from 'fs' +import { getFileFromStorage } from '../../../src' class CSV_Agents implements INode { label: string @@ -114,8 +112,7 @@ class CSV_Agents implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) base64String += fileData.toString('base64') } } else { diff --git a/packages/components/nodes/agents/ConversationalAgent/ConversationalAgent.ts b/packages/components/nodes/agents/ConversationalAgent/ConversationalAgent.ts index 8b4a09304..d8b3d7570 100644 --- a/packages/components/nodes/agents/ConversationalAgent/ConversationalAgent.ts +++ b/packages/components/nodes/agents/ConversationalAgent/ConversationalAgent.ts @@ -203,7 +203,7 @@ const prepareAgent = async ( if (llmSupportsVision(model)) { const visionChatModel = model as IVisionChatModal - const messageContent = addImagesToMessages(nodeData, options, model.multiModalOption) + const messageContent = await addImagesToMessages(nodeData, options, model.multiModalOption) if (messageContent?.length) { 
visionChatModel.setVisionModel() diff --git a/packages/components/nodes/agents/ReActAgentChat/ReActAgentChat.ts b/packages/components/nodes/agents/ReActAgentChat/ReActAgentChat.ts index be91b380a..ee5629260 100644 --- a/packages/components/nodes/agents/ReActAgentChat/ReActAgentChat.ts +++ b/packages/components/nodes/agents/ReActAgentChat/ReActAgentChat.ts @@ -98,7 +98,7 @@ class ReActAgentChat_Agents implements INode { if (llmSupportsVision(model)) { const visionChatModel = model as IVisionChatModal - const messageContent = addImagesToMessages(nodeData, options, model.multiModalOption) + const messageContent = await addImagesToMessages(nodeData, options, model.multiModalOption) if (messageContent?.length) { // Change model to vision supported diff --git a/packages/components/nodes/agents/ToolAgent/ToolAgent.ts b/packages/components/nodes/agents/ToolAgent/ToolAgent.ts index 1c5605a97..6eeeb745d 100644 --- a/packages/components/nodes/agents/ToolAgent/ToolAgent.ts +++ b/packages/components/nodes/agents/ToolAgent/ToolAgent.ts @@ -106,7 +106,7 @@ class ToolAgent_Agents implements INode { } } - const executor = prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) + const executor = await prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) const loggerHandler = new ConsoleCallbackHandler(options.logger) const callbacks = await additionalCallbacks(nodeData, options) @@ -178,7 +178,11 @@ class ToolAgent_Agents implements INode { } } -const prepareAgent = (nodeData: INodeData, options: ICommonObject, flowObj: { sessionId?: string; chatId?: string; input?: string }) => { +const prepareAgent = async ( + nodeData: INodeData, + options: ICommonObject, + flowObj: { sessionId?: string; chatId?: string; input?: string } +) => { const model = nodeData.inputs?.model as BaseChatModel const maxIterations = nodeData.inputs?.maxIterations as string const memory = nodeData.inputs?.memory as FlowiseMemory @@ 
-197,7 +201,7 @@ const prepareAgent = (nodeData: INodeData, options: ICommonObject, flowObj: { se if (llmSupportsVision(model)) { const visionChatModel = model as IVisionChatModal - const messageContent = addImagesToMessages(nodeData, options, model.multiModalOption) + const messageContent = await addImagesToMessages(nodeData, options, model.multiModalOption) if (messageContent?.length) { visionChatModel.setVisionModel() diff --git a/packages/components/nodes/chains/ApiChain/OpenAPIChain.ts b/packages/components/nodes/chains/ApiChain/OpenAPIChain.ts index 3d2113335..efe751f40 100644 --- a/packages/components/nodes/chains/ApiChain/OpenAPIChain.ts +++ b/packages/components/nodes/chains/ApiChain/OpenAPIChain.ts @@ -5,9 +5,7 @@ import { getBaseClasses } from '../../../src/utils' import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler' import { checkInputs, Moderation, streamResponse } from '../../moderation/Moderation' import { formatResponse } from '../../outputparsers/OutputParserHelpers' -import { getStoragePath } from '../../../src' -import fs from 'fs' -import path from 'path' +import { getFileFromStorage } from '../../../src' class OpenApiChain_Chains implements INode { label: string @@ -111,8 +109,7 @@ const initChain = async (nodeData: INodeData, options: ICommonObject) => { if (yamlFileBase64.startsWith('FILE-STORAGE::')) { const file = yamlFileBase64.replace('FILE-STORAGE::', '') const chatflowid = options.chatflowid - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) yamlString = fileData.toString() } else { const splitDataURI = yamlFileBase64.split(',') diff --git a/packages/components/nodes/chains/ConversationChain/ConversationChain.ts b/packages/components/nodes/chains/ConversationChain/ConversationChain.ts index 99aebdf6c..672ede7b1 100644 --- 
a/packages/components/nodes/chains/ConversationChain/ConversationChain.ts +++ b/packages/components/nodes/chains/ConversationChain/ConversationChain.ts @@ -111,7 +111,7 @@ class ConversationChain_Chains implements INode { async run(nodeData: INodeData, input: string, options: ICommonObject): Promise { const memory = nodeData.inputs?.memory - const chain = prepareChain(nodeData, options, this.sessionId) + const chain = await prepareChain(nodeData, options, this.sessionId) const moderations = nodeData.inputs?.inputModeration as Moderation[] if (moderations && moderations.length > 0) { @@ -216,14 +216,14 @@ const prepareChatPrompt = (nodeData: INodeData, humanImageMessages: MessageConte return chatPrompt } -const prepareChain = (nodeData: INodeData, options: ICommonObject, sessionId?: string) => { +const prepareChain = async (nodeData: INodeData, options: ICommonObject, sessionId?: string) => { let model = nodeData.inputs?.model as BaseChatModel const memory = nodeData.inputs?.memory as FlowiseMemory const memoryKey = memory.memoryKey ?? 
'chat_history' let messageContent: MessageContentImageUrl[] = [] if (llmSupportsVision(model)) { - messageContent = addImagesToMessages(nodeData, options, model.multiModalOption) + messageContent = await addImagesToMessages(nodeData, options, model.multiModalOption) const visionChatModel = model as IVisionChatModal if (messageContent?.length) { visionChatModel.setVisionModel() diff --git a/packages/components/nodes/chains/LLMChain/LLMChain.ts b/packages/components/nodes/chains/LLMChain/LLMChain.ts index fa0fd61c3..0bb1e1220 100644 --- a/packages/components/nodes/chains/LLMChain/LLMChain.ts +++ b/packages/components/nodes/chains/LLMChain/LLMChain.ts @@ -184,7 +184,7 @@ const runPrediction = async ( if (llmSupportsVision(chain.llm)) { const visionChatModel = chain.llm as IVisionChatModal - const messageContent = addImagesToMessages(nodeData, options, visionChatModel.multiModalOption) + const messageContent = await addImagesToMessages(nodeData, options, visionChatModel.multiModalOption) if (messageContent?.length) { // Change model to gpt-4-vision && max token to higher when using gpt-4-vision visionChatModel.setVisionModel() diff --git a/packages/components/nodes/documentloaders/Csv/Csv.ts b/packages/components/nodes/documentloaders/Csv/Csv.ts index 8a1f6d0a7..3fb9ccfb8 100644 --- a/packages/components/nodes/documentloaders/Csv/Csv.ts +++ b/packages/components/nodes/documentloaders/Csv/Csv.ts @@ -1,9 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { TextSplitter } from 'langchain/text_splitter' import { CSVLoader } from 'langchain/document_loaders/fs/csv' -import path from 'path' -import { getStoragePath } from '../../../src' -import fs from 'fs' +import { getFileFromStorage } from '../../../src' class Csv_DocumentLoaders implements INode { label: string @@ -75,8 +73,7 @@ class Csv_DocumentLoaders implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = 
path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) const loader = new CSVLoader(blob, columnName.trim().length === 0 ? undefined : columnName.trim()) diff --git a/packages/components/nodes/documentloaders/Docx/Docx.ts b/packages/components/nodes/documentloaders/Docx/Docx.ts index c8c75e95b..1efe3fbd3 100644 --- a/packages/components/nodes/documentloaders/Docx/Docx.ts +++ b/packages/components/nodes/documentloaders/Docx/Docx.ts @@ -1,9 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { TextSplitter } from 'langchain/text_splitter' import { DocxLoader } from 'langchain/document_loaders/fs/docx' -import path from 'path' -import { getStoragePath } from '../../../src' -import fs from 'fs' +import { getFileFromStorage } from '../../../src' class Docx_DocumentLoaders implements INode { label: string @@ -66,8 +64,7 @@ class Docx_DocumentLoaders implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) const loader = new DocxLoader(blob) diff --git a/packages/components/nodes/documentloaders/Json/Json.ts b/packages/components/nodes/documentloaders/Json/Json.ts index b204c5942..72f80750a 100644 --- a/packages/components/nodes/documentloaders/Json/Json.ts +++ b/packages/components/nodes/documentloaders/Json/Json.ts @@ -1,9 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { TextSplitter } from 'langchain/text_splitter' import { JSONLoader } from 'langchain/document_loaders/fs/json' -import { getStoragePath } from '../../../src' -import fs from 'fs' -import path from 'path' +import { 
getFileFromStorage } from '../../../src' class Json_DocumentLoaders implements INode { label: string @@ -82,8 +80,7 @@ class Json_DocumentLoaders implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) const loader = new JSONLoader(blob, pointers.length != 0 ? pointers : undefined) diff --git a/packages/components/nodes/documentloaders/Jsonlines/Jsonlines.ts b/packages/components/nodes/documentloaders/Jsonlines/Jsonlines.ts index 14517dbd8..6c854e056 100644 --- a/packages/components/nodes/documentloaders/Jsonlines/Jsonlines.ts +++ b/packages/components/nodes/documentloaders/Jsonlines/Jsonlines.ts @@ -1,9 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { TextSplitter } from 'langchain/text_splitter' import { JSONLinesLoader } from 'langchain/document_loaders/fs/json' -import { getStoragePath } from '../../../src' -import fs from 'fs' -import path from 'path' +import { getFileFromStorage } from '../../../src' class Jsonlines_DocumentLoaders implements INode { label: string @@ -76,8 +74,7 @@ class Jsonlines_DocumentLoaders implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) const loader = new JSONLinesLoader(blob, pointer) diff --git a/packages/components/nodes/documentloaders/Pdf/Pdf.ts b/packages/components/nodes/documentloaders/Pdf/Pdf.ts index d21587a91..278f1c61d 100644 --- a/packages/components/nodes/documentloaders/Pdf/Pdf.ts +++ b/packages/components/nodes/documentloaders/Pdf/Pdf.ts @@ -1,9 +1,7 @@ import { ICommonObject, 
INode, INodeData, INodeParams } from '../../../src/Interface' import { TextSplitter } from 'langchain/text_splitter' import { PDFLoader } from 'langchain/document_loaders/fs/pdf' -import { getStoragePath } from '../../../src' -import fs from 'fs' -import path from 'path' +import { getFileFromStorage } from '../../../src' class Pdf_DocumentLoaders implements INode { label: string @@ -92,8 +90,7 @@ class Pdf_DocumentLoaders implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const bf = Buffer.from(fileData) await this.extractDocs(usage, bf, legacyBuild, textSplitter, alldocs) } diff --git a/packages/components/nodes/documentloaders/Text/Text.ts b/packages/components/nodes/documentloaders/Text/Text.ts index fe63bcaff..fbfd27186 100644 --- a/packages/components/nodes/documentloaders/Text/Text.ts +++ b/packages/components/nodes/documentloaders/Text/Text.ts @@ -2,9 +2,7 @@ import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from import { TextSplitter } from 'langchain/text_splitter' import { TextLoader } from 'langchain/document_loaders/fs/text' import { Document } from '@langchain/core/documents' -import { getStoragePath, handleEscapeCharacters } from '../../../src' -import fs from 'fs' -import path from 'path' +import { getFileFromStorage, handleEscapeCharacters } from '../../../src' class Text_DocumentLoaders implements INode { label: string @@ -85,8 +83,7 @@ class Text_DocumentLoaders implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) const loader = new TextLoader(blob) diff --git 
a/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts b/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts index a1526c0b5..a958b09bb 100644 --- a/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts +++ b/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts @@ -3,10 +3,7 @@ import { BaseLanguageModel } from '@langchain/core/language_models/base' import { OpenApiToolkit } from 'langchain/agents' import { JsonSpec, JsonObject } from './core' import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' -import { getCredentialData, getCredentialParam } from '../../../src' -import { getStoragePath } from '../../../src' -import fs from 'fs' -import path from 'path' +import { getCredentialData, getCredentialParam, getFileFromStorage } from '../../../src' class OpenAPIToolkit_Tools implements INode { label: string @@ -63,9 +60,9 @@ class OpenAPIToolkit_Tools implements INode { if (yamlFileBase64.startsWith('FILE-STORAGE::')) { const file = yamlFileBase64.replace('FILE-STORAGE::', '') const chatflowid = options.chatflowid - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const utf8String = fileData.toString('utf-8') + data = load(utf8String) as JsonObject } else { const splitDataURI = yamlFileBase64.split(',') diff --git a/packages/components/nodes/vectorstores/Vectara/Vectara.ts b/packages/components/nodes/vectorstores/Vectara/Vectara.ts index 8baa6ed9a..7d0705075 100644 --- a/packages/components/nodes/vectorstores/Vectara/Vectara.ts +++ b/packages/components/nodes/vectorstores/Vectara/Vectara.ts @@ -11,9 +11,7 @@ import { Document } from '@langchain/core/documents' import { Embeddings } from '@langchain/core/embeddings' import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams, IndexingResult } from '../../../src/Interface' import { getBaseClasses, 
getCredentialData, getCredentialParam } from '../../../src/utils' -import { getStoragePath } from '../../../src' -import fs from 'fs' -import path from 'path' +import { getFileFromStorage } from '../../../src' class Vectara_VectorStores implements INode { label: string @@ -197,8 +195,7 @@ class Vectara_VectorStores implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) vectaraFiles.push({ blob: blob, fileName: getFileName(file) }) } diff --git a/packages/components/nodes/vectorstores/Vectara/Vectara_Upload.ts b/packages/components/nodes/vectorstores/Vectara/Vectara_Upload.ts index 378c89624..1006f4c50 100644 --- a/packages/components/nodes/vectorstores/Vectara/Vectara_Upload.ts +++ b/packages/components/nodes/vectorstores/Vectara/Vectara_Upload.ts @@ -1,9 +1,7 @@ import { VectaraStore, VectaraLibArgs, VectaraFilter, VectaraContextConfig, VectaraFile } from '@langchain/community/vectorstores/vectara' import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface' import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' -import path from 'path' -import { getStoragePath } from '../../../src' -import fs from 'fs' +import { getFileFromStorage } from '../../../src' class VectaraUpload_VectorStores implements INode { label: string @@ -144,8 +142,7 @@ class VectaraUpload_VectorStores implements INode { const chatflowid = options.chatflowid for (const file of files) { - const fileInStorage = path.join(getStoragePath(), chatflowid, file) - const fileData = fs.readFileSync(fileInStorage) + const fileData = await getFileFromStorage(file, chatflowid) const blob = new Blob([fileData]) vectaraFiles.push({ blob: blob, fileName: getFileName(file) }) } diff --git 
a/packages/components/src/index.ts b/packages/components/src/index.ts index 10cd10368..fb556163d 100644 --- a/packages/components/src/index.ts +++ b/packages/components/src/index.ts @@ -7,3 +7,4 @@ dotenv.config({ path: envPath, override: true }) export * from './Interface' export * from './utils' export * from './speechToText' +export * from './storageUtils' diff --git a/packages/components/src/multiModalUtils.ts b/packages/components/src/multiModalUtils.ts index b3a512102..fbb13ffb5 100644 --- a/packages/components/src/multiModalUtils.ts +++ b/packages/components/src/multiModalUtils.ts @@ -1,13 +1,11 @@ import { IVisionChatModal, ICommonObject, IFileUpload, IMultiModalOption, INodeData, MessageContentImageUrl } from './Interface' -import path from 'path' -import { getStoragePath } from './utils' -import fs from 'fs' +import { getFileFromStorage } from './storageUtils' -export const addImagesToMessages = ( +export const addImagesToMessages = async ( nodeData: INodeData, options: ICommonObject, multiModalOption?: IMultiModalOption -): MessageContentImageUrl[] => { +): Promise => { const imageContent: MessageContentImageUrl[] = [] let model = nodeData.inputs?.model @@ -18,10 +16,8 @@ export const addImagesToMessages = ( for (const upload of imageUploads) { let bf = upload.data if (upload.type == 'stored-file') { - const filePath = path.join(getStoragePath(), options.chatflowid, options.chatId, upload.name) - + const contents = await getFileFromStorage(upload.name, options.chatflowid, options.chatId) // as the image is stored in the server, read the file and convert it to base64 - const contents = fs.readFileSync(filePath) bf = 'data:' + upload.mime + ';base64,' + contents.toString('base64') imageContent.push({ diff --git a/packages/components/src/speechToText.ts b/packages/components/src/speechToText.ts index 20d72e40a..732b574c8 100644 --- a/packages/components/src/speechToText.ts +++ b/packages/components/src/speechToText.ts @@ -1,16 +1,14 @@ import { 
ICommonObject, IFileUpload } from './Interface' -import { getCredentialData, getStoragePath } from './utils' +import { getCredentialData } from './utils' import { type ClientOptions, OpenAIClient } from '@langchain/openai' -import fs from 'fs' -import path from 'path' import { AssemblyAI } from 'assemblyai' +import { getFileFromStorage } from './storageUtils' export const convertSpeechToText = async (upload: IFileUpload, speechToTextConfig: ICommonObject, options: ICommonObject) => { if (speechToTextConfig) { const credentialId = speechToTextConfig.credentialId as string const credentialData = await getCredentialData(credentialId ?? '', options) - const filePath = path.join(getStoragePath(), options.chatflowid, options.chatId, upload.name) - const audio_file = fs.createReadStream(filePath) + const audio_file = await getFileFromStorage(upload.name, options.chatflowid, options.chatId) if (speechToTextConfig.name === 'openAIWhisper') { const openAIClientOptions: ClientOptions = { @@ -18,7 +16,7 @@ export const convertSpeechToText = async (upload: IFileUpload, speechToTextConfi } const openAIClient = new OpenAIClient(openAIClientOptions) const transcription = await openAIClient.audio.transcriptions.create({ - file: audio_file, + file: new File([new Blob([audio_file])], upload.name), model: 'whisper-1', language: speechToTextConfig?.language, temperature: speechToTextConfig?.temperature ? 
parseFloat(speechToTextConfig.temperature) : undefined, diff --git a/packages/components/src/storageUtils.ts b/packages/components/src/storageUtils.ts new file mode 100644 index 000000000..6a33fbf8b --- /dev/null +++ b/packages/components/src/storageUtils.ts @@ -0,0 +1,275 @@ +import path from 'path' +import fs from 'fs' +import { DeleteObjectsCommand, GetObjectCommand, ListObjectsV2Command, PutObjectCommand, S3Client } from '@aws-sdk/client-s3' +import { Readable } from 'node:stream' +import { getUserHome } from './utils' + +export const addBase64FilesToStorage = async (file: string, chatflowid: string, fileNames: string[]) => { + const storageType = getStorageType() + if (storageType === 's3') { + const { s3Client, Bucket } = getS3Config() + + const splitDataURI = file.split(',') + const filename = splitDataURI.pop()?.split(':')[1] ?? '' + const bf = Buffer.from(splitDataURI.pop() || '', 'base64') + const mime = splitDataURI[0].split(':')[1].split(';')[0] + + const key = chatflowid + '/' + filename + const putObjCmd = new PutObjectCommand({ + Bucket, + Key: key, + ContentEncoding: 'base64', // required for binary data + ContentType: mime, + Body: bf + }) + await s3Client.send(putObjCmd) + + fileNames.push(filename) + return 'FILE-STORAGE::' + JSON.stringify(fileNames) + } else { + const dir = path.join(getStoragePath(), chatflowid) + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }) + } + + const splitDataURI = file.split(',') + const filename = splitDataURI.pop()?.split(':')[1] ?? 
'' + const bf = Buffer.from(splitDataURI.pop() || '', 'base64') + + const filePath = path.join(dir, filename) + fs.writeFileSync(filePath, bf) + fileNames.push(filename) + return 'FILE-STORAGE::' + JSON.stringify(fileNames) + } +} + +export const addFileToStorage = async (mime: string, bf: Buffer, fileName: string, ...paths: string[]) => { + const storageType = getStorageType() + if (storageType === 's3') { + const { s3Client, Bucket } = getS3Config() + + let Key = paths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + fileName + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + + const putObjCmd = new PutObjectCommand({ + Bucket, + Key, + ContentEncoding: 'base64', // required for binary data + ContentType: mime, + Body: bf + }) + await s3Client.send(putObjCmd) + } else { + const dir = path.join(getStoragePath(), ...paths) + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }) + } + + const filePath = path.join(dir, fileName) + fs.writeFileSync(filePath, bf) + } +} + +export const getFileFromStorage = async (file: string, ...paths: string[]): Promise => { + const storageType = getStorageType() + if (storageType === 's3') { + const { s3Client, Bucket } = getS3Config() + + let Key = paths.reduce((acc, cur) => acc + '/' + cur, '') + '/' + file + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + + const getParams = { + Bucket, + Key + } + + const response = await s3Client.send(new GetObjectCommand(getParams)) + const body = response.Body + if (body instanceof Readable) { + const streamToString = await body.transformToString('base64') + if (streamToString) { + return Buffer.from(streamToString, 'base64') + } + } + // @ts-ignore + const buffer = Buffer.concat(response.Body.toArray()) + return buffer + } else { + const fileInStorage = path.join(getStoragePath(), ...paths, file) + return fs.readFileSync(fileInStorage) + } +} + +/** + * Prepare storage path + */ +export const getStoragePath = (): string => { + return 
process.env.BLOB_STORAGE_PATH ? path.join(process.env.BLOB_STORAGE_PATH) : path.join(getUserHome(), '.flowise', 'storage') +} + +/** + * Get the storage type - local or s3 + */ +export const getStorageType = (): string => { + return process.env.STORAGE_TYPE ? process.env.STORAGE_TYPE : 'local' +} + +export const removeFilesFromStorage = async (...paths: string[]) => { + const storageType = getStorageType() + if (storageType === 's3') { + let Key = paths.reduce((acc, cur) => acc + '/' + cur, '') + // remove the first '/' if it exists + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + await _deleteS3Folder(Key) + } else { + const directory = path.join(getStoragePath(), ...paths) + _deleteLocalFolderRecursive(directory) + } +} + +export const removeFolderFromStorage = async (...paths: string[]) => { + const storageType = getStorageType() + if (storageType === 's3') { + let Key = paths.reduce((acc, cur) => acc + '/' + cur, '') + // remove the first '/' if it exists + if (Key.startsWith('/')) { + Key = Key.substring(1) + } + await _deleteS3Folder(Key) + } else { + const directory = path.join(getStoragePath(), ...paths) + _deleteLocalFolderRecursive(directory, true) + } +} + +const _deleteLocalFolderRecursive = (directory: string, deleteParentChatflowFolder?: boolean) => { + // Console error here as failing is not destructive operation + if (fs.existsSync(directory)) { + if (deleteParentChatflowFolder) { + fs.rmSync(directory, { recursive: true, force: true }) + } else { + fs.readdir(directory, (error, files) => { + if (error) console.error('Could not read directory') + + for (let i = 0; i < files.length; i++) { + const file = files[i] + const file_path = path.join(directory, file) + + fs.stat(file_path, (error, stat) => { + if (error) console.error('File do not exist') + + if (!stat.isDirectory()) { + fs.unlink(file_path, (error) => { + if (error) console.error('Could not delete file') + }) + if (i === files.length - 1) { + fs.rmSync(directory, { recursive: 
true, force: true }) + } + } else { + _deleteLocalFolderRecursive(file_path) + } + }) + } + }) + } + } +} + +const _deleteS3Folder = async (location: string) => { + let count = 0 // number of files deleted + const { s3Client, Bucket } = getS3Config() + async function recursiveS3Delete(token?: any) { + // get the files + const listCommand = new ListObjectsV2Command({ + Bucket: Bucket, + Prefix: location, + ContinuationToken: token + }) + let list = await s3Client.send(listCommand) + if (list.KeyCount) { + const deleteCommand = new DeleteObjectsCommand({ + Bucket: Bucket, + Delete: { + Objects: list.Contents?.map((item) => ({ Key: item.Key })), + Quiet: false + } + }) + let deleted = await s3Client.send(deleteCommand) + // @ts-ignore + count += deleted.Deleted.length + + if (deleted.Errors) { + deleted.Errors.map((error: any) => console.error(`${error.Key} could not be deleted - ${error.Code}`)) + } + } + // repeat if more files to delete + if (list.NextContinuationToken) { + await recursiveS3Delete(list.NextContinuationToken) + } + // return total deleted count when finished + return `${count} files deleted from S3` + } + // start the recursive function + return recursiveS3Delete() +} + +export const streamStorageFile = async ( + chatflowId: string, + chatId: string, + fileName: string +): Promise => { + const storageType = getStorageType() + if (storageType === 's3') { + const { s3Client, Bucket } = getS3Config() + + const Key = chatflowId + '/' + chatId + '/' + fileName + const getParams = { + Bucket, + Key + } + const response = await s3Client.send(new GetObjectCommand(getParams)) + const body = response.Body + if (body instanceof Readable) { + const blob = await body.transformToByteArray() + return Buffer.from(blob) + } + } else { + const filePath = path.join(getStoragePath(), chatflowId, chatId, fileName) + //raise error if file path is not absolute + if (!path.isAbsolute(filePath)) throw new Error(`Invalid file path`) + //raise error if file path contains '..' 
+ if (filePath.includes('..')) throw new Error(`Invalid file path`) + //only return from the storage folder + if (!filePath.startsWith(getStoragePath())) throw new Error(`Invalid file path`) + + if (fs.existsSync(filePath)) { + return fs.createReadStream(filePath) + } else { + throw new Error(`File ${fileName} not found`) + } + } +} + +const getS3Config = () => { + const accessKeyId = process.env.S3_STORAGE_ACCESS_KEY_ID + const secretAccessKey = process.env.S3_STORAGE_SECRET_ACCESS_KEY + const region = process.env.S3_STORAGE_REGION + const Bucket = process.env.S3_STORAGE_BUCKET_NAME + if (!accessKeyId || !secretAccessKey || !region || !Bucket) { + throw new Error('S3 storage configuration is missing') + } + const s3Client = new S3Client({ + credentials: { + accessKeyId, + secretAccessKey + }, + region + }) + return { s3Client, Bucket } +} diff --git a/packages/components/src/utils.ts b/packages/components/src/utils.ts index 4d477ff26..b99b10808 100644 --- a/packages/components/src/utils.ts +++ b/packages/components/src/utils.ts @@ -769,10 +769,3 @@ export const prepareSandboxVars = (variables: IVariable[]) => { } return vars } - -/** - * Prepare storage path - */ -export const getStoragePath = (): string => { - return process.env.BLOB_STORAGE_PATH ? 
path.join(process.env.BLOB_STORAGE_PATH) : path.join(getUserHome(), '.flowise', 'storage') -} diff --git a/packages/server/.env.example b/packages/server/.env.example index a99d38b47..3079209bf 100644 --- a/packages/server/.env.example +++ b/packages/server/.env.example @@ -1,20 +1,19 @@ PORT=3000 -# CORS_ORIGINS="*" -# IFRAME_ORIGINS="*" -# DATABASE_PATH=/your_database_path/.flowise + # APIKEY_PATH=/your_api_key_path/.flowise # SECRETKEY_PATH=/your_api_key_path/.flowise -# LOG_PATH=/your_log_path/.flowise/logs -# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage # NUMBER_OF_PROXIES= 1 +# CORS_ORIGINS=* +# IFRAME_ORIGINS=* +# DATABASE_PATH=/your_database_path/.flowise # DATABASE_TYPE=postgres -# DATABASE_PORT="" +# DATABASE_PORT=5432 # DATABASE_HOST="" -# DATABASE_NAME="flowise" -# DATABASE_USER="" -# DATABASE_PASSWORD="" +# DATABASE_NAME=flowise +# DATABASE_USER=root +# DATABASE_PASSWORD=mypassword # DATABASE_SSL=true # DATABASE_SSL_KEY_BASE64= @@ -22,7 +21,9 @@ PORT=3000 # FLOWISE_PASSWORD=1234 # FLOWISE_SECRETKEY_OVERWRITE=myencryptionkey # FLOWISE_FILE_SIZE_LIMIT=50mb + # DEBUG=true +# LOG_PATH=/your_log_path/.flowise/logs # LOG_LEVEL=debug (error | warn | info | verbose | debug) # TOOL_FUNCTION_BUILTIN_DEP=crypto,fs # TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash @@ -37,3 +38,10 @@ PORT=3000 # Uncomment the following line to enable model list config, load the list of models from your local config file # see https://raw.githubusercontent.com/FlowiseAI/Flowise/main/packages/components/models.json for the format # MODEL_LIST_CONFIG_JSON=/your_model_list_config_file_path + +# STORAGE_TYPE=local (local | s3) +# BLOB_STORAGE_PATH=/your_storage_path/.flowise/storage +# S3_STORAGE_BUCKET_NAME=flowise +# S3_STORAGE_ACCESS_KEY_ID= +# S3_STORAGE_SECRET_ACCESS_KEY= +# S3_STORAGE_REGION=us-west-2 \ No newline at end of file diff --git a/packages/server/src/commands/start.ts b/packages/server/src/commands/start.ts index d884e317d..6a58f9223 100644 --- 
a/packages/server/src/commands/start.ts +++ b/packages/server/src/commands/start.ts @@ -46,7 +46,12 @@ export default class Start extends Command { LANGCHAIN_API_KEY: Flags.string(), LANGCHAIN_PROJECT: Flags.string(), DISABLE_FLOWISE_TELEMETRY: Flags.string(), - MODEL_LIST_CONFIG_JSON: Flags.string() + MODEL_LIST_CONFIG_JSON: Flags.string(), + STORAGE_TYPE: Flags.string(), + S3_STORAGE_BUCKET_NAME: Flags.string(), + S3_STORAGE_ACCESS_KEY_ID: Flags.string(), + S3_STORAGE_SECRET_ACCESS_KEY: Flags.string(), + S3_STORAGE_REGION: Flags.string() } async stopProcess() { @@ -94,10 +99,7 @@ export default class Start extends Command { if (flags.FLOWISE_PASSWORD) process.env.FLOWISE_PASSWORD = flags.FLOWISE_PASSWORD if (flags.APIKEY_PATH) process.env.APIKEY_PATH = flags.APIKEY_PATH - // Storage - if (flags.BLOB_STORAGE_PATH) process.env.BLOB_STORAGE_PATH = flags.BLOB_STORAGE_PATH - - //API Configuration + // API Configuration if (flags.FLOWISE_FILE_SIZE_LIMIT) process.env.FLOWISE_FILE_SIZE_LIMIT = flags.FLOWISE_FILE_SIZE_LIMIT // Credentials @@ -138,6 +140,14 @@ export default class Start extends Command { // Model list config if (flags.MODEL_LIST_CONFIG_JSON) process.env.MODEL_LIST_CONFIG_JSON = flags.MODEL_LIST_CONFIG_JSON + // Storage + if (flags.STORAGE_TYPE) process.env.STORAGE_TYPE = flags.STORAGE_TYPE + if (flags.BLOB_STORAGE_PATH) process.env.BLOB_STORAGE_PATH = flags.BLOB_STORAGE_PATH + if (flags.S3_STORAGE_BUCKET_NAME) process.env.S3_STORAGE_BUCKET_NAME = flags.S3_STORAGE_BUCKET_NAME + if (flags.S3_STORAGE_ACCESS_KEY_ID) process.env.S3_STORAGE_ACCESS_KEY_ID = flags.S3_STORAGE_ACCESS_KEY_ID + if (flags.S3_STORAGE_SECRET_ACCESS_KEY) process.env.S3_STORAGE_SECRET_ACCESS_KEY = flags.S3_STORAGE_SECRET_ACCESS_KEY + if (flags.S3_STORAGE_REGION) process.env.S3_STORAGE_REGION = flags.S3_STORAGE_REGION + await (async () => { try { logger.info('Starting Flowise...') diff --git a/packages/server/src/controllers/get-upload-file/index.ts 
b/packages/server/src/controllers/get-upload-file/index.ts index 773f08add..a33b73e0a 100644 --- a/packages/server/src/controllers/get-upload-file/index.ts +++ b/packages/server/src/controllers/get-upload-file/index.ts @@ -1,10 +1,11 @@ import { Request, Response, NextFunction } from 'express' -import path from 'path' +import fs from 'fs' import contentDisposition from 'content-disposition' -import { getStoragePath } from 'flowise-components' -import * as fs from 'fs' +import { streamStorageFile } from 'flowise-components' +import { StatusCodes } from 'http-status-codes' +import { InternalFlowiseError } from '../../errors/internalFlowiseError' -const streamUploadedImage = async (req: Request, res: Response, next: NextFunction) => { +const streamUploadedFile = async (req: Request, res: Response, next: NextFunction) => { try { if (!req.query.chatflowId || !req.query.chatId || !req.query.fileName) { return res.status(500).send(`Invalid file path`) @@ -12,20 +13,15 @@ const streamUploadedImage = async (req: Request, res: Response, next: NextFuncti const chatflowId = req.query.chatflowId as string const chatId = req.query.chatId as string const fileName = req.query.fileName as string - const filePath = path.join(getStoragePath(), chatflowId, chatId, fileName) - //raise error if file path is not absolute - if (!path.isAbsolute(filePath)) return res.status(500).send(`Invalid file path`) - //raise error if file path contains '..' 
- if (filePath.includes('..')) return res.status(500).send(`Invalid file path`) - //only return from the storage folder - if (!filePath.startsWith(getStoragePath())) return res.status(500).send(`Invalid file path`) + res.setHeader('Content-Disposition', contentDisposition(fileName)) + const fileStream = await streamStorageFile(chatflowId, chatId, fileName) - if (fs.existsSync(filePath)) { - res.setHeader('Content-Disposition', contentDisposition(path.basename(filePath))) - const fileStream = fs.createReadStream(filePath) + if (!fileStream) throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, `Error: streamStorageFile`) + + if (fileStream instanceof fs.ReadStream && fileStream?.pipe) { fileStream.pipe(res) } else { - return res.status(404).send(`File ${fileName} not found`) + res.send(fileStream) } } catch (error) { next(error) @@ -33,5 +29,5 @@ const streamUploadedImage = async (req: Request, res: Response, next: NextFuncti } export default { - streamUploadedImage + streamUploadedFile } diff --git a/packages/server/src/routes/get-upload-file/index.ts b/packages/server/src/routes/get-upload-file/index.ts index cb871ed30..319c73ec2 100644 --- a/packages/server/src/routes/get-upload-file/index.ts +++ b/packages/server/src/routes/get-upload-file/index.ts @@ -3,6 +3,6 @@ import getUploadFileController from '../../controllers/get-upload-file' const router = express.Router() // READ -router.get('/', getUploadFileController.streamUploadedImage) +router.get('/', getUploadFileController.streamUploadedFile) export default router diff --git a/packages/server/src/services/chat-messages/index.ts b/packages/server/src/services/chat-messages/index.ts index e22d7ff90..8782b9bee 100644 --- a/packages/server/src/services/chat-messages/index.ts +++ b/packages/server/src/services/chat-messages/index.ts @@ -1,13 +1,11 @@ import { FindOptionsWhere } from 'typeorm' -import path from 'path' import { StatusCodes } from 'http-status-codes' import { chatType, IChatMessage } from 
'../../Interface' import { utilGetChatMessage } from '../../utils/getChatMessage' import { utilAddChatMessage } from '../../utils/addChatMesage' import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { ChatMessageFeedback } from '../../database/entities/ChatMessageFeedback' -import { getStoragePath } from 'flowise-components' -import { deleteFolderRecursive } from '../../utils' +import { removeFilesFromStorage } from 'flowise-components' import logger from '../../utils/logger' import { ChatMessage } from '../../database/entities/ChatMessage' import { InternalFlowiseError } from '../../errors/internalFlowiseError' @@ -100,15 +98,14 @@ const removeAllChatMessages = async (chatId: string, chatflowid: string, deleteO try { const appServer = getRunningExpressApp() - // remove all related feedback records + // Remove all related feedback records const feedbackDeleteOptions: FindOptionsWhere = { chatId } await appServer.AppDataSource.getRepository(ChatMessageFeedback).delete(feedbackDeleteOptions) // Delete all uploads corresponding to this chatflow/chatId if (chatId) { try { - const directory = path.join(getStoragePath(), chatflowid, chatId) - deleteFolderRecursive(directory) + await removeFilesFromStorage(chatflowid, chatId) } catch (e) { logger.error(`[server]: Error deleting file storage for chatflow ${chatflowid}, chatId ${chatId}: ${e}`) } diff --git a/packages/server/src/services/chatflows/index.ts b/packages/server/src/services/chatflows/index.ts index 03c801f0e..3171fd6ca 100644 --- a/packages/server/src/services/chatflows/index.ts +++ b/packages/server/src/services/chatflows/index.ts @@ -1,19 +1,11 @@ -import path from 'path' import { StatusCodes } from 'http-status-codes' import { InternalFlowiseError } from '../../errors/internalFlowiseError' import { getRunningExpressApp } from '../../utils/getRunningExpressApp' import { IChatFlow } from '../../Interface' import { ChatFlow } from '../../database/entities/ChatFlow' -import { - 
getAppVersion, - getTelemetryFlowObj, - deleteFolderRecursive, - isFlowValidForStream, - constructGraphs, - getEndingNodes -} from '../../utils' +import { getAppVersion, getTelemetryFlowObj, isFlowValidForStream, constructGraphs, getEndingNodes } from '../../utils' import logger from '../../utils/logger' -import { getStoragePath } from 'flowise-components' +import { removeFolderFromStorage } from 'flowise-components' import { IReactFlowObject } from '../../Interface' import { utilGetUploadsConfig } from '../../utils/getUploadsConfig' import { ChatMessage } from '../../database/entities/ChatMessage' @@ -83,8 +75,7 @@ const deleteChatflow = async (chatflowId: string): Promise => { const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).delete({ id: chatflowId }) try { // Delete all uploads corresponding to this chatflow - const directory = path.join(getStoragePath(), chatflowId) - deleteFolderRecursive(directory) + await removeFolderFromStorage(chatflowId) // Delete all chat messages await appServer.AppDataSource.getRepository(ChatMessage).delete({ chatflowid: chatflowId }) @@ -174,7 +165,7 @@ const saveChatflow = async (newChatFlow: ChatFlow): Promise => { const step1Results = await appServer.AppDataSource.getRepository(ChatFlow).save(chatflow) // step 2 - convert base64 to file paths and update the chatflow - step1Results.flowData = updateFlowDataWithFilePaths(step1Results.id, incomingFlowData) + step1Results.flowData = await updateFlowDataWithFilePaths(step1Results.id, incomingFlowData) dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(step1Results) } else { const chatflow = appServer.AppDataSource.getRepository(ChatFlow).create(newChatFlow) @@ -198,7 +189,7 @@ const updateChatflow = async (chatflow: ChatFlow, updateChatFlow: ChatFlow): Pro try { const appServer = getRunningExpressApp() if (updateChatFlow.flowData && containsBase64File(updateChatFlow)) { - updateChatFlow.flowData = updateFlowDataWithFilePaths(chatflow.id, 
updateChatFlow.flowData) + updateChatFlow.flowData = await updateFlowDataWithFilePaths(chatflow.id, updateChatFlow.flowData) } const newDbChatflow = appServer.AppDataSource.getRepository(ChatFlow).merge(chatflow, updateChatFlow) const dbResponse = await appServer.AppDataSource.getRepository(ChatFlow).save(newDbChatflow) diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index 3bedbd4b0..71e393aef 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -1,8 +1,7 @@ import { Request } from 'express' -import { IFileUpload, getStoragePath, convertSpeechToText, ICommonObject } from 'flowise-components' +import { IFileUpload, convertSpeechToText, ICommonObject, addFileToStorage } from 'flowise-components' import { StatusCodes } from 'http-status-codes' import { IncomingInput, IMessage, INodeData, IReactFlowObject, IReactFlowNode, IDepthQueue, chatType, IChatMessage } from '../Interface' -import path from 'path' import { InternalFlowiseError } from '../errors/internalFlowiseError' import { ChatFlow } from '../database/entities/ChatFlow' import { Server } from 'socket.io' @@ -69,17 +68,12 @@ export const utilBuildChatflow = async (req: Request, socketIO?: Server, isInter if ((upload.type === 'file' || upload.type === 'audio') && upload.data) { const filename = upload.name - const dir = path.join(getStoragePath(), chatflowid, chatId) - if (!fs.existsSync(dir)) { - fs.mkdirSync(dir, { recursive: true }) - } - const filePath = path.join(dir, filename) const splitDataURI = upload.data.split(',') const bf = Buffer.from(splitDataURI.pop() || '', 'base64') - fs.writeFileSync(filePath, bf) - - // Omit upload.data since we don't store the content in database + const mime = splitDataURI[0].split(':')[1].split(';')[0] + await addFileToStorage(mime, bf, filename, chatflowid, chatId) upload.type = 'stored-file' + // Omit upload.data since we don't store the content in database 
fileUploads[i] = omit(upload, ['data']) } diff --git a/packages/server/src/utils/fileRepository.ts b/packages/server/src/utils/fileRepository.ts index ab40ab76f..1147aeb98 100644 --- a/packages/server/src/utils/fileRepository.ts +++ b/packages/server/src/utils/fileRepository.ts @@ -1,8 +1,6 @@ import { ChatFlow } from '../database/entities/ChatFlow' -import path from 'path' -import { getStoragePath } from 'flowise-components' -import fs from 'fs' import { IReactFlowObject } from '../Interface' +import { addBase64FilesToStorage } from 'flowise-components' export const containsBase64File = (chatflow: ChatFlow) => { const parsedFlowData: IReactFlowObject = JSON.parse(chatflow.flowData) @@ -48,23 +46,7 @@ export const containsBase64File = (chatflow: ChatFlow) => { return found } -function addFileToStorage(file: string, chatflowid: string, fileNames: string[]) { - const dir = path.join(getStoragePath(), chatflowid) - if (!fs.existsSync(dir)) { - fs.mkdirSync(dir, { recursive: true }) - } - - const splitDataURI = file.split(',') - const filename = splitDataURI.pop()?.split(':')[1] ?? 
'' - const bf = Buffer.from(splitDataURI.pop() || '', 'base64') - - const filePath = path.join(dir, filename) - fs.writeFileSync(filePath, bf) - fileNames.push(filename) - return 'FILE-STORAGE::' + JSON.stringify(fileNames) -} - -export const updateFlowDataWithFilePaths = (chatflowid: string, flowData: string) => { +export const updateFlowDataWithFilePaths = async (chatflowid: string, flowData: string) => { try { const parsedFlowData: IReactFlowObject = JSON.parse(flowData) const re = new RegExp('^data.*;base64', 'i') @@ -93,14 +75,14 @@ export const updateFlowDataWithFilePaths = (chatflowid: string, flowData: string for (let j = 0; j < files.length; j++) { const file = files[j] if (re.test(file)) { - node.data.inputs[key] = addFileToStorage(file, chatflowid, fileNames) + node.data.inputs[key] = await addBase64FilesToStorage(file, chatflowid, fileNames) } } } catch (e) { continue } } else if (re.test(input)) { - node.data.inputs[key] = addFileToStorage(input, chatflowid, fileNames) + node.data.inputs[key] = await addBase64FilesToStorage(input, chatflowid, fileNames) } } } diff --git a/packages/server/src/utils/index.ts b/packages/server/src/utils/index.ts index 324ae4418..434d147f1 100644 --- a/packages/server/src/utils/index.ts +++ b/packages/server/src/utils/index.ts @@ -1324,34 +1324,6 @@ export const getAllValuesFromJson = (obj: any): any[] => { return values } -/** - * Delete file & folder recursively - * @param {string} directory - */ -export const deleteFolderRecursive = (directory: string) => { - if (fs.existsSync(directory)) { - fs.readdir(directory, (error, files) => { - if (error) throw new Error('Could not read directory') - - files.forEach((file) => { - const file_path = path.join(directory, file) - - fs.stat(file_path, (error, stat) => { - if (error) throw new Error('File do not exist') - - if (!stat.isDirectory()) { - fs.unlink(file_path, (error) => { - if (error) throw new Error('Could not delete file') - }) - } else { - 
deleteFolderRecursive(file_path) - } - }) - }) - }) - } -} - /** * Get only essential flow data items for telemetry * @param {IReactFlowNode[]} nodes diff --git a/packages/ui/src/views/chatmessage/ChatMessage.jsx b/packages/ui/src/views/chatmessage/ChatMessage.jsx index 988114848..6a30feb8c 100644 --- a/packages/ui/src/views/chatmessage/ChatMessage.jsx +++ b/packages/ui/src/views/chatmessage/ChatMessage.jsx @@ -257,7 +257,7 @@ export const ChatMessage = ({ open, chatflowid, isDialog, previews, setPreviews data: base64data, preview: audioUploadSVG, type: 'audio', - name: 'audio.wav', + name: `audio_${Date.now()}.wav`, mime: mimeType } setPreviews((prevPreviews) => [...prevPreviews, upload])