Add Detailed Streaming to the Tool Agent (#4155)
* Add Detailed Streaming to the Tool Agent * lint fix --------- Co-authored-by: Henry <hzj94@hotmail.com>
This commit is contained in:
parent
cb06df4584
commit
9957184680
|
|
@ -24,7 +24,7 @@ import {
|
||||||
IUsedTool,
|
IUsedTool,
|
||||||
IVisionChatModal
|
IVisionChatModal
|
||||||
} from '../../../src/Interface'
|
} from '../../../src/Interface'
|
||||||
import { ConsoleCallbackHandler, CustomChainHandler, additionalCallbacks } from '../../../src/handler'
|
import { ConsoleCallbackHandler, CustomChainHandler, CustomStreamingHandler, additionalCallbacks } from '../../../src/handler'
|
||||||
import { AgentExecutor, ToolCallingAgentOutputParser } from '../../../src/agents'
|
import { AgentExecutor, ToolCallingAgentOutputParser } from '../../../src/agents'
|
||||||
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
|
import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation'
|
||||||
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
||||||
|
|
@ -101,6 +101,15 @@ class ToolAgent_Agents implements INode {
|
||||||
type: 'number',
|
type: 'number',
|
||||||
optional: true,
|
optional: true,
|
||||||
additionalParams: true
|
additionalParams: true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
label: 'Enable Detailed Streaming',
|
||||||
|
name: 'enableDetailedStreaming',
|
||||||
|
type: 'boolean',
|
||||||
|
default: false,
|
||||||
|
description: 'Stream detailed intermediate steps during agent execution',
|
||||||
|
optional: true,
|
||||||
|
additionalParams: true
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
this.sessionId = fields?.sessionId
|
this.sessionId = fields?.sessionId
|
||||||
|
|
@ -113,6 +122,7 @@ class ToolAgent_Agents implements INode {
|
||||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
|
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
|
||||||
const memory = nodeData.inputs?.memory as FlowiseMemory
|
const memory = nodeData.inputs?.memory as FlowiseMemory
|
||||||
const moderations = nodeData.inputs?.inputModeration as Moderation[]
|
const moderations = nodeData.inputs?.inputModeration as Moderation[]
|
||||||
|
const enableDetailedStreaming = nodeData.inputs?.enableDetailedStreaming as boolean
|
||||||
|
|
||||||
const shouldStreamResponse = options.shouldStreamResponse
|
const shouldStreamResponse = options.shouldStreamResponse
|
||||||
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
|
const sseStreamer: IServerSideEventStreamer = options.sseStreamer as IServerSideEventStreamer
|
||||||
|
|
@ -136,6 +146,13 @@ class ToolAgent_Agents implements INode {
|
||||||
const loggerHandler = new ConsoleCallbackHandler(options.logger)
|
const loggerHandler = new ConsoleCallbackHandler(options.logger)
|
||||||
const callbacks = await additionalCallbacks(nodeData, options)
|
const callbacks = await additionalCallbacks(nodeData, options)
|
||||||
|
|
||||||
|
// Add custom streaming handler if detailed streaming is enabled
|
||||||
|
let customStreamingHandler = null
|
||||||
|
|
||||||
|
if (enableDetailedStreaming && shouldStreamResponse) {
|
||||||
|
customStreamingHandler = new CustomStreamingHandler(sseStreamer, chatId)
|
||||||
|
}
|
||||||
|
|
||||||
let res: ChainValues = {}
|
let res: ChainValues = {}
|
||||||
let sourceDocuments: ICommonObject[] = []
|
let sourceDocuments: ICommonObject[] = []
|
||||||
let usedTools: IUsedTool[] = []
|
let usedTools: IUsedTool[] = []
|
||||||
|
|
@ -143,7 +160,14 @@ class ToolAgent_Agents implements INode {
|
||||||
|
|
||||||
if (shouldStreamResponse) {
|
if (shouldStreamResponse) {
|
||||||
const handler = new CustomChainHandler(sseStreamer, chatId)
|
const handler = new CustomChainHandler(sseStreamer, chatId)
|
||||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, handler, ...callbacks] })
|
const allCallbacks = [loggerHandler, handler, ...callbacks]
|
||||||
|
|
||||||
|
// Add detailed streaming handler if enabled
|
||||||
|
if (enableDetailedStreaming && customStreamingHandler) {
|
||||||
|
allCallbacks.push(customStreamingHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
res = await executor.invoke({ input }, { callbacks: allCallbacks })
|
||||||
if (res.sourceDocuments) {
|
if (res.sourceDocuments) {
|
||||||
if (sseStreamer) {
|
if (sseStreamer) {
|
||||||
sseStreamer.streamSourceDocumentsEvent(chatId, flatten(res.sourceDocuments))
|
sseStreamer.streamSourceDocumentsEvent(chatId, flatten(res.sourceDocuments))
|
||||||
|
|
@ -174,7 +198,14 @@ class ToolAgent_Agents implements INode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
|
const allCallbacks = [loggerHandler, ...callbacks]
|
||||||
|
|
||||||
|
// Add detailed streaming handler if enabled
|
||||||
|
if (enableDetailedStreaming && customStreamingHandler) {
|
||||||
|
allCallbacks.push(customStreamingHandler)
|
||||||
|
}
|
||||||
|
|
||||||
|
res = await executor.invoke({ input }, { callbacks: allCallbacks })
|
||||||
if (res.sourceDocuments) {
|
if (res.sourceDocuments) {
|
||||||
sourceDocuments = res.sourceDocuments
|
sourceDocuments = res.sourceDocuments
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import { LangWatch, LangWatchSpan, LangWatchTrace, autoconvertTypedValues } from
|
||||||
import { DataSource } from 'typeorm'
|
import { DataSource } from 'typeorm'
|
||||||
import { ChatGenerationChunk } from '@langchain/core/outputs'
|
import { ChatGenerationChunk } from '@langchain/core/outputs'
|
||||||
import { AIMessageChunk } from '@langchain/core/messages'
|
import { AIMessageChunk } from '@langchain/core/messages'
|
||||||
|
import { Serialized } from '@langchain/core/load/serializable'
|
||||||
|
|
||||||
interface AgentRun extends Run {
|
interface AgentRun extends Run {
|
||||||
actions: AgentAction[]
|
actions: AgentAction[]
|
||||||
|
|
@ -1499,3 +1500,86 @@ export class AnalyticHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Custom callback handler for streaming detailed intermediate information
|
||||||
|
* during agent execution, specifically tool invocation inputs and outputs.
|
||||||
|
*/
|
||||||
|
export class CustomStreamingHandler extends BaseCallbackHandler {
|
||||||
|
name = 'custom_streaming_handler'
|
||||||
|
|
||||||
|
private sseStreamer: IServerSideEventStreamer
|
||||||
|
private chatId: string
|
||||||
|
|
||||||
|
constructor(sseStreamer: IServerSideEventStreamer, chatId: string) {
|
||||||
|
super()
|
||||||
|
this.sseStreamer = sseStreamer
|
||||||
|
this.chatId = chatId
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle the start of a tool invocation
|
||||||
|
*/
|
||||||
|
async handleToolStart(tool: Serialized, input: string, runId: string, parentRunId?: string): Promise<void> {
|
||||||
|
if (!this.sseStreamer) return
|
||||||
|
|
||||||
|
const toolName = typeof tool === 'object' && tool.name ? tool.name : 'unknown-tool'
|
||||||
|
const toolInput = typeof input === 'string' ? input : JSON.stringify(input, null, 2)
|
||||||
|
|
||||||
|
// Stream the tool invocation details using the agent_trace event type for consistency
|
||||||
|
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
|
||||||
|
step: 'tool_start',
|
||||||
|
name: toolName,
|
||||||
|
input: toolInput,
|
||||||
|
runId,
|
||||||
|
parentRunId: parentRunId || null
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle the end of a tool invocation
|
||||||
|
*/
|
||||||
|
async handleToolEnd(output: string | object, runId: string, parentRunId?: string): Promise<void> {
|
||||||
|
if (!this.sseStreamer) return
|
||||||
|
|
||||||
|
const toolOutput = typeof output === 'string' ? output : JSON.stringify(output, null, 2)
|
||||||
|
|
||||||
|
// Stream the tool output details using the agent_trace event type for consistency
|
||||||
|
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
|
||||||
|
step: 'tool_end',
|
||||||
|
output: toolOutput,
|
||||||
|
runId,
|
||||||
|
parentRunId: parentRunId || null
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle tool errors
|
||||||
|
*/
|
||||||
|
async handleToolError(error: Error, runId: string, parentRunId?: string): Promise<void> {
|
||||||
|
if (!this.sseStreamer) return
|
||||||
|
|
||||||
|
// Stream the tool error details using the agent_trace event type for consistency
|
||||||
|
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
|
||||||
|
step: 'tool_error',
|
||||||
|
error: error.message,
|
||||||
|
runId,
|
||||||
|
parentRunId: parentRunId || null
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle agent actions
|
||||||
|
*/
|
||||||
|
async handleAgentAction(action: AgentAction, runId: string, parentRunId?: string): Promise<void> {
|
||||||
|
if (!this.sseStreamer) return
|
||||||
|
|
||||||
|
// Stream the agent action details using the agent_trace event type for consistency
|
||||||
|
this.sseStreamer.streamCustomEvent(this.chatId, 'agent_trace', {
|
||||||
|
step: 'agent_action',
|
||||||
|
action: JSON.stringify(action),
|
||||||
|
runId,
|
||||||
|
parentRunId: parentRunId || null
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue