@@ -128,50 +128,49 @@ export class FileModel {
128128 } ;
129129
/**
 * Delete several files owned by the current user inside a single transaction.
 *
 * Steps: load the file rows matching `ids` and `this.userId`, delete their chunks via
 * `deleteFileChunks`, delete the file rows, and — when `removeGlobalFile` is true —
 * delete `globalFiles` rows whose hash is no longer referenced by any remaining file row.
 *
 * @param ids - Candidate file ids. Ids that do not match a file row of the current user
 *   are ignored for every step, including chunk deletion.
 * @param removeGlobalFile - When true (default), also delete global file rows that no
 *   remaining file row references.
 * @returns The file rows that were deleted; an empty array when nothing matched.
 */
deleteMany = async (ids: string[], removeGlobalFile: boolean = true) => {
  if (ids.length === 0) return [];

  return await this.db.transaction(async (trx) => {
    // 1. Load the rows first so they can be returned after they are deleted.
    const fileList = await trx.query.files.findMany({
      where: and(inArray(files.id, ids), eq(files.userId, this.userId)),
    });

    if (fileList.length === 0) return [];

    // Only ids confirmed to belong to the current user are used from here on.
    // `deleteFileChunks` filters by file id alone, so handing it the raw `ids`
    // would let a caller remove chunks of files they do not own.
    const ownedIds = fileList.map((file) => file.id);

    // Distinct, non-empty hashes of the files being removed.
    const hashList = Array.from(
      new Set(
        fileList.map((file) => file.fileHash).filter((hash): hash is string => Boolean(hash)),
      ),
    );

    // 2. Delete the chunks that belong to these files.
    // NOTE(review): the `as any` cast is kept from the existing code; presumably the
    // transaction type from `this.db.transaction` differs from `PgTransaction<any>` — confirm.
    await this.deleteFileChunks(trx as any, ownedIds);

    // 3. Delete the file rows themselves.
    await trx
      .delete(files)
      .where(and(inArray(files.id, ownedIds), eq(files.userId, this.userId)));

    // Nothing more to do when global files must be kept or no hash is involved.
    if (!removeGlobalFile || hashList.length === 0) return fileList;

    // 4. Find which of the hashes are still referenced by some remaining file row.
    // The query is deliberately not filtered by user: it checks every row in `files`.
    // Grouping yields one row per hash instead of one row per referencing file.
    const remainingFiles = await trx
      .select({ fileHash: files.fileHash })
      .from(files)
      .where(inArray(files.fileHash, hashList))
      .groupBy(files.fileHash);

    const usedHashes = new Set(remainingFiles.map((file) => file.fileHash));
    const hashesToDelete = hashList.filter((hash) => !usedHashes.has(hash));

    // 5. Delete the global file rows that nothing references any more.
    if (hashesToDelete.length > 0) {
      await trx.delete(globalFiles).where(inArray(globalFiles.hashId, hashesToDelete));
    }

    // NOTE(review): every deleted file row is returned, including rows whose global file
    // is still referenced elsewhere — confirm callers do not treat the result as
    // "storage objects that are safe to remove".
    return fileList;
  });
};
177176
@@ -318,25 +317,58 @@ export class FileModel {
318317
319318 // 抽象出通用的删除 chunks 方法
320319 private deleteFileChunks = async ( trx : PgTransaction < any > , fileIds : string [ ] ) => {
321- const BATCH_SIZE = 1000 ; // 每批处理的数量
320+ if ( fileIds . length === 0 ) return ;
322321
323- // 1. 获取所有关联的 chunk IDs
322+ // 直接使用 JOIN 优化查询,减少数据传输量
324323 const relatedChunks = await trx
325324 . select ( { chunkId : fileChunks . chunkId } )
326325 . from ( fileChunks )
327- . where ( inArray ( fileChunks . fileId , fileIds ) ) ;
326+ . where (
327+ and (
328+ inArray ( fileChunks . fileId , fileIds ) ,
329+ // 确保只查询有效的 chunkId
330+ notExists (
331+ trx
332+ . select ( )
333+ . from ( knowledgeBaseFiles )
334+ . where ( eq ( knowledgeBaseFiles . fileId , fileChunks . fileId ) ) ,
335+ ) ,
336+ ) ,
337+ ) ;
328338
329339 const chunkIds = relatedChunks . map ( ( c ) => c . chunkId ) . filter ( Boolean ) as string [ ] ;
330340
331341 if ( chunkIds . length === 0 ) return ;
332342
333- // 2. 分批处理删除
334- for ( let i = 0 ; i < chunkIds . length ; i += BATCH_SIZE ) {
335- const batchChunkIds = chunkIds . slice ( i , i + BATCH_SIZE ) ;
343+ // 批量处理配置
344+ const BATCH_SIZE = 1000 ; // 增加批处理量
345+ const MAX_CONCURRENT_BATCHES = 3 ; // 最大并行批次数
346+
347+ // 分批并行处理
348+ for ( let i = 0 ; i < chunkIds . length ; i += BATCH_SIZE * MAX_CONCURRENT_BATCHES ) {
349+ const batchPromises = [ ] ;
336350
337- await trx . delete ( embeddings ) . where ( inArray ( embeddings . chunkId , batchChunkIds ) ) ;
351+ // 创建多个并行批次
352+ for ( let j = 0 ; j < MAX_CONCURRENT_BATCHES ; j ++ ) {
353+ const startIdx = i + j * BATCH_SIZE ;
354+ if ( startIdx >= chunkIds . length ) break ;
355+
356+ const batchChunkIds = chunkIds . slice ( startIdx , startIdx + BATCH_SIZE ) ;
357+ if ( batchChunkIds . length === 0 ) continue ;
358+
359+ // 为每个批次创建一个删除任务
360+ const batchPromise = ( async ( ) => {
361+ // 先删除嵌入向量
362+ await trx . delete ( embeddings ) . where ( inArray ( embeddings . chunkId , batchChunkIds ) ) ;
363+ // 再删除块
364+ await trx . delete ( chunks ) . where ( inArray ( chunks . id , batchChunkIds ) ) ;
365+ } ) ( ) ;
366+
367+ batchPromises . push ( batchPromise ) ;
368+ }
338369
339- await trx . delete ( chunks ) . where ( inArray ( chunks . id , batchChunkIds ) ) ;
370+ // 等待当前批次的所有任务完成
371+ await Promise . all ( batchPromises ) ;
340372 }
341373
342374 return chunkIds ;
0 commit comments