| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- import fitz # PyMuPDF
- import os
- from PIL import Image
- import io
- import pdfplumber
- from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
- from utils.upload_file_to_oss import UploadMinio
- from config import minio_config
- import os
- from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
- from magic_pdf.data.dataset import PymuDocDataset
- from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
- from magic_pdf.config.enums import SupportedPdfParseMethod
- class PDFLoader(UnstructuredFileLoader):
- def __init__(self, file_json):
- self.base_path = "./tmp_file"
- self.file_json = file_json
- self.flag = self.file_json.get("flag") # 后续优化
- self.file_path_process()
- if self.flag == "update":
- self.flag_image_info_dict = {}
- if not self.output_pdf_path:
- self.upload_minio = UploadMinio()
- self.image_positions_dict = self.get_image_positions()
- self.images_path_dict, self.flag_image_info_dict = self.save_images()
- self.replace_images_with_text()
- else:
- self.upload_minio = UploadMinio()
- self.image_positions_dict = self.get_image_positions()
- self.images_path_dict, self.flag_image_info_dict = self.save_images()
- self.replace_images_with_text()
- def file_path_process(self):
- self.knowledge_id = self.file_json.get("knowledge_id")
- self.document_id = self.file_json.get("document_id")
- know_path = self.base_path + f"/{self.knowledge_id}"
-
- self.file_name = self.file_json.get("name")
- self.output_pdf_name = "output_" + self.file_name
- self.input_pdf_path = os.path.join(know_path, self.file_name)
- self.output_pdf_path = os.path.join(know_path, self.output_pdf_name)
- self.file_name_list = self.file_name.split(".")
- self.image_dir = ".".join(self.file_name_list[:-1])
- self.save_image_path = know_path + "/" + self.document_id
-
- def get_image_positions(self):
- images_dict = {}
- with pdfplumber.open(self.input_pdf_path) as pdf:
- page_num = 0
- for page in pdf.pages:
- images_dict[page_num] = {}
- image_num = 0
- img_list = {}
- img_list[image_num] = {}
- for image in page.images:
- #print("Image position:", image)
- img_list[image_num] = {"x0":image['x0'],"y0":image['y0']}
- image_num += 1
- img_list[image_num] = {}
- images_dict[page_num]=img_list
- page_num += 1
- # print(f"images list info: {images_dict}")
- return images_dict
-
- def save_images(self):
- # 创建图片保存目录
- os.makedirs(self.save_image_path, exist_ok=True)
-
- # 使用PyMuPDF打开PDF文件
- doc = fitz.open(self.input_pdf_path)
- all_images_dict = {}
- pdf_img_index = 1
- flag_img_info = {}
- for page_num in range(len(doc)):
- page = doc.load_page(page_num)
- images = page.get_images(full=True)
- page_image_dict = {}
-
- for img_index, img in enumerate(images):
- xref = img[0] # 图片的XRef编号
- base_image = doc.extract_image(xref)
- image_bytes = base_image["image"]
-
- # 将字节数据转换为PIL图像
- pil_image = Image.open(io.BytesIO(image_bytes))
-
- # 生成唯一文件名
- # img_name = f"page{page_num+1}_img{img_index+1}.{base_image['ext']}"
- img_name = f"{self.document_id}_{pdf_img_index}.{base_image['ext']}"
- img_path = os.path.join(self.save_image_path, img_name)
-
- # page_image_dict[img_index] = img_path
- # 保存成image_name
- image_str = self.knowledge_id + "/" + self.document_id + "/" + img_name
- replace_text = f"【示意图序号_{self.document_id}_{pdf_img_index}】"
- page_image_dict[img_index] = replace_text # 替换pdf中的文字
- # 保存图片
- pil_image.save(img_path)
- # 保存的图片上传的oss
- self.upload_minio.upload_file(img_path, f"/pdf/{image_str}")
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}//pdf/{image_str}"
- pdf_img_index += 1
-
- all_images_dict[page_num] = page_image_dict
-
- # 关闭原始文档
- doc.close()
- return all_images_dict, flag_img_info
-
- def replace_images_with_text(self):
- # 打开原始PDF
- doc = fitz.open(self.input_pdf_path)
-
- # 设置字体大小
- font_size = 12
- font_name = "SimSun"
- font_path = r"./utils/simsun.ttc" # 当前系统中的字体路径
- # 遍历每一页
- for page_num in range(len(doc)):
- page = doc.load_page(page_num) # 获取页面
-
- images = page.get_images(full=True) # 获取页面中的所有图片
- page_height = page.rect.height
- # print("page_height: ", page_height)
-
- for img_index, img in enumerate(images):
- xref = img[0] # 图片的XRef编号
- base_image = doc.extract_image(xref) # 提取图片
-
- bbox = fitz.Rect(img[1:5])
- # print("bbox: ", bbox)
- # 删除图片
- # page.delete_xref(xref) # 删除图片
- doc._deleteObject(img[0])
-
- # 准备替换文本
- # replacement_text = f"page{page_num+1}_img{img_index+1}.png"
- replacement_text = self.images_path_dict[page_num][img_index]
- print(f"替换的文本:{replacement_text}")
-
- # 在删除的图片位置插入文本
- try:
-
- x0 = self.image_positions_dict[page_num][img_index]['x0']
- y0 = page_height - self.image_positions_dict[page_num][img_index]['y0']
- # 插入文本坐标
- print(f"x0: {x0}, y0: {y0}")
- # 使用fitz中自带的字体 china-s 效果显示不友好,插入的字体一行铺满 fontname="china-s",
- page.insert_text((x0,y0), replacement_text,fontname=font_name, fontfile=font_path, fontsize=font_size, color=(0, 0, 0))
- #page.insert_text((x,y+y1), replacement_text, fontsize=font_size, color=(0, 0, 0))
- except Exception as e:
- print(f"Error inserting text for image on page {page_num + 1}: {e}")
-
- # 保存修改后的PDF
- doc.save(self.output_pdf_path)
- doc.close()
- print(f"Processed PDF saved to: {self.output_pdf_path}")
- def file2text(self):
- pdf_text = ""
- with fitz.open(self.output_pdf_path) as doc:
- for i, page in enumerate(doc):
- text = page.get_text("text").strip()
- lines = text.split("\n")
- if len(lines) > 0 and lines[-1].strip().isdigit():
- text = "\n".join(lines[:-1]) # 移除最后一行
- if len(lines) > 0 and lines[0].strip().isdigit():
- text = "\n".join(lines[1:]) # 移除第一行
- # print(f"page text:{text.strip()}")
- # pdf_text += text + "\n"
- pdf_text += text
- # print(pdf_text)
- return pdf_text, self.flag_image_info_dict
- class MinerUParsePdf():
- # def __init__(self, knowledge_id, minio_client):
- # self.knowledge_id = knowledge_id
- # self.minio_client = minio_client
-
- async def extract_text(self, file_path):
- # pdf_file_name = file_path
- # prepare env
- # local_image_dir = f"./tmp_file/{self.knowledge_id}/{doc_id}"
- local_image_dir = f"./tmp_file/images"
- image_dir = str(os.path.basename(local_image_dir))
- os.makedirs(local_image_dir, exist_ok=True)
- image_writer = FileBasedDataWriter(local_image_dir)
- # read bytes
- reader1 = FileBasedDataReader("")
- pdf_bytes = reader1.read(file_path) # read the pdf content
- # proc
- ## Create Dataset Instance
- ds = PymuDocDataset(pdf_bytes)
- infer_result = ds.apply(doc_analyze, ocr=True)
- ## pipeline
- pipe_result = infer_result.pipe_ocr_mode(image_writer)
- content_list_content = pipe_result.get_content_list(image_dir)
- # image_num = 1
- # text = ""
- # flag_img_info = {}
- # current_page = ""
- # for i,content_dict in enumerate(content_list_content):
- # page_index = content_dict.get("page_idx")
- # if i == 0:
- # current_page = page_index
- # elif page_index != current_page:
- # text += "<page>"
- # current_page = page_index
- # else:
- # pass
- # if content_dict.get("type") == "text":
- # content_text = content_dict.get("text")
- # text_level = content_dict.get("text_level")
- # if text_level:
- # text += "#" * text_level + content_text
- # else:
- # text += content_text
- # elif content_dict.get("type") in ("image", "table"):
- # image_path = content_dict.get("img_path")
- # image_name = image_path.split("/")[1]
- # save_image_path = local_image_dir + f"/{image_name}"
- # replace_text = f"【示意图序号_{doc_id}_{image_num}】"
- # minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
- # self.minio_client.upload_file(save_image_path, minio_file_path)
- # minio_url = minio_config.get("minio_url")
- # minio_bucket = minio_config.get("minio_bucket")
- # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
- # text += replace_text
- # image_num += 1
-
- # else:
- # ...
-
- return content_list_content
-
- if __name__ == "__main__":
- # input_pdf = r"G:/work/资料/5.1 BMP业务系统使用手册 - 切片.pdf"
- # output_pdf = "./output.pdf"
- # image_folder = "./extracted_images"
- file_json = {
- "knowledge_id": "1234",
- "name": "5.1 BMP业务系统使用手册 - 切片.pdf",
- "document_id": "2222"
- }
- loader = PDFLoader(file_json)
- loader.replace_images_with_text()
|