Bugfix/Openai assistant thread not found (#3426)

fix openai assistant thread not found by exponential backoff retries
This commit is contained in:
Henry Heng 2024-10-29 15:26:59 +00:00 committed by GitHub
parent cf0617537c
commit ebc4641a60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 149 additions and 92 deletions

View File

@ -267,28 +267,54 @@ class OpenAIAssistant_Agents implements INode {
// List all runs, in case existing thread is still running // List all runs, in case existing thread is still running
if (!isNewThread) { if (!isNewThread) {
const promise = (threadId: string) => { const promise = (threadId: string) => {
return new Promise<void>((resolve) => { return new Promise<void>((resolve, reject) => {
const maxWaitTime = 30000 // Maximum wait time of 30 seconds
const startTime = Date.now()
let delay = 500 // Initial delay between retries
const maxRetries = 10
let retries = 0
const timeout = setInterval(async () => { const timeout = setInterval(async () => {
const allRuns = await openai.beta.threads.runs.list(threadId) try {
if (allRuns.data && allRuns.data.length) { const allRuns = await openai.beta.threads.runs.list(threadId)
const firstRunId = allRuns.data[0].id if (allRuns.data && allRuns.data.length) {
const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status const firstRunId = allRuns.data[0].id
if ( const runStatus = allRuns.data.find((run) => run.id === firstRunId)?.status
runStatus && if (
(runStatus === 'cancelled' || runStatus &&
runStatus === 'completed' || (runStatus === 'cancelled' ||
runStatus === 'expired' || runStatus === 'completed' ||
runStatus === 'failed' || runStatus === 'expired' ||
runStatus === 'requires_action') runStatus === 'failed' ||
) { runStatus === 'requires_action')
) {
clearInterval(timeout)
resolve()
}
} else {
clearInterval(timeout) clearInterval(timeout)
resolve() reject(new Error(`Empty Thread: ${threadId}`))
}
} catch (error: any) {
if (error.response?.status === 404) {
clearInterval(timeout)
reject(new Error(`Thread not found: ${threadId}`))
} else if (error.response?.status === 429 && retries < maxRetries) {
retries++
delay *= 2
console.warn(`Rate limit exceeded, retrying in ${delay}ms...`)
} else {
clearInterval(timeout)
reject(new Error(`Unexpected error: ${error.message}`))
} }
} else {
clearInterval(timeout)
resolve()
} }
}, 500)
// Timeout condition to stop the loop if maxWaitTime is exceeded
if (Date.now() - startTime > maxWaitTime) {
clearInterval(timeout)
reject(new Error('Timeout waiting for thread to finish.'))
}
}, delay)
}) })
} }
await promise(threadId) await promise(threadId)
@ -576,96 +602,127 @@ class OpenAIAssistant_Agents implements INode {
const promise = (threadId: string, runId: string) => { const promise = (threadId: string, runId: string) => {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const maxWaitTime = 30000 // Maximum wait time of 30 seconds
const startTime = Date.now()
let delay = 500 // Initial delay between retries
const maxRetries = 10
let retries = 0
const timeout = setInterval(async () => { const timeout = setInterval(async () => {
const run = await openai.beta.threads.runs.retrieve(threadId, runId) try {
const state = run.status const run = await openai.beta.threads.runs.retrieve(threadId, runId)
if (state === 'completed') { const state = run.status
clearInterval(timeout)
resolve(state) if (state === 'completed') {
} else if (state === 'requires_action') {
if (run.required_action?.submit_tool_outputs.tool_calls) {
clearInterval(timeout) clearInterval(timeout)
const actions: ICommonObject[] = [] resolve(state)
run.required_action.submit_tool_outputs.tool_calls.forEach((item) => { } else if (state === 'requires_action') {
const functionCall = item.function if (run.required_action?.submit_tool_outputs.tool_calls) {
let args = {} clearInterval(timeout)
try { const actions: ICommonObject[] = []
args = JSON.parse(functionCall.arguments) run.required_action.submit_tool_outputs.tool_calls.forEach((item) => {
} catch (e) { const functionCall = item.function
console.error('Error parsing arguments, default to empty object') let args = {}
} try {
actions.push({ args = JSON.parse(functionCall.arguments)
tool: functionCall.name, } catch (e) {
toolInput: args, console.error('Error parsing arguments, default to empty object')
toolCallId: item.id }
actions.push({
tool: functionCall.name,
toolInput: args,
toolCallId: item.id
})
}) })
})
const submitToolOutputs = [] const submitToolOutputs = []
for (let i = 0; i < actions.length; i += 1) { for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool) const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue if (!tool) continue
// Start tool analytics // Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds) const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
if (shouldStreamResponse && sseStreamer) { if (shouldStreamResponse && sseStreamer) {
sseStreamer.streamToolEvent(chatId, tool.name) sseStreamer.streamToolEvent(chatId, tool.name)
}
try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, {
sessionId: threadId,
chatId: options.chatId,
input
})
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({
tool_call_id: actions[i].toolCallId,
output: toolOutput
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
} catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
clearInterval(timeout)
reject(
new Error(
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}`
)
)
return
}
} }
const newRun = await openai.beta.threads.runs.retrieve(threadId, runId)
const newStatus = newRun?.status
try { try {
const toolOutput = await tool.call(actions[i].toolInput, undefined, undefined, { if (submitToolOutputs.length && newStatus === 'requires_action') {
sessionId: threadId, await openai.beta.threads.runs.submitToolOutputs(threadId, runId, {
chatId: options.chatId, tool_outputs: submitToolOutputs
input })
}) resolve(state)
await analyticHandlers.onToolEnd(toolIds, toolOutput) } else {
submitToolOutputs.push({ await openai.beta.threads.runs.cancel(threadId, runId)
tool_call_id: actions[i].toolCallId, resolve('requires_action_retry')
output: toolOutput }
})
usedTools.push({
tool: tool.name,
toolInput: actions[i].toolInput,
toolOutput
})
} catch (e) { } catch (e) {
await analyticHandlers.onToolEnd(toolIds, e)
console.error('Error executing tool', e)
clearInterval(timeout) clearInterval(timeout)
reject( reject(
new Error( new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`)
`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Tool: ${tool.name}`
)
) )
break
} }
} }
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
const newRun = await openai.beta.threads.runs.retrieve(threadId, runId) clearInterval(timeout)
const newStatus = newRun?.status reject(
new Error(
try { `Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`
if (submitToolOutputs.length && newStatus === 'requires_action') { )
await openai.beta.threads.runs.submitToolOutputs(threadId, runId, { )
tool_outputs: submitToolOutputs }
}) } catch (error: any) {
resolve(state) if (error.response?.status === 404 || error.response?.status === 429) {
} else { clearInterval(timeout)
await openai.beta.threads.runs.cancel(threadId, runId) reject(new Error(`API error: ${error.response?.status} for Thread ID: ${threadId}, Run ID: ${runId}`))
resolve('requires_action_retry') } else if (retries < maxRetries) {
} retries++
} catch (e) { delay *= 2 // Exponential backoff
clearInterval(timeout) console.warn(`Transient error, retrying in ${delay}ms...`)
reject(new Error(`Error submitting tool outputs: ${state}, Thread ID: ${threadId}, Run ID: ${runId}`)) } else {
} clearInterval(timeout)
reject(new Error(`Max retries reached. Error: ${error.message}`))
} }
} else if (state === 'cancelled' || state === 'expired' || state === 'failed') {
clearInterval(timeout)
reject(
new Error(`Error processing thread: ${state}, Thread ID: ${threadId}, Run ID: ${runId}, Status: ${state}`)
)
} }
}, 500)
// Stop the loop if maximum wait time is exceeded
if (Date.now() - startTime > maxWaitTime) {
clearInterval(timeout)
reject(new Error('Timeout waiting for thread to finish.'))
}
}, delay)
}) })
} }