import { flatten } from 'lodash'
import { DataType, ErrorCode, MetricType, IndexType } from '@zilliz/milvus2-sdk-node'
import { Document } from 'langchain/document'
import { MilvusLibArgs, Milvus } from 'langchain/vectorstores/milvus'
import { Embeddings } from 'langchain/embeddings/base'
import { ICommonObject, INode, INodeData, INodeOutputsValue, INodeParams } from '../../../src/Interface'
import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils'

/** One row passed to Milvus `insert`: field name -> text/scalar value or embedding vector. */
interface InsertRow {
    [x: string]: string | number[]
}

/**
 * Flowise node wrapping a Milvus collection.
 *
 * `vectorStoreMethods.upsert` embeds and inserts the node's input documents;
 * `init` connects to an existing collection and returns it either as a retriever
 * or as a LangChain vector store, depending on the selected output.
 */
class Milvus_VectorStores implements INode {
    label: string
    name: string
    version: number
    description: string
    type: string
    icon: string
    category: string
    badge: string
    baseClasses: string[]
    inputs: INodeParams[]
    credential: INodeParams
    outputs: INodeOutputsValue[]

    constructor() {
        this.label = 'Milvus'
        this.name = 'milvus'
        this.version = 1.0
        this.type = 'Milvus'
        this.icon = 'milvus.svg'
        this.category = 'Vector Stores'
        this.description = `Upsert embedded data and perform similarity search upon query using Milvus, world's most advanced open-source vector database`
        this.baseClasses = [this.type, 'VectorStoreRetriever', 'BaseRetriever']
        this.badge = 'NEW'
        this.credential = {
            label: 'Connect Credential',
            name: 'credential',
            type: 'credential',
            optional: true,
            credentialNames: ['milvusAuth']
        }
        this.inputs = [
            {
                label: 'Document',
                name: 'document',
                type: 'Document',
                list: true,
                optional: true
            },
            {
                label: 'Embeddings',
                name: 'embeddings',
                type: 'Embeddings'
            },
            {
                label: 'Milvus Server URL',
                name: 'milvusServerUrl',
                type: 'string',
                placeholder: 'http://localhost:19530'
            },
            {
                label: 'Milvus Collection Name',
                name: 'milvusCollection',
                type: 'string'
            },
            {
                label: 'Milvus Filter',
                name: 'milvusFilter',
                type: 'string',
                optional: true,
                description: 'Filter data with a simple string query. Refer Milvus docs for more details.',
                placeholder: 'doc=="a"',
                additionalParams: true
            },
            {
                label: 'Top K',
                name: 'topK',
                description: 'Number of top results to fetch. Default to 4',
                placeholder: '4',
                type: 'number',
                additionalParams: true,
                optional: true
            }
        ]
        this.outputs = [
            {
                label: 'Milvus Retriever',
                name: 'retriever',
                baseClasses: this.baseClasses
            },
            {
                label: 'Milvus Vector Store',
                name: 'vectorStore',
                baseClasses: [this.type, ...getBaseClasses(Milvus)]
            }
        ]
    }

    // NOTE(review): presumably `vectorStoreMethods` is not declared on `INode`, hence the ignore — confirm.
    //@ts-ignore
    vectorStoreMethods = {
        /**
         * Embed the node's input documents and insert them into the configured collection.
         * Missing documents and documents with an empty `pageContent` are skipped.
         *
         * @throws Error from Milvus/embedding failures, rethrown unchanged (non-Error values are wrapped)
         */
        async upsert(nodeData: INodeData, options: ICommonObject): Promise<void> {
            const docs = nodeData.inputs?.document as Document[]
            const embeddings = nodeData.inputs?.embeddings as Embeddings
            const milvusArgs = await getMilvusLibArgs(nodeData, options)

            const flattenDocs = docs && docs.length ? flatten(docs) : []
            const finalDocs = flattenDocs.filter((doc) => doc && doc.pageContent).map((doc) => new Document(doc))

            try {
                // The returned store is not used afterwards, so no method override is needed here.
                await MilvusUpsert.fromDocuments(finalDocs, embeddings, milvusArgs)
            } catch (e) {
                // Rethrow the original Error so its message and stack survive; `new Error(e)` would
                // turn it into "Error: <message>" and drop the stack.
                throw e instanceof Error ? e : new Error(String(e))
            }
        }
    }

