feat: Implement caching for MCP toolkit in CachePool
- Added methods to CachePool for managing MCP toolkit cache. - Integrated caching logic in CustomMCP to store and retrieve toolkit data based on workspaceId and configuration. - Updated node service to pass cachePool to CustomMCP for enhanced performance.
This commit is contained in:
parent
d584c0b700
commit
8c37d4ba3b
|
|
@ -3,6 +3,7 @@ import { ICommonObject, IDatabaseEntity, INode, INodeData, INodeOptionsValue, IN
|
||||||
import { MCPToolkit } from '../core'
|
import { MCPToolkit } from '../core'
|
||||||
import { getVars, prepareSandboxVars } from '../../../../src/utils'
|
import { getVars, prepareSandboxVars } from '../../../../src/utils'
|
||||||
import { DataSource } from 'typeorm'
|
import { DataSource } from 'typeorm'
|
||||||
|
import hash from 'object-hash'
|
||||||
|
|
||||||
const mcpServerConfig = `{
|
const mcpServerConfig = `{
|
||||||
"command": "npx",
|
"command": "npx",
|
||||||
|
|
@ -140,6 +141,24 @@ class Custom_MCP implements INode {
|
||||||
sandbox['$vars'] = prepareSandboxVars(variables)
|
sandbox['$vars'] = prepareSandboxVars(variables)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const workspaceId = options?.searchOptions?.workspaceId?._value || options?.workspaceId
|
||||||
|
|
||||||
|
let canonicalConfig
|
||||||
|
try {
|
||||||
|
canonicalConfig = JSON.parse(mcpServerConfig)
|
||||||
|
} catch (e) {
|
||||||
|
canonicalConfig = mcpServerConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
const cacheKey = hash({ workspaceId, canonicalConfig, sandbox })
|
||||||
|
|
||||||
|
if (options.cachePool) {
|
||||||
|
const cachedResult = await options.cachePool.getMCPCache(cacheKey)
|
||||||
|
if (cachedResult) {
|
||||||
|
return cachedResult.tools
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
let serverParams
|
let serverParams
|
||||||
if (typeof mcpServerConfig === 'object') {
|
if (typeof mcpServerConfig === 'object') {
|
||||||
|
|
@ -162,6 +181,10 @@ class Custom_MCP implements INode {
|
||||||
|
|
||||||
const tools = toolkit.tools ?? []
|
const tools = toolkit.tools ?? []
|
||||||
|
|
||||||
|
if (options.cachePool) {
|
||||||
|
await options.cachePool.addMCPCache(cacheKey, { toolkit, tools })
|
||||||
|
}
|
||||||
|
|
||||||
return tools as Tool[]
|
return tools as Tool[]
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
throw new Error(`Invalid MCP Server Config: ${error}`)
|
throw new Error(`Invalid MCP Server Config: ${error}`)
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ export class CachePool {
|
||||||
private redisClient: Redis | null = null
|
private redisClient: Redis | null = null
|
||||||
activeLLMCache: IActiveCache = {}
|
activeLLMCache: IActiveCache = {}
|
||||||
activeEmbeddingCache: IActiveCache = {}
|
activeEmbeddingCache: IActiveCache = {}
|
||||||
|
activeMCPCache: { [key: string]: any } = {}
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
if (process.env.MODE === MODE.QUEUE) {
|
if (process.env.MODE === MODE.QUEUE) {
|
||||||
|
|
@ -73,6 +74,29 @@ export class CachePool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add to the mcp toolkit cache pool
|
||||||
|
* @param {string} cacheKey
|
||||||
|
* @param {any} value
|
||||||
|
*/
|
||||||
|
async addMCPCache(cacheKey: string, value: any) {
|
||||||
|
// Only add to cache for non-queue mode, because we are storing the toolkit instances in memory, and we can't store them in redis
|
||||||
|
if (process.env.MODE !== MODE.QUEUE) {
|
||||||
|
this.activeMCPCache[`mcpCache:${cacheKey}`] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get item from mcp toolkit cache pool
|
||||||
|
* @param {string} cacheKey
|
||||||
|
*/
|
||||||
|
async getMCPCache(cacheKey: string): Promise<any | undefined> {
|
||||||
|
if (process.env.MODE !== MODE.QUEUE) {
|
||||||
|
return this.activeMCPCache[`mcpCache:${cacheKey}`]
|
||||||
|
}
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get item from llm cache pool
|
* Get item from llm cache pool
|
||||||
* @param {string} chatflowid
|
* @param {string} chatflowid
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,8 @@ const getSingleNodeAsyncOptions = async (nodeName: string, requestBody: any): Pr
|
||||||
componentNodes: appServer.nodesPool.componentNodes,
|
componentNodes: appServer.nodesPool.componentNodes,
|
||||||
previousNodes: requestBody.previousNodes,
|
previousNodes: requestBody.previousNodes,
|
||||||
currentNode: requestBody.currentNode,
|
currentNode: requestBody.currentNode,
|
||||||
searchOptions: requestBody.searchOptions
|
searchOptions: requestBody.searchOptions,
|
||||||
|
cachePool: appServer.cachePool
|
||||||
})
|
})
|
||||||
|
|
||||||
return dbResponse
|
return dbResponse
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue