| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- # 文档复制相关业务逻辑
- from utils.get_logger import setup_logger
- from rag.db import MilvusOperate, MysqlOperate
# Module-level logger for this file, obtained from the project's setup_logger
# helper using this module's __name__; used by both copy handlers.
logger = setup_logger(__name__)
async def copy_docs_to_collection(copy_request: dict) -> dict:
    """
    Copy the given documents from a source collection (index) into a new collection.

    Args:
        copy_request: request payload with the keys
            - source_collection: name of the source collection (required)
            - new_collection: name of the target collection (required)
            - doc_ids: list of document IDs to copy (required, non-empty)
            - embedding_id: embedding model ID (optional; defaults to
              "multilingual-e5-large-instruct", also when given as null)

    Returns:
        dict: ``{"code": 400, "message": ...}`` when validation fails; otherwise
        the response of ``MilvusOperate._copy_docs_to_new_collection``, returned
        unchanged.

    Note:
        The Milvus copy is called without ``await``, so any I/O it performs
        blocks the event loop while it runs. Exceptions raised by it are not
        caught here and propagate to the caller.
    """
    source_collection = copy_request.get("source_collection")
    new_collection = copy_request.get("new_collection")
    # `or` also covers keys that are present but explicitly null.
    doc_ids = copy_request.get("doc_ids") or []
    embedding_id = copy_request.get("embedding_id") or "multilingual-e5-large-instruct"

    if not source_collection or not new_collection:
        return {"code": 400, "message": "source_collection 和 new_collection 不能为空"}

    if not doc_ids:
        return {"code": 400, "message": "doc_ids 不能为空"}

    # Logged only after validation so an invalid doc_ids can no longer make len() raise.
    logger.info(f"复制文档请求 - 源集合: {source_collection}, 新集合: {new_collection}, 文档数: {len(doc_ids)}")

    # The copy is driven from the *source* collection's MilvusOperate instance.
    milvus_operate = MilvusOperate(collection_name=source_collection, embedding_name=embedding_id)
    resp = milvus_operate._copy_docs_to_new_collection(new_collection, doc_ids, embedding_id)

    logger.info(f"复制文档结果:{resp}")
    return resp
