removing child mode

This commit is contained in:
Henry 2023-08-11 19:06:05 +01:00
parent a8cd31bf26
commit 9daaef7583
11 changed files with 178 additions and 539 deletions

View File

@ -1,22 +1,22 @@
<!-- markdownlint-disable MD030 -->
# 贡献给Flowise
# 贡献给 Flowise
[English](<./CONTRIBUTING.md>) | 中文
[English](./CONTRIBUTING.md) | 中文
我们欢迎任何形式的贡献。
## ⭐ 点赞
点赞并分享[Github仓库](https://github.com/FlowiseAI/Flowise)。
点赞并分享[Github 仓库](https://github.com/FlowiseAI/Flowise)。
## 🙋 问题和回答
在[问题和回答](https://github.com/FlowiseAI/Flowise/discussions/categories/q-a)部分搜索任何问题,如果找不到,可以毫不犹豫地创建一个。这可能会帮助到其他有类似问题的人。
## 🙌 分享Chatflow
## 🙌 分享 Chatflow
是的分享你如何使用Flowise是一种贡献方式。将你的Chatflow导出为JSON附上截图并在[展示和分享](https://github.com/FlowiseAI/Flowise/discussions/categories/show-and-tell)部分分享。
是的!分享你如何使用 Flowise 是一种贡献方式。将你的 Chatflow 导出为 JSON附上截图并在[展示和分享](https://github.com/FlowiseAI/Flowise/discussions/categories/show-and-tell)部分分享。
## 💡 想法
@ -30,75 +30,75 @@
不确定要贡献什么?一些想法:
- 从Langchain创建新组件
- 更新现有组件,如扩展功能、修复错误
- 添加新的Chatflow想法
- Langchain 创建新组件
- 更新现有组件,如扩展功能、修复错误
- 添加新的 Chatflow 想法
### 开发人员
Flowise在一个单一的单体存储库中有3个不同的模块。
Flowise 在一个单一的单体存储库中有 3 个不同的模块。
- `server`用于提供API逻辑的Node后端
- `ui`React前端
- `components`Langchain组件
- `server`:用于提供 API 逻辑的 Node 后端
- `ui`React 前端
- `components`Langchain 组件
#### 先决条件
- 安装 [Yarn v1](https://classic.yarnpkg.com/en/docs/install)
```bash
npm i -g yarn
```
- 安装 [Yarn v1](https://classic.yarnpkg.com/en/docs/install)
```bash
npm i -g yarn
```
#### 逐步指南
1. Fork官方的[Flowise Github 仓库](https://github.com/FlowiseAI/Flowise)。
1. Fork 官方的[Flowise Github 仓库](https://github.com/FlowiseAI/Flowise)。
2. 克隆你fork的存储库。
2. 克隆你 fork 的存储库。
3. 创建一个新的分支,参考[指南](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-and-deleting-branches-within-your-repository)。命名约定:
- 对于功能分支:`feature/<你的新功能>`
- 对于bug修复分支`bugfix/<你的新bug修复>`。
- 对于功能分支:`feature/<你的新功能>`
- 对于 bug 修复分支:`bugfix/<你的新bug修复>`。
4. 切换到新创建的分支。
5. 进入存储库文件夹
```bash
cd Flowise
```
```bash
cd Flowise
```
6. 安装所有模块的依赖项:
```bash
yarn install
```
```bash
yarn install
```
7. 构建所有代码:
```bash
yarn build
```
```bash
yarn build
```
8. 在[http://localhost:3000](http://localhost:3000)上启动应用程序
```bash
yarn start
```
```bash
yarn start
```
9. 开发时:
- 在`packages/ui`中创建`.env`文件并指定`PORT`(参考`.env.example`
- 在`packages/server`中创建`.env`文件并指定`PORT`(参考`.env.example`
- 运行
- 在`packages/ui`中创建`.env`文件并指定`PORT`(参考`.env.example`
- 在`packages/server`中创建`.env`文件并指定`PORT`(参考`.env.example`
- 运行
```bash
yarn dev
```
```bash
yarn dev
```
对`packages/ui`或`packages/server`进行的任何更改都将反映在[http://localhost:8080](http://localhost:8080)上
对`packages/ui`或`packages/server`进行的任何更改都将反映在[http://localhost:8080](http://localhost:8080)上
对于`packages/components`中进行的更改,再次运行`yarn build`以应用更改。
对于`packages/components`中进行的更改,再次运行`yarn build`以应用更改。
10. 做完所有的更改后,运行以下命令来确保在生产环境中一切正常:
@ -118,26 +118,25 @@ Flowise在一个单一的单体存储库中有3个不同的模块。
Flowise 支持不同的环境变量来配置您的实例。您可以在 `packages/server` 文件夹中的 `.env` 文件中指定以下变量。阅读[更多信息](https://docs.flowiseai.com/environment-variables)
| 变量名 | 描述 | 类型 | 默认值 |
| -------------------------- | ------------------------------------------------------------ | ------------------------------------------------- | ----------------------------------- |
| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 |
| FLOWISE_USERNAME | 登录用户名 | 字符串 | |
| FLOWISE_PASSWORD | 登录密码 | 字符串 | |
| DEBUG | 打印组件的日志 | 布尔值 | |
| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` |
| LOG_LEVEL | 日志的不同级别 | 枚举字符串: `error`, `info`, `verbose`, `debug` | `info` |
| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` |
| EXECUTION_MODE | 预测是否在独立进程中运行还是在主进程中运行 | 枚举字符串: `child`, `main` | `main` |
| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | |
| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | |
| OVERRIDE_DATABASE | 是否使用默认值覆盖当前数据库 | 枚举字符串: `true`, `false` | `true` |
| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串: `sqlite`, `mysql`, `postgres` | `sqlite` |
| DATABASE_PATH | 数据库保存的位置(当 DATABASE_TYPE 是 sqlite 时) | 字符串 | `your-home-dir/.flowise` |
| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| 变量名 | 描述 | 类型 | 默认值 |
| -------------------------- | ------------------------------------------------------ | ----------------------------------------------- | ----------------------------------- |
| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 |
| FLOWISE_USERNAME | 登录用户名 | 字符串 | |
| FLOWISE_PASSWORD | 登录密码 | 字符串 | |
| DEBUG | 打印组件的日志 | 布尔值 | |
| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` |
| LOG_LEVEL | 日志的不同级别 | 枚举字符串: `error`, `info`, `verbose`, `debug` | `info` |
| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` |
| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | |
| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | |
| OVERRIDE_DATABASE | 是否使用默认值覆盖当前数据库 | 枚举字符串: `true`, `false` | `true` |
| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串: `sqlite`, `mysql`, `postgres` | `sqlite` |
| DATABASE_PATH | 数据库保存的位置(当 DATABASE_TYPE 是 sqlite 时) | 字符串 | `your-home-dir/.flowise` |
| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不是 sqlite 时) | 字符串 | |
您也可以在使用 `npx` 时指定环境变量。例如:
@ -153,4 +152,4 @@ npx flowise start --PORT=3000 --DEBUG=true
当您打开一个 Pull Request 时FlowiseAI 团队的成员将自动收到通知/指派。您也可以在 [Discord](https://discord.gg/jbaHfsRVBW) 上联系我们。
##
##

View File

@ -2,7 +2,7 @@
# Contributing to Flowise
English | [中文](<./CONTRIBUTING-ZH.md>)
English | [中文](./CONTRIBUTING-ZH.md)
We appreciate any form of contributions.
@ -129,7 +129,6 @@ Flowise support different environment variables to configure your instance. You
| LOG_PATH | Location where log files are stored | String | `your-path/Flowise/logs` |
| LOG_LEVEL | Different levels of logs | Enum String: `error`, `info`, `verbose`, `debug` | `info` |
| APIKEY_PATH | Location where api keys are saved | String | `your-path/Flowise/packages/server` |
| EXECUTION_MODE | Whether predictions run in their own process or the main process | Enum String: `child`, `main` | `main` |
| TOOL_FUNCTION_BUILTIN_DEP | NodeJS built-in modules to be used for Tool Function | String | |
| TOOL_FUNCTION_EXTERNAL_DEP | External modules to be used for Tool Function | String | |
| OVERRIDE_DATABASE | Override current database with default | Enum String: `true`, `false` | `true` |

View File

@ -17,7 +17,6 @@ LOG_PATH=/root/.flowise/logs
# FLOWISE_PASSWORD=1234
# DEBUG=true
# LOG_LEVEL=debug (error | warn | info | verbose | debug)
# EXECUTION_MODE=main (child | main)
# TOOL_FUNCTION_BUILTIN_DEP=crypto,fs
# TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash

View File

@ -15,7 +15,6 @@ services:
- SECRETKEY_PATH=${SECRETKEY_PATH}
- LOG_LEVEL=${LOG_LEVEL}
- LOG_PATH=${LOG_PATH}
- EXECUTION_MODE=${EXECUTION_MODE}
ports:
- '${PORT}:${PORT}'
volumes:

View File

@ -17,7 +17,6 @@ PASSPHRASE=MYPASSPHRASE # Passphrase used to create encryption key
# FLOWISE_PASSWORD=1234
# DEBUG=true
# LOG_LEVEL=debug (error | warn | info | verbose | debug)
# EXECUTION_MODE=main (child | main)
# TOOL_FUNCTION_BUILTIN_DEP=crypto,fs
# TOOL_FUNCTION_EXTERNAL_DEP=moment,lodash

View File

@ -1,20 +1,20 @@
<!-- markdownlint-disable MD030 -->
# Flowise - 低代码LLM应用程序构建器
# Flowise - 低代码 LLM 应用程序构建器
[English](<./README.md>) | 中文
[English](./README.md) | 中文
![Flowise](https://github.com/FlowiseAI/Flowise/blob/main/images/flowise.gif?raw=true)
拖放界面来构建自定义的LLM流程
拖放界面来构建自定义的 LLM 流程
## ⚡快速入门
## ⚡ 快速入门
1. 安装Flowise
1. 安装 Flowise
```bash
npm install -g flowise
```
2. 启动Flowise
2. 启动 Flowise
```bash
npx flowise start
@ -33,28 +33,27 @@ FLOWISE_PASSWORD=1234
## 🌱 环境变量
Flowise支持不同的环境变量来配置您的实例。您可以在`packages/server`文件夹中的`.env`文件中指定以下变量。阅读[更多](https://docs.flowiseai.com/environment-variables)
Flowise 支持不同的环境变量来配置您的实例。您可以在`packages/server`文件夹中的`.env`文件中指定以下变量。阅读[更多](https://docs.flowiseai.com/environment-variables)
| 变量 | 描述 | 类型 | 默认值 |
| ---------------- | ---------------------------------------------------------------- | ------------------------------------------------ | ----------------------------------- |
| PORT | Flowise运行的HTTP端口 | 数字 | 3000 |
| FLOWISE_USERNAME | 登录的用户名 | 字符串 | |
| FLOWISE_PASSWORD | 登录的密码 | 字符串 | |
| DEBUG | 打印组件的日志 | 布尔值 | |
| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` |
| LOG_LEVEL | 日志的不同级别 | 枚举字符串:`error`、`info`、`verbose`、`debug` | `info` |
| APIKEY_PATH | 存储API密钥的位置 | 字符串 | `your-path/Flowise/packages/server` |
| EXECUTION_MODE | 预测是在其自己的进程中运行还是在主进程中运行 | 枚举字符串:`child`、`main` | `main` |
| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的NodeJS内置模块 | 字符串 | |
| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | |
| OVERRIDE_DATABASE | 使用默认值覆盖当前数据库 | 枚举字符串:`true`、`false` | `true` |
| DATABASE_TYPE | 存储flowise数据的数据库类型 | 枚举字符串:`sqlite`、`mysql`、`postgres` | `sqlite` |
| DATABASE_PATH | 数据库的保存位置当DATABASE_TYPE为sqlite时 | 字符串 | `your-home-dir/.flowise` |
| DATABASE_HOST | 主机URL或IP地址当DATABASE_TYPE不为sqlite时 | 字符串 | |
| DATABASE_PORT | 数据库端口当DATABASE_TYPE不为sqlite时 | 字符串 | |
| DATABASE_USERNAME | 数据库用户名当DATABASE_TYPE不为sqlite时 | 字符串 | |
| DATABASE_PASSWORD | 数据库密码当DATABASE_TYPE不为sqlite时 | 字符串 | |
| DATABASE_NAME | 数据库名称当DATABASE_TYPE不为sqlite时 | 字符串 | |
| 变量 | 描述 | 类型 | 默认值 |
| -------------------------- | ------------------------------------------------------ | ----------------------------------------------- | ----------------------------------- |
| PORT | Flowise 运行的 HTTP 端口 | 数字 | 3000 |
| FLOWISE_USERNAME | 登录的用户名 | 字符串 | |
| FLOWISE_PASSWORD | 登录的密码 | 字符串 | |
| DEBUG | 打印组件的日志 | 布尔值 | |
| LOG_PATH | 存储日志文件的位置 | 字符串 | `your-path/Flowise/logs` |
| LOG_LEVEL | 日志的不同级别 | 枚举字符串:`error`、`info`、`verbose`、`debug` | `info` |
| APIKEY_PATH | 存储 API 密钥的位置 | 字符串 | `your-path/Flowise/packages/server` |
| TOOL_FUNCTION_BUILTIN_DEP | 用于工具函数的 NodeJS 内置模块 | 字符串 | |
| TOOL_FUNCTION_EXTERNAL_DEP | 用于工具函数的外部模块 | 字符串 | |
| OVERRIDE_DATABASE | 使用默认值覆盖当前数据库 | 枚举字符串:`true`、`false` | `true` |
| DATABASE_TYPE | 存储 flowise 数据的数据库类型 | 枚举字符串:`sqlite`、`mysql`、`postgres` | `sqlite` |
| DATABASE_PATH | 数据库的保存位置(当 DATABASE_TYPE 为 sqlite 时) | 字符串 | `your-home-dir/.flowise` |
| DATABASE_HOST | 主机 URL 或 IP 地址(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
| DATABASE_PORT | 数据库端口(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
| DATABASE_USERNAME | 数据库用户名(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
| DATABASE_PASSWORD | 数据库密码(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
| DATABASE_NAME | 数据库名称(当 DATABASE_TYPE 不为 sqlite 时) | 字符串 | |
您还可以在使用`npx`时指定环境变量。例如:
@ -64,7 +63,7 @@ npx flowise start --PORT=3000 --DEBUG=true
## 📖 文档
[Flowise文档](https://docs.flowiseai.com/)
[Flowise 文档](https://docs.flowiseai.com/)
## 🌐 自托管
@ -98,4 +97,4 @@ npx flowise start --PORT=3000 --DEBUG=true
## 📄 许可证
本仓库中的源代码在[MIT许可证](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md)下提供。
本仓库中的源代码在[MIT 许可证](https://github.com/FlowiseAI/Flowise/blob/master/LICENSE.md)下提供。

View File

@ -1,253 +0,0 @@
import path from 'path'
import { IChildProcessMessage, IReactFlowNode, IReactFlowObject, IRunChatflowMessageValue, INodeData } from './Interface'
import {
buildLangchain,
checkMemorySessionId,
constructGraphs,
getEndingNode,
getStartingNodes,
getUserHome,
replaceInputsWithConfig,
resolveVariables,
databaseEntities
} from './utils'
import { DataSource } from 'typeorm'
import { ChatFlow } from './entity/ChatFlow'
import { ChatMessage } from './entity/ChatMessage'
import { Tool } from './entity/Tool'
import { Credential } from './entity/Credential'
import logger from './utils/logger'
export class ChildProcess {
/**
* Stop child process when app is killed
*/
static async stopChildProcess() {
setTimeout(() => {
process.exit(0)
}, 50000)
}
/**
* Process prediction
* @param {IRunChatflowMessageValue} messageValue
* @return {Promise<void>}
*/
async runChildProcess(messageValue: IRunChatflowMessageValue): Promise<void> {
process.on('SIGTERM', ChildProcess.stopChildProcess)
process.on('SIGINT', ChildProcess.stopChildProcess)
await sendToParentProcess('start', '_')
try {
const childAppDataSource = await initDB()
// Create a Queue and add our initial node in it
const { endingNodeData, chatflow, chatId, incomingInput, componentNodes } = messageValue
let nodeToExecuteData: INodeData
let addToChatFlowPool: any = {}
/* Don't rebuild the flow (to avoid duplicated upsert, recomputation) when all these conditions met:
* - Node Data already exists in pool
* - Still in sync (i.e the flow has not been modified since)
* - Existing overrideConfig and new overrideConfig are the same
* - Flow doesn't start with nodes that depend on incomingInput.question
***/
if (endingNodeData) {
nodeToExecuteData = endingNodeData
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) {
await sendToParentProcess('error', `Ending node ${endingNodeId} not found`)
return
}
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) {
await sendToParentProcess('error', `Ending node ${endingNodeId} data not found`)
return
}
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
await sendToParentProcess('error', `Ending node must be either a Chain or Agent`)
return
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
await sendToParentProcess(
'error',
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
return
}
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
logger.debug(`[server] [mode:child]: Start building chatflow ${chatflow.id}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
componentNodes,
incomingInput.question,
chatId,
childAppDataSource,
incomingInput?.overrideConfig
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) {
await sendToParentProcess('error', `Node ${endingNodeId} not found`)
return
}
if (incomingInput.overrideConfig)
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
addToChatFlowPool = {
chatflowid: chatflow.id,
nodeToExecuteData,
startingNodes,
overrideConfig: incomingInput?.overrideConfig
}
}
const nodeInstanceFilePath = componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
logger.debug(`[server] [mode:child]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId)
const result = await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
appDataSource: childAppDataSource,
databaseEntities
})
logger.debug(`[server] [mode:child]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
await sendToParentProcess('finish', { result, addToChatFlowPool })
} catch (e: any) {
await sendToParentProcess('error', e.message)
logger.error('[server] [mode:child]: Error:', e)
}
}
}
/**
* Initialize DB in child process
* @returns {DataSource}
*/
async function initDB() {
let childAppDataSource
let homePath
const synchronize = process.env.OVERRIDE_DATABASE === 'false' ? false : true
switch (process.env.DATABASE_TYPE) {
case 'sqlite':
homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise')
childAppDataSource = new DataSource({
type: 'sqlite',
database: path.resolve(homePath, 'database.sqlite'),
synchronize,
entities: [ChatFlow, ChatMessage, Tool, Credential],
migrations: []
})
break
case 'mysql':
childAppDataSource = new DataSource({
type: 'mysql',
host: process.env.DATABASE_HOST,
port: parseInt(process.env.DATABASE_PORT || '3306'),
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
database: process.env.DATABASE_NAME,
charset: 'utf8mb4',
synchronize,
entities: [ChatFlow, ChatMessage, Tool, Credential],
migrations: []
})
break
case 'postgres':
childAppDataSource = new DataSource({
type: 'postgres',
host: process.env.DATABASE_HOST,
port: parseInt(process.env.DATABASE_PORT || '5432'),
username: process.env.DATABASE_USER,
password: process.env.DATABASE_PASSWORD,
database: process.env.DATABASE_NAME,
synchronize,
entities: [ChatFlow, ChatMessage, Tool, Credential],
migrations: []
})
break
default:
homePath = process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise')
childAppDataSource = new DataSource({
type: 'sqlite',
database: path.resolve(homePath, 'database.sqlite'),
synchronize,
entities: [ChatFlow, ChatMessage, Tool, Credential],
migrations: []
})
break
}
return await childAppDataSource.initialize()
}
/**
* Send data back to parent process
* @param {string} key Key of message
* @param {*} value Value of message
* @returns {Promise<void>}
*/
async function sendToParentProcess(key: string, value: any): Promise<void> {
// tslint:disable-line:no-any
return new Promise((resolve, reject) => {
process.send!(
{
key,
value
},
(error: Error) => {
if (error) {
return reject(error)
}
resolve()
}
)
})
}
const childProcess = new ChildProcess()
process.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'start') {
await childProcess.runChildProcess(message.value)
process.exit()
}
})

View File

@ -169,19 +169,6 @@ export interface IDatabaseExport {
apikeys: ICommonObject[]
}
export interface IRunChatflowMessageValue {
chatflow: IChatFlow
chatId: string
incomingInput: IncomingInput
componentNodes: IComponentNodes
endingNodeData?: INodeData
}
export interface IChildProcessMessage {
key: string
value?: any
}
export type ICredentialDataDecrypted = ICommonObject
// Plain credential object sent to server

View File

@ -25,7 +25,6 @@ export default class Start extends Command {
SECRETKEY_PATH: Flags.string(),
LOG_PATH: Flags.string(),
LOG_LEVEL: Flags.string(),
EXECUTION_MODE: Flags.string(),
TOOL_FUNCTION_BUILTIN_DEP: Flags.string(),
TOOL_FUNCTION_EXTERNAL_DEP: Flags.string(),
OVERRIDE_DATABASE: Flags.string(),
@ -73,7 +72,6 @@ export default class Start extends Command {
const { flags } = await this.parse(Start)
if (flags.PORT) process.env.PORT = flags.PORT
if (flags.EXECUTION_MODE) process.env.EXECUTION_MODE = flags.EXECUTION_MODE
if (flags.DEBUG) process.env.DEBUG = flags.DEBUG
// Authorization

View File

@ -16,8 +16,6 @@ import {
IReactFlowObject,
INodeData,
IDatabaseExport,
IRunChatflowMessageValue,
IChildProcessMessage,
ICredentialReturnResponse
} from './Interface'
import {
@ -57,7 +55,6 @@ import { Credential } from './entity/Credential'
import { Tool } from './entity/Tool'
import { ChatflowPool } from './ChatflowPool'
import { ICommonObject, INodeOptionsValue } from 'flowise-components'
import { fork } from 'child_process'
export class App {
app: express.Application
@ -764,68 +761,6 @@ export class App {
}
}
/**
* Start child process
* @param {ChatFlow} chatflow
* @param {IncomingInput} incomingInput
* @param {INodeData} endingNodeData
*/
async startChildProcess(chatflow: ChatFlow, chatId: string, incomingInput: IncomingInput, endingNodeData?: INodeData) {
try {
const controller = new AbortController()
const { signal } = controller
let childpath = path.join(__dirname, '..', 'dist', 'ChildProcess.js')
if (!fs.existsSync(childpath)) childpath = 'ChildProcess.ts'
const childProcess = fork(childpath, [], { signal })
const value = {
chatflow,
chatId,
incomingInput,
componentNodes: cloneDeep(this.nodesPool.componentNodes),
endingNodeData
} as IRunChatflowMessageValue
childProcess.send({ key: 'start', value } as IChildProcessMessage)
let childProcessTimeout: NodeJS.Timeout
return new Promise((resolve, reject) => {
childProcess.on('message', async (message: IChildProcessMessage) => {
if (message.key === 'finish') {
const { result, addToChatFlowPool } = message.value as ICommonObject
if (childProcessTimeout) {
clearTimeout(childProcessTimeout)
}
if (Object.keys(addToChatFlowPool).length) {
const { chatflowid, nodeToExecuteData, startingNodes, overrideConfig } = addToChatFlowPool
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, overrideConfig)
}
resolve(result)
}
if (message.key === 'start') {
if (process.env.EXECUTION_TIMEOUT) {
childProcessTimeout = setTimeout(async () => {
childProcess.kill()
resolve(undefined)
}, parseInt(process.env.EXECUTION_TIMEOUT, 10))
}
}
if (message.key === 'error') {
let errMessage = message.value as string
if (childProcessTimeout) {
clearTimeout(childProcessTimeout)
}
reject(errMessage)
}
})
})
} catch (err) {
logger.error('[server] [mode:child]: Error:', err)
}
}
/**
* Process Prediction
* @param {Request} req
@ -895,126 +830,104 @@ export class App {
)
}
if (process.env.EXECUTION_MODE === 'child') {
if (isFlowReusable()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
logger.debug(
`[server] [mode:child]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
try {
const result = await this.startChildProcess(chatflow, chatId, incomingInput, nodeToExecuteData)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
} else {
try {
const result = await this.startChildProcess(chatflow, chatId, incomingInput)
return res.json(result)
} catch (error) {
return res.status(500).send(error)
}
}
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
if (isFlowReusable()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
logger.debug(
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
} else {
/*** Get chatflows and prepare data ***/
const flowData = chatflow.flowData
const parsedFlowData: IReactFlowObject = JSON.parse(flowData)
const nodes = parsedFlowData.nodes
const edges = parsedFlowData.edges
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
if (isFlowReusable()) {
nodeToExecuteData = this.chatflowPool.activeChatflows[chatflowid].endingNodeData
isStreamValid = isFlowValidForStream(nodes, nodeToExecuteData)
logger.debug(
`[server]: Reuse existing chatflow ${chatflowid} with ending node ${nodeToExecuteData.label} (${nodeToExecuteData.id})`
)
} else {
/*** Get Ending Node with Directed Graph ***/
const { graph, nodeDependencies } = constructGraphs(nodes, edges)
const directedGraph = graph
const endingNodeId = getEndingNode(nodeDependencies, directedGraph)
if (!endingNodeId) return res.status(500).send(`Ending node ${endingNodeId} not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
const endingNodeData = nodes.find((nd) => nd.id === endingNodeId)?.data
if (!endingNodeData) return res.status(500).send(`Ending node ${endingNodeId} data not found`)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
chatId,
this.AppDataSource,
incomingInput?.overrideConfig
)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
if (incomingInput.overrideConfig)
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
if (endingNodeData && endingNodeData.category !== 'Chains' && endingNodeData.category !== 'Agents') {
return res.status(500).send(`Ending node must be either a Chain or Agent`)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
if (
endingNodeData.outputs &&
Object.keys(endingNodeData.outputs).length &&
!Object.values(endingNodeData.outputs).includes(endingNodeData.name)
) {
return res
.status(500)
.send(
`Output of ${endingNodeData.label} (${endingNodeData.id}) must be ${endingNodeData.label}, can't be an Output Prediction`
)
}
isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData)
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
isStreamValid = isFlowValidForStream(nodes, endingNodeData)
if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId)
/*** Get Starting Nodes with Non-Directed Graph ***/
const constructedObj = constructGraphs(nodes, edges, true)
const nonDirectedGraph = constructedObj.graph
const { startingNodeIds, depthQueue } = getStartingNodes(nonDirectedGraph, endingNodeId)
const result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
socketIO,
socketIOClientId: incomingInput.socketIOClientId,
logger,
appDataSource: this.AppDataSource,
databaseEntities
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
logger,
appDataSource: this.AppDataSource,
databaseEntities
})
logger.debug(`[server]: Start building chatflow ${chatflowid}`)
/*** BFS to traverse from Starting Nodes to Ending Node ***/
const reactFlowNodes = await buildLangchain(
startingNodeIds,
nodes,
graph,
depthQueue,
this.nodesPool.componentNodes,
incomingInput.question,
chatId,
this.AppDataSource,
incomingInput?.overrideConfig
)
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
return res.json(result)
const nodeToExecute = reactFlowNodes.find((node: IReactFlowNode) => node.id === endingNodeId)
if (!nodeToExecute) return res.status(404).send(`Node ${endingNodeId} not found`)
if (incomingInput.overrideConfig)
nodeToExecute.data = replaceInputsWithConfig(nodeToExecute.data, incomingInput.overrideConfig)
const reactFlowNodeData: INodeData = resolveVariables(nodeToExecute.data, reactFlowNodes, incomingInput.question)
nodeToExecuteData = reactFlowNodeData
const startingNodes = nodes.filter((nd) => startingNodeIds.includes(nd.id))
this.chatflowPool.add(chatflowid, nodeToExecuteData, startingNodes, incomingInput?.overrideConfig)
}
const nodeInstanceFilePath = this.nodesPool.componentNodes[nodeToExecuteData.name].filePath as string
const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass()
isStreamValid = isStreamValid && !isVectorStoreFaiss(nodeToExecuteData)
logger.debug(`[server]: Running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
if (nodeToExecuteData.instance) checkMemorySessionId(nodeToExecuteData.instance, chatId)
const result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
socketIO,
socketIOClientId: incomingInput.socketIOClientId,
logger,
appDataSource: this.AppDataSource,
databaseEntities
})
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatHistory: incomingInput.history,
logger,
appDataSource: this.AppDataSource,
databaseEntities
})
logger.debug(`[server]: Finished running ${nodeToExecuteData.label} (${nodeToExecuteData.id})`)
return res.json(result)
} catch (e: any) {
logger.error('[server]: Error:', e)
return res.status(500).send(e.message)

View File

@ -791,7 +791,7 @@ export const isFlowValidForStream = (reactFlowNodes: IReactFlowNode[], endingNod
isValidChainOrAgent = whitelistAgents.includes(endingNodeData.name)
}
return isChatOrLLMsExist && isValidChainOrAgent && !isVectorStoreFaiss(endingNodeData) && process.env.EXECUTION_MODE !== 'child'
return isChatOrLLMsExist && isValidChainOrAgent && !isVectorStoreFaiss(endingNodeData)
}
/**