diff --git a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts index f7c496c35..6ff7fed3a 100644 --- a/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts +++ b/packages/components/nodes/agents/OpenAIAssistant/OpenAIAssistant.ts @@ -18,6 +18,7 @@ import { AnalyticHandler } from '../../../src/handler' import { Moderation, checkInputs, streamResponse } from '../../moderation/Moderation' import { formatResponse } from '../../outputparsers/OutputParserHelpers' import { addSingleFileToStorage } from '../../../src/storageUtils' +import { DynamicStructuredTool } from '../../tools/OpenAPIToolkit/core' const lenticularBracketRegex = /【[^】]*】/g const imageRegex = /]*\/>/g @@ -504,7 +505,6 @@ class OpenAIAssistant_Agents implements INode { toolCallId: item.id }) }) - const submitToolOutputs = [] for (let i = 0; i < actions.length; i += 1) { const tool = tools.find((tool: any) => tool.name === actions[i].tool) @@ -539,30 +539,23 @@ class OpenAIAssistant_Agents implements INode { } try { - const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runThreadId, { - tool_outputs: submitToolOutputs + await handleToolSubmission({ + openai, + threadId, + runThreadId, + submitToolOutputs, + tools, + analyticHandlers, + parentIds, + llmIds, + sseStreamer, + chatId, + options, + input, + usedTools, + text, + isStreamingStarted }) - - for await (const event of stream) { - if (event.event === 'thread.message.delta') { - const chunk = event.data.delta.content?.[0] - if (chunk && 'text' in chunk && chunk.text?.value) { - text += chunk.text.value - if (!isStreamingStarted) { - isStreamingStarted = true - if (sseStreamer) { - sseStreamer.streamStartEvent(chatId, chunk.text.value) - } - } - if (sseStreamer) { - sseStreamer.streamTokenEvent(chatId, chunk.text.value) - } - } - } - } - if (sseStreamer) { - sseStreamer.streamUsedToolsEvent(chatId, usedTools) - } } catch (error) { console.error('Error submitting tool outputs:', error) await openai.beta.threads.runs.cancel(threadId, runThreadId) @@ -634,7 +627,6 @@ class OpenAIAssistant_Agents implements INode { toolCallId: item.id }) }) - const submitToolOutputs = [] for (let i = 0; i < actions.length; i += 1) { const tool = tools.find((tool: any) => tool.name === actions[i].tool) @@ -895,15 +887,212 @@ const downloadFile = async (openAIApiKey: string, fileObj: any, fileName: string } } +interface ToolSubmissionParams { + openai: OpenAI + threadId: string + runThreadId: string + submitToolOutputs: any[] + tools: any[] + analyticHandlers: AnalyticHandler + parentIds: ICommonObject + llmIds: ICommonObject + sseStreamer: IServerSideEventStreamer + chatId: string + options: ICommonObject + input: string + usedTools: IUsedTool[] + text: string + isStreamingStarted: boolean +} + +interface ToolSubmissionResult { + text: string + isStreamingStarted: boolean +} + +async function handleToolSubmission(params: ToolSubmissionParams): Promise { + const { + openai, + threadId, + runThreadId, + submitToolOutputs, + tools, + analyticHandlers, + parentIds, + llmIds, + sseStreamer, + chatId, + options, + input, + usedTools + } = params + + let updatedText = params.text + let updatedIsStreamingStarted = params.isStreamingStarted + + const stream = openai.beta.threads.runs.submitToolOutputsStream(threadId, runThreadId, { + tool_outputs: submitToolOutputs + }) + + try { + for await (const event of stream) { + if (event.event === 'thread.message.delta') { + const chunk = event.data.delta.content?.[0] + if (chunk && 'text' in chunk && chunk.text?.value) { + updatedText += chunk.text.value + if (!updatedIsStreamingStarted) { + updatedIsStreamingStarted = true + if (sseStreamer) { + sseStreamer.streamStartEvent(chatId, chunk.text.value) + } + } + if (sseStreamer) { + sseStreamer.streamTokenEvent(chatId, chunk.text.value) + } + } + } else if (event.event === 'thread.run.requires_action') { + if (event.data.required_action?.submit_tool_outputs.tool_calls) { + const actions: ICommonObject[] = [] + + event.data.required_action.submit_tool_outputs.tool_calls.forEach((item) => { + const functionCall = item.function + let args = {} + try { + args = JSON.parse(functionCall.arguments) + } catch (e) { + console.error('Error parsing arguments, default to empty object') + } + actions.push({ + tool: functionCall.name, + toolInput: args, + toolCallId: item.id + }) + }) + + const nestedToolOutputs = [] + for (let i = 0; i < actions.length; i += 1) { + const tool = tools.find((tool: any) => tool.name === actions[i].tool) + if (!tool) continue + + const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds) + + try { + const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, { + sessionId: threadId, + chatId: options.chatId, + input + }) + await analyticHandlers.onToolEnd(toolIds, toolOutput) + nestedToolOutputs.push({ + tool_call_id: actions[i].toolCallId, + output: toolOutput + }) + usedTools.push({ + tool: tool.name, + toolInput: actions[i].toolInput, + toolOutput + }) + } catch (e) { + await analyticHandlers.onToolEnd(toolIds, e) + console.error('Error executing tool', e) + throw new Error(`Error executing tool. Tool: ${tool.name}. Thread ID: ${threadId}. Run ID: ${runThreadId}`) + } + } + + // Recursively handle nested tool submissions + const result = await handleToolSubmission({ + openai, + threadId, + runThreadId, + submitToolOutputs: nestedToolOutputs, + tools, + analyticHandlers, + parentIds, + llmIds, + sseStreamer, + chatId, + options, + input, + usedTools, + text: updatedText, + isStreamingStarted: updatedIsStreamingStarted + }) + updatedText = result.text + updatedIsStreamingStarted = result.isStreamingStarted + } + } + } + + if (sseStreamer) { + sseStreamer.streamUsedToolsEvent(chatId, usedTools) + } + + return { + text: updatedText, + isStreamingStarted: updatedIsStreamingStarted + } + } catch (error) { + console.error('Error submitting tool outputs:', error) + await openai.beta.threads.runs.cancel(threadId, runThreadId) + + const errMsg = `Error submitting tool outputs. Thread ID: ${threadId}. Run ID: ${runThreadId}` + + await analyticHandlers.onLLMError(llmIds, errMsg) + await analyticHandlers.onChainError(parentIds, errMsg, true) + + throw new Error(errMsg) + } +} + +interface JSONSchema { + type?: string + properties?: Record + additionalProperties?: boolean + required?: string[] + [key: string]: any +} + const formatToOpenAIAssistantTool = (tool: any): OpenAI.Beta.FunctionTool => { - return { + const parameters = zodToJsonSchema(tool.schema) as JSONSchema + + // For strict tools, we need to: + // 1. Set additionalProperties to false + // 2. Make all parameters required + // 3. Set the strict flag + if (tool instanceof DynamicStructuredTool && tool.isStrict()) { + // Get all property names from the schema + const properties = parameters.properties || {} + const allPropertyNames = Object.keys(properties) + + parameters.additionalProperties = false + parameters.required = allPropertyNames + + // Handle nested objects + for (const [_, prop] of Object.entries(properties)) { + if (prop.type === 'object') { + prop.additionalProperties = false + if (prop.properties) { + prop.required = Object.keys(prop.properties) + } + } + } + } + + const functionTool: OpenAI.Beta.FunctionTool = { type: 'function', function: { name: tool.name, description: tool.description, - parameters: zodToJsonSchema(tool.schema) + parameters } } + + // Add strict property if the tool is marked as strict + if (tool instanceof DynamicStructuredTool && tool.isStrict()) { + ;(functionTool.function as any).strict = true + } + + return functionTool } module.exports = { nodeClass: OpenAIAssistant_Agents } diff --git a/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts b/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts index f34759ae2..d44f5f103 100644 --- a/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts +++ b/packages/components/nodes/tools/OpenAPIToolkit/OpenAPIToolkit.ts @@ -48,6 +48,13 @@ class OpenAPIToolkit_Tools implements INode { additionalParams: true, optional: true }, + { + label: 'Remove null parameters', + name: 'removeNulls', + type: 'boolean', + optional: true, + description: 'Remove all keys with null values from the parsed arguments' + }, { label: 'Custom Code', name: 'customCode', @@ -71,6 +78,7 @@ class OpenAPIToolkit_Tools implements INode { const yamlFileBase64 = nodeData.inputs?.yamlFile as string const customCode = nodeData.inputs?.customCode as string const _headers = nodeData.inputs?.headers as string + const removeNulls = nodeData.inputs?.removeNulls as boolean const headers = typeof _headers === 'object' ? _headers : _headers ? JSON.parse(_headers) : {} @@ -106,7 +114,7 @@ class OpenAPIToolkit_Tools implements INode { const flow = { chatflowId: options.chatflowid } - const tools = getTools(_data.paths, baseUrl, headers, variables, flow, toolReturnDirect, customCode) + const tools = getTools(_data.paths, baseUrl, headers, variables, flow, toolReturnDirect, customCode, removeNulls) return tools } } @@ -119,17 +127,18 @@ const jsonSchemaToZodSchema = (schema: any, requiredList: string[], keyName: str zodShape[key] = jsonSchemaToZodSchema(schema.properties[key], requiredList, key) } return z.object(zodShape) - } else if (schema.oneOf) { - // Handle oneOf by mapping each option to a Zod schema - const zodSchemas = schema.oneOf.map((subSchema: any) => jsonSchemaToZodSchema(subSchema, requiredList, keyName)) - return z.union(zodSchemas) + } else if (schema.oneOf || schema.anyOf) { + // Handle oneOf/anyOf by mapping each option to a Zod schema + const schemas = schema.oneOf || schema.anyOf + const zodSchemas = schemas.map((subSchema: any) => jsonSchemaToZodSchema(subSchema, requiredList, keyName)) + return z.union(zodSchemas).describe(schema?.description ?? schema?.title ?? keyName) } else if (schema.enum) { - // Handle enum types + // Handle enum types with their title and description return requiredList.includes(keyName) - ? z.enum(schema.enum).describe(schema?.description ?? keyName) + ? z.enum(schema.enum).describe(schema?.description ?? schema?.title ?? keyName) : z .enum(schema.enum) - .describe(schema?.description ?? keyName) + .describe(schema?.description ?? schema?.title ?? keyName) .optional() } else if (schema.type === 'string') { return requiredList.includes(keyName) @@ -141,21 +150,32 @@ const jsonSchemaToZodSchema = (schema: any, requiredList: string[], keyName: str } else if (schema.type === 'array') { return z.array(jsonSchemaToZodSchema(schema.items, requiredList, keyName)) } else if (schema.type === 'boolean') { - return requiredList.includes(keyName) - ? z.number({ required_error: `${keyName} required` }).describe(schema?.description ?? keyName) - : z - .number() - .describe(schema?.description ?? keyName) - .optional() - } else if (schema.type === 'number') { return requiredList.includes(keyName) ? z.boolean({ required_error: `${keyName} required` }).describe(schema?.description ?? keyName) : z .boolean() .describe(schema?.description ?? keyName) .optional() + } else if (schema.type === 'number') { + let numberSchema = z.number() + if (typeof schema.minimum === 'number') { + numberSchema = numberSchema.min(schema.minimum) + } + if (typeof schema.maximum === 'number') { + numberSchema = numberSchema.max(schema.maximum) + } + return requiredList.includes(keyName) + ? numberSchema.describe(schema?.description ?? keyName) + : numberSchema.describe(schema?.description ?? keyName).optional() + } else if (schema.type === 'integer') { + let numberSchema = z.number().int() + return requiredList.includes(keyName) + ? numberSchema.describe(schema?.description ?? keyName) + : numberSchema.describe(schema?.description ?? keyName).optional() + } else if (schema.type === 'null') { + return z.null() } - + console.error(`jsonSchemaToZodSchema returns UNKNOWN! ${keyName}`, schema) // Fallback to unknown type if unrecognized return z.unknown() } @@ -163,9 +183,23 @@ const jsonSchemaToZodSchema = (schema: any, requiredList: string[], keyName: str const extractParameters = (param: ICommonObject, paramZodObj: ICommonObject) => { const paramSchema = param.schema const paramName = param.name - const paramDesc = param.description || param.name + const paramDesc = paramSchema.description || paramSchema.title || param.description || param.name - if (paramSchema.type === 'string') { + if (paramSchema.enum) { + const enumValues = paramSchema.enum as string[] + // Combine title and description from schema + const enumDesc = [paramSchema.title, paramSchema.description, `Valid values: ${enumValues.join(', ')}`].filter(Boolean).join('. ') + + if (param.required) { + paramZodObj[paramName] = z.enum(enumValues as [string, ...string[]]).describe(enumDesc) + } else { + paramZodObj[paramName] = z + .enum(enumValues as [string, ...string[]]) + .describe(enumDesc) + .optional() + } + return paramZodObj + } else if (paramSchema.type === 'string') { if (param.required) { paramZodObj[paramName] = z.string({ required_error: `${paramName} required` }).describe(paramDesc) } else { @@ -183,6 +217,10 @@ const extractParameters = (param: ICommonObject, paramZodObj: ICommonObject) => } else { paramZodObj[paramName] = z.boolean().describe(paramDesc).optional() } + } else if (paramSchema.anyOf || paramSchema.type === 'anyOf') { + // Handle anyOf by using jsonSchemaToZodSchema + const requiredList = param.required ? [paramName] : [] + paramZodObj[paramName] = jsonSchemaToZodSchema(paramSchema, requiredList, paramName) } return paramZodObj @@ -195,7 +233,8 @@ const getTools = ( variables: IVariable[], flow: ICommonObject, returnDirect: boolean, - customCode?: string + customCode?: string, + removeNulls?: boolean ) => { const tools = [] for (const path in paths) { @@ -269,7 +308,9 @@ const getTools = ( baseUrl: `${baseUrl}${path}`, method: method, headers, - customCode + customCode, + strict: spec['x-strict'] === true, + removeNulls } const dynamicStructuredTool = new DynamicStructuredTool(toolObj) diff --git a/packages/components/nodes/tools/OpenAPIToolkit/core.ts b/packages/components/nodes/tools/OpenAPIToolkit/core.ts index 8341adc54..f7701770e 100644 --- a/packages/components/nodes/tools/OpenAPIToolkit/core.ts +++ b/packages/components/nodes/tools/OpenAPIToolkit/core.ts @@ -7,6 +7,20 @@ import { CallbackManagerForToolRun, Callbacks, CallbackManager, parseCallbackCon import { availableDependencies, defaultAllowBuiltInDep, prepareSandboxVars } from '../../../src/utils' import { ICommonObject } from '../../../src/Interface' +const removeNulls = (obj: Record) => { + Object.keys(obj).forEach((key) => { + if (obj[key] === null) { + delete obj[key] + } else if (typeof obj[key] === 'object' && obj[key] !== null) { + removeNulls(obj[key]) + if (Object.keys(obj[key]).length === 0) { + delete obj[key] + } + } + }) + return obj +} + interface HttpRequestObject { PathParameters?: Record QueryParameters?: Record @@ -104,6 +118,8 @@ export interface DynamicStructuredToolInput< method: string headers: ICommonObject customCode?: string + strict?: boolean + removeNulls?: boolean } export class DynamicStructuredTool< @@ -122,12 +138,15 @@ export class DynamicStructuredTool< customCode?: string + strict?: boolean + func: DynamicStructuredToolInput['func'] // @ts-ignore schema: T private variables: any[] private flowObj: any + private removeNulls: boolean constructor(fields: DynamicStructuredToolInput) { super(fields) @@ -140,6 +159,8 @@ export class DynamicStructuredTool< this.method = fields.method this.headers = fields.headers this.customCode = fields.customCode + this.strict = fields.strict + this.removeNulls = fields.removeNulls ?? false } async call( @@ -156,7 +177,7 @@ export class DynamicStructuredTool< try { parsed = await this.schema.parseAsync(arg) } catch (e) { - throw new ToolInputParsingException(`Received tool input did not match expected schema`, JSON.stringify(arg)) + throw new ToolInputParsingException(`Received tool input did not match expected schema ${e}`, JSON.stringify(arg)) } const callbackManager_ = await CallbackManager.configure( config.callbacks, @@ -203,9 +224,15 @@ export class DynamicStructuredTool< fs: undefined, process: undefined } - if (typeof arg === 'object' && Object.keys(arg).length) { - for (const item in arg) { - sandbox[`$${item}`] = arg[item] + let processedArg = { ...arg } + + if (this.removeNulls && typeof processedArg === 'object' && processedArg !== null) { + processedArg = removeNulls(processedArg) + } + + if (typeof processedArg === 'object' && Object.keys(processedArg).length) { + for (const item in processedArg) { + sandbox[`$${item}`] = processedArg[item] } } @@ -262,4 +289,8 @@ export class DynamicStructuredTool< setFlowObject(flow: any) { this.flowObj = flow } + + isStrict(): boolean { + return this.strict === true + } }