#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Dots OCR PDF parser.

Parses PDF documents with Dots OCR and converts the page layout results
into a MinerU-compatible content-list format.
"""

import os
import time
import json
import re
import sys
import io

import aiohttp
# import fitz  # PyMuPDF — imported lazily inside the methods that need it
from PIL import Image

from utils.upload_file_to_oss import UploadMinio
from config import minio_config, dots_ocr_config, model_name_vllm_url_dict
from utils.get_logger import setup_logger

# Make the vendored dots.ocr package importable.
dots_ocr_path = os.path.join(os.path.dirname(__file__), '../../dots.ocr-master')
if os.path.exists(dots_ocr_path):
    sys.path.insert(0, dots_ocr_path)

try:
    from dots_ocr.parser import DotsOCRParser
except ImportError as e:
    raise ImportError(f"无法导入 DotsOCRParser: {e}")

logger = setup_logger(__name__)


class DotsPDFLoader:
    # NOTE(review): empty placeholder class — presumably kept for backward
    # compatibility with older imports; confirm before removing.
    pass


class DotsPDFLoaders:
    """Dots OCR PDF parser producing MinerU-compatible content lists."""

    def __init__(self, file_json):
        """Initialize the loader from a job description dict.

        Args:
            file_json: job dict; reads ``knowledge_id`` and ``set_table``
                ("1" = keep tables as HTML text, "0" = crop tables as images).
        """
        self.file_json = file_json
        self.knowledge_id = file_json.get("knowledge_id")
        # Set later in extract_text(); uploaded artifacts are named after it.
        self.document_id = None
        self.set_table = file_json.get("set_table", "1")
        self.upload_minio = UploadMinio()
        # Build the Dots OCR parser from config with safe fallbacks.
        self.parser = DotsOCRParser(
            protocol=dots_ocr_config.get("protocol", "http"),
            ip=dots_ocr_config.get("ip", "localhost"),
            port=dots_ocr_config.get("port", 8000),
            model_name=dots_ocr_config.get("model_name", "model"),
            temperature=dots_ocr_config.get("temperature", 0.1),
            top_p=dots_ocr_config.get("top_p", 1.0),
            num_thread=dots_ocr_config.get("num_thread", 64),
            dpi=dots_ocr_config.get("dpi", 200),
            output_dir=dots_ocr_config.get("output_dir", "./tmp_file/dots_parsed")
        )
        # Fix: plain string instead of an f-string with no placeholders.
        logger.info("Dots OCR Parser 初始化成功")

    async def extract_text(self, pdf_path, doc_id=None):
        """Parse a PDF file with Dots OCR.

        Args:
            pdf_path: path to the PDF to parse.
            doc_id: document identifier used to name uploaded artifacts.

        Returns:
            Tuple ``(content_list, merged_md_path, pdf_file_name)`` where
            ``content_list`` is the MinerU-style element list.

        Raises:
            ValueError: if Dots OCR returns no results.
            Exception: any parser/IO failure is logged and re-raised.
        """
        self.document_id = doc_id
        logger.info(f"开始使用 Dots OCR 解析 PDF: {pdf_path}, doc_id: {doc_id}")
        try:
            # Timestamp suffix keeps output dirs unique per parse run.
            pdf_file_name = os.path.splitext(os.path.basename(pdf_path))[0] + time.strftime("_%Y%m%d%H%M%S")
            results = self.parser.parse_file(pdf_path, prompt_mode="prompt_layout_all_en", save_dirname=pdf_file_name)
            if not results:
                raise ValueError("Dots OCR 解析返回空结果")
            logger.info(f"Dots OCR 解析完成,共 {len(results)} 页")
            content_list = []
            all_md_contents = []
            image_num = 1
            for page_idx, result in enumerate(results):
                layout_info_path = result.get('layout_info_path')
                if not layout_info_path or not os.path.exists(layout_info_path):
                    continue
                # Read the per-page layout JSON and validate/repair it.
                with open(layout_info_path, 'r', encoding='utf-8') as f:
                    raw_content = f.read()
                layout_data = await self._validate_and_fix_json(raw_content, page_idx)
                if not layout_data:
                    logger.warning(f"第 {page_idx + 1} 页 JSON 验证失败,跳过")
                    continue
                # Collect the page's markdown (if present) for later merging.
                md_content_path = result.get('md_content_path')
                if md_content_path and os.path.exists(md_content_path):
                    with open(md_content_path, 'r', encoding='utf-8') as f:
                        all_md_contents.append(f.read())
                page_content, image_num = self._process_layout_data(
                    layout_data, pdf_path, page_idx, image_num, pdf_file_name
                )
                content_list.extend(page_content)
            merged_md_path = self._merge_md_files(all_md_contents)
            logger.info(f"Dots OCR 解析完成,共 {len(content_list)} 个元素")
            return content_list, merged_md_path, pdf_file_name
        except Exception as e:
            logger.error(f"Dots OCR 解析失败: {e}", exc_info=True)
            raise

    async def _validate_and_fix_json(self, raw_content, page_idx):
        """Validate page-layout JSON, repairing it via LLM when needed.

        Returns a list of dict cells, or ``None`` if the page is unusable.
        """
        # First attempt: parse as-is.
        try:
            data = json.loads(raw_content)
            # Must be a list of dicts; silently drop non-dict elements.
            if isinstance(data, list):
                valid_data = [item for item in data if isinstance(item, dict)]
                if len(valid_data) < len(data):
                    logger.warning(f"第 {page_idx + 1} 页 JSON 包含 {len(data) - len(valid_data)} 个非字典元素,已过滤")
                return valid_data
            else:
                logger.warning(f"第 {page_idx + 1} 页 JSON 不是列表格式")
                return None
        except json.JSONDecodeError as e:
            logger.warning(f"第 {page_idx + 1} 页 JSON 解析失败: {e},尝试使用 LLM 修复")
        # Second attempt: ask an LLM to repair the malformed JSON.
        try:
            fixed_json = await self._fix_json_with_llm(raw_content)
            if fixed_json:
                data = json.loads(fixed_json)
                if isinstance(data, list):
                    valid_data = [item for item in data if isinstance(item, dict)]
                    logger.info(f"第 {page_idx + 1} 页 JSON 经 LLM 修复成功")
                    return valid_data
        except Exception as e:
            logger.error(f"第 {page_idx + 1} 页 LLM 修复失败: {e}")
        # Unrecoverable: caller skips this page.
        logger.error(f"第 {page_idx + 1} 页 JSON 无法修复,跳过")
        return None

    async def _fix_json_with_llm(self, raw_content):
        """Ask an LLM to repair malformed JSON; returns the fixed text or None."""
        try:
            # Resolve the LLM endpoint from config.
            model_name = "Qwen3-Coder-30B-loft"
            base_url = model_name_vllm_url_dict.get(model_name)
            if not base_url:
                logger.error("未找到 LLM 配置")
                return None
            # Truncate the payload to keep the prompt within context limits.
            prompt = f"""你是一个JSON修复专家。下面的JSON数据可能格式错误,请修复它并返回合法的JSON数组。

要求:
1. 返回的必须是一个JSON数组(列表)
2. 数组中的每个元素必须是一个字典对象
3. 如果某个元素不是字典,请移除它
4. 只返回修复后的JSON,不要有任何其他说明文字

原始数据:
{raw_content[:2000]}

请直接返回修复后的合法JSON:"""
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY', 'sk-dummy')}"
            }
            payload = {
                "model": model_name,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "max_tokens": 4096
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{base_url}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        fixed_json = result.get("choices", [{}])[0].get("message", {}).get("content", "")
                        # Strip markdown code fences around the JSON, if any.
                        fixed_json = re.sub(r'^```json\s*', '', fixed_json, flags=re.MULTILINE)
                        fixed_json = re.sub(r'\s*```$', '', fixed_json, flags=re.MULTILINE)
                        return fixed_json.strip()
                    else:
                        logger.error(f"LLM API 调用失败: {resp.status}")
                        return None
        except Exception as e:
            logger.error(f"LLM 修复 JSON 异常: {e}")
            return None

    def _extract_text_level(self, text):
        """Extract a heading level from leading ``#`` markers.

        Returns ``(level, clean_text)``; level is ``None`` when *text* does
        not start with 1-6 ``#`` characters followed by whitespace.
        """
        if not text:
            return None, text
        match = re.match(r'^(#{1,6})\s+(.+)', text)
        if match:
            level = len(match.group(1))
            clean_text = match.group(2)
            return level, clean_text
        return None, text

    def _crop_and_upload_image(self, page, bbox, image_num, category="image", pdf_file_name=""):
        """Crop a bbox region from a PDF page and upload it to MinIO.

        Args:
            page: an open PyMuPDF page.
            bbox: ``[x1, y1, x2, y2]`` in page coordinates.
            image_num: running image counter used in filenames.
            category: semantic kind of the crop ("image" or "table").
            pdf_file_name: per-run output directory name.

        Returns:
            ``(replace_text, full_url, temp_img_path)`` or ``(None, None, None)``
            on any failure.
        """
        try:
            # Reject malformed bboxes outright.
            if not bbox or len(bbox) != 4:
                logger.warning(f"无效的 bbox: {bbox}")
                return None, None, None
            x1, y1, x2, y2 = bbox
            # Clamp out-of-range coordinates to the page rectangle.
            page_rect = page.rect
            if x1 < 0 or y1 < 0 or x2 > page_rect.width or y2 > page_rect.height:
                logger.warning(f"bbox 超出页面范围: {bbox}, 页面大小: {page_rect.width}x{page_rect.height}")
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(page_rect.width, x2)
                y2 = min(page_rect.height, y2)
            # A degenerate (zero/negative area) crop is unusable.
            if x2 <= x1 or y2 <= y1:
                logger.warning(f"无效的 bbox 尺寸: {bbox}")
                return None, None, None
            # Render the clipped region with PyMuPDF (moderate dpi to limit size).
            import fitz
            rect = fitz.Rect(x1, y1, x2, y2)
            pix = page.get_pixmap(clip=rect, dpi=150)
            # Convert the pixmap to a PIL image via PNG bytes.
            img_data = pix.tobytes("png")
            image = Image.open(io.BytesIO(img_data))
            # Save a temporary JPEG for upload.
            temp_dir = f"./tmp_file/dots_parsed/{pdf_file_name}/images"
            os.makedirs(temp_dir, exist_ok=True)
            temp_img_path = f"{temp_dir}/{self.document_id}_{image_num}.jpg"
            image.save(temp_img_path, "JPEG")
            # Placeholder token (includes doc_id) that stands in for the image
            # inside the extracted text.
            replace_text = f"【示意图序号_{self.document_id}_{image_num}】"
            # Upload to MinIO under the knowledge/document hierarchy.
            minio_file_path = f"/pdf/{self.knowledge_id}/{self.document_id}/{replace_text}.jpg"
            self.upload_minio.upload_file(temp_img_path, minio_file_path)
            # Build the public URL.
            minio_url = minio_config.get("minio_url")
            minio_bucket = minio_config.get("minio_bucket")
            full_url = f"{minio_url}/{minio_bucket}{minio_file_path}"
            logger.info(f"图片上传成功: {replace_text}")
            return replace_text, full_url, temp_img_path
        except Exception as e:
            logger.error(f"裁剪或上传图片失败: {e}")
            return None, None, None

    def _process_layout_data(self, layout_data, pdf_path, page_idx, image_num, pdf_file_name):
        """Convert one page's layout cells into MinerU-compatible elements.

        Returns ``(content, image_num)`` — the element list for the page and
        the updated image counter.
        """
        content = []
        import fitz
        # Fix: close the document even if processing a cell raises.
        pdf_doc = fitz.open(pdf_path)
        try:
            page = pdf_doc[page_idx]
            for cell in layout_data:
                category = cell.get('category')
                text = cell.get('text', '')
                bbox = cell.get('bbox', [])
                if category == 'Title':
                    # Titles are always level 1; strip any leading '#' markers.
                    _, clean_text = self._extract_text_level(text)
                    content.append({
                        "type": "text",
                        "text": clean_text or text,
                        "text_level": 1,
                        "page_idx": page_idx
                    })
                elif category == 'Section-header':
                    # Heading level derived from the number of leading '#'.
                    text_level, clean_text = self._extract_text_level(text)
                    content.append({
                        "type": "text",
                        "text": clean_text,
                        "text_level": text_level,
                        "page_idx": page_idx
                    })
                elif category == 'Picture':
                    # Crop the picture region and upload it.
                    replace_text, full_url, img_path = self._crop_and_upload_image(
                        page, bbox, image_num, "image", pdf_file_name
                    )
                    if replace_text:
                        content.append({
                            "type": "image",
                            "img_path": f"images/{self.document_id}_{image_num}.jpg",
                            "page_idx": page_idx,
                            "replace_text": replace_text
                        })
                        image_num += 1
                elif category == 'Table':
                    # Table handling depends on the configured set_table mode.
                    if self.set_table == "0":
                        # Image mode: crop the table as a picture.
                        replace_text, full_url, img_path = self._crop_and_upload_image(
                            page, bbox, image_num, "table", pdf_file_name
                        )
                        if replace_text:
                            content.append({
                                "type": "table",
                                "img_path": f"images/{self.document_id}_{image_num}.jpg",
                                "table_body": replace_text,
                                "table_caption": [],
                                "page_idx": page_idx
                            })
                            image_num += 1
                    else:
                        # HTML mode: keep the OCR'd table markup as-is.
                        content.append({
                            "type": "table",
                            "table_body": text,
                            "table_caption": [],
                            "page_idx": page_idx
                        })
                elif category == 'List-item':
                    # One list element per line of text.
                    list_items = text.split('\n') if text else []
                    content.append({
                        "type": "list",
                        "list_items": list_items,
                        "page_idx": page_idx
                    })
                elif category in ['Text', 'Caption', 'Footnote', 'Formula', 'Page-header', 'Page-footer']:
                    # Plain text categories share one representation.
                    content.append({
                        "type": "text",
                        "text": text,
                        "text_level": None,
                        "page_idx": page_idx
                    })
        finally:
            pdf_doc.close()
        return content, image_num

    def _merge_md_files(self, md_contents):
        """Merge per-page markdown strings into a single file.

        Returns the merged file path, or ``""`` when there is nothing to merge.
        """
        if not md_contents:
            logger.warning("没有 MD 文件可合并")
            return ""
        # Blank-line run acts as the page separator.
        merged_content = "\n\n\n\n".join(md_contents)
        # Persist the merged markdown next to the parser output.
        output_dir = dots_ocr_config.get("output_dir", "./tmp_file/dots_parsed")
        os.makedirs(output_dir, exist_ok=True)
        merged_path = os.path.join(output_dir, f"{self.document_id}_merged.md")
        with open(merged_path, 'w', encoding='utf-8') as f:
            f.write(merged_content)
        logger.info(f"MD 文件合并完成: {merged_path}")
        return merged_path