feat/fix: Enhance ConversationalRetrievalToolAgent performance and fix bugs (#5507)
* feat: Optimize ConversationalRetrievalToolAgent performance and add rephrase model support - fix duplicate rephrasing bug - Add optional separate rephrase model - Enable query normalization on first messages - Fix returnDirect tool behavior - Add backward-compatible rephrase prompt support * fix lint errors * Fix duplicate streaming and inconsistent chat history format
This commit is contained in:
parent
da32fc7167
commit
3ad2b3a559
|
|
@ -5,7 +5,7 @@ import { RunnableSequence } from '@langchain/core/runnables'
|
||||||
import { BaseChatModel } from '@langchain/core/language_models/chat_models'
|
import { BaseChatModel } from '@langchain/core/language_models/chat_models'
|
||||||
import { ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate, PromptTemplate } from '@langchain/core/prompts'
|
import { ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate, PromptTemplate } from '@langchain/core/prompts'
|
||||||
import { formatToOpenAIToolMessages } from 'langchain/agents/format_scratchpad/openai_tools'
|
import { formatToOpenAIToolMessages } from 'langchain/agents/format_scratchpad/openai_tools'
|
||||||
import { getBaseClasses, transformBracesWithColon } from '../../../src/utils'
|
import { getBaseClasses, transformBracesWithColon, convertChatHistoryToText, convertBaseMessagetoIMessage } from '../../../src/utils'
|
||||||
import { type ToolsAgentStep } from 'langchain/agents/openai/output_parser'
|
import { type ToolsAgentStep } from 'langchain/agents/openai/output_parser'
|
||||||
import {
|
import {
|
||||||
FlowiseMemory,
|
FlowiseMemory,
|
||||||
|
|
@ -23,8 +23,10 @@ import { Moderation, checkInputs, streamResponse } from '../../moderation/Modera
|
||||||
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
import { formatResponse } from '../../outputparsers/OutputParserHelpers'
|
||||||
import type { Document } from '@langchain/core/documents'
|
import type { Document } from '@langchain/core/documents'
|
||||||
import { BaseRetriever } from '@langchain/core/retrievers'
|
import { BaseRetriever } from '@langchain/core/retrievers'
|
||||||
import { RESPONSE_TEMPLATE } from '../../chains/ConversationalRetrievalQAChain/prompts'
|
import { RESPONSE_TEMPLATE, REPHRASE_TEMPLATE } from '../../chains/ConversationalRetrievalQAChain/prompts'
|
||||||
import { addImagesToMessages, llmSupportsVision } from '../../../src/multiModalUtils'
|
import { addImagesToMessages, llmSupportsVision } from '../../../src/multiModalUtils'
|
||||||
|
import { StringOutputParser } from '@langchain/core/output_parsers'
|
||||||
|
import { Tool } from '@langchain/core/tools'
|
||||||
|
|
||||||
class ConversationalRetrievalToolAgent_Agents implements INode {
|
class ConversationalRetrievalToolAgent_Agents implements INode {
|
||||||
label: string
|
label: string
|
||||||
|
|
@ -42,7 +44,7 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
|
||||||
constructor(fields?: { sessionId?: string }) {
|
constructor(fields?: { sessionId?: string }) {
|
||||||
this.label = 'Conversational Retrieval Tool Agent'
|
this.label = 'Conversational Retrieval Tool Agent'
|
||||||
this.name = 'conversationalRetrievalToolAgent'
|
this.name = 'conversationalRetrievalToolAgent'
|
||||||
this.author = 'niztal(falkor)'
|
this.author = 'niztal(falkor) and nikitas-novatix'
|
||||||
this.version = 1.0
|
this.version = 1.0
|
||||||
this.type = 'AgentExecutor'
|
this.type = 'AgentExecutor'
|
||||||
this.category = 'Agents'
|
this.category = 'Agents'
|
||||||
|
|
@ -79,6 +81,26 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
|
||||||
optional: true,
|
optional: true,
|
||||||
default: RESPONSE_TEMPLATE
|
default: RESPONSE_TEMPLATE
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
label: 'Rephrase Prompt',
|
||||||
|
name: 'rephrasePrompt',
|
||||||
|
type: 'string',
|
||||||
|
description: 'Using previous chat history, rephrase question into a standalone question',
|
||||||
|
warning: 'Prompt must include input variables: {chat_history} and {question}',
|
||||||
|
rows: 4,
|
||||||
|
additionalParams: true,
|
||||||
|
optional: true,
|
||||||
|
default: REPHRASE_TEMPLATE
|
||||||
|
},
|
||||||
|
{
|
||||||
|
label: 'Rephrase Model',
|
||||||
|
name: 'rephraseModel',
|
||||||
|
type: 'BaseChatModel',
|
||||||
|
description:
|
||||||
|
'Optional: Use a different (faster/cheaper) model for rephrasing. If not specified, uses the main Tool Calling Chat Model.',
|
||||||
|
optional: true,
|
||||||
|
additionalParams: true
|
||||||
|
},
|
||||||
{
|
{
|
||||||
label: 'Input Moderation',
|
label: 'Input Moderation',
|
||||||
description: 'Detect text that could generate harmful output and prevent it from being sent to the language model',
|
description: 'Detect text that could generate harmful output and prevent it from being sent to the language model',
|
||||||
|
|
@ -103,8 +125,9 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
|
||||||
this.sessionId = fields?.sessionId
|
this.sessionId = fields?.sessionId
|
||||||
}
|
}
|
||||||
|
|
||||||
async init(nodeData: INodeData, input: string, options: ICommonObject): Promise<any> {
|
// The agent will be prepared in run() with the correct user message - it needs the actual runtime input for rephrasing
|
||||||
return prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input })
|
async init(_nodeData: INodeData, _input: string, _options: ICommonObject): Promise<any> {
|
||||||
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
|
async run(nodeData: INodeData, input: string, options: ICommonObject): Promise<string | ICommonObject> {
|
||||||
|
|
@ -148,6 +171,23 @@ class ConversationalRetrievalToolAgent_Agents implements INode {
|
||||||
sseStreamer.streamUsedToolsEvent(chatId, res.usedTools)
|
sseStreamer.streamUsedToolsEvent(chatId, res.usedTools)
|
||||||
usedTools = res.usedTools
|
usedTools = res.usedTools
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If the tool is set to returnDirect, stream the output to the client
|
||||||
|
if (res.usedTools && res.usedTools.length) {
|
||||||
|
let inputTools = nodeData.inputs?.tools
|
||||||
|
inputTools = flatten(inputTools)
|
||||||
|
for (const tool of res.usedTools) {
|
||||||
|
const inputTool = inputTools.find((inputTool: Tool) => inputTool.name === tool.tool)
|
||||||
|
if (inputTool && (inputTool as any).returnDirect && shouldStreamResponse) {
|
||||||
|
sseStreamer.streamTokenEvent(chatId, tool.toolOutput)
|
||||||
|
// Prevent CustomChainHandler from streaming the same output again
|
||||||
|
if (res.output === tool.toolOutput) {
|
||||||
|
res.output = ''
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// The CustomChainHandler will send the stream end event
|
||||||
} else {
|
} else {
|
||||||
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
|
res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] })
|
||||||
if (res.sourceDocuments) {
|
if (res.sourceDocuments) {
|
||||||
|
|
@ -210,9 +250,11 @@ const prepareAgent = async (
|
||||||
flowObj: { sessionId?: string; chatId?: string; input?: string }
|
flowObj: { sessionId?: string; chatId?: string; input?: string }
|
||||||
) => {
|
) => {
|
||||||
const model = nodeData.inputs?.model as BaseChatModel
|
const model = nodeData.inputs?.model as BaseChatModel
|
||||||
|
const rephraseModel = (nodeData.inputs?.rephraseModel as BaseChatModel) || model // Use main model if not specified
|
||||||
const maxIterations = nodeData.inputs?.maxIterations as string
|
const maxIterations = nodeData.inputs?.maxIterations as string
|
||||||
const memory = nodeData.inputs?.memory as FlowiseMemory
|
const memory = nodeData.inputs?.memory as FlowiseMemory
|
||||||
let systemMessage = nodeData.inputs?.systemMessage as string
|
let systemMessage = nodeData.inputs?.systemMessage as string
|
||||||
|
let rephrasePrompt = nodeData.inputs?.rephrasePrompt as string
|
||||||
let tools = nodeData.inputs?.tools
|
let tools = nodeData.inputs?.tools
|
||||||
tools = flatten(tools)
|
tools = flatten(tools)
|
||||||
const memoryKey = memory.memoryKey ? memory.memoryKey : 'chat_history'
|
const memoryKey = memory.memoryKey ? memory.memoryKey : 'chat_history'
|
||||||
|
|
@ -220,6 +262,9 @@ const prepareAgent = async (
|
||||||
const vectorStoreRetriever = nodeData.inputs?.vectorStoreRetriever as BaseRetriever
|
const vectorStoreRetriever = nodeData.inputs?.vectorStoreRetriever as BaseRetriever
|
||||||
|
|
||||||
systemMessage = transformBracesWithColon(systemMessage)
|
systemMessage = transformBracesWithColon(systemMessage)
|
||||||
|
if (rephrasePrompt) {
|
||||||
|
rephrasePrompt = transformBracesWithColon(rephrasePrompt)
|
||||||
|
}
|
||||||
|
|
||||||
const prompt = ChatPromptTemplate.fromMessages([
|
const prompt = ChatPromptTemplate.fromMessages([
|
||||||
['system', systemMessage ? systemMessage : `You are a helpful AI assistant.`],
|
['system', systemMessage ? systemMessage : `You are a helpful AI assistant.`],
|
||||||
|
|
@ -263,6 +308,37 @@ const prepareAgent = async (
|
||||||
|
|
||||||
const modelWithTools = model.bindTools(tools)
|
const modelWithTools = model.bindTools(tools)
|
||||||
|
|
||||||
|
// Function to get standalone question (either rephrased or original)
|
||||||
|
const getStandaloneQuestion = async (input: string): Promise<string> => {
|
||||||
|
// If no rephrase prompt, return the original input
|
||||||
|
if (!rephrasePrompt) {
|
||||||
|
return input
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get chat history (use empty string if none)
|
||||||
|
const messages = (await memory.getChatMessages(flowObj?.sessionId, true)) as BaseMessage[]
|
||||||
|
const iMessages = convertBaseMessagetoIMessage(messages)
|
||||||
|
const chatHistoryString = convertChatHistoryToText(iMessages)
|
||||||
|
|
||||||
|
// Always rephrase to normalize/expand user queries for better retrieval
|
||||||
|
try {
|
||||||
|
const CONDENSE_QUESTION_PROMPT = PromptTemplate.fromTemplate(rephrasePrompt)
|
||||||
|
const condenseQuestionChain = RunnableSequence.from([CONDENSE_QUESTION_PROMPT, rephraseModel, new StringOutputParser()])
|
||||||
|
const res = await condenseQuestionChain.invoke({
|
||||||
|
question: input,
|
||||||
|
chat_history: chatHistoryString
|
||||||
|
})
|
||||||
|
return res
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Error rephrasing question:', error)
|
||||||
|
// On error, fall back to original input
|
||||||
|
return input
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get standalone question before creating runnable
|
||||||
|
const standaloneQuestion = await getStandaloneQuestion(flowObj?.input || '')
|
||||||
|
|
||||||
const runnableAgent = RunnableSequence.from([
|
const runnableAgent = RunnableSequence.from([
|
||||||
{
|
{
|
||||||
[inputKey]: (i: { input: string; steps: ToolsAgentStep[] }) => i.input,
|
[inputKey]: (i: { input: string; steps: ToolsAgentStep[] }) => i.input,
|
||||||
|
|
@ -272,7 +348,9 @@ const prepareAgent = async (
|
||||||
return messages ?? []
|
return messages ?? []
|
||||||
},
|
},
|
||||||
context: async (i: { input: string; chatHistory?: string }) => {
|
context: async (i: { input: string; chatHistory?: string }) => {
|
||||||
const relevantDocs = await vectorStoreRetriever.invoke(i.input)
|
// Use the standalone question (rephrased or original) for retrieval
|
||||||
|
const retrievalQuery = standaloneQuestion || i.input
|
||||||
|
const relevantDocs = await vectorStoreRetriever.invoke(retrievalQuery)
|
||||||
const formattedDocs = formatDocs(relevantDocs)
|
const formattedDocs = formatDocs(relevantDocs)
|
||||||
return formattedDocs
|
return formattedDocs
|
||||||
}
|
}
|
||||||
|
|
@ -295,4 +373,6 @@ const prepareAgent = async (
|
||||||
return executor
|
return executor
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { nodeClass: ConversationalRetrievalToolAgent_Agents }
|
module.exports = {
|
||||||
|
nodeClass: ConversationalRetrievalToolAgent_Agents
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue