TTS abort controller

This commit is contained in:
Ilango Rajagopal 2025-09-22 16:05:54 +05:30
parent 3198e7817e
commit 27da0b62bd
4 changed files with 129 additions and 58 deletions

View File

@ -14,6 +14,7 @@ export const convertTextToSpeechStream = async (
text: string, text: string,
textToSpeechConfig: ICommonObject, textToSpeechConfig: ICommonObject,
options: ICommonObject, options: ICommonObject,
abortController: AbortController,
onStart: (format: string) => void, onStart: (format: string) => void,
onChunk: (chunk: Buffer) => void, onChunk: (chunk: Buffer) => void,
onEnd: () => void onEnd: () => void
@ -33,7 +34,8 @@ export const convertTextToSpeechStream = async (
apiKey: credentialData.openAIApiKey apiKey: credentialData.openAIApiKey
}) })
const response = await openai.audio.speech.create({ const response = await openai.audio.speech.create(
{
model: 'gpt-4o-mini-tts', model: 'gpt-4o-mini-tts',
voice: (textToSpeechConfig.voice || 'alloy') as voice: (textToSpeechConfig.voice || 'alloy') as
| 'alloy' | 'alloy'
@ -48,14 +50,18 @@ export const convertTextToSpeechStream = async (
| 'shimmer', | 'shimmer',
input: text, input: text,
response_format: 'mp3' response_format: 'mp3'
}) },
{
signal: abortController.signal
}
)
const stream = response.body as unknown as Readable const stream = response.body as unknown as Readable
if (!stream) { if (!stream) {
throw new Error('Failed to get response stream') throw new Error('Failed to get response stream')
} }
await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 20) await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 20, abortController)
break break
} }
@ -66,17 +72,21 @@ export const convertTextToSpeechStream = async (
apiKey: credentialData.elevenLabsApiKey apiKey: credentialData.elevenLabsApiKey
}) })
const response = await client.textToSpeech.stream(textToSpeechConfig.voice || '21m00Tcm4TlvDq8ikWAM', { const response = await client.textToSpeech.stream(
textToSpeechConfig.voice || '21m00Tcm4TlvDq8ikWAM',
{
text: text, text: text,
modelId: 'eleven_multilingual_v2' modelId: 'eleven_multilingual_v2'
}) },
{ abortSignal: abortController.signal }
)
const stream = Readable.fromWeb(response as unknown as ReadableStream) const stream = Readable.fromWeb(response as unknown as ReadableStream)
if (!stream) { if (!stream) {
throw new Error('Failed to get response stream') throw new Error('Failed to get response stream')
} }
await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 40) await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 40, abortController)
break break
} }
} }
@ -99,7 +109,8 @@ const processStreamWithRateLimit = async (
resolve: () => void, resolve: () => void,
reject: (error: any) => void, reject: (error: any) => void,
targetChunkSize: number = 640, targetChunkSize: number = 640,
rateLimitMs: number = 20 rateLimitMs: number = 20,
abortController: AbortController
) => { ) => {
const TARGET_CHUNK_SIZE = targetChunkSize const TARGET_CHUNK_SIZE = targetChunkSize
const RATE_LIMIT_MS = rateLimitMs const RATE_LIMIT_MS = rateLimitMs
@ -109,6 +120,13 @@ const processStreamWithRateLimit = async (
const processChunks = async () => { const processChunks = async () => {
while (!isEnded || buffer.length > 0) { while (!isEnded || buffer.length > 0) {
// Check if aborted
if (abortController.signal.aborted) {
stream.destroy()
reject(new Error('TTS generation aborted'))
return
}
if (buffer.length >= TARGET_CHUNK_SIZE) { if (buffer.length >= TARGET_CHUNK_SIZE) {
const chunk = buffer.subarray(0, TARGET_CHUNK_SIZE) const chunk = buffer.subarray(0, TARGET_CHUNK_SIZE)
buffer = buffer.subarray(TARGET_CHUNK_SIZE) buffer = buffer.subarray(TARGET_CHUNK_SIZE)
@ -129,7 +147,9 @@ const processStreamWithRateLimit = async (
} }
stream.on('data', (chunk) => { stream.on('data', (chunk) => {
if (!abortController.signal.aborted) {
buffer = Buffer.concat([buffer, Buffer.from(chunk)]) buffer = Buffer.concat([buffer, Buffer.from(chunk)])
}
}) })
stream.on('end', () => { stream.on('end', () => {
@ -140,6 +160,12 @@ const processStreamWithRateLimit = async (
reject(error) reject(error)
}) })
// Handle abort signal
abortController.signal.addEventListener('abort', () => {
stream.destroy()
reject(new Error('TTS generation aborted'))
})
processChunks().catch(reject) processChunks().catch(reject)
} }

View File

@ -92,10 +92,17 @@ const generateTextToSpeech = async (req: Request, res: Response) => {
model: model model: model
} }
// Create and store AbortController
const abortController = new AbortController()
const ttsAbortId = `tts_${chatId}_${chatMessageId}`
appServer.abortControllerPool.add(ttsAbortId, abortController)
try {
await convertTextToSpeechStream( await convertTextToSpeechStream(
text, text,
textToSpeechConfig, textToSpeechConfig,
options, options,
abortController,
(format: string) => { (format: string) => {
const startResponse = { const startResponse = {
event: 'tts_start', event: 'tts_start',
@ -121,8 +128,15 @@ const generateTextToSpeech = async (req: Request, res: Response) => {
res.write('event: tts_end\n') res.write('event: tts_end\n')
res.write(`data: ${JSON.stringify(endResponse)}\n\n`) res.write(`data: ${JSON.stringify(endResponse)}\n\n`)
res.end() res.end()
// Clean up from pool on successful completion
appServer.abortControllerPool.remove(ttsAbortId)
} }
) )
} catch (error) {
// Clean up from pool on error
appServer.abortControllerPool.remove(ttsAbortId)
throw error
}
} catch (error) { } catch (error) {
if (!res.headersSent) { if (!res.headersSent) {
res.setHeader('Content-Type', 'text/event-stream') res.setHeader('Content-Type', 'text/event-stream')
@ -160,6 +174,11 @@ const abortTextToSpeech = async (req: Request, res: Response) => {
const appServer = getRunningExpressApp() const appServer = getRunningExpressApp()
// Abort the TTS generation using existing pool
const ttsAbortId = `tts_${chatId}_${chatMessageId}`
appServer.abortControllerPool.abort(ttsAbortId)
// Send abort event to client
appServer.sseStreamer.streamTTSAbortEvent(chatId, chatMessageId) appServer.sseStreamer.streamTTSAbortEvent(chatId, chatMessageId)
res.json({ message: 'TTS stream aborted successfully', chatId, chatMessageId }) res.json({ message: 'TTS stream aborted successfully', chatId, chatMessageId })

View File

@ -2189,7 +2189,15 @@ export const executeAgentFlow = async ({
} }
if (sseStreamer) { if (sseStreamer) {
await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, chatMessage?.id, sseStreamer) await generateTTSForResponseStream(
result.text,
chatflow.textToSpeech,
options,
chatId,
chatMessage?.id,
sseStreamer,
abortController
)
} }
} }

View File

@ -95,7 +95,8 @@ const generateTTSForResponseStream = async (
options: ICommonObject, options: ICommonObject,
chatId: string, chatId: string,
chatMessageId: string, chatMessageId: string,
sseStreamer: IServerSideEventStreamer sseStreamer: IServerSideEventStreamer,
abortController?: AbortController
): Promise<void> => { ): Promise<void> => {
try { try {
if (!textToSpeechConfig) return if (!textToSpeechConfig) return
@ -121,6 +122,7 @@ const generateTTSForResponseStream = async (
responseText, responseText,
activeProviderConfig, activeProviderConfig,
options, options,
abortController || new AbortController(),
(format: string) => { (format: string) => {
sseStreamer.streamTTSStartEvent(chatId, chatMessageId, format) sseStreamer.streamTTSStartEvent(chatId, chatMessageId, format)
}, },
@ -908,9 +910,25 @@ export const executeFlow = async ({
} }
if (streaming && sseStreamer) { if (streaming && sseStreamer) {
await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, chatMessage?.id, sseStreamer) await generateTTSForResponseStream(
result.text,
chatflow.textToSpeech,
options,
chatId,
chatMessage?.id,
sseStreamer,
signal
)
} else if (sseStreamer) { } else if (sseStreamer) {
await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, chatMessage?.id, sseStreamer) await generateTTSForResponseStream(
result.text,
chatflow.textToSpeech,
options,
chatId,
chatMessage?.id,
sseStreamer,
signal
)
} }
} }