Compare commits

...

1 Commits

Author SHA1 Message Date
Ilango f61a1ad248 Add tons of logs for redis pub-sub and sse streamer 2025-07-29 14:09:28 +05:30
3 changed files with 512 additions and 149 deletions

View File

@ -1,5 +1,6 @@
import { IServerSideEventStreamer } from 'flowise-components'
import { createClient } from 'redis'
import logger from '../utils/logger'
export class RedisEventPublisher implements IServerSideEventStreamer {
private redisPublisher: ReturnType<typeof createClient>
@ -44,11 +45,18 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}
async connect() {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisPublisher.connect()
logger.info(`[RedisPublisher] Connecting to Redis...`)
await this.redisPublisher.connect()
logger.info(`[RedisPublisher] Connected to Redis successfully`)
}
streamCustomEvent(chatId: string, eventType: string, data: any) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing custom event - chatId: ${chatId}, eventType: ${eventType}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -58,12 +66,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming custom event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming custom event:', error)
logger.error(`[RedisPublisher] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
}
}
streamStartEvent(chatId: string, data: string) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing start event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -73,12 +86,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming start event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming start event:', error)
logger.error(`[RedisPublisher] Error streaming start event - chatId: ${chatId}`, { error })
}
}
streamTokenEvent(chatId: string, data: string) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing token event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -88,12 +106,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming token event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming token event:', error)
logger.error(`[RedisPublisher] Error streaming token event - chatId: ${chatId}`, { error })
}
}
streamSourceDocumentsEvent(chatId: string, data: any) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing sourceDocuments event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -103,12 +126,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming sourceDocuments event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming sourceDocuments event:', error)
logger.error(`[RedisPublisher] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
}
}
streamArtifactsEvent(chatId: string, data: any) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing artifacts event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -118,12 +146,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming artifacts event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming artifacts event:', error)
logger.error(`[RedisPublisher] Error streaming artifacts event - chatId: ${chatId}`, { error })
}
}
streamUsedToolsEvent(chatId: string, data: any) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing usedTools event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -133,12 +166,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming usedTools event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming usedTools event:', error)
logger.error(`[RedisPublisher] Error streaming usedTools event - chatId: ${chatId}`, { error })
}
}
streamCalledToolsEvent(chatId: string, data: any) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing calledTools event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -148,12 +186,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming calledTools event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming calledTools event:', error)
logger.error(`[RedisPublisher] Error streaming calledTools event - chatId: ${chatId}`, { error })
}
}
streamFileAnnotationsEvent(chatId: string, data: any) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing fileAnnotations event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -163,12 +206,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming fileAnnotations event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming fileAnnotations event:', error)
logger.error(`[RedisPublisher] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
}
}
streamToolEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing tool event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -178,12 +226,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming tool event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming tool event:', error)
logger.error(`[RedisPublisher] Error streaming tool event - chatId: ${chatId}`, { error })
}
}
streamAgentReasoningEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing agentReasoning event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -193,12 +246,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming agentReasoning event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming agentReasoning event:', error)
logger.error(`[RedisPublisher] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
}
}
streamAgentFlowEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing agentFlowEvent event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -208,12 +266,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming agentFlow event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming agentFlow event:', error)
logger.error(`[RedisPublisher] Error streaming agentFlow event - chatId: ${chatId}`, { error })
}
}
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing agentFlowExecutedData event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -223,12 +286,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming agentFlowExecutedData event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming agentFlowExecutedData event:', error)
logger.error(`[RedisPublisher] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
}
}
streamNextAgentEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing nextAgent event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -238,12 +306,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming nextAgent event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming nextAgent event:', error)
logger.error(`[RedisPublisher] Error streaming nextAgent event - chatId: ${chatId}`, { error })
}
}
streamNextAgentFlowEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing nextAgentFlow event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -253,12 +326,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming nextAgentFlow event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming nextAgentFlow event:', error)
logger.error(`[RedisPublisher] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
}
}
streamActionEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing action event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -268,12 +346,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming action event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming action event:', error)
logger.error(`[RedisPublisher] Error streaming action event - chatId: ${chatId}`, { error })
}
}
streamAbortEvent(chatId: string): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing abort event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -283,7 +366,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming abort event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming abort event:', error)
logger.error(`[RedisPublisher] Error streaming abort event - chatId: ${chatId}`, { error })
}
}
@ -293,6 +378,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
streamErrorEvent(chatId: string, msg: string) {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing error event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -302,7 +390,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming error event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming error event:', error)
logger.error(`[RedisPublisher] Error streaming error event - chatId: ${chatId}`, { error })
}
}
@ -334,6 +424,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
streamUsageMetadataEvent(chatId: string, data: any): void {
try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing usageMetadata event - chatId: ${chatId}`)
this.redisPublisher.publish(
chatId,
JSON.stringify({
@ -343,13 +436,19 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
})
)
} catch (error) {
console.error('Error streaming usage metadata event:', error)
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming usage metadata event:', error)
logger.error(`[RedisPublisher] Error streaming usage metadata event - chatId: ${chatId}`, { error })
}
}
async disconnect() {
if (this.redisPublisher) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisPublisher.quit()
logger.info(`[RedisPublisher] Disconnecting from Redis...`)
await this.redisPublisher.quit()
logger.info(`[RedisPublisher] Disconnected from Redis`)
}
}
}

View File

@ -1,5 +1,6 @@
import { createClient } from 'redis'
import { SSEStreamer } from '../utils/SSEStreamer'
import logger from '../utils/logger'
export class RedisEventSubscriber {
private redisSubscriber: ReturnType<typeof createClient>
@ -47,7 +48,11 @@ export class RedisEventSubscriber {
}
async connect() {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisSubscriber.connect()
logger.info(`[RedisSubscriber] Connecting to Redis...`)
await this.redisSubscriber.connect()
logger.info(`[RedisSubscriber] Connected to Redis successfully`)
}
subscribe(channel: string) {
@ -58,22 +63,34 @@ export class RedisEventSubscriber {
// Check if already subscribed
if (this.subscribedChannels.has(channel)) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: return // Prevent duplicate subscription
logger.info(`[RedisSubscriber] Already subscribed to channel: ${channel}`)
return // Prevent duplicate subscription
}
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisSubscriber.subscribe(channel, (message) => { this.handleEvent(message) })
logger.info(`[RedisSubscriber] Subscribing to channel: ${channel}`)
this.redisSubscriber.subscribe(channel, (message) => {
this.handleEvent(message)
})
// Mark the channel as subscribed
this.subscribedChannels.add(channel)
logger.info(`[RedisSubscriber] Successfully subscribed to channel: ${channel}`)
}
private handleEvent(message: string) {
try {
// Parse the message from Redis
const event = JSON.parse(message)
const { eventType, chatId, data } = event
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: switch (eventType) { ... }
logger.info(`[RedisSubscriber] Received event - chatId: ${chatId}, eventType: ${eventType}`)
// Stream the event to the client
switch (eventType) {
case 'start':
@ -124,12 +141,32 @@ export class RedisEventSubscriber {
case 'metadata':
this.sseStreamer.streamMetadataEvent(chatId, data)
break
case 'calledTools':
this.sseStreamer.streamCalledToolsEvent(chatId, data)
break
case 'usageMetadata':
this.sseStreamer.streamUsageMetadataEvent(chatId, data)
break
default:
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: break
logger.warn(`[RedisSubscriber] Unknown event type: ${eventType} for chatId: ${chatId}`)
break
}
} catch (error) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no error handling)
logger.error(`[RedisSubscriber] Error handling event`, { error, message })
}
}
async disconnect() {
if (this.redisSubscriber) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisSubscriber.quit()
logger.info(`[RedisSubscriber] Disconnecting from Redis...`)
await this.redisSubscriber.quit()
logger.info(`[RedisSubscriber] Disconnected from Redis`)
}
}
}

View File

@ -1,5 +1,6 @@
import { Response } from 'express'
import { IServerSideEventStreamer } from 'flowise-components'
import logger from './logger'
// define a new type that has a client type (INTERNAL or EXTERNAL) and Response type
type Client = {
@ -14,34 +15,62 @@ export class SSEStreamer implements IServerSideEventStreamer {
clients: { [id: string]: Client } = {}
addExternalClient(chatId: string, res: Response) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
logger.info(`[SSEStreamer] Adding external client - chatId: ${chatId}`)
this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
}
addClient(chatId: string, res: Response) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
logger.info(`[SSEStreamer] Adding internal client - chatId: ${chatId}`)
this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
}
removeClient(chatId: string) {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'end', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.response.end()
logger.info(`[SSEStreamer] Removing client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'end',
data: '[DONE]'
}
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.end()
} catch (error) {
logger.error(`[SSEStreamer] Error removing client - chatId: ${chatId}`, { error })
}
delete this.clients[chatId]
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] Attempted to remove non-existent client - chatId: ${chatId}`)
}
}
streamCustomEvent(chatId: string, eventType: string, data: any) {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: eventType, data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming custom event to client - chatId: ${chatId}, eventType: ${eventType}`)
try {
const clientResponse = {
event: eventType,
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for custom event - chatId: ${chatId}, eventType: ${eventType}`)
}
}
@ -49,155 +78,324 @@ export class SSEStreamer implements IServerSideEventStreamer {
const client = this.clients[chatId]
// prevent multiple start events being streamed to the client
if (client && !client.started) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'start', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.started = true
logger.info(`[SSEStreamer] Streaming start event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'start',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.started = true
} catch (error) {
logger.error(`[SSEStreamer] Error streaming start event - chatId: ${chatId}`, { error })
}
} else if (client && client.started) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] Start event already sent for chatId: ${chatId}`)
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for start event - chatId: ${chatId}`)
}
}
streamTokenEvent(chatId: string, data: string) {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'token', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming token event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'token',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming token event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for token event - chatId: ${chatId}`)
}
}
streamSourceDocumentsEvent(chatId: string, data: any) {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'sourceDocuments', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming sourceDocuments event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'sourceDocuments',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for sourceDocuments event - chatId: ${chatId}`)
}
}
streamArtifactsEvent(chatId: string, data: any) {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'artifacts', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming artifacts event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'artifacts',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming artifacts event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for artifacts event - chatId: ${chatId}`)
}
}
streamUsedToolsEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'usedTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming usedTools event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'usedTools',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming usedTools event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for usedTools event - chatId: ${chatId}`)
}
}
streamCalledToolsEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'calledTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming calledTools event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'calledTools',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming calledTools event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for calledTools event - chatId: ${chatId}`)
}
}
streamFileAnnotationsEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'fileAnnotations', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming fileAnnotations event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'fileAnnotations',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for fileAnnotations event - chatId: ${chatId}`)
}
}
streamToolEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'tool', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming tool event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'tool',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming tool event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for tool event - chatId: ${chatId}`)
}
}
streamAgentReasoningEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'agentReasoning', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming agentReasoning event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'agentReasoning',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for agentReasoning event - chatId: ${chatId}`)
}
}
streamNextAgentEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'nextAgent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming nextAgent event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'nextAgent',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming nextAgent event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for nextAgent event - chatId: ${chatId}`)
}
}
streamAgentFlowEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'agentFlowEvent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming agentFlowEvent event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'agentFlowEvent',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming agentFlowEvent event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for agentFlowEvent event - chatId: ${chatId}`)
}
}
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'agentFlowExecutedData', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming agentFlowExecutedData event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'agentFlowExecutedData',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for agentFlowExecutedData event - chatId: ${chatId}`)
}
}
streamNextAgentFlowEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'nextAgentFlow', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming nextAgentFlow event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'nextAgentFlow',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for nextAgentFlow event - chatId: ${chatId}`)
}
}
streamActionEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'action', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming action event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'action',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming action event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for action event - chatId: ${chatId}`)
}
}
streamAbortEvent(chatId: string): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'abort', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming abort event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'abort',
data: '[DONE]'
}
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming abort event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for abort event - chatId: ${chatId}`)
}
}
@ -209,15 +407,29 @@ export class SSEStreamer implements IServerSideEventStreamer {
if (msg.includes('401 Incorrect API key provided')) msg = '401 Invalid model key or Incorrect local model configuration.'
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'error', data: msg }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming error event to client - chatId: ${chatId}, error: ${msg}`)
try {
const clientResponse = {
event: 'error',
data: msg
}
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming error event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for error event - chatId: ${chatId}`)
}
}
streamMetadataEvent(chatId: string, apiResponse: any) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const metadataJson: any = {}; ... (no initial logging)
logger.info(`[SSEStreamer] Processing metadata event - chatId: ${chatId}`)
const metadataJson: any = {}
if (apiResponse.chatId) {
metadataJson['chatId'] = apiResponse.chatId
@ -244,17 +456,32 @@ export class SSEStreamer implements IServerSideEventStreamer {
}
if (Object.keys(metadataJson).length > 0) {
this.streamCustomEvent(chatId, 'metadata', metadataJson)
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No metadata to stream - chatId: ${chatId}`)
}
}
streamUsageMetadataEvent(chatId: string, data: any): void {
const client = this.clients[chatId]
if (client) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const clientResponse = { event: 'usageMetadata', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
logger.info(`[SSEStreamer] Streaming usageMetadata event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'usageMetadata',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming usageMetadata event - chatId: ${chatId}`, { error })
}
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for usageMetadata event - chatId: ${chatId}`)
}
}
}