import aiohttp
import aiofiles
from rag.db import MilvusOperate, MysqlOperate
from rag.document_load.pdf_load import MinerUParsePdf
from rag.document_load.office_load import MinerUParseOffice
from rag.document_load.txt_load import TextLoad
from rag.document_load.image_load import MinerUParseImage
from utils.upload_file_to_oss import UploadMinio
from utils.get_logger import setup_logger
from config import minio_config
import os
import time
from uuid import uuid1
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Parser singletons shared by every ProcessDocuments instance.
pdf_parse = MinerUParsePdf()
office_parse = MinerUParseOffice()
text_parse = TextLoad()
image_parse = MinerUParseImage()
logger = setup_logger(__name__)


class ProcessDocuments:
    """Download, parse, split and persist knowledge-base documents.

    Per-document pipeline: download to a temp dir -> parse with the
    extension-matched loader -> split into chunks (by title, by page, or by a
    custom separator) -> write slice rows to MySQL and vectors to Milvus,
    uploading any referenced images to MinIO along the way.
    """

    def __init__(self, file_json):
        # Raw request payload; split settings are read from it later.
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        # One Milvus collection per knowledge base.
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id)

    def _get_file_type(self, name):
        """Return the parser singleton matching *name*'s file extension.

        Raises:
            ValueError: for unsupported extensions.
        """
        if name.endswith(".txt"):
            return text_parse
        elif name.endswith(".pdf"):
            return pdf_parse
        # FIX: original tuples were missing leading dots ("ppt", "png", ...),
        # so any name merely *ending* in those letters matched.
        elif name.endswith((".doc", ".docx", ".ppt", ".pptx")):
            return office_parse
        elif name.endswith((".jpg", ".png", ".jpeg")):
            return image_parse
        else:
            # FIX: `raise "str"` is a TypeError in Python 3 — raise a real
            # exception carrying the original (Chinese) message.
            raise ValueError("不支持的文件格式")

    async def save_file_temp(self, session, url, name):
        """Stream-download *url* into ./tmp_file/<knowledge_id>/<name>.

        Returns:
            (local_path, file_size) — file_size comes from the
            Content-Length header and is 0 when the header is absent.
        """
        down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
        os.makedirs(down_file_path, exist_ok=True)
        down_file_name = down_file_path + f"/{name}"
        # NOTE(review): ssl=False disables certificate verification — only
        # acceptable for trusted internal endpoints; confirm.
        async with session.get(url, ssl=False) as resp:
            resp.raise_for_status()
            content_length = resp.headers.get("Content-Length")
            file_size = int(content_length) if content_length else 0
            async with aiofiles.open(down_file_name, "wb") as f:
                # Stream in 1 KiB chunks to keep memory flat for large files.
                async for chunk in resp.content.iter_chunked(1024):
                    await f.write(chunk)
        return down_file_name, file_size

    def file_split_by_len(self, file_text):
        """Split plain text with the separator selected by `set_slice`.

        Falls back to the user-supplied `slice_value` when `set_slice` is not
        one of the predefined modes.
        """
        split_map = {
            "0": ["#"],   # split by title/paragraph
            "1": [""],    # split by page
            "2": ["\n"],  # split by Q&A pairs
        }
        separator_num = self.file_json.get("set_slice")
        # The UI sends "\n" as the two characters backslash-n; restore it.
        slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
        separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
        logger.info(f"文本切分字符:{separator}")
        text_split = RecursiveCharacterTextSplitter(
            separators=separator,
            chunk_size=500,
            chunk_overlap=40,
            length_function=len,
        )
        texts = text_split.split_text(file_text)
        return texts

    def split_text(self, file_text):
        """Split plain text on blank lines / newlines into ~500-char chunks."""
        text_split = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n"],
            chunk_size=500,
            chunk_overlap=40,
            length_function=len,
        )
        texts = text_split.split_text(file_text)
        return texts

    def _store_flag_image(self, content_dict, doc_id, image_num, flag_img_info):
        """Upload one chunk image to MinIO and register its public URL.

        Returns the placeholder string to embed in the chunk text, or None
        when the content dict carries no image path (mutates *flag_img_info*).
        """
        image_path = content_dict.get("img_path")
        if not image_path:
            return None
        # Path layout is produced by the MinerU parser; second segment is the
        # file name — TODO confirm this holds for all loaders.
        image_name = image_path.split("/")[1]
        save_image_path = "./tmp_file/images/" + f"/{image_name}"
        replace_text = f"【示意图序号_{doc_id}_{image_num}】"
        minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
        self.minio_client.upload_file(save_image_path, minio_file_path)
        minio_url = minio_config.get("minio_url")
        minio_bucket = minio_config.get("minio_bucket")
        flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
        return replace_text

    def split_by_title(self, file_content_list, set_table, doc_id):
        """Split parsed content on level-1/level-2 headings.

        Images become placeholder markers; tables are kept as HTML when
        set_table == "1", otherwise treated as images.

        Returns:
            (chunk_list, {placeholder: minio_url}).
        """
        text_lists = []
        text = ""
        image_num = 1
        flag_img_info = {}
        level_1_text = ""   # most recent "# ..." heading, prepended to H2 chunks
        level_2_text = ""
        for i, content_dict in enumerate(file_content_list):
            text_type = content_dict.get("type")
            content_text = content_dict.get("text")
            if text_type == "text":
                text_level = content_dict.get("text_level", "")
                if text_level == 1:
                    if not level_1_text:
                        level_1_text = f"# {content_text}\n"
                        text += f"# {content_text}\n"
                    else:
                        # New H1 -> close current chunk, start fresh.
                        text_lists.append(text)
                        text = f"# {content_text}\n"
                        level_1_text = f"# {content_text}\n"
                        level_2_text = ""
                elif text_level == 2:
                    if not level_2_text:
                        text += f"## {content_text}\n"
                        level_2_text = f"## {content_text}\n"
                    else:
                        # New H2 -> close chunk; new chunk repeats the H1 context.
                        text_lists.append(text)
                        text = level_1_text + f"## {content_text}\n"
                else:
                    if text_level:
                        text += text_level * "#" + " " + content_text + "\n"
                    else:
                        text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                replace_text = self._store_flag_image(content_dict, doc_id, image_num, flag_img_info)
                # FIX: the original `continue` on a missing img_path also
                # skipped the final-chunk append below, dropping the last chunk.
                if replace_text is not None:
                    text += replace_text
                    image_num += 1
            if i + 1 == len(file_content_list):
                text_lists.append(text)
        return text_lists, flag_img_info

    def split_by_page(self, file_content_list, set_table, doc_id):
        """Split parsed content into one chunk per source page.

        Images become placeholder markers; tables are kept as HTML when
        set_table == "1", otherwise treated as images.

        Returns:
            (chunk_list, {placeholder: minio_url}).
        """
        text_lists = []
        current_page = ""
        text = ""
        image_num = 1
        flag_img_info = {}
        for i, content_dict in enumerate(file_content_list):
            page_index = content_dict.get("page_idx")
            if i == 0:
                current_page = page_index
            elif page_index != current_page:
                # Page boundary -> flush accumulated text as one chunk.
                text_lists.append(text)
                text = ""
                current_page = page_index
            text_type = content_dict.get("type")
            if text_type == "text":
                content_text = content_dict.get("text")
                text_level = content_dict.get("text_level")
                if text_level:
                    text += "#" * text_level + " " + content_text
                else:
                    text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                # FIX: original crashed (AttributeError) when img_path was
                # missing; now skipped, consistent with split_by_title.
                replace_text = self._store_flag_image(content_dict, doc_id, image_num, flag_img_info)
                if replace_text is not None:
                    text += replace_text
                    image_num += 1
            if i + 1 == len(file_content_list):
                text_lists.append(text)
        return text_lists, flag_img_info

    def split_by_self(self, file_content_list, set_table, slice_value, doc_id):
        """Split parsed content on a user-defined separator, capping chunks at 500 chars.

        Images become placeholder markers; tables are kept as HTML when
        set_table == "1", otherwise treated as images.

        Returns:
            (chunk_list, {placeholder: minio_url}).
        """
        logger.info(f"自定义的分隔符:{slice_value}")
        text = ""
        image_num = 1
        flag_img_info = {}
        for content_dict in file_content_list:
            text_type = content_dict.get("type")
            if text_type == "text":
                content_text = content_dict.get("text")
                text_level = content_dict.get("text_level")
                if text_level:
                    text += "#" * text_level + " " + content_text
                else:
                    text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                # FIX: original crashed when img_path was missing; now skipped.
                replace_text = self._store_flag_image(content_dict, doc_id, image_num, flag_img_info)
                if replace_text is not None:
                    text += replace_text
                    image_num += 1
        text_lists = []
        for segment in text.split(slice_value):
            seg_len = len(segment)
            if seg_len > 500:
                # Hard-wrap oversized segments into 500-char pieces.
                for start in range(0, seg_len, 500):
                    piece = segment[start:start + 500]
                    if piece:
                        text_lists.append(piece)
            elif segment:
                # FIX: original appended empty segments (e.g. from a trailing
                # separator), creating empty slice rows downstream.
                text_lists.append(segment)
        return text_lists, flag_img_info

    def file_split(self, file_content_list, doc_id):
        """Dispatch to the split strategy selected by `set_slice`.

        Returns:
            (chunk_list, {placeholder: minio_url}) — the dict is empty for
            plain-string input and for the unimplemented Q&A mode.
        """
        separator_num = self.file_json.get("set_slice")
        set_table = self.file_json.get("set_table")
        if isinstance(file_content_list, str):
            # Plain-text loaders return a string, not a content list.
            return self.split_text(file_content_list), {}
        elif separator_num == "0":
            # Split on title paragraphs (text_level 1/2, i.e. "#"/"##").
            return self.split_by_title(file_content_list, set_table, doc_id)
        elif separator_num == "1":
            # Split per page.
            return self.split_by_page(file_content_list, set_table, doc_id)
        elif separator_num == "2":
            # Q&A-pair split (Excel documents) — not implemented yet.
            return [], {}
        else:
            # Custom separator plus a 500-char hard cap per chunk.
            slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
            return self.split_by_self(file_content_list, set_table, slice_value, doc_id)

    def process_data_to_milvus_schema(self, text_lists, doc_id, name):
        """Wrap chunks in the Milvus record schema.

        Record shape:
            {
                "content": text,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "metadata": {"source": file_name, "chunk_index": i, "chunk_len": n},
            }

        Returns:
            (records, total_char_len).
        """
        docs = []
        total_len = 0
        for i, text in enumerate(text_lists):
            chunk_id = str(uuid1())
            chunk_len = len(text)
            total_len += chunk_len
            docs.append({
                "content": text,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "metadata": {"source": name, "chunk_index": i + 1, "chunk_len": chunk_len},
            })
        return docs, total_len

    def _persist_upload(self, milvus_docs, flag_img_info, doc_id):
        """Persist a newly uploaded document; return True on full success.

        Order: MySQL slice rows -> MySQL image URLs -> Milvus vectors.
        Any failure rolls back the MySQL rows already written.
        """
        insert_slice_flag, _ = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id)
        insert_img_flag = False
        if insert_slice_flag:
            insert_img_flag, _ = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
        insert_milvus_flag = False
        if insert_img_flag:
            insert_milvus_flag, _ = self.milvus_client._insert_data(milvus_docs)
        if insert_milvus_flag:
            return True
        # Roll back partial MySQL writes for this document.
        self.mysql_client.delete_to_slice(doc_id=doc_id)
        self.mysql_client.delete_image_url(doc_id=doc_id)
        return False

    def _persist_update(self, milvus_docs, doc_id):
        """Re-slice an existing document; return True on full success.

        Old rows/vectors are removed first, then re-inserted.
        """
        self.milvus_client._delete_by_doc_id(doc_id=doc_id)
        self.mysql_client.delete_to_slice(doc_id=doc_id)
        start_time = time.time()
        insert_slice_flag, _ = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id)
        # NOTE(review): the two timing labels below are swapped relative to
        # what is measured (kept byte-identical to the original log strings).
        logger.info(f"插入milvus数据库耗时:{time.time() - start_time}")
        insert_milvus_flag = False
        if insert_slice_flag:
            start_time = time.time()
            insert_milvus_flag, _ = self.milvus_client._insert_data(milvus_docs)
            logger.info(f"插入mysql数据库耗时:{time.time() - start_time}")
        if insert_milvus_flag:
            return True
        self.mysql_client.delete_to_slice(doc_id=doc_id)
        return False

    async def process_documents(self, file_json):
        """Download, split and persist every document in *file_json*.

        flag == "upload" inserts new documents (rolling back all documents of
        the batch on the first failure); flag == "update" re-slices existing
        ones. Returns a {"code", "message", "knowledge_id", "doc_info"} dict;
        doc_info reflects the LAST processed document.
        """
        separator_num = file_json.get("set_slice")
        if separator_num == "2":
            # Q&A-pair mode is not implemented (see file_split).
            return {"code": 500, "message": "暂不支持解析"}
        doc_list = file_json.get("docs") or []
        flag = file_json.get("flag")
        success_doc = []  # doc_ids persisted so far, for batch rollback
        # FIX: initialized up front — the original left these unbound (NameError
        # at the final return) when the document list was empty.
        file_size = 0
        total_char_len = 0
        text_lists = []
        for doc in doc_list:
            url = doc.get("url")
            name = doc.get("name")
            doc_id = doc.get("document_id")
            async with aiohttp.ClientSession() as session:
                down_file_name, file_size = await self.save_file_temp(session, url, name)
            file_parse = self._get_file_type(name)
            file_content_list = await file_parse.extract_text(down_file_name)
            logger.info(f"mineru解析的pdf数据:{file_content_list}")
            text_lists, flag_img_info = self.file_split(file_content_list, doc_id)
            # FIX: the original rebound `docs` here, shadowing the input list.
            milvus_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name)
            logger.info(f"存储到milvus的文本数据:{milvus_docs}")
            # FIX: default False — the original left parse_file_status unbound
            # for any flag other than "upload"/"update".
            parse_file_status = False
            if flag == "upload":
                parse_file_status = self._persist_upload(milvus_docs, flag_img_info, doc_id)
            elif flag == "update":
                parse_file_status = self._persist_update(milvus_docs, doc_id)
            if parse_file_status:
                success_doc.append(doc_id)
            else:
                if flag == "upload":
                    # Abort the batch: undo every document persisted so far.
                    for del_id in success_doc:
                        self.milvus_client._delete_by_doc_id(doc_id=del_id)
                        self.mysql_client.delete_image_url(doc_id=del_id)
                        self.mysql_client.delete_to_slice(doc_id=del_id)
                return {"code": 500, "message": "解析失败",
                        "knowledge_id": self.knowledge_id, "doc_info": {}}
        return {"code": 200, "message": "解析成功",
                "knowledge_id": self.knowledge_id,
                "doc_info": {"file_size": file_size,
                             "total_char_len": total_char_len,
                             "slice_num": len(text_lists)}}