#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Dots OCR PDF parser.

Parses PDF documents with Dots OCR and converts the result into a
MinerU-compatible format.
"""
  7. import os
  8. import time
  9. import json
  10. import re
  11. import sys
  12. import io
  13. import aiohttp
  14. # import fitz # PyMuPDF
  15. from PIL import Image
  16. from utils.upload_file_to_oss import UploadMinio
  17. from config import minio_config, dots_ocr_config, model_name_vllm_url_dict
  18. from utils.get_logger import setup_logger
  19. # 添加 dots.ocr 到 Python 路径
  20. dots_ocr_path = os.path.join(os.path.dirname(__file__), '../../dots.ocr-master')
  21. if os.path.exists(dots_ocr_path):
  22. sys.path.insert(0, dots_ocr_path)
  23. try:
  24. from dots_ocr.parser import DotsOCRParser
  25. except ImportError as e:
  26. raise ImportError(f"无法导入 DotsOCRParser: {e}")
  27. logger = setup_logger(__name__)
  28. class DotsPDFLoader:
  29. pass
  30. class DotsPDFLoaders:
  31. """Dots OCR PDF 解析器"""
  32. def __init__(self, file_json):
  33. self.file_json = file_json
  34. self.knowledge_id = file_json.get("knowledge_id")
  35. self.document_id = None # 将在 extract_text 中设置
  36. self.set_table = file_json.get("set_table", "1")
  37. self.upload_minio = UploadMinio()
  38. # 初始化 Dots OCR Parser
  39. self.parser = DotsOCRParser(
  40. protocol=dots_ocr_config.get("protocol", "http"),
  41. ip=dots_ocr_config.get("ip", "localhost"),
  42. port=dots_ocr_config.get("port", 8000),
  43. model_name=dots_ocr_config.get("model_name", "model"),
  44. temperature=dots_ocr_config.get("temperature", 0.1),
  45. top_p=dots_ocr_config.get("top_p", 1.0),
  46. num_thread=dots_ocr_config.get("num_thread", 64),
  47. dpi=dots_ocr_config.get("dpi", 200),
  48. output_dir=dots_ocr_config.get("output_dir", "./tmp_file/dots_parsed")
  49. )
  50. logger.info(f"Dots OCR Parser 初始化成功")
  51. async def extract_text(self, pdf_path, doc_id=None):
  52. """解析 PDF 文件"""
  53. # 设置 document_id
  54. self.document_id = doc_id
  55. logger.info(f"开始使用 Dots OCR 解析 PDF: {pdf_path}, doc_id: {doc_id}")
  56. try:
  57. pdf_file_name = os.path.splitext(os.path.basename(pdf_path))[0] + time.strftime("_%Y%m%d%H%M%S")
  58. results = self.parser.parse_file(pdf_path, prompt_mode="prompt_layout_all_en", save_dirname=pdf_file_name)
  59. if not results:
  60. raise ValueError("Dots OCR 解析返回空结果")
  61. logger.info(f"Dots OCR 解析完成,共 {len(results)} 页")
  62. content_list = []
  63. all_md_contents = []
  64. image_num = 1
  65. for page_idx, result in enumerate(results):
  66. layout_info_path = result.get('layout_info_path')
  67. if not layout_info_path or not os.path.exists(layout_info_path):
  68. continue
  69. # 读取并验证 JSON
  70. with open(layout_info_path, 'r', encoding='utf-8') as f:
  71. raw_content = f.read()
  72. layout_data = await self._validate_and_fix_json(raw_content, page_idx)
  73. if not layout_data:
  74. logger.warning(f"第 {page_idx + 1} 页 JSON 验证失败,跳过")
  75. continue
  76. md_content_path = result.get('md_content_path')
  77. if md_content_path and os.path.exists(md_content_path):
  78. with open(md_content_path, 'r', encoding='utf-8') as f:
  79. all_md_contents.append(f.read())
  80. page_content, image_num = self._process_layout_data(
  81. layout_data, pdf_path, page_idx, image_num, pdf_file_name
  82. )
  83. content_list.extend(page_content)
  84. merged_md_path = self._merge_md_files(all_md_contents)
  85. logger.info(f"Dots OCR 解析完成,共 {len(content_list)} 个元素")
  86. return content_list, merged_md_path, pdf_file_name
  87. except Exception as e:
  88. logger.error(f"Dots OCR 解析失败: {e}", exc_info=True)
  89. raise
  90. async def _validate_and_fix_json(self, raw_content, page_idx):
  91. """验证JSON并在需要时使用LLM修复"""
  92. # 第一次尝试:直接解析
  93. try:
  94. data = json.loads(raw_content)
  95. # 验证是否是列表且元素是字典
  96. if isinstance(data, list):
  97. # 过滤掉非字典元素
  98. valid_data = [item for item in data if isinstance(item, dict)]
  99. if len(valid_data) < len(data):
  100. logger.warning(f"第 {page_idx + 1} 页 JSON 包含 {len(data) - len(valid_data)} 个非字典元素,已过滤")
  101. return valid_data
  102. else:
  103. logger.warning(f"第 {page_idx + 1} 页 JSON 不是列表格式")
  104. return None
  105. except json.JSONDecodeError as e:
  106. logger.warning(f"第 {page_idx + 1} 页 JSON 解析失败: {e},尝试使用 LLM 修复")
  107. # 第二次尝试:使用 LLM 修复
  108. try:
  109. fixed_json = await self._fix_json_with_llm(raw_content)
  110. if fixed_json:
  111. data = json.loads(fixed_json)
  112. if isinstance(data, list):
  113. valid_data = [item for item in data if isinstance(item, dict)]
  114. logger.info(f"第 {page_idx + 1} 页 JSON 经 LLM 修复成功")
  115. return valid_data
  116. except Exception as e:
  117. logger.error(f"第 {page_idx + 1} 页 LLM 修复失败: {e}")
  118. # 最终失败,跳过该页
  119. logger.error(f"第 {page_idx + 1} 页 JSON 无法修复,跳过")
  120. return None
  121. async def _fix_json_with_llm(self, raw_content):
  122. """使用 LLM 修复非法 JSON"""
  123. try:
  124. # 获取 LLM 配置
  125. model_name = "Qwen3-Coder-30B-loft"
  126. base_url = model_name_vllm_url_dict.get(model_name)
  127. if not base_url:
  128. logger.error("未找到 LLM 配置")
  129. return None
  130. prompt = f"""你是一个JSON修复专家。下面的JSON数据可能格式错误,请修复它并返回合法的JSON数组。
  131. 要求:
  132. 1. 返回的必须是一个JSON数组(列表)
  133. 2. 数组中的每个元素必须是一个字典对象
  134. 3. 如果某个元素不是字典,请移除它
  135. 4. 只返回修复后的JSON,不要有任何其他说明文字
  136. 原始数据:
  137. {raw_content[:2000]}
  138. 请直接返回修复后的合法JSON:"""
  139. headers = {
  140. "Content-Type": "application/json",
  141. "Authorization": f"Bearer {os.getenv('DEEPSEEK_API_KEY', 'sk-dummy')}"
  142. }
  143. payload = {
  144. "model": model_name,
  145. "messages": [{"role": "user", "content": prompt}],
  146. "temperature": 0.1,
  147. "max_tokens": 4096
  148. }
  149. async with aiohttp.ClientSession() as session:
  150. async with session.post(
  151. f"{base_url}/chat/completions",
  152. json=payload,
  153. headers=headers,
  154. timeout=aiohttp.ClientTimeout(total=30)
  155. ) as resp:
  156. if resp.status == 200:
  157. result = await resp.json()
  158. fixed_json = result.get("choices", [{}])[0].get("message", {}).get("content", "")
  159. # 提取JSON部分(去除markdown标记)
  160. fixed_json = re.sub(r'^```json\s*', '', fixed_json, flags=re.MULTILINE)
  161. fixed_json = re.sub(r'\s*```$', '', fixed_json, flags=re.MULTILINE)
  162. return fixed_json.strip()
  163. else:
  164. logger.error(f"LLM API 调用失败: {resp.status}")
  165. return None
  166. except Exception as e:
  167. logger.error(f"LLM 修复 JSON 异常: {e}")
  168. return None
  169. def _extract_text_level(self, text):
  170. """从文本中提取 text_level(通过计算开头的 # 号数量)"""
  171. if not text:
  172. return None, text
  173. match = re.match(r'^(#{1,6})\s+(.+)', text)
  174. if match:
  175. level = len(match.group(1))
  176. clean_text = match.group(2)
  177. return level, clean_text
  178. return None, text
  179. def _crop_and_upload_image(self, page, bbox, image_num, category="image", pdf_file_name=""):
  180. """裁剪图片并上传到 minio"""
  181. try:
  182. # 验证 bbox 有效性
  183. if not bbox or len(bbox) != 4:
  184. logger.warning(f"无效的 bbox: {bbox}")
  185. return None, None, None
  186. x1, y1, x2, y2 = bbox
  187. # 检查坐标有效性
  188. page_rect = page.rect
  189. if x1 < 0 or y1 < 0 or x2 > page_rect.width or y2 > page_rect.height:
  190. logger.warning(f"bbox 超出页面范围: {bbox}, 页面大小: {page_rect.width}x{page_rect.height}")
  191. # 裁剪到页面范围内
  192. x1 = max(0, x1)
  193. y1 = max(0, y1)
  194. x2 = min(page_rect.width, x2)
  195. y2 = min(page_rect.height, y2)
  196. # 检查尺寸是否有效
  197. if x2 <= x1 or y2 <= y1:
  198. logger.warning(f"无效的 bbox 尺寸: {bbox}")
  199. return None, None, None
  200. # 使用 PyMuPDF 裁剪区域
  201. import fitz
  202. rect = fitz.Rect(x1, y1, x2, y2)
  203. pix = page.get_pixmap(clip=rect, dpi=150) # 降低 dpi
  204. # 转换为 PIL Image
  205. img_data = pix.tobytes("png")
  206. image = Image.open(io.BytesIO(img_data))
  207. # 保存临时文件
  208. temp_dir = f"./tmp_file/dots_parsed/{pdf_file_name}/images"
  209. os.makedirs(temp_dir, exist_ok=True)
  210. temp_img_path = f"{temp_dir}/{self.document_id}_{image_num}.jpg"
  211. image.save(temp_img_path, "JPEG")
  212. # 生成标识符(包含 doc_id)
  213. replace_text = f"【示意图序号_{self.document_id}_{image_num}】"
  214. # 上传到 minio
  215. minio_file_path = f"/pdf/{self.knowledge_id}/{self.document_id}/{replace_text}.jpg"
  216. self.upload_minio.upload_file(temp_img_path, minio_file_path)
  217. # 生成完整 URL
  218. minio_url = minio_config.get("minio_url")
  219. minio_bucket = minio_config.get("minio_bucket")
  220. full_url = f"{minio_url}/{minio_bucket}{minio_file_path}"
  221. logger.info(f"图片上传成功: {replace_text}")
  222. return replace_text, full_url, temp_img_path
  223. except Exception as e:
  224. logger.error(f"裁剪或上传图片失败: {e}")
  225. return None, None, None
  226. def _process_layout_data(self, layout_data, pdf_path, page_idx, image_num, pdf_file_name):
  227. """处理单页布局数据,转换为 MinerU 兼容格式"""
  228. content = []
  229. import fitz
  230. # 打开 PDF 页面
  231. pdf_doc = fitz.open(pdf_path)
  232. page = pdf_doc[page_idx]
  233. for cell in layout_data:
  234. category = cell.get('category')
  235. text = cell.get('text', '')
  236. bbox = cell.get('bbox', [])
  237. if category == 'Title':
  238. # Title 固定为 level 1
  239. _, clean_text = self._extract_text_level(text)
  240. content.append({
  241. "type": "text",
  242. "text": clean_text or text,
  243. "text_level": 1,
  244. "page_idx": page_idx
  245. })
  246. elif category == 'Section-header':
  247. # 从文本中提取 # 号数量
  248. text_level, clean_text = self._extract_text_level(text)
  249. content.append({
  250. "type": "text",
  251. "text": clean_text,
  252. "text_level": text_level,
  253. "page_idx": page_idx
  254. })
  255. elif category == 'Picture':
  256. # 裁剪图片并上传
  257. replace_text, full_url, img_path = self._crop_and_upload_image(
  258. page, bbox, image_num, "image", pdf_file_name
  259. )
  260. if replace_text:
  261. content.append({
  262. "type": "image",
  263. "img_path": f"images/{self.document_id}_{image_num}.jpg",
  264. "page_idx": page_idx,
  265. "replace_text": replace_text
  266. })
  267. image_num += 1
  268. elif category == 'Table':
  269. # 表格处理:根据 set_table 模式
  270. if self.set_table == "0":
  271. # 图片模式:裁剪表格图片
  272. replace_text, full_url, img_path = self._crop_and_upload_image(
  273. page, bbox, image_num, "table", pdf_file_name
  274. )
  275. if replace_text:
  276. content.append({
  277. "type": "table",
  278. "img_path": f"images/{self.document_id}_{image_num}.jpg",
  279. "table_body": replace_text,
  280. "table_caption": [],
  281. "page_idx": page_idx
  282. })
  283. image_num += 1
  284. else:
  285. # HTML 模式:直接使用 text
  286. content.append({
  287. "type": "table",
  288. "table_body": text,
  289. "table_caption": [],
  290. "page_idx": page_idx
  291. })
  292. elif category == 'List-item':
  293. # 列表项
  294. list_items = text.split('\n') if text else []
  295. content.append({
  296. "type": "list",
  297. "list_items": list_items,
  298. "page_idx": page_idx
  299. })
  300. elif category in ['Text', 'Caption', 'Footnote', 'Formula', 'Page-header', 'Page-footer']:
  301. # 普通文本处理
  302. content.append({
  303. "type": "text",
  304. "text": text,
  305. "text_level": None,
  306. "page_idx": page_idx
  307. })
  308. pdf_doc.close()
  309. return content, image_num
  310. def _merge_md_files(self, md_contents):
  311. """合并多个 MD 文件为一个"""
  312. if not md_contents:
  313. logger.warning("没有 MD 文件可合并")
  314. return ""
  315. # 在页与页之间插入分隔符
  316. merged_content = "\n\n<page>\n\n".join(md_contents)
  317. # 保存合并后的文件
  318. output_dir = dots_ocr_config.get("output_dir", "./tmp_file/dots_parsed")
  319. os.makedirs(output_dir, exist_ok=True)
  320. merged_path = os.path.join(output_dir, f"{self.document_id}_merged.md")
  321. with open(merged_path, 'w', encoding='utf-8') as f:
  322. f.write(merged_content)
  323. logger.info(f"MD 文件合并完成: {merged_path}")
  324. return merged_path