Compare commits
1 Commits
main
...
fix/predic
| Author | SHA1 | Date |
|---|---|---|
|
|
f61a1ad248 |
|
|
@ -1,5 +1,6 @@
|
||||||
import { IServerSideEventStreamer } from 'flowise-components'
|
import { IServerSideEventStreamer } from 'flowise-components'
|
||||||
import { createClient } from 'redis'
|
import { createClient } from 'redis'
|
||||||
|
import logger from '../utils/logger'
|
||||||
|
|
||||||
export class RedisEventPublisher implements IServerSideEventStreamer {
|
export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
private redisPublisher: ReturnType<typeof createClient>
|
private redisPublisher: ReturnType<typeof createClient>
|
||||||
|
|
@ -44,11 +45,18 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: await this.redisPublisher.connect()
|
||||||
|
logger.info(`[RedisPublisher] Connecting to Redis...`)
|
||||||
await this.redisPublisher.connect()
|
await this.redisPublisher.connect()
|
||||||
|
logger.info(`[RedisPublisher] Connected to Redis successfully`)
|
||||||
}
|
}
|
||||||
|
|
||||||
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing custom event - chatId: ${chatId}, eventType: ${eventType}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -58,12 +66,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming custom event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming custom event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamStartEvent(chatId: string, data: string) {
|
streamStartEvent(chatId: string, data: string) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing start event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -73,12 +86,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming start event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming start event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming start event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTokenEvent(chatId: string, data: string) {
|
streamTokenEvent(chatId: string, data: string) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing token event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -88,12 +106,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming token event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming token event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming token event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSourceDocumentsEvent(chatId: string, data: any) {
|
streamSourceDocumentsEvent(chatId: string, data: any) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing sourceDocuments event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -103,12 +126,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming sourceDocuments event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming sourceDocuments event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamArtifactsEvent(chatId: string, data: any) {
|
streamArtifactsEvent(chatId: string, data: any) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing artifacts event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -118,12 +146,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming artifacts event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming artifacts event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming artifacts event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamUsedToolsEvent(chatId: string, data: any) {
|
streamUsedToolsEvent(chatId: string, data: any) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing usedTools event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -133,12 +166,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming usedTools event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming usedTools event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming usedTools event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamCalledToolsEvent(chatId: string, data: any) {
|
streamCalledToolsEvent(chatId: string, data: any) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing calledTools event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -148,12 +186,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming calledTools event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming calledTools event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming calledTools event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamFileAnnotationsEvent(chatId: string, data: any) {
|
streamFileAnnotationsEvent(chatId: string, data: any) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing fileAnnotations event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -163,12 +206,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming fileAnnotations event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming fileAnnotations event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamToolEvent(chatId: string, data: any): void {
|
streamToolEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing tool event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -178,12 +226,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming tool event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming tool event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming tool event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamAgentReasoningEvent(chatId: string, data: any): void {
|
streamAgentReasoningEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing agentReasoning event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -193,12 +246,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming agentReasoning event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming agentReasoning event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamAgentFlowEvent(chatId: string, data: any): void {
|
streamAgentFlowEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing agentFlowEvent event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -208,12 +266,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming agentFlow event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming agentFlow event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming agentFlow event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
|
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing agentFlowExecutedData event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -223,12 +286,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming agentFlowExecutedData event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming agentFlowExecutedData event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamNextAgentEvent(chatId: string, data: any): void {
|
streamNextAgentEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing nextAgent event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -238,12 +306,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming nextAgent event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming nextAgent event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming nextAgent event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamNextAgentFlowEvent(chatId: string, data: any): void {
|
streamNextAgentFlowEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing nextAgentFlow event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -253,12 +326,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming nextAgentFlow event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming nextAgentFlow event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamActionEvent(chatId: string, data: any): void {
|
streamActionEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing action event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -268,12 +346,17 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming action event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming action event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming action event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamAbortEvent(chatId: string): void {
|
streamAbortEvent(chatId: string): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing abort event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -283,7 +366,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming abort event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming abort event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming abort event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -293,6 +378,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
|
|
||||||
streamErrorEvent(chatId: string, msg: string) {
|
streamErrorEvent(chatId: string, msg: string) {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing error event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -302,7 +390,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming error event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming error event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming error event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -334,6 +424,9 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
|
|
||||||
streamUsageMetadataEvent(chatId: string, data: any): void {
|
streamUsageMetadataEvent(chatId: string, data: any): void {
|
||||||
try {
|
try {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisPublisher.publish(...)
|
||||||
|
logger.info(`[RedisPublisher] Publishing usageMetadata event - chatId: ${chatId}`)
|
||||||
this.redisPublisher.publish(
|
this.redisPublisher.publish(
|
||||||
chatId,
|
chatId,
|
||||||
JSON.stringify({
|
JSON.stringify({
|
||||||
|
|
@ -343,13 +436,19 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error streaming usage metadata event:', error)
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: console.error('Error streaming usage metadata event:', error)
|
||||||
|
logger.error(`[RedisPublisher] Error streaming usage metadata event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async disconnect() {
|
async disconnect() {
|
||||||
if (this.redisPublisher) {
|
if (this.redisPublisher) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: await this.redisPublisher.quit()
|
||||||
|
logger.info(`[RedisPublisher] Disconnecting from Redis...`)
|
||||||
await this.redisPublisher.quit()
|
await this.redisPublisher.quit()
|
||||||
|
logger.info(`[RedisPublisher] Disconnected from Redis`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import { createClient } from 'redis'
|
import { createClient } from 'redis'
|
||||||
import { SSEStreamer } from '../utils/SSEStreamer'
|
import { SSEStreamer } from '../utils/SSEStreamer'
|
||||||
|
import logger from '../utils/logger'
|
||||||
|
|
||||||
export class RedisEventSubscriber {
|
export class RedisEventSubscriber {
|
||||||
private redisSubscriber: ReturnType<typeof createClient>
|
private redisSubscriber: ReturnType<typeof createClient>
|
||||||
|
|
@ -47,7 +48,11 @@ export class RedisEventSubscriber {
|
||||||
}
|
}
|
||||||
|
|
||||||
async connect() {
|
async connect() {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: await this.redisSubscriber.connect()
|
||||||
|
logger.info(`[RedisSubscriber] Connecting to Redis...`)
|
||||||
await this.redisSubscriber.connect()
|
await this.redisSubscriber.connect()
|
||||||
|
logger.info(`[RedisSubscriber] Connected to Redis successfully`)
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe(channel: string) {
|
subscribe(channel: string) {
|
||||||
|
|
@ -58,78 +63,110 @@ export class RedisEventSubscriber {
|
||||||
|
|
||||||
// Check if already subscribed
|
// Check if already subscribed
|
||||||
if (this.subscribedChannels.has(channel)) {
|
if (this.subscribedChannels.has(channel)) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: return // Prevent duplicate subscription
|
||||||
|
logger.info(`[RedisSubscriber] Already subscribed to channel: ${channel}`)
|
||||||
return // Prevent duplicate subscription
|
return // Prevent duplicate subscription
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.redisSubscriber.subscribe(channel, (message) => { this.handleEvent(message) })
|
||||||
|
logger.info(`[RedisSubscriber] Subscribing to channel: ${channel}`)
|
||||||
this.redisSubscriber.subscribe(channel, (message) => {
|
this.redisSubscriber.subscribe(channel, (message) => {
|
||||||
this.handleEvent(message)
|
this.handleEvent(message)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Mark the channel as subscribed
|
// Mark the channel as subscribed
|
||||||
this.subscribedChannels.add(channel)
|
this.subscribedChannels.add(channel)
|
||||||
|
logger.info(`[RedisSubscriber] Successfully subscribed to channel: ${channel}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
private handleEvent(message: string) {
|
private handleEvent(message: string) {
|
||||||
// Parse the message from Redis
|
try {
|
||||||
const event = JSON.parse(message)
|
// Parse the message from Redis
|
||||||
const { eventType, chatId, data } = event
|
const event = JSON.parse(message)
|
||||||
|
const { eventType, chatId, data } = event
|
||||||
|
|
||||||
// Stream the event to the client
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
switch (eventType) {
|
// Original: switch (eventType) { ... }
|
||||||
case 'start':
|
logger.info(`[RedisSubscriber] Received event - chatId: ${chatId}, eventType: ${eventType}`)
|
||||||
this.sseStreamer.streamStartEvent(chatId, data)
|
|
||||||
break
|
// Stream the event to the client
|
||||||
case 'token':
|
switch (eventType) {
|
||||||
this.sseStreamer.streamTokenEvent(chatId, data)
|
case 'start':
|
||||||
break
|
this.sseStreamer.streamStartEvent(chatId, data)
|
||||||
case 'sourceDocuments':
|
break
|
||||||
this.sseStreamer.streamSourceDocumentsEvent(chatId, data)
|
case 'token':
|
||||||
break
|
this.sseStreamer.streamTokenEvent(chatId, data)
|
||||||
case 'artifacts':
|
break
|
||||||
this.sseStreamer.streamArtifactsEvent(chatId, data)
|
case 'sourceDocuments':
|
||||||
break
|
this.sseStreamer.streamSourceDocumentsEvent(chatId, data)
|
||||||
case 'usedTools':
|
break
|
||||||
this.sseStreamer.streamUsedToolsEvent(chatId, data)
|
case 'artifacts':
|
||||||
break
|
this.sseStreamer.streamArtifactsEvent(chatId, data)
|
||||||
case 'fileAnnotations':
|
break
|
||||||
this.sseStreamer.streamFileAnnotationsEvent(chatId, data)
|
case 'usedTools':
|
||||||
break
|
this.sseStreamer.streamUsedToolsEvent(chatId, data)
|
||||||
case 'tool':
|
break
|
||||||
this.sseStreamer.streamToolEvent(chatId, data)
|
case 'fileAnnotations':
|
||||||
break
|
this.sseStreamer.streamFileAnnotationsEvent(chatId, data)
|
||||||
case 'agentReasoning':
|
break
|
||||||
this.sseStreamer.streamAgentReasoningEvent(chatId, data)
|
case 'tool':
|
||||||
break
|
this.sseStreamer.streamToolEvent(chatId, data)
|
||||||
case 'nextAgent':
|
break
|
||||||
this.sseStreamer.streamNextAgentEvent(chatId, data)
|
case 'agentReasoning':
|
||||||
break
|
this.sseStreamer.streamAgentReasoningEvent(chatId, data)
|
||||||
case 'agentFlowEvent':
|
break
|
||||||
this.sseStreamer.streamAgentFlowEvent(chatId, data)
|
case 'nextAgent':
|
||||||
break
|
this.sseStreamer.streamNextAgentEvent(chatId, data)
|
||||||
case 'agentFlowExecutedData':
|
break
|
||||||
this.sseStreamer.streamAgentFlowExecutedDataEvent(chatId, data)
|
case 'agentFlowEvent':
|
||||||
break
|
this.sseStreamer.streamAgentFlowEvent(chatId, data)
|
||||||
case 'nextAgentFlow':
|
break
|
||||||
this.sseStreamer.streamNextAgentFlowEvent(chatId, data)
|
case 'agentFlowExecutedData':
|
||||||
break
|
this.sseStreamer.streamAgentFlowExecutedDataEvent(chatId, data)
|
||||||
case 'action':
|
break
|
||||||
this.sseStreamer.streamActionEvent(chatId, data)
|
case 'nextAgentFlow':
|
||||||
break
|
this.sseStreamer.streamNextAgentFlowEvent(chatId, data)
|
||||||
case 'abort':
|
break
|
||||||
this.sseStreamer.streamAbortEvent(chatId)
|
case 'action':
|
||||||
break
|
this.sseStreamer.streamActionEvent(chatId, data)
|
||||||
case 'error':
|
break
|
||||||
this.sseStreamer.streamErrorEvent(chatId, data)
|
case 'abort':
|
||||||
break
|
this.sseStreamer.streamAbortEvent(chatId)
|
||||||
case 'metadata':
|
break
|
||||||
this.sseStreamer.streamMetadataEvent(chatId, data)
|
case 'error':
|
||||||
break
|
this.sseStreamer.streamErrorEvent(chatId, data)
|
||||||
|
break
|
||||||
|
case 'metadata':
|
||||||
|
this.sseStreamer.streamMetadataEvent(chatId, data)
|
||||||
|
break
|
||||||
|
case 'calledTools':
|
||||||
|
this.sseStreamer.streamCalledToolsEvent(chatId, data)
|
||||||
|
break
|
||||||
|
case 'usageMetadata':
|
||||||
|
this.sseStreamer.streamUsageMetadataEvent(chatId, data)
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: break
|
||||||
|
logger.warn(`[RedisSubscriber] Unknown event type: ${eventType} for chatId: ${chatId}`)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no error handling)
|
||||||
|
logger.error(`[RedisSubscriber] Error handling event`, { error, message })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async disconnect() {
|
async disconnect() {
|
||||||
if (this.redisSubscriber) {
|
if (this.redisSubscriber) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: await this.redisSubscriber.quit()
|
||||||
|
logger.info(`[RedisSubscriber] Disconnecting from Redis...`)
|
||||||
await this.redisSubscriber.quit()
|
await this.redisSubscriber.quit()
|
||||||
|
logger.info(`[RedisSubscriber] Disconnected from Redis`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
import { Response } from 'express'
|
import { Response } from 'express'
|
||||||
import { IServerSideEventStreamer } from 'flowise-components'
|
import { IServerSideEventStreamer } from 'flowise-components'
|
||||||
|
import logger from './logger'
|
||||||
|
|
||||||
// define a new type that has a client type (INTERNAL or EXTERNAL) and Response type
|
// define a new type that has a client type (INTERNAL or EXTERNAL) and Response type
|
||||||
type Client = {
|
type Client = {
|
||||||
|
|
@ -14,34 +15,62 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
||||||
clients: { [id: string]: Client } = {}
|
clients: { [id: string]: Client } = {}
|
||||||
|
|
||||||
addExternalClient(chatId: string, res: Response) {
|
addExternalClient(chatId: string, res: Response) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
|
||||||
|
logger.info(`[SSEStreamer] Adding external client - chatId: ${chatId}`)
|
||||||
this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
|
this.clients[chatId] = { clientType: 'EXTERNAL', response: res, started: false }
|
||||||
}
|
}
|
||||||
|
|
||||||
addClient(chatId: string, res: Response) {
|
addClient(chatId: string, res: Response) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
|
||||||
|
logger.info(`[SSEStreamer] Adding internal client - chatId: ${chatId}`)
|
||||||
this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
|
this.clients[chatId] = { clientType: 'INTERNAL', response: res, started: false }
|
||||||
}
|
}
|
||||||
|
|
||||||
removeClient(chatId: string) {
|
removeClient(chatId: string) {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'end',
|
// Original: const clientResponse = { event: 'end', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.response.end()
|
||||||
data: '[DONE]'
|
logger.info(`[SSEStreamer] Removing client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'end',
|
||||||
|
data: '[DONE]'
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
client.response.end()
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error removing client - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
|
||||||
client.response.end()
|
|
||||||
delete this.clients[chatId]
|
delete this.clients[chatId]
|
||||||
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] Attempted to remove non-existent client - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
streamCustomEvent(chatId: string, eventType: string, data: any) {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: eventType,
|
// Original: const clientResponse = { event: eventType, data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming custom event to client - chatId: ${chatId}, eventType: ${eventType}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: eventType,
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming custom event - chatId: ${chatId}, eventType: ${eventType}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for custom event - chatId: ${chatId}, eventType: ${eventType}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -49,155 +78,324 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
// prevent multiple start events being streamed to the client
|
// prevent multiple start events being streamed to the client
|
||||||
if (client && !client.started) {
|
if (client && !client.started) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'start',
|
// Original: const clientResponse = { event: 'start', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n'); client.started = true
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming start event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'start',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
client.started = true
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming start event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else if (client && client.started) {
|
||||||
client.started = true
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] Start event already sent for chatId: ${chatId}`)
|
||||||
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for start event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTokenEvent(chatId: string, data: string) {
|
streamTokenEvent(chatId: string, data: string) {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'token',
|
// Original: const clientResponse = { event: 'token', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming token event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'token',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming token event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for token event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSourceDocumentsEvent(chatId: string, data: any) {
|
streamSourceDocumentsEvent(chatId: string, data: any) {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'sourceDocuments',
|
// Original: const clientResponse = { event: 'sourceDocuments', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming sourceDocuments event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'sourceDocuments',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming sourceDocuments event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for sourceDocuments event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamArtifactsEvent(chatId: string, data: any) {
|
streamArtifactsEvent(chatId: string, data: any) {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'artifacts',
|
// Original: const clientResponse = { event: 'artifacts', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming artifacts event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'artifacts',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming artifacts event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for artifacts event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamUsedToolsEvent(chatId: string, data: any): void {
|
streamUsedToolsEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'usedTools',
|
// Original: const clientResponse = { event: 'usedTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming usedTools event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'usedTools',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming usedTools event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for usedTools event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamCalledToolsEvent(chatId: string, data: any): void {
|
streamCalledToolsEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'calledTools',
|
// Original: const clientResponse = { event: 'calledTools', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming calledTools event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'calledTools',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming calledTools event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for calledTools event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamFileAnnotationsEvent(chatId: string, data: any): void {
|
streamFileAnnotationsEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'fileAnnotations',
|
// Original: const clientResponse = { event: 'fileAnnotations', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming fileAnnotations event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'fileAnnotations',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming fileAnnotations event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for fileAnnotations event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamToolEvent(chatId: string, data: any): void {
|
streamToolEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'tool',
|
// Original: const clientResponse = { event: 'tool', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming tool event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'tool',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming tool event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for tool event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamAgentReasoningEvent(chatId: string, data: any): void {
|
streamAgentReasoningEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'agentReasoning',
|
// Original: const clientResponse = { event: 'agentReasoning', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming agentReasoning event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'agentReasoning',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming agentReasoning event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for agentReasoning event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamNextAgentEvent(chatId: string, data: any): void {
|
streamNextAgentEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'nextAgent',
|
// Original: const clientResponse = { event: 'nextAgent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming nextAgent event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'nextAgent',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming nextAgent event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for nextAgent event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamAgentFlowEvent(chatId: string, data: any): void {
|
streamAgentFlowEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'agentFlowEvent',
|
// Original: const clientResponse = { event: 'agentFlowEvent', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming agentFlowEvent event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'agentFlowEvent',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming agentFlowEvent event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for agentFlowEvent event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
|
streamAgentFlowExecutedDataEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'agentFlowExecutedData',
|
// Original: const clientResponse = { event: 'agentFlowExecutedData', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming agentFlowExecutedData event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'agentFlowExecutedData',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming agentFlowExecutedData event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for agentFlowExecutedData event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamNextAgentFlowEvent(chatId: string, data: any): void {
|
streamNextAgentFlowEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'nextAgentFlow',
|
// Original: const clientResponse = { event: 'nextAgentFlow', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming nextAgentFlow event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'nextAgentFlow',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming nextAgentFlow event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for nextAgentFlow event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
streamActionEvent(chatId: string, data: any): void {
|
streamActionEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'action',
|
// Original: const clientResponse = { event: 'action', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming action event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'action',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming action event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for action event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamAbortEvent(chatId: string): void {
|
streamAbortEvent(chatId: string): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'abort',
|
// Original: const clientResponse = { event: 'abort', data: '[DONE]' }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: '[DONE]'
|
logger.info(`[SSEStreamer] Streaming abort event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'abort',
|
||||||
|
data: '[DONE]'
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming abort event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for abort event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -209,15 +407,29 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
||||||
if (msg.includes('401 Incorrect API key provided')) msg = '401 Invalid model key or Incorrect local model configuration.'
|
if (msg.includes('401 Incorrect API key provided')) msg = '401 Invalid model key or Incorrect local model configuration.'
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'error',
|
// Original: const clientResponse = { event: 'error', data: msg }; client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: msg
|
logger.info(`[SSEStreamer] Streaming error event to client - chatId: ${chatId}, error: ${msg}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'error',
|
||||||
|
data: msg
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming error event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for error event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetadataEvent(chatId: string, apiResponse: any) {
|
streamMetadataEvent(chatId: string, apiResponse: any) {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: const metadataJson: any = {}; ... (no initial logging)
|
||||||
|
logger.info(`[SSEStreamer] Processing metadata event - chatId: ${chatId}`)
|
||||||
const metadataJson: any = {}
|
const metadataJson: any = {}
|
||||||
if (apiResponse.chatId) {
|
if (apiResponse.chatId) {
|
||||||
metadataJson['chatId'] = apiResponse.chatId
|
metadataJson['chatId'] = apiResponse.chatId
|
||||||
|
|
@ -244,17 +456,32 @@ export class SSEStreamer implements IServerSideEventStreamer {
|
||||||
}
|
}
|
||||||
if (Object.keys(metadataJson).length > 0) {
|
if (Object.keys(metadataJson).length > 0) {
|
||||||
this.streamCustomEvent(chatId, 'metadata', metadataJson)
|
this.streamCustomEvent(chatId, 'metadata', metadataJson)
|
||||||
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No metadata to stream - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamUsageMetadataEvent(chatId: string, data: any): void {
|
streamUsageMetadataEvent(chatId: string, data: any): void {
|
||||||
const client = this.clients[chatId]
|
const client = this.clients[chatId]
|
||||||
if (client) {
|
if (client) {
|
||||||
const clientResponse = {
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
event: 'usageMetadata',
|
// Original: const clientResponse = { event: 'usageMetadata', data: data }; client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
data: data
|
logger.info(`[SSEStreamer] Streaming usageMetadata event to client - chatId: ${chatId}`)
|
||||||
|
try {
|
||||||
|
const clientResponse = {
|
||||||
|
event: 'usageMetadata',
|
||||||
|
data: data
|
||||||
|
}
|
||||||
|
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`[SSEStreamer] Error streaming usageMetadata event - chatId: ${chatId}`, { error })
|
||||||
}
|
}
|
||||||
client.response.write('message:\ndata:' + JSON.stringify(clientResponse) + '\n\n')
|
} else {
|
||||||
|
// TODO: Remove debug logging after fixing Redis pub-sub issues
|
||||||
|
// Original: (no warning)
|
||||||
|
logger.warn(`[SSEStreamer] No client found for usageMetadata event - chatId: ${chatId}`)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue