Compare commits

...

1 Commits

Author SHA1 Message Date
Ilango f61a1ad248 Add tons of logs for redis pub-sub and sse streamer 2025-07-29 14:09:28 +05:30
3 changed files with 512 additions and 149 deletions

View File

@ -1,5 +1,6 @@
import { IServerSideEventStreamer } from 'flowise-components' import { IServerSideEventStreamer } from 'flowise-components'
import { createClient } from 'redis' import { createClient } from 'redis'
import logger from '../utils/logger'
export class RedisEventPublisher implements IServerSideEventStreamer { export class RedisEventPublisher implements IServerSideEventStreamer {
private redisPublisher: ReturnType<typeof createClient> private redisPublisher: ReturnType<typeof createClient>
@ -44,11 +45,18 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
} }
async connect() { async connect() {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisPublisher.connect()
logger.info(`[RedisPublisher] Connecting to Redis...`)
await this.redisPublisher.connect() await this.redisPublisher.connect()
logger.info(`[RedisPublisher] Connected to Redis successfully`)
} }
streamCustomEvent(chatId: string, eventType: string, data: any) { streamCustomEvent(chatId: string, eventType: string, data: any) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing custom event - chatId: ${chatId}, eventType: ${eventType}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -58,12 +66,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming custom event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming custom event:', error)
logger.error(`[RedisPublisher] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
} }
} }
streamStartEvent(chatId: string, data: string) { streamStartEvent(chatId: string, data: string) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing start event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -73,12 +86,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming start event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming start event:', error)
logger.error(`[RedisPublisher] Error streaming start event - chatId: ${chatId}`, { error })
} }
} }
streamTokenEvent(chatId: string, data: string) { streamTokenEvent(chatId: string, data: string) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing token event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -88,12 +106,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming token event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming token event:', error)
logger.error(`[RedisPublisher] Error streaming token event - chatId: ${chatId}`, { error })
} }
} }
streamSourceDocumentsEvent(chatId: string, data: any) { streamSourceDocumentsEvent(chatId: string, data: any) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing sourceDocuments event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -103,12 +126,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming sourceDocuments event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming sourceDocuments event:', error)
logger.error(`[RedisPublisher] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
} }
} }
streamArtifactsEvent(chatId: string, data: any) { streamArtifactsEvent(chatId: string, data: any) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing artifacts event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -118,12 +146,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming artifacts event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming artifacts event:', error)
logger.error(`[RedisPublisher] Error streaming artifacts event - chatId: ${chatId}`, { error })
} }
} }
streamUsedToolsEvent(chatId: string, data: any) { streamUsedToolsEvent(chatId: string, data: any) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing usedTools event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -133,12 +166,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming usedTools event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming usedTools event:', error)
logger.error(`[RedisPublisher] Error streaming usedTools event - chatId: ${chatId}`, { error })
} }
} }
streamCalledToolsEvent(chatId: string, data: any) { streamCalledToolsEvent(chatId: string, data: any) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing calledTools event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -148,12 +186,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming calledTools event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming calledTools event:', error)
logger.error(`[RedisPublisher] Error streaming calledTools event - chatId: ${chatId}`, { error })
} }
} }
streamFileAnnotationsEvent(chatId: string, data: any) { streamFileAnnotationsEvent(chatId: string, data: any) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing fileAnnotations event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -163,12 +206,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming fileAnnotations event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming fileAnnotations event:', error)
logger.error(`[RedisPublisher] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
} }
} }
streamToolEvent(chatId: string, data: any): void { streamToolEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing tool event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -178,12 +226,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming tool event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming tool event:', error)
logger.error(`[RedisPublisher] Error streaming tool event - chatId: ${chatId}`, { error })
} }
} }
streamAgentReasoningEvent(chatId: string, data: any): void { streamAgentReasoningEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing agentReasoning event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -193,12 +246,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming agentReasoning event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming agentReasoning event:', error)
logger.error(`[RedisPublisher] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
} }
} }
streamAgentFlowEvent(chatId: string, data: any): void { streamAgentFlowEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing agentFlowEvent event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -208,12 +266,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming agentFlow event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming agentFlow event:', error)
logger.error(`[RedisPublisher] Error streaming agentFlow event - chatId: ${chatId}`, { error })
} }
} }
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void { streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing agentFlowExecutedData event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -223,12 +286,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming agentFlowExecutedData event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming agentFlowExecutedData event:', error)
logger.error(`[RedisPublisher] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
} }
} }
streamNextAgentEvent(chatId: string, data: any): void { streamNextAgentEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing nextAgent event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -238,12 +306,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming nextAgent event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming nextAgent event:', error)
logger.error(`[RedisPublisher] Error streaming nextAgent event - chatId: ${chatId}`, { error })
} }
} }
streamNextAgentFlowEvent(chatId: string, data: any): void { streamNextAgentFlowEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing nextAgentFlow event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -253,12 +326,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming nextAgentFlow event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming nextAgentFlow event:', error)
logger.error(`[RedisPublisher] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
} }
} }
streamActionEvent(chatId: string, data: any): void { streamActionEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing action event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -268,12 +346,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming action event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming action event:', error)
logger.error(`[RedisPublisher] Error streaming action event - chatId: ${chatId}`, { error })
} }
} }
streamAbortEvent(chatId: string): void { streamAbortEvent(chatId: string): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing abort event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -283,7 +366,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming abort event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming abort event:', error)
logger.error(`[RedisPublisher] Error streaming abort event - chatId: ${chatId}`, { error })
} }
} }
@ -293,6 +378,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
streamErrorEvent(chatId: string, msg: string) { streamErrorEvent(chatId: string, msg: string) {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing error event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -302,7 +390,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming error event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming error event:', error)
logger.error(`[RedisPublisher] Error streaming error event - chatId: ${chatId}`, { error })
} }
} }
@ -334,6 +424,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
streamUsageMetadataEvent(chatId: string, data: any): void { streamUsageMetadataEvent(chatId: string, data: any): void {
try { try {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisPublisher.publish(...)
logger.info(`[RedisPublisher] Publishing usageMetadata event - chatId: ${chatId}`)
this.redisPublisher.publish( this.redisPublisher.publish(
chatId, chatId,
JSON.stringify({ JSON.stringify({
@ -343,13 +436,19 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
}) })
) )
} catch (error) { } catch (error) {
console.error('Error streaming usage metadata event:', error) // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: console.error('Error streaming usage metadata event:', error)
logger.error(`[RedisPublisher] Error streaming usage metadata event - chatId: ${chatId}`, { error })
} }
} }
async disconnect() { async disconnect() {
if (this.redisPublisher) { if (this.redisPublisher) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisPublisher.quit()
logger.info(`[RedisPublisher] Disconnecting from Redis...`)
await this.redisPublisher.quit() await this.redisPublisher.quit()
logger.info(`[RedisPublisher] Disconnected from Redis`)
} }
} }
} }

