Compare commits
1 Commits
main
...
fix/predic
| Author | SHA1 | Date |
|---|---|---|
|
|
f61a1ad248 |
|
|
@ -1,5 +1,6 @@
|
|||
import { IServerSideEventStreamer } from 'flowise-components'
|
||||
import { createClient } from 'redis'
|
||||
import logger from '../utils/logger'
|
||||
|
||||
export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||
private redisPublisher: ReturnType<typeof createClient>
|
||||
|
|
@ -44,11 +45,18 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
}
|
||||
|
||||
async connect() {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: await this.redisPublisher.connect()
|
||||
logger.info(`[RedisPublisher] Connecting to Redis...`)
|
||||
await this.redisPublisher.connect()
|
||||
logger.info(`[RedisPublisher] Connected to Redis successfully`)
|
||||
}
|
||||
|
||||
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing custom event - chatId: ${chatId}, eventType: ${eventType}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -58,12 +66,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming custom event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming custom event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamStartEvent(chatId: string, data: string) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing start event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -73,12 +86,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming start event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming start event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming start event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamTokenEvent(chatId: string, data: string) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing token event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -88,12 +106,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming token event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming token event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming token event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamSourceDocumentsEvent(chatId: string, data: any) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing sourceDocuments event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -103,12 +126,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming sourceDocuments event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming sourceDocuments event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamArtifactsEvent(chatId: string, data: any) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing artifacts event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -118,12 +146,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming artifacts event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming artifacts event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming artifacts event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamUsedToolsEvent(chatId: string, data: any) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing usedTools event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -133,12 +166,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming usedTools event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming usedTools event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming usedTools event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamCalledToolsEvent(chatId: string, data: any) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing calledTools event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -148,12 +186,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming calledTools event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming calledTools event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming calledTools event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamFileAnnotationsEvent(chatId: string, data: any) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing fileAnnotations event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -163,12 +206,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming fileAnnotations event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming fileAnnotations event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamToolEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing tool event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -178,12 +226,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming tool event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming tool event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming tool event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamAgentReasoningEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing agentReasoning event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -193,12 +246,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming agentReasoning event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming agentReasoning event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamAgentFlowEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing agentFlowEvent event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -208,12 +266,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming agentFlow event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming agentFlow event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming agentFlow event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing agentFlowExecutedData event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -223,12 +286,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming agentFlowExecutedData event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming agentFlowExecutedData event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamNextAgentEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing nextAgent event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -238,12 +306,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming nextAgent event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming nextAgent event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming nextAgent event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamNextAgentFlowEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing nextAgentFlow event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -253,12 +326,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming nextAgentFlow event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming nextAgentFlow event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamActionEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing action event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -268,12 +346,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming action event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming action event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming action event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
streamAbortEvent(chatId: string): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing abort event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -283,7 +366,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming abort event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming abort event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming abort event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -293,6 +378,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
|
||||
streamErrorEvent(chatId: string, msg: string) {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing error event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -302,7 +390,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming error event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming error event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming error event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -334,6 +424,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
|
||||
streamUsageMetadataEvent(chatId: string, data: any): void {
|
||||
try {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisPublisher.publish(...)
|
||||
logger.info(`[RedisPublisher] Publishing usageMetadata event - chatId: ${chatId}`)
|
||||
this.redisPublisher.publish(
|
||||
chatId,
|
||||
JSON.stringify({
|
||||
|
|
@ -343,13 +436,19 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
|||
})
|
||||
)
|
||||
} catch (error) {
|
||||
console.error('Error streaming usage metadata event:', error)
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: console.error('Error streaming usage metadata event:', error)
|
||||
logger.error(`[RedisPublisher] Error streaming usage metadata event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
if (this.redisPublisher) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: await this.redisPublisher.quit()
|
||||
logger.info(`[RedisPublisher] Disconnecting from Redis...`)
|
||||
await this.redisPublisher.quit()
|
||||
logger.info(`[RedisPublisher] Disconnected from Redis`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { createClient } from 'redis'
|
||||
import { SSEStreamer } from '../utils/SSEStreamer'
|
||||
import logger from '../utils/logger'
|
||||
|
||||
export class RedisEventSubscriber {
|
||||
private redisSubscriber: ReturnType<typeof createClient>
|
||||
|
|
@ -47,7 +48,11 @@ export class RedisEventSubscriber {
|
|||
}
|
||||
|
||||
async connect() {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: await this.redisSubscriber.connect()
|
||||
logger.info(`[RedisSubscriber] Connecting to Redis...`)
|
||||
await this.redisSubscriber.connect()
|
||||
logger.info(`[RedisSubscriber] Connected to Redis successfully`)
|
||||
}
|
||||
|
||||
subscribe(channel: string) {
|
||||
|
|
@ -58,22 +63,34 @@ export class RedisEventSubscriber {
|
|||
|
||||
// Check if already subscribed
|
||||
if (this.subscribedChannels.has(channel)) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: return // Prevent duplicate subscription
|
||||
logger.info(`[RedisSubscriber] Already subscribed to channel: ${channel}`)
|
||||
return // Prevent duplicate subscription
|
||||
}
|
||||
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.redisSubscriber.subscribe(channel, (message) => { this.handleEvent(message) })
|
||||
logger.info(`[RedisSubscriber] Subscribing to channel: ${channel}`)
|
||||
this.redisSubscriber.subscribe(channel, (message) => {
|
||||
this.handleEvent(message)
|
||||
})
|
||||
|
||||
// Mark the channel as subscribed
|
||||
this.subscribedChannels.add(channel)
|
||||
logger.info(`[RedisSubscriber] Successfully subscribed to channel: ${channel}`)
|
||||
}
|
||||
|
||||
private handleEvent(message: string) {
|
||||
try {
|
||||
// Parse the message from Redis
|
||||
const event = JSON.parse(message)
|
||||
const { eventType, chatId, data } = event
|
||||
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: switch (eventType) { ... }
|
||||
logger.info(`[RedisSubscriber] Received event - chatId: ${chatId}, eventType: ${eventType}`)
|
||||
|
||||
// Stream the event to the client
|
||||
switch (eventType) {
|
||||
case 'start':
|
||||
|
|
@ -124,12 +141,32 @@ export class RedisEventSubscriber {
|
|||
case 'metadata':
|
||||
this.sseStreamer.streamMetadataEvent(chatId, data)
|
||||
break
|
||||
case 'calledTools':
|
||||
this.sseStreamer.streamCalledToolsEvent(chatId, data)
|
||||
break
|
||||
case 'usageMetadata':
|
||||
this.sseStreamer.streamUsageMetadataEvent(chatId, data)
|
||||
break
|
||||
default:
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: break
|
||||
logger.warn(`[RedisSubscriber] Unknown event type: ${eventType} for chatId: ${chatId}`)
|
||||
break
|
||||
}
|
||||
} catch (error) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no error handling)
|
||||
logger.error(`[RedisSubscriber] Error handling event`, { error, message })
|
||||
}
|
||||
}
|
||||
|
||||
async disconnect() {
|
||||
if (this.redisSubscriber) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: await this.redisSubscriber.quit()
|
||||
logger.info(`[RedisSubscriber] Disconnecting from Redis...`)
|
||||
await this.redisSubscriber.quit()
|
||||
logger.info(`[RedisSubscriber] Disconnected from Redis`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
import { Response } from 'express'
|
||||
import { IServerSideEventStreamer } from 'flowise-components'
|
||||
import logger from './logger'
|
||||
|
||||
// define a new type that has a client type (INTERNAL or EXTERNAL) and Response type
|
||||
type Client = {
|
||||
|
|
@ -14,34 +15,62 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
|||
clients: { [id: string]: Client } = {}
|
||||
|
||||
addExternalClient(chatId: string, res: Response) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
|
||||
logger.info(`[SSEStreamer] Adding external client - chatId: ${chatId}`)
|
||||
this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
|
||||
}
|
||||
|
||||
addClient(chatId: string, res: Response) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
|
||||
logger.info(`[SSEStreamer] Adding internal client - chatId: ${chatId}`)
|
||||
this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
|
||||
}
|
||||
|
||||
removeClient(chatId: string) {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'end', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.response.end()
|
||||
logger.info(`[SSEStreamer] Removing client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'end',
|
||||
data: '[DONE]'
|
||||
}
|
||||
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
client.response.end()
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error removing client - chatId: ${chatId}`, { error })
|
||||
}
|
||||
delete this.clients[chatId]
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] Attempted to remove non-existent client - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: eventType, data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming custom event to client - chatId: ${chatId}, eventType: ${eventType}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: eventType,
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for custom event - chatId: ${chatId}, eventType: ${eventType}`)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -49,155 +78,324 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
|||
const client = this.clients[chatId]
|
||||
// prevent multiple start events being streamed to the client
|
||||
if (client && !client.started) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'start', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.started = true
|
||||
logger.info(`[SSEStreamer] Streaming start event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'start',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
client.started = true
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming start event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else if (client && client.started) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] Start event already sent for chatId: ${chatId}`)
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for start event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
streamTokenEvent(chatId: string, data: string) {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'token', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming token event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'token',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming token event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for token event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
streamSourceDocumentsEvent(chatId: string, data: any) {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'sourceDocuments', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming sourceDocuments event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'sourceDocuments',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for sourceDocuments event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamArtifactsEvent(chatId: string, data: any) {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'artifacts', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming artifacts event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'artifacts',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming artifacts event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for artifacts event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamUsedToolsEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'usedTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming usedTools event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'usedTools',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming usedTools event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for usedTools event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamCalledToolsEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'calledTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming calledTools event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'calledTools',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming calledTools event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for calledTools event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamFileAnnotationsEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'fileAnnotations', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming fileAnnotations event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'fileAnnotations',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for fileAnnotations event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamToolEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'tool', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming tool event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'tool',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming tool event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for tool event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamAgentReasoningEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'agentReasoning', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming agentReasoning event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'agentReasoning',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for agentReasoning event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamNextAgentEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'nextAgent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming nextAgent event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'nextAgent',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming nextAgent event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for nextAgent event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamAgentFlowEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'agentFlowEvent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming agentFlowEvent event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'agentFlowEvent',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming agentFlowEvent event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for agentFlowEvent event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'agentFlowExecutedData', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming agentFlowExecutedData event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'agentFlowExecutedData',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for agentFlowExecutedData event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamNextAgentFlowEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'nextAgentFlow', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming nextAgentFlow event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'nextAgentFlow',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for nextAgentFlow event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
streamActionEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'action', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming action event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'action',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming action event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for action event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
streamAbortEvent(chatId: string): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'abort', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming abort event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'abort',
|
||||
data: '[DONE]'
|
||||
}
|
||||
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming abort event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for abort event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -209,15 +407,29 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
|||
if (msg.includes('401 Incorrect API key provided')) msg = '401 Invalid model key or Incorrect local model configuration.'
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'error', data: msg }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming error event to client - chatId: ${chatId}, error: ${msg}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'error',
|
||||
data: msg
|
||||
}
|
||||
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming error event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for error event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
streamMetadataEvent(chatId: string, apiResponse: any) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const metadataJson: any = {}; ... (no initial logging)
|
||||
logger.info(`[SSEStreamer] Processing metadata event - chatId: ${chatId}`)
|
||||
const metadataJson: any = {}
|
||||
if (apiResponse.chatId) {
|
||||
metadataJson['chatId'] = apiResponse.chatId
|
||||
|
|
@ -244,17 +456,32 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
|||
}
|
||||
if (Object.keys(metadataJson).length > 0) {
|
||||
this.streamCustomEvent(chatId, 'metadata', metadataJson)
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No metadata to stream - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
|
||||
streamUsageMetadataEvent(chatId: string, data: any): void {
|
||||
const client = this.clients[chatId]
|
||||
if (client) {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: const clientResponse = { event: 'usageMetadata', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
logger.info(`[SSEStreamer] Streaming usageMetadata event to client - chatId: ${chatId}`)
|
||||
try {
|
||||
const clientResponse = {
|
||||
event: 'usageMetadata',
|
||||
data: data
|
||||
}
|
||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||
} catch (error) {
|
||||
logger.error(`[SSEStreamer] Error streaming usageMetadata event - chatId: ${chatId}`, { error })
|
||||
}
|
||||
} else {
|
||||
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||
// Original: (no warning)
|
||||
logger.warn(`[SSEStreamer] No client found for usageMetadata event - chatId: ${chatId}`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue