import asyncio
import aiohttp
import aiofiles
import requests
from rag.db import MilvusOperate, MysqlOperate
from rag.task_registry import task_registry, TaskCancelledException
from rag.document_load.pdf_load import MinerUParsePdf, MinerUParsePdfClient
# from rag.document_load.dots_pdf_load import DotsPDFLoader
from rag.document_load.paddleocr_load import PaddleOCRLoader
# from rag.document_load.office_load import MinerUParseOffice
from rag.document_load.txt_load import TextLoad
# from rag.document_load.image_load import MinerUParseImage
from rag.document_load.excel_load import parse_excel
from utils.upload_file_to_oss import UploadMinio
from utils.get_logger import setup_logger
from config import minio_config
import os
import time
from uuid import uuid1
from langchain_text_splitters import RecursiveCharacterTextSplitter
from rag.document_load.document_format_conversion import *
import re
from typing import List, Dict
from config import progress_callback_config
from rag.document_load.md_splitter import MarkdownSplitter
import sys
sys.path.append("/opt/lightRAG_dir")
from lightRAG import AsyncLightRAGManager


class VLMRetryExhaustedError(RuntimeError):
    """Raised when a VLM call still fails after repeated retries."""


# Use the MinerU API-client mode.
pdf_parse = MinerUParsePdfClient()
# Alternative: call MinerU directly.
# pdf_parse = MinerUParsePdf()
# office_parse = MinerUParseOffice()
text_parse = TextLoad()
# image_parse = MinerUParseImage()
logger = setup_logger(__name__)

# Maximum characters per chunk for each known embedding model.
embedding_model_mapping_sequence_len = {
    "bge-m3": 5000,
    "Qwen3-Embedding": 30000,
}

# Structural atoms that must never be cut in half when chunking:
# a whole HTML table, or an image placeholder such as 【示意图序号_<doc>_<n>】.
_TABLE_ATOM_PATTERN = re.compile(r"<table\b[^>]*>.*?</table>", re.IGNORECASE | re.DOTALL)
_PLACEHOLDER_ATOM_PATTERN = re.compile(r"【[^】]+】")


class ProcessDocuments:
    """Document-processing pipeline for one knowledge base.

    Picks a loader per file, downloads source files, and splits parsed content
    into chunks sized for the configured embedding model.
    """

    def __init__(self, file_json):
        """
        Args:
            file_json: request payload. The keys read here are ``knowledge_id``
                (also used as the Milvus collection name) and ``embedding_id``
                (embedding model name, default ``"e5"``).
        """
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        # Default parser type; updated later in process_documents.
        self.parser_type = "mineru"
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        self.embedding_name = file_json.get("embedding_id", "e5")
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id, embedding_name=self.embedding_name)
        # Max characters per chunk for this embedding model (5000 when the model is not listed).
        self.max_cut_len = embedding_model_mapping_sequence_len.get(self.embedding_name, 5000)

    def _get_image_base_path(self, pdf_file_name):
        """Return the directory holding the images extracted for ``pdf_file_name``.

        The layout depends on ``self.parser_type``: ``"dots"`` and ``"2"``
        (PaddleOCR-VL) have their own output folders; any other value is
        treated as MinerU.
        """
        if self.parser_type == "dots":
            return "./tmp_file/dots_parsed/" + pdf_file_name + "/images"
        if self.parser_type == "2":
            return "./tmp_file/paddleocr_parsed/" + pdf_file_name + "/imgs"
        # MinerU default layout
        return "./tmp_file/" + pdf_file_name + "/vlm/images"

    def _get_file_type(self, name, parser_type="mineru"):
        """Pick the loader for ``name``; only ``.txt`` and ``.pdf`` are supported.

        Args:
            name: file name; its extension selects the loader.
            parser_type: PDF parser selector: ``"dots"``, ``"2"`` (PaddleOCR-VL),
                or anything else for MinerU.

        Returns:
            The loader, or ``None`` when the extension is unsupported or the
            Dots loader is requested (it is currently disabled).
        """
        if name.endswith(".txt"):
            return text_parse
        if not name.endswith('.pdf'):
            return None
        if parser_type == "dots":
            # The Dots loader needs file_json and is currently disabled:
            # return DotsPDFLoader(self.file_json)
            return None
        if parser_type == "2":
            # PaddleOCR-VL loader
            return PaddleOCRLoader(self.file_json)
        # Default: MinerU loader
        return pdf_parse

    async def send_get_request(self, url, headers=None):
        """Send an HTTP GET request.

        Args:
            url: request URL.
            headers: optional request headers.

        Returns:
            dict: ``{"code": 200, "status_code", "data", "message"}`` whenever a
            response was received (whatever its HTTP status), or
            ``{"code": 500, "message"}`` when the request itself raised.
        """
        try:
            async with aiohttp.ClientSession() as session:
                # NOTE(review): ssl=False disables TLS certificate verification.
                async with session.get(url, headers=headers, ssl=False) as resp:
                    status_code = resp.status
                    response_data = await resp.text()
                    return {
                        "code": 200,
                        "status_code": status_code,
                        "data": response_data,
                        "message": "GET请求成功"
                    }
        except Exception as e:
            logger.error(f"GET请求失败 [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"GET请求失败: {str(e)}"
            }

    async def send_post_request(self, url, json_data=None, headers=None):
        """Send an HTTP POST request with a JSON body.

        Args:
            url: request URL.
            json_data: JSON-serialisable request body.
            headers: optional request headers. ``Content-Type: application/json``
                is added when absent; the caller's dict is not modified.

        Returns:
            dict: ``{"code": 200, "status_code", "data", "message"}`` whenever a
            response was received (whatever its HTTP status), or
            ``{"code": 500, "message"}`` when the request itself raised.
        """
        try:
            # Work on a copy so the default Content-Type does not leak into the caller's dict.
            headers = {} if headers is None else dict(headers)
            if 'Content-Type' not in headers:
                headers['Content-Type'] = 'application/json'
            async with aiohttp.ClientSession() as session:
                # NOTE(review): ssl=False disables TLS certificate verification.
                async with session.post(url, json=json_data, headers=headers, ssl=False) as resp:
                    status_code = resp.status
                    response_data = await resp.text()
                    logger.info(f"POST请求成功 [url={url}]: {response_data} json_data={json_data}")
                    return {
                        "code": 200,
                        "status_code": status_code,
                        "data": response_data,
                        "message": "POST请求成功"
                    }
        except Exception as e:
            logger.error(f"POST请求失败 [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST请求失败: {str(e)}"
            }

    def send_post_request_sync(self, url, json_data=None, headers=None, timeout=30):
        """Send an HTTP POST request with a JSON body, synchronously (``requests``).

        Args:
            url: request URL.
            json_data: JSON-serialisable request body.
            headers: optional request headers. ``Content-Type: application/json``
                is added when absent; the caller's dict is not modified.
            timeout: request timeout in seconds (default 30).

        Returns:
            dict: ``{"code": 200, "status_code", "data", "message"}`` whenever a
            response was received, or ``{"code": 500, "message"}`` on a timeout
            or any other ``requests`` error.
        """
        try:
            # Work on a copy so the default Content-Type does not leak into the caller's dict.
            headers = {} if headers is None else dict(headers)
            if 'Content-Type' not in headers:
                headers['Content-Type'] = 'application/json'
            # NOTE(review): verify=False disables TLS certificate verification.
            resp = requests.post(url, json=json_data, headers=headers, timeout=timeout, verify=False)
            status_code = resp.status_code
            response_data = resp.text
            logger.info(f"同步POST请求成功 [url={url}]: {response_data} json_data={json_data}")
            return {
                "code": 200,
                "status_code": status_code,
                "data": response_data,
                "message": "POST请求成功"
            }
        except requests.exceptions.Timeout as e:
            logger.error(f"同步POST请求超时 [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST请求超时: {str(e)}"
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"同步POST请求失败 [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST请求失败: {str(e)}"
            }

    async def save_file_temp(self, session, url, name, max_retries=5):
        """Download ``url`` to ``./tmp_file/<knowledge_id>/<name>``, retrying on failure.

        Args:
            session: an open ``aiohttp.ClientSession`` used for the download.
            url: file URL.
            name: target file name, joined verbatim onto the directory.
            max_retries: number of download attempts.

        Returns:
            ``(local_path, size)``, where ``size`` comes from the ``Content-Length``
            header (0 when absent), or ``None`` when every attempt failed.
        """
        down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
        os.makedirs(down_file_path, exist_ok=True)
        # NOTE(review): `name` is used verbatim; if it can come from untrusted input,
        # a value containing "../" would escape the download directory.
        down_file_name = down_file_path + f"/{name}"
        request_headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "*/*"
        }
        for attempt in range(max_retries):
            try:
                async with session.get(url, ssl=False, headers=request_headers) as resp:
                    resp.raise_for_status()
                    content_length = resp.headers.get('Content-Length')
                    file_size = int(content_length) if content_length else 0
                    # 'wb' truncates any partial file left by a previous failed attempt.
                    async with aiofiles.open(down_file_name, 'wb') as f:
                        async for chunk in resp.content.iter_chunked(64 * 1024):
                            await f.write(chunk)
                    return down_file_name, file_size
            except Exception as e:
                logger.warning(f"文件下载失败:{e}")
                logger.warning(f"准备重试:{attempt + 1}/{max_retries}")
                if attempt + 1 < max_retries:
                    # Short exponential backoff so transient network/server errors can clear.
                    await asyncio.sleep(min(2 ** attempt, 10))
        # Make the "all attempts failed" outcome explicit instead of falling off the end.
        logger.error(f"文件下载失败,已重试{max_retries}次 [url={url}]")
        return None

    def file_split_by_len(self, file_text):
        """Split ``file_text`` into chunks of at most 500 characters (40-character overlap).

        The separator comes from ``file_json["customSeparator"]``:
        ``"0"`` splits on ``"#"`` (heading sections), ``"1"`` on ``""`` (page),
        and ``"2"`` on newlines (Q&A pairs). Any other value falls back to
        ``file_json["slice_value"]``, in which literal backslash-n sequences are
        turned into real newlines.
        """
        split_map = {
            "0": ["#"],   # split by heading sections
            "1": [""],    # split by page
            "2": ["\n"],  # split by Q&A pair
        }
        separator_num = self.file_json.get("customSeparator")
        # `or ""` also covers an explicit null slice_value, which would crash .replace().
        slice_value = (self.file_json.get("slice_value") or "").replace("\\n", "\n")
        separator = split_map.get(separator_num) or [slice_value]
        logger.info(f"文本切分字符:{separator}")
        text_split = RecursiveCharacterTextSplitter(
            separators=separator,
            chunk_size=500,
            chunk_overlap=40,
            length_function=len
        )
        return text_split.split_text(file_text)

    def split_text(self, file_text):
        """Split ``file_text`` on blank lines / newlines into <=500-char chunks (40-char overlap)."""
        text_split = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n"],
            chunk_size=500,
            chunk_overlap=40,
            length_function=len
        )
        return text_split.split_text(file_text)

    def split_into_atoms(self, text: str) -> List[Dict]:
        """Split ``text`` into indivisible structural atoms.

        Each atom is ``{"type": ..., "content": ...}``, where the type is
        ``"table"`` (a whole HTML ``<table>...</table>`` element),
        ``"placeholder"`` (an image marker such as ``【示意图序号_...】``) or
        ``"text"`` (everything in between). Joining the contents of the returned
        atoms in order reproduces ``text`` exactly.
        """
        matches = [(m.start(), m.end(), "table") for m in _TABLE_ATOM_PATTERN.finditer(text)]
        matches.extend(
            (m.start(), m.end(), "placeholder") for m in _PLACEHOLDER_ATOM_PATTERN.finditer(text)
        )
        # Earliest match first; on a tie prefer the longer one.
        matches.sort(key=lambda m: (m[0], -m[1]))

        atoms = []
        cursor = 0
        for start, end, kind in matches:
            if start < cursor:
                # Lies inside an atom already emitted (e.g. a 【...】 inside a table);
                # emitting it again would duplicate content.
                continue
            if cursor < start:
                atoms.append({"type": "text", "content": text[cursor:start]})
            atoms.append({"type": kind, "content": text[start:end]})
            cursor = end
        if cursor < len(text):
            atoms.append({"type": "text", "content": text[cursor:]})
        return atoms

    # Previous implementation of calc_overlap_atoms, kept for reference:
    # def calc_overlap_atoms(self, prev_atoms, overlap_ratio=0.1):
    #     total_len = sum(len(a["content"]) for a in prev_atoms)
    #     target_len = int(total_len * overlap_ratio)
    #     overlap_atoms = []
    #     cur_len = 
