slice_crud.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. from utils.get_logger import setup_logger
  2. from rag.db import MilvusOperate, MysqlOperate
  3. from .slice_metadata import process_single_slice_metadata
  4. logger = setup_logger(__name__)
  5. async def update_slice(slice_json: dict) -> dict:
  6. """
  7. 更新切片信息,成功后自动生成切片元数据
  8. 参数:
  9. - knowledge_id: 知识库ID
  10. - slice_id: 切片ID
  11. - document_id: 文档ID
  12. - slice_text: 切片文本
  13. 返回:
  14. - dict: 操作结果
  15. """
  16. logger.info(f"更新切片信息的请求参数:{slice_json}")
  17. collection_name = slice_json.get("knowledge_id")
  18. embedding_name = slice_json.get("embedding_id")
  19. resp = MilvusOperate(collection_name=collection_name, embedding_name=embedding_name)._put_by_id(slice_json)
  20. logger.info(f"更新切片信息的结果:{resp}")
  21. # 更新成功后,查询文档元数据开关并生成切片元数据
  22. if resp.get("code") == 200:
  23. document_id = slice_json.get("document_id")
  24. mysql_client = MysqlOperate()
  25. success, flags = mysql_client.query_doc_metadata_flags(document_id)
  26. logger.info(f'{flags.get("qa")}, {flags.get("question")}, {flags.get("summary")}')
  27. if success and any([flags.get("qa"), flags.get("question"), flags.get("summary")]):
  28. slice_id = slice_json.get("slice_id")
  29. slice_text = slice_json.get("slice_text")
  30. separator_num = slice_json.get("customSeparator", "-1")
  31. # 重试机制:最多重试3次
  32. max_retries = 3
  33. for attempt in range(max_retries):
  34. meta_success, meta_msg = await process_single_slice_metadata(
  35. knowledge_id=collection_name,
  36. slice_id=slice_id,
  37. slice_text=slice_text,
  38. enable_qa=flags.get("qa", False),
  39. enable_question=flags.get("question", False),
  40. enable_summary=flags.get("summary", False),
  41. embedding_id=embedding_name,
  42. separator_num=separator_num
  43. )
  44. if meta_success:
  45. logger.info(f"切片 {slice_id} 元数据生成成功")
  46. break
  47. logger.warning(f"切片 {slice_id} 元数据生成失败 (第{attempt+1}次): {meta_msg}")
  48. if attempt == max_retries - 1:
  49. logger.error(f"切片 {slice_id} 元数据生成失败,已达最大重试次数")
  50. return resp
  51. async def insert_slice(slice_json: dict) -> dict:
  52. """
  53. 新增切片信息,成功后自动生成切片元数据
  54. 参数:
  55. - knowledge_id: 知识库ID
  56. - document_id: 文档ID
  57. - slice_text: 切片文本
  58. 返回:
  59. - dict: 操作结果
  60. """
  61. logger.info(f"新增切片信息的请求参数:{slice_json}")
  62. collection_name = slice_json.get("knowledge_id")
  63. embedding_name = slice_json.get("embedding_id")
  64. resp = MilvusOperate(collection_name=collection_name, embedding_name=embedding_name)._insert_slice(slice_json)
  65. logger.info(f"新增切片信息的结果:{resp}")
  66. # 新增成功后,查询文档元数据开关并生成切片元数据
  67. if resp.get("status") == "insert_success":
  68. document_id = slice_json.get("document_id")
  69. slice_id = resp.get("slice_id")
  70. mysql_client = MysqlOperate()
  71. success, flags = mysql_client.query_doc_metadata_flags(document_id)
  72. if success and any([flags.get("qa"), flags.get("question"), flags.get("summary")]):
  73. slice_text = slice_json.get("slice_text")
  74. separator_num = slice_json.get("customSeparator", "-1")
  75. # 重试机制:最多重试3次
  76. max_retries = 3
  77. for attempt in range(max_retries):
  78. meta_success, meta_msg = await process_single_slice_metadata(
  79. knowledge_id=collection_name,
  80. slice_id=slice_id,
  81. slice_text=slice_text,
  82. enable_qa=flags.get("qa", False),
  83. enable_question=flags.get("question", False),
  84. enable_summary=flags.get("summary", False),
  85. embedding_id=embedding_name,
  86. separator_num=separator_num
  87. )
  88. if meta_success:
  89. logger.info(f"切片 {slice_id} 元数据生成成功")
  90. break
  91. logger.warning(f"切片 {slice_id} 元数据生成失败 (第{attempt+1}次): {meta_msg}")
  92. if attempt == max_retries - 1:
  93. logger.error(f"切片 {slice_id} 元数据生成失败,已达最大重试次数")
  94. return resp