class PaddleOCRLoader:
    """PDF parser backed by a PaddleOCR-VL HTTP service.

    The loader posts a PDF path to the service's ``/parse`` endpoint and
    flattens the returned layout blocks into a ``content_list`` of plain
    dictionaries (text, table and image items).
    """

    # Block labels that are dropped from the output: page header, page
    # number and footnote. Tuples (not sets) so that an unexpected,
    # unhashable label value simply fails the membership test.
    _SKIPPED_LABELS = ("header", "number", "vision_footnote")
    # Block labels that are emitted as plain body text.
    _PLAIN_TEXT_LABELS = ("text", "content")
    # Markdown-style heading prefix: one to six '#' followed by whitespace.
    _TITLE_PREFIX_RE = re.compile(r'^(#{1,6})\s+')
    # Retry policy for the HTTP call: number of attempts and pause (seconds).
    _MAX_RETRY = 5
    _RETRY_INTERVAL = 10

    def __init__(self, file_json):
        """Store the job description and read service settings from config.

        Args:
            file_json: Mapping describing the file to parse; only
                ``knowledge_id`` is read from it here.
        """
        self.file_json = file_json
        self.knowledge_id = file_json.get("knowledge_id")
        self.document_id = None

        self.service_url = paddleocr_config.get("service_url", "http://127.0.0.1:8119")
        self.vl_server_url = paddleocr_config.get("vl_server_url", "http://127.0.0.1:8118/v1")
        self.output_dir = paddleocr_config.get("output_dir", "./tmp_file/paddleocr_parsed")
        self.timeout = paddleocr_config.get("timeout", 600)
        self.title_opt_config = paddleocr_config.get("title_optimization", {})

        logger.info(f"PaddleOCR Loader 初始化成功, service_url={self.service_url}")

    async def extract_text(self, pdf_path, doc_id=None):
        """Parse a PDF through the PaddleOCR-VL service.

        Args:
            pdf_path: Path of the PDF, passed to the service as-is.
            doc_id: Optional document identifier, stored on the instance.

        Returns:
            A ``(content_list, md_path, pdf_file_name)`` tuple; the original
            note states this matches the other parsers' return format.

        Raises:
            ValueError: If the service returns no JSON data.
            Exception: Any other failure is logged and re-raised unchanged.
        """
        self.document_id = doc_id
        logger.info(f"开始使用 PaddleOCR-VL 解析 PDF: {pdf_path}, doc_id: {doc_id}")

        try:
            json_data, md_path, pdf_file_name = await self._call_service(pdf_path)
            if not json_data:
                raise ValueError("PaddleOCR-VL 服务返回空结果")

            # Normalise the service output into the flat content_list format.
            content_list = self._convert_to_content_list(json_data, pdf_file_name)

            # Title optimisation step; the original note marks it as
            # optional and disabled by default.
            content_list = await self._optimize_titles(content_list)

            logger.info(f"PaddleOCR-VL 解析完成,共 {len(content_list)} 个元素")
            return content_list, md_path, pdf_file_name
        except Exception as e:
            logger.error(f"PaddleOCR-VL 解析失败: {e}", exc_info=True)
            raise

    async def _call_service(self, pdf_path):
        """POST the parse request to the service, retrying on client errors.

        Args:
            pdf_path: Path of the PDF, forwarded in the request payload.

        Returns:
            ``(json_data, md_path, pdf_file_name)`` taken from the ``data``
            object of the response; each element is ``None`` when absent.

        Raises:
            ValueError: On a non-200 HTTP status or a non-200 ``code`` field
                in the response body (not retried).
            RuntimeError: When every attempt failed with
                ``aiohttp.ClientError``; chained to the last such error.
        """
        url = f"{self.service_url}/parse"
        payload = {
            "pdf_path": pdf_path,
            "output_dir": self.output_dir,
            "vl_rec_server_url": self.vl_server_url,
        }
        max_retry = self._MAX_RETRY
        retry_interval = self._RETRY_INTERVAL
        request_timeout = aiohttp.ClientTimeout(total=self.timeout)

        last_exception = None
        for attempt in range(1, max_retry + 1):
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        url, json=payload, timeout=request_timeout
                    ) as resp:
                        if resp.status != 200:
                            raise ValueError(f"服务返回错误状态: {resp.status}")

                        result = await resp.json()
                        if result.get("code") != 200:
                            raise ValueError(f"服务返回错误: {result.get('message')}")

                        # `or {}` also covers an explicit JSON null, which
                        # .get("data", {}) would hand back as None.
                        data = result.get("data") or {}
                        return (
                            data.get("json_data"),
                            data.get("md_path"),
                            data.get("pdf_file_name"),
                        )
            # NOTE(review): only aiohttp.ClientError is retried. A total
            # timeout expiry is reported by aiohttp as asyncio.TimeoutError
            # and would propagate without retry - confirm this is intended.
            except aiohttp.ClientError as e:
                last_exception = e
                logger.warning(f"PaddleOCR 服务调用失败 (尝试 {attempt}/{max_retry}): {e}")
                if attempt < max_retry:
                    logger.info(f"等待 {retry_interval}s 后重试...")
                    await asyncio.sleep(retry_interval)

        # Only reached after every attempt failed with a client error.
        raise RuntimeError(
            f"PaddleOCR predict 连续失败 {max_retry} 次,最后错误: {last_exception}"
        ) from last_exception

    @staticmethod
    def _make_text_item(text, page_idx, bbox, text_level=None):
        """Build one ``type == "text"`` entry of the content list."""
        return {
            "type": "text",
            "text": text,
            "text_level": text_level,
            "page_idx": page_idx,
            "bbox": bbox,
        }

    def _convert_to_content_list(self, json_data, pdf_file_name):
        """Flatten PaddleOCR-VL JSON into the unified ``content_list``.

        Handling per ``block_label``:

        - ``text`` / ``content``: plain text item.
        - ``paragraph_title``: text item whose level comes from the leading
          ``#`` run, which is stripped from the text.
        - ``doc_title``: text item with level 2.
        - ``figure_title``: caption of the table that immediately follows
          it; otherwise a plain text item.
        - ``table``: table item.
        - ``image``: image item built by ``_process_image``.
        - ``header`` / ``number`` / ``vision_footnote``: dropped.
        - anything else: ignored.

        Missing or null ``pages``, ``res`` and ``parsing_res_list`` values
        are treated as empty.

        Args:
            json_data: Parsed service response with a ``pages`` list.
            pdf_file_name: Forwarded to ``_process_image``.

        Returns:
            List of item dictionaries in document order.
        """
        content_list = []

        for page_data in (json_data or {}).get("pages") or []:
            res = (page_data or {}).get("res") or {}
            # Some responses wrap the payload in a second "res" level.
            if "res" in res:
                res = res.get("res") or {}

            page_idx = res.get("page_index", 0)
            blocks = res.get("parsing_res_list") or []

            # Index-based walk: a figure_title may consume the next block.
            i = 0
            while i < len(blocks):
                block = blocks[i]
                label = block.get("block_label", "")
                text = block.get("block_content", "")
                bbox = block.get("block_bbox", [])

                if label in self._SKIPPED_LABELS:
                    i += 1
                    continue

                if label == "figure_title":
                    following = blocks[i + 1] if i + 1 < len(blocks) else None
                    if following is not None and following.get("block_label") == "table":
                        # The title becomes the caption of the next table.
                        table_item = self._process_table(
                            following, page_idx, table_caption=text
                        )
                        if table_item:
                            content_list.append(table_item)
                        i += 2  # skip both the title and the table
                        continue

                    # No table follows: keep the title as ordinary text.
                    content_list.append(self._make_text_item(text, page_idx, bbox))
                    i += 1
                    continue

                if label in self._PLAIN_TEXT_LABELS:
                    content_list.append(self._make_text_item(text, page_idx, bbox))
                elif label == "paragraph_title":
                    content_list.append(
                        self._make_text_item(
                            self._remove_title_prefix(text),
                            page_idx,
                            bbox,
                            text_level=self._extract_title_level(text),
                        )
                    )
                elif label == "doc_title":
                    # Document titles are emitted as level-2 headings.
                    content_list.append(
                        self._make_text_item(text, page_idx, bbox, text_level=2)
                    )
                elif label == "table":
                    table_item = self._process_table(block, page_idx)
                    if table_item:
                        content_list.append(table_item)
                elif label == "image":
                    image_item = self._process_image(block, page_idx, pdf_file_name)
                    if image_item:
                        content_list.append(image_item)

                i += 1

        return content_list

    def _extract_title_level(self, text):
        """Return the heading level given by a leading ``#`` run.

        Returns:
            The number of ``#`` characters (1-6) when ``text`` starts with
            such a run followed by whitespace, otherwise ``None``.
        """
        if not text:
            return None
        match = self._TITLE_PREFIX_RE.match(text)
        return len(match.group(1)) if match else None

    def _remove_title_prefix(self, text):
        """Strip a leading ``#`` heading prefix; falsy input is returned as-is."""
        if not text:
            return text
        return self._TITLE_PREFIX_RE.sub('', text)

    def _process_table(self, block, page_idx, table_caption=None):
        """Build a table item from a table block.

        Args:
            block: Layout block providing ``block_content``, ``block_bbox``
                and ``img_path`` (each optional).
            page_idx: Page index stored on the item.
            table_caption: Optional caption; a falsy value yields an empty
                caption list.

        Returns:
            A ``type == "table"`` dictionary.
        """
        return {
            "type": "table",
            "table_body": block.get("block_content", ""),
            "table_caption": [table_caption] if table_caption else [],
            "page_idx": page_idx,
            "bbox": block.get("block_bbox", []),
            "img_path": block.get("img_path", ""),
        }