    /**
     * Connect to an existing Milvus collection.
     *
     * @returns a retriever (output `retriever`, top-k = `k`) or the vector store itself
     *          (output `vectorStore`, with `k` attached); the vector store for any other output
     */
    async init(nodeData: INodeData, _: string, options: ICommonObject): Promise<any> {
        const milvusFilter = nodeData.inputs?.milvusFilter as string
        const embeddings = nodeData.inputs?.embeddings as Embeddings
        const topK = nodeData.inputs?.topK as string
        const output = nodeData.outputs?.output as string

        const k = topK ? parseFloat(topK) : 4
        const milvusArgs = await getMilvusLibArgs(nodeData, options)

        const vectorStore = await Milvus.fromExistingCollection(embeddings, milvusArgs)

        // Avoid Illegal Invocation: the closure binds `vectorStore` explicitly instead of relying
        // on `this`, and injects the node-level filter into every search.
        vectorStore.similaritySearchVectorWithScore = async (query: number[], searchK: number, filter?: string) => {
            return await similaritySearchVectorWithScore(query, searchK, vectorStore, milvusFilter, filter)
        }

        if (output === 'retriever') {
            const retriever = vectorStore.asRetriever(k)
            return retriever
        } else if (output === 'vectorStore') {
            ;(vectorStore as any).k = k
            return vectorStore
        }
        return vectorStore
    }
}

/**
 * Build the connection arguments shared by `upsert` and `init` from the node inputs
 * (server URL, collection name) and the optional `milvusAuth` credential.
 * Username/password are only set when present.
 */
const getMilvusLibArgs = async (nodeData: INodeData, options: ICommonObject): Promise<MilvusLibArgs> => {
    const address = nodeData.inputs?.milvusServerUrl as string
    const collectionName = nodeData.inputs?.milvusCollection as string

    const credentialData = await getCredentialData(nodeData.credential ?? '', options)
    const milvusUser = getCredentialParam('milvusUser', credentialData, nodeData)
    const milvusPassword = getCredentialParam('milvusPassword', credentialData, nodeData)

    const milvusArgs: MilvusLibArgs = {
        url: address,
        collectionName
    }
    if (milvusUser) milvusArgs.username = milvusUser
    if (milvusPassword) milvusArgs.password = milvusPassword
    return milvusArgs
}

/**
 * Try to decode a metadata string that `MilvusUpsert.addVectors` stored via `JSON.stringify`.
 *
 * Only objects, arrays and `null` are treated as JSON. Plain string values such as "42" or
 * "true" are therefore returned unchanged rather than being coerced to a number/boolean
 * (numbers are stored natively by `addVectors`, never as strings).
 */
const checkJsonString = (value: string): { isJson: boolean; obj: any } => {
    try {
        const result = JSON.parse(value)
        // typeof null === 'object', so a stringified null still round-trips.
        return typeof result === 'object' ? { isJson: true, obj: result } : { isJson: false, obj: null }
    } catch (e) {
        return { isJson: false, obj: null }
    }
}

/**
 * Run a vector similarity search against `vectorStore`'s collection.
 *
 * @param query embedding to search with
 * @param k number of results to request
 * @param vectorStore connected Milvus store (supplies client, collection and field names)
 * @param milvusFilter node-level filter expression; takes precedence when non-empty
 * @param filter per-call filter expression, used when no node-level filter is set
 * @returns `[document, score]` pairs in the order Milvus returned them
 * @throws Error if the collection does not exist or any Milvus call reports a non-success status
 */
