# import fitz # PyMuPDF
import os
from PIL import Image
import io
# import pdfplumber
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
from utils.upload_file_to_oss import UploadMinio
from config import minio_config

# NOTE(review): a large commented-out PyMuPDF/pdfplumber implementation of
# PDFLoader (per-page image extraction, MinIO upload, replacing images with
# placeholder text, text extraction) used to live here and continues in the
# comment blocks below. It has been superseded by the MinerU-based classes at
# the bottom of this file and is kept only as historical reference.


class PDFLoader():
    """Deprecated placeholder for the old PyMuPDF-based PDF loader.

    The active parsing logic now lives in ``MinerUParsePdf`` /
    ``MinerUParsePdfClient``. This stub keeps the original constructor
    signature so existing call sites still import and instantiate it, but it
    performs no work; methods of the legacy implementation (e.g.
    ``replace_images_with_text``) are NOT available on this class.
    """

    def __init__(self, file_json):
        # file_json (dict): the legacy implementation read the keys
        # "knowledge_id", "document_id", "name" and "flag" from it — unused
        # here; accepted only for interface compatibility.
        pass
# page_num = 0 # for page in pdf.pages: # images_dict[page_num] = {} # image_num = 0 # img_list = {} # img_list[image_num] = {} # for image in page.images: # #print("Image position:", image) # img_list[image_num] = {"x0":image['x0'],"y0":image['y0']} # image_num += 1 # img_list[image_num] = {} # images_dict[page_num]=img_list # page_num += 1 # # print(f"images list info: {images_dict}") # return images_dict # def save_images(self): # # 创建图片保存目录 # os.makedirs(self.save_image_path, exist_ok=True) # # 使用PyMuPDF打开PDF文件 # doc = fitz.open(self.input_pdf_path) # all_images_dict = {} # pdf_img_index = 1 # flag_img_info = {} # for page_num in range(len(doc)): # page = doc.load_page(page_num) # images = page.get_images(full=True) # page_image_dict = {} # for img_index, img in enumerate(images): # xref = img[0] # 图片的XRef编号 # base_image = doc.extract_image(xref) # image_bytes = base_image["image"] # # 将字节数据转换为PIL图像 # pil_image = Image.open(io.BytesIO(image_bytes)) # # 生成唯一文件名 # # img_name = f"page{page_num+1}_img{img_index+1}.{base_image['ext']}" # img_name = f"{self.document_id}_{pdf_img_index}.{base_image['ext']}" # img_path = os.path.join(self.save_image_path, img_name) # # page_image_dict[img_index] = img_path # # 保存成image_name # image_str = self.knowledge_id + "/" + self.document_id + "/" + img_name # replace_text = f"【示意图序号_{self.document_id}_{pdf_img_index}】" # page_image_dict[img_index] = replace_text # 替换pdf中的文字 # # 保存图片 # pil_image.save(img_path) # # 保存的图片上传的oss # self.upload_minio.upload_file(img_path, f"/pdf/{image_str}") # minio_url = minio_config.get("minio_url") # minio_bucket = minio_config.get("minio_bucket") # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}//pdf/{image_str}" # pdf_img_index += 1 # all_images_dict[page_num] = page_image_dict # # 关闭原始文档 # doc.close() # return all_images_dict, flag_img_info # def replace_images_with_text(self): # # 打开原始PDF # doc = fitz.open(self.input_pdf_path) # # 设置字体大小 # font_size = 12 # font_name = "SimSun" # 
font_path = r"./utils/simsun.ttc" # 当前系统中的字体路径 # # 遍历每一页 # for page_num in range(len(doc)): # page = doc.load_page(page_num) # 获取页面 # images = page.get_images(full=True) # 获取页面中的所有图片 # page_height = page.rect.height # # print("page_height: ", page_height) # for img_index, img in enumerate(images): # xref = img[0] # 图片的XRef编号 # base_image = doc.extract_image(xref) # 提取图片 # bbox = fitz.Rect(img[1:5]) # # print("bbox: ", bbox) # # 删除图片 - 使用安全方式,避免内部方法可能导致的核心转储 # try: # # 优先使用官方 API # page.delete_image(xref) # except AttributeError: # # 如果官方 API 不可用,使用 redact 方式遮盖图片区域 # try: # img_rect = page.get_image_rects(xref) # if img_rect: # page.add_redact_annot(img_rect[0]) # page.apply_redactions() # except Exception as e: # print(f"删除图片失败 (page={page_num}, img={img_index}): {e}") # # 准备替换文本 # # replacement_text = f"page{page_num+1}_img{img_index+1}.png" # replacement_text = self.images_path_dict[page_num][img_index] # print(f"替换的文本:{replacement_text}") # # 在删除的图片位置插入文本 # try: # x0 = self.image_positions_dict[page_num][img_index]['x0'] # y0 = page_height - self.image_positions_dict[page_num][img_index]['y0'] # # 插入文本坐标 # print(f"x0: {x0}, y0: {y0}") # # 使用fitz中自带的字体 china-s 效果显示不友好,插入的字体一行铺满 fontname="china-s", # page.insert_text((x0,y0), replacement_text,fontname=font_name, fontfile=font_path, fontsize=font_size, color=(0, 0, 0)) # #page.insert_text((x,y+y1), replacement_text, fontsize=font_size, color=(0, 0, 0)) # except Exception as e: # print(f"Error inserting text for image on page {page_num + 1}: {e}") # # 保存修改后的PDF # doc.save(self.output_pdf_path) # doc.close() # print(f"Processed PDF saved to: {self.output_pdf_path}") # def file2text(self): # pdf_text = "" # with fitz.open(self.output_pdf_path) as doc: # for i, page in enumerate(doc): # text = page.get_text("text").strip() # lines = text.split("\n") # if len(lines) > 0 and lines[-1].strip().isdigit(): # text = "\n".join(lines[:-1]) # 移除最后一行 # if len(lines) > 0 and lines[0].strip().isdigit(): # text = 
"\n".join(lines[1:]) # 移除第一行 # # print(f"page text:{text.strip()}") # # pdf_text += text + "\n" # pdf_text += text # # print(pdf_text) # return pdf_text, self.flag_image_info_dict # class MinerUParsePdf(): # # def __init__(self, knowledge_id, minio_client): # # self.knowledge_id = knowledge_id # # self.minio_client = minio_client # async def extract_text(self, file_path): # # pdf_file_name = file_path # # prepare env # # local_image_dir = f"./tmp_file/{self.knowledge_id}/{doc_id}" # local_image_dir = f"./tmp_file/images" # image_dir = str(os.path.basename(local_image_dir)) # os.makedirs(local_image_dir, exist_ok=True) # image_writer = FileBasedDataWriter(local_image_dir) # # read bytes # reader1 = FileBasedDataReader("") # pdf_bytes = reader1.read(file_path) # read the pdf content # # proc # ## Create Dataset Instance # ds = PymuDocDataset(pdf_bytes) # infer_result = ds.apply(doc_analyze, ocr=True) # ## pipeline # pipe_result = infer_result.pipe_ocr_mode(image_writer) # content_list_content = pipe_result.get_content_list(image_dir) # # image_num = 1 # # text = "" # # flag_img_info = {} # # current_page = "" # # for i,content_dict in enumerate(content_list_content): # # page_index = content_dict.get("page_idx") # # if i == 0: # # current_page = page_index # # elif page_index != current_page: # # text += "" # # current_page = page_index # # else: # # pass # # if content_dict.get("type") == "text": # # content_text = content_dict.get("text") # # text_level = content_dict.get("text_level") # # if text_level: # # text += "#" * text_level + content_text # # else: # # text += content_text # # elif content_dict.get("type") in ("image", "table"): # # image_path = content_dict.get("img_path") # # image_name = image_path.split("/")[1] # # save_image_path = local_image_dir + f"/{image_name}" # # replace_text = f"【示意图序号_{doc_id}_{image_num}】" # # minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg" # # self.minio_client.upload_file(save_image_path, 
minio_file_path) # # minio_url = minio_config.get("minio_url") # # minio_bucket = minio_config.get("minio_bucket") # # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}" # # text += replace_text # # image_num += 1 # # else: # # ... # return content_list_content # import asyncio # import os # from mineru.cli.common import read_fn, prepare_env # from mineru.data.data_reader_writer import FileBasedDataWriter # from mineru.backend.vlm.vlm_analyze import aio_doc_analyze, ModelSingleton # from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make # from mineru.utils.enum_class import MakeMode # class MinerUParsePdf: # """ # MinerU 异步文档分析类 # 封装了模型加载、PDF读取、图片抽取、内容生成等完整流程。 # """ # def __init__(self, gpu_id: int = 0, output_dir: str = "./tmp_file", backend: str = "vllm-async-engine"): # """ # 初始化 MinerU 分析器。 # Args: # gpu_id (int): 指定使用的 GPU ID。 # output_dir (str): 临时输出目录,用于保存中间图片。 # backend (str): 使用的推理后端。 # """ # # GPU 环境变量(必须在模型加载前设置) # os.environ["CUDA_VISIBLE_DEVICES"] = "2,3" # os.environ["MINERU_MODEL_SOURCE"] = "modelscope" # self.output_dir = output_dir # self.backend = backend # self.predictor = None # 懒加载 # self.model_singleton = ModelSingleton() # async def _ensure_model_loaded(self, gpu_memory_utilization: float = 0.5): # """ # 确保模型加载,仅在首次调用时加载一次。 # """ # if self.predictor is None: # print("正在加载 MinerU 模型,请稍候...") # self.predictor = self.model_singleton.get_model( # backend=self.backend, # model_path=None, # server_url=None, # gpu_memory_utilization=gpu_memory_utilization, # ) # print("模型加载完成。") # async def extract_text(self, pdf_path: str): # """ # 分析指定 PDF 文件,返回带图像信息的内容结构。 # Args: # pdf_path (str): 要分析的 PDF 文件路径。 # Returns: # list: 内容段落列表,包含图文信息。 # """ # await self._ensure_model_loaded() # pdf_bytes = read_fn(pdf_path) # local_image_dir, _ = prepare_env(self.output_dir, "document", "vlm") # image_writer = FileBasedDataWriter(local_image_dir) # print(f"正在解析文档: {pdf_path}") # middle_json, infer_result = await aio_doc_analyze( 
# pdf_bytes, # image_writer=image_writer, # predictor=self.predictor, # backend=self.backend, # ) # pdf_info = middle_json.get("pdf_info", {}) # content_list = union_make(pdf_info, MakeMode.CONTENT_LIST, img_buket_path="images") # print("文档解析完成。") # return content_list # # ====== 使用示例 ====== # if __name__ == "__main__": # async def main(): # analyzer = MinerUAsyncAnalyzer(gpu_id=2, output_dir="./temp_output") # result = await analyzer.analyze_pdf("ceshi.pdf") # print(result) # asyncio.run(main()) # import asyncio # import os # import time # from mineru.cli.common import read_fn, prepare_env, _process_output # from mineru.data.data_reader_writer import FileBasedDataWriter # from mineru.backend.vlm.vlm_analyze import aio_doc_analyze, ModelSingleton # from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make # from mineru.utils.enum_class import MakeMode # import mineru # import threading # import logging # logging.basicConfig(level=logging.INFO) # # 全局服务器URL列表,用于负载均衡 # SERVER_URL_LIST = [ # # "http://127.0.0.1:9999", # # "http://127.0.0.1:9998", # # "http://127.0.0.1:9997", # "http://127.0.0.1:9990", # "http://127.0.0.1:9991", # "http://127.0.0.1:9992", # "http://127.0.0.1:9993" # ] # # 全局轮询索引和锁 # _server_index = 0 # _server_index_lock = threading.Lock() # def get_next_server_url(): # """ # 使用轮询机制获取下一个服务器URL # Returns: # str: 下一个可用的服务器URL # """ # global _server_index # with _server_index_lock: # url = SERVER_URL_LIST[_server_index % len(SERVER_URL_LIST)] # _server_index += 1 # logging.info(f"负载均衡: 选择服务器 {url} (索引: {_server_index - 1})") # return url # class MinerUParsePdf: # """ # MinerU 异步文档分析类 # 封装了模型加载、PDF读取、图片抽取、内容生成等完整流程。 # """ # def __init__(self, output_dir: str = "./tmp_file", server_url: str = None): # """ # 初始化 MinerU 分析器。 # Args: # output_dir (str): 临时输出目录,用于保存中间图片。 # server_url (str): vLLM 服务器地址。如果为 None,则使用负载均衡自动选择。 # """ # # 客户端不需要 GPU 环境变量 # os.environ["MINERU_MODEL_SOURCE"] = "modelscope" # self.output_dir = output_dir # self.backend = 
"http-client" # 使用 HTTP 客户端后端 # # 如果未指定 server_url,则使用负载均衡机制自动选择 # self.server_url = server_url if server_url is not None else get_next_server_url() # self.predictor = None # 懒加载 # self.model_singleton = ModelSingleton() # logging.info(f"#### mineru path: {os.path.dirname(mineru.__file__)}") # logging.info(f"#### 使用服务器URL: {self.server_url}") # # print("#### mineru path:", os.path.dirname(mineru.__file__)) # async def _ensure_model_loaded(self): # """ # 确保 HTTP 客户端连接建立,仅在首次调用时初始化一次。 # """ # if self.predictor is None: # print(f"正在连接 vLLM 服务器: {self.server_url}...") # self.predictor = self.model_singleton.get_model( # backend=self.backend, # model_path=None, # server_url=self.server_url, # ) # print("服务器连接成功。") # async def extract_text(self, pdf_path: str): # """ # 分析指定 PDF 文件,返回带图像信息的内容结构。 # Args: # pdf_path (str): 要分析的 PDF 文件路径。 # Returns: # list: 内容段落列表,包含图文信息。 # """ # await self._ensure_model_loaded() # pdf_bytes = read_fn(pdf_path) # pdf_file_name = os.path.splitext(os.path.basename(pdf_path))[0] + time.strftime("_%Y%m%d%H%M%S") # local_image_dir, local_md_dir = prepare_env(self.output_dir, pdf_file_name, "vlm") # # print("9999"*100,local_image_dir,local_md_dir) # image_writer = FileBasedDataWriter(local_image_dir) # md_writer = FileBasedDataWriter(local_md_dir) # print(f"正在解析文档: {pdf_path}") # middle_json, infer_result = await aio_doc_analyze( # pdf_bytes, # image_writer=image_writer, # predictor=self.predictor, # backend=self.backend, # ) # pdf_info = middle_json.get("pdf_info", {}) # content_list = union_make(pdf_info, MakeMode.CONTENT_LIST, img_buket_path=local_image_dir) # _process_output( # pdf_info=pdf_info, # pdf_bytes=pdf_bytes, # pdf_file_name=pdf_file_name, # local_md_dir=local_md_dir, # local_image_dir=local_image_dir, # md_writer=md_writer, # f_draw_layout_bbox=False, # f_draw_span_bbox=False, # f_dump_orig_pdf=False, # f_dump_md=True, # f_dump_content_list=True, # f_dump_middle_json=True, # f_dump_model_output=True, # f_make_md_mode=MakeMode.MM_MD, 
import asyncio
import os
import time
import aiohttp
from mineru.cli.common import read_fn, prepare_env, _process_output
from mineru.data.data_reader_writer import FileBasedDataWriter
from mineru.backend.vlm.vlm_analyze import aio_doc_analyze, ModelSingleton
from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make
from mineru.utils.enum_class import MakeMode
import mineru
import logging