View File

@ -1,5 +1,6 @@
import { createClient } from 'redis' import { createClient } from 'redis'
import { SSEStreamer } from '../utils/SSEStreamer' import { SSEStreamer } from '../utils/SSEStreamer'
import logger from '../utils/logger'
export class RedisEventSubscriber { export class RedisEventSubscriber {
private redisSubscriber: ReturnType<typeof createClient> private redisSubscriber: ReturnType<typeof createClient>
@ -47,7 +48,11 @@ export class RedisEventSubscriber {
} }
async connect() { async connect() {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisSubscriber.connect()
logger.info(`[RedisSubscriber] Connecting to Redis...`)
await this.redisSubscriber.connect() await this.redisSubscriber.connect()
logger.info(`[RedisSubscriber] Connected to Redis successfully`)
} }
subscribe(channel: string) { subscribe(channel: string) {
@ -58,78 +63,110 @@ export class RedisEventSubscriber {
// Check if already subscribed // Check if already subscribed
if (this.subscribedChannels.has(channel)) { if (this.subscribedChannels.has(channel)) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: return // Prevent duplicate subscription
logger.info(`[RedisSubscriber] Already subscribed to channel: ${channel}`)
return // Prevent duplicate subscription return // Prevent duplicate subscription
} }
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.redisSubscriber.subscribe(channel, (message) => { this.handleEvent(message) })
logger.info(`[RedisSubscriber] Subscribing to channel: ${channel}`)
this.redisSubscriber.subscribe(channel, (message) => { this.redisSubscriber.subscribe(channel, (message) => {
this.handleEvent(message) this.handleEvent(message)
}) })
// Mark the channel as subscribed // Mark the channel as subscribed
this.subscribedChannels.add(channel) this.subscribedChannels.add(channel)
logger.info(`[RedisSubscriber] Successfully subscribed to channel: ${channel}`)
} }
private handleEvent(message: string) { private handleEvent(message: string) {
// Parse the message from Redis try {
const event = JSON.parse(message) // Parse the message from Redis
const { eventType, chatId, data } = event const event = JSON.parse(message)
const { eventType, chatId, data } = event
// Stream the event to the client // TODO: Remove debug logging after fixing Redis pub-sub issues
switch (eventType) { // Original: switch (eventType) { ... }
case 'start': logger.info(`[RedisSubscriber] Received event - chatId: ${chatId}, eventType: ${eventType}`)
this.sseStreamer.streamStartEvent(chatId, data)
break // Stream the event to the client
case 'token': switch (eventType) {
this.sseStreamer.streamTokenEvent(chatId, data) case 'start':
break this.sseStreamer.streamStartEvent(chatId, data)
case 'sourceDocuments': break
this.sseStreamer.streamSourceDocumentsEvent(chatId, data) case 'token':
break this.sseStreamer.streamTokenEvent(chatId, data)
case 'artifacts': break
this.sseStreamer.streamArtifactsEvent(chatId, data) case 'sourceDocuments':
break this.sseStreamer.streamSourceDocumentsEvent(chatId, data)
case 'usedTools': break
this.sseStreamer.streamUsedToolsEvent(chatId, data) case 'artifacts':
break this.sseStreamer.streamArtifactsEvent(chatId, data)
case 'fileAnnotations': break
this.sseStreamer.streamFileAnnotationsEvent(chatId, data) case 'usedTools':
break this.sseStreamer.streamUsedToolsEvent(chatId, data)
case 'tool': break
this.sseStreamer.streamToolEvent(chatId, data) case 'fileAnnotations':
break this.sseStreamer.streamFileAnnotationsEvent(chatId, data)
case 'agentReasoning': break
this.sseStreamer.streamAgentReasoningEvent(chatId, data) case 'tool':
break this.sseStreamer.streamToolEvent(chatId, data)
case 'nextAgent': break
this.sseStreamer.streamNextAgentEvent(chatId, data) case 'agentReasoning':
break this.sseStreamer.streamAgentReasoningEvent(chatId, data)
case 'agentFlowEvent': break
this.sseStreamer.streamAgentFlowEvent(chatId, data) case 'nextAgent':
break this.sseStreamer.streamNextAgentEvent(chatId, data)
case 'agentFlowExecutedData': break
this.sseStreamer.streamAgentFlowExecutedDataEvent(chatId, data) case 'agentFlowEvent':
break this.sseStreamer.streamAgentFlowEvent(chatId, data)
case 'nextAgentFlow': break
this.sseStreamer.streamNextAgentFlowEvent(chatId, data) case 'agentFlowExecutedData':
break this.sseStreamer.streamAgentFlowExecutedDataEvent(chatId, data)
case 'action': break
this.sseStreamer.streamActionEvent(chatId, data) case 'nextAgentFlow':
break this.sseStreamer.streamNextAgentFlowEvent(chatId, data)
case 'abort': break
this.sseStreamer.streamAbortEvent(chatId) case 'action':
break this.sseStreamer.streamActionEvent(chatId, data)
case 'error': break
this.sseStreamer.streamErrorEvent(chatId, data) case 'abort':
break this.sseStreamer.streamAbortEvent(chatId)
case 'metadata': break
this.sseStreamer.streamMetadataEvent(chatId, data) case 'error':
break this.sseStreamer.streamErrorEvent(chatId, data)
break
case 'metadata':
this.sseStreamer.streamMetadataEvent(chatId, data)
break
case 'calledTools':
this.sseStreamer.streamCalledToolsEvent(chatId, data)
break
case 'usageMetadata':
this.sseStreamer.streamUsageMetadataEvent(chatId, data)
break
default:
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: break
logger.warn(`[RedisSubscriber] Unknown event type: ${eventType} for chatId: ${chatId}`)
break
}
} catch (error) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no error handling)
logger.error(`[RedisSubscriber] Error handling event`, { error, message })
} }
} }
async disconnect() { async disconnect() {
if (this.redisSubscriber) { if (this.redisSubscriber) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: await this.redisSubscriber.quit()
logger.info(`[RedisSubscriber] Disconnecting from Redis...`)
await this.redisSubscriber.quit() await this.redisSubscriber.quit()
logger.info(`[RedisSubscriber] Disconnected from Redis`)
} }
} }
} }