async def copy_multi_docs_to_collection(copy_request: dict) -> dict:
    """
    Copy documents from several source collections into one target collection,
    covering both the vector store (Milvus) and the MySQL metadata.

    Args:
        copy_request: request payload with the keys
            - collections_docs_map: dict of the form
              ``{"collection_id_1": [doc ids], "collection_id_2": [doc ids]}``
            - target_collection: target collection name; also used as the new
              knowledge_id
            - tenant_id: tenant ID for the copied rows (optional, default "000000")
            - embedding_id: embedding model ID (optional; defaults to
              "multilingual-e5-large-instruct", also when given as null)

    Copy logic per source collection (as specified for the storage helpers):
        1. Vector data: ``MilvusOperate._copy_docs_to_new_collection`` copies the
           records; new doc_id / chunk_id values are generated there (snowflake
           IDs per the original design notes) and returned as
           ``doc_id_mapping`` / ``chunk_id_mapping``.
        2. MySQL bm_document: document_id uses the new ID, knowledge_id becomes
           target_collection, ref_document_id keeps the old document_id,
           tenant_id comes from the request.
        3. MySQL slice_info: slice_id reuses the vector-store chunk_id (so both
           stores agree), document_id uses the new ID, knowledge_id becomes
           target_collection, old_slice_text keeps the original text.
        4. MySQL bm_media_replacement: document_id uses the new ID, knowledge_id
           becomes target_collection.
        Steps 2-4 are delegated to ``MysqlOperate.copy_docs_metadata_to_new_knowledge``.

    Returns:
        dict with
            - code: 400 for invalid input; otherwise 200 when every non-empty
              source was copied, 207 when only some sources succeeded, 500 when
              none succeeded;
            - message: summary text;
            - data: aggregated counters (not present for 400 responses);
            - doc_id_mapping: combined doc_id mappings returned by Milvus
              (not present for 400 responses);
            - errors: per-source error messages (only when something failed).

    Note:
        Sources are processed independently and nothing is rolled back: if a
        source's Milvus copy succeeds but its MySQL copy fails, the vector data
        already written to target_collection stays. The storage calls are made
        without ``await`` and therefore block the event loop while they run.
    """
    collections_docs_map = copy_request.get("collections_docs_map", {})
    target_collection = copy_request.get("target_collection")
    tenant_id = copy_request.get("tenant_id", "000000")
    embedding_id = copy_request.get("embedding_id") or "multilingual-e5-large-instruct"

    if not target_collection:
        return {"code": 400, "message": "target_collection 不能为空"}

    if not collections_docs_map:
        return {"code": 400, "message": "collections_docs_map 不能为空"}

    # A non-dict payload (e.g. a JSON list) would otherwise crash on .items()
    # outside the per-source try block.
    if not isinstance(collections_docs_map, dict):
        return {"code": 400, "message": "collections_docs_map 必须是字典"}

    # Nothing to copy at all: reject, like copy_docs_to_collection does for an
    # empty doc_ids, instead of reporting a successful no-op.
    if not any(collections_docs_map.values()):
        return {"code": 400, "message": "collections_docs_map 中没有需要复制的文档"}

    logger.info(f"批量复制文档请求 - 源集合数: {len(collections_docs_map)}, 目标集合: {target_collection}, 租户ID: {tenant_id}")

    mysql_client = MysqlOperate()

    total_milvus_success = 0  # sum of data.total_records reported by Milvus
    total_milvus_failed = 0   # number of doc IDs whose Milvus copy was rejected
    total_doc_count = 0
    total_slice_count = 0
    total_media_count = 0
    all_doc_id_mappings = {}  # merged doc_id_mapping dicts (presumably old -> new)
    succeeded_sources = 0     # sources whose Milvus AND MySQL copies both succeeded
    errors = []

    for source_collection, doc_ids in collections_docs_map.items():
        if not doc_ids:
            logger.warning(f"集合 {source_collection} 的文档ID列表为空,跳过")
            continue

        logger.info(f"处理源集合: {source_collection}, 文档数: {len(doc_ids)}")

        try:
            # 1. Copy vector data; this also yields the new doc_id / chunk_id mappings.
            milvus_operate = MilvusOperate(collection_name=source_collection, embedding_name=embedding_id)
            milvus_resp = milvus_operate._copy_docs_to_new_collection(target_collection, doc_ids, embedding_id)

            if milvus_resp.get("code") != 200:
                total_milvus_failed += len(doc_ids)
                error_msg = f"从 {source_collection} 复制向量库失败: {milvus_resp.get('message')}"
                logger.error(error_msg)
                errors.append(error_msg)
                continue

            logger.info(f"从 {source_collection} 复制向量库数据成功")
            # `or {}` guards against keys that are present with a null value.
            total_milvus_success += (milvus_resp.get("data") or {}).get("total_records", 0)

            doc_id_mapping = milvus_resp.get("doc_id_mapping") or {}
            chunk_id_mapping = milvus_resp.get("chunk_id_mapping") or {}
            all_doc_id_mappings.update(doc_id_mapping)

            # 2. Copy MySQL metadata with the same ID mappings so both stores agree.
            mysql_success, mysql_result = mysql_client.copy_docs_metadata_to_new_knowledge(
                source_knowledge_id=source_collection,
                source_doc_ids=doc_ids,
                new_knowledge_id=target_collection,
                doc_id_mapping=doc_id_mapping,
                chunk_id_mapping=chunk_id_mapping,
                tenant_id=tenant_id,
            )
            mysql_result = mysql_result or {}

            if not mysql_success:
                error_msg = f"从 {source_collection} 复制 MySQL 元数据失败: {mysql_result.get('error', '未知错误')}"
                logger.error(error_msg)
                errors.append(error_msg)
                continue

            total_doc_count += mysql_result.get("doc_count", 0)
            total_slice_count += mysql_result.get("slice_count", 0)
            total_media_count += mysql_result.get("media_count", 0)
            succeeded_sources += 1
            logger.info(f"从 {source_collection} 复制 MySQL 元数据成功: {mysql_result}")

        except Exception as e:
            error_msg = f"处理 {source_collection} 时发生异常: {str(e)}"
            # logger.exception keeps the traceback, which logger.error dropped.
            logger.exception(error_msg)
            errors.append(error_msg)

    if not errors:
        code, message = 200, "复制成功"
    elif succeeded_sources:
        code, message = 207, "部分复制成功"  # 207: partial success
    else:
        code, message = 500, "复制失败"

    result = {
        "code": code,
        "message": message,
        "data": {
            "total_milvus_success": total_milvus_success,
            "total_milvus_failed": total_milvus_failed,
            "doc_count": total_doc_count,
            "slice_count": total_slice_count,
            "media_count": total_media_count,
        },
        # Top-level, mirroring where the Milvus copy response places it.
        "doc_id_mapping": all_doc_id_mappings,
    }

    if errors:
        result["errors"] = errors

    logger.info(f"批量复制文档结果:{result}")
    return result
|