diff --git a/packages/components/src/textToSpeech.ts b/packages/components/src/textToSpeech.ts index 5363b2fec..fdc17af65 100644 --- a/packages/components/src/textToSpeech.ts +++ b/packages/components/src/textToSpeech.ts @@ -20,6 +20,13 @@ export const convertTextToSpeechStream = async ( onEnd: () => void ): Promise => { return new Promise((resolve, reject) => { + let streamDestroyed = false + + // Handle abort signal early + if (abortController.signal.aborted) { + reject(new Error('TTS generation aborted')) + return + } const processStream = async () => { try { if (textToSpeechConfig) { @@ -61,7 +68,9 @@ export const convertTextToSpeechStream = async ( throw new Error('Failed to get response stream') } - await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 20, abortController) + await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 20, abortController, () => { + streamDestroyed = true + }) break } @@ -86,7 +95,9 @@ export const convertTextToSpeechStream = async ( throw new Error('Failed to get response stream') } - await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 40, abortController) + await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 40, abortController, () => { + streamDestroyed = true + }) break } } @@ -98,6 +109,13 @@ export const convertTextToSpeechStream = async ( } } + // Handle abort signal + abortController.signal.addEventListener('abort', () => { + if (!streamDestroyed) { + reject(new Error('TTS generation aborted')) + } + }) + processStream() }) } @@ -110,7 +128,8 @@ const processStreamWithRateLimit = async ( reject: (error: any) => void, targetChunkSize: number = 640, rateLimitMs: number = 20, - abortController: AbortController + abortController: AbortController, + onStreamDestroy?: () => void ) => { const TARGET_CHUNK_SIZE = targetChunkSize const RATE_LIMIT_MS = rateLimitMs @@ -122,7 +141,10 @@ const processStreamWithRateLimit = async ( while (!isEnded || buffer.length > 0) { // Check if aborted if (abortController.signal.aborted) { - stream.destroy() + if (!stream.destroyed) { + stream.destroy() + } + onStreamDestroy?.() reject(new Error('TTS generation aborted')) return } @@ -162,7 +184,10 @@ const processStreamWithRateLimit = async ( // Handle abort signal abortController.signal.addEventListener('abort', () => { - stream.destroy() + if (!stream.destroyed) { + stream.destroy() + } + onStreamDestroy?.() reject(new Error('TTS generation aborted')) }) diff --git a/packages/server/src/controllers/text-to-speech/index.ts b/packages/server/src/controllers/text-to-speech/index.ts index 0f8c44007..8bc2e959a 100644 --- a/packages/server/src/controllers/text-to-speech/index.ts +++ b/packages/server/src/controllers/text-to-speech/index.ts @@ -156,7 +156,7 @@ const generateTextToSpeech = async (req: Request, res: Response) => { const abortTextToSpeech = async (req: Request, res: Response) => { try { - const { chatId, chatMessageId } = req.body + const { chatId, chatMessageId, chatflowId } = req.body if (!chatId) { throw new InternalFlowiseError( @@ -172,12 +172,23 @@ const abortTextToSpeech = async (req: Request, res: Response) => { ) } + if (!chatflowId) { + throw new InternalFlowiseError( + StatusCodes.BAD_REQUEST, + `Error: textToSpeechController.abortTextToSpeech - chatflowId not provided!` + ) + } + const appServer = getRunningExpressApp() // Abort the TTS generation using existing pool const ttsAbortId = `tts_${chatId}_${chatMessageId}` appServer.abortControllerPool.abort(ttsAbortId) + // Also abort the main chat flow AbortController for auto-TTS + const chatFlowAbortId = `${chatflowId}_${chatId}` + appServer.abortControllerPool.abort(chatFlowAbortId) + // Send abort event to client appServer.sseStreamer.streamTTSAbortEvent(chatId, chatMessageId) diff --git a/packages/ui/src/views/chatmessage/ChatMessage.jsx b/packages/ui/src/views/chatmessage/ChatMessage.jsx index 9f667f4cb..a70fa5726 100644 --- a/packages/ui/src/views/chatmessage/ChatMessage.jsx +++ b/packages/ui/src/views/chatmessage/ChatMessage.jsx @@ -492,7 +492,7 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP // Abort TTS for any active streams const activeTTSMessages = Object.keys(isTTSLoading).concat(Object.keys(isTTSPlaying)) for (const messageId of activeTTSMessages) { - await ttsApi.abortTTS({ chatId, chatMessageId: messageId }) + await ttsApi.abortTTS({ chatflowId: chatflowid, chatId, chatMessageId: messageId }) } await chatmessageApi.abortMessage(chatflowid, chatId) @@ -1619,7 +1619,7 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP const handleTTSStop = async (messageId) => { setTTSAction(true) - await ttsApi.abortTTS({ chatId, chatMessageId: messageId }) + await ttsApi.abortTTS({ chatflowId: chatflowid, chatId, chatMessageId: messageId }) cleanupTTSForMessage(messageId) }