View File

@ -1,5 +1,6 @@
import { Response } from 'express' import { Response } from 'express'
import { IServerSideEventStreamer } from 'flowise-components' import { IServerSideEventStreamer } from 'flowise-components'
import logger from './logger'
// define a new type that has a client type (INTERNAL or EXTERNAL) and Response type // define a new type that has a client type (INTERNAL or EXTERNAL) and Response type
type Client = { type Client = {
@ -14,34 +15,62 @@ export class SSEStreamer implements IServerSideEventStreamer {
clients: { [id: string]: Client } = {} clients: { [id: string]: Client } = {}
addExternalClient(chatId: string, res: Response) { addExternalClient(chatId: string, res: Response) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
logger.info(`[SSEStreamer] Adding external client - chatId: ${chatId}`)
this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false } this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
} }
addClient(chatId: string, res: Response) { addClient(chatId: string, res: Response) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
logger.info(`[SSEStreamer] Adding internal client - chatId: ${chatId}`)
this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false } this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
} }
removeClient(chatId: string) { removeClient(chatId: string) {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'end', // Original: const clientResponse = { event: 'end', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.response.end()
data: '[DONE]' logger.info(`[SSEStreamer] Removing client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'end',
data: '[DONE]'
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.end()
} catch (error) {
logger.error(`[SSEStreamer] Error removing client - chatId: ${chatId}`, { error })
} }
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.response.end()
delete this.clients[chatId] delete this.clients[chatId]
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] Attempted to remove non-existent client - chatId: ${chatId}`)
} }
} }
streamCustomEvent(chatId: string, eventType: string, data: any) { streamCustomEvent(chatId: string, eventType: string, data: any) {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: eventType, // Original: const clientResponse = { event: eventType, data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming custom event to client - chatId: ${chatId}, eventType: ${eventType}`)
try {
const clientResponse = {
event: eventType,
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for custom event - chatId: ${chatId}, eventType: ${eventType}`)
} }
} }
@ -49,155 +78,324 @@ export class SSEStreamer implements IServerSideEventStreamer {
const client = this.clients[chatId] const client = this.clients[chatId]
// prevent multiple start events being streamed to the client // prevent multiple start events being streamed to the client
if (client && !client.started) { if (client && !client.started) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'start', // Original: const clientResponse = { event: 'start', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.started = true
data: data logger.info(`[SSEStreamer] Streaming start event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'start',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
client.started = true
} catch (error) {
logger.error(`[SSEStreamer] Error streaming start event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else if (client && client.started) {
client.started = true // TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] Start event already sent for chatId: ${chatId}`)
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for start event - chatId: ${chatId}`)
} }
} }
streamTokenEvent(chatId: string, data: string) { streamTokenEvent(chatId: string, data: string) {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'token', // Original: const clientResponse = { event: 'token', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming token event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'token',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming token event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for token event - chatId: ${chatId}`)
} }
} }
streamSourceDocumentsEvent(chatId: string, data: any) { streamSourceDocumentsEvent(chatId: string, data: any) {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'sourceDocuments', // Original: const clientResponse = { event: 'sourceDocuments', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming sourceDocuments event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'sourceDocuments',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for sourceDocuments event - chatId: ${chatId}`)
} }
} }
streamArtifactsEvent(chatId: string, data: any) { streamArtifactsEvent(chatId: string, data: any) {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'artifacts', // Original: const clientResponse = { event: 'artifacts', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming artifacts event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'artifacts',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming artifacts event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for artifacts event - chatId: ${chatId}`)
} }
} }
streamUsedToolsEvent(chatId: string, data: any): void { streamUsedToolsEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'usedTools', // Original: const clientResponse = { event: 'usedTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming usedTools event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'usedTools',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming usedTools event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for usedTools event - chatId: ${chatId}`)
} }
} }
streamCalledToolsEvent(chatId: string, data: any): void { streamCalledToolsEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'calledTools', // Original: const clientResponse = { event: 'calledTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming calledTools event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'calledTools',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming calledTools event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for calledTools event - chatId: ${chatId}`)
} }
} }
streamFileAnnotationsEvent(chatId: string, data: any): void { streamFileAnnotationsEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'fileAnnotations', // Original: const clientResponse = { event: 'fileAnnotations', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming fileAnnotations event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'fileAnnotations',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for fileAnnotations event - chatId: ${chatId}`)
} }
} }
streamToolEvent(chatId: string, data: any): void { streamToolEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'tool', // Original: const clientResponse = { event: 'tool', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming tool event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'tool',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming tool event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for tool event - chatId: ${chatId}`)
} }
} }
streamAgentReasoningEvent(chatId: string, data: any): void { streamAgentReasoningEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'agentReasoning', // Original: const clientResponse = { event: 'agentReasoning', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming agentReasoning event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'agentReasoning',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for agentReasoning event - chatId: ${chatId}`)
} }
} }
streamNextAgentEvent(chatId: string, data: any): void { streamNextAgentEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'nextAgent', // Original: const clientResponse = { event: 'nextAgent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming nextAgent event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'nextAgent',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming nextAgent event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for nextAgent event - chatId: ${chatId}`)
} }
} }
streamAgentFlowEvent(chatId: string, data: any): void { streamAgentFlowEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'agentFlowEvent', // Original: const clientResponse = { event: 'agentFlowEvent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming agentFlowEvent event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'agentFlowEvent',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming agentFlowEvent event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for agentFlowEvent event - chatId: ${chatId}`)
} }
} }
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void { streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'agentFlowExecutedData', // Original: const clientResponse = { event: 'agentFlowExecutedData', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming agentFlowExecutedData event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'agentFlowExecutedData',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for agentFlowExecutedData event - chatId: ${chatId}`)
} }
} }
streamNextAgentFlowEvent(chatId: string, data: any): void { streamNextAgentFlowEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'nextAgentFlow', // Original: const clientResponse = { event: 'nextAgentFlow', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming nextAgentFlow event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'nextAgentFlow',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for nextAgentFlow event - chatId: ${chatId}`)
} }
} }
streamActionEvent(chatId: string, data: any): void { streamActionEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'action', // Original: const clientResponse = { event: 'action', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming action event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'action',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming action event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for action event - chatId: ${chatId}`)
} }
} }
streamAbortEvent(chatId: string): void { streamAbortEvent(chatId: string): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'abort', // Original: const clientResponse = { event: 'abort', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: '[DONE]' logger.info(`[SSEStreamer] Streaming abort event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'abort',
data: '[DONE]'
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming abort event - chatId: ${chatId}`, { error })
} }
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for abort event - chatId: ${chatId}`)
} }
} }
@ -209,15 +407,29 @@ export class SSEStreamer implements IServerSideEventStreamer {
if (msg.includes('401 Incorrect API key provided')) msg = '401 Invalid model key or Incorrect local model configuration.' if (msg.includes('401 Incorrect API key provided')) msg = '401 Invalid model key or Incorrect local model configuration.'
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'error', // Original: const clientResponse = { event: 'error', data: msg }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: msg logger.info(`[SSEStreamer] Streaming error event to client - chatId: ${chatId}, error: ${msg}`)
try {
const clientResponse = {
event: 'error',
data: msg
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming error event - chatId: ${chatId}`, { error })
} }
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for error event - chatId: ${chatId}`)
} }
} }
streamMetadataEvent(chatId: string, apiResponse: any) { streamMetadataEvent(chatId: string, apiResponse: any) {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: const metadataJson: any = {}; ... (no initial logging)
logger.info(`[SSEStreamer] Processing metadata event - chatId: ${chatId}`)
const metadataJson: any = {} const metadataJson: any = {}
if (apiResponse.chatId) { if (apiResponse.chatId) {
metadataJson['chatId'] = apiResponse.chatId metadataJson['chatId'] = apiResponse.chatId
@ -244,17 +456,32 @@ export class SSEStreamer implements IServerSideEventStreamer {
} }
if (Object.keys(metadataJson).length > 0) { if (Object.keys(metadataJson).length > 0) {
this.streamCustomEvent(chatId, 'metadata', metadataJson) this.streamCustomEvent(chatId, 'metadata', metadataJson)
} else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No metadata to stream - chatId: ${chatId}`)
} }
} }
streamUsageMetadataEvent(chatId: string, data: any): void { streamUsageMetadataEvent(chatId: string, data: any): void {
const client = this.clients[chatId] const client = this.clients[chatId]
if (client) { if (client) {
const clientResponse = { // TODO: Remove debug logging after fixing Redis pub-sub issues
event: 'usageMetadata', // Original: const clientResponse = { event: 'usageMetadata', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
data: data logger.info(`[SSEStreamer] Streaming usageMetadata event to client - chatId: ${chatId}`)
try {
const clientResponse = {
event: 'usageMetadata',
data: data
}
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
} catch (error) {
logger.error(`[SSEStreamer] Error streaming usageMetadata event - chatId: ${chatId}`, { error })
} }
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n') } else {
// TODO: Remove debug logging after fixing Redis pub-sub issues
// Original: (no warning)
logger.warn(`[SSEStreamer] No client found for usageMetadata event - chatId: ${chatId}`)
} }
} }
} }