from utils.get_logger import setup_logger
from rag.db import MilvusOperate, MysqlOperate

from .slice_metadata import process_single_slice_metadata

logger = setup_logger(__name__)

# Maximum attempts for slice-metadata generation after a successful write.
_METADATA_MAX_RETRIES = 3


async def _generate_metadata_with_retry(
    knowledge_id,
    slice_id,
    slice_text,
    flags: dict,
    embedding_id,
    separator_num,
) -> None:
    """Generate metadata for a single slice, retrying up to _METADATA_MAX_RETRIES times.

    Best-effort: logs the outcome and never raises, so a metadata failure
    cannot break the caller's insert/update response.

    Parameters:
    - knowledge_id: knowledge-base ID (Milvus collection name)
    - slice_id: ID of the slice whose metadata is generated
    - slice_text: slice text content
    - flags: metadata switches dict with boolean-ish keys "qa" / "question" / "summary"
    - embedding_id: embedding model identifier
    - separator_num: custom separator setting (defaults to "-1" upstream)
    """
    for attempt in range(1, _METADATA_MAX_RETRIES + 1):
        meta_success, meta_msg = await process_single_slice_metadata(
            knowledge_id=knowledge_id,
            slice_id=slice_id,
            slice_text=slice_text,
            enable_qa=flags.get("qa", False),
            enable_question=flags.get("question", False),
            enable_summary=flags.get("summary", False),
            embedding_id=embedding_id,
            separator_num=separator_num,
        )
        if meta_success:
            logger.info(f"切片 {slice_id} 元数据生成成功")
            return
        logger.warning(f"切片 {slice_id} 元数据生成失败 (第{attempt}次): {meta_msg}")
    logger.error(f"切片 {slice_id} 元数据生成失败,已达最大重试次数")


async def update_slice(slice_json: dict) -> dict:
    """Update a slice; on success, automatically regenerate its metadata.

    Expected keys in slice_json:
    - knowledge_id: knowledge-base ID
    - slice_id: slice ID
    - document_id: document ID
    - slice_text: slice text
    - embedding_id: embedding model ID
    - customSeparator: optional custom separator (default "-1")

    Returns:
    - dict: the Milvus update result (metadata generation does not alter it)
    """
    logger.info(f"更新切片信息的请求参数:{slice_json}")
    collection_name = slice_json.get("knowledge_id")
    embedding_name = slice_json.get("embedding_id")
    resp = MilvusOperate(
        collection_name=collection_name, embedding_name=embedding_name
    )._put_by_id(slice_json)
    logger.info(f"更新切片信息的结果:{resp}")
    # After a successful update, look up the document's metadata switches
    # and regenerate metadata for this slice if any switch is enabled.
    if resp.get("code") == 200:
        document_id = slice_json.get("document_id")
        success, flags = MysqlOperate().query_doc_metadata_flags(document_id)
        # Only touch `flags` once the query succeeded: on failure it may be
        # None, and the original unconditional log would raise AttributeError.
        if success:
            logger.info(f'{flags.get("qa")}, {flags.get("question")}, {flags.get("summary")}')
            if any([flags.get("qa"), flags.get("question"), flags.get("summary")]):
                await _generate_metadata_with_retry(
                    knowledge_id=collection_name,
                    slice_id=slice_json.get("slice_id"),
                    slice_text=slice_json.get("slice_text"),
                    flags=flags,
                    embedding_id=embedding_name,
                    separator_num=slice_json.get("customSeparator", "-1"),
                )
    return resp


async def insert_slice(slice_json: dict) -> dict:
    """Insert a new slice; on success, automatically generate its metadata.

    Expected keys in slice_json:
    - knowledge_id: knowledge-base ID
    - document_id: document ID
    - slice_text: slice text
    - embedding_id: embedding model ID
    - customSeparator: optional custom separator (default "-1")

    Returns:
    - dict: the Milvus insert result (contains "slice_id" on success)
    """
    logger.info(f"新增切片信息的请求参数:{slice_json}")
    collection_name = slice_json.get("knowledge_id")
    embedding_name = slice_json.get("embedding_id")
    resp = MilvusOperate(
        collection_name=collection_name, embedding_name=embedding_name
    )._insert_slice(slice_json)
    logger.info(f"新增切片信息的结果:{resp}")
    # After a successful insert, look up the document's metadata switches
    # and generate metadata for the newly created slice if any is enabled.
    if resp.get("status") == "insert_success":
        document_id = slice_json.get("document_id")
        success, flags = MysqlOperate().query_doc_metadata_flags(document_id)
        if success and any([flags.get("qa"), flags.get("question"), flags.get("summary")]):
            await _generate_metadata_with_retry(
                knowledge_id=collection_name,
                # The slice ID is assigned by Milvus and returned in the
                # insert response, not taken from the request payload.
                slice_id=resp.get("slice_id"),
                slice_text=slice_json.get("slice_text"),
                flags=flags,
                embedding_id=embedding_name,
                separator_num=slice_json.get("customSeparator", "-1"),
            )
    return resp