Use the existing SSE streamer to stream TTS audio before the SSE client is removed

This commit is contained in:
Ilango Rajagopal 2025-08-20 13:12:59 +05:30
parent 2247646182
commit 2b5554aafe
3 changed files with 62 additions and 89 deletions

View File

@ -268,4 +268,30 @@ export class SSEStreamer implements IServerSideEventStreamer {
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} }
} }
/**
 * Sends one chunk of text-to-speech audio to the SSE client registered for `chatId`.
 *
 * The chunk is wrapped as `{ event: 'tts_data', data: audioChunk }` and written to the
 * client's response as a `message:\ndata:<json>\n\n` frame. Nothing is written when no
 * client is registered under `chatId`.
 *
 * @param chatId - Key used to look up the client in `this.clients`.
 * @param audioChunk - Audio payload forwarded unchanged in the `data` field.
 */
streamTTSDataEvent(chatId: string, audioChunk: string): void {
    // No per-chunk console dumps here: this runs once for every audio chunk, and
    // printing the whole client registry each time floods the server output.
    const client = this.clients[chatId]
    if (client) {
        const clientResponse = {
            event: 'tts_data',
            data: audioChunk
        }
        client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
    }
}
/**
 * Tells the SSE client registered for `chatId` that the text-to-speech stream has finished.
 *
 * Writes `{ event: 'tts_end', data: {} }` to the client's response as a
 * `message:\ndata:<json>\n\n` frame. Nothing is written when no client is registered
 * under `chatId`.
 *
 * @param chatId - Key used to look up the client in `this.clients`.
 */
streamTTSEndEvent(chatId: string): void {
    // Debug dumps of the client registry were removed; they printed every
    // connected client's response object to the console on each call.
    const client = this.clients[chatId]
    if (client) {
        const clientResponse = {
            event: 'tts_end',
            data: {}
        }
        client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
    }
}
} }

View File

