| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- Dots OCR PDF 解析器
- 使用 Dots OCR 解析 PDF 文档,并转换为 MinerU 兼容格式
- """
- import os
- import time
- import json
- import re
- import sys
- import io
- import aiohttp
- # import fitz # PyMuPDF
- from PIL import Image
- from utils.upload_file_to_oss import UploadMinio
- from config import minio_config, dots_ocr_config, model_name_vllm_url_dict
- from utils.get_logger import setup_logger
- # 添加 dots.ocr 到 Python 路径
- dots_ocr_path = os.path.join(os.path.dirname(__file__), '../../dots.ocr-master')
- if os.path.exists(dots_ocr_path):
- sys.path.insert(0, dots_ocr_path)
- try:
- from dots_ocr.parser import DotsOCRParser
- except ImportError as e:
- raise ImportError(f"无法导入 DotsOCRParser: {e}")
- logger = setup_logger(__name__)
class DotsPDFLoader:
    # NOTE(review): empty placeholder class — presumably kept so existing
    # imports of `DotsPDFLoader` keep working while the real implementation
    # lives in `DotsPDFLoaders`; confirm with callers before removing.
    pass
class DotsPDFLoaders:
    """Dots OCR PDF parser.

    Parses a PDF with Dots OCR, validates (and LLM-repairs, if needed) each
    page's layout JSON, converts the layout cells into MinerU-compatible
    content elements, crops/uploads figures and (optionally) tables to MinIO,
    and merges the per-page markdown outputs into a single file.
    """

    def __init__(self, file_json):
        """Initialize the loader and the underlying Dots OCR parser.

        Args:
            file_json: dict describing the document. Keys read here:
                ``knowledge_id`` — used in MinIO object paths;
                ``set_table`` — "0" crops tables as images, anything else
                (default "1") keeps the table's text/HTML body.
        """
        self.file_json = file_json
        self.knowledge_id = file_json.get("knowledge_id")
        self.document_id = None  # assigned later, in extract_text()
        self.set_table = file_json.get("set_table", "1")
        self.upload_minio = UploadMinio()

        # Build the parser from config with safe defaults for every field.
        self.parser = DotsOCRParser(
            protocol=dots_ocr_config.get("protocol", "http"),
            ip=dots_ocr_config.get("ip", "localhost"),
            port=dots_ocr_config.get("port", 8000),
            model_name=dots_ocr_config.get("model_name", "model"),
            temperature=dots_ocr_config.get("temperature", 0.1),
            top_p=dots_ocr_config.get("top_p", 1.0),
            num_thread=dots_ocr_config.get("num_thread", 64),
            dpi=dots_ocr_config.get("dpi", 200),
            output_dir=dots_ocr_config.get("output_dir", "./tmp_file/dots_parsed"),
        )

        # (was an f-string with no placeholders — same message, plain literal)
        logger.info("Dots OCR Parser 初始化成功")

    async def extract_text(self, pdf_path, doc_id=None):
        """Parse *pdf_path* and return MinerU-compatible content.

        Args:
            pdf_path: path to the PDF file to parse.
            doc_id: document id embedded in image names, MinIO paths and the
                merged markdown filename.

        Returns:
            Tuple of ``(content_list, merged_md_path, pdf_file_name)``.

        Raises:
            ValueError: if Dots OCR returns an empty result.
            Exception: any parsing error is logged and re-raised.
        """
        self.document_id = doc_id
        logger.info(f"开始使用 Dots OCR 解析 PDF: {pdf_path}, doc_id: {doc_id}")

        try:
            # Unique per-run output name: "<pdf basename>_<timestamp>".
            pdf_file_name = os.path.splitext(os.path.basename(pdf_path))[0] + time.strftime("_%Y%m%d%H%M%S")
            # NOTE(review): parse_file is a blocking, synchronous call inside
            # an async method — consider loop.run_in_executor if it stalls
            # the event loop for large PDFs.
            results = self.parser.parse_file(pdf_path, prompt_mode="prompt_layout_all_en", save_dirname=pdf_file_name)

            if not results:
                raise ValueError("Dots OCR 解析返回空结果")

            logger.info(f"Dots OCR 解析完成,共 {len(results)} 页")

            content_list = []
            all_md_contents = []
            image_num = 1  # running image index shared across all pages

            # NOTE(review): assumes results are ordered one-per-page so that
            # page_idx matches the PDF page index — confirm with parser docs.
            for page_idx, result in enumerate(results):
                layout_info_path = result.get('layout_info_path')
                if not layout_info_path or not os.path.exists(layout_info_path):
                    continue

                # Read the raw layout JSON; it may be malformed and need repair.
                with open(layout_info_path, 'r', encoding='utf-8') as f:
                    raw_content = f.read()

                layout_data = await self._validate_and_fix_json(raw_content, page_idx)
                if not layout_data:
                    logger.warning(f"第 {page_idx + 1} 页 JSON 验证失败,跳过")
                    continue

                # Collect this page's markdown (merged at the end).
                md_content_path = result.get('md_content_path')
                if md_content_path and os.path.exists(md_content_path):
                    with open(md_content_path, 'r', encoding='utf-8') as f:
                        all_md_contents.append(f.read())

                page_content, image_num = self._process_layout_data(
                    layout_data, pdf_path, page_idx, image_num, pdf_file_name
                )
                content_list.extend(page_content)

            merged_md_path = self._merge_md_files(all_md_contents)

            logger.info(f"Dots OCR 解析完成,共 {len(content_list)} 个元素")
            return content_list, merged_md_path, pdf_file_name

        except Exception as e:
            logger.error(f"Dots OCR 解析失败: {e}", exc_info=True)
            raise

    async def _validate_and_fix_json(self, raw_content, page_idx):
        """Validate page JSON; fall back to an LLM repair pass on failure.

        Args:
            raw_content: raw JSON text read from the layout file.
            page_idx: zero-based page index (used only in log messages).

        Returns:
            A list of dict cells on success (non-dict entries filtered out),
            or ``None`` if the JSON cannot be parsed or repaired.
        """
        # First attempt: parse as-is.
        try:
            data = json.loads(raw_content)
            if isinstance(data, list):
                # Keep only dict elements; warn if anything was dropped.
                valid_data = [item for item in data if isinstance(item, dict)]
                if len(valid_data) < len(data):
                    logger.warning(f"第 {page_idx + 1} 页 JSON 包含 {len(data) - len(valid_data)} 个非字典元素,已过滤")
                return valid_data
            else:
                logger.warning(f"第 {page_idx + 1} 页 JSON 不是列表格式")
                return None
        except json.JSONDecodeError as e:
            logger.warning(f"第 {page_idx + 1} 页 JSON 解析失败: {e},尝试使用 LLM 修复")

        # Second attempt: ask the LLM to repair the payload.
        try:
            fixed_json = await self._fix_json_with_llm(raw_content)
            if fixed_json:
                data = json.loads(fixed_json)
                if isinstance(data, list):
                    valid_data = [item for item in data if isinstance(item, dict)]
                    logger.info(f"第 {page_idx + 1} 页 JSON 经 LLM 修复成功")
                    return valid_data
                # Non-list repair output falls through to the final failure.
        except Exception as e:
            logger.error(f"第 {page_idx + 1} 页 LLM 修复失败: {e}")

        # Both attempts failed — caller skips this page.
        logger.error(f"第 {page_idx + 1} 页 JSON 无法修复,跳过")
        return None

    async def _fix_json_with_llm(self, raw_content):
        """Ask an LLM to repair malformed JSON into a valid array of objects.

        Only the first 2000 characters of *raw_content* are sent to the model.

        Returns:
            The repaired JSON string (markdown fences stripped), or ``None``
            on missing config, non-200 response, or any exception.
        """
        try:
            model_name = "Qwen3-Coder-30B-loft"
            base_url = model_name_vllm_url_dict.get(model_name)

            if not base_url:
                logger.error("未找到 LLM 配置")
                return None

            prompt = f"""你是一个JSON修复专家。下面的JSON数据可能格式错误,请修复它并返回合法的JSON数组。
