# 文档复制相关业务逻辑 from utils.get_logger import setup_logger from rag.db import MilvusOperate, MysqlOperate logger = setup_logger(__name__) async def copy_docs_to_collection(copy_request: dict) -> dict: """ 复制指定文档到新集合(索引) 参数: - source_collection: 源集合名称 - new_collection: 新集合名称 - doc_ids: 文档ID列表 - embedding_id: 向量模型ID(可选,默认e5) 返回: - dict: 操作结果 """ source_collection = copy_request.get("source_collection") new_collection = copy_request.get("new_collection") doc_ids = copy_request.get("doc_ids", []) embedding_id = copy_request.get("embedding_id", "multilingual-e5-large-instruct") logger.info(f"复制文档请求 - 源集合: {source_collection}, 新集合: {new_collection}, 文档数: {len(doc_ids)}") if not source_collection or not new_collection: return {"code": 400, "message": "source_collection 和 new_collection 不能为空"} if not doc_ids: return {"code": 400, "message": "doc_ids 不能为空"} # 使用源集合的 MilvusOperate 实例进行复制操作 milvus_operate = MilvusOperate(collection_name=source_collection, embedding_name=embedding_id) resp = milvus_operate._copy_docs_to_new_collection(new_collection, doc_ids, embedding_id) logger.info(f"复制文档结果:{resp}") return resp async def copy_multi_docs_to_collection(copy_request: dict) -> dict: """ 从多个源集合复制文档到目标集合(包含向量库和MySQL元数据) 参数: - collections_docs_map: 字典,格式为 {"集合id1": [文档id列表], "集合id2": [文档id列表]} - target_collection: 目标集合名称(同时作为新的 knowledge_id) - embedding_id: 向量模型ID(可选,默认 multilingual-e5-large-instruct) 复制逻辑: 1. 向量库数据复制:doc_id 和 chunk_id 使用雪花算法生成新ID 2. MySQL bm_document 表:document_id 使用新ID,knowledge_id 替换为 target_collection, ref_document_id 存储旧的 document_id,tenant_id 使用前端传入值 3. MySQL slice_info 表:slice_id 使用向量库中的 chunk_id(保持一致),document_id 使用新ID, knowledge_id 替换为 target_collection,old_slice_text 存储原始文本 4. MySQL bm_media_replacement 表:document_id 使用新ID,knowledge_id 替换为 target_collection 返回: - dict: 操作结果 """ collections_docs_map = copy_request.get("collections_docs_map", {}) target_collection = copy_request.get("target_collection") tenant_id = copy_request.get("tenant_id", "000000") embedding_id = copy_request.get("embedding_id", "multilingual-e5-large-instruct") if not target_collection: return {"code": 400, "message": "target_collection 不能为空"} if not collections_docs_map: return {"code": 400, "message": "collections_docs_map 不能为空"} logger.info(f"批量复制文档请求 - 源集合数: {len(collections_docs_map)}, 目标集合: {target_collection}, 租户ID: {tenant_id}") # 初始化 MySQL 客户端 mysql_client = MysqlOperate() # 统计结果 total_milvus_success = 0 total_milvus_failed = 0 total_doc_count = 0 total_slice_count = 0 total_media_count = 0 all_doc_id_mappings = {} errors = [] # 循环处理每个源集合 for source_collection, doc_ids in collections_docs_map.items(): if not doc_ids: logger.warning(f"集合 {source_collection} 的文档ID列表为空,跳过") continue logger.info(f"处理源集合: {source_collection}, 文档数: {len(doc_ids)}") try: # 1. 复制向量库数据(同时生成新的 doc_id 和 chunk_id) milvus_operate = MilvusOperate(collection_name=source_collection, embedding_name=embedding_id) milvus_resp = milvus_operate._copy_docs_to_new_collection(target_collection, doc_ids, embedding_id) if milvus_resp.get("code") == 200: logger.info(f"从 {source_collection} 复制向量库数据成功") total_milvus_success += milvus_resp.get("data", {}).get("total_records", 0) # 获取 doc_id 映射和 chunk_id 映射 doc_id_mapping = milvus_resp.get("doc_id_mapping", {}) chunk_id_mapping = milvus_resp.get("chunk_id_mapping", {}) all_doc_id_mappings.update(doc_id_mapping) # 2. 复制 MySQL 元数据(使用相同的 doc_id 和 chunk_id 映射) mysql_success, mysql_result = mysql_client.copy_docs_metadata_to_new_knowledge( source_knowledge_id=source_collection, source_doc_ids=doc_ids, new_knowledge_id=target_collection, doc_id_mapping=doc_id_mapping, chunk_id_mapping=chunk_id_mapping, tenant_id=tenant_id ) if mysql_success: total_doc_count += mysql_result.get("doc_count", 0) total_slice_count += mysql_result.get("slice_count", 0) total_media_count += mysql_result.get("media_count", 0) logger.info(f"从 {source_collection} 复制 MySQL 元数据成功: {mysql_result}") else: error_msg = f"从 {source_collection} 复制 MySQL 元数据失败: {mysql_result.get('error', '未知错误')}" logger.error(error_msg) errors.append(error_msg) else: total_milvus_failed += len(doc_ids) error_msg = f"从 {source_collection} 复制向量库失败: {milvus_resp.get('message')}" logger.error(error_msg) errors.append(error_msg) except Exception as e: error_msg = f"处理 {source_collection} 时发生异常: {str(e)}" logger.error(error_msg) errors.append(error_msg) # 构建响应 result = { "code": 200 if not errors else 207, # 207 表示部分成功 "message": "复制成功", } if errors: result["errors"] = errors logger.info(f"批量复制文档结果:{result}") return result