logging.basicConfig(level=logging.INFO)


class VLMRetryExhaustedError(RuntimeError):
    """Raised when every retry against the VLM backend has failed."""
    pass


class MinerUParsePdfClient:
    """HTTP client for the MinerU parse API served by ``mineru_server.py``.

    Delegates the heavy PDF analysis to a remote service instead of loading a
    model locally.
    """

    def __init__(self, output_dir: str = "./tmp_file",
                 server_url: str = "http://127.0.0.1:8120",
                 vlm_server_url: str = "http://127.0.0.1:9999"):
        """Initialize the MinerU API client.

        Args:
            output_dir: Temporary output directory, forwarded to the server.
            server_url: Base URL of the MinerU API server.
            vlm_server_url: URL of the backing vLLM service, forwarded in the
                request payload (previously hard-coded inside ``extract_text``;
                the default preserves the old behavior).
        """
        self.output_dir = output_dir
        self.server_url = server_url
        self.vlm_server_url = vlm_server_url
        self.api_endpoint = f"{server_url}/parse"
        logging.info(f"MinerU API 客户端初始化,服务器: {server_url}")

    async def extract_text(self, pdf_path: str):
        """Parse a PDF through the remote MinerU API, with retries.

        Args:
            pdf_path: Path of the PDF file to parse.

        Returns:
            tuple: ``(content_list, md_path, pdf_file_name)``.

        Raises:
            RuntimeError: when all retry attempts fail; the last underlying
                exception is chained as ``__cause__``.
        """
        logging.info(f"调用 MinerU API 解析: {pdf_path}")

        max_retries = 5
        for attempt in range(max_retries):
            try:
                async with aiohttp.ClientSession() as session:
                    payload = {
                        "pdf_path": pdf_path,
                        "output_dir": self.output_dir,
                        "server_url": self.vlm_server_url  # vLLM 服务地址
                    }
                    async with session.post(self.api_endpoint,
                                            json=payload,
                                            timeout=aiohttp.ClientTimeout(total=3000)) as response:
                        if response.status != 200:
                            error_text = await response.text()
                            raise RuntimeError(f"MinerU API 调用失败: {response.status}, {error_text}")

                        result = await response.json()
                        if result.get("code") != 200:
                            raise RuntimeError(f"MinerU API 返回错误: {result.get('message')}")

                        data = result.get("data", {})
                        content_list = data.get("content_list", [])
                        md_path = data.get("md_path", "")
                        pdf_file_name = data.get("pdf_file_name", "")

                        logging.info(f"MinerU API 解析成功: {pdf_file_name}")
                        return content_list, md_path, pdf_file_name
            except Exception as e:
                logging.error(f"MinerU API 调用失败 (尝试 {attempt + 1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
                else:
                    # FIX: was `raise RuntimeError(f"MinerU API 调用失败")` — a
                    # placeholder-free f-string that also discarded the last
                    # exception. Chain it so callers can see the real cause.
                    raise RuntimeError("MinerU API 调用失败") from e


class MinerUParsePdf:
    """MinerU async document analyzer using the ``http-client`` backend.

    Wraps connection setup, PDF reading, image extraction and content
    generation into a single ``extract_text`` call.
    """

    def __init__(self, output_dir: str = "./tmp_file", server_url: str = "http://127.0.0.1:9999"):
        """Initialize the analyzer.

        Args:
            output_dir: Temporary output directory for intermediate images/MD.
            server_url: vLLM server address.
        """
        # The HTTP client side needs no GPU environment variables, only the
        # model source selection.
        os.environ["MINERU_MODEL_SOURCE"] = "modelscope"
        self.output_dir = output_dir
        self.backend = "http-client"  # use the HTTP client backend
        self.server_url = server_url
        self.predictor = None  # lazily created on first extract_text call
        self.model_singleton = ModelSingleton()
        logging.info(f"#### mineru path: {os.path.dirname(mineru.__file__)}")

    async def _ensure_model_loaded(self):
        """Establish the HTTP client connection once, on first use."""
        if self.predictor is None:
            print(f"正在连接 vLLM 服务器: {self.server_url}...")
            self.predictor = self.model_singleton.get_model(
                backend=self.backend,
                model_path=None,
                server_url=self.server_url,
            )
            print("服务器连接成功。")

    async def extract_text(self, pdf_path: str):
        """Analyze a PDF file and return its structured content.

        Args:
            pdf_path: Path of the PDF file to analyze.

        Returns:
            tuple: ``(content_list, path_md, pdf_file_name)``.

        Raises:
            VLMRetryExhaustedError: when every VLM connection attempt fails.
        """
        await self._ensure_model_loaded()
        pdf_bytes = read_fn(pdf_path)

        # Timestamp suffix keeps repeated/concurrent runs from colliding on
        # the same output directory.
        pdf_file_name = os.path.splitext(os.path.basename(pdf_path))[0] + time.strftime("_%Y%m%d%H%M%S")
        local_image_dir, local_md_dir = prepare_env(self.output_dir, pdf_file_name, "vlm")
        image_writer = FileBasedDataWriter(local_image_dir)
        md_writer = FileBasedDataWriter(local_md_dir)

        print(f"正在解析文档: {pdf_path}")

        # Lazy third-party import: httpx is only needed for the retry filter.
        # (FIX: removed a redundant function-local `import asyncio`; it is
        # already imported at module level.)
        from httpx import ReadError, ConnectError, ConnectTimeout

        max_retries = 5
        for attempt in range(max_retries):
            try:
                middle_json, infer_result = await aio_doc_analyze(
                    pdf_bytes,
                    image_writer=image_writer,
                    predictor=self.predictor,
                    backend=self.backend,
                )
                break
            except (ReadError, ConnectError, ConnectTimeout) as e:
                if attempt == max_retries - 1:
                    # FIX: chain the last network error so diagnostics survive.
                    raise VLMRetryExhaustedError("VLM 连接失败") from e
                logging.info(f"VLM连接失败,重试 {attempt + 1}/{max_retries}")
                await asyncio.sleep(2 ** attempt)  # exponential backoff

        pdf_info = middle_json.get("pdf_info", {})
        content_list = union_make(pdf_info, MakeMode.CONTENT_LIST, img_buket_path=local_image_dir)
        _process_output(
            pdf_info=pdf_info,
            pdf_bytes=pdf_bytes,
            pdf_file_name=pdf_file_name,
            local_md_dir=local_md_dir,
            local_image_dir=local_image_dir,
            md_writer=md_writer,
            f_draw_layout_bbox=False,
            f_draw_span_bbox=False,
            f_dump_orig_pdf=False,
            f_dump_md=True,
            f_dump_content_list=True,
            f_dump_middle_json=True,
            f_dump_model_output=True,
            f_make_md_mode=MakeMode.MM_MD,
            middle_json=middle_json,
            model_output=infer_result,
            is_pipeline=False,
        )
        path_md = f"{local_md_dir}/{pdf_file_name}.md"
        print(f"文档解析完成。MD 文件已保存到: {path_md}")
        return content_list, path_md, pdf_file_name


if __name__ == "__main__":
    file_json = {
        "knowledge_id": "1234",
        "name": "5.1 BMP业务系统使用手册 - 切片.pdf",
        "document_id": "2222"
    }
    loader = PDFLoader(file_json)
    # FIX: the previous code then called loader.replace_images_with_text(),
    # but that method only exists in the commented-out legacy PDFLoader — the
    # live class is an empty stub, so the call always raised AttributeError.
    # The call is removed; use MinerUParsePdf.extract_text for real parsing.