# documents_process.py

import os
import random
import re
import time
import asyncio
from typing import List
from uuid import uuid1

import aiohttp
import aiofiles
import requests
from langchain_text_splitters import RecursiveCharacterTextSplitter

from rag.db import MilvusOperate, MysqlOperate
from rag.task_registry import task_registry, TaskCancelledException
from rag.document_load.pdf_load import MinerUParsePdf
from rag.document_load.office_load import MinerUParseOffice
from rag.document_load.txt_load import TextLoad
from rag.document_load.image_load import MinerUParseImage
from rag.document_load.document_format_conversion import *
from utils.upload_file_to_oss import UploadMinio
from utils.get_logger import setup_logger
from config import minio_config
pdf_parse = MinerUParsePdf()
office_parse = MinerUParseOffice()
text_parse = TextLoad()
image_parse = MinerUParseImage()

logger = setup_logger(__name__)

class ProcessDocuments:
    def __init__(self, file_json):
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id)
    # Earlier version that also routed Office documents and images:
    # def _get_file_type(self, name):
    #     if name.endswith(".txt"):
    #         return text_parse
    #     elif name.endswith(".pdf"):
    #         return pdf_parse
    #     elif name.endswith((".doc", ".docx", "ppt", "pptx")):
    #         return office_parse
    #     elif name.endswith((".jpg", "png", "jpeg")):
    #         return image_parse
    #     else:
    #         raise ValueError("Unsupported file format")
    # Only .txt and .pdf are parsed directly.
    def _get_file_type(self, name):
        if name.endswith(".txt"):
            return text_parse
        elif name.endswith(".pdf"):
            return pdf_parse
        else:
            raise ValueError(f"Unsupported file format: {name}")
    async def save_file_temp(self, session, url, name, max_retries=3):
        down_file_path = f"./tmp_file/{self.knowledge_id}"
        os.makedirs(down_file_path, exist_ok=True)
        down_file_name = f"{down_file_path}/{name}"
        attempt = 0
        while attempt < max_retries:
            try:
                async with session.get(url, ssl=False) as resp:
                    resp.raise_for_status()
                    content_length = resp.headers.get("Content-Length")
                    file_size = int(content_length) if content_length else 0
                    async with aiofiles.open(down_file_name, "wb") as f:
                        async for chunk in resp.content.iter_chunked(1024):
                            await f.write(chunk)
                # Return on success.
                return down_file_name, file_size
            except Exception:
                attempt += 1
                # Re-raise after the final attempt fails.
                if attempt >= max_retries:
                    raise
                # Exponential backoff with random jitter, capped at 3s
                # (roughly 0.5s-3s across attempts).
                wait = min(3, 0.5 * (2 ** attempt) + random.random())
                await asyncio.sleep(wait)
    def send_post_request_sync(self, url, json_data=None, headers=None, timeout=30):
        """
        Send a POST request synchronously.

        Args:
            url: target URL
            json_data: JSON request body (dict)
            headers: optional header dict
            timeout: request timeout in seconds, default 30

        Returns:
            dict with the status code, response data, and a message
        """
        try:
            # Default the Content-Type to application/json.
            if headers is None:
                headers = {}
            if 'Content-Type' not in headers:
                headers['Content-Type'] = 'application/json'
            resp = requests.post(url, json=json_data, headers=headers, timeout=timeout, verify=False)
            status_code = resp.status_code
            response_data = resp.text
            logger.info(f"Sync POST succeeded [url={url}]: {response_data} json_data={json_data}")
            return {
                "code": 200,
                "status_code": status_code,
                "data": response_data,
                "message": "POST request succeeded"
            }
        except requests.exceptions.Timeout as e:
            logger.error(f"Sync POST timed out [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST request timed out: {str(e)}"
            }
        except requests.exceptions.RequestException as e:
            logger.error(f"Sync POST failed [url={url}]: {e}")
            return {
                "code": 500,
                "message": f"POST request failed: {str(e)}"
            }
    def file_split_by_len(self, file_text):
        split_map = {
            "0": ["#"],       # split by heading/section
            "1": ["<page>"],  # split by page
            "2": ["\n"]       # split by Q&A pair
        }
        separator_num = self.file_json.get("set_slice")
        slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
        separator = split_map.get(separator_num) or [slice_value]
        logger.info(f"Text split separators: {separator}")
        text_split = RecursiveCharacterTextSplitter(
            separators=separator,
            chunk_size=500,
            chunk_overlap=40,
            length_function=len
        )
        texts = text_split.split_text(file_text)
        return texts
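    # For illustration (hypothetical values): {"set_slice": "1"} selects the
    # ["<page>"] separator above, while an unmapped value such as
    # {"set_slice": "5", "slice_value": "---"} falls through to the custom
    # separator ["---"].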
    def split_text(self, file_text):
        text_split = RecursiveCharacterTextSplitter(
            separators=["\n\n", "\n"],
            chunk_size=500,
            chunk_overlap=40,
            length_function=len
        )
        texts = text_split.split_text(file_text)
        return texts
    def chunk_text_for_rag(self, text: str, max_chars: int = 20000) -> List[str]:
        """
        Split a long text into chunks.

        Args:
            text: the input text
            max_chars: maximum characters per chunk, default 20000

        Returns:
            List[str]: the resulting chunks

        Splitting rules:
            1. Split at x.x section tags (e.g. 1.1, 2.3).
            2. The next chunk re-includes the previous chunk's last x.x
               section in full (overlap).
            3. If there is no x.x tag, or adding the overlap would exceed
               max_chars, split on the full stop "。" instead.
        """
        if len(text) <= max_chars:
            return [text]
        chunks = []
        start_idx = 0
        while start_idx < len(text):
            prev_start = start_idx
            # Determine where the current window ends.
            end_idx = min(start_idx + max_chars, len(text))
            actual_end = end_idx
            if end_idx >= len(text):
                # Final chunk: append the remainder and stop.
                chunks.append(text[start_idx:])
                break
            # Look for an x.x tag inside the current window.
            current_chunk = text[start_idx:end_idx]
            split_pos, overlap_content = self.find_split_point(current_chunk, text, start_idx)
            if split_pos is not None:
                # Found a usable x.x tag split point.
                actual_end = start_idx + split_pos
                # Check that adding the overlap stays within the limit.
                if overlap_content and len(current_chunk[:split_pos]) + len(overlap_content) <= max_chars:
                    chunks.append(text[start_idx:actual_end])
                    # Start the next chunk at the beginning of the overlap.
                    overlap_start = text.rfind(overlap_content, start_idx, actual_end)
                    if overlap_start != -1:
                        start_idx = overlap_start
                    else:
                        start_idx = actual_end
                else:
                    # Overlap too long: split on the full stop instead.
                    split_pos, overlap_content = self.find_split_by_period(current_chunk, text, start_idx)
                    if split_pos is not None:
                        actual_end = start_idx + split_pos
                        chunks.append(text[start_idx:actual_end])
                        if overlap_content:
                            overlap_start = text.rfind(overlap_content, start_idx, actual_end)
                            start_idx = overlap_start if overlap_start != -1 else actual_end
                        else:
                            start_idx = actual_end
                    else:
                        # No full stop either: force a cut at the window edge.
                        chunks.append(text[start_idx:end_idx])
                        start_idx = end_idx
            else:
                # No x.x tag found: split on the full stop instead.
                split_pos, overlap_content = self.find_split_by_period(current_chunk, text, start_idx)
                if split_pos is not None:
                    actual_end = start_idx + split_pos
                    chunks.append(text[start_idx:actual_end])
                    if overlap_content:
                        overlap_start = text.rfind(overlap_content, start_idx, actual_end)
                        start_idx = overlap_start if overlap_start != -1 else actual_end
                    else:
                        start_idx = actual_end
                else:
                    # No full stop either: force a cut at the window edge.
                    chunks.append(text[start_idx:end_idx])
                    start_idx = end_idx
            # Safety guard: if the overlap rewound the cursor to where the
            # window started, resume after the appended chunk so the loop
            # always makes progress.
            if start_idx <= prev_start:
                start_idx = actual_end if actual_end > prev_start else end_idx
        return chunks
    def find_split_point(self, chunk: str, full_text: str, chunk_start: int):
        """
        Find the last x.x tag in the chunk to use as a split point.

        Returns:
            (split_position, overlap_content)
        """
        # Match x.x style tags (e.g. 1.1, 2.3, 10.5).
        pattern = r'\d+\.\d+'
        # pattern = r'\b\d+\.\d+\b'
        matches = list(re.finditer(pattern, chunk))
        if not matches:
            return None, None
        # Use the last matching tag.
        last_match = matches[-1]
        tag_start = last_match.start()
        if tag_start == 0:
            # The tag sits at the very start of the window, so there is
            # nothing before it to split off; fall back to "。" splitting.
            return None, None
        # The overlap is the tag's full section: from this tag to the next
        # tag, or to the end of the text.
        tag_content_start = chunk_start + tag_start
        # Search the full text for the next x.x tag.
        remaining_text = full_text[tag_content_start:]
        next_tag_match = re.search(pattern, remaining_text[len(last_match.group()):])
        if next_tag_match:
            tag_content_end = tag_content_start + len(last_match.group()) + next_tag_match.start()
        else:
            tag_content_end = len(full_text)
        overlap_content = full_text[tag_content_start:tag_content_end]
        # If the overlap itself exceeds the limit, return None so the caller
        # falls back to "。" splitting.
        if len(overlap_content) > 20000:
            return None, None
        return tag_start, overlap_content
    def find_split_by_period(self, chunk: str, full_text: str, chunk_start: int):
        """
        Find a split point at a full stop "。".

        Returns:
            (split_position, overlap_content)
        """
        # Search backwards for the last full stop.
        last_period = chunk.rfind('。')
        if last_period == -1:
            return None, None
        # Split just after the full stop.
        split_pos = last_period + 1
        # The overlap is the final sentence: look back for the previous full stop.
        prev_period = chunk.rfind('。', 0, last_period)
        if prev_period != -1:
            overlap_start = prev_period + 1
            overlap_content = chunk[overlap_start:split_pos]
        else:
            # No earlier full stop: overlap from the chunk start to this one.
            overlap_content = chunk[:split_pos]
        # Drop the overlap if it is itself too long.
        if len(overlap_content) > 20000:
            overlap_content = ""
        return split_pos, overlap_content
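    # A rough illustration of the chunking rules above (hypothetical text,
    # pretending max_chars were tiny): in "1.1 intro ... 1.2 details ...",
    # the cut lands just before the last x.x tag that fits in the window and
    # the next chunk restarts at that tag, so section 1.2 spans both chunks;
    # in tag-free text such as "句一。句二。句三", the cut falls after the
    # last "。" and the final full sentence carries over as the overlap.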
    def split_by_title(self, file_content_list, set_table, doc_id):
        # Split by heading; images are replaced with placeholder tokens;
        # tables follow set_table ("0" = image, "1" = HTML).
        text_lists = []
        text = ""
        image_num = 1
        flag_img_info = {}
        level_1_text = ""
        level_2_text = ""
        for i, content_dict in enumerate(file_content_list):
            text_type = content_dict.get("type")
            content_text = content_dict.get("text")
            if text_type == "text":
                text_level = content_dict.get("text_level", "")
                if text_level == 1:
                    if not level_1_text:
                        level_1_text = f"# {content_text}\n"
                        text += f"# {content_text}\n"
                    else:
                        if len(text) > 20000:
                            text_chunks = self.chunk_text_for_rag(text)
                            text_lists.extend(text_chunks)
                        else:
                            text_lists.append(text)
                        text = f"# {content_text}\n"
                        level_1_text = f"# {content_text}\n"
                        level_2_text = ""
                elif text_level == 2:
                    if not level_2_text:
                        text += f"## {content_text}\n"
                        level_2_text = f"## {content_text}\n"
                    else:
                        if len(text) > 20000:
                            text_chunks = self.chunk_text_for_rag(text)
                            text_lists.extend(text_chunks)
                        else:
                            text_lists.append(text)
                        text = level_1_text + f"## {content_text}\n"
                else:
                    if text_level:
                        text += text_level * "#" + " " + content_text + "\n"
                    else:
                        text += content_text
            elif text_type == "table" and set_table == "1":
                text += " ".join(content_dict.get("table_caption")) + '\n'
                text += content_dict.get("table_body") + '\n'
            elif text_type in ("image", "table"):
                image_path = content_dict.get("img_path")
                if not image_path:
                    continue
                image_name = image_path.split("/")[1]
                save_image_path = f"./tmp_file/document/vlm/images/{image_name}"
                replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                self.minio_client.upload_file(save_image_path, minio_file_path)
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                if text_type == "table":
                    text += " ".join(content_dict.get("table_caption")) + '\n'
                text += replace_text
                image_num += 1
            elif text_type == "list":
                list_items = content_dict.get("list_items")
                if list_items:
                    text += "\n".join(list_items) + "\n"
        # Flush whatever remains in the buffer.
        if text:
            text_lists.append(text)
        return text_lists, flag_img_info
    def split_by_page(self, file_content_list, set_table, doc_id):
        # Split by page; images are replaced with placeholder tokens;
        # tables follow set_table ("0" = image, "1" = HTML).
        text_lists = []
        current_page = ""
        text = ""
        image_num = 1
        flag_img_info = {}
        for i, content_dict in enumerate(file_content_list):
            page_index = content_dict.get("page_idx")
            if i == 0:
                current_page = page_index
            elif page_index != current_page:
                text_lists.append(text)
                text = ""
                current_page = page_index
            text_type = content_dict.get("type")
            if text_type == "text":
                content_text = content_dict.get("text")
                text_level = content_dict.get("text_level")
                if text_level:
                    text += "#" * text_level + " " + content_text
                else:
                    text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                image_path = content_dict.get("img_path")
                image_name = image_path.split("/")[1]
                save_image_path = f"./tmp_file/document/vlm/images/{image_name}"
                replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                self.minio_client.upload_file(save_image_path, minio_file_path)
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                text += replace_text
                image_num += 1
            if i + 1 == len(file_content_list):
                # Flush the final page.
                text_lists.append(text)
        return text_lists, flag_img_info
    def split_by_self(self, file_content_list, set_table, slice_value, doc_id):
        # Split on a user-defined separator; images are replaced with
        # placeholder tokens; tables follow set_table ("0" = image,
        # "1" = HTML); pieces longer than 500 characters are cut up.
        logger.info(f"Custom separator: {slice_value}")
        text = ""
        image_num = 1
        flag_img_info = {}
        for i, content_dict in enumerate(file_content_list):
            text_type = content_dict.get("type")
            if text_type == "text":
                content_text = content_dict.get("text")
                text_level = content_dict.get("text_level")
                if text_level:
                    text += "#" * text_level + " " + content_text
                else:
                    text += content_text
            elif text_type == "table" and set_table == "1":
                text += content_dict.get("table_body")
            elif text_type in ("image", "table"):
                image_path = content_dict.get("img_path")
                image_name = image_path.split("/")[1]
                save_image_path = f"./tmp_file/document/vlm/images/{image_name}"
                replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                self.minio_client.upload_file(save_image_path, minio_file_path)
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                text += replace_text
                image_num += 1
        split_lists = text.split(slice_value)
        text_lists = []
        for split_text in split_lists:
            r = len(split_text) // 500
            if r >= 1:
                # Cut oversized pieces into 500-character slices.
                for j in range(r + 1):
                    t = split_text[j * 500:(j + 1) * 500]
                    if t:
                        text_lists.append(t)
            else:
                text_lists.append(split_text)
        return text_lists, flag_img_info
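    # For example: after splitting on the separator, a 1,200-character piece
    # gives r = 1200 // 500 == 2 >= 1, so it is emitted as three slices of
    # 500, 500, and 200 characters.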
    def file_split(self, file_content_list, doc_id):
        # Split the parsed content and return the chunks plus the stored
        # image links.
        separator_num = self.file_json.get("set_slice")
        set_table = self.file_json.get("set_table")
        if isinstance(file_content_list, str):
            file_text = file_content_list
            text_lists = self.split_text(file_text)
            return text_lists, {}
        elif separator_num == "0":
            # Split by heading level (text_level 1/2, i.e. "#" vs "##").
            text_lists, flag_img_info = self.split_by_title(file_content_list, set_table, doc_id)
            return text_lists, flag_img_info
        elif separator_num == "1":
            # Split by page.
            text_lists, flag_img_info = self.split_by_page(file_content_list, set_table, doc_id)
            return text_lists, flag_img_info
        elif separator_num == "2":
            # Split by Q&A pair (for Excel documents); not implemented yet.
            return [], {}
        else:
            # Custom split: user-defined separator plus the 500-character
            # length cap.
            slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
            text_lists, flag_img_info = self.split_by_self(file_content_list, set_table, slice_value, doc_id)
            return text_lists, flag_img_info
    def process_data_to_milvus_schema(self, text_lists, doc_id, name):
        """Build the Milvus record format:
        {
            "content": text,
            "doc_id": doc_id,
            "chunk_id": chunk_id,
            "metadata": {"source": file_name},
        }
        """
        docs = []
        total_len = 0
        for i, text in enumerate(text_lists):
            chunk_id = str(uuid1())
            chunk_len = len(text)
            total_len += chunk_len
            d = {
                "content": text,
                "doc_id": doc_id,
                "chunk_id": chunk_id,
                "metadata": {"source": name, "chunk_index": i + 1, "chunk_len": chunk_len}
            }
            docs.append(d)
        return docs, total_len
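    # For example (hypothetical values), a single chunk becomes:
    # {"content": "...", "doc_id": "doc_1", "chunk_id": "<uuid1 string>",
    #  "metadata": {"source": "demo.pdf", "chunk_index": 1, "chunk_len": 123}}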
    async def process_documents(self, file_json):
        user_id = file_json.get("userId", "")
        task_id = file_json.get("docs", "")[0].get("document_id")
        # ===== Register the task in the global task registry =====
        task_ctx = None
        if task_id:
            task_ctx = task_registry.register(task_id, user_id, self.knowledge_id)
        # Start progress reporting (runs in its own thread).
        reporter = None
        if task_id:
            from rag.progress_reporter import ProgressReporter
            from config import progress_callback_config
            estimate_seconds = progress_callback_config.get("estimate_seconds", 120)
            callback_url = progress_callback_config.get("default_url")
            reporter = ProgressReporter(task_id, callback_url, estimate_seconds, user_id)
            reporter.start()
            # Attach the reporter to the task context so cancellation can
            # stop progress reporting.
            if task_ctx:
                task_ctx.reporter = reporter
        # Successfully processed documents; defined outside the try block so
        # the exception handlers can access it.
        success_doc = []
        try:
            # Download the documents.
            separator_num = file_json.get("set_slice")
            if separator_num == "2":
                if reporter:
                    reporter.complete(success=False)
                return {"code": 500, "message": "Parsing not supported yet"}
            docs = file_json.get("docs")
            flag = file_json.get("flag")
            for doc in docs:
                # ===== Checkpoint 1: check the cancel flag before processing each document =====
                if task_ctx and task_ctx.is_cancelled:
                    logger.info(f"Task {task_id} cancelled before document processing")
                    raise TaskCancelledException("Task cancelled by user")
                url = doc.get("url")
                name = doc.get("name")
                doc_id = doc.get("document_id")
                async with aiohttp.ClientSession() as session:
                    down_file_name, file_size = await self.save_file_temp(session, url, name)
                    if not os.path.exists(down_file_name):
                        logger.info(f"{doc_id} file download failed, retrying~")
                        down_file_name, file_size = await self.save_file_temp(session, url, name)
                logger.info(f"Downloaded file path: {down_file_name}")
                # Only .txt and .pdf are parsed directly; everything else is
                # converted to PDF (unsupported formats are rejected inside
                # convert_to_pdf).
                if not name.endswith(".txt") and not name.endswith(".pdf"):
                    down_file_name = convert_to_pdf(down_file_name)
                    name = os.path.basename(down_file_name)
                file_parse = self._get_file_type(name)
                file_content_list = await file_parse.extract_text(down_file_name)
                logger.info(f"MinerU-parsed PDF data: {file_content_list}")
                text_lists, flag_img_info = self.file_split(file_content_list, doc_id)
                milvus_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name)
                logger.info(f"Text data to store in Milvus: {milvus_docs}")
                # ===== Checkpoint 2: check the cancel flag after parsing, before inserting =====
                if task_ctx and task_ctx.is_cancelled:
                    logger.info(f"Task {task_id} cancelled before insert; no database cleanup needed")
                    raise TaskCancelledException("Task cancelled by user")
                if flag == "upload":
                    # Insert the slice records into MySQL.
                    insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id)
                    if insert_slice_flag:
                        # Insert the image URL records into MySQL.
                        insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
                    else:
                        insert_img_flag = False
                        parse_file_status = False
                    if insert_img_flag:
                        # Insert the vector records into Milvus.
                        insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs)
                    else:
                        insert_milvus_flag = False
                        parse_file_status = False
                    if insert_milvus_flag:
                        parse_file_status = True
                        # ===== Checkpoint 3: check the cancel flag after insert; clean up inserted data if cancelled =====
                        if task_ctx and task_ctx.is_cancelled:
                            logger.info(f"Task {task_id} cancelled after insert, cleaning up inserted data: doc_id={doc_id}")
                            self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            self.mysql_client.delete_image_url(doc_id=doc_id)
                            raise TaskCancelledException("Task cancelled by user")
                    else:
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        self.mysql_client.delete_image_url(doc_id=doc_id)
                        parse_file_status = False
                elif flag == "update":  # re-slice an existing document
                    # Delete the existing records first.
                    self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                    self.mysql_client.delete_to_slice(doc_id=doc_id)
                    insert_mysql_start_time = time.time()
                    insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id)
                    insert_mysql_end_time = time.time()
                    logger.info(f"MySQL insert took: {insert_mysql_end_time - insert_mysql_start_time}")
                    if insert_slice_flag:
                        # Insert the vector records into Milvus.
                        insert_milvus_start_time = time.time()
                        insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs)
                        insert_milvus_end_time = time.time()
                        logger.info(f"Milvus insert took: {insert_milvus_end_time - insert_milvus_start_time}")
                    else:
                        insert_milvus_flag = False
                        parse_file_status = False
                    if insert_milvus_flag:
                        parse_file_status = True
                        # ===== Checkpoint 3: check the cancel flag after insert; clean up inserted data if cancelled =====
                        if task_ctx and task_ctx.is_cancelled:
                            logger.info(f"Task {task_id} cancelled after insert, cleaning up inserted data: doc_id={doc_id}")
                            self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            raise TaskCancelledException("Task cancelled by user")
                    else:
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        parse_file_status = False
                if parse_file_status:
                    success_doc.append(doc_id)
                else:
                    if flag == "upload":
                        # Roll back documents that already succeeded in this batch.
                        for del_id in success_doc:
                            self.milvus_client._delete_by_doc_id(doc_id=del_id)
                            self.mysql_client.delete_image_url(doc_id=del_id)
                            self.mysql_client.delete_to_slice(doc_id=del_id)
                    if reporter:
                        reporter.complete(success=False)
                    self.send_post_request_sync(
                        "http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                        {"status": "2", "documentId": file_json.get("docs")[0].get("document_id")}
                    )
                    return {"code": 500, "message": "Parsing failed", "knowledge_id": self.knowledge_id, "doc_info": {}}
            # All documents processed; report the last document's stats.
            self.send_post_request_sync(
                "http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                {
                    "knowledgeId": self.knowledge_id,
                    "documentId": file_json.get("docs")[0].get("document_id"),
                    "length": file_size,
                    "wordNum": total_char_len,
                    "sliceTotal": len(text_lists),
                    "status": "1"
                }
            )
            # Task finished: report 100% progress.
            if reporter:
                reporter.complete(success=True)
            return {"code": 200, "message": "Parsing succeeded", "knowledge_id": self.knowledge_id, "doc_info": {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}}
        except TaskCancelledException as e:
            # ===== Handle task cancellation =====
            logger.info(f"Task cancelled [task_id={task_id}]: {e}")
            self.send_post_request_sync(
                "http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                {"status": "2", "documentId": file_json.get("docs")[0].get("document_id")}
            )
            # if task_id:
            #     del_success, del_info = self.mysql_client.delete_document(task_id)
            #     if del_success:
            #         logger.info(f"Deleted bm_document record: task_id={task_id}")
            #     else:
            #         logger.warning(f"Failed to delete bm_document record: {del_info}")
            # Clean up data for documents that were already processed
            # successfully (slice, image, and Milvus records).
            for del_id in success_doc:
                logger.info(f"Cleaning up stored document data: doc_id={del_id}")
                self.milvus_client._delete_by_doc_id(doc_id=del_id)
                self.mysql_client.delete_image_url(doc_id=del_id)
                self.mysql_client.delete_to_slice(doc_id=del_id)
            if reporter:
                reporter.complete(success=False)
            return {"code": 499, "message": "Task cancelled", "knowledge_id": self.knowledge_id, "doc_info": {}}
        except Exception as e:
            # Catch any other exception and fail the task uniformly.
            logger.error(f"Document processing error [task_id={task_id}]: {e}", exc_info=True)
            self.send_post_request_sync(
                "http://10.10.10.2:8091/deepseek/api/updateDocumentByPython",
                {"status": "2", "documentId": file_json.get("docs")[0].get("document_id")}
            )
            if reporter:
                reporter.complete(success=False)
            return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
        finally:
            # ===== Unregister the task =====
            if task_id:
                task_registry.unregister(task_id)
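

# A minimal usage sketch, not part of the original module: the key names below
# mirror what this class reads (knowledge_id, set_slice, set_table, flag,
# docs[].url/name/document_id); the URL, IDs, and values are placeholders, and
# the backing services (MySQL, Milvus, MinIO) must be reachable for a real run.
if __name__ == "__main__":
    sample_file_json = {
        "knowledge_id": "kb_demo",   # hypothetical knowledge base id
        "userId": "user_1",          # hypothetical user id
        "set_slice": "0",            # "0" = split by heading
        "set_table": "1",            # "1" = keep tables as HTML
        "flag": "upload",            # "upload" or "update"
        "docs": [{
            "url": "http://example.com/files/demo.pdf",
            "name": "demo.pdf",
            "document_id": "doc_demo_1",
        }],
    }
    processor = ProcessDocuments(sample_file_json)
    result = asyncio.run(processor.process_documents(sample_file_json))
    print(result)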