@ -11,8 +11,7 @@ import {
IMessage, IMessage,
IServerSideEventStreamer, IServerSideEventStreamer,
convertChatHistoryToText, convertChatHistoryToText,
generateFollowUpPrompts, generateFollowUpPrompts
convertTextToSpeech
} from 'flowise-components' } from 'flowise-components'
import { import {
IncomingAgentflowInput, IncomingAgentflowInput,
@ -58,6 +57,7 @@ import { ChatMessage } from '../database/entities/ChatMessage'
import { Telemetry } from './telemetry' import { Telemetry } from './telemetry'
import { getWorkspaceSearchOptions } from '../enterprise/utils/ControllerServiceUtils' import { getWorkspaceSearchOptions } from '../enterprise/utils/ControllerServiceUtils'
import { UsageCacheManager } from '../UsageCacheManager' import { UsageCacheManager } from '../UsageCacheManager'
import { generateTTSForResponseStream, shouldAutoPlayTTS } from './buildChatflow'
interface IWaitingNode { interface IWaitingNode {
nodeId: string nodeId: string
@ -136,59 +136,6 @@ interface IExecuteNodeParams {
subscriptionId: string subscriptionId: string
} }
/**
 * Reports whether any text-to-speech provider in the given configuration is both
 * enabled (`status === true`) and set to auto-play (`autoPlay === true`).
 *
 * @param textToSpeechConfig - Provider configuration, normally a JSON string keyed by provider name.
 * @returns `true` when at least one provider entry qualifies; `false` when the config is
 * empty, cannot be parsed, or no entry qualifies.
 */
const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boolean => {
    if (!textToSpeechConfig) {
        return false
    }

    try {
        const parsed = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig
        // A parsed `null` has no providers; fall back to an empty object so it yields `false`.
        const entries = Object.values<any>(parsed ?? {})
        return entries.some((entry) => Boolean(entry) && entry.status === true && entry.autoPlay === true)
    } catch (error) {
        // Malformed JSON is treated as "auto-play disabled".
        return false
    }
}
/**
 * Converts a response text to speech using the first enabled provider in the configuration.
 *
 * The configuration is parsed from JSON when given as a string. The first provider entry
 * whose `status` is exactly `true` is selected, and its `credentialId`, `voice` and `model`
 * are passed to `convertTextToSpeech` together with the provider key as `name`.
 *
 * @param responseText - Text to synthesize.
 * @param textToSpeechConfig - Provider configuration, normally a JSON string keyed by provider name.
 * @param options - Passed through unchanged to `convertTextToSpeech`.
 * @returns The value resolved by `convertTextToSpeech`, or `null` when there is no config,
 * no enabled provider, or any step throws (the failure is logged).
 */
const generateTTSForResponse = async (
    responseText: string,
    textToSpeechConfig: string | undefined,
    options: ICommonObject
): Promise<Buffer | null> => {
    if (!textToSpeechConfig) {
        return null
    }

    try {
        const parsed = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig

        // Pick the first enabled provider; a parsed `null` simply has none.
        const enabledEntry = Object.entries<any>(parsed ?? {}).find(([, settings]) => settings && settings.status === true)
        if (!enabledEntry) {
            return null
        }

        const [providerName, settings] = enabledEntry
        const providerConfig = {
            name: providerName,
            credentialId: settings.credentialId,
            voice: settings.voice,
            model: settings.model
        }

        // Await inside the try block so a rejected conversion is logged below.
        return await convertTextToSpeech(responseText, providerConfig, options)
    } catch (error) {
        logger.error(`[server]: TTS generation failed: ${getErrorMessage(error)}`)
        return null
    }
}
interface IExecuteAgentFlowParams extends Omit<IExecuteFlowParams, 'incomingInput'> { interface IExecuteAgentFlowParams extends Omit<IExecuteFlowParams, 'incomingInput'> {
incomingInput: IncomingAgentflowInput incomingInput: IncomingAgentflowInput
} }
@ -2092,7 +2039,6 @@ export const executeAgentFlow = async ({
if (sessionId) result.sessionId = sessionId if (sessionId) result.sessionId = sessionId
/*** Auto-play TTS Logic ***/
if (shouldAutoPlayTTS(chatflow.textToSpeech) && result.text) { if (shouldAutoPlayTTS(chatflow.textToSpeech) && result.text) {
const options = { const options = {
orgId, orgId,
@ -2102,14 +2048,8 @@ export const executeAgentFlow = async ({
databaseEntities databaseEntities
} }
const audioBuffer = await generateTTSForResponse(result.text, chatflow.textToSpeech, options) if (sseStreamer) {
if (audioBuffer) { await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, sseStreamer)
const audioBase64 = audioBuffer.toString('base64')
// Agent flows are always streamed, so send audio via SSE
if (sseStreamer) {
sseStreamer.streamAudioEvent(chatId, audioBase64)
}
} }
} }

View File

@ -7,6 +7,7 @@ import {
IFileUpload, IFileUpload,
convertSpeechToText, convertSpeechToText,
convertTextToSpeech, convertTextToSpeech,
convertTextToSpeechStream,
ICommonObject, ICommonObject,
addSingleFileToStorage, addSingleFileToStorage,
generateFollowUpPrompts, generateFollowUpPrompts,
@ -17,7 +18,8 @@ import {
getFileFromUpload, getFileFromUpload,
removeSpecificFileFromUpload, removeSpecificFileFromUpload,
EvaluationRunner, EvaluationRunner,
handleEscapeCharacters handleEscapeCharacters,
IServerSideEventStreamer
} from 'flowise-components' } from 'flowise-components'
import { StatusCodes } from 'http-status-codes' import { StatusCodes } from 'http-status-codes'
import { import {
@ -71,12 +73,10 @@ import { executeAgentFlow } from './buildAgentflow'
import { Workspace } from '../enterprise/database/entities/workspace.entity' import { Workspace } from '../enterprise/database/entities/workspace.entity'
import { Organization } from '../enterprise/database/entities/organization.entity' import { Organization } from '../enterprise/database/entities/organization.entity'
// Helper function to check if auto-play TTS is enabled
const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boolean => { const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boolean => {
if (!textToSpeechConfig) return false if (!textToSpeechConfig) return false
try { try {
const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig
// Check each provider to see if any has autoPlay enabled and status true
for (const providerKey in config) { for (const providerKey in config) {
const provider = config[providerKey] const provider = config[providerKey]
if (provider && provider.status === true && provider.autoPlay === true) { if (provider && provider.status === true && provider.autoPlay === true) {
@ -85,21 +85,22 @@ const shouldAutoPlayTTS = (textToSpeechConfig: string | undefined | null): boole
} }
return false return false
} catch (error) { } catch (error) {
logger.error(`Error parsing textToSpeechConfig: ${getErrorMessage(error)}`)
return false return false
} }
} }
// Helper function to generate TTS for response const generateTTSForResponseStream = async (
const generateTTSForResponse = async (
responseText: string, responseText: string,
textToSpeechConfig: string | undefined, textToSpeechConfig: string | undefined,
options: ICommonObject options: ICommonObject,
): Promise<Buffer | null> => { chatId: string,
sseStreamer: IServerSideEventStreamer
): Promise<void> => {
try { try {
if (!textToSpeechConfig) return null if (!textToSpeechConfig) return
const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig const config = typeof textToSpeechConfig === 'string' ? JSON.parse(textToSpeechConfig) : textToSpeechConfig
// Find the active provider configuration
let activeProviderConfig = null let activeProviderConfig = null
for (const providerKey in config) { for (const providerKey in config) {
const provider = config[providerKey] const provider = config[providerKey]
@ -114,13 +115,24 @@ const generateTTSForResponse = async (
} }
} }
if (!activeProviderConfig) return null if (!activeProviderConfig) return
const audioBuffer = await convertTextToSpeech(responseText, activeProviderConfig, options) await convertTextToSpeechStream(
return audioBuffer responseText,
activeProviderConfig,
options,
(chunk: Buffer) => {
const audioBase64 = chunk.toString('base64')
logger.info(`Received TTS chunk: ${audioBase64}`)
sseStreamer.streamTTSDataEvent(chatId, audioBase64)
},
() => {
sseStreamer.streamTTSEndEvent(chatId)
}
)
} catch (error) { } catch (error) {
logger.error(`[server]: TTS generation failed: ${getErrorMessage(error)}`) logger.error(`[server]: TTS streaming failed: ${getErrorMessage(error)}`)
return null sseStreamer.streamTTSEndEvent(chatId)
} }
} }
@ -880,8 +892,6 @@ export const executeFlow = async ({
if (Object.keys(setVariableNodesOutput).length) result.flowVariables = setVariableNodesOutput if (Object.keys(setVariableNodesOutput).length) result.flowVariables = setVariableNodesOutput
if (shouldAutoPlayTTS(chatflow.textToSpeech) && result.text) { if (shouldAutoPlayTTS(chatflow.textToSpeech) && result.text) {
logger.info('[server]: Generating TTS for response')
logger.info(`[server/executeFlow]: TTS config: ${JSON.stringify(chatflow.textToSpeech)}`)
const options = { const options = {
orgId, orgId,
chatflowid, chatflowid,
@ -890,15 +900,10 @@ export const executeFlow = async ({
databaseEntities databaseEntities
} }
const audioBuffer = await generateTTSForResponse(result.text, chatflow.textToSpeech, options) if (streaming && sseStreamer) {
if (audioBuffer) { await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, sseStreamer)
const audioBase64 = audioBuffer.toString('base64') } else if (sseStreamer) {
await generateTTSForResponseStream(result.text, chatflow.textToSpeech, options, chatId, sseStreamer)
if (streaming && sseStreamer) {
sseStreamer.streamAudioEvent(chatId, audioBase64)
} else {
result.audioData = audioBase64
}
} }
} }
@ -1129,3 +1134,5 @@ const incrementFailedMetricCounter = (metricsProvider: IMetricsProvider, isInter
) )
} }
} }
export { shouldAutoPlayTTS, generateTTSForResponseStream }