import io
import os

import fitz  # PyMuPDF
import pdfplumber
from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
from magic_pdf.config.enums import SupportedPdfParseMethod
from magic_pdf.data.data_reader_writer import FileBasedDataReader, FileBasedDataWriter
from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
from PIL import Image

from config import minio_config
from utils.upload_file_to_oss import UploadMinio


class PDFLoader(UnstructuredFileLoader):
    """Swap every image of a PDF for a text placeholder and extract the text.

    Constructing the loader runs the whole pipeline: the images are located,
    saved locally, uploaded through ``UploadMinio`` and replaced in a copy of
    the PDF (``output_<name>``) by placeholders of the form
    ``【示意图序号_<document_id>_<n>】``.  ``file2text`` then reads that copy.

    NOTE(review): the base-class initializer is never called here; confirm
    that no inherited ``UnstructuredFileLoader`` behaviour is relied upon.
    """

    def __init__(self, file_json):
        """Derive all paths from *file_json* and process the PDF if needed.

        Args:
            file_json: Mapping read with ``.get`` for the keys
                ``"knowledge_id"``, ``"document_id"``, ``"name"`` and the
                optional ``"flag"``.

        When ``flag == "update"`` the placeholder/URL mapping starts empty and
        the PDF is only re-processed if the output PDF does not exist yet.
        For any other flag the PDF is always processed.
        """
        self.base_path = "./tmp_file"
        self.file_json = file_json
        self.flag = self.file_json.get("flag")  # TODO: refine flag handling
        self.file_path_process()
        if self.flag == "update":
            self.flag_image_info_dict = {}
            # The original test was `not self.output_pdf_path`, which can
            # never be true for a joined path; checking the file on disk is
            # the presumed intent (regenerate only when the output is missing).
            if not os.path.exists(self.output_pdf_path):
                self._process_pdf()
        else:
            self._process_pdf()

    def _process_pdf(self):
        """Locate, save and upload the images, then write the placeholder PDF."""
        self.upload_minio = UploadMinio()
        self.image_positions_dict = self.get_image_positions()
        self.images_path_dict, self.flag_image_info_dict = self.save_images()
        self.replace_images_with_text()

    def file_path_process(self):
        """Populate the id, file-name and path attributes from ``file_json``.

        Everything lives under ``<base_path>/<knowledge_id>``: the input PDF,
        the output PDF (``output_`` + name) and the image directory named
        after the document id.
        """
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.document_id = self.file_json.get("document_id")
        know_path = self.base_path + f"/{self.knowledge_id}"
        self.file_name = self.file_json.get("name")
        self.output_pdf_name = "output_" + self.file_name
        self.input_pdf_path = os.path.join(know_path, self.file_name)
        self.output_pdf_path = os.path.join(know_path, self.output_pdf_name)
        self.file_name_list = self.file_name.split(".")
        # File name without its last extension.
        self.image_dir = ".".join(self.file_name_list[:-1])
        self.save_image_path = know_path + "/" + self.document_id

    def get_image_positions(self):
        """Return the ``x0``/``y0`` of every image reported by pdfplumber.

        Returns:
            ``{page_index: {image_index: {"x0": ..., "y0": ...}}}`` with
            0-based indices.  Each page mapping ends with one extra empty
            entry at index ``len(images)``; it is kept so the returned
            structure stays identical to what callers received before.

        NOTE(review): image indices here follow ``pdfplumber``'s
        ``page.images`` order, while the placeholders are indexed by
        PyMuPDF's ``page.get_images`` order; confirm the two always agree.
        """
        images_dict = {}
        with pdfplumber.open(self.input_pdf_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                positions = {
                    image_num: {"x0": image["x0"], "y0": image["y0"]}
                    for image_num, image in enumerate(page.images)
                }
                positions[len(positions)] = {}
                images_dict[page_num] = positions
        return images_dict

    def save_images(self):
        """Save every embedded image locally and upload it to object storage.

        Images are numbered continuously across the whole document, starting
        at 1, and stored as ``<document_id>_<n>.<ext>``.

        Returns:
            A pair ``(all_images_dict, flag_img_info)`` where
            ``all_images_dict`` maps ``page_index -> {image_index:
            placeholder}`` and ``flag_img_info`` maps each placeholder to the
            URL built from ``minio_config``.
        """
        os.makedirs(self.save_image_path, exist_ok=True)
        minio_url = minio_config.get("minio_url")
        minio_bucket = minio_config.get("minio_bucket")

        all_images_dict = {}
        flag_img_info = {}
        pdf_img_index = 1
        # The context manager closes the document even if an image fails.
        with fitz.open(self.input_pdf_path) as doc:
            for page_num, page in enumerate(doc):
                page_image_dict = {}
                for img_index, img in enumerate(page.get_images(full=True)):
                    xref = img[0]  # xref number of the image object
                    base_image = doc.extract_image(xref)
                    img_name = (
                        f"{self.document_id}_{pdf_img_index}.{base_image['ext']}"
                    )
                    img_path = os.path.join(self.save_image_path, img_name)
                    image_str = (
                        self.knowledge_id + "/" + self.document_id + "/" + img_name
                    )
                    # Placeholder that replaces the image inside the PDF text.
                    replace_text = f"【示意图序号_{self.document_id}_{pdf_img_index}】"
                    page_image_dict[img_index] = replace_text

                    # Decode through PIL and write the image to disk.
                    with Image.open(io.BytesIO(base_image["image"])) as pil_image:
                        pil_image.save(img_path)
                    self.upload_minio.upload_file(img_path, f"/pdf/{image_str}")
                    flag_img_info[replace_text] = (
                        f"{minio_url}/{minio_bucket}//pdf/{image_str}"
                    )
                    pdf_img_index += 1
                all_images_dict[page_num] = page_image_dict
        return all_images_dict, flag_img_info

    def replace_images_with_text(self):
        """Delete each image and print its placeholder text in its place.

        Reads the input PDF, relies on ``images_path_dict`` and
        ``image_positions_dict`` being set, and writes the result to
        ``output_pdf_path``.  A failure to insert one placeholder is reported
        on stdout and does not abort the remaining images.
        """
        font_size = 12
        font_name = "SimSun"
        font_path = r"./utils/simsun.ttc"  # font file shipped with the project

        with fitz.open(self.input_pdf_path) as doc:
            for page_num, page in enumerate(doc):
                page_height = page.rect.height
                for img_index, img in enumerate(page.get_images(full=True)):
                    # img[0] is the xref of the image object.
                    # NOTE(review): `_deleteObject` is a private PyMuPDF call;
                    # confirm it exists in the installed version.
                    doc._deleteObject(img[0])

                    replacement_text = self.images_path_dict[page_num][img_index]
                    print(f"替换的文本:{replacement_text}")
                    try:
                        position = self.image_positions_dict[page_num][img_index]
                        x0 = position["x0"]
                        # Presumably pdfplumber's y0 is measured from the page
                        # bottom, hence the flip - TODO confirm.
                        y0 = page_height - position["y0"]
                        print(f"x0: {x0}, y0: {y0}")
                        # The built-in "china-s" font rendered poorly (text
                        # filled the whole line), so a font file is supplied.
                        page.insert_text(
                            (x0, y0),
                            replacement_text,
                            fontname=font_name,
                            fontfile=font_path,
                            fontsize=font_size,
                            color=(0, 0, 0),
                        )
                    except Exception as e:
                        # Deliberate best effort: report and keep going.
                        print(
                            f"Error inserting text for image on page {page_num + 1}: {e}"
                        )
            doc.save(self.output_pdf_path)
        print(f"Processed PDF saved to: {self.output_pdf_path}")

    def file2text(self):
        """Return the text of the processed PDF and the placeholder/URL map.

        For every page, a first and/or last line consisting only of digits is
        dropped (treated as a page number).  Page texts are concatenated
        without a separator.

        Returns:
            ``(pdf_text, flag_image_info_dict)``.
        """
        page_texts = []
        with fitz.open(self.output_pdf_path) as doc:
            for page in doc:
                lines = page.get_text("text").strip().split("\n")
                # Trim on the list itself so that removing the first line does
                # not bring a previously removed last line back.
                if lines and lines[-1].strip().isdigit():
                    lines = lines[:-1]
                if lines and lines[0].strip().isdigit():
                    lines = lines[1:]
                page_texts.append("\n".join(lines))
        return "".join(page_texts), self.flag_image_info_dict


class MinerUParsePdf:
    """Parse a PDF with the MinerU (``magic_pdf``) OCR pipeline."""

    async def extract_text(self, file_path):
        """Run OCR analysis on *file_path* and return MinerU's content list.

        Images produced by the pipeline are written to ``./tmp_file/images``.

        Args:
            file_path: Path of the PDF, read through ``FileBasedDataReader``.

        Returns:
            Whatever ``pipe_result.get_content_list`` returns for the image
            directory name ``"images"``.

        NOTE(review): post-processing used to be sketched here but is
        disabled: it flattened the content list into one string (headings
        prefixed with ``#`` per ``text_level``), uploaded ``image``/``table``
        entries to MinIO under ``/pdf/<knowledge_id>/<doc_id>/`` and replaced
        them with ``【示意图序号_<doc_id>_<n>】`` placeholders.  It needed a
        ``knowledge_id`` and a MinIO client on the instance.

        NOTE(review): the method is ``async`` but only makes blocking calls.
        """
        local_image_dir = "./tmp_file/images"
        image_dir = str(os.path.basename(local_image_dir))
        os.makedirs(local_image_dir, exist_ok=True)
        image_writer = FileBasedDataWriter(local_image_dir)

        # Read the raw PDF bytes.
        pdf_bytes = FileBasedDataReader("").read(file_path)

        # Build the dataset, analyse it with OCR and run the OCR pipeline.
        ds = PymuDocDataset(pdf_bytes)
        infer_result = ds.apply(doc_analyze, ocr=True)
        pipe_result = infer_result.pipe_ocr_mode(image_writer)
        return pipe_result.get_content_list(image_dir)


if __name__ == "__main__":
    file_json = {
        "knowledge_id": "1234",
        "name": "5.1 BMP业务系统使用手册 - 切片.pdf",
        "document_id": "2222",
    }
    # No flag is given, so the constructor already runs the whole pipeline,
    # including replace_images_with_text(); calling it again would only redo
    # the same work.
    loader = PDFLoader(file_json)