diff --git a/packages/server/src/utils/SSEStreamer.ts b/packages/server/src/utils/SSEStreamer.ts index e762fc417..77d976c09 100644 --- a/packages/server/src/utils/SSEStreamer.ts +++ b/packages/server/src/utils/SSEStreamer.ts @@ -268,4 +268,30 @@ export class SSEStreamer implements IServerSideEventStreamer { client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } } + + streamTTSDataEvent(chatId: string, audioChunk: string): void { + const client = this.clients[chatId] + console.log('clients', this.clients) + console.log('client', client) + if (client) { + const clientResponse = { + event: 'tts_data', + data: audioChunk + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + } + } + + streamTTSEndEvent(chatId: string): void { + const client = this.clients[chatId] + console.log('clients', this.clients) + console.log('client', client) + if (client) { + const clientResponse = { + event: 'tts_end', + data: {} + } + client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') + } + } } diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index f432691d4..7053020c6 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -11,8 +11,7 @@ import { IMessage, IServerSideEventStreamer, convertChatHistoryToText, - generateFollowUpPrompts, - convertTextToSpeech + generateFollowUpPrompts } from 'flowise-components' import { IncomingAgentflowInput, @@ -58,6 +57,7 @@ import { ChatMessage } from '../database/entities/ChatMessage' import { Telemetry } from './telemetry' import { getWorkspaceSearchOptions } from '../enterprise/utils/ControllerServiceUtils' import { UsageCacheManager } from '../UsageCacheManager' +import { generateTTSForResponseStream, shouldAutoPlayTTS } from './buildChatflow' interface IWaitingNode { nodeId: string @@ -136,59 +136,6 @@ interface IExecuteNodeParams { subscriptionId: string } -// Helper function to check if auto-play TTS is enabled -const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boolean => { - if (!textToSpeechConfig) return false - try { - const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig - // Check each provider to see if any has autoPlay enabled and status true - for (const providerKey in config) { - const provider = config[providerKey] - if (provider && provider.status === true && provider.autoPlay === true) { - return true - } - } - return false - } catch (error) { - return false - } -} - -// Helper function to generate TTS for response -const generateTTSForResponse = async ( - responseText: string, - textToSpeechConfig: string | undefined, - options: ICommonObject -): Promise => { - try { - if (!textToSpeechConfig) return null - const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig - - // Find the active provider configuration - let activeProviderConfig = null - for (const providerKey in config) { - const provider = config[providerKey] - if (provider && provider.status === true) { - activeProviderConfig = { - name: providerKey, - credentialId: provider.credentialId, - voice: provider.voice, - model: provider.model - } - break - } - } - - if (!activeProviderConfig) return null - - const audioBuffer = await convertTextToSpeech(responseText, activeProviderConfig, options) - return audioBuffer - } catch (error) { - logger.error(`[server]: TTS generation failed: ${getErrorMessage(error)}`) - return null - } -} - interface IExecuteAgentFlowParams extends Omit { incomingInput: IncomingAgentflowInput } @@ -2092,7 +2039,6 @@ export const executeAgentFlow = async ({ if (sessionId) result.sessionId = sessionId - /*** Auto-play TTS Logic ***/ if (shouldAutoPlayTTS(chatflow.textToSpeech) && result.text) { const options = { orgId, @@ -2102,14 +2048,8 @@ export const executeAgentFlow = async ({ databaseEntities } - const audioBuffer = await generateTTSForResponse(result.text, chatflow.textToSpeech, options) - if (audioBuffer) { - const audioBase64 = audioBuffer.toString('base64') - - // Agent flows are always streamed, so send audio via SSE - if (sseStreamer) { - sseStreamer.streamAudioEvent(chatId, audioBase64) - } + if (sseStreamer) { + await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, sseStreamer) } } diff --git a/packages/server/src/utils/buildChatflow.ts b/packages/server/src/utils/buildChatflow.ts index e705b3e98..27d0092f7 100644 --- a/packages/server/src/utils/buildChatflow.ts +++ b/packages/server/src/utils/buildChatflow.ts @@ -7,6 +7,7 @@ import { IFileUpload, convertSpeechToText, convertTextToSpeech, + convertTextToSpeechStream, ICommonObject, addSingleFileToStorage, generateFollowUpPrompts, @@ -17,7 +18,8 @@ import { getFileFromUpload, removeSpecificFileFromUpload, EvaluationRunner, - handleEscapeCharacters + handleEscapeCharacters, + IServerSideEventStreamer } from 'flowise-components' import { StatusCodes } from 'http-status-codes' import { @@ -71,12 +73,10 @@ import { executeAgentFlow } from './buildAgentflow' import { Workspace } from '../enterprise/database/entities/workspace.entity' import { Organization } from '../enterprise/database/entities/organization.entity' -// Helper function to check if auto-play TTS is enabled const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boolean => { if (!textToSpeechConfig) return false try { const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig - // Check each provider to see if any has autoPlay enabled and status true for (const providerKey in config) { const provider = config[providerKey] if (provider && provider.status === true && provider.autoPlay === true) { @@ -85,21 +85,22 @@ const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boole } return false } catch (error) { + logger.error(`Error parsing textToSpeechConfig: ${getErrorMessage(error)}`) return false } } -// Helper function to generate TTS for response -const generateTTSForResponse = async ( +const generateTTSForResponseStream = async ( responseText: string, textToSpeechConfig: string | undefined, - options: ICommonObject -): Promise => { + options: ICommonObject, + chatId: string, + sseStreamer: IServerSideEventStreamer +): Promise => { try { - if (!textToSpeechConfig) return null + if (!textToSpeechConfig) return const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig - // Find the active provider configuration let activeProviderConfig = null for (const providerKey in config) { const provider = config[providerKey] @@ -114,13 +115,24 @@ const generateTTSForResponse = async ( } } - if (!activeProviderConfig) return null + if (!activeProviderConfig) return - const audioBuffer = await convertTextToSpeech(responseText, activeProviderConfig, options) - return audioBuffer + await convertTextToSpeechStream( + responseText, + activeProviderConfig, + options, + (chunk: Buffer) => { + const audioBase64 = chunk.toString('base64') + logger.info(`Received TTS chunk: ${audioBase64}`) + sseStreamer.streamTTSDataEvent(chatId, audioBase64) + }, + () => { + sseStreamer.streamTTSEndEvent(chatId) + } + ) } catch (error) { - logger.error(`[server]: TTS generation failed: ${getErrorMessage(error)}`) - return null + logger.error(`[server]: TTS streaming failed: ${getErrorMessage(error)}`) + sseStreamer.streamTTSEndEvent(chatId) } } @@ -880,8 +892,6 @@ export const executeFlow = async ({ if (Object.keys(setVariableNodesOutput).length) result.flowVariables = setVariableNodesOutput if (shouldAutoPlayTTS(chatflow.textToSpeech) && result.text) { - logger.info('[server]: Generating TTS for response') - logger.info(`[server/executeFlow]: TTS config: ${JSON.stringify(chatflow.textToSpeech)}`) const options = { orgId, chatflowid, @@ -890,15 +900,10 @@ export const executeFlow = async ({ databaseEntities } - const audioBuffer = await generateTTSForResponse(result.text, chatflow.textToSpeech, options) - if (audioBuffer) { - const audioBase64 = audioBuffer.toString('base64') - - if (streaming && sseStreamer) { - sseStreamer.streamAudioEvent(chatId, audioBase64) - } else { - result.audioData = audioBase64 - } + if (streaming && sseStreamer) { + await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, sseStreamer) + } else if (sseStreamer) { + await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, sseStreamer) } } @@ -1129,3 +1134,5 @@ const incrementFailedMetricCounter = (metricsProvider: IMetricsProvider, isInter ) } } + +export { shouldAutoPlayTTS, generateTTSForResponseStream }