Update controllers - fix issue with sse client getting removed before tts events are sent

This commit is contained in:
Ilango Rajagopal 2025-08-20 13:00:10 +05:30
parent b30e4a9da8
commit 2247646182
4 changed files with 96 additions and 17 deletions

View File

@ -149,7 +149,6 @@ export const convertTextToSpeech = async (text: string, textToSpeechConfig: ICom
modelId: 'eleven_multilingual_v2'
})
// Convert the audio stream to buffer
const chunks: Buffer[] = []
const reader = audioStream.getReader()

View File

@ -3,6 +3,7 @@ import { utilBuildChatflow } from '../../utils/buildChatflow'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { getErrorMessage } from '../../errors/utils'
import { MODE } from '../../Interface'
import { generateTTSForResponseStream, shouldAutoPlayTTS } from '../../utils/buildChatflow'
// Send input message and get prediction result (Internal)
const createInternalPrediction = async (req: Request, res: Response, next: NextFunction) => {
@ -38,6 +39,19 @@ const createAndStreamInternalPrediction = async (req: Request, res: Response, ne
const apiResponse = await utilBuildChatflow(req, true)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
const chatflow = req.body.chatflow || req.body
if (shouldAutoPlayTTS(chatflow.textToSpeech) && apiResponse.text) {
const options = {
orgId: req.body.orgId || '',
chatflowid: req.body.chatflowid || '',
chatId: apiResponse.chatId,
appDataSource: getRunningExpressApp().AppDataSource,
databaseEntities: getRunningExpressApp().AppDataSource?.entityMetadatas || []
}
await generateTTSForResponseStream(apiResponse.text, chatflow.textToSpeech, options, apiResponse.chatId, sseStreamer)
}
} catch (error) {
if (chatId) {
sseStreamer.streamErrorEvent(chatId, getErrorMessage(error))

View File

@ -9,6 +9,7 @@ import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { v4 as uuidv4 } from 'uuid'
import { getErrorMessage } from '../../errors/utils'
import { MODE } from '../../Interface'
import { generateTTSForResponseStream, shouldAutoPlayTTS } from '../../utils/buildChatflow'
// Send input message and get prediction result (External)
const createPrediction = async (req: Request, res: Response, next: NextFunction) => {
@ -76,6 +77,25 @@ const createPrediction = async (req: Request, res: Response, next: NextFunction)
const apiResponse = await predictionsServices.buildChatflow(req)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)
const chatflow = await chatflowsService.getChatflowById(req.params.id)
if (chatflow && shouldAutoPlayTTS(chatflow.textToSpeech) && apiResponse.text) {
const options = {
orgId: req.body.orgId || '',
chatflowid: req.params.id,
chatId: apiResponse.chatId,
appDataSource: getRunningExpressApp().AppDataSource,
databaseEntities: getRunningExpressApp().AppDataSource?.entityMetadatas || []
}
await generateTTSForResponseStream(
apiResponse.text,
chatflow.textToSpeech,
options,
apiResponse.chatId,
sseStreamer
)
}
} catch (error) {
if (chatId) {
sseStreamer.streamErrorEvent(chatId, getErrorMessage(error))

View File

@ -2,9 +2,11 @@ import { Request, Response, NextFunction } from 'express'
import textToSpeechService from '../../services/text-to-speech'
import { InternalFlowiseError } from '../../errors/internalFlowiseError'
import { StatusCodes } from 'http-status-codes'
import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
import { convertTextToSpeechStream } from 'flowise-components'
import { databaseEntities } from '../../utils'
// Generate text-to-speech audio
const generateTextToSpeech = async (req: Request, res: Response, next: NextFunction) => {
const generateTextToSpeech = async (req: Request, res: Response) => {
try {
const { text, provider, credentialId, voice, model } = req.body
@ -29,25 +31,69 @@ const generateTextToSpeech = async (req: Request, res: Response, next: NextFunct
)
}
const response = await textToSpeechService.generateTextToSpeech({
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
res.setHeader('Access-Control-Allow-Origin', '*')
res.setHeader('Access-Control-Allow-Headers', 'Cache-Control')
const appServer = getRunningExpressApp()
const options = {
orgId: '',
chatflowid: '',
chatId: '',
appDataSource: appServer.AppDataSource,
databaseEntities: databaseEntities
}
const textToSpeechConfig = {
name: provider,
credentialId: credentialId,
voice: voice,
model: model
}
await convertTextToSpeechStream(
text,
provider,
credentialId,
voice,
model
})
res.setHeader('Content-Type', response.contentType)
res.setHeader('Content-Length', response.audioBuffer.length)
res.setHeader('Cache-Control', 'public, max-age=3600')
return res.send(response.audioBuffer)
textToSpeechConfig,
options,
(chunk: Buffer) => {
const audioBase64 = chunk.toString('base64')
const clientResponse = {
event: 'tts_data',
data: audioBase64
}
res.write('event: tts_data\n')
res.write(`data: ${JSON.stringify(clientResponse)}\n\n`)
},
async () => {
// Send end event
const endResponse = {
event: 'tts_end',
data: {}
}
res.write('event: tts_end\n')
res.write(`data: ${JSON.stringify(endResponse)}\n\n`)
res.end()
}
)
} catch (error) {
next(error)
if (!res.headersSent) {
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
}
const errorResponse = {
event: 'tts_error',
data: { error: error instanceof Error ? error.message : 'TTS generation failed' }
}
res.write('event: tts_error\n')
res.write(`data: ${JSON.stringify(errorResponse)}\n\n`)
res.end()
}
}
// Get available voices for a provider
const getVoices = async (req: Request, res: Response, next: NextFunction) => {
try {
const { provider, credentialId } = req.query