| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399 |
- import aiohttp
- import aiofiles
- from rag.db import MilvusOperate, MysqlOperate
- from rag.document_load.pdf_load import MinerUParsePdf
- from rag.document_load.office_load import MinerUParseOffice
- from rag.document_load.txt_load import TextLoad
- from rag.document_load.image_load import MinerUParseImage
- from utils.upload_file_to_oss import UploadMinio
- from utils.get_logger import setup_logger
- from config import minio_config
- import os
- import time
- from uuid import uuid1
- from langchain_text_splitters import RecursiveCharacterTextSplitter
# Module-level parser instances, created once at import time and handed out
# by ProcessDocuments._get_file_type according to the file name suffix.
pdf_parse = MinerUParsePdf()
office_parse = MinerUParseOffice()
text_parse = TextLoad()
image_parse = MinerUParseImage()

# Logger used by everything in this module.
logger = setup_logger(__name__)
class ProcessDocuments():
    """Download, parse, chunk and index the documents of one knowledge base.

    The instance is configured by ``file_json``; the keys read in this class
    are ``knowledge_id``, ``set_slice``, ``set_table`` and ``slice_value``.
    ``set_slice`` selects the chunking mode: ``"0"`` by heading, ``"1"`` by
    page, ``"2"`` question/answer pairs (not implemented), anything else a
    custom separator taken from ``slice_value``.
    """

    # Maximum chunk length (characters) and overlap used by the splitters.
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 40
    # Directory image files are read from before being uploaded to MinIO.
    # NOTE(review): presumably populated by the MinerU parsers - confirm.
    IMAGE_TEMP_DIR = "./tmp_file/images"

    def __init__(self, file_json):
        """Store the request payload and create the storage clients."""
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id)

    def _get_file_type(self, name):
        """Return the module-level parser matching the suffix of ``name``.

        Matching is case-insensitive and requires the dot, so ``"fooppt"`` is
        no longer mistaken for a PowerPoint file.

        Raises:
            ValueError: if the suffix is not supported. (The original raised a
                bare string, which Python 3 turns into a ``TypeError``.)
        """
        lowered = name.lower()
        if lowered.endswith(".txt"):
            return text_parse
        if lowered.endswith(".pdf"):
            return pdf_parse
        if lowered.endswith((".doc", ".docx", ".ppt", ".pptx")):
            return office_parse
        if lowered.endswith((".jpg", ".png", ".jpeg")):
            return image_parse
        raise ValueError("不支持的文件格式")

    async def save_file_temp(self, session, url, name):
        """Stream ``url`` into ``./tmp_file/<knowledge_id>/<name>``.

        Args:
            session: an open ``aiohttp.ClientSession``.
            url: download address of the document.
            name: file name supplied by the caller; only its base name is
                used so that it cannot escape the temp directory.

        Returns:
            ``(local_path, file_size)`` where ``file_size`` is the value of
            the ``Content-Length`` header, or 0 when the header is absent.
        """
        down_file_path = os.path.join("./tmp_file", str(self.knowledge_id))
        os.makedirs(down_file_path, exist_ok=True)
        # ``name`` comes from the request payload: drop any directory part.
        down_file_name = os.path.join(down_file_path, os.path.basename(name))
        # NOTE(review): ssl=False disables certificate verification; confirm
        # this is intended for the storage endpoint being downloaded from.
        async with session.get(url, ssl=False) as resp:
            resp.raise_for_status()
            content_length = resp.headers.get('Content-Length')
            file_size = int(content_length) if content_length else 0
            async with aiofiles.open(down_file_name, 'wb') as f:
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    await f.write(chunk)

        return down_file_name, file_size

    def file_split_by_len(self, file_text):
        """Split plain text with the separator selected by ``set_slice``.

        Falls back to the custom ``slice_value`` (with a literal ``\\n``
        turned into a newline) when ``set_slice`` is not a predefined mode.
        """
        split_map = {
            "0": ["#"],  # split on headings
            "1": ["<page>"],  # split on pages
            "2": ["\n"]  # split on question/answer pairs
        }
        separator_num = self.file_json.get("set_slice")
        slice_value = (self.file_json.get("slice_value") or "").replace("\\n", "\n")
        separator = split_map.get(separator_num) or [slice_value]
        logger.info(f"文本切分字符:{separator}")
        text_split = RecursiveCharacterTextSplitter(
            separators=separator,
            chunk_size=self.CHUNK_SIZE,
            chunk_overlap=self.CHUNK_OVERLAP,
            length_function=len
        )
        return text_split.split_text(file_text)

    def split_text(self, file_text):
        """Split plain text on blank lines, then single newlines."""
        text_split = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n"],
            chunk_size=self.CHUNK_SIZE,
            chunk_overlap=self.CHUNK_OVERLAP,
            length_function=len
        )
        return text_split.split_text(file_text)

    @staticmethod
    def _render_text(content_dict):
        """Render a text item, prefixing headings with ``#`` markers."""
        content_text = content_dict.get("text") or ""
        text_level = content_dict.get("text_level")
        if text_level:
            return "#" * text_level + " " + content_text
        return content_text

    def _render_media(self, content_dict, set_table, doc_id, image_num, flag_img_info):
        """Render a table/image item and return ``(fragment, next_image_num)``.

        A table becomes its ``table_body`` when ``set_table == "1"``.
        Otherwise tables and images are uploaded to MinIO and replaced by a
        placeholder whose URL is recorded in ``flag_img_info``. Items of any
        other type, or without ``img_path``, contribute an empty fragment.
        """
        text_type = content_dict.get("type")
        if text_type == "table" and set_table == "1":
            return content_dict.get("table_body") or "", image_num
        if text_type not in ("image", "table"):
            return "", image_num
        image_path = content_dict.get("img_path")
        if not image_path:
            return "", image_num
        save_image_path = os.path.join(self.IMAGE_TEMP_DIR, os.path.basename(image_path))
        replace_text = f"【示意图序号_{doc_id}_{image_num}】"
        minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
        self.minio_client.upload_file(save_image_path, minio_file_path)
        minio_url = minio_config.get("minio_url")
        minio_bucket = minio_config.get("minio_bucket")
        # NOTE(review): minio_file_path starts with "/", so the stored URL
        # contains "//" after the bucket; kept as-is, confirm it is intended.
        flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
        return replace_text, image_num + 1

    @staticmethod
    def _chunk_by_length(segment, chunk_size):
        """Cut ``segment`` into pieces of at most ``chunk_size`` characters.

        An empty segment is returned as a single empty piece.
        """
        pieces = [segment[start:start + chunk_size]
                  for start in range(0, len(segment), chunk_size)]
        return pieces or [segment]

    def split_by_title(self, file_content_list, set_table, doc_id):
        """Chunk parsed content on level-1 and level-2 headings.

        A new chunk starts at every level-1 heading after the first, and at
        every level-2 heading after the first one inside a level-1 section;
        the latter chunks are prefixed with the current level-1 heading.

        Returns:
            ``(text_lists, flag_img_info)``: the chunks and a mapping from
            image placeholder to MinIO URL.
        """
        text_lists = []
        parts = []
        image_num = 1
        flag_img_info = {}
        level_1_text = ""
        level_2_text = ""
        for content_dict in file_content_list:
            if content_dict.get("type") != "text":
                fragment, image_num = self._render_media(
                    content_dict, set_table, doc_id, image_num, flag_img_info)
                parts.append(fragment)
                continue
            content_text = content_dict.get("text") or ""
            text_level = content_dict.get("text_level", "")
            if text_level == 1:
                heading = f"# {content_text}\n"
                if level_1_text:
                    # A later level-1 heading closes the running chunk.
                    text_lists.append("".join(parts))
                    parts = []
                    level_2_text = ""
                parts.append(heading)
                level_1_text = heading
            elif text_level == 2:
                heading = f"## {content_text}\n"
                if level_2_text:
                    text_lists.append("".join(parts))
                    parts = [level_1_text]
                else:
                    level_2_text = heading
                parts.append(heading)
            elif text_level:
                parts.append(text_level * "#" + " " + content_text + "\n")
            else:
                parts.append(content_text)
        # Flush after the loop so a trailing item that is skipped (e.g. an
        # image without a path) cannot cause the last chunk to be dropped.
        if file_content_list:
            text_lists.append("".join(parts))
        return text_lists, flag_img_info

    def split_by_page(self, file_content_list, set_table, doc_id):
        """Chunk parsed content so that each ``page_idx`` run is one chunk.

        Returns:
            ``(text_lists, flag_img_info)``: the chunks and a mapping from
            image placeholder to MinIO URL.
        """
        text_lists = []
        current_page = ""
        parts = []
        image_num = 1
        flag_img_info = {}
        for index, content_dict in enumerate(file_content_list):
            page_index = content_dict.get("page_idx")
            if index == 0:
                current_page = page_index
            elif page_index != current_page:
                text_lists.append("".join(parts))
                parts = []
                current_page = page_index
            if content_dict.get("type") == "text":
                parts.append(self._render_text(content_dict))
            else:
                fragment, image_num = self._render_media(
                    content_dict, set_table, doc_id, image_num, flag_img_info)
                parts.append(fragment)
        if file_content_list:
            text_lists.append("".join(parts))
        return text_lists, flag_img_info

    def split_by_self(self, file_content_list, set_table, slice_value, doc_id):
        """Chunk parsed content on a custom separator, capping chunk length.

        The whole document is rendered to one string, split on
        ``slice_value`` and every segment is cut into pieces of at most
        ``CHUNK_SIZE`` characters. An empty ``slice_value`` keeps the text as
        one segment instead of raising ``ValueError`` from ``str.split``.

        Returns:
            ``(text_lists, flag_img_info)``: the chunks and a mapping from
            image placeholder to MinIO URL.
        """
        logger.info(f"自定义的分隔符:{slice_value}")
        parts = []
        image_num = 1
        flag_img_info = {}
        for content_dict in file_content_list:
            if content_dict.get("type") == "text":
                parts.append(self._render_text(content_dict))
            else:
                fragment, image_num = self._render_media(
                    content_dict, set_table, doc_id, image_num, flag_img_info)
                parts.append(fragment)
        text = "".join(parts)
        segments = text.split(slice_value) if slice_value else [text]
        text_lists = []
        for segment in segments:
            text_lists.extend(self._chunk_by_length(segment, self.CHUNK_SIZE))

        return text_lists, flag_img_info

    def file_split(self, file_content_list, doc_id):
        """Dispatch to the chunking strategy selected by ``set_slice``.

        A plain string (rather than a list of content items) is always split
        with ``split_text`` and has no images.

        Returns:
            ``(text_lists, flag_img_info)``.
        """
        separator_num = self.file_json.get("set_slice")
        set_table = self.file_json.get("set_table")
        if isinstance(file_content_list, str):
            return self.split_text(file_content_list), {}
        if separator_num == "0":
            # Heading based: text_level 1 / 2 map to "#" / "##".
            return self.split_by_title(file_content_list, set_table, doc_id)
        if separator_num == "1":
            # One chunk per page.
            return self.split_by_page(file_content_list, set_table, doc_id)
        if separator_num == "2":
            # Question/answer pairs: not implemented yet.
            return [], {}
        # Custom separator with a hard length cap per chunk.
        slice_value = (self.file_json.get("slice_value") or "").replace("\\n", "\n")
        return self.split_by_self(file_content_list, set_table, slice_value, doc_id)

    def process_data_to_milvus_schema(self, text_lists, doc_id, name):
        """Wrap every chunk in the record layout inserted into Milvus.

        Each record looks like::

            {
                "content": text,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "metadata": {"source": name, "chunk_index": n, "chunk_len": len(text)},
            }

        Returns:
            ``(records, total_len)`` where ``total_len`` is the summed length
            of all chunks; ``chunk_index`` is 1-based.
        """
        docs = []
        total_len = 0
        for position, text in enumerate(text_lists, start=1):
            chunk_len = len(text)
            total_len += chunk_len
            docs.append({
                "content": text,
                "doc_id": doc_id,
                "chunk_id": str(uuid1()),
                "metadata": {"source": name, "chunk_index": position, "chunk_len": chunk_len}
            })
        return docs, total_len

    def _store_uploaded_document(self, chunk_docs, flag_img_info, doc_id):
        """Persist a new document; return True only if every insert worked.

        On any failure the rows already written for ``doc_id`` are removed
        (including the slice rows, which the original cleanup left behind).
        """
        stored, _ = self.milvus_client._insert_data(chunk_docs)
        if stored:
            stored, _ = self.mysql_client.insert_to_slice(chunk_docs, self.knowledge_id, doc_id)
        if stored:
            stored, _ = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
        if stored:
            return True
        self.milvus_client._delete_by_doc_id(doc_id=doc_id)
        self.mysql_client.delete_image_url(doc_id=doc_id)
        self.mysql_client.delete_to_slice(doc_id=doc_id)
        return False

    def _store_updated_document(self, chunk_docs, doc_id):
        """Replace the stored chunks of ``doc_id``; return True on success."""
        # Remove the previous chunks before inserting the new ones.
        self.milvus_client._delete_by_doc_id(doc_id=doc_id)
        self.mysql_client.delete_to_slice(doc_id=doc_id)
        insert_milvus_start_time = time.time()
        stored, _ = self.milvus_client._insert_data(chunk_docs)
        insert_milvus_end_time = time.time()
        logger.info(f"插入milvus数据库耗时:{insert_milvus_end_time - insert_milvus_start_time}")
        if stored:
            insert_mysql_start_time = time.time()
            stored, _ = self.mysql_client.insert_to_slice(chunk_docs, self.knowledge_id, doc_id)
            insert_mysql_end_time = time.time()
            logger.info(f"插入mysql数据库耗时:{insert_mysql_end_time - insert_mysql_start_time}")
        if stored:
            return True
        self.milvus_client._delete_by_doc_id(doc_id=doc_id)
        return False

    def _rollback_documents(self, doc_ids):
        """Delete everything stored for the given, previously successful ids."""
        for del_id in doc_ids:
            self.milvus_client._delete_by_doc_id(doc_id=del_id)
            self.mysql_client.delete_image_url(doc_id=del_id)
            self.mysql_client.delete_to_slice(doc_id=del_id)

    async def process_documents(self, file_json):
        """Download, parse, chunk and store every document in ``file_json``.

        ``file_json["flag"]`` is ``"upload"`` (new documents) or ``"update"``
        (re-chunk existing ones). Processing stops at the first document that
        cannot be stored; for uploads the documents stored earlier in the
        same call are rolled back.

        Returns:
            A dict with ``code`` 200 or 500. On success ``doc_info`` holds the
            ``file_size``, ``total_char_len`` and ``slice_num`` of the last
            processed document. An empty ``docs`` list or an unknown ``flag``
            yields the failure response instead of an unbound-variable error.
        """
        separator_num = file_json.get("set_slice")
        if separator_num == "2":
            return {"code": 500, "message": "暂不支持解析"}
        docs = file_json.get("docs")
        flag = file_json.get("flag")
        failure = {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}}
        if not docs or flag not in ("upload", "update"):
            logger.error(f"请求参数无效: flag={flag}, docs={docs}")
            return failure

        success_doc = []  # ids of the documents stored so far
        doc_info = {}
        # One HTTP session is shared by all downloads of this call.
        async with aiohttp.ClientSession() as session:
            for doc in docs:
                url = doc.get("url")
                name = doc.get("name")
                doc_id = doc.get("document_id")
                # Resolve the parser first so unsupported files are rejected
                # before anything is downloaded.
                file_parse = self._get_file_type(name)
                down_file_name, file_size = await self.save_file_temp(session, url, name)

                file_content_list = await file_parse.extract_text(down_file_name)
                logger.info(f"mineru解析的pdf数据:{file_content_list}")
                text_lists, flag_img_info = self.file_split(file_content_list, doc_id)

                chunk_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name)
                logger.info(f"存储到milvus的文本数据:{chunk_docs}")
                if flag == "upload":
                    stored = self._store_uploaded_document(chunk_docs, flag_img_info, doc_id)
                else:
                    stored = self._store_updated_document(chunk_docs, doc_id)

                if not stored:
                    if flag == "upload":
                        self._rollback_documents(success_doc)
                    return failure
                success_doc.append(doc_id)
                doc_info = {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}
        return {"code": 200, "message": "解析成功", "knowledge_id" : self.knowledge_id, "doc_info": doc_info}
|