pdf_load.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. import fitz # PyMuPDF
  2. import os
  3. from PIL import Image
  4. import io
  5. import pdfplumber
  6. from langchain_community.document_loaders.unstructured import UnstructuredFileLoader
  7. from utils.upload_file_to_oss import UploadMinio
  8. from config import minio_config
  9. import os
  10. from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
  11. from magic_pdf.data.dataset import PymuDocDataset
  12. from magic_pdf.model.doc_analyze_by_custom_model import doc_analyze
  13. from magic_pdf.config.enums import SupportedPdfParseMethod
  14. class PDFLoader(UnstructuredFileLoader):
  15. def __init__(self, file_json):
  16. self.base_path = "./tmp_file"
  17. self.file_json = file_json
  18. self.flag = self.file_json.get("flag") # 后续优化
  19. self.file_path_process()
  20. if self.flag == "update":
  21. self.flag_image_info_dict = {}
  22. if not self.output_pdf_path:
  23. self.upload_minio = UploadMinio()
  24. self.image_positions_dict = self.get_image_positions()
  25. self.images_path_dict, self.flag_image_info_dict = self.save_images()
  26. self.replace_images_with_text()
  27. else:
  28. self.upload_minio = UploadMinio()
  29. self.image_positions_dict = self.get_image_positions()
  30. self.images_path_dict, self.flag_image_info_dict = self.save_images()
  31. self.replace_images_with_text()
  32. def file_path_process(self):
  33. self.knowledge_id = self.file_json.get("knowledge_id")
  34. self.document_id = self.file_json.get("document_id")
  35. know_path = self.base_path + f"/{self.knowledge_id}"
  36. self.file_name = self.file_json.get("name")
  37. self.output_pdf_name = "output_" + self.file_name
  38. self.input_pdf_path = os.path.join(know_path, self.file_name)
  39. self.output_pdf_path = os.path.join(know_path, self.output_pdf_name)
  40. self.file_name_list = self.file_name.split(".")
  41. self.image_dir = ".".join(self.file_name_list[:-1])
  42. self.save_image_path = know_path + "/" + self.document_id
  43. def get_image_positions(self):
  44. images_dict = {}
  45. with pdfplumber.open(self.input_pdf_path) as pdf:
  46. page_num = 0
  47. for page in pdf.pages:
  48. images_dict[page_num] = {}
  49. image_num = 0
  50. img_list = {}
  51. img_list[image_num] = {}
  52. for image in page.images:
  53. #print("Image position:", image)
  54. img_list[image_num] = {"x0":image['x0'],"y0":image['y0']}
  55. image_num += 1
  56. img_list[image_num] = {}
  57. images_dict[page_num]=img_list
  58. page_num += 1
  59. # print(f"images list info: {images_dict}")
  60. return images_dict
  61. def save_images(self):
  62. # 创建图片保存目录
  63. os.makedirs(self.save_image_path, exist_ok=True)
  64. # 使用PyMuPDF打开PDF文件
  65. doc = fitz.open(self.input_pdf_path)
  66. all_images_dict = {}
  67. pdf_img_index = 1
  68. flag_img_info = {}
  69. for page_num in range(len(doc)):
  70. page = doc.load_page(page_num)
  71. images = page.get_images(full=True)
  72. page_image_dict = {}
  73. for img_index, img in enumerate(images):
  74. xref = img[0] # 图片的XRef编号
  75. base_image = doc.extract_image(xref)
  76. image_bytes = base_image["image"]
  77. # 将字节数据转换为PIL图像
  78. pil_image = Image.open(io.BytesIO(image_bytes))
  79. # 生成唯一文件名
  80. # img_name = f"page{page_num+1}_img{img_index+1}.{base_image['ext']}"
  81. img_name = f"{self.document_id}_{pdf_img_index}.{base_image['ext']}"
  82. img_path = os.path.join(self.save_image_path, img_name)
  83. # page_image_dict[img_index] = img_path
  84. # 保存成image_name
  85. image_str = self.knowledge_id + "/" + self.document_id + "/" + img_name
  86. replace_text = f"【示意图序号_{self.document_id}_{pdf_img_index}】"
  87. page_image_dict[img_index] = replace_text # 替换pdf中的文字
  88. # 保存图片
  89. pil_image.save(img_path)
  90. # 保存的图片上传的oss
  91. self.upload_minio.upload_file(img_path, f"/pdf/{image_str}")
  92. minio_url = minio_config.get("minio_url")
  93. minio_bucket = minio_config.get("minio_bucket")
  94. flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}//pdf/{image_str}"
  95. pdf_img_index += 1
  96. all_images_dict[page_num] = page_image_dict
  97. # 关闭原始文档
  98. doc.close()
  99. return all_images_dict, flag_img_info
  100. def replace_images_with_text(self):
  101. # 打开原始PDF
  102. doc = fitz.open(self.input_pdf_path)
  103. # 设置字体大小
  104. font_size = 12
  105. font_name = "SimSun"
  106. font_path = r"./utils/simsun.ttc" # 当前系统中的字体路径
  107. # 遍历每一页
  108. for page_num in range(len(doc)):
  109. page = doc.load_page(page_num) # 获取页面
  110. images = page.get_images(full=True) # 获取页面中的所有图片
  111. page_height = page.rect.height
  112. # print("page_height: ", page_height)
  113. for img_index, img in enumerate(images):
  114. xref = img[0] # 图片的XRef编号
  115. base_image = doc.extract_image(xref) # 提取图片
  116. bbox = fitz.Rect(img[1:5])
  117. # print("bbox: ", bbox)
  118. # 删除图片
  119. # page.delete_xref(xref) # 删除图片
  120. doc._deleteObject(img[0])
  121. # 准备替换文本
  122. # replacement_text = f"page{page_num+1}_img{img_index+1}.png"
  123. replacement_text = self.images_path_dict[page_num][img_index]
  124. print(f"替换的文本:{replacement_text}")
  125. # 在删除的图片位置插入文本
  126. try:
  127. x0 = self.image_positions_dict[page_num][img_index]['x0']
  128. y0 = page_height - self.image_positions_dict[page_num][img_index]['y0']
  129. # 插入文本坐标
  130. print(f"x0: {x0}, y0: {y0}")
  131. # 使用fitz中自带的字体 china-s 效果显示不友好,插入的字体一行铺满 fontname="china-s",
  132. page.insert_text((x0,y0), replacement_text,fontname=font_name, fontfile=font_path, fontsize=font_size, color=(0, 0, 0))
  133. #page.insert_text((x,y+y1), replacement_text, fontsize=font_size, color=(0, 0, 0))
  134. except Exception as e:
  135. print(f"Error inserting text for image on page {page_num + 1}: {e}")
  136. # 保存修改后的PDF
  137. doc.save(self.output_pdf_path)
  138. doc.close()
  139. print(f"Processed PDF saved to: {self.output_pdf_path}")
  140. def file2text(self):
  141. pdf_text = ""
  142. with fitz.open(self.output_pdf_path) as doc:
  143. for i, page in enumerate(doc):
  144. text = page.get_text("text").strip()
  145. lines = text.split("\n")
  146. if len(lines) > 0 and lines[-1].strip().isdigit():
  147. text = "\n".join(lines[:-1]) # 移除最后一行
  148. if len(lines) > 0 and lines[0].strip().isdigit():
  149. text = "\n".join(lines[1:]) # 移除第一行
  150. # print(f"page text:{text.strip()}")
  151. # pdf_text += text + "\n"
  152. pdf_text += text
  153. # print(pdf_text)
  154. return pdf_text, self.flag_image_info_dict
  155. class MinerUParsePdf():
  156. # def __init__(self, knowledge_id, minio_client):
  157. # self.knowledge_id = knowledge_id
  158. # self.minio_client = minio_client
  159. async def extract_text(self, file_path):
  160. # pdf_file_name = file_path
  161. # prepare env
  162. # local_image_dir = f"./tmp_file/{self.knowledge_id}/{doc_id}"
  163. local_image_dir = f"./tmp_file/images"
  164. image_dir = str(os.path.basename(local_image_dir))
  165. os.makedirs(local_image_dir, exist_ok=True)
  166. image_writer = FileBasedDataWriter(local_image_dir)
  167. # read bytes
  168. reader1 = FileBasedDataReader("")
  169. pdf_bytes = reader1.read(file_path) # read the pdf content
  170. # proc
  171. ## Create Dataset Instance
  172. ds = PymuDocDataset(pdf_bytes)
  173. infer_result = ds.apply(doc_analyze, ocr=True)
  174. ## pipeline
  175. pipe_result = infer_result.pipe_ocr_mode(image_writer)
  176. content_list_content = pipe_result.get_content_list(image_dir)
  177. # image_num = 1
  178. # text = ""
  179. # flag_img_info = {}
  180. # current_page = ""
  181. # for i,content_dict in enumerate(content_list_content):
  182. # page_index = content_dict.get("page_idx")
  183. # if i == 0:
  184. # current_page = page_index
  185. # elif page_index != current_page:
  186. # text += "<page>"
  187. # current_page = page_index
  188. # else:
  189. # pass
  190. # if content_dict.get("type") == "text":
  191. # content_text = content_dict.get("text")
  192. # text_level = content_dict.get("text_level")
  193. # if text_level:
  194. # text += "#" * text_level + content_text
  195. # else:
  196. # text += content_text
  197. # elif content_dict.get("type") in ("image", "table"):
  198. # image_path = content_dict.get("img_path")
  199. # image_name = image_path.split("/")[1]
  200. # save_image_path = local_image_dir + f"/{image_name}"
  201. # replace_text = f"【示意图序号_{doc_id}_{image_num}】"
  202. # minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
  203. # self.minio_client.upload_file(save_image_path, minio_file_path)
  204. # minio_url = minio_config.get("minio_url")
  205. # minio_bucket = minio_config.get("minio_bucket")
  206. # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
  207. # text += replace_text
  208. # image_num += 1
  209. # else:
  210. # ...
  211. return content_list_content
  212. if __name__ == "__main__":
  213. # input_pdf = r"G:/work/资料/5.1 BMP业务系统使用手册 - 切片.pdf"
  214. # output_pdf = "./output.pdf"
  215. # image_folder = "./extracted_images"
  216. file_json = {
  217. "knowledge_id": "1234",
  218. "name": "5.1 BMP业务系统使用手册 - 切片.pdf",
  219. "document_id": "2222"
  220. }
  221. loader = PDFLoader(file_json)
  222. loader.replace_images_with_text()