要求:
1. 返回的必须是一个JSON数组(列表)
2. 数组中的每个元素必须是一个字典对象
3. 如果某个元素不是字典,请移除它
4. 只返回修复后的JSON,不要有任何其他说明文字
原始数据:
{raw_content[:2000]}
请直接返回修复后的合法JSON:"""
            # NOTE(review): falls back to a dummy bearer token when
            # DEEPSEEK_API_KEY is unset — fine for local vLLM, verify for prod.
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY', 'sk-dummy')}"
            }

            payload = {
                "model": model_name,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.1,
                "max_tokens": 4096
            }

            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{base_url}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as resp:
                    if resp.status == 200:
                        result = await resp.json()
                        fixed_json = result.get("choices", [{}])[0].get("message", {}).get("content", "")
                        # Strip surrounding ```json ... ``` markdown fences.
                        fixed_json = re.sub(r'^```json\s*', '', fixed_json, flags=re.MULTILINE)
                        fixed_json = re.sub(r'\s*```$', '', fixed_json, flags=re.MULTILINE)
                        return fixed_json.strip()
                    else:
                        logger.error(f"LLM API 调用失败: {resp.status}")
                        return None
        except Exception as e:
            logger.error(f"LLM 修复 JSON 异常: {e}")
            return None

    def _extract_text_level(self, text):
        """Derive a heading level from leading markdown ``#`` characters.

        Args:
            text: candidate heading text (may be empty/None).

        Returns:
            ``(level, clean_text)`` where level is 1–6 when *text* starts
            with 1–6 ``#`` followed by whitespace, otherwise ``(None, text)``.
        """
        if not text:
            return None, text

        match = re.match(r'^(#{1,6})\s+(.+)', text)
        if match:
            level = len(match.group(1))
            clean_text = match.group(2)
            return level, clean_text

        return None, text

    def _crop_and_upload_image(self, page, bbox, image_num, category="image", pdf_file_name=""):
        """Crop a bbox from a PDF page, save as JPEG, and upload to MinIO.

        Args:
            page: an open PyMuPDF page object.
            bbox: ``[x1, y1, x2, y2]`` in page coordinates; clamped to the
                page rectangle if it extends past the edges.
            image_num: running image index used in names/paths.
            category: label only — not used in the path (kept for callers).
            pdf_file_name: per-run directory name for temp files.

        Returns:
            ``(replace_text, full_url, temp_img_path)`` on success, or
            ``(None, None, None)`` on any validation failure or exception.
        """
        try:
            # Reject malformed bboxes outright.
            if not bbox or len(bbox) != 4:
                logger.warning(f"无效的 bbox: {bbox}")
                return None, None, None

            x1, y1, x2, y2 = bbox

            # Clamp out-of-range coordinates to the page rectangle.
            page_rect = page.rect
            if x1 < 0 or y1 < 0 or x2 > page_rect.width or y2 > page_rect.height:
                logger.warning(f"bbox 超出页面范围: {bbox}, 页面大小: {page_rect.width}x{page_rect.height}")
                x1 = max(0, x1)
                y1 = max(0, y1)
                x2 = min(page_rect.width, x2)
                y2 = min(page_rect.height, y2)

            # A degenerate (zero/negative area) crop is unusable.
            if x2 <= x1 or y2 <= y1:
                logger.warning(f"无效的 bbox 尺寸: {bbox}")
                return None, None, None

            # Render the clipped region with PyMuPDF (reduced dpi for size).
            import fitz
            rect = fitz.Rect(x1, y1, x2, y2)
            pix = page.get_pixmap(clip=rect, dpi=150)

            img_data = pix.tobytes("png")
            image = Image.open(io.BytesIO(img_data))
            # FIX: JPEG cannot store alpha/palette modes — convert defensively
            # so .save(..., "JPEG") never raises on RGBA/P pixmaps.
            if image.mode != "RGB":
                image = image.convert("RGB")

            # Persist a local temp copy before uploading.
            temp_dir = f"./tmp_file/dots_parsed/{pdf_file_name}/images"
            os.makedirs(temp_dir, exist_ok=True)
            temp_img_path = f"{temp_dir}/{self.document_id}_{image_num}.jpg"
            image.save(temp_img_path, "JPEG")

            # Placeholder token embedded in the document text (includes doc id).
            replace_text = f"【示意图序号_{self.document_id}_{image_num}】"

            minio_file_path = f"/pdf/{self.knowledge_id}/{self.document_id}/{replace_text}.jpg"
            self.upload_minio.upload_file(temp_img_path, minio_file_path)

            # Public URL: <minio_url>/<bucket><object path>.
            minio_url = minio_config.get("minio_url")
            minio_bucket = minio_config.get("minio_bucket")
            full_url = f"{minio_url}/{minio_bucket}{minio_file_path}"

            logger.info(f"图片上传成功: {replace_text}")

            return replace_text, full_url, temp_img_path

        except Exception as e:
            logger.error(f"裁剪或上传图片失败: {e}")
            return None, None, None

    def _process_layout_data(self, layout_data, pdf_path, page_idx, image_num, pdf_file_name):
        """Convert one page's layout cells into MinerU-compatible elements.

        Args:
            layout_data: list of dict cells with ``category``/``text``/``bbox``.
            pdf_path: path of the source PDF (opened here for cropping).
            page_idx: zero-based page index into the PDF.
            image_num: current running image index.
            pdf_file_name: per-run directory name, passed to the cropper.

        Returns:
            ``(content, image_num)`` — the page's content elements and the
            updated image counter.
        """
        content = []

        import fitz
        pdf_doc = fitz.open(pdf_path)
        # FIX: close the document even if processing a cell raises, so the
        # file handle is never leaked on error.
        try:
            page = pdf_doc[page_idx]

            for cell in layout_data:
                category = cell.get('category')
                text = cell.get('text', '')
                bbox = cell.get('bbox', [])

                if category == 'Title':
                    # Titles are always level 1; strip any leading '#'s.
                    _, clean_text = self._extract_text_level(text)
                    content.append({
                        "type": "text",
                        "text": clean_text or text,
                        "text_level": 1,
                        "page_idx": page_idx
                    })

                elif category == 'Section-header':
                    # Heading level derived from the number of leading '#'s.
                    text_level, clean_text = self._extract_text_level(text)
                    content.append({
                        "type": "text",
                        "text": clean_text,
                        "text_level": text_level,
                        "page_idx": page_idx
                    })

                elif category == 'Picture':
                    # Crop the figure region and upload it to MinIO.
                    replace_text, full_url, img_path = self._crop_and_upload_image(
                        page, bbox, image_num, "image", pdf_file_name
                    )

                    if replace_text:
                        content.append({
                            "type": "image",
                            "img_path": f"images/{self.document_id}_{image_num}.jpg",
                            "page_idx": page_idx,
                            "replace_text": replace_text
                        })
                        image_num += 1

                elif category == 'Table':
                    if self.set_table == "0":
                        # Image mode: crop the table region like a figure.
                        replace_text, full_url, img_path = self._crop_and_upload_image(
                            page, bbox, image_num, "table", pdf_file_name
                        )

                        if replace_text:
                            content.append({
                                "type": "table",
                                "img_path": f"images/{self.document_id}_{image_num}.jpg",
                                "table_body": replace_text,
                                "table_caption": [],
                                "page_idx": page_idx
                            })
                            image_num += 1
                    else:
                        # HTML/text mode: keep the OCR'd table body verbatim.
                        content.append({
                            "type": "table",
                            "table_body": text,
                            "table_caption": [],
                            "page_idx": page_idx
                        })

                elif category == 'List-item':
                    # One element per line of the list-item text.
                    list_items = text.split('\n') if text else []
                    content.append({
                        "type": "list",
                        "list_items": list_items,
                        "page_idx": page_idx
                    })

                elif category in ['Text', 'Caption', 'Footnote', 'Formula', 'Page-header', 'Page-footer']:
                    # Plain text categories share one representation.
                    content.append({
                        "type": "text",
                        "text": text,
                        "text_level": None,
                        "page_idx": page_idx
                    })
        finally:
            pdf_doc.close()

        return content, image_num

    def _merge_md_files(self, md_contents):
        """Merge per-page markdown strings into one file on disk.

        Args:
            md_contents: list of markdown strings, one per parsed page.

        Returns:
            Path of the merged ``<document_id>_merged.md`` file, or ``""``
            when there is nothing to merge.
        """
        if not md_contents:
            logger.warning("没有 MD 文件可合并")
            return ""

        # Page boundary marker between consecutive pages.
        merged_content = "\n\n<page>\n\n".join(md_contents)

        output_dir = dots_ocr_config.get("output_dir", "./tmp_file/dots_parsed")
        os.makedirs(output_dir, exist_ok=True)
        merged_path = os.path.join(output_dir, f"{self.document_id}_merged.md")

        with open(merged_path, 'w', encoding='utf-8') as f:
            f.write(merged_content)

        logger.info(f"MD 文件合并完成: {merged_path}")
        return merged_path
|