Fix TTS issues - OpenAI voices, streaming audio, rate limiting, speech speed

This commit is contained in:
Ilango Rajagopal 2025-08-22 12:35:33 +05:30
parent aa357c8373
commit ad44c7b661
8 changed files with 465 additions and 221 deletions

View File

@@ -442,6 +442,7 @@ export interface IServerSideEventStreamer {
     streamEndEvent(chatId: string): void
     streamUsageMetadataEvent(chatId: string, data: any): void
     streamAudioEvent(chatId: string, audioData: string): void
+    streamTTSStartEvent(chatId: string, format: string): void
     streamTTSDataEvent(chatId: string, audioChunk: string): void
     streamTTSEndEvent(chatId: string): void
 }
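For orientation, the intended call order for the three TTS methods, as wired up later in this commit, is start, then data chunks, then end. A minimal sketch (`streamer`, `chatId`, and `audioChunks` are illustrative placeholders, not names from the commit):

// Sketch: one TTS session over the streamer interface.
declare const streamer: IServerSideEventStreamer
declare const chatId: string
declare const audioChunks: Buffer[]

streamer.streamTTSStartEvent(chatId, 'mp3') // announce the container format before any audio
for (const chunk of audioChunks) {
    streamer.streamTTSDataEvent(chatId, chunk.toString('base64')) // one base64 payload per chunk
}
streamer.streamTTSEndEvent(chatId) // lets the client flush its buffer and end playback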

View File

@@ -15,9 +15,11 @@ export const convertTextToSpeechStream = async (
     textToSpeechConfig: ICommonObject,
     options: ICommonObject,
     onChunk: (chunk: Buffer) => void,
-    onEnd: () => void
+    onEnd: () => void,
+    onStart?: (format: string) => void
 ): Promise<void> => {
-    return new Promise<void>(async (resolve, reject) => {
+    return new Promise<void>((resolve, reject) => {
+        const processStream = async () => {
         try {
             if (textToSpeechConfig) {
                 const credentialId = textToSpeechConfig.credentialId as string
@@ -25,6 +27,8 @@ export const convertTextToSpeechStream = async (
                 switch (textToSpeechConfig.name) {
                     case TextToSpeechType.OPENAI_TTS: {
+                        if (onStart) onStart('mp3')
                         const openai = new OpenAI({
                             apiKey: credentialData.openAIApiKey
                         })
@@ -43,31 +47,21 @@ export const convertTextToSpeechStream = async (
                                 | 'sage'
                                 | 'shimmer',
                             input: text,
-                            response_format: 'wav'
+                            response_format: 'mp3'
                         })
-                        const stream = Readable.fromWeb(response as unknown as ReadableStream)
+                        const stream = response.body as unknown as Readable
                         if (!stream) {
                             throw new Error('Failed to get response stream')
                         }
-                        stream.on('data', (chunk) => {
-                            onChunk(Buffer.from(chunk))
-                        })
-                        stream.on('end', () => {
-                            onEnd()
-                            resolve()
-                        })
-                        stream.on('error', (error) => {
-                            reject(error)
-                        })
+                        await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 20)
                         break
                     }
                     case TextToSpeechType.ELEVEN_LABS_TTS: {
+                        if (onStart) onStart('mp3')
                         const client = new ElevenLabsClient({
                             apiKey: credentialData.elevenLabsApiKey
                         })
@@ -82,19 +76,7 @@ export const convertTextToSpeechStream = async (
                             throw new Error('Failed to get response stream')
                         }
-                        stream.on('data', (chunk) => {
-                            onChunk(Buffer.from(chunk))
-                        })
-                        stream.on('end', () => {
-                            onEnd()
-                            resolve()
-                        })
-                        stream.on('error', (error) => {
-                            reject(error)
-                        })
+                        await processStreamWithRateLimit(stream, onChunk, onEnd, resolve, reject, 640, 40)
                         break
                     }
                 }
@@ -104,74 +86,65 @@ export const convertTextToSpeechStream = async (
         } catch (error) {
             reject(error)
         }
+        }
+        processStream()
     })
 }

-export const convertTextToSpeech = async (text: string, textToSpeechConfig: ICommonObject, options: ICommonObject): Promise<Buffer> => {
-    if (textToSpeechConfig) {
-        const credentialId = textToSpeechConfig.credentialId as string
-        const credentialData = await getCredentialData(credentialId ?? '', options)
-
-        switch (textToSpeechConfig.name) {
-            case TextToSpeechType.OPENAI_TTS: {
-                const openai = new OpenAI({
-                    apiKey: credentialData.openAIApiKey
-                })
-                const response = await openai.audio.speech.create({
-                    model: 'gpt-4o-mini-tts',
-                    voice: (textToSpeechConfig.voice || 'alloy') as
-                        | 'alloy'
-                        | 'ash'
-                        | 'ballad'
-                        | 'coral'
-                        | 'echo'
-                        | 'fable'
-                        | 'nova'
-                        | 'onyx'
-                        | 'sage'
-                        | 'shimmer',
-                    input: text,
-                    response_format: 'wav'
-                })
-                const audioBuffer = Buffer.from(await response.arrayBuffer())
-                return audioBuffer
-            }
-            case TextToSpeechType.ELEVEN_LABS_TTS: {
-                const client = new ElevenLabsClient({
-                    apiKey: credentialData.elevenLabsApiKey
-                })
-                const audioStream = await client.textToSpeech.stream(textToSpeechConfig.voice || '21m00Tcm4TlvDq8ikWAM', {
-                    text: text,
-                    modelId: 'eleven_multilingual_v2'
-                })
-                const chunks: Buffer[] = []
-                const reader = audioStream.getReader()
-                try {
-                    let result = await reader.read()
-                    while (!result.done) {
-                        if (result.value) {
-                            chunks.push(Buffer.from(result.value))
-                        }
-                        result = await reader.read()
-                    }
-                } finally {
-                    reader.releaseLock()
-                }
-                const audioBuffer = Buffer.concat(chunks)
-                return audioBuffer
-            }
-        }
-    } else {
-        throw new Error('Text to speech is not selected. Please configure TTS in the chatflow.')
-    }
-    return Buffer.alloc(0)
-}
+const processStreamWithRateLimit = async (
+    stream: Readable,
+    onChunk: (chunk: Buffer) => void,
+    onEnd: () => void,
+    resolve: () => void,
+    reject: (error: any) => void,
+    targetChunkSize: number = 640,
+    rateLimitMs: number = 20
+) => {
+    const TARGET_CHUNK_SIZE = targetChunkSize
+    const RATE_LIMIT_MS = rateLimitMs
+    let buffer: Buffer = Buffer.alloc(0)
+    let isEnded = false
+
+    const processChunks = async () => {
+        while (!isEnded || buffer.length > 0) {
+            if (buffer.length >= TARGET_CHUNK_SIZE) {
+                const chunk = buffer.subarray(0, TARGET_CHUNK_SIZE)
+                buffer = buffer.subarray(TARGET_CHUNK_SIZE)
+                onChunk(chunk)
+                await sleep(RATE_LIMIT_MS)
+            } else if (isEnded && buffer.length > 0) {
+                onChunk(buffer)
+                buffer = Buffer.alloc(0)
+            } else if (!isEnded) {
+                await sleep(RATE_LIMIT_MS)
+            } else {
+                break
+            }
+        }
+        onEnd()
+        resolve()
+    }
+
+    stream.on('data', (chunk) => {
+        buffer = Buffer.concat([buffer, Buffer.from(chunk)])
+    })
+    stream.on('end', () => {
+        isEnded = true
+    })
+    stream.on('error', (error) => {
+        reject(error)
+    })
+
+    processChunks().catch(reject)
+}
+
+const sleep = (ms: number): Promise<void> => {
+    return new Promise((resolve) => setTimeout(resolve, ms))
+}

 export const getVoices = async (provider: string, credentialId: string, options: ICommonObject) => {
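The pacing defaults are worth a sanity check: 640 bytes every 20 ms is 32 000 B/s for the OpenAI path, and 640 bytes every 40 ms is 16 000 B/s, i.e. 128 kbit/s, roughly real-time delivery for a typical 128 kbps MP3 stream, for ElevenLabs. A quick sketch of that arithmetic (mine, not part of the commit):

// Effective throughput of the rate-limited re-chunking above.
const effectiveBytesPerSec = (chunkSize: number, intervalMs: number): number =>
    chunkSize * (1000 / intervalMs)

console.log(effectiveBytesPerSec(640, 20)) // 32000 B/s, about 256 kbit/s (OpenAI path)
console.log(effectiveBytesPerSec(640, 40)) // 16000 B/s, 128 kbit/s (ElevenLabs path)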

View File

@@ -67,7 +67,6 @@ const generateTextToSpeech = async (req: Request, res: Response) => {
             res.write(`data: ${JSON.stringify(clientResponse)}\n\n`)
         },
         async () => {
-            // Send end event
             const endResponse = {
                 event: 'tts_end',
                 data: {}
@@ -75,6 +74,14 @@ const generateTextToSpeech = async (req: Request, res: Response) => {
             res.write('event: tts_end\n')
             res.write(`data: ${JSON.stringify(endResponse)}\n\n`)
             res.end()
+        },
+        (format: string) => {
+            const startResponse = {
+                event: 'tts_start',
+                data: { format }
+            }
+            res.write('event: tts_start\n')
+            res.write(`data: ${JSON.stringify(startResponse)}\n\n`)
         }
     )
 } catch (error) {
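Taken together, the writer callbacks above produce a named-event SSE stream. Reconstructed from the res.write() calls (the base64 audio payload is shortened here as an illustration), one manual-TTS session looks like:

const sampleTTSStream = `event: tts_start
data: {"event":"tts_start","data":{"format":"mp3"}}

event: tts_data
data: {"event":"tts_data","data":"SGVsbG8..."}

event: tts_end
data: {"event":"tts_end","data":{}}
`

Note that the event name travels twice: in the SSE `event:` field and inside the JSON body. The client-side parseSSEEvent() added later in this commit reads both.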

View File

@@ -2,7 +2,7 @@ import { StatusCodes } from 'http-status-codes'
 import { getRunningExpressApp } from '../../utils/getRunningExpressApp'
 import { InternalFlowiseError } from '../../errors/internalFlowiseError'
 import { getErrorMessage } from '../../errors/utils'
-import { convertTextToSpeech, getVoices } from 'flowise-components'
+import { getVoices } from 'flowise-components'
 import { databaseEntities } from '../../utils'

 export enum TextToSpeechProvider {
@@ -23,51 +23,8 @@ export interface TTSResponse {
     contentType: string
 }

-const generateTextToSpeech = async (request: TTSRequest): Promise<TTSResponse> => {
-    try {
-        const appServer = getRunningExpressApp()
-        const options = {
-            orgId: '',
-            chatflowid: '',
-            chatId: '',
-            appDataSource: appServer.AppDataSource,
-            databaseEntities: databaseEntities
-        }
-        const textToSpeechConfig = {
-            name: request.provider,
-            credentialId: request.credentialId,
-            voice: request.voice,
-            model: request.model
-        }
-        const audioBuffer = await convertTextToSpeech(request.text, textToSpeechConfig, options)
-        return {
-            audioBuffer,
-            contentType: 'audio/mpeg'
-        }
-    } catch (error) {
-        throw new InternalFlowiseError(
-            StatusCodes.INTERNAL_SERVER_ERROR,
-            `Error: textToSpeechService.generateTextToSpeech - ${getErrorMessage(error)}`
-        )
-    }
-}
-
 const getVoicesForProvider = async (provider: string, credentialId?: string): Promise<any[]> => {
     try {
-        if (provider === TextToSpeechProvider.OPENAI) {
-            return [
-                { id: 'alloy', name: 'Alloy' },
-                { id: 'echo', name: 'Echo' },
-                { id: 'fable', name: 'Fable' },
-                { id: 'onyx', name: 'Onyx' },
-                { id: 'nova', name: 'Nova' },
-                { id: 'shimmer', name: 'Shimmer' }
-            ]
-        }
         if (!credentialId) {
             throw new InternalFlowiseError(StatusCodes.BAD_REQUEST, 'Credential ID required for this provider')
         }
@@ -91,6 +48,5 @@ const getVoicesForProvider = async (provider: string, credentialId?: string): Promise<any[]> => {
 }

 export default {
-    generateTextToSpeech,
     getVoices: getVoicesForProvider
 }

View File

@@ -269,10 +269,19 @@ export class SSEStreamer implements IServerSideEventStreamer {
         }
     }

+    streamTTSStartEvent(chatId: string, format: string): void {
+        const client = this.clients[chatId]
+        if (client) {
+            const clientResponse = {
+                event: 'tts_start',
+                data: { format }
+            }
+            client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
+        }
+    }
+
     streamTTSDataEvent(chatId: string, audioChunk: string): void {
         const client = this.clients[chatId]
-        console.log('clients', this.clients)
-        console.log('client', client)
         if (client) {
             const clientResponse = {
                 event: 'tts_data',
@@ -284,8 +293,6 @@ export class SSEStreamer implements IServerSideEventStreamer {
     streamTTSEndEvent(chatId: string): void {
         const client = this.clients[chatId]
-        console.log('clients', this.clients)
-        console.log('client', client)
         if (client) {
             const clientResponse = {
                 event: 'tts_end',
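Unlike the standalone controller, SSEStreamer frames carry the event name only inside the JSON body (the write uses a literal 'message:' line), so they surface to the browser as generic message events. A minimal consumer sketch, assuming an EventSource-style reader (the chat UI below actually reads the fetch body; the URL here is a placeholder):

// Illustrative consumer for the frames written above.
const source = new EventSource('/chat-stream-url') // placeholder URL
source.onmessage = (ev) => {
    const payload = JSON.parse(ev.data)
    switch (payload.event) {
        case 'tts_start': /* create a MediaSource for payload.data.format */ break
        case 'tts_data':  /* base64-decode payload.data, append to the SourceBuffer */ break
        case 'tts_end':   /* flush remaining chunks, then mediaSource.endOfStream() */ break
    }
}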

View File

@@ -6,7 +6,6 @@ import { omit } from 'lodash'
 import {
     IFileUpload,
     convertSpeechToText,
-    convertTextToSpeech,
     convertTextToSpeechStream,
     ICommonObject,
     addSingleFileToStorage,
@@ -123,11 +122,13 @@ const generateTTSForResponseStream = async (
         options,
         (chunk: Buffer) => {
             const audioBase64 = chunk.toString('base64')
-            logger.info(`Received TTS chunk: ${audioBase64}`)
             sseStreamer.streamTTSDataEvent(chatId, audioBase64)
         },
         () => {
             sseStreamer.streamTTSEndEvent(chatId)
+        },
+        (format: string) => {
+            sseStreamer.streamTTSStartEvent(chatId, format)
         }
     )
 } catch (error) {

View File

@@ -58,30 +58,10 @@ const textToSpeechProviders = {
         {
             label: 'Voice',
             name: 'voice',
-            type: 'options',
+            type: 'voice_select',
             description: 'The voice to use when generating the audio',
-            options: [
-                { label: 'Alloy', name: 'alloy' },
-                { label: 'Echo', name: 'echo' },
-                { label: 'Fable', name: 'fable' },
-                { label: 'Onyx', name: 'onyx' },
-                { label: 'Nova', name: 'nova' },
-                { label: 'Shimmer', name: 'shimmer' }
-            ],
             default: 'alloy',
             optional: true
-        },
-        {
-            label: 'Model',
-            name: 'model',
-            type: 'options',
-            description: 'The TTS model to use',
-            options: [
-                { label: 'TTS-1', name: 'tts-1' },
-                { label: 'TTS-1 HD', name: 'tts-1-hd' }
-            ],
-            default: 'tts-1',
-            optional: true
         }
     ]
 },
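With the hardcoded voice and model lists gone, a 'voice_select' field presumably loads its options at runtime from the getVoices service shown earlier. A hypothetical sketch; the endpoint path, query parameters, and { id, name } response shape are my assumptions for illustration, not shown in this commit:

// Hypothetical loader for a voice_select dropdown.
const loadVoices = async (provider: string, credentialId: string) => {
    const res = await fetch(`/api/v1/text-to-speech/voices?provider=${provider}&credentialId=${credentialId}`)
    return (await res.json()) as { id: string; name: string }[]
    // e.g. [{ id: 'alloy', name: 'Alloy' }, { id: 'echo', name: 'Echo' }, ...]
}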

View File

@@ -257,6 +257,16 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP
     const [ttsAudio, setTtsAudio] = useState({})
     const [isTTSEnabled, setIsTTSEnabled] = useState(false)

+    // TTS streaming state
+    const [ttsStreamingState, setTtsStreamingState] = useState({
+        mediaSource: null,
+        sourceBuffer: null,
+        audio: null,
+        chunkQueue: [],
+        isBuffering: false,
+        audioFormat: null
+    })
+
     const isFileAllowedForUpload = (file) => {
         const constraints = getAllowChatFlowUploads.data
         /**
@@ -1042,6 +1052,15 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP
                 case 'audio':
                     handleAutoPlayAudio(payload.data)
                     break
+                case 'tts_start':
+                    handleTTSStart(payload.data.format)
+                    break
+                case 'tts_data':
+                    handleTTSDataChunk(payload.data)
+                    break
+                case 'tts_end':
+                    handleTTSEnd()
+                    break
                 case 'end':
                     setLocalStorageChatflow(chatflowid, chatId)
                     closeResponse()
@@ -1588,6 +1607,19 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP
                 return
             }

+            // Use existing streaming infrastructure for manual TTS
+            handleTTSStart('mp3', (audio) => {
+                setTtsAudio((prev) => ({ ...prev, [messageId]: audio }))
+                audio.addEventListener('ended', () => {
+                    setTtsAudio((prev) => {
+                        const newState = { ...prev }
+                        delete newState[messageId]
+                        return newState
+                    })
+                })
+            })
+
             const response = await fetch('/api/v1/text-to-speech/generate', {
                 method: 'POST',
                 headers: {
@@ -1608,23 +1640,48 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP
                 throw new Error(`TTS request failed: ${response.status}`)
             }

-            const audioBuffer = await response.arrayBuffer()
-            const audioBlob = new Blob([audioBuffer], { type: 'audio/mpeg' })
-            const audioUrl = URL.createObjectURL(audioBlob)
-            const audio = new Audio(audioUrl)
-            setTtsAudio((prev) => ({ ...prev, [messageId]: audio }))
-            audio.addEventListener('ended', () => {
-                setTtsAudio((prev) => {
-                    const newState = { ...prev }
-                    delete newState[messageId]
-                    return newState
-                })
-                URL.revokeObjectURL(audioUrl)
-            })
-            await audio.play()
+            const reader = response.body.getReader()
+            const decoder = new TextDecoder()
+            let buffer = ''
+
+            let done = false
+            while (!done) {
+                const result = await reader.read()
+                done = result.done
+                if (done) {
+                    break
+                }
+                const value = result.value
+
+                // Decode the chunk as text and add to buffer
+                const chunk = decoder.decode(value, { stream: true })
+                buffer += chunk
+
+                // Process complete SSE events
+                const lines = buffer.split('\n\n')
+                buffer = lines.pop() || '' // Keep incomplete event in buffer
+
+                for (const eventBlock of lines) {
+                    if (eventBlock.trim()) {
+                        const event = parseSSEEvent(eventBlock)
+                        if (event) {
+                            // Handle the event just like the SSE handler does
+                            switch (event.event) {
+                                case 'tts_start':
+                                    break
+                                case 'tts_data':
+                                    handleTTSDataChunk(event.data)
+                                    break
+                                case 'tts_end':
+                                    handleTTSEnd()
+                                    break
+                                default:
+                                    break
+                            }
+                        }
+                    }
+                }
+            }
         } catch (error) {
             console.error('Error with TTS:', error)
             enqueueSnackbar({
@@ -1671,6 +1728,268 @@ const ChatMessage = ({ open, chatflowid, isAgentCanvas, isDialog, previews, setP
         }
     }

+    const parseSSEEvent = (eventBlock) => {
+        const lines = eventBlock.split('\n')
+        const event = {}
+        for (const line of lines) {
+            if (line.startsWith('event:')) {
+                event.event = line.substring(6).trim()
+            } else if (line.startsWith('data:')) {
+                const dataStr = line.substring(5).trim()
+                try {
+                    const parsed = JSON.parse(dataStr)
+                    if (parsed.data) {
+                        event.data = parsed.data
+                    }
+                } catch (e) {
+                    console.error('Error parsing SSE data:', e, 'Raw data:', dataStr)
+                }
+            }
+        }
+        return event.event ? event : null
+    }
+
+    const initializeTTSStreaming = (format, onAudioReady = null) => {
+        try {
+            const mediaSource = new MediaSource()
+            const audio = new Audio()
+            audio.src = URL.createObjectURL(mediaSource)
+
+            mediaSource.addEventListener('sourceopen', () => {
+                // Use the provided format, default to MP3 if not set
+                const mimeType = format === 'mp3' ? 'audio/mpeg' : 'audio/mpeg'
+                try {
+                    const sourceBuffer = mediaSource.addSourceBuffer(mimeType)
+                    setTtsStreamingState((prevState) => ({
+                        ...prevState,
+                        mediaSource,
+                        sourceBuffer,
+                        audio
+                    }))
+                    // Start playback
+                    audio.play().catch((playError) => {
+                        console.error('Error starting audio playback:', playError)
+                    })
+                    // Notify callback if provided
+                    if (onAudioReady) {
+                        onAudioReady(audio)
+                    }
+                } catch (error) {
+                    console.error('Error setting up source buffer:', error)
+                    console.error('MediaSource readyState:', mediaSource.readyState)
+                    console.error('Requested MIME type:', mimeType)
+                }
+            })
+
+            audio.addEventListener('ended', () => {
+                cleanupTTSStreaming()
+            })
+        } catch (error) {
+            console.error('Error initializing TTS streaming:', error)
+        }
+    }
+
+    const cleanupTTSStreaming = () => {
+        setTtsStreamingState((prevState) => {
+            if (prevState.audio) {
+                prevState.audio.pause()
+                if (prevState.audio.src) {
+                    URL.revokeObjectURL(prevState.audio.src)
+                }
+                prevState.audio.removeAttribute('src')
+            }
+            if (prevState.mediaSource) {
+                if (prevState.mediaSource.readyState === 'open') {
+                    try {
+                        prevState.mediaSource.endOfStream()
+                    } catch (e) {
+                        // Ignore errors during cleanup
+                    }
+                }
+            }
+            return {
+                mediaSource: null,
+                sourceBuffer: null,
+                audio: null,
+                chunkQueue: [],
+                isBuffering: false,
+                audioFormat: null
+            }
+        })
+    }
+
+    const processChunkQueue = () => {
+        setTtsStreamingState((prevState) => {
+            if (!prevState.sourceBuffer || prevState.sourceBuffer.updating || prevState.chunkQueue.length === 0) {
+                return prevState
+            }
+            const chunk = prevState.chunkQueue.shift()
+            try {
+                prevState.sourceBuffer.appendBuffer(chunk)
+                return {
+                    ...prevState,
+                    chunkQueue: [...prevState.chunkQueue],
+                    isBuffering: true
+                }
+            } catch (error) {
+                console.error('Error appending chunk to buffer:', error)
+                return prevState
+            }
+        })
+    }
+
+    const handleTTSStart = (format, onAudioReady = null) => {
+        // Store the audio format for this TTS session and initialize
+        setTtsStreamingState((prevState) => {
+            // Cleanup any existing streaming first
+            if (prevState.audio) {
+                prevState.audio.pause()
+                if (prevState.audio.src) {
+                    URL.revokeObjectURL(prevState.audio.src)
+                }
+            }
+            if (prevState.mediaSource && prevState.mediaSource.readyState === 'open') {
+                try {
+                    prevState.mediaSource.endOfStream()
+                } catch (e) {
+                    // Ignore errors during cleanup
+                }
+            }
+            return {
+                mediaSource: null,
+                sourceBuffer: null,
+                audio: null,
+                chunkQueue: [],
+                isBuffering: false,
+                audioFormat: format
+            }
+        })
+        // Initialize TTS streaming with the correct format
+        setTimeout(() => initializeTTSStreaming(format, onAudioReady), 0)
+    }
+
+    const handleTTSDataChunk = (base64Data) => {
+        try {
+            const audioBuffer = Uint8Array.from(atob(base64Data), (c) => c.charCodeAt(0))
+            setTtsStreamingState((prevState) => {
+                // Add chunk to queue
+                const newState = {
+                    ...prevState,
+                    chunkQueue: [...prevState.chunkQueue, audioBuffer]
+                }
+                // Process queue if sourceBuffer is ready
+                if (prevState.sourceBuffer && !prevState.sourceBuffer.updating) {
+                    setTimeout(() => processChunkQueue(), 0)
+                }
+                return newState
+            })
+        } catch (error) {
+            console.error('Error handling TTS data chunk:', error)
+        }
+    }
+
+    const handleTTSEnd = () => {
+        setTtsStreamingState((prevState) => {
+            if (prevState.mediaSource && prevState.mediaSource.readyState === 'open') {
+                try {
+                    // Process any remaining chunks first
+                    if (prevState.sourceBuffer && prevState.chunkQueue.length > 0 && !prevState.sourceBuffer.updating) {
+                        const remainingChunks = [...prevState.chunkQueue]
+                        remainingChunks.forEach((chunk, index) => {
+                            setTimeout(() => {
+                                if (prevState.sourceBuffer && !prevState.sourceBuffer.updating) {
+                                    try {
+                                        prevState.sourceBuffer.appendBuffer(chunk)
+                                        if (index === remainingChunks.length - 1) {
+                                            // End stream after last chunk
+                                            setTimeout(() => {
+                                                if (prevState.mediaSource && prevState.mediaSource.readyState === 'open') {
+                                                    prevState.mediaSource.endOfStream()
+                                                }
+                                            }, 100)
+                                        }
+                                    } catch (error) {
+                                        console.error('Error appending remaining chunk:', error)
+                                    }
+                                }
+                            }, index * 50)
+                        })
+                        return {
+                            ...prevState,
+                            chunkQueue: []
+                        }
+                    }
+                    // Wait for any pending buffer operations to complete
+                    if (prevState.sourceBuffer && !prevState.sourceBuffer.updating) {
+                        prevState.mediaSource.endOfStream()
+                    } else if (prevState.sourceBuffer) {
+                        // Wait for buffer to finish updating
+                        prevState.sourceBuffer.addEventListener(
+                            'updateend',
+                            () => {
+                                if (prevState.mediaSource && prevState.mediaSource.readyState === 'open') {
+                                    prevState.mediaSource.endOfStream()
+                                }
+                            },
+                            { once: true }
+                        )
+                    }
+                } catch (error) {
+                    console.error('Error ending TTS stream:', error)
+                }
+            }
+            return prevState
+        })
+    }
+
+    // Set up sourceBuffer event listeners when it changes
+    useEffect(() => {
+        if (ttsStreamingState.sourceBuffer) {
+            const sourceBuffer = ttsStreamingState.sourceBuffer
+            const handleUpdateEnd = () => {
+                setTtsStreamingState((prevState) => ({
+                    ...prevState,
+                    isBuffering: false
+                }))
+                // Process next chunk in queue
+                setTimeout(() => processChunkQueue(), 0)
+            }
+            sourceBuffer.addEventListener('updateend', handleUpdateEnd)
+            return () => {
+                sourceBuffer.removeEventListener('updateend', handleUpdateEnd)
+            }
+        }
+    }, [ttsStreamingState.sourceBuffer])
+
+    // Cleanup TTS streaming on component unmount
+    useEffect(() => {
+        return () => {
+            cleanupTTSStreaming()
+        }
+    }, [])
+
     const getInputDisabled = () => {
         return (
             loading ||
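A closing note on the client side: most of the MediaSource code above is bookkeeping around asynchronous React state; the underlying pattern is small. A distilled sketch of the queue-and-append dance driven by tts_start / tts_data / tts_end (my own reduction, not the component code):

// Append base64 chunks through a queue, one appendBuffer() at a time,
// because SourceBuffer rejects appends while `updating` is true.
const mediaSource = new MediaSource()
const audio = new Audio(URL.createObjectURL(mediaSource))
const queue: Uint8Array[] = []
let sourceBuffer: SourceBuffer | null = null

const pump = () => {
    if (sourceBuffer && !sourceBuffer.updating && queue.length > 0) {
        sourceBuffer.appendBuffer(queue.shift() as Uint8Array)
    }
}

mediaSource.addEventListener('sourceopen', () => {
    sourceBuffer = mediaSource.addSourceBuffer('audio/mpeg') // the 'mp3' format from tts_start
    sourceBuffer.addEventListener('updateend', pump) // drain the queue between appends
    audio.play()
})

// Called once per tts_data event; tts_end would flush the queue and call endOfStream().
const onTTSChunk = (base64: string) => {
    queue.push(Uint8Array.from(atob(base64), (c) => c.charCodeAt(0)))
    pump()
}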