| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787 |
import asyncio
import os
import random
import re
import time
from typing import List
from uuid import uuid1

import aiofiles
import aiohttp
import requests
from langchain_text_splitters import RecursiveCharacterTextSplitter

from config import minio_config
from rag.db import MilvusOperate, MysqlOperate
from rag.document_load.image_load import MinerUParseImage
from rag.document_load.office_load import MinerUParseOffice
from rag.document_load.pdf_load import MinerUParsePdf
from rag.document_load.txt_load import TextLoad
from rag.task_registry import task_registry, TaskCancelledException
from utils.get_logger import setup_logger
from utils.upload_file_to_oss import UploadMinio
from rag.document_load.document_format_conversion import *
- pdf_parse = MinerUParsePdf()
- office_parse = MinerUParseOffice()
- text_parse = TextLoad()
- image_parse = MinerUParseImage()
- logger = setup_logger(__name__)
- class ProcessDocuments():
- def __init__(self, file_json):
- self.file_json = file_json
- self.knowledge_id = self.file_json.get("knowledge_id")
- self.mysql_client = MysqlOperate()
- self.minio_client = UploadMinio()
- self.milvus_client = MilvusOperate(collection_name=self.knowledge_id)
- # def _get_file_type(self, name):
- # if name.endswith(".txt"):
- # return text_parse
- # elif name.endswith('.pdf'):
- # return pdf_parse
- # elif name.endswith((".doc", ".docx", "ppt", "pptx")):
- # return office_parse
- # elif name.endswith((".jpg", "png", "jpeg")):
- # return image_parse
- # else:
- # raise "不支持的文件格式"
- # 只接收txt和pdf格式
- def _get_file_type(self, name):
- if name.endswith(".txt"):
- return text_parse
- elif name.endswith('.pdf'):
- return pdf_parse
-
- # async def save_file_temp(self, session, url, name):
- # down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
- # # down_file_path = "./tmp_file"
- # os.makedirs(down_file_path, exist_ok=True)
- # down_file_name = down_file_path + f"/{name}"
- # # if os.path.exists(down_file_name):
- # # pass
- # # else:
- # async with session.get(url, ssl=False) as resp:
- # resp.raise_for_status()
- # content_length = resp.headers.get('Content-Length')
- # if content_length:
- # file_size = int(content_length)
- # else:
- # file_size = 0
- # async with aiofiles.open(down_file_name, 'wb') as f:
- # async for chunk in resp.content.iter_chunked(1024):
- # await f.write(chunk)
- async def save_file_temp(self, session, url, name, max_retries=3):
- down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
- os.makedirs(down_file_path, exist_ok=True)
- down_file_name = down_file_path + f"/{name}"
- attempt = 0
- while attempt < max_retries:
- try:
- async with session.get(url, ssl=False) as resp:
- resp.raise_for_status()
- content_length = resp.headers.get('Content-Length')
- file_size = int(content_length) if content_length else 0
- async with aiofiles.open(down_file_name, 'wb') as f:
- async for chunk in resp.content.iter_chunked(1024):
- await f.write(chunk)
- # 成功就 return
- return down_file_name, file_size
- except Exception as e:
- attempt += 1
- # 最后一次失败直接抛出
- if attempt >= max_retries:
- raise
- # 指数退避:0.5s ~ 3s 内,随机涨
- wait = min(3, 0.5 * (2 ** attempt) + random.random())
- await asyncio.sleep(wait)
- return down_file_name, file_size
- def send_post_request_sync(self, url, json_data=None, headers=None, timeout=30):
- """
- 同步发送POST请求
-
- 参数:
- url: 请求的URL地址
- json_data: JSON格式的请求体数据(字典类型)
- headers: 可选的请求头字典
- timeout: 请求超时时间(秒),默认30秒
-
- 返回:
- dict: 包含状态码、响应数据等信息
- """
- try:
- # 设置默认的Content-Type为application/json
- if headers is None:
- headers = {}
- if 'Content-Type' not in headers:
- headers['Content-Type'] = 'application/json'
-
- resp = requests.post(url, json=json_data, headers=headers, timeout=timeout, verify=False)
- status_code = resp.status_code
- response_data = resp.text
- logger.info(f"同步POST请求成功 [url={url}]: {response_data} json_data={json_data}")
- return {
- "code": 200,
- "status_code": status_code,
- "data": response_data,
- "message": "POST请求成功"
- }
- except requests.exceptions.Timeout as e:
- logger.error(f"同步POST请求超时 [url={url}]: {e}")
- return {
- "code": 500,
- "message": f"POST请求超时: {str(e)}"
- }
- except requests.exceptions.RequestException as e:
- logger.error(f"同步POST请求失败 [url={url}]: {e}")
- return {
- "code": 500,
- "message": f"POST请求失败: {str(e)}"
- }
- def file_split_by_len(self, file_text):
- split_map = {
- "0": ["#"], # 按标题段落切片
- "1": ["<page>"], # 按页切片
- "2": ["\n"] # 按问答对
- }
- separator_num = self.file_json.get("set_slice")
- slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
- separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
- logger.info(f"文本切分字符:{separator}")
- text_split = RecursiveCharacterTextSplitter(
- separators=separator,
- chunk_size=500,
- chunk_overlap=40,
- length_function=len
- )
- texts = text_split.split_text(file_text)
- return texts
-
- def split_text(self, file_text):
- text_split = RecursiveCharacterTextSplitter(
- separators=["\n\n", "\n"],
- chunk_size=500,
- chunk_overlap=40,
- length_function=len
- )
- texts = text_split.split_text(file_text)
- return texts
- def chunk_text_for_rag(self, text: str, max_chars: int = 20000) -> List[str]:
- """
- 分割长文本
-
- 参数:
- text: 输入的长文本字符串
- max_chars: 每块的最大字符数,默认20000
-
- 返回:
- List[str]: 分割后的文本块列表
-
- 分割规则:
- 1. 在标签x.x(如1.1, 2.3等)处分割
- 2. 下一块包含上一块最后一个x.x标签的完整内容(重叠)
- 3. 如果没有x.x标签或加上重叠内容超过max_chars,则按句号"。"分割
- """
-
- if len(text) <= max_chars:
- return [text]
-
- chunks = []
- start_idx = 0
-
- while start_idx < len(text):
- # 确定当前块的结束位置
- end_idx = min(start_idx + max_chars, len(text))
-
- if end_idx >= len(text):
- # 最后一块,直接添加
- chunks.append(text[start_idx:])
- break
-
- # 在当前块范围内查找x.x标签
- current_chunk = text[start_idx:end_idx]
- split_pos, overlap_content = self.find_split_point(current_chunk, text, start_idx)
-
- if split_pos is not None:
- # 找到了合适的x.x标签分割点
- actual_end = start_idx + split_pos
-
- # 检查加上重叠内容是否超过限制
- if overlap_content and len(current_chunk[:split_pos]) + len(overlap_content) <= max_chars:
- chunks.append(text[start_idx:actual_end])
- # 下一块从重叠内容的起始位置开始
- overlap_start = text.rfind(overlap_content, start_idx, actual_end)
- if overlap_start != -1:
- start_idx = overlap_start
- else:
- start_idx = actual_end
- else:
- # 重叠内容过长,使用句号分割
- split_pos, overlap_content = self.find_split_by_period(current_chunk, text, start_idx)
- if split_pos is not None:
- actual_end = start_idx + split_pos
- chunks.append(text[start_idx:actual_end])
- if overlap_content:
- overlap_start = text.rfind(overlap_content, start_idx, actual_end)
- start_idx = overlap_start if overlap_start != -1 else actual_end
- else:
- start_idx = actual_end
- else:
- # 连句号都找不到,强制分割
- chunks.append(text[start_idx:end_idx])
- start_idx = end_idx
- else:
- # 没有找到x.x标签,使用句号分割
- split_pos, overlap_content = self.find_split_by_period(current_chunk, text, start_idx)
- if split_pos is not None:
- actual_end = start_idx + split_pos
- chunks.append(text[start_idx:actual_end])
- if overlap_content:
- overlap_start = text.rfind(overlap_content, start_idx, actual_end)
- start_idx = overlap_start if overlap_start != -1 else actual_end
- else:
- start_idx = actual_end
- else:
- # 连句号都找不到,强制分割
- chunks.append(text[start_idx:end_idx])
- start_idx = end_idx
-
- return chunks
- def find_split_point(self, chunk: str, full_text: str, chunk_start: int):
- """
- 在块中查找最近的x.x标签作为分割点
-
- 返回:
- (split_position, overlap_content): 分割位置和重叠内容
- """
- # 匹配x.x格式的标签(如1.1, 2.3, 10.5等)
- pattern = r'\d+\.\d+'
- # pattern = r'\b\d+\.\d+\b'
- matches = list(re.finditer(pattern, chunk))
-
- if not matches:
- return None, None
-
- # 找最后一个匹配的标签
- last_match = matches[-1]
- tag_start = last_match.start()
-
- # 提取该标签的完整内容(从标签到下一个标签或块尾)
- tag_content_start = chunk_start + tag_start
-
- # 在全文中查找下一个x.x标签
- remaining_text = full_text[tag_content_start:]
- next_tag_match = re.search(pattern, remaining_text[len(last_match.group()):])
-
- if next_tag_match:
- tag_content_end = tag_content_start + len(last_match.group()) + next_tag_match.start()
- else:
- tag_content_end = len(full_text)
-
- overlap_content = full_text[tag_content_start:tag_content_end]
-
- # 检查重叠内容长度
- if len(overlap_content) > 20000:
- # 重叠内容本身超过限制,返回None使用句号分割
- return None, None
-
- return tag_start, overlap_content
- def find_split_by_period(self, chunk: str, full_text: str, chunk_start: int):
- """
- 按句号"。"查找分割点
-
- 返回:
- (split_position, overlap_content): 分割位置和重叠内容
- """
- # 从后向前查找句号
- last_period = chunk.rfind('。')
-
- if last_period == -1:
- return None, None
-
- # 分割点是句号之后
- split_pos = last_period + 1
-
- # 查找这个句子的开始位置作为重叠内容
- # 向前查找上一个句号
- prev_period = chunk.rfind('。', 0, last_period)
-
- if prev_period != -1:
- overlap_start = prev_period + 1
- overlap_content = chunk[overlap_start:split_pos]
- else:
- # 如果前面没有句号,则从块开始到当前句号
- overlap_content = chunk[:split_pos]
-
- # 检查重叠内容长度
- if len(overlap_content) > 20000:
- overlap_content = ""
-
- return split_pos, overlap_content
- def split_by_title(self, file_content_list, set_table, doc_id):
- # TODO 处理根据标题切分逻辑 图片替换标识符,表格按照set table 0图片,1html数据
- text_lists = []
- text = ""
- image_num = 1
- flag_img_info = {}
- level_1_text = ""
- level_2_text = ""
- for i, content_dict in enumerate(file_content_list):
- text_type = content_dict.get("type")
- content_text = content_dict.get("text")
- if text_type == "text":
- text_level = content_dict.get("text_level", "")
- if text_level == 1:
- if not level_1_text:
- level_1_text = f"# {content_text}\n"
- text += f"# {content_text}\n"
- else:
- if len(text) >20000:
- text_chunks = self.chunk_text_for_rag(text)
- text_lists.extend(text_chunks)
- else:
- text_lists.append(text)
- text = f"# {content_text}\n"
- level_1_text = f"# {content_text}\n"
- level_2_text = ""
- elif text_level == 2:
- if not level_2_text:
- text += f"## {content_text}\n"
- level_2_text = f"## {content_text}\n"
- else:
- if len(text) >20000:
- text_chunks = self.chunk_text_for_rag(text)
- text_lists.extend(text_chunks)
- else:
- text_lists.append(text)
- text = level_1_text + f"## {content_text}\n"
- else:
- if text_level:
- text += text_level*"#" + " " + content_text + "\n"
- else:
- text += content_text
- elif text_type == "table" and set_table == "1":
- text += " ".join(content_dict.get("table_caption")) + '\n'
- text += content_dict.get("table_body") + '\n'
- elif text_type in ("image", "table"):
- image_path = content_dict.get("img_path")
- if not image_path:
- continue
- image_name = image_path.split("/")[1]
- # save_image_path = "./tmp_file/images/" + f"/{image_name}"
- save_image_path = "./tmp_file/document/vlm/images" + f"/{image_name}"
- replace_text = f"【示意图序号_{doc_id}_{image_num}】"
- minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
- self.minio_client.upload_file(save_image_path, minio_file_path)
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
- if text_type == "table":
- text += " ".join(content_dict.get("table_caption")) + '\n'
- text += replace_text
- image_num += 1
- if i+1 == len(file_content_list):
- text_lists.append(text)
-
- elif text_type == "list":
- list_items = content_dict.get("list_items")
- if list_items:
- text += "\n".join(list_items) + "\n"
- return text_lists, flag_img_info
-
- def split_by_page(self, file_content_list, set_table, doc_id):
- # TODO 处理按照页面切分,图片处理成标识符,表格按照set table 0图片,1html数据
- text_lists = []
- current_page = ""
- text = ""
- image_num = 1
- flag_img_info = {}
- for i,content_dict in enumerate(file_content_list):
- page_index = content_dict.get("page_idx")
- if i == 0:
- current_page = page_index
- elif page_index != current_page:
- text_lists.append(text)
- text = ""
- current_page = page_index
- text_type = content_dict.get("type")
- if text_type == "text":
- content_text = content_dict.get("text")
- text_level = content_dict.get("text_level")
- if text_level:
- text += "#" * text_level + " " + content_text
- else:
- text += content_text
- elif text_type == "table" and set_table == "1":
- text += content_dict.get("table_body")
- elif text_type in ("image", "table"):
- image_path = content_dict.get("img_path")
- image_name = image_path.split("/")[1]
- # save_image_path = "./tmp_file/images/" + f"/{image_name}"
- save_image_path = "./tmp_file/document/vlm/images" + f"/{image_name}"
- replace_text = f"【示意图序号_{doc_id}_{image_num}】"
- minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
- self.minio_client.upload_file(save_image_path, minio_file_path)
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
- text += replace_text
- image_num += 1
- if i+1 == len(file_content_list):
- text_lists.append(text)
- return text_lists, flag_img_info
- def split_by_self(self, file_content_list, set_table, slice_value, doc_id):
- # TODO 按照自定义的符号切分,图片处理成标识符,表格按照set table 0图片,1html数据,长度控制500以内,超过500切断
- logger.info(f"自定义的分隔符:{slice_value}")
- text = ""
- image_num = 1
- flag_img_info = {}
- for i, content_dict in enumerate(file_content_list):
- text_type = content_dict.get("type")
- if text_type == "text":
- content_text = content_dict.get("text")
- text_level = content_dict.get("text_level")
- if text_level:
- text += "#" * text_level + " " + content_text
- else:
- text += content_text
- elif text_type == "table" and set_table == "1":
- text += content_dict.get("table_body")
- elif text_type in ("image", "table"):
- image_path = content_dict.get("img_path")
- image_name = image_path.split("/")[1]
- # save_image_path = "./tmp_file/images/" + f"/{image_name}"
- save_image_path = "./tmp_file/document/vlm/images" + f"/{image_name}"
- replace_text = f"【示意图序号_{doc_id}_{image_num}】"
- minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
- self.minio_client.upload_file(save_image_path, minio_file_path)
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
- text += replace_text
- image_num += 1
- split_lists = text.split(slice_value)
- text_lists = []
- for split_text in split_lists:
- r = len(split_text)//500
- if r >= 1:
- for i in range(r+1):
- t = split_text[i*500:(i+1)*500]
- if t:
- text_lists.append(t)
- else:
- text_lists.append(split_text)
-
- return text_lists, flag_img_info
- def file_split(self, file_content_list, doc_id):
- # TODO 根据文本列表进行切分 返回切分列表和存储图片的链接
- separator_num = self.file_json.get("set_slice")
- set_table = self.file_json.get("set_table")
- # separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
- # logger.info(f"文本切分字符:{separator}")
- if isinstance(file_content_list, str):
- file_text = file_content_list
- text_lists = self.split_text(file_text)
- return text_lists, {}
- elif separator_num == "0":
- # 使用标题段落切分,使用text_level=1,2 切分即一个# 还是两个#
- text_lists, flag_img_info = self.split_by_title(file_content_list, set_table, doc_id)
- return text_lists, flag_img_info
- elif separator_num == "1":
- # 按照页面方式切分
- text_lists, flag_img_info = self.split_by_page(file_content_list, set_table, doc_id)
- return text_lists, flag_img_info
- elif separator_num == "2":
- # 按照问答对切分 针对exce文档,暂不实现
- return [], {}
- else:
- # 自定义切分的方式,按照自定义字符以及文本长度切分,超过500
- slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
- text_lists, flag_img_info = self.split_by_self(file_content_list, set_table, slice_value, doc_id)
- return text_lists, flag_img_info
-
- def process_data_to_milvus_schema(self, text_lists, doc_id, name):
- """组织数据格式:
- {
- "content": text,
- "doc_id": doc_id,
- "chunk_id": chunk_id,
- "metadata": {"source": file_name},
- }
- """
- docs = []
- total_len = 0
- for i, text in enumerate(text_lists):
- chunk_id = str(uuid1())
- chunk_len = len(text)
- total_len += chunk_len
- d = {
- "content": text,
- "doc_id": doc_id,
- "chunk_id": chunk_id,
- "metadata": {"source": name, "chunk_index": i+1, "chunk_len": chunk_len}
- }
- docs.append(d)
- return docs, total_len
-
- async def process_documents(self, file_json):
- user_id = file_json.get("userId", "")
-
- task_id = file_json.get("docs", "")[0].get("document_id")
- # ===== 注册任务到全局任务表 =====
- task_ctx = None
- if task_id:
- task_ctx = task_registry.register(task_id, user_id, self.knowledge_id)
-
- # 启动进度(独立线程)
- reporter = None
- if task_id:
- from rag.progress_reporter import ProgressReporter
- from config import progress_callback_config
- estimate_seconds = progress_callback_config.get("estimate_seconds", 120)
- callback_url = progress_callback_config.get("default_url")
- reporter = ProgressReporter(task_id, callback_url, estimate_seconds, user_id)
- reporter.start()
-
- # 关联 reporter 到任务上下文(用于取消时停止进度上报)
- if task_ctx:
- task_ctx.reporter = reporter
-
- # 初始化成功文档列表(在 try 外面定义,确保异常处理时可访问)
- success_doc = []
-
- try:
- # 文档下载
- separator_num = file_json.get("set_slice")
- if separator_num == "2":
- if reporter:
- reporter.complete(success=False)
- return {"code": 500, "message": "暂不支持解析"}
- docs = file_json.get("docs")
- flag = file_json.get("flag")
- for doc in docs:
- # ===== 检查点1:每个文档处理前检查取消标志 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在文档处理前被取消")
- raise TaskCancelledException("任务已被用户取消")
-
- url = doc.get("url")
- name = doc.get("name")
- doc_id = doc.get("document_id")
- async with aiohttp.ClientSession() as session:
- down_file_name, file_size = await self.save_file_temp(session, url, name)
- # 只接收txt和pdf格式,其它格式统一转换成pdf,不支持格式在convert_to_pdf中判断
- if not os.path.exists(down_file_name):
- down_file_name, file_size = await self.save_file_temp(session, url, name)
- logger.info(f'{doc_id}文件下载失败,重试中~')
- logger.info(f'下载文件路径:{down_file_name}')
- if not name.endswith(".txt") and not name.endswith('.pdf'):
- down_file_name = convert_to_pdf(down_file_name)
- name = os.path.basename(down_file_name)
-
- file_parse = self._get_file_type(name)
- file_content_list = await file_parse.extract_text(down_file_name)
- logger.info(f"mineru解析的pdf数据:{file_content_list}")
- text_lists, flag_img_info = self.file_split(file_content_list, doc_id)
-
- docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name)
- logger.info(f"存储到milvus的文本数据:{docs}")
-
- # ===== 检查点2:解析完成、入库前检查取消标志 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库")
- raise TaskCancelledException("任务已被用户取消")
-
- if flag == "upload":
- # 插入到milvus库中
- insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id)
-
- if insert_slice_flag:
- # 插入到mysql的slice info数据库中
- insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
- else:
- insert_img_flag = False
- parse_file_status = False
- if insert_img_flag:
- insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs)
- # 插入mysql中的bm_media_replacement表中
- else:
- # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- insert_milvus_flag = False
- # return resp
- parse_file_status = False
- if insert_milvus_flag:
- parse_file_status = True
-
- # ===== 检查点3:入库后检查取消标志,若取消则清理已插入数据 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- self.mysql_client.delete_image_url(doc_id=doc_id)
- raise TaskCancelledException("任务已被用户取消")
-
- else:
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_image_url(doc_id=doc_id)
- # resp = {"code": 500, "message": insert_mysql_info}
- parse_file_status = False
- # return resp
-
- elif flag == "update": # 更新切片方式
- # 先把库中的数据删除
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- insert_milvus_start_time = time.time()
- insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id)
- # insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs,text_lists)
- insert_milvus_end_time = time.time()
- logger.info(f"插入milvus数据库耗时:{insert_milvus_end_time - insert_milvus_start_time}")
- if insert_slice_flag:
- # 插入到mysql的slice info数据库中
- insert_mysql_start_time = time.time()
- insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs)
- insert_mysql_end_time = time.time()
- logger.info(f"插入mysql数据库耗时:{insert_mysql_end_time - insert_mysql_start_time}")
- else:
- # resp = {"code": 500, "message": insert_milvus_str}
- # return resp
- insert_milvus_flag = False
- parse_file_status = False
-
- if insert_milvus_flag:
- # resp = {"code": 200, "message": "切片修改成功"}
- parse_file_status = True
-
- # ===== 检查点3:入库后检查取消标志,若取消则清理已插入数据 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- raise TaskCancelledException("任务已被用户取消")
-
- else:
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- # resp = {"code":500, "message": insert_mysql_info}
- parse_file_status = False
- # return resp
- if parse_file_status:
- success_doc.append(doc_id)
- else:
- if flag == "upload":
- for del_id in success_doc:
- self.milvus_client._delete_by_doc_id(doc_id=del_id)
- self.mysql_client.delete_image_url(doc_id=del_id)
- self.mysql_client.delete_to_slice(doc_id=del_id)
-
- if reporter:
- reporter.complete(success=False)
- self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}}
-
- self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"})
- # 任务完成:发送100%进度
- if reporter:
- reporter.complete(success=True)
- return {"code": 200, "message": "解析成功", "knowledge_id" : self.knowledge_id, "doc_info": {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}}
-
- except TaskCancelledException as e:
- # ===== 处理任务取消异常 =====
- logger.info(f"任务被取消 [task_id={task_id}]: {e}")
- self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- # if task_id:
- # del_success, del_info = self.mysql_client.delete_document(task_id)
- # if del_success:
- # logger.info(f"已删除 bm_document 记录: task_id={task_id}")
- # else:
- # logger.warning(f"删除 bm_document 记录失败: {del_info}")
- # 清理之前已成功处理的文档数据(slice、image、milvus)
- for del_id in success_doc:
- logger.info(f"清理已入库的文档数据: doc_id={del_id}")
- self.milvus_client._delete_by_doc_id(doc_id=del_id)
- self.mysql_client.delete_image_url(doc_id=del_id)
- self.mysql_client.delete_to_slice(doc_id=del_id)
-
- if reporter:
- reporter.complete(success=False)
-
- return {"code": 499, "message": "任务已取消", "knowledge_id": self.knowledge_id, "doc_info": {}}
-
- except Exception as e:
- # 捕获所有异常,统一处理
- logger.error(f"文件处理异常 [task_id={task_id}]: {e}", exc_info=True)
-
- self.send_post_request_sync("http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- if reporter:
- reporter.complete(success=False)
-
- return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
-
- finally:
- # ===== 注销任务 =====
- if task_id:
- task_registry.unregister(task_id)
|