feat: add saveBatch and optimize duplication id handling

This commit is contained in:
chungyau97 2025-07-04 19:48:55 +08:00
parent 768de6140c
commit 8bf37ea8fe
2 changed files with 29 additions and 14 deletions

View File

@ -39,7 +39,7 @@ const importData = async (req: Request, res: Response, next: NextFunction) => {
}
await exportImportService.importData(importData, orgId, workspaceId, subscriptionId)
return res.json({ message: 'success' })
return res.status(StatusCodes.OK).json({ message: 'success' })
} catch (error) {
next(error)
}

View File

@ -1,5 +1,5 @@
import { StatusCodes } from 'http-status-codes'
import { In, QueryRunner } from 'typeorm'
import { EntityManager, In, QueryRunner } from 'typeorm'
import { v4 as uuidv4 } from 'uuid'
import { Assistant } from '../../database/entities/Assistant'
import { ChatFlow } from '../../database/entities/ChatFlow'
@ -255,11 +255,15 @@ async function replaceDuplicateIdsForChatMessage(
where: { id: In(ids) }
})
if (records.length < 0) return originalData
for (let record of records) {
const oldId = record.id
const newId = uuidv4()
originalData = JSON.parse(JSON.stringify(originalData).replaceAll(oldId, newId))
}
// replace duplicate ids found in db to new id
const dbExistingIds = records.map((record) => record.id)
originalData.ChatMessage = originalData.ChatMessage.map((item) => {
if (dbExistingIds.includes(item.id)) {
return { ...item, id: uuidv4() }
}
return item
})
return originalData
} catch (error) {
throw new InternalFlowiseError(
@ -459,11 +463,15 @@ async function replaceDuplicateIdsForDocumentStoreFileChunk(
where: { id: In(ids) }
})
if (records.length < 0) return originalData
for (let record of records) {
const oldId = record.id
const newId = uuidv4()
originalData = JSON.parse(JSON.stringify(originalData).replaceAll(oldId, newId))
}
// replace duplicate ids found in db to new id
const dbExistingIds = records.map((record) => record.id)
originalData.DocumentStoreFileChunk = originalData.DocumentStoreFileChunk.map((item) => {
if (dbExistingIds.includes(item.id)) {
return { ...item, id: uuidv4() }
}
return item
})
return originalData
} catch (error) {
throw new InternalFlowiseError(
@ -550,6 +558,13 @@ function insertWorkspaceId(importedData: any, activeWorkspaceId?: string) {
return importedData
}
async function saveBatch(manager: EntityManager, entity: any, items: any[], batchSize = 900) {
for (let i = 0; i < items.length; i += batchSize) {
const batch = items.slice(i, i + batchSize)
await manager.save(entity, batch)
}
}
const importData = async (importData: ExportData, orgId: string, activeWorkspaceId: string, subscriptionId: string) => {
// Initialize missing properties with empty arrays to avoid "undefined" errors
importData.AgentFlow = importData.AgentFlow || []
@ -705,13 +720,13 @@ const importData = async (importData: ExportData, orgId: string, activeWorkspace
if (importData.AssistantOpenAI.length > 0) await queryRunner.manager.save(Assistant, importData.AssistantOpenAI)
if (importData.AssistantAzure.length > 0) await queryRunner.manager.save(Assistant, importData.AssistantAzure)
if (importData.ChatFlow.length > 0) await queryRunner.manager.save(ChatFlow, importData.ChatFlow)
if (importData.ChatMessage.length > 0) await queryRunner.manager.save(ChatMessage, importData.ChatMessage)
if (importData.ChatMessage.length > 0) await saveBatch(queryRunner.manager, ChatMessage, importData.ChatMessage)
if (importData.ChatMessageFeedback.length > 0)
await queryRunner.manager.save(ChatMessageFeedback, importData.ChatMessageFeedback)
if (importData.CustomTemplate.length > 0) await queryRunner.manager.save(CustomTemplate, importData.CustomTemplate)
if (importData.DocumentStore.length > 0) await queryRunner.manager.save(DocumentStore, importData.DocumentStore)
if (importData.DocumentStoreFileChunk.length > 0)
await queryRunner.manager.save(DocumentStoreFileChunk, importData.DocumentStoreFileChunk)
await saveBatch(queryRunner.manager, DocumentStoreFileChunk, importData.DocumentStoreFileChunk)
if (importData.Tool.length > 0) await queryRunner.manager.save(Tool, importData.Tool)
if (importData.Execution.length > 0) await queryRunner.manager.save(Execution, importData.Execution)
if (importData.Variable.length > 0) await queryRunner.manager.save(Variable, importData.Variable)