0 # # 从后往前收集原子 # for atom in reversed(prev_atoms): # overlap_atoms.insert(0, atom) # cur_len += len(atom["content"]) # if cur_len >= target_len: # break # return overlap_atoms def calc_overlap_atoms(self, prev_atoms, overlap_ratio=0.1): total_len = sum(len(a["content"]) if a["type"] == "text" else 0 for a in prev_atoms) target_len = int(total_len * overlap_ratio) overlap_atoms = [] cur_len = 0 # 从后往前收集原子 for atom in reversed(prev_atoms): if atom["type"] == "text": cur_len += len(atom["content"]) if cur_len > target_len: atom["content"] = atom["content"][-(target_len - (cur_len - len(atom["content"]))):] overlap_atoms.insert(0, atom) else: overlap_atoms.insert(0, atom) if cur_len >= target_len: break return overlap_atoms def overlap_chunks(self, chunks, overlap_ratio=0.1): """ chunks: 原始 chunk 列表 """ new_chunks = [] prev_atoms = None for chunk in chunks: atoms = self.split_into_atoms(chunk) if prev_atoms: overlap_atoms = self.calc_overlap_atoms(prev_atoms, overlap_ratio) merged_atoms = overlap_atoms + atoms else: merged_atoms = atoms # new_chunk = chunk.copy() new_chunk = "".join(a["content"] for a in merged_atoms) new_chunks.append(new_chunk) prev_atoms = atoms return new_chunks def split_large_atom(self, atom: Dict, max_chunk_size: int) -> List[Dict]: """ 切分超长的单个原子(表格、占位符、文本) 参数: atom: 原子字典 {"type": "table/placeholder/text", "content": "..."} max_chunk_size: 最大字符数限制 返回: List[Dict]: 切分后的原子列表 """ content = atom["content"] atom_type = atom["type"] # 如果不超长,直接返回 if len(content) <= max_chunk_size: return [atom] logger.warning(f"检测到超长原子 [type={atom_type}, length={len(content)}],进行切分") result_atoms = [] if atom_type == "table": # 表格按行切分 # 提取表格标签 table_match = re.match(r'(]*>)(.*)()', content, re.DOTALL | re.IGNORECASE) if not table_match: # 无法解析表格结构,按字符强制切分 logger.warning("无法解析表格结构,按字符强制切分") for i in range(0, len(content), max_chunk_size): chunk = content[i:i + max_chunk_size] result_atoms.append({"type": atom_type, "content": chunk}) return result_atoms table_open = 
table_match.group(1) table_body = table_match.group(2) table_close = table_match.group(3) # 按 行切分 rows = re.findall(r']*>.*?', table_body, re.DOTALL | re.IGNORECASE) if not rows: # 没有找到行,按字符强制切分 for i in range(0, len(content), max_chunk_size): chunk = content[i:i + max_chunk_size] result_atoms.append({"type": atom_type, "content": chunk}) return result_atoms # 组装表格块 current_chunk = table_open for row in rows: # 检查加入这一行后是否超长 if len(current_chunk) + len(row) + len(table_close) > max_chunk_size: # 当前块已满,保存并开始新块 if current_chunk != table_open: # 确保不是空表格 result_atoms.append({"type": atom_type, "content": current_chunk + table_close}) current_chunk = table_open else: # 单行就超长,强制加入 result_atoms.append({"type": atom_type, "content": current_chunk + row + table_close}) current_chunk = table_open continue current_chunk += row # 添加最后一个块 if current_chunk != table_open: result_atoms.append({"type": atom_type, "content": current_chunk + table_close}) elif atom_type == "placeholder": # 占位符按字符切分(通常占位符不应该超长,但以防万一) logger.warning(f"占位符超长 [length={len(content)}],按字符切分") for i in range(0, len(content), max_chunk_size): chunk = content[i:i + max_chunk_size] result_atoms.append({"type": atom_type, "content": chunk}) else: # text # 普通文本按字符切分 for i in range(0, len(content), max_chunk_size): chunk = content[i:i + max_chunk_size] result_atoms.append({"type": atom_type, "content": chunk}) logger.info(f"超长原子切分完成 [原长度={len(content)}, 切分为{len(result_atoms)}个块]") return result_atoms def smart_chunk_with_overlap(self, text: str, max_chunk_size: int = 5000, overlap_ratio: float = 0.1) -> List[str]: """ 智能分割长文本并添加重叠 参数: text: 输入的长文本字符串 max_chunk_size: 每块的最大字符数(所有内容),默认5000 overlap_ratio: 重叠率,默认0.1(10%) 返回: List[str]: 分割后带重叠的文本块列表 处理流程: 1. 如果文本不超长,直接返回 2. 将文本拆分为原子(表格、占位符、普通文本) 3. 对超长原子进行切分 4. 按照max_chunk_size将原子组装成块(计算所有内容的总长度) 5. 
对所有块添加重叠率处理(重叠时只计算纯文本长度) """ if len(text) <= max_chunk_size: return [text] # 步骤1:将文本拆分为原子 atoms = self.split_into_atoms(text) # 步骤2:对超长原子进行切分 processed_atoms = [] for atom in atoms: if len(atom["content"]) > max_chunk_size: # 超长原子需要切分 split_atoms = self.split_large_atom(atom, max_chunk_size) processed_atoms.extend(split_atoms) else: processed_atoms.append(atom) # 步骤3:按照max_chunk_size组装原子成块(计算所有内容的总长度) chunks = [] current_chunk_atoms = [] current_total_len = 0 # 计算所有内容的总长度 for atom in processed_atoms: atom_len = len(atom["content"]) # 所有类型都计算长度 # 检查是否需要开始新块 if current_total_len > 0 and current_total_len + atom_len > max_chunk_size: # 当前块已满,保存并开始新块 chunk_text = "".join(a["content"] for a in current_chunk_atoms) chunks.append(chunk_text) current_chunk_atoms = [] current_total_len = 0 # 添加原子到当前块 current_chunk_atoms.append(atom) current_total_len += atom_len # 添加最后一个块 if current_chunk_atoms: chunk_text = "".join(a["content"] for a in current_chunk_atoms) chunks.append(chunk_text) # 步骤4:对所有块添加重叠率处理(重叠时只计算纯文本长度) if len(chunks) <= 1: return chunks return self.overlap_chunks(chunks, overlap_ratio) def chunk_text_for_rag(self, text: str, max_chars: int = 5000) -> List[str]: """ 分割长文本(兼容旧接口,内部调用新方法) 参数: text: 输入的长文本字符串 max_chars: 每块的最大字符数,默认5000 返回: List[str]: 分割后的文本块列表 """ return self.smart_chunk_with_overlap(text, max_chunk_size=max_chars, overlap_ratio=0.1) def find_split_by_table(self, chunk: str): """ 在块中查找 标签作为分割点 优先在完整表格边界处分割,避免截断表格 返回: split_position: 分割位置(标签之后),如果没找到返回 None """ # 从后向前查找最后一个完整的 标签 close_tag = '' last_close_pos = chunk.rfind(close_tag) if last_close_pos == -1: return None # 分割点在 之后 split_pos = last_close_pos + len(close_tag) # 确保分割点不是在块的最开始或最末尾(避免空块) if split_pos < 100 or split_pos >= len(chunk) - 100: return None return split_pos def find_split_point(self, chunk: str, full_text: str, chunk_start: int): """ 在块中查找最近的x.x标签作为分割点 返回: (split_position, overlap_content): 分割位置和重叠内容 """ # 匹配x.x格式的标签(如1.1, 2.3, 10.5等) pattern = r'\d+\.\d+' # pattern = 
r'\b\d+\.\d+\b' matches = list(re.finditer(pattern, chunk)) if not matches: return None, None # 找最后一个匹配的标签 last_match = matches[-1] tag_start = last_match.start() # 提取该标签的完整内容(从标签到下一个标签或块尾) tag_content_start = chunk_start + tag_start # 在全文中查找下一个x.x标签 remaining_text = full_text[tag_content_start:] next_tag_match = re.search(pattern, remaining_text[len(last_match.group()):]) if next_tag_match: tag_content_end = tag_content_start + len(last_match.group()) + next_tag_match.start() else: tag_content_end = len(full_text) overlap_content = full_text[tag_content_start:tag_content_end] # 检查重叠内容长度 if len(overlap_content) > self.max_cut_len: # 重叠内容本身超过限制,返回None使用句号分割 return None, None return tag_start, overlap_content def find_split_by_period(self, chunk: str, full_text: str, chunk_start: int): """ 按句号"。"查找分割点 返回: (split_position, overlap_content): 分割位置和重叠内容 """ # 从后向前查找句号 last_period = chunk.rfind('。') if last_period == -1: return None, None # 分割点是句号之后 split_pos = last_period + 1 # 查找这个句子的开始位置作为重叠内容 # 向前查找上一个句号 prev_period = chunk.rfind('。', 0, last_period) if prev_period != -1: overlap_start = prev_period + 1 overlap_content = chunk[overlap_start:split_pos] else: # 如果前面没有句号,则从块开始到当前句号 overlap_content = chunk[:split_pos] # 检查重叠内容长度 if len(overlap_content) > self.max_cut_len: overlap_content = "" return split_pos, overlap_content def process_table_images(self, table_body, doc_id, pdf_file_name, image_num, flag_img_info): """ 处理表格中的标签,提取图片并替换为占位符 参数: table_body: 表格HTML内容 doc_id: 文档ID pdf_file_name: PDF文件名 image_num: 当前图片序号 flag_img_info: 图片信息字典 返回: tuple: (处理后的表格HTML, 更新后的图片序号) """ if not table_body: return table_body, image_num # 匹配表格中的标签,提取src属性 img_pattern = re.compile(r']*src=["\']([^"\']+)["\'][^>]*>', re.IGNORECASE) def replace_img(match): nonlocal image_num img_src = match.group(1) try: # 提取图片文件名 image_name = img_src.split("/")[-1] # 使用_get_image_base_path拼接完整路径 image_base_path = self._get_image_base_path(pdf_file_name) save_image_path = 
f"{image_base_path}/{image_name}" # 检查图片文件是否存在 if not os.path.exists(save_image_path): logger.warning(f"表格中的图片文件不存在: {save_image_path}") return match.group(0) # 保持原样 # 生成占位符 # replace_text = f"【示意图序号_{doc_id}_{image_num}】" minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{image_name}.jpg" full_img_url = f"{minio_url}/{minio_bucket}/{minio_file_path}" original_img_tag = match.group(0) replace_text = re.sub( r'src=["\']([^"\']+)["\']', f'src="{full_img_url}"', original_img_tag, flags=re.IGNORECASE ) # 上传到MinIO self.minio_client.upload_file(save_image_path, minio_file_path) # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" image_num += 1 return replace_text except Exception as e: logger.error(f"处理表格中的图片失败 [src={img_src}]: {e}") return match.group(0) # 保持原样 # 替换所有标签 processed_table = img_pattern.sub(replace_img, table_body) return processed_table, image_num def process_table_content(self, table_caption, table_body, max_table_chars=8000): """ 处理表格内容,防止HTML表格过大导致数据库字段溢出 参数: table_caption: 表格标题列表 table_body: 表格HTML内容 max_table_chars: 表格内容最大字符数限制 返回: str: 处理后的表格文本 """ if not table_body: return "" # 组合标题和内容 caption_text = " ".join(table_caption) if table_caption else "" full_table_text = f"{caption_text}\n{table_body}\n" if caption_text else f"{table_body}\n" # 如果表格内容不超过限制,直接返回 if len(full_table_text) <= max_table_chars: return full_table_text logger.warning(f"表格内容过长({len(full_table_text)}字符),进行截断处理") # 尝试智能截断:优先保留表格结构 if "" in table_body: # 查找完整的表格标签 table_start = table_body.find(" 100: # 确保有足够空间 truncated_body = table_body[:available_chars] # 尝试在最后一个完整的行结束处截断 last_tr_end = truncated_body.rfind("") if last_tr_end != -1 and last_tr_end > available_chars // 2: truncated_body = truncated_body[:last_tr_end + 5] # 包含 # 确保表格标签闭合 if "" not in truncated_body: truncated_body += "" result = f"{caption_text}\n{truncated_body}\n[表格内容已截断]\n" if caption_text else 
f"{truncated_body}\n[表格内容已截断]\n" return result # 如果无法智能截断,进行简单截断 max_content_chars = max_table_chars - len("[内容已截断]\n") if caption_text: max_content_chars -= len(caption_text) + 1 # 减去标题和换行符 truncated_content = table_body[:max_content_chars] if max_content_chars > 0 else "" return f"{caption_text}\n{truncated_content}[内容已截断]\n" else: truncated_content = table_body[:max_content_chars] if max_content_chars > 0 else "" return f"{truncated_content}[内容已截断]\n" # 0 def split_by_title(self, file_content_list, set_table, doc_id, pdf_file_name): # TODO 处理根据标题切分逻辑 图片替换标识符,表格按照set table 0图片,1html数据 stop_title_list = ['前言', '目录', '目次', 'context', 'CONTEXT', 'content', 'CONTENT', 'contents', 'CONTENTS'] text_lists = [] bbox_list = [] page_list = [] text = "" first_bbox = None first_page = None image_num = 1 flag_img_info = {} first_level_1 = True level_1_text = "" level_2_text = "" for i, content_dict in enumerate(file_content_list): # 记录当前块的首bbox和首page if first_bbox is None and content_dict.get("bbox"): first_bbox = content_dict.get("bbox") first_page = content_dict.get("page_idx", 0) + 1 text_type = content_dict.get("type") content_text = content_dict.get("text") if text_type == "text": text_level = content_dict.get("text_level", "") if text_level == 1: if not level_1_text and first_level_1: if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text): if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text).group(1).strip() not in stop_title_list: level_1_text = f"# {content_text}\n" # first_level_1 = True if first_level_1: text += f"# {content_text}\n" first_level_1 = False else: if len(text) > self.max_cut_len: logger.info(f"块长度过大:{len(text)}") text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) logger.info(f"切分后的块:{text_chunks}") text_lists.extend(text_chunks) bbox_list.extend([first_bbox] * len(text_chunks)) page_list.extend([first_page] * len(text_chunks)) else: text_lists.append(text) bbox_list.append(first_bbox) 
page_list.append(first_page) text = f"# {content_text}\n" # 屏蔽常见干扰标题 if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text): if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text).group(1).strip() not in stop_title_list: level_1_text = f"# {content_text}\n" else: level_1_text = "" level_2_text = "" first_bbox = content_dict.get("bbox") first_page = content_dict.get("page_idx", 0) + 1 elif text_level == 2: if not level_2_text: text += f"## {content_text}\n" level_2_text = f"## {content_text}\n" else: if len(text) > self.max_cut_len: logger.info(f"块长度过大:{len(text)}") text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) logger.info(f"切分后的块:{text_chunks}") text_lists.extend(text_chunks) bbox_list.extend([first_bbox] * len(text_chunks)) page_list.extend([first_page] * len(text_chunks)) else: text_lists.append(text) bbox_list.append(first_bbox) page_list.append(first_page) text = level_1_text + f"## {content_text}\n" first_bbox = content_dict.get("bbox") first_page = content_dict.get("page_idx", 0) + 1 else: if text_level: text += text_level*"#" + " " + content_text + "\n" else: text += content_text elif text_type == "table" and set_table == "1": table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text += table_text elif text_type in ("image", "table"): # image_path = content_dict.get("img_path") # if not image_path: # continue # image_name = image_path.split("/")[1] # # 根据解析器类型选择图片路径 # image_base_path = self._get_image_base_path(pdf_file_name) # save_image_path = f"{image_base_path}/{image_name}" # replace_text = f"【示意图序号_{doc_id}_{image_num}】" # minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" # self.minio_client.upload_file(save_image_path, minio_file_path) # minio_url = 
minio_config.get("minio_url") # minio_bucket = minio_config.get("minio_bucket") # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" # if text_type == "table": # text += " ".join(content_dict.get("table_caption")) + '\n' + content_dict.get("table_body", "") + '\n' # text += replace_text # image_num += 1 try: image_path = content_dict.get("img_path") replace_text = "" if not image_path and text_type == "image": continue elif image_path: image_name = image_path.split("/")[-1] image_base_path = self._get_image_base_path(pdf_file_name) save_image_path = f"{image_base_path}/{image_name}" replace_text = f"【示意图序号_{doc_id}_{image_num}】" minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" self.minio_client.upload_file(save_image_path, minio_file_path) minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" if text_type == "image": image_head = content_dict.get("image_caption", "") image_tail = content_dict.get("image_footnote", "") replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail) if text_type == "table": image_num += 1 table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) # table_text = self.process_table_content( # content_dict.get("table_caption", []), # content_dict.get("table_body", "") # ) text += table_text if replace_text: text += replace_text image_num += 1 except Exception as e: logger.error(f"处理图片或表格失败: {e}") if i+1 == len(file_content_list): if len(text) > self.max_cut_len: logger.info(f"块长度过大(最后一个):{len(text)}") text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) logger.info(f"切分后的块(最后一个):{text_chunks}") 
text_lists.extend(text_chunks) bbox_list.extend([first_bbox] * len(text_chunks)) page_list.extend([first_page] * len(text_chunks)) else: text_lists.append(text) bbox_list.append(first_bbox) page_list.append(first_page) elif text_type == "list": list_items = content_dict.get("list_items") if list_items: text += "\n".join(list_items) + "\n" return text_lists, flag_img_info, bbox_list, page_list # 1 def split_by_page(self, file_content_list, set_table, doc_id, pdf_file_name): # TODO 处理按照页面切分,图片处理成标识符,表格按照set table 0图片,1html数据 text_lists = [] bbox_list = [] page_list = [] current_page = "" text = "" first_bbox = None first_page = None image_num = 1 flag_img_info = {} for i,content_dict in enumerate(file_content_list): page_index = content_dict.get("page_idx") if i == 0: current_page = page_index first_bbox = content_dict.get("bbox") first_page = page_index + 1 if page_index is not None else None elif page_index != current_page: if len(text) > self.max_cut_len: text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) text_lists.extend(text_chunks) bbox_list.extend([first_bbox] * len(text_chunks)) page_list.extend([first_page] * len(text_chunks)) else: text_lists.append(text) bbox_list.append(first_bbox) page_list.append(first_page) text = "" current_page = page_index first_bbox = content_dict.get("bbox") first_page = page_index + 1 if page_index is not None else None text_type = content_dict.get("type") if text_type == "text": content_text = content_dict.get("text") text_level = content_dict.get("text_level") if text_level: text += "#" * text_level + " " + content_text else: text += content_text elif text_type == "table" and set_table == "1": table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text += 
table_text elif text_type in ("image", "table"): # image_path = content_dict.get("img_path") # if not image_path: # continue # image_name = image_path.split("/")[1] # # 根据解析器类型选择图片路径 # image_base_path = self._get_image_base_path(pdf_file_name) # save_image_path = f"{image_base_path}/{image_name}" # replace_text = f"【示意图序号_{doc_id}_{image_num}】" # minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" # self.minio_client.upload_file(save_image_path, minio_file_path) # minio_url = minio_config.get("minio_url") # minio_bucket = minio_config.get("minio_bucket") # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" # if text_type == "table": # text += " ".join(content_dict.get("table_caption", [])) + '\n' + content_dict.get("table_body", "") + '\n' # text += replace_text # image_num += 1 try: image_path = content_dict.get("img_path") replace_text = "" if not image_path and text_type == "image": continue elif image_path: image_name = image_path.split("/")[-1] image_base_path = self._get_image_base_path(pdf_file_name) save_image_path = f"{image_base_path}/{image_name}" replace_text = f"【示意图序号_{doc_id}_{image_num}】" minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" self.minio_client.upload_file(save_image_path, minio_file_path) minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" if text_type == "image": image_head = content_dict.get("image_caption", "") image_tail = content_dict.get("image_footnote", "") replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail) if text_type == "table": image_num += 1 table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text 
+= table_text if replace_text: text += replace_text image_num += 1 except Exception as e: logger.error(f"处理图片或表格失败: {e}") if i+1 == len(file_content_list): if len(text) > self.max_cut_len: text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) text_lists.extend(text_chunks) bbox_list.extend([first_bbox] * len(text_chunks)) page_list.extend([first_page] * len(text_chunks)) else: text_lists.append(text) bbox_list.append(first_bbox) page_list.append(first_page) return self.overlap_chunks(text_lists), flag_img_info, bbox_list, page_list # 其它 def split_by_self(self, file_content_list, set_table, slice_value, doc_id, pdf_file_name): # TODO 按照自定义的符号切分,图片处理成标识符,表格按照set table 0图片,1html数据,长度控制500以内,超过500切断 logger.info(f"自定义的分隔符:{slice_value}") text = "" first_bbox = None first_page = None image_num = 1 flag_img_info = {} for i, content_dict in enumerate(file_content_list): # 记录首个bbox和page if first_bbox is None and content_dict.get("bbox"): first_bbox = content_dict.get("bbox") first_page = content_dict.get("page_idx", 0) + 1 text_type = content_dict.get("type") if text_type == "text": content_text = content_dict.get("text") text_level = content_dict.get("text_level") if text_level: text += "#" * text_level + " " + content_text else: text += content_text elif text_type == "table" and set_table == "1": table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text += table_text elif text_type in ("image", "table"): # image_path = content_dict.get("img_path") # if not image_path: # continue # image_name = image_path.split("/")[1] # # 根据解析器类型选择图片路径 # image_base_path = self._get_image_base_path(pdf_file_name) # save_image_path = f"{image_base_path}/{image_name}" # replace_text = f"【示意图序号_{doc_id}_{image_num}】" # 
minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" # self.minio_client.upload_file(save_image_path, minio_file_path) # minio_url = minio_config.get("minio_url") # minio_bucket = minio_config.get("minio_bucket") # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" # if text_type == "table": # text += " ".join(content_dict.get("table_caption", [])) + '\n' + content_dict.get("table_body", "") + '\n' # text += replace_text # image_num += 1 try: image_path = content_dict.get("img_path") replace_text = "" if not image_path and text_type == "image": continue elif image_path: image_name = image_path.split("/")[-1] image_base_path = self._get_image_base_path(pdf_file_name) save_image_path = f"{image_base_path}/{image_name}" replace_text = f"【示意图序号_{doc_id}_{image_num}】" minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" self.minio_client.upload_file(save_image_path, minio_file_path) minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" if text_type == "image": image_head = content_dict.get("image_caption", "") image_tail = content_dict.get("image_footnote", "") replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail) if text_type == "table": image_num += 1 table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text += table_text if replace_text: text += replace_text image_num += 1 except Exception as e: logger.error(f"处理图片或表格失败: {e}") split_lists = text.split(slice_value) text_lists = [] for split_text in split_lists: # r = len(split_text)//500 # if r >= 1: # for i in range(r+1): # t = split_text[i*500:(i+1)*500] # if t: # text_lists.append(t) if not 
split_text: continue if len(split_text) > self.max_cut_len: text_chunks = self.smart_chunk_with_overlap(split_text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) text_lists.extend(text_chunks) else: text_lists.append(split_text) # 所有块使用首个bbox和page bbox_list = [first_bbox] * len(text_lists) page_list = [first_page] * len(text_lists) return text_lists, flag_img_info, bbox_list, page_list # 3 def split_by_min_paragraph(self, file_content_list, set_table, doc_id, pdf_file_name): """ 按最小段落切分:每个标题及其下面的非标题内容切为一个块 特殊处理:1级标题(text_level=1)合并到后面非1级标题的块中,连续1级标题也合并 构建 title_dict,每个块包含完整的父标题链内容 数字结构标题(如5.1.1)的父级链查找 """ text_lists = [] bbox_list = [] page_list = [] text = "" first_bbox = None first_page = None # pending_level1 = "" # 累积的1级标题文本 pending_bbox = None # 累积1级标题时的首个bbox pending_page = None # 累积1级标题时的首个page image_num = 1 flag_img_info = {} path_title_list = [] father_path_title_list = [] title_dict = {} # 存储 title_path -> 标题内容的映射 number_title_dict = {} # 存储 数字编号 -> title_path 的映射 coln_log = False coln_log_level = 0 for i, content_dict in enumerate(file_content_list): text_type = content_dict.get("type") content_text = content_dict.get("text", "") if text_type == "text": text_level = content_dict.get("text_level", 0) # if text_level == 1: # # 1级标题:记录到 title_dict 并累积到pending_level1 # title_path = content_dict.get("title_path", "") # if title_path: # title_content = f"# {content_text}\n" # title_dict[title_path] = title_content # # 提取数字编号并记录映射 # number_match = re.match(r'^(\d+(?:\.\d+)*)', content_text) # if number_match: # number_title_dict[number_match.group(1)] = title_path # pending_level1 += f"# {content_text}\n" # if pending_bbox is None and content_dict.get("bbox"): # pending_bbox = content_dict.get("bbox") # pending_page = content_dict.get("page_idx", 0) + 1 if text_level: # 记录标题到 title_dict title_path = content_dict.get("title_path", "") if title_path: title_content = "#" * text_level + f" {content_text}\n" title_dict[title_path] = title_content # 提取数字编号并记录映射 
number_match = re.match(r'^(\d+(?:\.\d+)*)', content_text) if number_match: # if number_match.group(1) not in number_title_dict: number_title_dict[number_match.group(1)] = title_content if coln_log: title_number = re.match(r'^(\d+(?:\.\d+)*)', content_text).group(1) if re.match(r'^(\d+(?:\.\d+)*)', content_text) else "" # logger.info(title_number) if text_level < coln_log_level or title_number not in ["1","2","3","4","5","6","7","8","9"]: coln_log = False coln_log_level = 0 else: text += "#" * text_level + f" {content_text}\n" continue # 保存当前块(如果有内容) if text: if len(text) > self.max_cut_len: text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) text_lists.extend(text_chunks) bbox_list.extend([first_bbox] * len(text_chunks)) page_list.extend([first_page] * len(text_chunks)) # 使用当前块的标题路径 last_path = path_title_list[-1] if path_title_list else "" last_father = father_path_title_list[-1] if father_path_title_list else "" path_title_list.extend([last_path] * len(text_chunks)) father_path_title_list.extend([last_father] * len(text_chunks)) else: text_lists.append(text) bbox_list.append(first_bbox) page_list.append(first_page) # 使用当前块的标题路径 path_title_list.append(path_title_list[-1] if path_title_list else "") father_path_title_list.append(father_path_title_list[-1] if father_path_title_list else "") # 新块:构建完整的父标题链 title_chain_text = self._build_title_chain(title_path, title_dict, number_title_dict) if not coln_log: if content_text[-1] in [":", ":"]: coln_log = True coln_log_level = text_level # 更新"当前块"的标题路径(为下一次保存块做准备) if path_title_list: # 如果列表不为空,更新最后一个元素 path_title_list[-1] = title_path parts = title_path.split("->") if title_path else [] father_path_title_list[-1] = "->".join(parts[:-1]) if len(parts) > 1 else "" else: # 第一次遇到标题,添加到列表 path_title_list.append(title_path) parts = title_path.split("->") if title_path else [] father_path_title_list.append("->".join(parts[:-1]) if len(parts) > 1 else "") text = 
title_chain_text # 新块的首bbox/page:优先用pending的,否则用当前的 first_bbox = pending_bbox if pending_bbox else content_dict.get("bbox") first_page = pending_page if pending_page else (content_dict.get("page_idx", 0) + 1 if content_dict.get("page_idx") is not None else None) pending_bbox = None pending_page = None # if not coln_log: # if content_text[-1] in [":", ":"]: # coln_log = True # coln_log_level = text_level # # text += "#" * text_level + f" {content_text}\n" # continue else: # 非标题内容:追加到当前块 if first_bbox is None and content_dict.get("bbox"): first_bbox = content_dict.get("bbox") first_page = content_dict.get("page_idx", 0) + 1 text += content_text elif text_type == "table" and set_table == "1": table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text += table_text elif text_type in ("image", "table"): try: image_path = content_dict.get("img_path") replace_text = "" if not image_path and text_type == "image": continue elif image_path: image_name = image_path.split("/")[-1] image_base_path = self._get_image_base_path(pdf_file_name) save_image_path = f"{image_base_path}/{image_name}" replace_text = f"【示意图序号_{doc_id}_{image_num}】" minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" self.minio_client.upload_file(save_image_path, minio_file_path) minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" if text_type == "image": image_head = content_dict.get("image_caption", "") image_tail = content_dict.get("image_footnote", "") replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail) if text_type == "table": image_num += 1 table_body = content_dict.get("table_body", "") # 先处理表格中的图片 table_body, image_num = 
self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info) # 再处理表格内容 table_text = self.process_table_content( content_dict.get("table_caption", []), table_body ) text += table_text if replace_text: text += replace_text image_num += 1 except Exception as e: logger.error(f"处理图片或表格失败: {e}") elif text_type == "list": list_items = content_dict.get("list_items") if list_items: text += "\n".join(list_items) + "\n" # # 处理最后一个块 # if pending_level1 or text: # final_text = pending_level1 + text if pending_level1 else text # if final_text: # if len(final_text) > 5000: # text_chunks = self.smart_chunk_with_overlap(final_text, max_chunk_size=5000, overlap_ratio=0.1) # text_lists.extend(text_chunks) # final_bbox = pending_bbox if pending_bbox else first_bbox # final_page = pending_page if pending_page else first_page # bbox_list.extend([final_bbox] * len(text_chunks)) # page_list.extend([final_page] * len(text_chunks)) # path_title_list.extend([""] * len(text_chunks)) # father_path_title_list.extend([""] * len(text_chunks)) # else: # text_lists.append(final_text) # bbox_list.append(pending_bbox if pending_bbox else first_bbox) # page_list.append(pending_page if pending_page else first_page) # path_title_list.append("") # father_path_title_list.append("") if text: final_text = text if final_text: if len(final_text) > self.max_cut_len: text_chunks = self.smart_chunk_with_overlap(final_text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1) text_lists.extend(text_chunks) final_bbox = pending_bbox if pending_bbox else first_bbox final_page = pending_page if pending_page else first_page bbox_list.extend([final_bbox] * len(text_chunks)) page_list.extend([final_page] * len(text_chunks)) path_title_list.extend([""] * len(text_chunks)) father_path_title_list.extend([""] * len(text_chunks)) else: text_lists.append(final_text) bbox_list.append(pending_bbox if pending_bbox else first_bbox) page_list.append(pending_page if pending_page else first_page) 
path_title_list.append("") father_path_title_list.append("") return text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list def _build_title_chain(self, title_path: str, title_dict: Dict[str, str], number_title_dict: Dict[str, str] = None) -> str: """ 根据 title_path 构建完整的父标题链内容 参数: title_path: 标题路径,格式如 "1->1.1->1.1.1" title_dict: 标题字典,格式如 {"1": "1、xx\n", "1->1.1": "1.1、xx\n", ...} number_title_dict: 数字标题映射字典,格式如 {"5.1.1": "path", "5.1": "path", "5": "path"} 返回: 完整的标题链文本,如 "1、xx\n1.1、xx\n1.1.1、xx\n" """ if not title_path: return "" # 分解 title_path,构建所有父路径 parts = title_path.split("->") title_chain = [] # 使用数字结构查找父级链 if number_title_dict: # 获取当前标题路径对应的标题内容 current_title = title_dict.get(title_path, "") # 使用正则提取数字编号 number_match = re.match(r'^(#+\s+)?(\d+(?:\.\d+)*)', current_title) if number_match: number = number_match.group(2) # 提取数字部分,如 "5.1.1" # 按"."切分,逐层查找父标题 number_parts = number.split(".") for i in range(len(number_parts)): parent_number = ".".join(number_parts[:i+1]) if parent_number in number_title_dict: parent_path = number_title_dict[parent_number] # if parent_path in title_dict: # current_len = len(re.match(r'^(#+)', parent_path).group(1) if re.match(r'^(#+)', parent_path) else '') # if title_chain: # last_len = len(re.match(r'^(#+)', current_title).group(1) if re.match(r'^(#+)', title_chain[-1]) else '') # else: # last_len = 100 # if current_len >= last_len: # continue title_chain.append(parent_path) return "".join(title_chain) # 如果不是数字结构或未找到,按标题路径 for i in range(len(parts)): # 构建从根到当前层级的路径 current_path = "->".join(parts[:i+1]) if current_path in title_dict: title_chain.append(title_dict[current_path]) return "".join(title_chain) def file_split(self, file_content_list, doc_id, pdf_file_name): # TODO 根据文本列表进行切分 返回切分列表和存储图片的链接 separator_num = self.file_json.get("customSeparator") set_table = self.file_json.get("set_table") # separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value] # 
logger.info(f"文本切分字符:{separator}") if isinstance(file_content_list, str): file_text = file_content_list text_lists = self.split_text(file_text) # 字符串切分无bbox/page信息 bbox_list = [None] * len(text_lists) page_list = [None] * len(text_lists) return text_lists, {}, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list # 0自定义,1按照页,2是最大,3是最小 elif separator_num == "2": # 使用标题段落切分,使用text_level=1,2 切分即一个# 还是两个# text_lists, flag_img_info, bbox_list, page_list = self.split_by_title(file_content_list, set_table, doc_id, pdf_file_name) return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list elif separator_num == "1": # 按照页面方式切分 text_lists, flag_img_info, bbox_list, page_list = self.split_by_page(file_content_list, set_table, doc_id, pdf_file_name) return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list elif separator_num == "-1": # 按照问答对切分 针对exce文档,暂不实现 return [], {}, [], [], [], [] elif separator_num == "3": # 按最小段落切分:每个标题+其下内容为一块,1级标题合并到后续非1级块 text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list = self.split_by_min_paragraph(file_content_list, set_table, doc_id, pdf_file_name) return text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list elif separator_num == "0": # 自定义切分的方式,按照自定义字符以及文本长度切分,超过500 slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n") text_lists, flag_img_info, bbox_list, page_list = self.split_by_self(file_content_list, set_table, slice_value, doc_id, pdf_file_name) return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list def parse_excel_to_text_lists(self, excel_file_path): """ 解析 Excel 文件,返回 Markdown 表格格式的文本列表 每个工作表(sheet)的第一行作为整个表的表头,后续所有块的数据行都使用该表头 """ try: from tabulate import tabulate except ImportError: logger.error("缺少 tabulate 库,请安装: pip install tabulate") logger.info("降级使用纯文本格式") # 降级为原始纯文本格式 result = 
parse_excel(excel_file_path) text_lists = [] for sheet_name, blocks in result.items(): for block in blocks: content = block.get("content", []) lines = [" ".join(str(cell) if cell is not None else "" for cell in row) for row in content] text = "\n".join(lines).strip() if text: text_lists.append(text) return text_lists result = parse_excel(excel_file_path) text_lists = [] for sheet_name, blocks in result.items(): if not blocks: continue # 获取该工作表的第一个块的第一行作为整个表的表头 sheet_headers = None first_block_content = blocks[0].get("content", []) if first_block_content: sheet_headers = [str(cell).strip() if cell is not None else "" for cell in first_block_content[0]] if not sheet_headers: logger.warning(f"工作表 {sheet_name} 没有有效的表头,跳过") # continue logger.info(f"工作表 {sheet_name} 的表头: {sheet_headers}") for block_idx, block in enumerate(blocks): content = block.get("content", []) if not content: continue try: # 确定数据行:第一个块跳过第一行(表头),后续块使用全部行 if block_idx == 0: # 第一个块:跳过第一行(表头),其余行作为数据 data_rows = content[1:] else: # 后续块:所有行都是数据行 data_rows = content # 格式化数据行 rows = [] for row in data_rows: formatted_row = [str(cell).strip() if cell is not None else "" for cell in row] rows.append(formatted_row) # 生成 Markdown 表格 if rows: # 有数据行 md_table = tabulate(rows, headers=sheet_headers, tablefmt="github") text_lists.append(md_table) logger.debug(f"成功生成 Markdown 表格 [sheet={sheet_name}, block={block_idx}],表头: {sheet_headers}, 数据行数: {len(rows)}") else: logger.debug(f"跳过空数据块 [sheet={sheet_name}, block={block_idx}]") except Exception as e: logger.error(f"生成 Markdown 表格失败 [sheet={sheet_name}, block={block_idx}]: {e}") # 降级为纯文本格式 if block_idx == 0: data_rows = content[1:] else: data_rows = content lines = [" ".join(str(cell) if cell is not None else "" for cell in row) for row in data_rows] text = "\n".join(lines).strip() if text: text_lists.append(text) logger.info(f"降级使用纯文本格式 [sheet={sheet_name}, block={block_idx}]") logger.info(f"Excel 解析完成,共生成 {len(text_lists)} 个文本块") return text_lists def 
process_data_to_milvus_schema(self, text_lists, doc_id, name, path_title_list, father_path_title_list=None, bbox_list=None, page_list=None): """组织数据格式: { "content": text, "doc_id": doc_id, "chunk_id": chunk_id, "metadata": {"source": file_name}, "path_title": path_title_list[i], "Chapter": path_title_list[i], "Father_Chapter": father_path_title_list[i], "bbox": bbox_list[i], "page": page_list[i] } """ if father_path_title_list is None: father_path_title_list = [""] * len(text_lists) if bbox_list is None: bbox_list = [None] * len(text_lists) if page_list is None: page_list = [None] * len(text_lists) docs = [] total_len = 0 for i, text in enumerate(text_lists): chunk_id = str(uuid1()) chunk_len = len(text) total_len += chunk_len # bbox转为字符串存储 bbox_val = bbox_list[i] if i < len(bbox_list) else None bbox_str = str(bbox_val) if bbox_val else None page_val = page_list[i] if i < len(page_list) else None d = { "content": text, "doc_id": doc_id, "chunk_id": chunk_id, "metadata": {"source": name, "chunk_index": i+1, "chunk_len": chunk_len, "knowledge_id": self.knowledge_id}, # "path_title": path_title_list[i], "Chapter": path_title_list[i], "Father_Chapter": father_path_title_list[i], "bbox": bbox_str, "page": page_val } docs.append(d) return docs, total_len async def process_documents(self, file_json, task_id=None): # 提取用户ID user_id = file_json.get("userId", "") embedding_id = file_json.get("embedding_id", "bge-m3") # ===== 注册任务到全局任务表 ===== task_ctx = None if task_id: task_ctx = task_registry.register(task_id, user_id, self.knowledge_id) # 启动进度(独立线程) reporter = None if task_id: from rag.progress_reporter import ProgressReporter # from config import progress_callback_config estimate_seconds = progress_callback_config.get("estimate_seconds", 120) callback_url = progress_callback_config.get("base_url") reporter = ProgressReporter(task_id, callback_url, estimate_seconds, user_id) reporter.start() # 关联 reporter 到任务上下文(用于取消时停止进度上报) if task_ctx: task_ctx.reporter = reporter # 
更新任务状态为开始(1) # self.mysql_client.update_task_status_start(task_id) # 初始化成功文档列表(在 try 外面定义,确保异常处理时可访问) success_doc = [] try: # 文档下载 separator_num = file_json.get("customSeparator", "") if separator_num == "-1": if reporter: reporter.complete(success=False) # if task_id: # self.mysql_client.update_task_status_error(task_id) return {"code": 500, "message": "暂不支持解析"} docs = file_json.get("docs") flag = file_json.get("flag") for doc in docs: # ===== 检查点1:每个文档处理前检查取消标志 ===== if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 在文档处理前被取消") raise TaskCancelledException("任务已被用户取消") url = doc.get("url") name = doc.get("name", "").replace(" ", "") doc_id = doc.get("document_id") # ===== 文档复制模式处理 ===== old_document_id = doc.get("oldDocumentId") old_knowledge_id = doc.get("oldKnowledgeId") tenant_id = file_json.get("tenantId", "000000") # 如果提供old 文档id 和 old 知识库id 则表示文档存在直接复制 if old_document_id and old_knowledge_id: logger.info(f"进入文档复制模式: oldDocumentId={old_document_id}, oldKnowledgeId={old_knowledge_id}, newDocId={doc_id}, newKnowledgeId={self.knowledge_id}") try: # 1. 
复制 Milvus 向量数据(使用前端传入的 doc_id) old_milvus_client = MilvusOperate(collection_name=old_knowledge_id, embedding_name=self.embedding_name) milvus_resp = old_milvus_client._copy_single_doc_to_collection( new_collection_name=self.knowledge_id, old_doc_id=old_document_id, new_doc_id=doc_id, embedding_name=self.embedding_name ) if milvus_resp.get("code") != 200: logger.error(f"[文档复制模式] Milvus数据复制失败: {milvus_resp.get('message')}") self.send_post_request_sync( f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", {"status": "2", "documentId": doc_id} ) if reporter: reporter.complete(success=False) return {"code": 500, "message": f"文档复制模式:Milvus复制失败 - {milvus_resp.get('message')}", "knowledge_id": self.knowledge_id, "doc_info": {}} # 获取 chunk_id 映射 chunk_id_mapping = milvus_resp.get("chunk_id_mapping", {}) logger.info(f"[文档复制模式] Milvus数据复制成功,共 {milvus_resp.get('data', {}).get('total_records', 0)} 条记录") # 2. 复制 MySQL 元数据(slice_text 和 old_slice_text 都用旧的 old_slice_text) mysql_success, mysql_result = self.mysql_client.copy_single_doc_metadata_for_document_copy( source_knowledge_id=old_knowledge_id, source_doc_id=old_document_id, new_knowledge_id=self.knowledge_id, new_doc_id=doc_id, chunk_id_mapping=chunk_id_mapping, tenant_id=tenant_id ) knowledge_id = file_json.get("knowledge_id", "") enabled_kn_rs = self.mysql_client.query_knowledge_by_ids([knowledge_id]) enabled_kn_gp_ids = set() for r in enabled_kn_rs: if r["knowledge_graph"]: enabled_kn_gp_ids.add(r["knowledge_id"]) if enabled_kn_gp_ids: # if file_json.get("lightrag"): result = mysql_result.get("result", {}) knowledge_id = result.get("new_knowledge_id", "") document_id = result.get("new_doc_id", "") datas = result.get("lightrag_data", []) # logger.info(f"11111111111111111111111{result}") rag_mgr = AsyncLightRAGManager() await rag_mgr.init_workspace(knowledge_id) await rag_mgr.insert(knowledge_id, {"content": datas, "ids": [document_id]}) await rag_mgr.close() if not mysql_success: 
logger.error(f"[文档复制模式] MySQL元数据复制失败: {mysql_result.get('error')}") # 回滚 Milvus 数据 self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.send_post_request_sync( f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", {"status": "2", "documentId": doc_id} ) if reporter: reporter.complete(success=False) return {"code": 500, "message": f"文档复制模式:MySQL复制失败 - {mysql_result.get('error')}", "knowledge_id": self.knowledge_id, "doc_info": {}} logger.info(f"[文档复制模式] MySQL元数据复制成功: {mysql_result}") # 3. 发送成功回调 # slice_count = mysql_result.get("slice_count", 0) self.send_post_request_sync( f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", { "knowledgeId": self.knowledge_id, "documentId": doc_id, # "sliceTotal": slice_count, "status": "1" } ) if reporter: reporter.complete(success=True) logger.info(f"[文档复制模式] 文档复制完成: oldDocId={old_document_id} -> newDocId={doc_id}") return { "code": 200, "message": "文档复制模式:复制成功", "knowledge_id": self.knowledge_id, "doc_info": { # "slice_num": slice_count, "old_document_id": old_document_id, "new_document_id": doc_id } } except Exception as e: logger.error(f"[文档复制模式] 处理异常: {e}", exc_info=True) self.send_post_request_sync( f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", {"status": "2", "documentId": doc_id} ) if reporter: reporter.complete(success=False) return {"code": 500, "message": f"文档复制模式异常: {str(e)}", "knowledge_id": self.knowledge_id, "doc_info": {}} # ===== 文档复制模式处理结束 ===== # 文件单独处理(standardClassification=3)(临时) remark = doc.get("remark", "") standard_classification = doc.get("standardClassification", "") # standardClassification 为 '3' 时,不解析不分块,直接拼接数据入库 if standard_classification == "3": logger.info(f"standardClassification=3,跳过解析,直接使用文件名+URL+remark入库: {name}") # 拼接文本:文件名 + URL + remark combined_text = f"{name}\n {url}\n {remark}" text_lists = [combined_text] # 空变量赋值 flag_img_info = {} full_md_url = "" full_pdf_url = "" oss_id = "" file_size = 
None path_title_list = [""] father_path_title_list = [""] bbox_list = [None] page_list = [None] milvus_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name, path_title_list, father_path_title_list, bbox_list, page_list) logger.info(f"standardClassification=3,存储到milvus的文本数据:{milvus_docs}") # ===== 检查点2:入库前检查取消标志 ===== if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库") raise TaskCancelledException("任务已被用户取消") if flag == "upload": insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id, tenant_id, user_id) if insert_slice_flag: insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id) else: insert_img_flag = False parse_file_status = False if insert_img_flag: insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs) else: insert_milvus_flag = False parse_file_status = False if insert_milvus_flag: parse_file_status = True if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}") self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_to_slice(doc_id=doc_id) self.mysql_client.delete_image_url(doc_id=doc_id) raise TaskCancelledException("任务已被用户取消") else: self.mysql_client.delete_to_slice(doc_id=doc_id) self.mysql_client.delete_image_url(doc_id=doc_id) parse_file_status = False elif flag == "update": self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_to_slice(doc_id=doc_id) insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id, tenant_id, user_id) if insert_slice_flag: insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs) else: insert_milvus_flag = False parse_file_status = False if insert_milvus_flag: parse_file_status = True if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 
在入库后被取消,清理已插入数据: doc_id={doc_id}") self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_to_slice(doc_id=doc_id) raise TaskCancelledException("任务已被用户取消") else: self.mysql_client.delete_to_slice(doc_id=doc_id) parse_file_status = False if parse_file_status: success_doc.append(doc_id) else: if flag == "upload": for del_id in success_doc: self.milvus_client._delete_by_doc_id(doc_id=del_id) self.mysql_client.delete_image_url(doc_id=del_id) self.mysql_client.delete_to_slice(doc_id=del_id) if reporter: reporter.complete(success=False) # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")}) self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"), "tenantId": tenant_id}) return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}} # 继续下一个文档 continue ## 文件模式(standardClassification=3) ↑ async with aiohttp.ClientSession() as session: # 下载并保存文件 down_file_name, file_size = await self.save_file_temp(session, url, name) excel_file_path = down_file_name # 保留原始 Excel 文件路径 # 只接收txt和pdf格式,其它格式统一转换成pdf,不支持格式在convert_to_pdf中判断 if not name.endswith(".txt") and not name.endswith('.pdf') and not file_json.get("markDownFlg"): # 转化为pdf文件 down_file_name = convert_to_pdf(down_file_name) name = os.path.basename(down_file_name) if down_file_name: # 上传PDF文件到MinIO pdf_file_name = os.path.basename(down_file_name) minio_pdf_path = f"/pdf/{self.knowledge_id}/{doc_id}/{pdf_file_name}" upload_success = self.minio_client.upload_file(down_file_name, minio_pdf_path) pdf_file_size = os.path.getsize(down_file_name) # 记录OSS信息到数据库 if upload_success: minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") full_pdf_url = f"{minio_url}/{minio_bucket}{minio_pdf_path}" _, oss_id = 
self.mysql_client.insert_oss_record(pdf_file_name, full_pdf_url, user_id, ".pdf", pdf_file_size) logger.info(f"PDF文件已上传至MinIO并记录OSS: {full_pdf_url}") # Excel 特殊解析模式 excel_except = file_json.get("excel_parsing", False) if excel_except and excel_file_path.endswith((".xls", ".xlsx")): logger.info(f"使用 Excel 特殊解析模式: {excel_file_path}") text_lists = self.parse_excel_to_text_lists(excel_file_path) flag_img_info = {} path_title_list = [""] * len(text_lists) father_path_title_list = [""] * len(text_lists) bbox_list = [None] * len(text_lists) page_list = [None] * len(text_lists) # 生成并上传 MD 文件 full_md_url = "" try: # 将所有表格合并为一个 MD 文件 md_content = "\n\n".join(text_lists) # 保存 MD 文件到临时目录 excel_base_name = os.path.splitext(os.path.basename(excel_file_path))[0] md_file_name = f"{excel_base_name}_tables.md" temp_md_dir = f"./tmp_file/{self.knowledge_id}" os.makedirs(temp_md_dir, exist_ok=True) temp_md_path = os.path.join(temp_md_dir, md_file_name) # 写入 MD 文件 with open(temp_md_path, 'w', encoding='utf-8') as f: f.write(md_content) logger.info(f"Excel MD 文件已保存: {temp_md_path}") # 上传到 MinIO minio_md_path = f"/md/{self.knowledge_id}/{doc_id}/{md_file_name}" upload_success = self.minio_client.upload_file(temp_md_path, minio_md_path) if upload_success: md_file_size = os.path.getsize(temp_md_path) minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") full_md_url = f"{minio_url}/{minio_bucket}{minio_md_path}" # 记录 OSS 信息到数据库 _, oss_id = self.mysql_client.insert_oss_record( md_file_name, full_md_url, user_id, ".md", md_file_size ) logger.info(f"Excel MD 文件已上传至 MinIO 并记录 OSS: {full_md_url}") else: logger.warning(f"Excel MD 文件上传失败: {temp_md_path}") except Exception as e: logger.error(f"Excel MD 文件生成或上传失败: {e}", exc_info=True) full_md_url = "" # MD 解析模式 elif file_json.get("markDownFlg"): split_mode = file_json.get("customSeparator", "2") if split_mode == "3": split_mode = "min" elif split_mode == "2": split_mode = "max" 
logger.info(f"使用MD解析规则,切分方式:{split_mode}") text_lists = await MarkdownSplitter(max_chunk_size=self.max_cut_len).split_markdown(down_file_name, split_mode) logger.info(f"MD解析,存入mlivus的数据{text_lists}") flag_img_info = {} path_title_list = [""] * len(text_lists) father_path_title_list = [""] * len(text_lists) bbox_list = [None] * len(text_lists) page_list = [None] * len(text_lists) full_md_url = url full_pdf_url = "" oss_id = "" # from rag.document_load.md_to_html_to_pdf import AsyncMdToPdf # md_to_pdf_file_name = await AsyncMdToPdf().convert_md_to_pdf(down_file_name) # if md_to_pdf_file_name: # down_file_name = md_to_pdf_file_name # name = os.path.basename(down_file_name) else: # 获取解析器类型(默认 mineru) parser_type = file_json.get("parsingType", "0") self.parser_type = parser_type # 保存解析器类型供后续使用 logger.info(f"使用解析器: {parser_type}") file_parse = self._get_file_type(name, parser_type) max_retries = 3 retry_delay = 5 last_error = None for attempt in range(max_retries): try: if parser_type == "dots": file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name, doc_id) elif parser_type == "2": file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name, doc_id) else: # MinerU 解析 file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name) break # 成功则跳出重试循环 except Exception as e: last_error = e logger.error(f"解析失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}") if attempt < max_retries - 1: logger.info(f"等待 {retry_delay} 秒后重试...") await asyncio.sleep(retry_delay) else: logger.error(f"解析失败,已重试 {max_retries} 次") raise RuntimeError(f"文档解析失败: {str(last_error)}") # 上传 MD 文件到 minio full_md_url = "" if path_md: md_file_name = os.path.basename(path_md) file_extension = os.path.splitext(md_file_name)[1] minio_md_path = f"/md/{self.knowledge_id}/{doc_id}/{md_file_name}" upload_success = self.minio_client.upload_file(path_md, minio_md_path) md_file_size = os.path.getsize(path_md) # 记录 OSS 信息到数据库 if 
upload_success: minio_url = minio_config.get("minio_url") minio_bucket = minio_config.get("minio_bucket") full_md_url = f"{minio_url}/{minio_bucket}{minio_md_path}" _, oss_id = self.mysql_client.insert_oss_record(md_file_name, full_md_url, user_id, file_extension, md_file_size) logger.info(f"MD 文件已上传至 minio 并记录 OSS: {full_md_url}") logger.info(f"mineru解析的pdf数据:{file_content_list}") split_result = self.file_split(file_content_list, doc_id, pdf_file_name) # 返回值:text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list = split_result docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name, path_title_list, father_path_title_list, bbox_list, page_list) logger.info(f"存储到milvus的文本数据:{docs}") # lightrag_sign = file_json.get("lightrag", False) knowledge_id = file_json.get("knowledge_id", "") enabled_kn_rs = self.mysql_client.query_knowledge_by_ids([knowledge_id]) enabled_kn_gp_ids = set() for r in enabled_kn_rs: if r["knowledge_graph"]: enabled_kn_gp_ids.add(r["knowledge_id"]) if enabled_kn_gp_ids: # if lightrag_sign: lightrag_data = {} contents = [] for doc in docs: contents.append(doc.get("content", "")) # lightrag_data[knowledge_id] = {"content": contents, "ids": [docs[0].get("doc_id", "")]} # logger.info(f"contents:{lightrag_data}") logger.info(f"LightRAG_Ids:{enabled_kn_gp_ids}") rag_mgr = AsyncLightRAGManager() await rag_mgr.init_workspace(knowledge_id) await rag_mgr.insert(knowledge_id, {"content": contents, "ids": [docs[0].get("doc_id", "")]}) await rag_mgr.close() # ===== 检查点2:解析完成、入库前检查取消标志 ===== if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库") raise TaskCancelledException("任务已被用户取消") if flag == "upload": # 插入到milvus库中 insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id, tenant_id, user_id) if insert_slice_flag: # 插入到mysql的slice 
info数据库中 insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id) else: insert_img_flag = False parse_file_status = False if insert_img_flag: # insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs) # 批量插入 text_lists = [doc.get("content") for doc in docs] insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs, text_lists) # 插入mysql中的bm_media_replacement表中 else: # self.milvus_client._delete_by_doc_id(doc_id=doc_id) insert_milvus_flag = False # return resp parse_file_status = False if insert_milvus_flag: parse_file_status = True # ===== 检查点3:入库后检查取消标志,若取消则清理已插入数据 ===== if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}") self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_to_slice(doc_id=doc_id) self.mysql_client.delete_image_url(doc_id=doc_id) raise TaskCancelledException("任务已被用户取消") else: self.mysql_client.delete_to_slice(doc_id=doc_id) # self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_image_url(doc_id=doc_id) # resp = {"code": 500, "message": insert_mysql_info} parse_file_status = False # return resp elif flag == "update": # 更新切片方式 # 先把库中的数据删除 self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_to_slice(doc_id=doc_id) insert_milvus_start_time = time.time() insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id, tenant_id, user_id) # insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs,text_lists) insert_milvus_end_time = time.time() logger.info(f"插入milvus数据库耗时:{insert_milvus_end_time - insert_milvus_start_time}") if insert_slice_flag: # 插入到mysql的slice info数据库中 insert_mysql_start_time = time.time() insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs) insert_mysql_end_time = time.time() 
logger.info(f"插入mysql数据库耗时:{insert_mysql_end_time - insert_mysql_start_time}") else: # resp = {"code": 500, "message": insert_milvus_str} # return resp insert_milvus_flag = False parse_file_status = False if insert_milvus_flag: # resp = {"code": 200, "message": "切片修改成功"} parse_file_status = True # ===== 检查点3:入库后检查取消标志,若取消则清理已插入数据 ===== if task_ctx and task_ctx.is_cancelled: logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}") self.milvus_client._delete_by_doc_id(doc_id=doc_id) self.mysql_client.delete_to_slice(doc_id=doc_id) raise TaskCancelledException("任务已被用户取消") else: self.mysql_client.delete_to_slice(doc_id=doc_id) # self.milvus_client._delete_by_doc_id(doc_id=doc_id) # resp = {"code":500, "message": insert_mysql_info} parse_file_status = False # return resp if parse_file_status: success_doc.append(doc_id) else: if flag == "upload": for del_id in success_doc: self.milvus_client._delete_by_doc_id(doc_id=del_id) self.mysql_client.delete_image_url(doc_id=del_id) self.mysql_client.delete_to_slice(doc_id=del_id) if reporter: reporter.complete(success=False) # if task_id: # self.mysql_client.update_task_status_error(task_id) self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"), "tenantId": tenant_id}) # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")}) return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}} # 任务完成:先更新数据库状态为完成(2),再发送100%进度 # if task_id and user_id: # self.mysql_client.update_task_status_complete(task_id, user_id) self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, 
"ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1", "tenantId": tenant_id}) # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"}) if reporter: reporter.complete(success=True) # await self.send_post_request("http://10.10.10.3:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists)}) # self.send_post_request_sync("http://10.10.10.2:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"}) return {"code": 200, "message": "解析成功", "knowledge_id" : self.knowledge_id, "doc_info": {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}} except TaskCancelledException as e: # ===== 处理任务取消异常 ===== logger.info(f"任务被取消 [task_id={task_id}]: {e}") # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")}) self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id}) # 1. 
首先删除 bm_document 表中的记录 if task_id: del_success, del_info = self.mysql_client.delete_document(task_id) if del_success: logger.info(f"已删除 bm_document 记录: task_id={task_id}") else: logger.warning(f"删除 bm_document 记录失败: {del_info}") # 2. 清理之前已成功处理的文档数据(slice、image、milvus) for del_id in success_doc: logger.info(f"清理已入库的文档数据: doc_id={del_id}") self.milvus_client._delete_by_doc_id(doc_id=del_id) self.mysql_client.delete_image_url(doc_id=del_id) self.mysql_client.delete_to_slice(doc_id=del_id) if reporter: reporter.complete(success=False) return {"code": 499, "message": "任务已取消", "knowledge_id": self.knowledge_id, "doc_info": {}} except VLMRetryExhaustedError as e: self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id}) if reporter: reporter.complete(success=False) # if task_id: # self.mysql_client.update_task_status_error(task_id) return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}} except Exception as e: # 捕获所有异常,统一处理 logger.error(f"文件处理异常 [task_id={task_id}]: {e}", exc_info=True) # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")}) self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id}) if reporter: reporter.complete(success=False) # if task_id: # self.mysql_client.update_task_status_error(task_id) return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}} finally: # ===== 注销任务 ===== if task_id: task_registry.unregister(task_id)