add custom analytics

This commit is contained in:
Henry 2023-12-07 18:46:03 +00:00
parent b57df046dc
commit 7578183ac2
4 changed files with 522 additions and 4 deletions

View File

@ -8,6 +8,7 @@ import * as path from 'node:path'
import fetch from 'node-fetch' import fetch from 'node-fetch'
import { flatten, uniqWith, isEqual } from 'lodash' import { flatten, uniqWith, isEqual } from 'lodash'
import { zodToJsonSchema } from 'zod-to-json-schema' import { zodToJsonSchema } from 'zod-to-json-schema'
import { AnalyticHandler } from '../../../src/handler'
class OpenAIAssistant_Agents implements INode { class OpenAIAssistant_Agents implements INode {
label: string label: string
@ -149,6 +150,11 @@ class OpenAIAssistant_Agents implements INode {
const openai = new OpenAI({ apiKey: openAIApiKey }) const openai = new OpenAI({ apiKey: openAIApiKey })
// Start analytics
const analyticHandlers = new AnalyticHandler(nodeData, options)
await analyticHandlers.init()
const parentIds = await analyticHandlers.onChainStart('OpenAIAssistant', input)
try { try {
const assistantDetails = JSON.parse(assistant.details) const assistantDetails = JSON.parse(assistant.details)
const openAIAssistantId = assistantDetails.id const openAIAssistantId = assistantDetails.id
@ -171,7 +177,8 @@ class OpenAIAssistant_Agents implements INode {
} }
const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({ const chatmessage = await appDataSource.getRepository(databaseEntities['ChatMessage']).findOneBy({
chatId: options.chatId chatId: options.chatId,
chatflowid: options.chatflowid
}) })
let threadId = '' let threadId = ''
@ -185,7 +192,7 @@ class OpenAIAssistant_Agents implements INode {
threadId = thread.id threadId = thread.id
} }
// List all runs // List all runs, in case existing thread is still running
if (!isNewThread) { if (!isNewThread) {
const promise = (threadId: string) => { const promise = (threadId: string) => {
return new Promise<void>((resolve) => { return new Promise<void>((resolve) => {
@ -221,6 +228,7 @@ class OpenAIAssistant_Agents implements INode {
}) })
// Run assistant thread // Run assistant thread
const llmIds = await analyticHandlers.onLLMStart('ChatOpenAI', input, parentIds)
const runThread = await openai.beta.threads.runs.create(threadId, { const runThread = await openai.beta.threads.runs.create(threadId, {
assistant_id: retrievedAssistant.id assistant_id: retrievedAssistant.id
}) })
@ -253,7 +261,15 @@ class OpenAIAssistant_Agents implements INode {
for (let i = 0; i < actions.length; i += 1) { for (let i = 0; i < actions.length; i += 1) {
const tool = tools.find((tool: any) => tool.name === actions[i].tool) const tool = tools.find((tool: any) => tool.name === actions[i].tool)
if (!tool) continue if (!tool) continue
// Start tool analytics
const toolIds = await analyticHandlers.onToolStart(tool.name, actions[i].toolInput, parentIds)
const toolOutput = await tool.call(actions[i].toolInput) const toolOutput = await tool.call(actions[i].toolInput)
// End tool analytics
await analyticHandlers.onToolEnd(toolIds, toolOutput)
submitToolOutputs.push({ submitToolOutputs.push({
tool_call_id: actions[i].toolCallId, tool_call_id: actions[i].toolCallId,
output: toolOutput output: toolOutput
@ -302,7 +318,9 @@ class OpenAIAssistant_Agents implements INode {
runThreadId = newRunThread.id runThreadId = newRunThread.id
state = await promise(threadId, newRunThread.id) state = await promise(threadId, newRunThread.id)
} else { } else {
throw new Error(`Error processing thread: ${state}, Thread ID: ${threadId}`) const errMsg = `Error processing thread: ${state}, Thread ID: ${threadId}`
await analyticHandlers.onChainError(parentIds, errMsg)
throw new Error(errMsg)
} }
} }
@ -387,11 +405,18 @@ class OpenAIAssistant_Agents implements INode {
const bitmap = fsDefault.readFileSync(filePath) const bitmap = fsDefault.readFileSync(filePath)
const base64String = Buffer.from(bitmap).toString('base64') const base64String = Buffer.from(bitmap).toString('base64')
// TODO: Use a file path and retrieve image on the fly. Storing as base64 to localStorage and database will easily hit limits
const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>` const imgHTML = `<img src="data:image/png;base64,${base64String}" width="100%" height="max-content" alt="${fileObj.filename}" /><br/>`
returnVal += imgHTML returnVal += imgHTML
} }
} }
const imageRegex = /<img[^>]*\/>/g
let llmOutput = returnVal.replace(imageRegex, '')
llmOutput = llmOutput.replace('<br/>', '')
await analyticHandlers.onLLMEnd(llmIds, llmOutput)
await analyticHandlers.onChainEnd(parentIds, messageData, true)
return { return {
text: returnVal, text: returnVal,
usedTools, usedTools,
@ -399,6 +424,7 @@ class OpenAIAssistant_Agents implements INode {
assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData } assistant: { assistantId: openAIAssistantId, threadId, runId: runThreadId, messages: messageData }
} }
} catch (error) { } catch (error) {
await analyticHandlers.onChainError(parentIds, error, true)
throw new Error(error) throw new Error(error)
} }
} }

View File

@ -51,8 +51,9 @@
"husky": "^8.0.3", "husky": "^8.0.3",
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"langchain": "^0.0.196", "langchain": "^0.0.196",
"langfuse": "^1.2.0",
"langfuse-langchain": "^1.0.31", "langfuse-langchain": "^1.0.31",
"langsmith": "^0.0.32", "langsmith": "^0.0.49",
"linkifyjs": "^4.1.1", "linkifyjs": "^4.1.1",
"llmonitor": "^0.5.5", "llmonitor": "^0.5.5",
"mammoth": "^1.5.1", "mammoth": "^1.5.1",

View File

@ -8,6 +8,10 @@ import { LLMonitorHandler } from 'langchain/callbacks/handlers/llmonitor'
import { getCredentialData, getCredentialParam } from './utils' import { getCredentialData, getCredentialParam } from './utils'
import { ICommonObject, INodeData } from './Interface' import { ICommonObject, INodeData } from './Interface'
import CallbackHandler from 'langfuse-langchain' import CallbackHandler from 'langfuse-langchain'
import { RunTree, RunTreeConfig, Client as LangsmithClient } from 'langsmith'
import { Langfuse, LangfuseTraceClient, LangfuseSpanClient, LangfuseGenerationClient } from 'langfuse' // or "langfuse-node"
import monitor from 'llmonitor'
import { v4 as uuidv4 } from 'uuid'
interface AgentRun extends Run { interface AgentRun extends Run {
actions: AgentAction[] actions: AgentAction[]
@ -273,3 +277,488 @@ export const additionalCallbacks = async (nodeData: INodeData, options: ICommonO
throw new Error(e) throw new Error(e)
} }
} }
export class AnalyticHandler {
nodeData: INodeData
options: ICommonObject = {}
handlers: ICommonObject = {}
constructor(nodeData: INodeData, options: ICommonObject) {
this.options = options
this.nodeData = nodeData
this.init()
}
async init() {
try {
if (!this.options.analytic) return
const analytic = JSON.parse(this.options.analytic)
for (const provider in analytic) {
const providerStatus = analytic[provider].status as boolean
if (providerStatus) {
const credentialId = analytic[provider].credentialId as string
const credentialData = await getCredentialData(credentialId ?? '', this.options)
if (provider === 'langSmith') {
const langSmithProject = analytic[provider].projectName as string
const langSmithApiKey = getCredentialParam('langSmithApiKey', credentialData, this.nodeData)
const langSmithEndpoint = getCredentialParam('langSmithEndpoint', credentialData, this.nodeData)
const client = new LangsmithClient({
apiUrl: langSmithEndpoint ?? 'https://api.smith.langchain.com',
apiKey: langSmithApiKey
})
this.handlers['langSmith'] = { client, langSmithProject }
} else if (provider === 'langFuse') {
const release = analytic[provider].release as string
const langFuseSecretKey = getCredentialParam('langFuseSecretKey', credentialData, this.nodeData)
const langFusePublicKey = getCredentialParam('langFusePublicKey', credentialData, this.nodeData)
const langFuseEndpoint = getCredentialParam('langFuseEndpoint', credentialData, this.nodeData)
const langfuse = new Langfuse({
secretKey: langFuseSecretKey,
publicKey: langFusePublicKey,
baseUrl: langFuseEndpoint ?? 'https://cloud.langfuse.com',
release
})
this.handlers['langFuse'] = { client: langfuse }
} else if (provider === 'llmonitor') {
const llmonitorAppId = getCredentialParam('llmonitorAppId', credentialData, this.nodeData)
const llmonitorEndpoint = getCredentialParam('llmonitorEndpoint', credentialData, this.nodeData)
monitor.init({
appId: llmonitorAppId,
apiUrl: llmonitorEndpoint
})
this.handlers['llmonitor'] = { client: monitor }
}
}
}
} catch (e) {
throw new Error(e)
}
}
async onChainStart(name: string, input: string, parentIds?: ICommonObject) {
const returnIds: ICommonObject = {
langSmith: {},
langFuse: {},
llmonitor: {}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
if (!parentIds || !Object.keys(parentIds).length) {
const parentRunConfig: RunTreeConfig = {
name,
run_type: 'chain',
inputs: {
text: input
},
serialized: {},
project_name: this.handlers['langSmith'].langSmithProject,
client: this.handlers['langSmith'].client
}
const parentRun = new RunTree(parentRunConfig)
await parentRun.postRun()
this.handlers['langSmith'].chainRun = { [parentRun.id]: parentRun }
returnIds['langSmith'].chainRun = parentRun.id
} else {
const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun]
if (parentRun) {
const childChainRun = await parentRun.createChild({
name,
run_type: 'chain',
inputs: {
text: input
}
})
await childChainRun.postRun()
this.handlers['langSmith'].chainRun = { [childChainRun.id]: childChainRun }
returnIds['langSmith'].chainRun = childChainRun.id
}
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
let langfuseTraceClient: LangfuseTraceClient
if (!parentIds || !Object.keys(parentIds).length) {
const langfuse: Langfuse = this.handlers['langFuse'].client
langfuseTraceClient = langfuse.trace({
name,
userId: this.options.chatId,
metadata: { tags: ['openai-assistant'] }
})
} else {
langfuseTraceClient = this.handlers['langFuse'].trace[parentIds['langFuse']]
}
if (langfuseTraceClient) {
const span = langfuseTraceClient.span({
name,
input: {
text: input
}
})
this.handlers['langFuse'].trace = { [langfuseTraceClient.id]: langfuseTraceClient }
this.handlers['langFuse'].span = { [span.id]: span }
returnIds['langFuse'].trace = langfuseTraceClient.id
returnIds['langFuse'].span = span.id
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const monitor = this.handlers['llmonitor'].client
if (monitor) {
const runId = uuidv4()
await monitor.trackEvent('chain', 'start', {
runId,
name,
userId: this.options.chatId,
input
})
this.handlers['llmonitor'].chainEvent = { [runId]: runId }
returnIds['llmonitor'].chainEvent = runId
}
}
return returnIds
}
async onChainEnd(returnIds: ICommonObject, output: string | object, shutdown = false) {
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const chainRun: RunTree | undefined = this.handlers['langSmith'].chainRun[returnIds['langSmith'].chainRun]
if (chainRun) {
await chainRun.end({
outputs: {
output
}
})
await chainRun.patchRun()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const span: LangfuseSpanClient | undefined = this.handlers['langFuse'].span[returnIds['langFuse'].span]
if (span) {
span.end({
output
})
if (shutdown) {
const langfuse: Langfuse = this.handlers['langFuse'].client
await langfuse.shutdownAsync()
}
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const chainEventId = returnIds['llmonitor'].chainEvent
const monitor = this.handlers['llmonitor'].client
if (monitor && chainEventId) {
await monitor.trackEvent('chain', 'end', {
runId: chainEventId,
output
})
}
}
}
async onChainError(returnIds: ICommonObject, error: string | object, shutdown = false) {
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const chainRun: RunTree | undefined = this.handlers['langSmith'].chainRun[returnIds['langSmith'].chainRun]
if (chainRun) {
await chainRun.end({
error: {
error
}
})
await chainRun.patchRun()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const span: LangfuseSpanClient | undefined = this.handlers['langFuse'].span[returnIds['langFuse'].span]
if (span) {
span.end({
output: {
error
}
})
if (shutdown) {
const langfuse: Langfuse = this.handlers['langFuse'].client
await langfuse.shutdownAsync()
}
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const chainEventId = returnIds['llmonitor'].chainEvent
const monitor = this.handlers['llmonitor'].client
if (monitor && chainEventId) {
await monitor.trackEvent('chain', 'end', {
runId: chainEventId,
output: error
})
}
}
}
async onLLMStart(name: string, input: string, parentIds: ICommonObject) {
const returnIds: ICommonObject = {
langSmith: {},
langFuse: {},
llmonitor: {}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun]
if (parentRun) {
const childLLMRun = await parentRun.createChild({
name,
run_type: 'llm',
inputs: {
prompts: [input]
}
})
await childLLMRun.postRun()
this.handlers['langSmith'].llmRun = { [childLLMRun.id]: childLLMRun }
returnIds['langSmith'].llmRun = childLLMRun.id
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const trace: LangfuseTraceClient | undefined = this.handlers['langFuse'].trace[parentIds['langFuse'].trace]
if (trace) {
const generation = trace.generation({
name,
prompt: input
})
this.handlers['langFuse'].generation = { [generation.id]: generation }
returnIds['langFuse'].generation = generation.id
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const monitor = this.handlers['llmonitor'].client
const chainEventId: string = this.handlers['llmonitor'].chainEvent[parentIds['llmonitor'].chainEvent]
if (monitor && chainEventId) {
const runId = uuidv4()
await monitor.trackEvent('llm', 'start', {
runId,
parentRunId: chainEventId,
name,
userId: this.options.chatId,
input
})
this.handlers['llmonitor'].llmEvent = { [runId]: runId }
returnIds['llmonitor'].llmEvent = runId
}
}
return returnIds
}
async onLLMEnd(returnIds: ICommonObject, output: string) {
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const llmRun: RunTree | undefined = this.handlers['langSmith'].llmRun[returnIds['langSmith'].llmRun]
if (llmRun) {
await llmRun.end({
outputs: {
generations: [output]
}
})
await llmRun.patchRun()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const generation: LangfuseGenerationClient | undefined = this.handlers['langFuse'].generation[returnIds['langFuse'].generation]
if (generation) {
generation.end({
completion: output
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const llmEventId: string = this.handlers['llmonitor'].llmEvent[returnIds['llmonitor'].llmEvent]
const monitor = this.handlers['llmonitor'].client
if (monitor && llmEventId) {
await monitor.trackEvent('llm', 'end', {
runId: llmEventId,
output
})
}
}
}
async onLLMError(returnIds: ICommonObject, error: string | object) {
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const llmRun: RunTree | undefined = this.handlers['langSmith'].llmRun[returnIds['langSmith'].llmRun]
if (llmRun) {
await llmRun.end({
error: {
error
}
})
await llmRun.patchRun()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const generation: LangfuseGenerationClient | undefined = this.handlers['langFuse'].generation[returnIds['langFuse'].generation]
if (generation) {
generation.end({
completion: error
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const llmEventId: string = this.handlers['llmonitor'].llmEvent[returnIds['llmonitor'].llmEvent]
const monitor = this.handlers['llmonitor'].client
if (monitor && llmEventId) {
await monitor.trackEvent('llm', 'end', {
runId: llmEventId,
output: error
})
}
}
}
async onToolStart(name: string, input: string | object, parentIds: ICommonObject) {
const returnIds: ICommonObject = {
langSmith: {},
langFuse: {},
llmonitor: {}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const parentRun: RunTree | undefined = this.handlers['langSmith'].chainRun[parentIds['langSmith'].chainRun]
if (parentRun) {
const childToolRun = await parentRun.createChild({
name,
run_type: 'tool',
inputs: {
input
}
})
await childToolRun.postRun()
this.handlers['langSmith'].toolRun = { [childToolRun.id]: childToolRun }
returnIds['langSmith'].toolRun = childToolRun.id
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const trace: LangfuseTraceClient | undefined = this.handlers['langFuse'].trace[parentIds['langFuse'].trace]
if (trace) {
const toolSpan = trace.span({
name,
input
})
this.handlers['langFuse'].toolSpan = { [toolSpan.id]: toolSpan }
returnIds['langFuse'].toolSpan = toolSpan.id
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const monitor = this.handlers['llmonitor'].client
const chainEventId: string = this.handlers['llmonitor'].chainEvent[parentIds['llmonitor'].chainEvent]
if (monitor && chainEventId) {
const runId = uuidv4()
await monitor.trackEvent('tool', 'start', {
runId,
parentRunId: chainEventId,
name,
userId: this.options.chatId,
input
})
this.handlers['llmonitor'].toolEvent = { [runId]: runId }
returnIds['llmonitor'].toolEvent = runId
}
}
return returnIds
}
async onToolEnd(returnIds: ICommonObject, output: string | object) {
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const toolRun: RunTree | undefined = this.handlers['langSmith'].toolRun[returnIds['langSmith'].toolRun]
if (toolRun) {
await toolRun.end({
outputs: {
output
}
})
await toolRun.patchRun()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const toolSpan: LangfuseSpanClient | undefined = this.handlers['langFuse'].toolSpan[returnIds['langFuse'].toolSpan]
if (toolSpan) {
toolSpan.end({
output
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const toolEventId: string = this.handlers['llmonitor'].toolEvent[returnIds['llmonitor'].toolEvent]
const monitor = this.handlers['llmonitor'].client
if (monitor && toolEventId) {
await monitor.trackEvent('tool', 'end', {
runId: toolEventId,
output
})
}
}
}
async onToolError(returnIds: ICommonObject, error: string | object) {
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langSmith')) {
const toolRun: RunTree | undefined = this.handlers['langSmith'].toolRun[returnIds['langSmith'].toolRun]
if (toolRun) {
await toolRun.end({
error: {
error
}
})
await toolRun.patchRun()
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'langFuse')) {
const toolSpan: LangfuseSpanClient | undefined = this.handlers['langFuse'].toolSpan[returnIds['langFuse'].toolSpan]
if (toolSpan) {
toolSpan.end({
output: error
})
}
}
if (Object.prototype.hasOwnProperty.call(this.handlers, 'llmonitor')) {
const toolEventId: string = this.handlers['llmonitor'].llmEvent[returnIds['llmonitor'].toolEvent]
const monitor = this.handlers['llmonitor'].client
if (monitor && toolEventId) {
await monitor.trackEvent('tool', 'end', {
runId: toolEventId,
output: error
})
}
}
}
}

View File

@ -1470,6 +1470,7 @@ export class App {
let result = isStreamValid let result = isStreamValid
? await nodeInstance.run(nodeToExecuteData, incomingInput.question, { ? await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatflowid,
chatHistory, chatHistory,
socketIO, socketIO,
socketIOClientId: incomingInput.socketIOClientId, socketIOClientId: incomingInput.socketIOClientId,
@ -1480,6 +1481,7 @@ export class App {
chatId chatId
}) })
: await nodeInstance.run(nodeToExecuteData, incomingInput.question, { : await nodeInstance.run(nodeToExecuteData, incomingInput.question, {
chatflowid,
chatHistory, chatHistory,
logger, logger,
appDataSource: this.AppDataSource, appDataSource: this.AppDataSource,