const similaritySearchVectorWithScore = async (
    query: number[],
    k: number,
    vectorStore: Milvus,
    milvusFilter?: string,
    filter?: string
): Promise<[Document, number][]> => {
    const hasColResp = await vectorStore.client.hasCollection({
        collection_name: vectorStore.collectionName
    })
    if (hasColResp.status.error_code !== ErrorCode.SUCCESS) {
        throw new Error(`Error checking collection: ${JSON.stringify(hasColResp)}`)
    }
    if (hasColResp.value === false) {
        throw new Error(`Collection not found: ${vectorStore.collectionName}, please create collection before search.`)
    }

    // `||` rather than `??` so an empty node-level filter ('') falls through to the per-call filter.
    const filterStr = milvusFilter || filter || ''

    await vectorStore.grabCollectionFields()

    const loadResp = await vectorStore.client.loadCollectionSync({
        collection_name: vectorStore.collectionName
    })
    if (loadResp.error_code !== ErrorCode.SUCCESS) {
        throw new Error(`Error loading collection: ${JSON.stringify(loadResp)}`)
    }

    // Return every field except the (large) embedding vector itself.
    const outputFields = vectorStore.fields.filter((field) => field !== vectorStore.vectorField)

    const searchResp = await vectorStore.client.search({
        collection_name: vectorStore.collectionName,
        search_params: {
            anns_field: vectorStore.vectorField,
            topk: k.toString(),
            metric_type: vectorStore.indexCreateParams.metric_type,
            params: vectorStore.indexSearchParams
        },
        output_fields: outputFields,
        vector_type: DataType.FloatVector,
        vectors: [query],
        filter: filterStr
    })
    if (searchResp.status.error_code !== ErrorCode.SUCCESS) {
        throw new Error(`Error searching data: ${JSON.stringify(searchResp)}`)
    }

    const results: [Document, number][] = []
    searchResp.results.forEach((result) => {
        const fields = {
            pageContent: '',
            metadata: {} as Record<string, any>
        }
        Object.keys(result).forEach((key) => {
            if (key === vectorStore.textField) {
                fields.pageContent = result[key]
            } else if (vectorStore.fields.includes(key) || key === vectorStore.primaryField) {
                if (typeof result[key] === 'string') {
                    // Undo the JSON.stringify applied to object metadata at insert time.
                    const { isJson, obj } = checkJsonString(result[key])
                    fields.metadata[key] = isJson ? obj : result[key]
                } else {
                    fields.metadata[key] = result[key]
                }
            }
        })
        results.push([new Document(fields), result.score])
    })
    return results
}

/**
 * `Milvus` subclass used for upserts. Its `addVectors` creates an AUTOINDEX/L2 index on the
 * vector field when the collection has none, inserts the rows, then calls `flushSync`.
 */
class MilvusUpsert extends Milvus {
    /**
     * Insert `vectors` and their source `documents` into the collection.
     *
     * Each row gets the document text and vector; every other collection field is read from
     * `doc.metadata` (object values are stored as JSON strings).
     *
     * @throws Error if a required metadata field is missing or a Milvus call fails
     */
    async addVectors(vectors: number[][], documents: Document[]): Promise<void> {
        if (vectors.length === 0) {
            return
        }
        await this.ensureCollection(vectors, documents)

        const insertDatas: InsertRow[] = []
        for (let index = 0; index < vectors.length; index++) {
            const vec = vectors[index]
            const doc = documents[index]
            const data: InsertRow = {
                [this.textField]: doc.pageContent,
                [this.vectorField]: vec
            }
            this.fields.forEach((field) => {
                switch (field) {
                    case this.primaryField:
                        // With autoId=false the primary key must be supplied through metadata.
                        if (!this.autoId) {
                            if (doc.metadata[this.primaryField] === undefined) {
                                throw new Error(
                                    `The Collection's primaryField is configured with autoId=false, thus its value must be provided through metadata.`
                                )
                            }
                            data[field] = doc.metadata[this.primaryField]
                        }
                        break
                    case this.textField:
                        data[field] = doc.pageContent
                        break
                    case this.vectorField:
                        data[field] = vec
                        break
                    default: // metadata fields
                        if (doc.metadata[field] === undefined) {
                            throw new Error(`The field "${field}" is not provided in documents[${index}].metadata.`)
                        } else if (typeof doc.metadata[field] === 'object') {
                            // Serialized here; decoded again by checkJsonString when searching.
                            data[field] = JSON.stringify(doc.metadata[field])
                        } else {
                            data[field] = doc.metadata[field]
                        }
                        break
                }
            })
            insertDatas.push(data)
        }

        const descIndexResp = await this.client.describeIndex({
            collection_name: this.collectionName
        })
        if (descIndexResp.status.error_code === ErrorCode.IndexNotExist) {
            const resp = await this.client.createIndex({
                collection_name: this.collectionName,
                field_name: this.vectorField,
                index_name: `myindex_${Date.now().toString()}`,
                index_type: IndexType.AUTOINDEX,
                metric_type: MetricType.L2
            })
            if (resp.error_code !== ErrorCode.SUCCESS) {
                throw new Error(`Error creating index: ${JSON.stringify(resp)}`)
            }
        }

        const insertResp = await this.client.insert({
            collection_name: this.collectionName,
            fields_data: insertDatas
        })
        if (insertResp.status.error_code !== ErrorCode.SUCCESS) {
            throw new Error(`Error inserting data: ${JSON.stringify(insertResp)}`)
        }

        await this.client.flushSync({ collection_names: [this.collectionName] })
    }
}

module.exports = { nodeClass: Milvus_VectorStores }