import asyncio
import os
import random
import re
import time
from typing import List
from uuid import uuid1

import aiohttp
import aiofiles
import requests
from langchain_text_splitters import RecursiveCharacterTextSplitter

from rag.db import MilvusOperate, MysqlOperate
from rag.task_registry import task_registry, TaskCancelledException
from rag.document_load.pdf_load import MinerUParsePdf
from rag.document_load.office_load import MinerUParseOffice
from rag.document_load.txt_load import TextLoad
from rag.document_load.image_load import MinerUParseImage
from rag.document_load.document_format_conversion import *
from utils.upload_file_to_oss import UploadMinio
from utils.get_logger import setup_logger
from config import minio_config

pdf_parse = MinerUParsePdf()
office_parse = MinerUParseOffice()
text_parse = TextLoad()
image_parse = MinerUParseImage()

logger = setup_logger(__name__)


class ProcessDocuments:

    def __init__(self, file_json):
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id)

    # Full dispatcher kept for reference; only txt and pdf are accepted for
    # now, everything else is converted to pdf upstream (see process_documents).
    # def _get_file_type(self, name):
    #     if name.endswith(".txt"):
    #         return text_parse
    #     elif name.endswith('.pdf'):
    #         return pdf_parse
    #     elif name.endswith((".doc", ".docx", ".ppt", ".pptx")):
    #         return office_parse
    #     elif name.endswith((".jpg", ".png", ".jpeg")):
    #         return image_parse
    #     else:
    #         raise ValueError("Unsupported file format")

    # Only txt and pdf are accepted here.
    def _get_file_type(self, name):
        if name.endswith(".txt"):
            return text_parse
        elif name.endswith('.pdf'):
            return pdf_parse
        else:
            raise ValueError(f"Unsupported file format: {name}")

    async def save_file_temp(self, session, url, name, max_retries=3):
        down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
        os.makedirs(down_file_path, exist_ok=True)
        down_file_name = down_file_path + f"/{name}"
        attempt = 0
        while attempt < max_retries:
            try:
                async with session.get(url, ssl=False) as resp:
                    resp.raise_for_status()
                    content_length = resp.headers.get('Content-Length')
                    file_size = int(content_length) if content_length else 0
                    async with aiofiles.open(down_file_name, 'wb') as f:
                        async for chunk in resp.content.iter_chunked(1024):
                            await f.write(chunk)
                # Success: return immediately.
                return down_file_name, file_size
            except Exception:
                attempt += 1
                # Re-raise on the final failed attempt.
                if attempt >= max_retries:
                    raise
                # Exponential backoff: 0.5s base, doubled per attempt, plus
                # random jitter, capped at 3s.
                wait = min(3, 0.5 * (2 ** attempt) + random.random())
                await asyncio.sleep(wait)
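    # A quick sketch of the retry schedule above (illustrative, not executed):
    # with max_retries=3, a failure on attempt n sleeps
    #   min(3, 0.5 * 2**n + random.random())
    # seconds before the next try, i.e. roughly 1.0-2.0s after the first
    # failure, 2.0-3.0s after the second; the third failure re-raises the
    # exception to the caller.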
    def send_post_request_sync(self, url, json_data=None, headers=None, timeout=30):
        """
        Send a POST request synchronously.

        Args:
            url: target URL
            json_data: JSON request body (dict)
            headers: optional header dict
            timeout: request timeout in seconds (default 30)

        Returns:
            dict with status code, response data and a message
        """
        try:
            # Default the Content-Type to application/json.
            if headers is None:
                headers = {}
            if 'Content-Type' not in headers:
                headers['Content-Type'] = 'application/json'
            resp = requests.post(url, json=json_data, headers=headers, timeout=timeout, verify=False)
            status_code = resp.status_code
            response_data = resp.text
            logger.info(f"Sync POST succeeded [url={url}]: {response_data} json_data={json_data}")
            return {
                "code": 200,
                "status_code": status_code,
                "data": response_data,
                "message": "POST request succeeded"
            }
        except requests.exceptions.Timeout as e:
            logger.error(f"Sync POST timed out [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST request timed out: {str(e)}"
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"Sync POST failed [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST request failed: {str(e)}"
            }

    def file_split_by_len(self, file_text):
        split_map = {
            "0": ["#"],   # split on heading paragraphs
            "1": [""],    # split by page
            "2": ["\n"]   # split into Q&A pairs
        }
        separator_num = self.file_json.get("set_slice")
        slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
        separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
        logger.info(f"Text split separator: {separator}")
        text_split = RecursiveCharacterTextSplitter(
            separators=separator,
            chunk_size=500,
            chunk_overlap=40,
            length_function=len
        )
        texts = text_split.split_text(file_text)
        return texts

    def split_text(self, file_text):
        text_split = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n"],
            chunk_size=500,
            chunk_overlap=40,
            length_function=len
        )
        texts = text_split.split_text(file_text)
        return texts

    def chunk_text_for_rag(self, text: str, max_chars: int = 20000) -> List[str]:
        """
        Split a long text into chunks.

        Args:
            text: the input text
            max_chars: maximum characters per chunk, default 20000

        Returns:
            List[str]: the resulting chunks

        Splitting rules:
            1. Split at section tags of the form x.x (1.1, 2.3, ...).
            2. The next chunk repeats the full content of the previous chunk's
               last x.x section (overlap).
            3. If there is no x.x tag, or adding the overlap would exceed
               max_chars, fall back to splitting at the full stop "。".
        """
        if len(text) <= max_chars:
            return [text]
        chunks = []
        start_idx = 0
        while start_idx < len(text):
            # Determine the end of the current chunk.
            end_idx = min(start_idx + max_chars, len(text))
            if end_idx >= len(text):
                # Final chunk: append the remainder and stop.
                chunks.append(text[start_idx:])
                break
            # Look for an x.x tag within the current chunk.
            current_chunk = text[start_idx:end_idx]
            split_pos, overlap_content = self.find_split_point(current_chunk, text, start_idx)
            # A tag at position 0 would produce an empty chunk and no forward
            # progress, so treat it the same as "no tag found".
            if split_pos:
                # Found a usable x.x tag as the split point.
                actual_end = start_idx + split_pos
                # Check whether adding the overlap stays within the limit.
                if overlap_content and len(current_chunk[:split_pos]) + len(overlap_content) <= max_chars:
                    chunks.append(text[start_idx:actual_end])
                    # The next chunk starts at the beginning of the overlap;
                    # require the new start to advance so the loop terminates.
                    overlap_start = text.rfind(overlap_content, start_idx, actual_end)
                    if overlap_start != -1 and overlap_start > start_idx:
                        start_idx = overlap_start
                    else:
                        start_idx = actual_end
                else:
                    # Overlap too long: fall back to splitting at "。".
                    split_pos, overlap_content = self.find_split_by_period(current_chunk, text, start_idx)
                    if split_pos is not None:
                        actual_end = start_idx + split_pos
                        chunks.append(text[start_idx:actual_end])
                        if overlap_content:
                            overlap_start = text.rfind(overlap_content, start_idx, actual_end)
                            start_idx = overlap_start if overlap_start != -1 and overlap_start > start_idx else actual_end
                        else:
                            start_idx = actual_end
                    else:
                        # No full stop either: split hard at max_chars.
                        chunks.append(text[start_idx:end_idx])
                        start_idx = end_idx
            else:
                # No x.x tag found: fall back to splitting at "。".
                split_pos, overlap_content = self.find_split_by_period(current_chunk, text, start_idx)
                if split_pos is not None:
                    actual_end = start_idx + split_pos
                    chunks.append(text[start_idx:actual_end])
                    if overlap_content:
                        overlap_start = text.rfind(overlap_content, start_idx, actual_end)
                        start_idx = overlap_start if overlap_start != -1 and overlap_start > start_idx else actual_end
                    else:
                        start_idx = actual_end
                else:
                    # No full stop either: split hard at max_chars.
                    chunks.append(text[start_idx:end_idx])
                    start_idx = end_idx
        return chunks
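    # Illustrative walkthrough (hypothetical input, not executed here): for a
    # 45k-character manual with section tags "1.1", "1.2", ...,
    # chunk_text_for_rag returns roughly three chunks of <= 20000 chars each;
    # every chunk after the first restarts at the previous chunk's last "x.x"
    # section, so that section's context carries over. Without such tags it
    # falls back to "。" sentence boundaries, and as a last resort cuts hard at
    # max_chars.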
    def find_split_point(self, chunk: str, full_text: str, chunk_start: int):
        """
        Find the last x.x tag in the chunk to use as a split point.

        Returns:
            (split_position, overlap_content)
        """
        # Match tags of the form x.x (1.1, 2.3, 10.5, ...).
        pattern = r'\d+\.\d+'
        # pattern = r'\b\d+\.\d+\b'
        matches = list(re.finditer(pattern, chunk))
        if not matches:
            return None, None
        # Take the last matching tag.
        last_match = matches[-1]
        tag_start = last_match.start()
        # Extract the tag's full content (from the tag to the next tag or the
        # end of the text).
        tag_content_start = chunk_start + tag_start
        # Search the full text for the next x.x tag.
        remaining_text = full_text[tag_content_start:]
        next_tag_match = re.search(pattern, remaining_text[len(last_match.group()):])
        if next_tag_match:
            tag_content_end = tag_content_start + len(last_match.group()) + next_tag_match.start()
        else:
            tag_content_end = len(full_text)
        overlap_content = full_text[tag_content_start:tag_content_end]
        # Reject overly long overlaps.
        if len(overlap_content) > 20000:
            # The overlap itself exceeds the limit; return None so the caller
            # falls back to splitting at "。".
            return None, None
        return tag_start, overlap_content

    def find_split_by_period(self, chunk: str, full_text: str, chunk_start: int):
        """
        Find a split point at the full stop "。".

        Returns:
            (split_position, overlap_content)
        """
        # Search backwards for the last full stop.
        last_period = chunk.rfind('。')
        if last_period == -1:
            return None, None
        # Split right after the full stop.
        split_pos = last_period + 1
        # Use the start of that sentence as the overlap: search backwards for
        # the previous full stop.
        prev_period = chunk.rfind('。', 0, last_period)
        if prev_period != -1:
            overlap_start = prev_period + 1
            overlap_content = chunk[overlap_start:split_pos]
        else:
            # No earlier full stop: overlap from the chunk start to this one.
            overlap_content = chunk[:split_pos]
        # Reject overly long overlaps.
        if len(overlap_content) > 20000:
            overlap_content = ""
        return split_pos, overlap_content

    def split_by_title(self, file_content_list, set_table, doc_id):
        # Split on headings; replace images with placeholder markers; tables
        # follow set_table ("0" render as image, "1" keep the HTML body).
        text_lists = []
        text = ""
        image_num = 1
        flag_img_info = {}
        level_1_text = ""
        level_2_text = ""
        for i, content_dict in enumerate(file_content_list):
            text_type = content_dict.get("type")
            content_text = content_dict.get("text")
            if text_type == "text":
                text_level = content_dict.get("text_level", "")
                if text_level == 1:
                    if not level_1_text:
                        level_1_text = f"# {content_text}\n"
                        text += f"# {content_text}\n"
                    else:
                        if len(text) > 20000:
                            text_chunks = self.chunk_text_for_rag(text)
                            text_lists.extend(text_chunks)
                        else:
                            text_lists.append(text)
                        text = f"# {content_text}\n"
                        level_1_text = f"# {content_text}\n"
                        level_2_text = ""
                elif text_level == 2:
                    if not level_2_text:
                        text += f"## {content_text}\n"
                        level_2_text = f"## {content_text}\n"
                    else:
                        if len(text) > 20000:
                            text_chunks = self.chunk_text_for_rag(text)
                            text_lists.extend(text_chunks)
                        else:
                            text_lists.append(text)
                        text = level_1_text + f"## {content_text}\n"
                else:
                    if text_level:
                        text += text_level * "#" + " " + content_text + "\n"
                    else:
                        text += content_text
            elif text_type == "table" and set_table == "1":
                text += " ".join(content_dict.get("table_caption")) + '\n'
                text += content_dict.get("table_body") + '\n'
            elif text_type in ("image", "table"):
                image_path = content_dict.get("img_path")
                if not image_path:
                    continue
                image_name = image_path.split("/")[1]
                save_image_path = "./tmp_file/document/vlm/images" + f"/{image_name}"
                replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                self.minio_client.upload_file(save_image_path, minio_file_path)
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                if text_type == "table":
                    text += " ".join(content_dict.get("table_caption")) + '\n'
                text += replace_text
                image_num += 1
            elif text_type == "list":
                list_items = content_dict.get("list_items")
                if list_items:
                    text += "\n".join(list_items) + "\n"
        # Flush the trailing buffer after the last element so the final
        # section is not dropped.
        if text:
            text_lists.append(text)
        return text_lists, flag_img_info
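    # Example of the image-marker bookkeeping above (values illustrative): a
    # figure in document "doc42" is replaced inline by the placeholder
    # 【示意图序号_doc42_1】 and recorded in flag_img_info as
    #   {"【示意图序号_doc42_1】":
    #    "<minio_url>/<minio_bucket>/pdf/<knowledge_id>/doc42/【示意图序号_doc42_1】.jpg"}
    # so the marker in the chunk text can later be resolved back to the image URL.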
    def split_by_page(self, file_content_list, set_table, doc_id):
        # Split by page; replace images with placeholder markers; tables follow
        # set_table ("0" render as image, "1" keep the HTML body).
        text_lists = []
        current_page = ""
        text = ""
        image_num = 1
        flag_img_info = {}
        for i, content_dict in enumerate(file_content_list):
            page_index = content_dict.get("page_idx")
            if i == 0:
                current_page = page_index
            elif page_index != current_page:
                text_lists.append(text)
                text = ""
                current_page = page_index
            text_type = content_dict.get("type")
            if text_type == "text":
                content_text = content_dict.get("text")
                text_level = content_dict.get("text_level")
                if text_level:
                    text += "#" * text_level + " " + content_text
                else:
                    text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                image_path = content_dict.get("img_path")
                image_name = image_path.split("/")[1]
                save_image_path = "./tmp_file/document/vlm/images" + f"/{image_name}"
                replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                self.minio_client.upload_file(save_image_path, minio_file_path)
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                text += replace_text
                image_num += 1
            if i + 1 == len(file_content_list):
                text_lists.append(text)
        return text_lists, flag_img_info

    def split_by_self(self, file_content_list, set_table, slice_value, doc_id):
        # Split on a custom separator; replace images with placeholder markers;
        # tables follow set_table ("0" image, "1" HTML body). Pieces longer
        # than 500 characters are hard-cut at 500.
        logger.info(f"Custom separator: {slice_value}")
        text = ""
        image_num = 1
        flag_img_info = {}
        for i, content_dict in enumerate(file_content_list):
            text_type = content_dict.get("type")
            if text_type == "text":
                content_text = content_dict.get("text")
                text_level = content_dict.get("text_level")
                if text_level:
                    text += "#" * text_level + " " + content_text
                else:
                    text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                image_path = content_dict.get("img_path")
                image_name = image_path.split("/")[1]
                save_image_path = "./tmp_file/document/vlm/images" + f"/{image_name}"
                replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                self.minio_client.upload_file(save_image_path, minio_file_path)
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                text += replace_text
                image_num += 1
        split_lists = text.split(slice_value)
        text_lists = []
        for segment in split_lists:
            r = len(segment) // 500
            if r >= 1:
                # Hard-cut long segments into 500-character pieces.
                for j in range(r + 1):
                    t = segment[j * 500:(j + 1) * 500]
                    if t:
                        text_lists.append(t)
            else:
                text_lists.append(segment)
        return text_lists, flag_img_info

    def file_split(self, file_content_list, doc_id):
        # Split the parsed content list according to set_slice; returns the
        # chunk list and the placeholder-to-image-URL map.
        separator_num = self.file_json.get("set_slice")
        set_table = self.file_json.get("set_table")
        if isinstance(file_content_list, str):
            file_text = file_content_list
            text_lists = self.split_text(file_text)
            return text_lists, {}
        elif separator_num == "0":
            # Split on headings: text_level 1/2 become "#" and "##".
            text_lists, flag_img_info = self.split_by_title(file_content_list, set_table, doc_id)
            return text_lists, flag_img_info
        elif separator_num == "1":
            # Split by page.
            text_lists, flag_img_info = self.split_by_page(file_content_list, set_table, doc_id)
            return text_lists, flag_img_info
        elif separator_num == "2":
            # Split into Q&A pairs (Excel documents); not implemented yet.
            return [], {}
        else:
            # Custom split: user-supplied separator plus a 500-character cap.
            slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
            text_lists, flag_img_info = self.split_by_self(file_content_list, set_table, slice_value, doc_id)
            return text_lists, flag_img_info
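    # Dispatch summary for file_split (mirrors the branches above):
    #   set_slice == "0" -> split_by_title  (heading-level chunks, '#'/'##')
    #   set_slice == "1" -> split_by_page   (one chunk per page_idx)
    #   set_slice == "2" -> Q&A pairs       (Excel only; returns [], {} for now)
    #   anything else    -> split_by_self   (custom separator, 500-char hard cap)
    # A plain-string input skips all of these and goes through split_text.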
    def process_data_to_milvus_schema(self, text_lists, doc_id, name):
        """Assemble the record format:
        {
            "content": text,
            "doc_id": doc_id,
            "chunk_id": chunk_id,
            "metadata": {"source": file_name},
        }
        """
        docs = []
        total_len = 0
        for i, text in enumerate(text_lists):
            chunk_id = str(uuid1())
            chunk_len = len(text)
            total_len += chunk_len
            d = {
                "content": text,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "metadata": {"source": name, "chunk_index": i + 1, "chunk_len": chunk_len}
            }
            docs.append(d)
        return docs, total_len
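    # Example return value for two chunks of "manual.pdf" (uuid values are
    # illustrative):
    #   ([{"content": "...", "doc_id": "doc42", "chunk_id": "8f1c...",
    #      "metadata": {"source": "manual.pdf", "chunk_index": 1, "chunk_len": 480}},
    #     {"content": "...", "doc_id": "doc42", "chunk_id": "a2b9...",
    #      "metadata": {"source": "manual.pdf", "chunk_index": 2, "chunk_len": 312}}],
    #    792)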
    async def process_documents(self, file_json):
        user_id = file_json.get("userId", "")
        task_id = file_json.get("docs", [{}])[0].get("document_id")

        # ===== Register the task in the global registry =====
        task_ctx = None
        if task_id:
            task_ctx = task_registry.register(task_id, user_id, self.knowledge_id)

        # Start progress reporting (separate thread).
        reporter = None
        if task_id:
            from rag.progress_reporter import ProgressReporter
            from config import progress_callback_config
            estimate_seconds = progress_callback_config.get("estimate_seconds", 120)
            callback_url = progress_callback_config.get("default_url")
            reporter = ProgressReporter(task_id, callback_url, estimate_seconds, user_id)
            reporter.start()
            # Attach the reporter to the task context so cancellation can stop
            # progress reporting.
            if task_ctx:
                task_ctx.reporter = reporter

        # Track successfully processed documents (defined outside the try block
        # so the exception handlers can access it).
        success_doc = []

        try:
            # Download the documents.
            separator_num = file_json.get("set_slice")
            if separator_num == "2":
                if reporter:
                    reporter.complete(success=False)
                return {"code": 500, "message": "Parsing not supported yet"}
            docs = file_json.get("docs")
            flag = file_json.get("flag")
            for doc in docs:
                # ===== Checkpoint 1: check the cancel flag before each document =====
                if task_ctx and task_ctx.is_cancelled:
                    logger.info(f"Task {task_id} cancelled before document processing")
                    raise TaskCancelledException("Task cancelled by user")

                url = doc.get("url")
                name = doc.get("name")
                doc_id = doc.get("document_id")
                async with aiohttp.ClientSession() as session:
                    down_file_name, file_size = await self.save_file_temp(session, url, name)
                    # Only txt and pdf are accepted; everything else is
                    # converted to pdf (unsupported formats are rejected inside
                    # convert_to_pdf).
                    if not os.path.exists(down_file_name):
                        logger.info(f'{doc_id} download failed, retrying')
                        down_file_name, file_size = await self.save_file_temp(session, url, name)
                    logger.info(f'Downloaded file path: {down_file_name}')
                    if not name.endswith(".txt") and not name.endswith('.pdf'):
                        down_file_name = convert_to_pdf(down_file_name)
                        name = os.path.basename(down_file_name)
                    file_parse = self._get_file_type(name)
                    file_content_list = await file_parse.extract_text(down_file_name)
                    logger.info(f"MinerU parsed pdf data: {file_content_list}")
                    text_lists, flag_img_info = self.file_split(file_content_list, doc_id)
                    # Use a distinct name so the outer `docs` list being
                    # iterated is not shadowed.
                    chunk_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name)
                    logger.info(f"Text records for Milvus: {chunk_docs}")

                    # ===== Checkpoint 2: parsing done, check the cancel flag before inserting =====
                    if task_ctx and task_ctx.is_cancelled:
                        logger.info(f"Task {task_id} cancelled before insert; no database cleanup needed")
                        raise TaskCancelledException("Task cancelled by user")

                    if flag == "upload":
                        # Insert the slices into the MySQL slice-info table.
                        insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(chunk_docs, self.knowledge_id, doc_id)
                        if insert_slice_flag:
                            # Insert the image URLs into MySQL.
                            insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
                        else:
                            insert_img_flag = False
                            parse_file_status = False
                        if insert_img_flag:
                            # Insert the chunk records into Milvus.
                            insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(chunk_docs)
                        else:
                            insert_milvus_flag = False
                            parse_file_status = False
                        if insert_milvus_flag:
                            parse_file_status = True
                            # ===== Checkpoint 3: after insert, clean up the inserted data if cancelled =====
                            if task_ctx and task_ctx.is_cancelled:
                                logger.info(f"Task {task_id} cancelled after insert; cleaning up: doc_id={doc_id}")
                                self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                                self.mysql_client.delete_to_slice(doc_id=doc_id)
                                self.mysql_client.delete_image_url(doc_id=doc_id)
                                raise TaskCancelledException("Task cancelled by user")
                        else:
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            self.mysql_client.delete_image_url(doc_id=doc_id)
                            parse_file_status = False
                    elif flag == "update":
                        # Re-slicing: delete the existing records first.
                        self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        insert_mysql_start_time = time.time()
                        insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(chunk_docs, self.knowledge_id, doc_id)
                        insert_mysql_end_time = time.time()
                        logger.info(f"MySQL insert took: {insert_mysql_end_time - insert_mysql_start_time}")
                        if insert_slice_flag:
                            insert_milvus_start_time = time.time()
                            insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(chunk_docs)
                            insert_milvus_end_time = time.time()
                            logger.info(f"Milvus insert took: {insert_milvus_end_time - insert_milvus_start_time}")
                        else:
                            insert_milvus_flag = False
                            parse_file_status = False
                        if insert_milvus_flag:
                            parse_file_status = True
                            # ===== Checkpoint 3: after insert, clean up the inserted data if cancelled =====
                            if task_ctx and task_ctx.is_cancelled:
                                logger.info(f"Task {task_id} cancelled after insert; cleaning up: doc_id={doc_id}")
                                self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                                self.mysql_client.delete_to_slice(doc_id=doc_id)
                                raise TaskCancelledException("Task cancelled by user")
                        else:
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            parse_file_status = False

                if parse_file_status:
                    success_doc.append(doc_id)
                else:
                    if flag == "upload":
                        for del_id in success_doc:
                            self.milvus_client._delete_by_doc_id(doc_id=del_id)
                            self.mysql_client.delete_image_url(doc_id=del_id)
                            self.mysql_client.delete_to_slice(doc_id=del_id)
                    if reporter:
                        reporter.complete(success=False)
                    self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                                                {"status": "2", "documentId": file_json.get("docs")[0].get("document_id")})
                    return {"code": 500, "message": "Parse failed", "knowledge_id": self.knowledge_id, "doc_info": {}}

            self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                                        {"knowledgeId": self.knowledge_id,
                                         "documentId": file_json.get("docs")[0].get("document_id"),
                                         "length": file_size, "wordNum": total_char_len,
                                         "sliceTotal": len(text_lists), "status": "1"})
            # Task finished: report 100% progress.
            if reporter:
                reporter.complete(success=True)
            return {"code": 200, "message": "Parse succeeded", "knowledge_id": self.knowledge_id,
                    "doc_info": {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}}
        except TaskCancelledException as e:
            # ===== Handle task cancellation =====
            logger.info(f"Task cancelled [task_id={task_id}]: {e}")
            self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                                        {"status": "2", "documentId": file_json.get("docs")[0].get("document_id")})
            # Clean up data from documents that were already processed
            # successfully (slice, image, Milvus).
            for del_id in success_doc:
                logger.info(f"Cleaning up inserted document data: doc_id={del_id}")
                self.milvus_client._delete_by_doc_id(doc_id=del_id)
                self.mysql_client.delete_image_url(doc_id=del_id)
                self.mysql_client.delete_to_slice(doc_id=del_id)
            if reporter:
                reporter.complete(success=False)
            return {"code": 499, "message": "Task cancelled", "knowledge_id": self.knowledge_id, "doc_info": {}}
        except Exception as e:
            # Catch-all handler for any other failure.
            logger.error(f"Document processing error [task_id={task_id}]: {e}", exc_info=True)
            self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                                        {"status": "2", "documentId": file_json.get("docs")[0].get("document_id")})
            if reporter:
                reporter.complete(success=False)
            return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
        finally:
            # ===== Unregister the task =====
            if task_id:
                task_registry.unregister(task_id)
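
# --- Usage sketch ------------------------------------------------------------
# A minimal driver, assuming a reachable MySQL/Milvus/MinIO stack and a payload
# shaped like the one process_documents expects. All IDs, the URL and the file
# name below are placeholders, not values from this project.
if __name__ == "__main__":
    demo_payload = {
        "knowledge_id": "kb_demo",   # used as the Milvus collection name
        "userId": "user_1",
        "flag": "upload",            # "upload" for new docs, "update" to re-slice
        "set_slice": "0",            # "0" title, "1" page, "2" Q&A, else custom
        "set_table": "1",            # "1" keep table HTML, otherwise image marker
        "slice_value": "",
        "docs": [{"document_id": "doc_demo", "name": "demo.pdf",
                  "url": "http://example.com/demo.pdf"}],
    }
    processor = ProcessDocuments(demo_payload)
    result = asyncio.run(processor.process_documents(demo_payload))
    print(result)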