diff --git a/packages/components/nodes/agents/ConversationalRetrievalToolAgent/ConversationalRetrievalToolAgent.ts b/packages/components/nodes/agents/ConversationalRetrievalToolAgent/ConversationalRetrievalToolAgent.ts index a850db307..7a8966e14 100644 --- a/packages/components/nodes/agents/ConversationalRetrievalToolAgent/ConversationalRetrievalToolAgent.ts +++ b/packages/components/nodes/agents/ConversationalRetrievalToolAgent/ConversationalRetrievalToolAgent.ts @@ -5,7 +5,7 @@ import { RunnableSequence } from '@langchain/core/runnables' import { BaseChatModel } from '@langchain/core/language_models/chat_models' import { ChatPromptTemplate, MessagesPlaceholder, HumanMessagePromptTemplate, PromptTemplate } from '@langchain/core/prompts' import { formatToOpenAIToolMessages } from 'langchain/agents/format_scratchpad/openai_tools' -import { getBaseClasses, transformBracesWithColon } from '../../../src/utils' +import { getBaseClasses, transformBracesWithColon, convertChatHistoryToText, convertBaseMessagetoIMessage } from '../../../src/utils' import { type ToolsAgentStep } from 'langchain/agents/openai/output_parser' import { FlowiseMemory, @@ -23,8 +23,10 @@ import { Moderation, checkInputs, streamResponse } from '../../moderation/Modera import { formatResponse } from '../../outputparsers/OutputParserHelpers' import type { Document } from '@langchain/core/documents' import { BaseRetriever } from '@langchain/core/retrievers' -import { RESPONSE_TEMPLATE } from '../../chains/ConversationalRetrievalQAChain/prompts' +import { RESPONSE_TEMPLATE, REPHRASE_TEMPLATE } from '../../chains/ConversationalRetrievalQAChain/prompts' import { addImagesToMessages, llmSupportsVision } from '../../../src/multiModalUtils' +import { StringOutputParser } from '@langchain/core/output_parsers' +import { Tool } from '@langchain/core/tools' class ConversationalRetrievalToolAgent_Agents implements INode { label: string @@ -42,7 +44,7 @@ class ConversationalRetrievalToolAgent_Agents implements INode { constructor(fields?: { sessionId?: string }) { this.label = 'Conversational Retrieval Tool Agent' this.name = 'conversationalRetrievalToolAgent' - this.author = 'niztal(falkor)' + this.author = 'niztal(falkor) and nikitas-novatix' this.version = 1.0 this.type = 'AgentExecutor' this.category = 'Agents' @@ -79,6 +81,26 @@ class ConversationalRetrievalToolAgent_Agents implements INode { optional: true, default: RESPONSE_TEMPLATE }, + { + label: 'Rephrase Prompt', + name: 'rephrasePrompt', + type: 'string', + description: 'Using previous chat history, rephrase question into a standalone question', + warning: 'Prompt must include input variables: {chat_history} and {question}', + rows: 4, + additionalParams: true, + optional: true, + default: REPHRASE_TEMPLATE + }, + { + label: 'Rephrase Model', + name: 'rephraseModel', + type: 'BaseChatModel', + description: + 'Optional: Use a different (faster/cheaper) model for rephrasing. If not specified, uses the main Tool Calling Chat Model.', + optional: true, + additionalParams: true + }, { label: 'Input Moderation', description: 'Detect text that could generate harmful output and prevent it from being sent to the language model', @@ -103,8 +125,9 @@ class ConversationalRetrievalToolAgent_Agents implements INode { this.sessionId = fields?.sessionId } - async init(nodeData: INodeData, input: string, options: ICommonObject): Promise { - return prepareAgent(nodeData, options, { sessionId: this.sessionId, chatId: options.chatId, input }) + // The agent will be prepared in run() with the correct user message - it needs the actual runtime input for rephrasing + async init(_nodeData: INodeData, _input: string, _options: ICommonObject): Promise { + return null } async run(nodeData: INodeData, input: string, options: ICommonObject): Promise { @@ -148,6 +171,23 @@ class ConversationalRetrievalToolAgent_Agents implements INode { sseStreamer.streamUsedToolsEvent(chatId, res.usedTools) usedTools = res.usedTools } + + // If the tool is set to returnDirect, stream the output to the client + if (res.usedTools && res.usedTools.length) { + let inputTools = nodeData.inputs?.tools + inputTools = flatten(inputTools) + for (const tool of res.usedTools) { + const inputTool = inputTools.find((inputTool: Tool) => inputTool.name === tool.tool) + if (inputTool && (inputTool as any).returnDirect && shouldStreamResponse) { + sseStreamer.streamTokenEvent(chatId, tool.toolOutput) + // Prevent CustomChainHandler from streaming the same output again + if (res.output === tool.toolOutput) { + res.output = '' + } + } + } + } + // The CustomChainHandler will send the stream end event } else { res = await executor.invoke({ input }, { callbacks: [loggerHandler, ...callbacks] }) if (res.sourceDocuments) { @@ -210,9 +250,11 @@ const prepareAgent = async ( flowObj: { sessionId?: string; chatId?: string; input?: string } ) => { const model = nodeData.inputs?.model as BaseChatModel + const rephraseModel = (nodeData.inputs?.rephraseModel as BaseChatModel) || model // Use main model if not specified const maxIterations = nodeData.inputs?.maxIterations as string const memory = nodeData.inputs?.memory as FlowiseMemory let systemMessage = nodeData.inputs?.systemMessage as string + let rephrasePrompt = nodeData.inputs?.rephrasePrompt as string let tools = nodeData.inputs?.tools tools = flatten(tools) const memoryKey = memory.memoryKey ? memory.memoryKey : 'chat_history' @@ -220,6 +262,9 @@ const prepareAgent = async ( const vectorStoreRetriever = nodeData.inputs?.vectorStoreRetriever as BaseRetriever systemMessage = transformBracesWithColon(systemMessage) + if (rephrasePrompt) { + rephrasePrompt = transformBracesWithColon(rephrasePrompt) + } const prompt = ChatPromptTemplate.fromMessages([ ['system', systemMessage ? systemMessage : `You are a helpful AI assistant.`], @@ -263,6 +308,37 @@ const prepareAgent = async ( const modelWithTools = model.bindTools(tools) + // Function to get standalone question (either rephrased or original) + const getStandaloneQuestion = async (input: string): Promise => { + // If no rephrase prompt, return the original input + if (!rephrasePrompt) { + return input + } + + // Get chat history (use empty string if none) + const messages = (await memory.getChatMessages(flowObj?.sessionId, true)) as BaseMessage[] + const iMessages = convertBaseMessagetoIMessage(messages) + const chatHistoryString = convertChatHistoryToText(iMessages) + + // Always rephrase to normalize/expand user queries for better retrieval + try { + const CONDENSE_QUESTION_PROMPT = PromptTemplate.fromTemplate(rephrasePrompt) + const condenseQuestionChain = RunnableSequence.from([CONDENSE_QUESTION_PROMPT, rephraseModel, new StringOutputParser()]) + const res = await condenseQuestionChain.invoke({ + question: input, + chat_history: chatHistoryString + }) + return res + } catch (error) { + console.error('Error rephrasing question:', error) + // On error, fall back to original input + return input + } + } + + // Get standalone question before creating runnable + const standaloneQuestion = await getStandaloneQuestion(flowObj?.input || '') + const runnableAgent = RunnableSequence.from([ { [inputKey]: (i: { input: string; steps: ToolsAgentStep[] }) => i.input, @@ -272,7 +348,9 @@ const prepareAgent = async ( return messages ?? [] }, context: async (i: { input: string; chatHistory?: string }) => { - const relevantDocs = await vectorStoreRetriever.invoke(i.input) + // Use the standalone question (rephrased or original) for retrieval + const retrievalQuery = standaloneQuestion || i.input + const relevantDocs = await vectorStoreRetriever.invoke(retrievalQuery) const formattedDocs = formatDocs(relevantDocs) return formattedDocs } @@ -295,4 +373,6 @@ const prepareAgent = async ( return executor } -module.exports = { nodeClass: ConversationalRetrievalToolAgent_Agents } +module.exports = { + nodeClass: ConversationalRetrievalToolAgent_Agents +}