Bugfix/Prevent streaming of chatflow tool and chain tool (#3257)

prevent streaming of chatflow tool and chain tool
This commit is contained in:
Henry Heng 2024-09-26 00:34:19 +01:00 committed by GitHub
parent 57f8169e87
commit b7cb8be7c3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 40 deletions

View File

@ -14,41 +14,17 @@ export class ChainTool extends DynamicTool {
super({ super({
...rest, ...rest,
func: async (input, runManager) => { func: async (input, runManager) => {
// prevent sending SSE events of the sub-chain const childManagers = runManager?.getChild()
const sseStreamer = runManager?.handlers.find((handler) => handler instanceof CustomChainHandler)?.sseStreamer const handlers = childManagers?.handlers?.filter((handler) => !(handler instanceof CustomChainHandler)) || []
if (runManager) { if (childManagers) childManagers.handlers = handlers
const callbacks = runManager.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = undefined
}
}
}
if ((chain as any).prompt && (chain as any).prompt.promptValues) { if ((chain as any).prompt && (chain as any).prompt.promptValues) {
const promptValues = handleEscapeCharacters((chain as any).prompt.promptValues, true) const promptValues = handleEscapeCharacters((chain as any).prompt.promptValues, true)
const values = await chain.call(promptValues, childManagers)
const values = await chain.call(promptValues, runManager?.getChild())
if (runManager && sseStreamer) {
const callbacks = runManager.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = sseStreamer
}
}
}
return values?.text return values?.text
} }
const values = chain.run(input, runManager?.getChild()) const values = chain.run(input, childManagers)
if (runManager && sseStreamer) {
const callbacks = runManager.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = sseStreamer
}
}
}
return values return values
} }
}) })

View File

@ -7,7 +7,6 @@ import { StructuredTool } from '@langchain/core/tools'
import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface' import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, INodeParams } from '../../../src/Interface'
import { availableDependencies, defaultAllowBuiltInDep, getCredentialData, getCredentialParam } from '../../../src/utils' import { availableDependencies, defaultAllowBuiltInDep, getCredentialData, getCredentialParam } from '../../../src/utils'
import { v4 as uuidv4 } from 'uuid' import { v4 as uuidv4 } from 'uuid'
import { CustomChainHandler } from '../../../src'
class ChatflowTool_Tools implements INode { class ChatflowTool_Tools implements INode {
label: string label: string
@ -24,7 +23,7 @@ class ChatflowTool_Tools implements INode {
constructor() { constructor() {
this.label = 'Chatflow Tool' this.label = 'Chatflow Tool'
this.name = 'ChatflowTool' this.name = 'ChatflowTool'
this.version = 4.0 this.version = 5.0
this.type = 'ChatflowTool' this.type = 'ChatflowTool'
this.icon = 'chatflowTool.svg' this.icon = 'chatflowTool.svg'
this.category = 'Tools' this.category = 'Tools'
@ -58,6 +57,12 @@ class ChatflowTool_Tools implements INode {
placeholder: placeholder:
'State of the Union QA - useful for when you need to ask questions about the most recent state of the union address.' 'State of the Union QA - useful for when you need to ask questions about the most recent state of the union address.'
}, },
{
label: 'Return Direct',
name: 'returnDirect',
type: 'boolean',
optional: true
},
{ {
label: 'Override Config', label: 'Override Config',
name: 'overrideConfig', name: 'overrideConfig',
@ -135,6 +140,7 @@ class ChatflowTool_Tools implements INode {
const _name = nodeData.inputs?.name as string const _name = nodeData.inputs?.name as string
const description = nodeData.inputs?.description as string const description = nodeData.inputs?.description as string
const useQuestionFromChat = nodeData.inputs?.useQuestionFromChat as boolean const useQuestionFromChat = nodeData.inputs?.useQuestionFromChat as boolean
const returnDirect = nodeData.inputs?.returnDirect as boolean
const customInput = nodeData.inputs?.customInput as string const customInput = nodeData.inputs?.customInput as string
const overrideConfig = const overrideConfig =
typeof nodeData.inputs?.overrideConfig === 'string' && typeof nodeData.inputs?.overrideConfig === 'string' &&
@ -168,6 +174,7 @@ class ChatflowTool_Tools implements INode {
name, name,
baseURL, baseURL,
description, description,
returnDirect,
chatflowid: selectedChatflowId, chatflowid: selectedChatflowId,
startNewSession, startNewSession,
headers, headers,
@ -206,6 +213,7 @@ class ChatflowTool extends StructuredTool {
constructor({ constructor({
name, name,
description, description,
returnDirect,
input, input,
chatflowid, chatflowid,
startNewSession, startNewSession,
@ -215,6 +223,7 @@ class ChatflowTool extends StructuredTool {
}: { }: {
name: string name: string
description: string description: string
returnDirect: boolean
input: string input: string
chatflowid: string chatflowid: string
startNewSession: boolean startNewSession: boolean
@ -231,6 +240,7 @@ class ChatflowTool extends StructuredTool {
this.headers = headers this.headers = headers
this.chatflowid = chatflowid this.chatflowid = chatflowid
this.overrideConfig = overrideConfig this.overrideConfig = overrideConfig
this.returnDirect = returnDirect
} }
async call( async call(
@ -249,15 +259,6 @@ class ChatflowTool extends StructuredTool {
} catch (e) { } catch (e) {
throw new Error(`Received tool input did not match expected schema: ${JSON.stringify(arg)}`) throw new Error(`Received tool input did not match expected schema: ${JSON.stringify(arg)}`)
} }
// iterate over the callbacks and the sse streamer
if (config.callbacks instanceof CallbackManager) {
const callbacks = config.callbacks.handlers
for (let i = 0; i < callbacks.length; i += 1) {
if (callbacks[i] instanceof CustomChainHandler) {
;(callbacks[i] as any).sseStreamer = undefined
}
}
}
const callbackManager_ = await CallbackManager.configure( const callbackManager_ = await CallbackManager.configure(
config.callbacks, config.callbacks,
this.callbacks, this.callbacks,
@ -283,6 +284,9 @@ class ChatflowTool extends StructuredTool {
await runManager?.handleToolError(e) await runManager?.handleToolError(e)
throw e throw e
} }
if (result && typeof result !== 'string') {
result = JSON.stringify(result)
}
await runManager?.handleToolEnd(result) await runManager?.handleToolEnd(result)
return result return result
} }

View File

@ -359,6 +359,8 @@ export const utilBuildChatflow = async (req: Request, isInternal: boolean = fals
const nodeModule = await import(nodeInstanceFilePath) const nodeModule = await import(nodeInstanceFilePath)
const nodeInstance = new nodeModule.nodeClass({ sessionId }) const nodeInstance = new nodeModule.nodeClass({ sessionId })
isStreamValid = (req.body.streaming === 'true' || req.body.streaming === true) && isStreamValid
let result = isStreamValid let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatId, chatId,