document_copy.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # 文档复制相关业务逻辑
  2. from utils.get_logger import setup_logger
  3. from rag.db import MilvusOperate, MysqlOperate
  4. logger = setup_logger(__name__)
  5. async def copy_docs_to_collection(copy_request: dict) -> dict:
  6. """
  7. 复制指定文档到新集合(索引)
  8. 参数:
  9. - source_collection: 源集合名称
  10. - new_collection: 新集合名称
  11. - doc_ids: 文档ID列表
  12. - embedding_id: 向量模型ID(可选,默认e5)
  13. 返回:
  14. - dict: 操作结果
  15. """
  16. source_collection = copy_request.get("source_collection")
  17. new_collection = copy_request.get("new_collection")
  18. doc_ids = copy_request.get("doc_ids", [])
  19. embedding_id = copy_request.get("embedding_id", "multilingual-e5-large-instruct")
  20. logger.info(f"复制文档请求 - 源集合: {source_collection}, 新集合: {new_collection}, 文档数: {len(doc_ids)}")
  21. if not source_collection or not new_collection:
  22. return {"code": 400, "message": "source_collection 和 new_collection 不能为空"}
  23. if not doc_ids:
  24. return {"code": 400, "message": "doc_ids 不能为空"}
  25. # 使用源集合的 MilvusOperate 实例进行复制操作
  26. milvus_operate = MilvusOperate(collection_name=source_collection, embedding_name=embedding_id)
  27. resp = milvus_operate._copy_docs_to_new_collection(new_collection, doc_ids, embedding_id)
  28. logger.info(f"复制文档结果:{resp}")
  29. return resp
  30. async def copy_multi_docs_to_collection(copy_request: dict) -> dict:
  31. """
  32. 从多个源集合复制文档到目标集合(包含向量库和MySQL元数据)
  33. 参数:
  34. - collections_docs_map: 字典,格式为 {"集合id1": [文档id列表], "集合id2": [文档id列表]}
  35. - target_collection: 目标集合名称(同时作为新的 knowledge_id)
  36. - embedding_id: 向量模型ID(可选,默认 multilingual-e5-large-instruct)
  37. 复制逻辑:
  38. 1. 向量库数据复制:doc_id 和 chunk_id 使用雪花算法生成新ID
  39. 2. MySQL bm_document 表:document_id 使用新ID,knowledge_id 替换为 target_collection,
  40. ref_document_id 存储旧的 document_id,tenant_id 使用前端传入值
  41. 3. MySQL slice_info 表:slice_id 使用向量库中的 chunk_id(保持一致),document_id 使用新ID,
  42. knowledge_id 替换为 target_collection,old_slice_text 存储原始文本
  43. 4. MySQL bm_media_replacement 表:document_id 使用新ID,knowledge_id 替换为 target_collection
  44. 返回:
  45. - dict: 操作结果
  46. """
  47. collections_docs_map = copy_request.get("collections_docs_map", {})
  48. target_collection = copy_request.get("target_collection")
  49. tenant_id = copy_request.get("tenant_id", "000000")
  50. embedding_id = copy_request.get("embedding_id", "multilingual-e5-large-instruct")
  51. if not target_collection:
  52. return {"code": 400, "message": "target_collection 不能为空"}
  53. if not collections_docs_map:
  54. return {"code": 400, "message": "collections_docs_map 不能为空"}
  55. logger.info(f"批量复制文档请求 - 源集合数: {len(collections_docs_map)}, 目标集合: {target_collection}, 租户ID: {tenant_id}")
  56. # 初始化 MySQL 客户端
  57. mysql_client = MysqlOperate()
  58. # 统计结果
  59. total_milvus_success = 0
  60. total_milvus_failed = 0
  61. total_doc_count = 0
  62. total_slice_count = 0
  63. total_media_count = 0
  64. all_doc_id_mappings = {}
  65. errors = []
  66. # 循环处理每个源集合
  67. for source_collection, doc_ids in collections_docs_map.items():
  68. if not doc_ids:
  69. logger.warning(f"集合 {source_collection} 的文档ID列表为空,跳过")
  70. continue
  71. logger.info(f"处理源集合: {source_collection}, 文档数: {len(doc_ids)}")
  72. try:
  73. # 1. 复制向量库数据(同时生成新的 doc_id 和 chunk_id)
  74. milvus_operate = MilvusOperate(collection_name=source_collection, embedding_name=embedding_id)
  75. milvus_resp = milvus_operate._copy_docs_to_new_collection(target_collection, doc_ids, embedding_id)
  76. if milvus_resp.get("code") == 200:
  77. logger.info(f"从 {source_collection} 复制向量库数据成功")
  78. total_milvus_success += milvus_resp.get("data", {}).get("total_records", 0)
  79. # 获取 doc_id 映射和 chunk_id 映射
  80. doc_id_mapping = milvus_resp.get("doc_id_mapping", {})
  81. chunk_id_mapping = milvus_resp.get("chunk_id_mapping", {})
  82. all_doc_id_mappings.update(doc_id_mapping)
  83. # 2. 复制 MySQL 元数据(使用相同的 doc_id 和 chunk_id 映射)
  84. mysql_success, mysql_result = mysql_client.copy_docs_metadata_to_new_knowledge(
  85. source_knowledge_id=source_collection,
  86. source_doc_ids=doc_ids,
  87. new_knowledge_id=target_collection,
  88. doc_id_mapping=doc_id_mapping,
  89. chunk_id_mapping=chunk_id_mapping,
  90. tenant_id=tenant_id
  91. )
  92. if mysql_success:
  93. total_doc_count += mysql_result.get("doc_count", 0)
  94. total_slice_count += mysql_result.get("slice_count", 0)
  95. total_media_count += mysql_result.get("media_count", 0)
  96. logger.info(f"从 {source_collection} 复制 MySQL 元数据成功: {mysql_result}")
  97. else:
  98. error_msg = f"从 {source_collection} 复制 MySQL 元数据失败: {mysql_result.get('error', '未知错误')}"
  99. logger.error(error_msg)
  100. errors.append(error_msg)
  101. else:
  102. total_milvus_failed += len(doc_ids)
  103. error_msg = f"从 {source_collection} 复制向量库失败: {milvus_resp.get('message')}"
  104. logger.error(error_msg)
  105. errors.append(error_msg)
  106. except Exception as e:
  107. error_msg = f"处理 {source_collection} 时发生异常: {str(e)}"
  108. logger.error(error_msg)
  109. errors.append(error_msg)
  110. # 构建响应
  111. result = {
  112. "code": 200 if not errors else 207, # 207 表示部分成功
  113. "message": "复制成功",
  114. }
  115. if errors:
  116. result["errors"] = errors
  117. logger.info(f"批量复制文档结果:{result}")
  118. return result