documents_process.py 112 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274
  1. import asyncio
  2. import aiohttp
  3. import aiofiles
  4. import requests
  5. from rag.db import MilvusOperate, MysqlOperate
  6. from rag.task_registry import task_registry, TaskCancelledException
  7. from rag.document_load.pdf_load import MinerUParsePdf, MinerUParsePdfClient
  8. # from rag.document_load.dots_pdf_load import DotsPDFLoader
  9. from rag.document_load.paddleocr_load import PaddleOCRLoader
  10. # from rag.document_load.office_load import MinerUParseOffice
  11. from rag.document_load.txt_load import TextLoad
  12. # from rag.document_load.image_load import MinerUParseImage
  13. from rag.document_load.excel_load import parse_excel
  14. from utils.upload_file_to_oss import UploadMinio
  15. from utils.get_logger import setup_logger
  16. from config import minio_config
  17. import os
  18. import time
  19. from uuid import uuid1
  20. from langchain_text_splitters import RecursiveCharacterTextSplitter
  21. from rag.document_load.document_format_conversion import *
  22. import re
  23. from typing import List, Dict
  24. from config import progress_callback_config
  25. from rag.document_load.md_splitter import MarkdownSplitter
  26. import sys
  27. sys.path.append("/opt/lightRAG_dir")
  28. from lightRAG import AsyncLightRAGManager
class VLMRetryExhaustedError(RuntimeError):
    """Raised when repeated VLM (vision-language model) calls all fail.

    (Original note: "VLM 多次重试失败" — VLM retries exhausted.)
    """
    pass
# Use the API-client mode for PDF parsing.
pdf_parse = MinerUParsePdfClient()
# Fallback: direct-call mode
# pdf_parse = MinerUParsePdf()
# office_parse = MinerUParseOffice()
text_parse = TextLoad()
# image_parse = MinerUParseImage()
logger = setup_logger(__name__)
# Max chunk length (characters) per embedding model; consumed as
# ProcessDocuments.max_cut_len (default 5000 for unknown models).
embedding_model_mapping_sequence_len = {
    "bge-m3": 5000,
    "Qwen3-Embedding": 30000,
}
class ProcessDocuments():
    """Pipeline that downloads, parses, chunks and stores knowledge-base documents."""

    def __init__(self, file_json):
        # Raw task payload; keys used across this class include
        # knowledge_id, embedding_id, customSeparator, slice_value.
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.parser_type = "mineru"  # default parser type; updated in process_documents
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        self.embedding_name = file_json.get("embedding_id", "e5")
        # Milvus collection is named after the knowledge base id.
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id, embedding_name=self.embedding_name)
        # Max characters per chunk for the selected embedding model (default 5000).
        self.max_cut_len = embedding_model_mapping_sequence_len.get(self.embedding_name, 5000)

    # def _get_file_type(self, name):
    #     if name.endswith(".txt"):
    #         return text_parse
    #     elif name.endswith('.pdf'):
    #         return pdf_parse
    #     elif name.endswith((".doc", ".docx", "ppt", "pptx")):
    #         return office_parse
    #     elif name.endswith((".jpg", "png", "jpeg")):
    #         return image_parse
    #     else:
    #         raise "不支持的文件格式"
  65. def _get_image_base_path(self, pdf_file_name):
  66. """
  67. 根据解析器类型返回图片基础路径
  68. 返回:
  69. str: 图片基础路径
  70. """
  71. if self.parser_type == "dots":
  72. return "./tmp_file/dots_parsed/" + pdf_file_name + "/images"
  73. elif self.parser_type == "2":
  74. return "./tmp_file/paddleocr_parsed/" + pdf_file_name + "/imgs"
  75. else:
  76. # mineru 默认路径
  77. return "./tmp_file/" + pdf_file_name + "/vlm/images"
  78. # 只接收txt和pdf格式
  79. def _get_file_type(self, name, parser_type="mineru"):
  80. """
  81. 根据文件类型和解析器类型返回对应的解析器
  82. 参数:
  83. name: 文件名
  84. parser_type: 解析器类型,"mineru" 或 "dots"
  85. """
  86. if name.endswith(".txt"):
  87. return text_parse
  88. elif name.endswith('.pdf'):
  89. if parser_type == "dots":
  90. # Dots 解析器需要 file_json,动态创建
  91. # return DotsPDFLoader(self.file_json)
  92. return
  93. elif parser_type == "2":
  94. # PaddleOCR-VL 解析器
  95. return PaddleOCRLoader(self.file_json)
  96. else:
  97. # 默认使用 MinerU 解析器
  98. return pdf_parse
  99. async def send_get_request(self, url, headers=None):
  100. """
  101. 发送GET请求
  102. 参数:
  103. url: 请求的URL地址
  104. headers: 可选的请求头字典
  105. 返回:
  106. dict: 包含状态码、响应数据等信息
  107. """
  108. try:
  109. async with aiohttp.ClientSession() as session:
  110. async with session.get(url, headers=headers, ssl=False) as resp:
  111. status_code = resp.status
  112. response_data = await resp.text()
  113. return {
  114. "code": 200,
  115. "status_code": status_code,
  116. "data": response_data,
  117. "message": "GET请求成功"
  118. }
  119. except Exception as e:
  120. logger.error(f"GET请求失败 [url={url}]: {e}")
  121. return {
  122. "code": 500,
  123. "message": f"GET请求失败: {str(e)}"
  124. }
  125. async def send_post_request(self, url, json_data=None, headers=None):
  126. """
  127. 发送POST请求
  128. 参数:
  129. url: 请求的URL地址
  130. json_data: JSON格式的请求体数据(字典类型)
  131. headers: 可选的请求头字典
  132. 返回:
  133. dict: 包含状态码、响应数据等信息
  134. """
  135. try:
  136. # 设置默认的Content-Type为application/json
  137. if headers is None:
  138. headers = {}
  139. if 'Content-Type' not in headers:
  140. headers['Content-Type'] = 'application/json'
  141. async with aiohttp.ClientSession() as session:
  142. async with session.post(url, json=json_data, headers=headers, ssl=False) as resp:
  143. status_code = resp.status
  144. response_data = await resp.text()
  145. logger.info(f"POST请求成功 [url={url}]: {response_data} json_data={json_data}")
  146. return {
  147. "code": 200,
  148. "status_code": status_code,
  149. "data": response_data,
  150. "message": "POST请求成功"
  151. }
  152. except Exception as e:
  153. logger.error(f"POST请求失败 [url={url}]: {e}")
  154. return {
  155. "code": 500,
  156. "message": f"POST请求失败: {str(e)}"
  157. }
  158. def send_post_request_sync(self, url, json_data=None, headers=None, timeout=30):
  159. """
  160. 同步发送POST请求
  161. 参数:
  162. url: 请求的URL地址
  163. json_data: JSON格式的请求体数据(字典类型)
  164. headers: 可选的请求头字典
  165. timeout: 请求超时时间(秒),默认30秒
  166. 返回:
  167. dict: 包含状态码、响应数据等信息
  168. """
  169. try:
  170. # 设置默认的Content-Type为application/json
  171. if headers is None:
  172. headers = {}
  173. if 'Content-Type' not in headers:
  174. headers['Content-Type'] = 'application/json'
  175. resp = requests.post(url, json=json_data, headers=headers, timeout=timeout, verify=False)
  176. status_code = resp.status_code
  177. response_data = resp.text
  178. logger.info(f"同步POST请求成功 [url={url}]: {response_data} json_data={json_data}")
  179. return {
  180. "code": 200,
  181. "status_code": status_code,
  182. "data": response_data,
  183. "message": "POST请求成功"
  184. }
  185. except requests.exceptions.Timeout as e:
  186. logger.error(f"同步POST请求超时 [url={url}]: {e}")
  187. return {
  188. "code": 500,
  189. "message": f"POST请求超时: {str(e)}"
  190. }
  191. except requests.exceptions.RequestException as e:
  192. logger.error(f"同步POST请求失败 [url={url}]: {e}")
  193. return {
  194. "code": 500,
  195. "message": f"POST请求失败: {str(e)}"
  196. }
  197. async def save_file_temp(self, session, url, name, max_retries=5):
  198. down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
  199. # down_file_path = "./tmp_file"
  200. os.makedirs(down_file_path, exist_ok=True)
  201. down_file_name = down_file_path + f"/{name}"
  202. # if os.path.exists(down_file_name):
  203. # pass
  204. # else:
  205. headers = {
  206. "User-Agent": "Mozilla/5.0",
  207. "Accept": "*/*"
  208. }
  209. for i in range(max_retries):
  210. try:
  211. async with session.get(url, ssl=False, headers=headers) as resp:
  212. resp.raise_for_status()
  213. content_length = resp.headers.get('Content-Length')
  214. if content_length:
  215. file_size = int(content_length)
  216. else:
  217. file_size = 0
  218. async with aiofiles.open(down_file_name, 'wb') as f:
  219. async for chunk in resp.content.iter_chunked(1024):
  220. await f.write(chunk)
  221. return down_file_name, file_size
  222. except Exception as e:
  223. logger.info(f"文件下载失败:{e}")
  224. logger.info(f"准备重试:{i + 1}/{max_retries}")
  225. def file_split_by_len(self, file_text):
  226. split_map = {
  227. "0": ["#"], # 按标题段落切片
  228. "1": ["<page>"], # 按页切片
  229. "2": ["\n"] # 按问答对
  230. }
  231. separator_num = self.file_json.get("customSeparator")
  232. slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
  233. separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
  234. logger.info(f"文本切分字符:{separator}")
  235. text_split = RecursiveCharacterTextSplitter(
  236. separators=separator,
  237. chunk_size=500,
  238. chunk_overlap=40,
  239. length_function=len
  240. )
  241. texts = text_split.split_text(file_text)
  242. return texts
  243. def split_text(self, file_text):
  244. text_split = RecursiveCharacterTextSplitter(
  245. separators=["\n\n", "\n"],
  246. chunk_size=500,
  247. chunk_overlap=40,
  248. length_function=len
  249. )
  250. texts = text_split.split_text(file_text)
  251. return texts
  252. def split_into_atoms(self, text: str) -> List[Dict]:
  253. """
  254. 将文本拆为不可拆分的结构原子
  255. """
  256. TABLE_PATTERN = re.compile(r"<table[\s\S]*?</table>", re.IGNORECASE)
  257. PLACEHOLDER_PATTERN = re.compile(r"【[^】]+】")
  258. atoms = []
  259. idx = 0
  260. matches = []
  261. for m in TABLE_PATTERN.finditer(text):
  262. matches.append((m.start(), m.end(), "table"))
  263. for m in PLACEHOLDER_PATTERN.finditer(text):
  264. matches.append((m.start(), m.end(), "placeholder"))
  265. matches.sort(key=lambda x: x[0])
  266. for start, end, typ in matches:
  267. if idx < start:
  268. atoms.append({
  269. "type": "text",
  270. "content": text[idx:start]
  271. })
  272. atoms.append({
  273. "type": typ,
  274. "content": text[start:end]
  275. })
  276. idx = end
  277. if idx < len(text):
  278. atoms.append({
  279. "type": "text",
  280. "content": text[idx:]
  281. })
  282. return atoms
  283. # def calc_overlap_atoms(self, prev_atoms, overlap_ratio=0.1):
  284. # total_len = sum(len(a["content"]) for a in prev_atoms)
  285. # target_len = int(total_len * overlap_ratio)
  286. # overlap_atoms = []
  287. # cur_len = 0
  288. # # 从后往前收集原子
  289. # for atom in reversed(prev_atoms):
  290. # overlap_atoms.insert(0, atom)
  291. # cur_len += len(atom["content"])
  292. # if cur_len >= target_len:
  293. # break
  294. # return overlap_atoms
  295. def calc_overlap_atoms(self, prev_atoms, overlap_ratio=0.1):
  296. total_len = sum(len(a["content"]) if a["type"] == "text" else 0 for a in prev_atoms)
  297. target_len = int(total_len * overlap_ratio)
  298. overlap_atoms = []
  299. cur_len = 0
  300. # 从后往前收集原子
  301. for atom in reversed(prev_atoms):
  302. if atom["type"] == "text":
  303. cur_len += len(atom["content"])
  304. if cur_len > target_len:
  305. atom["content"] = atom["content"][-(target_len - (cur_len - len(atom["content"]))):]
  306. overlap_atoms.insert(0, atom)
  307. else:
  308. overlap_atoms.insert(0, atom)
  309. if cur_len >= target_len:
  310. break
  311. return overlap_atoms
  312. def overlap_chunks(self, chunks, overlap_ratio=0.1):
  313. """
  314. chunks: 原始 chunk 列表
  315. """
  316. new_chunks = []
  317. prev_atoms = None
  318. for chunk in chunks:
  319. atoms = self.split_into_atoms(chunk)
  320. if prev_atoms:
  321. overlap_atoms = self.calc_overlap_atoms(prev_atoms, overlap_ratio)
  322. merged_atoms = overlap_atoms + atoms
  323. else:
  324. merged_atoms = atoms
  325. # new_chunk = chunk.copy()
  326. new_chunk = "".join(a["content"] for a in merged_atoms)
  327. new_chunks.append(new_chunk)
  328. prev_atoms = atoms
  329. return new_chunks
    def split_large_atom(self, atom: Dict, max_chunk_size: int) -> List[Dict]:
        """Split a single oversized atom (table, placeholder, or text).

        Args:
            atom: atom dict {"type": "table"/"placeholder"/"text", "content": "..."}.
            max_chunk_size: maximum character count per resulting atom.
        Returns:
            List[Dict]: the atom unchanged (as a 1-element list) when it fits,
            otherwise a list of smaller atoms of the same type. For tables,
            each piece is re-wrapped in the original <table ...> / </table>
            tags so every fragment stays valid HTML.
        """
        content = atom["content"]
        atom_type = atom["type"]
        # Fast path: nothing to do when the atom already fits.
        if len(content) <= max_chunk_size:
            return [atom]
        logger.warning(f"检测到超长原子 [type={atom_type}, length={len(content)}],进行切分")
        result_atoms = []
        if atom_type == "table":
            # Split the table row by row.
            # First pull out the opening tag, body, and closing tag.
            table_match = re.match(r'(<table[^>]*>)(.*)(</table>)', content, re.DOTALL | re.IGNORECASE)
            if not table_match:
                # Table structure unparsable: fall back to a hard character split.
                logger.warning("无法解析表格结构,按字符强制切分")
                for i in range(0, len(content), max_chunk_size):
                    chunk = content[i:i + max_chunk_size]
                    result_atoms.append({"type": atom_type, "content": chunk})
                return result_atoms
            table_open = table_match.group(1)
            table_body = table_match.group(2)
            table_close = table_match.group(3)
            # Collect the <tr> rows of the body.
            rows = re.findall(r'<tr[^>]*>.*?</tr>', table_body, re.DOTALL | re.IGNORECASE)
            if not rows:
                # No rows found: hard character split.
                for i in range(0, len(content), max_chunk_size):
                    chunk = content[i:i + max_chunk_size]
                    result_atoms.append({"type": atom_type, "content": chunk})
                return result_atoms
            # Re-pack rows into table-shaped blocks.
            current_chunk = table_open
            for row in rows:
                # Would adding this row (plus the closing tag) overflow the block?
                if len(current_chunk) + len(row) + len(table_close) > max_chunk_size:
                    # Current block is full: flush it and start a new one.
                    if current_chunk != table_open:  # make sure it's not an empty table
                        result_atoms.append({"type": atom_type, "content": current_chunk + table_close})
                        current_chunk = table_open
                        # NOTE: no `continue` here, so the row that triggered the
                        # flush is appended to the fresh block below.
                    else:
                        # A single row alone exceeds the limit: emit it anyway
                        # (the resulting block may exceed max_chunk_size).
                        result_atoms.append({"type": atom_type, "content": current_chunk + row + table_close})
                        current_chunk = table_open
                        continue
                current_chunk += row
            # Flush the last partially filled block.
            if current_chunk != table_open:
                result_atoms.append({"type": atom_type, "content": current_chunk + table_close})
        elif atom_type == "placeholder":
            # Placeholders should never be this long, but split by characters just in case.
            logger.warning(f"占位符超长 [length={len(content)}],按字符切分")
            for i in range(0, len(content), max_chunk_size):
                chunk = content[i:i + max_chunk_size]
                result_atoms.append({"type": atom_type, "content": chunk})
        else:  # text
            # Plain text: straightforward character split.
            for i in range(0, len(content), max_chunk_size):
                chunk = content[i:i + max_chunk_size]
                result_atoms.append({"type": atom_type, "content": chunk})
        logger.info(f"超长原子切分完成 [原长度={len(content)}, 切分为{len(result_atoms)}个块]")
        return result_atoms
  399. def smart_chunk_with_overlap(self, text: str, max_chunk_size: int = 5000, overlap_ratio: float = 0.1) -> List[str]:
  400. """
  401. 智能分割长文本并添加重叠
  402. 参数:
  403. text: 输入的长文本字符串
  404. max_chunk_size: 每块的最大字符数(所有内容),默认5000
  405. overlap_ratio: 重叠率,默认0.1(10%)
  406. 返回:
  407. List[str]: 分割后带重叠的文本块列表
  408. 处理流程:
  409. 1. 如果文本不超长,直接返回
  410. 2. 将文本拆分为原子(表格、占位符、普通文本)
  411. 3. 对超长原子进行切分
  412. 4. 按照max_chunk_size将原子组装成块(计算所有内容的总长度)
  413. 5. 对所有块添加重叠率处理(重叠时只计算纯文本长度)
  414. """
  415. if len(text) <= max_chunk_size:
  416. return [text]
  417. # 步骤1:将文本拆分为原子
  418. atoms = self.split_into_atoms(text)
  419. # 步骤2:对超长原子进行切分
  420. processed_atoms = []
  421. for atom in atoms:
  422. if len(atom["content"]) > max_chunk_size:
  423. # 超长原子需要切分
  424. split_atoms = self.split_large_atom(atom, max_chunk_size)
  425. processed_atoms.extend(split_atoms)
  426. else:
  427. processed_atoms.append(atom)
  428. # 步骤3:按照max_chunk_size组装原子成块(计算所有内容的总长度)
  429. chunks = []
  430. current_chunk_atoms = []
  431. current_total_len = 0 # 计算所有内容的总长度
  432. for atom in processed_atoms:
  433. atom_len = len(atom["content"]) # 所有类型都计算长度
  434. # 检查是否需要开始新块
  435. if current_total_len > 0 and current_total_len + atom_len > max_chunk_size:
  436. # 当前块已满,保存并开始新块
  437. chunk_text = "".join(a["content"] for a in current_chunk_atoms)
  438. chunks.append(chunk_text)
  439. current_chunk_atoms = []
  440. current_total_len = 0
  441. # 添加原子到当前块
  442. current_chunk_atoms.append(atom)
  443. current_total_len += atom_len
  444. # 添加最后一个块
  445. if current_chunk_atoms:
  446. chunk_text = "".join(a["content"] for a in current_chunk_atoms)
  447. chunks.append(chunk_text)
  448. # 步骤4:对所有块添加重叠率处理(重叠时只计算纯文本长度)
  449. if len(chunks) <= 1:
  450. return chunks
  451. return self.overlap_chunks(chunks, overlap_ratio)
  452. def chunk_text_for_rag(self, text: str, max_chars: int = 5000) -> List[str]:
  453. """
  454. 分割长文本(兼容旧接口,内部调用新方法)
  455. 参数:
  456. text: 输入的长文本字符串
  457. max_chars: 每块的最大字符数,默认5000
  458. 返回:
  459. List[str]: 分割后的文本块列表
  460. """
  461. return self.smart_chunk_with_overlap(text, max_chunk_size=max_chars, overlap_ratio=0.1)
  462. def find_split_by_table(self, chunk: str):
  463. """
  464. 在块中查找 </table> 标签作为分割点
  465. 优先在完整表格边界处分割,避免截断表格
  466. 返回:
  467. split_position: 分割位置(</table>标签之后),如果没找到返回 None
  468. """
  469. # 从后向前查找最后一个完整的 </table> 标签
  470. close_tag = '</table>'
  471. last_close_pos = chunk.rfind(close_tag)
  472. if last_close_pos == -1:
  473. return None
  474. # 分割点在 </table> 之后
  475. split_pos = last_close_pos + len(close_tag)
  476. # 确保分割点不是在块的最开始或最末尾(避免空块)
  477. if split_pos < 100 or split_pos >= len(chunk) - 100:
  478. return None
  479. return split_pos
  480. def find_split_point(self, chunk: str, full_text: str, chunk_start: int):
  481. """
  482. 在块中查找最近的x.x标签作为分割点
  483. 返回:
  484. (split_position, overlap_content): 分割位置和重叠内容
  485. """
  486. # 匹配x.x格式的标签(如1.1, 2.3, 10.5等)
  487. pattern = r'\d+\.\d+'
  488. # pattern = r'\b\d+\.\d+\b'
  489. matches = list(re.finditer(pattern, chunk))
  490. if not matches:
  491. return None, None
  492. # 找最后一个匹配的标签
  493. last_match = matches[-1]
  494. tag_start = last_match.start()
  495. # 提取该标签的完整内容(从标签到下一个标签或块尾)
  496. tag_content_start = chunk_start + tag_start
  497. # 在全文中查找下一个x.x标签
  498. remaining_text = full_text[tag_content_start:]
  499. next_tag_match = re.search(pattern, remaining_text[len(last_match.group()):])
  500. if next_tag_match:
  501. tag_content_end = tag_content_start + len(last_match.group()) + next_tag_match.start()
  502. else:
  503. tag_content_end = len(full_text)
  504. overlap_content = full_text[tag_content_start:tag_content_end]
  505. # 检查重叠内容长度
  506. if len(overlap_content) > self.max_cut_len:
  507. # 重叠内容本身超过限制,返回None使用句号分割
  508. return None, None
  509. return tag_start, overlap_content
  510. def find_split_by_period(self, chunk: str, full_text: str, chunk_start: int):
  511. """
  512. 按句号"。"查找分割点
  513. 返回:
  514. (split_position, overlap_content): 分割位置和重叠内容
  515. """
  516. # 从后向前查找句号
  517. last_period = chunk.rfind('。')
  518. if last_period == -1:
  519. return None, None
  520. # 分割点是句号之后
  521. split_pos = last_period + 1
  522. # 查找这个句子的开始位置作为重叠内容
  523. # 向前查找上一个句号
  524. prev_period = chunk.rfind('。', 0, last_period)
  525. if prev_period != -1:
  526. overlap_start = prev_period + 1
  527. overlap_content = chunk[overlap_start:split_pos]
  528. else:
  529. # 如果前面没有句号,则从块开始到当前句号
  530. overlap_content = chunk[:split_pos]
  531. # 检查重叠内容长度
  532. if len(overlap_content) > self.max_cut_len:
  533. overlap_content = ""
  534. return split_pos, overlap_content
    def process_table_images(self, table_body, doc_id, pdf_file_name, image_num, flag_img_info):
        """Rewrite <img> tags inside table HTML to point at uploaded MinIO URLs.

        For every <img src=...> found in the table, the local parsed image is
        uploaded to MinIO and the src attribute is replaced with the public
        MinIO URL. Images that are missing on disk, or that fail to process,
        are left untouched.

        Args:
            table_body: table HTML content.
            doc_id: document ID (used in the MinIO object path).
            pdf_file_name: PDF file name (locates the local parsed-image dir).
            image_num: running image counter; incremented per uploaded image.
            flag_img_info: image info dict (currently unused — the write to
                it is commented out below).
        Returns:
            tuple: (processed table HTML, updated image counter).
        """
        if not table_body:
            return table_body, image_num
        # Match <img> tags in the table and capture the src attribute.
        img_pattern = re.compile(r'<img\s+[^>]*src=["\']([^"\']+)["\'][^>]*>', re.IGNORECASE)
        def replace_img(match):
            # Closure shares the counter with the enclosing method.
            nonlocal image_num
            img_src = match.group(1)
            try:
                # Take the bare file name from the src path.
                image_name = img_src.split("/")[-1]
                # Resolve the local path via the parser-specific image dir.
                image_base_path = self._get_image_base_path(pdf_file_name)
                save_image_path = f"{image_base_path}/{image_name}"
                # Skip upload when the parsed image file is missing.
                if not os.path.exists(save_image_path):
                    logger.warning(f"表格中的图片文件不存在: {save_image_path}")
                    return match.group(0)  # keep the original tag
                # Build the public MinIO URL for the uploaded object.
                # replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{image_name}.jpg"
                full_img_url = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                original_img_tag = match.group(0)
                # Swap only the src attribute; other attributes are preserved.
                replace_text = re.sub(
                    r'src=["\']([^"\']+)["\']',
                    f'src="{full_img_url}"',
                    original_img_tag,
                    flags=re.IGNORECASE
                )
                # Upload the local file to MinIO.
                self.minio_client.upload_file(save_image_path, minio_file_path)
                # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                image_num += 1
                return replace_text
            except Exception as e:
                logger.error(f"处理表格中的图片失败 [src={img_src}]: {e}")
                return match.group(0)  # keep the original tag on failure
        # Replace every <img> tag in one pass.
        processed_table = img_pattern.sub(replace_img, table_body)
        return processed_table, image_num
    def process_table_content(self, table_caption, table_body, max_table_chars=8000):
        """Cap table content size to avoid overflowing the database field.

        Args:
            table_caption: list of caption strings (joined with spaces).
            table_body: table HTML content.
            max_table_chars: maximum characters allowed, default 8000.
        Returns:
            str: caption + table text, truncated (with a Chinese marker
            appended) when it exceeds max_table_chars; "" for empty input.
        """
        if not table_body:
            return ""
        # Combine the caption with the body.
        caption_text = " ".join(table_caption) if table_caption else ""
        full_table_text = f"{caption_text}\n{table_body}\n" if caption_text else f"{table_body}\n"
        # Within the limit: return as is.
        if len(full_table_text) <= max_table_chars:
            return full_table_text
        logger.warning(f"表格内容过长({len(full_table_text)}字符),进行截断处理")
        # Smart truncation: try to keep a well-formed <table> structure.
        if "</table>" in table_body:
            table_start = table_body.find("<table")
            if table_start != -1:
                # Budget left for the body after the caption and newlines.
                available_chars = max_table_chars - len(caption_text) - 2
                if available_chars > 100:  # only worthwhile with enough room
                    truncated_body = table_body[:available_chars]
                    # Prefer to cut at the end of the last complete row.
                    last_tr_end = truncated_body.rfind("</tr>")
                    if last_tr_end != -1 and last_tr_end > available_chars // 2:
                        truncated_body = truncated_body[:last_tr_end + 5]  # include </tr>
                    # Re-close the table tag if truncation removed it.
                    if "<table" in truncated_body and "</table>" not in truncated_body:
                        truncated_body += "</table>"
                    result = f"{caption_text}\n{truncated_body}\n[表格内容已截断]\n" if caption_text else f"{truncated_body}\n[表格内容已截断]\n"
                    return result
        # Fallback: plain character truncation (smart path not applicable).
        max_content_chars = max_table_chars - len("[内容已截断]\n")
        if caption_text:
            max_content_chars -= len(caption_text) + 1  # caption plus newline
            truncated_content = table_body[:max_content_chars] if max_content_chars > 0 else ""
            return f"{caption_text}\n{truncated_content}[内容已截断]\n"
        else:
            truncated_content = table_body[:max_content_chars] if max_content_chars > 0 else ""
            return f"{truncated_content}[内容已截断]\n"
  634. # 0
def split_by_title(self, file_content_list, set_table, doc_id, pdf_file_name):
    """Split parsed file content into chunks at level-1 / level-2 headings.

    Images are uploaded to MinIO and replaced by placeholder markers
    (recorded in ``flag_img_info``); tables keep their HTML body when
    ``set_table == "1"``, otherwise they go through the image/table branch.

    Args:
        file_content_list: list of parsed content dicts
            (keys: type / text / text_level / bbox / page_idx / ...).
        set_table: "1" keeps tables as HTML data, anything else treats the
            table like an image (body still appended after image handling).
        doc_id: document id, used inside placeholder markers and MinIO paths.
        pdf_file_name: source file name, used to locate extracted images.

    Returns:
        (text_lists, flag_img_info, bbox_list, page_list) — parallel lists of
        chunk texts, marker->MinIO-URL map, first bbox and first page (1-based)
        per chunk.
    """
    # Headings matching these (after stripping a leading numeric id) are
    # treated as noise and never kept as the running level-1 heading.
    stop_title_list = ['前言', '目录', '目次', 'context', 'CONTEXT', 'content', 'CONTENT', 'contents', 'CONTENTS']
    text_lists = []
    bbox_list = []
    page_list = []
    text = ""
    first_bbox = None
    first_page = None
    image_num = 1
    flag_img_info = {}
    first_level_1 = True
    level_1_text = ""   # rendered "# heading\n" carried into level-2 chunks
    level_2_text = ""
    for i, content_dict in enumerate(file_content_list):
        # Record the first bbox / first page of the current chunk.
        if first_bbox is None and content_dict.get("bbox"):
            first_bbox = content_dict.get("bbox")
            first_page = content_dict.get("page_idx", 0) + 1
        text_type = content_dict.get("type")
        content_text = content_dict.get("text")
        if text_type == "text":
            text_level = content_dict.get("text_level", "")
            if text_level == 1:
                if not level_1_text and first_level_1:
                    # First level-1 heading: keep it only if it is a numbered
                    # heading whose title is not in the stop list.
                    if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text):
                        if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text).group(1).strip() not in stop_title_list:
                            level_1_text = f"# {content_text}\n"
                if first_level_1:
                    # Very first heading simply starts the first chunk.
                    text += f"# {content_text}\n"
                    first_level_1 = False
                else:
                    # A new level-1 heading closes the current chunk.
                    if len(text) > self.max_cut_len:
                        logger.info(f"块长度过大:{len(text)}")
                        text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                        logger.info(f"切分后的块:{text_chunks}")
                        text_lists.extend(text_chunks)
                        bbox_list.extend([first_bbox] * len(text_chunks))
                        page_list.extend([first_page] * len(text_chunks))
                    else:
                        text_lists.append(text)
                        bbox_list.append(first_bbox)
                        page_list.append(first_page)
                    text = f"# {content_text}\n"
                    # Filter out common noise headings.
                    if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text):
                        if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text).group(1).strip() not in stop_title_list:
                            level_1_text = f"# {content_text}\n"
                        else:
                            level_1_text = ""
                    level_2_text = ""
                    first_bbox = content_dict.get("bbox")
                    first_page = content_dict.get("page_idx", 0) + 1
            elif text_level == 2:
                if not level_2_text:
                    # First level-2 heading under the current level-1: append.
                    text += f"## {content_text}\n"
                    level_2_text = f"## {content_text}\n"
                else:
                    # Subsequent level-2 heading closes the current chunk.
                    if len(text) > self.max_cut_len:
                        logger.info(f"块长度过大:{len(text)}")
                        text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                        logger.info(f"切分后的块:{text_chunks}")
                        text_lists.extend(text_chunks)
                        bbox_list.extend([first_bbox] * len(text_chunks))
                        page_list.extend([first_page] * len(text_chunks))
                    else:
                        text_lists.append(text)
                        bbox_list.append(first_bbox)
                        page_list.append(first_page)
                    # New chunk is seeded with the running level-1 heading.
                    # NOTE(review): level_2_text is NOT refreshed here — it
                    # keeps the first level-2 heading's text; confirm intent.
                    text = level_1_text + f"## {content_text}\n"
                    first_bbox = content_dict.get("bbox")
                    first_page = content_dict.get("page_idx", 0) + 1
            else:
                # Deeper headings are rendered as markdown, plain text appended.
                if text_level:
                    text += text_level*"#" + " " + content_text + "\n"
                else:
                    text += content_text
        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # Replace any images embedded in the table first.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then normalize / truncate the table content itself.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text
        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    # An image entry without a file path carries nothing usable.
                    continue
                elif image_path:
                    image_name = image_path.split("/")[-1]
                    # Pick the image directory according to the parser type.
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                    if text_type == "image":
                        image_head = content_dict.get("image_caption", "")
                        image_tail = content_dict.get("image_footnote", "")
                        replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)
                if text_type == "table":
                    # NOTE(review): nesting reconstructed — assumed to run even
                    # when the table has no snapshot image; confirm vs. history.
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # Replace images embedded in the table first.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then normalize / truncate the table content itself.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")
            # NOTE(review): this end-of-document flush sits inside the
            # image/table branch (forced by the following elif), so a document
            # whose LAST element is plain text or a list never flushes its
            # final chunk — confirm whether that is intended.
            if i+1 == len(file_content_list):
                if len(text) > self.max_cut_len:
                    logger.info(f"块长度过大(最后一个):{len(text)}")
                    text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                    logger.info(f"切分后的块(最后一个):{text_chunks}")
                    text_lists.extend(text_chunks)
                    bbox_list.extend([first_bbox] * len(text_chunks))
                    page_list.extend([first_page] * len(text_chunks))
                else:
                    text_lists.append(text)
                    bbox_list.append(first_bbox)
                    page_list.append(first_page)
        elif text_type == "list":
            list_items = content_dict.get("list_items")
            if list_items:
                text += "\n".join(list_items) + "\n"
    return text_lists, flag_img_info, bbox_list, page_list
  797. # 1
def split_by_page(self, file_content_list, set_table, doc_id, pdf_file_name):
    """Split parsed file content into one chunk per page.

    Images become MinIO-backed placeholder markers; tables keep their HTML
    body when ``set_table == "1"``. Oversized pages are further split via
    smart_chunk_with_overlap; the resulting chunks are finally run through
    self.overlap_chunks.

    Args:
        file_content_list: list of parsed content dicts.
        set_table: "1" keeps tables as HTML data, else image/table branch.
        doc_id: document id used in placeholder markers and MinIO paths.
        pdf_file_name: source file name, used to locate extracted images.

    Returns:
        (overlapped text_lists, flag_img_info, bbox_list, page_list).
    """
    text_lists = []
    bbox_list = []
    page_list = []
    current_page = ""
    text = ""
    first_bbox = None
    first_page = None
    image_num = 1
    flag_img_info = {}
    for i,content_dict in enumerate(file_content_list):
        page_index = content_dict.get("page_idx")
        if i == 0:
            current_page = page_index
            first_bbox = content_dict.get("bbox")
            first_page = page_index + 1 if page_index is not None else None
        elif page_index != current_page:
            # Page boundary: flush the accumulated page text as a chunk.
            if len(text) > self.max_cut_len:
                text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                text_lists.extend(text_chunks)
                bbox_list.extend([first_bbox] * len(text_chunks))
                page_list.extend([first_page] * len(text_chunks))
            else:
                text_lists.append(text)
                bbox_list.append(first_bbox)
                page_list.append(first_page)
            text = ""
            current_page = page_index
            first_bbox = content_dict.get("bbox")
            first_page = page_index + 1 if page_index is not None else None
        text_type = content_dict.get("type")
        if text_type == "text":
            content_text = content_dict.get("text")
            text_level = content_dict.get("text_level")
            if text_level:
                text += "#" * text_level + " " + content_text
            else:
                text += content_text
        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # Replace any images embedded in the table first.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then normalize / truncate the table content itself.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text
        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    # An image entry without a file path carries nothing usable.
                    continue
                elif image_path:
                    image_name = image_path.split("/")[-1]
                    # Pick the image directory according to the parser type.
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                    if text_type == "image":
                        image_head = content_dict.get("image_caption", "")
                        image_tail = content_dict.get("image_footnote", "")
                        replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)
                if text_type == "table":
                    # NOTE(review): nesting reconstructed — assumed to run even
                    # when the table has no snapshot image; confirm vs. history.
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # Replace images embedded in the table first.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then normalize / truncate the table content itself.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")
            # NOTE(review): placement mirrors split_by_title, where this
            # end-of-document flush is forced inside the image/table branch;
            # if the document's last element is plain text the final page may
            # never be flushed — confirm against original history.
            if i+1 == len(file_content_list):
                if len(text) > self.max_cut_len:
                    text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                    text_lists.extend(text_chunks)
                    bbox_list.extend([first_bbox] * len(text_chunks))
                    page_list.extend([first_page] * len(text_chunks))
                else:
                    text_lists.append(text)
                    bbox_list.append(first_bbox)
                    page_list.append(first_page)
    return self.overlap_chunks(text_lists), flag_img_info, bbox_list, page_list
  911. # 其它
def split_by_self(self, file_content_list, set_table, slice_value, doc_id, pdf_file_name):
    """Split on a user-supplied delimiter.

    The whole document is first flattened into one string (images become
    MinIO-backed markers, tables as HTML when ``set_table == "1"``), then
    split on ``slice_value``; pieces longer than ``self.max_cut_len`` are
    further split with overlap.

    Args:
        file_content_list: list of parsed content dicts.
        set_table: "1" keeps tables as HTML data, else image/table branch.
        slice_value: custom delimiter string chosen by the user.
        doc_id: document id used in placeholder markers and MinIO paths.
        pdf_file_name: source file name, used to locate extracted images.

    Returns:
        (text_lists, flag_img_info, bbox_list, page_list); every chunk shares
        the document's first bbox/page because per-piece provenance is lost
        after the flat split.
    """
    logger.info(f"自定义的分隔符:{slice_value}")
    text = ""
    first_bbox = None
    first_page = None
    image_num = 1
    flag_img_info = {}
    for i, content_dict in enumerate(file_content_list):
        # Record the document's first bbox and page.
        if first_bbox is None and content_dict.get("bbox"):
            first_bbox = content_dict.get("bbox")
            first_page = content_dict.get("page_idx", 0) + 1
        text_type = content_dict.get("type")
        if text_type == "text":
            content_text = content_dict.get("text")
            text_level = content_dict.get("text_level")
            if text_level:
                text += "#" * text_level + " " + content_text
            else:
                text += content_text
        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # Replace any images embedded in the table first.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then normalize / truncate the table content itself.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text
        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    # An image entry without a file path carries nothing usable.
                    continue
                elif image_path:
                    image_name = image_path.split("/")[-1]
                    # Pick the image directory according to the parser type.
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                    if text_type == "image":
                        image_head = content_dict.get("image_caption", "")
                        image_tail = content_dict.get("image_footnote", "")
                        replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)
                if text_type == "table":
                    # NOTE(review): nesting reconstructed — assumed to run even
                    # when the table has no snapshot image; confirm vs. history.
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # Replace images embedded in the table first.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then normalize / truncate the table content itself.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")
    split_lists = text.split(slice_value)
    text_lists = []
    for split_text in split_lists:
        if not split_text:
            continue
        if len(split_text) > self.max_cut_len:
            text_chunks = self.smart_chunk_with_overlap(split_text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
            text_lists.extend(text_chunks)
        else:
            text_lists.append(split_text)
    # All chunks reuse the first bbox/page of the document.
    bbox_list = [first_bbox] * len(text_lists)
    page_list = [first_page] * len(text_lists)
    return text_lists, flag_img_info, bbox_list, page_list
  1016. # 3
def split_by_min_paragraph(self, file_content_list, set_table, doc_id, pdf_file_name):
    """
    Split at the smallest heading granularity: every heading plus the
    non-heading content below it becomes one chunk.

    Each new chunk is seeded with its full parent-heading chain via
    _build_title_chain; headings are recorded in title_dict (title_path ->
    rendered heading) and, for numeric headings such as "5.1.1", in
    number_title_dict (numeric id -> rendered heading).  A heading ending
    with a colon starts a "colon section" (coln_log): subsequent same-or-
    deeper single-digit headings are appended to the same chunk instead of
    opening new ones.

    Returns:
        (text_lists, flag_img_info, path_title_list, father_path_title_list,
         bbox_list, page_list).
        NOTE(review): the final flush appends "" to the path lists in
        addition to the per-heading bookkeeping — the path lists may end up
        one element longer than text_lists; confirm downstream indexing.
    """
    text_lists = []
    bbox_list = []
    page_list = []
    text = ""
    first_bbox = None
    first_page = None
    pending_bbox = None  # first bbox captured while headings were accumulating
    pending_page = None  # first page captured while headings were accumulating
    image_num = 1
    flag_img_info = {}
    path_title_list = []
    father_path_title_list = []
    title_dict = {}  # title_path -> rendered heading text
    number_title_dict = {}  # numeric heading id -> rendered heading text
    coln_log = False  # True while inside a colon-introduced heading section
    coln_log_level = 0  # heading level at which the colon section started
    for i, content_dict in enumerate(file_content_list):
        text_type = content_dict.get("type")
        content_text = content_dict.get("text", "")
        if text_type == "text":
            text_level = content_dict.get("text_level", 0)
            if text_level:
                # Record the heading in title_dict.
                title_path = content_dict.get("title_path", "")
                if title_path:
                    title_content = "#" * text_level + f" {content_text}\n"
                    title_dict[title_path] = title_content
                    # Extract the numeric id (e.g. "5.1.1") and map it to the
                    # rendered heading text.
                    number_match = re.match(r'^(\d+(?:\.\d+)*)', content_text)
                    if number_match:
                        number_title_dict[number_match.group(1)] = title_content
                if coln_log:
                    title_number = re.match(r'^(\d+(?:\.\d+)*)', content_text).group(1) if re.match(r'^(\d+(?:\.\d+)*)', content_text) else ""
                    # Leave the colon section on a shallower heading or a
                    # non-single-digit id; otherwise keep appending inline.
                    if text_level < coln_log_level or title_number not in ["1","2","3","4","5","6","7","8","9"]:
                        coln_log = False
                        coln_log_level = 0
                    else:
                        text += "#" * text_level + f" {content_text}\n"
                        continue
                # Flush the current chunk (if it has content).
                if text:
                    if len(text) > self.max_cut_len:
                        text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                        text_lists.extend(text_chunks)
                        bbox_list.extend([first_bbox] * len(text_chunks))
                        page_list.extend([first_page] * len(text_chunks))
                        # Reuse the current chunk's title path for each piece.
                        last_path = path_title_list[-1] if path_title_list else ""
                        last_father = father_path_title_list[-1] if father_path_title_list else ""
                        path_title_list.extend([last_path] * len(text_chunks))
                        father_path_title_list.extend([last_father] * len(text_chunks))
                    else:
                        text_lists.append(text)
                        bbox_list.append(first_bbox)
                        page_list.append(first_page)
                        # Reuse the current chunk's title path.
                        path_title_list.append(path_title_list[-1] if path_title_list else "")
                        father_path_title_list.append(father_path_title_list[-1] if father_path_title_list else "")
                # New chunk: seed it with the full parent heading chain.
                title_chain_text = self._build_title_chain(title_path, title_dict, number_title_dict)
                if not coln_log:
                    # NOTE(review): raises IndexError when content_text is ""
                    # — confirm upstream never emits empty headings.
                    if content_text[-1] in [":", ":"]:
                        coln_log = True
                        coln_log_level = text_level
                # Update the "current chunk" title path (consumed by the next
                # flush above).
                if path_title_list:
                    # Lists are non-empty: overwrite the last (pending) entry.
                    path_title_list[-1] = title_path
                    parts = title_path.split("->") if title_path else []
                    father_path_title_list[-1] = "->".join(parts[:-1]) if len(parts) > 1 else ""
                else:
                    # First heading ever seen: start the lists.
                    path_title_list.append(title_path)
                    parts = title_path.split("->") if title_path else []
                    father_path_title_list.append("->".join(parts[:-1]) if len(parts) > 1 else "")
                text = title_chain_text
                # First bbox/page of the new chunk: prefer the pending values.
                first_bbox = pending_bbox if pending_bbox else content_dict.get("bbox")
                first_page = pending_page if pending_page else (content_dict.get("page_idx", 0) + 1 if content_dict.get("page_idx") is not None else None)
                pending_bbox = None
                pending_page = None
            else:
                # Non-heading content: append to the current chunk.
                if first_bbox is None and content_dict.get("bbox"):
                    first_bbox = content_dict.get("bbox")
                    first_page = content_dict.get("page_idx", 0) + 1
                text += content_text
        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # Replace any images embedded in the table first.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then normalize / truncate the table content itself.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text
        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    # An image entry without a file path carries nothing usable.
                    continue
                elif image_path:
                    image_name = image_path.split("/")[-1]
                    # Pick the image directory according to the parser type.
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                    if text_type == "image":
                        image_head = content_dict.get("image_caption", "")
                        image_tail = content_dict.get("image_footnote", "")
                        replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)
                if text_type == "table":
                    # NOTE(review): nesting reconstructed — assumed to run even
                    # when the table has no snapshot image; confirm vs. history.
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # Replace images embedded in the table first.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then normalize / truncate the table content itself.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")
        elif text_type == "list":
            list_items = content_dict.get("list_items")
            if list_items:
                text += "\n".join(list_items) + "\n"
    # Flush the final chunk.
    if text:
        final_text = text
        if final_text:
            if len(final_text) > self.max_cut_len:
                text_chunks = self.smart_chunk_with_overlap(final_text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                text_lists.extend(text_chunks)
                final_bbox = pending_bbox if pending_bbox else first_bbox
                final_page = pending_page if pending_page else first_page
                bbox_list.extend([final_bbox] * len(text_chunks))
                page_list.extend([final_page] * len(text_chunks))
                path_title_list.extend([""] * len(text_chunks))
                father_path_title_list.extend([""] * len(text_chunks))
            else:
                text_lists.append(final_text)
                bbox_list.append(pending_bbox if pending_bbox else first_bbox)
                page_list.append(pending_page if pending_page else first_page)
                path_title_list.append("")
                father_path_title_list.append("")
    return text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list
  1222. def _build_title_chain(self, title_path: str, title_dict: Dict[str, str], number_title_dict: Dict[str, str] = None) -> str:
  1223. """
  1224. 根据 title_path 构建完整的父标题链内容
  1225. 参数:
  1226. title_path: 标题路径,格式如 "1->1.1->1.1.1"
  1227. title_dict: 标题字典,格式如 {"1": "1、xx\n", "1->1.1": "1.1、xx\n", ...}
  1228. number_title_dict: 数字标题映射字典,格式如 {"5.1.1": "path", "5.1": "path", "5": "path"}
  1229. 返回:
  1230. 完整的标题链文本,如 "1、xx\n1.1、xx\n1.1.1、xx\n"
  1231. """
  1232. if not title_path:
  1233. return ""
  1234. # 分解 title_path,构建所有父路径
  1235. parts = title_path.split("->")
  1236. title_chain = []
  1237. # 使用数字结构查找父级链
  1238. if number_title_dict:
  1239. # 获取当前标题路径对应的标题内容
  1240. current_title = title_dict.get(title_path, "")
  1241. # 使用正则提取数字编号
  1242. number_match = re.match(r'^(#+\s+)?(\d+(?:\.\d+)*)', current_title)
  1243. if number_match:
  1244. number = number_match.group(2) # 提取数字部分,如 "5.1.1"
  1245. # 按"."切分,逐层查找父标题
  1246. number_parts = number.split(".")
  1247. for i in range(len(number_parts)):
  1248. parent_number = ".".join(number_parts[:i+1])
  1249. if parent_number in number_title_dict:
  1250. parent_path = number_title_dict[parent_number]
  1251. # if parent_path in title_dict:
  1252. # current_len = len(re.match(r'^(#+)', parent_path).group(1) if re.match(r'^(#+)', parent_path) else '')
  1253. # if title_chain:
  1254. # last_len = len(re.match(r'^(#+)', current_title).group(1) if re.match(r'^(#+)', title_chain[-1]) else '')
  1255. # else:
  1256. # last_len = 100
  1257. # if current_len >= last_len:
  1258. # continue
  1259. title_chain.append(parent_path)
  1260. return "".join(title_chain)
  1261. # 如果不是数字结构或未找到,按标题路径
  1262. for i in range(len(parts)):
  1263. # 构建从根到当前层级的路径
  1264. current_path = "->".join(parts[:i+1])
  1265. if current_path in title_dict:
  1266. title_chain.append(title_dict[current_path])
  1267. return "".join(title_chain)
  1268. def file_split(self, file_content_list, doc_id, pdf_file_name):
  1269. # TODO 根据文本列表进行切分 返回切分列表和存储图片的链接
  1270. separator_num = self.file_json.get("customSeparator")
  1271. set_table = self.file_json.get("set_table")
  1272. # separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
  1273. # logger.info(f"文本切分字符:{separator}")
  1274. if isinstance(file_content_list, str):
  1275. file_text = file_content_list
  1276. text_lists = self.split_text(file_text)
  1277. # 字符串切分无bbox/page信息
  1278. bbox_list = [None] * len(text_lists)
  1279. page_list = [None] * len(text_lists)
  1280. return text_lists, {}, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
  1281. # 0自定义,1按照页,2是最大,3是最小
  1282. elif separator_num == "2":
  1283. # 使用标题段落切分,使用text_level=1,2 切分即一个# 还是两个#
  1284. text_lists, flag_img_info, bbox_list, page_list = self.split_by_title(file_content_list, set_table, doc_id, pdf_file_name)
  1285. return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
  1286. elif separator_num == "1":
  1287. # 按照页面方式切分
  1288. text_lists, flag_img_info, bbox_list, page_list = self.split_by_page(file_content_list, set_table, doc_id, pdf_file_name)
  1289. return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
  1290. elif separator_num == "-1":
  1291. # 按照问答对切分 针对exce文档,暂不实现
  1292. return [], {}, [], [], [], []
  1293. elif separator_num == "3":
  1294. # 按最小段落切分:每个标题+其下内容为一块,1级标题合并到后续非1级块
  1295. text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list = self.split_by_min_paragraph(file_content_list, set_table, doc_id, pdf_file_name)
  1296. return text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list
  1297. elif separator_num == "0":
  1298. # 自定义切分的方式,按照自定义字符以及文本长度切分,超过500
  1299. slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
  1300. text_lists, flag_img_info, bbox_list, page_list = self.split_by_self(file_content_list, set_table, slice_value, doc_id, pdf_file_name)
  1301. return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
def parse_excel_to_text_lists(self, excel_file_path):
    """
    Parse an Excel file and return a list of Markdown-table text blocks.

    The first row of each sheet's first block is used as the header for
    every block of that sheet; later blocks contribute data rows only.
    Falls back to whitespace-joined plain text when the ``tabulate``
    library is missing or a table fails to render.
    """
    try:
        from tabulate import tabulate
    except ImportError:
        logger.error("缺少 tabulate 库,请安装: pip install tabulate")
        logger.info("降级使用纯文本格式")
        # Degrade to the original plain-text format.
        result = parse_excel(excel_file_path)
        text_lists = []
        for sheet_name, blocks in result.items():
            for block in blocks:
                content = block.get("content", [])
                lines = [" ".join(str(cell) if cell is not None else "" for cell in row) for row in content]
                text = "\n".join(lines).strip()
                if text:
                    text_lists.append(text)
        return text_lists
    result = parse_excel(excel_file_path)
    text_lists = []
    for sheet_name, blocks in result.items():
        if not blocks:
            continue
        # Use the first row of the sheet's first block as the sheet header.
        sheet_headers = None
        first_block_content = blocks[0].get("content", [])
        if first_block_content:
            sheet_headers = [str(cell).strip() if cell is not None else "" for cell in first_block_content[0]]
        if not sheet_headers:
            # NOTE(review): the skip is disabled — a sheet without headers is
            # still processed with sheet_headers == None; confirm intent.
            logger.warning(f"工作表 {sheet_name} 没有有效的表头,跳过")
        logger.info(f"工作表 {sheet_name} 的表头: {sheet_headers}")
        for block_idx, block in enumerate(blocks):
            content = block.get("content", [])
            if not content:
                continue
            try:
                # Data rows: the first block skips its header row; later
                # blocks are all data.
                if block_idx == 0:
                    data_rows = content[1:]
                else:
                    data_rows = content
                # Stringify every cell.
                rows = []
                for row in data_rows:
                    formatted_row = [str(cell).strip() if cell is not None else "" for cell in row]
                    rows.append(formatted_row)
                # Render the Markdown table.
                if rows:
                    md_table = tabulate(rows, headers=sheet_headers, tablefmt="github")
                    text_lists.append(md_table)
                    logger.debug(f"成功生成 Markdown 表格 [sheet={sheet_name}, block={block_idx}],表头: {sheet_headers}, 数据行数: {len(rows)}")
                else:
                    logger.debug(f"跳过空数据块 [sheet={sheet_name}, block={block_idx}]")
            except Exception as e:
                logger.error(f"生成 Markdown 表格失败 [sheet={sheet_name}, block={block_idx}]: {e}")
                # Degrade this block to plain text.
                if block_idx == 0:
                    data_rows = content[1:]
                else:
                    data_rows = content
                lines = [" ".join(str(cell) if cell is not None else "" for cell in row) for row in data_rows]
                text = "\n".join(lines).strip()
                if text:
                    text_lists.append(text)
                logger.info(f"降级使用纯文本格式 [sheet={sheet_name}, block={block_idx}]")
    logger.info(f"Excel 解析完成,共生成 {len(text_lists)} 个文本块")
    return text_lists
  1375. def process_data_to_milvus_schema(self, text_lists, doc_id, name, path_title_list, father_path_title_list=None, bbox_list=None, page_list=None):
  1376. """组织数据格式:
  1377. {
  1378. "content": text,
  1379. "doc_id": doc_id,
  1380. "chunk_id": chunk_id,
  1381. "metadata": {"source": file_name},
  1382. "path_title": path_title_list[i],
  1383. "Chapter": path_title_list[i],
  1384. "Father_Chapter": father_path_title_list[i],
  1385. "bbox": bbox_list[i],
  1386. "page": page_list[i]
  1387. }
  1388. """
  1389. if father_path_title_list is None:
  1390. father_path_title_list = [""] * len(text_lists)
  1391. if bbox_list is None:
  1392. bbox_list = [None] * len(text_lists)
  1393. if page_list is None:
  1394. page_list = [None] * len(text_lists)
  1395. docs = []
  1396. total_len = 0
  1397. for i, text in enumerate(text_lists):
  1398. chunk_id = str(uuid1())
  1399. chunk_len = len(text)
  1400. total_len += chunk_len
  1401. # bbox转为字符串存储
  1402. bbox_val = bbox_list[i] if i < len(bbox_list) else None
  1403. bbox_str = str(bbox_val) if bbox_val else None
  1404. page_val = page_list[i] if i < len(page_list) else None
  1405. d = {
  1406. "content": text,
  1407. "doc_id": doc_id,
  1408. "chunk_id": chunk_id,
  1409. "metadata": {"source": name, "chunk_index": i+1, "chunk_len": chunk_len, "knowledge_id": self.knowledge_id},
  1410. # "path_title": path_title_list[i],
  1411. "Chapter": path_title_list[i],
  1412. "Father_Chapter": father_path_title_list[i],
  1413. "bbox": bbox_str,
  1414. "page": page_val
  1415. }
  1416. docs.append(d)
  1417. return docs, total_len
async def process_documents(self, file_json, task_id=None):
    """End-to-end ingestion pipeline for a batch of documents.

    For every entry in ``file_json["docs"]`` this method downloads the file,
    parses and splits it (Excel table mode, Markdown mode, or an external
    parser such as MinerU/dots), writes the resulting chunks to MySQL and
    Milvus, optionally feeds them to LightRAG when the knowledge base has the
    knowledge-graph feature enabled, and reports progress/status back to the
    Java side via HTTP callbacks.  Three per-document special modes
    short-circuit the normal flow:

    * copy mode (``oldDocumentId``/``oldKnowledgeId`` present): clone the
      existing Milvus vectors and MySQL metadata instead of re-parsing;
    * ``standardClassification == "3"``: skip parsing entirely and store
      ``name + url + remark`` as a single chunk;
    * ``customSeparator == "-1"``: unsupported, fail immediately.

    Cancellation is cooperative: the task context is polled at three
    checkpoints (before processing, before insertion, after insertion) and a
    ``TaskCancelledException`` triggers rollback of everything inserted so far.

    Args:
        file_json: request payload (docs list, flag "upload"/"update",
            knowledge/tenant ids, parser options, ...).
        task_id: optional id used for the task registry, cancellation checks
            and threaded progress reporting.

    Returns:
        dict with ``code``/``message``/``knowledge_id``/``doc_info``:
        200 on success, 500 on failure, 499 when cancelled.
    """
    # Extract the requesting user id (used for OSS records and callbacks).
    user_id = file_json.get("userId", "")
    embedding_id = file_json.get("embedding_id", "bge-m3")  # NOTE(review): read but never used below -- confirm intent
    # ===== Register the task in the global task registry (enables cancellation) =====
    task_ctx = None
    if task_id:
        task_ctx = task_registry.register(task_id, user_id, self.knowledge_id)
    # Start the progress reporter (runs on its own thread).
    reporter = None
    if task_id:
        from rag.progress_reporter import ProgressReporter
        # from config import progress_callback_config
        estimate_seconds = progress_callback_config.get("estimate_seconds", 120)
        callback_url = progress_callback_config.get("base_url")
        reporter = ProgressReporter(task_id, callback_url, estimate_seconds, user_id)
        reporter.start()
        # Attach the reporter to the task context so cancellation can stop progress reporting.
        if task_ctx:
            task_ctx.reporter = reporter
    # Mark task as started (1) -- currently disabled.
    # self.mysql_client.update_task_status_start(task_id)
    # Ids of documents ingested successfully so far; defined outside the try
    # block so the exception handlers can roll them back.
    success_doc = []
    try:
        # ----- document download & per-document processing -----
        separator_num = file_json.get("customSeparator", "")
        if separator_num == "-1":
            if reporter:
                reporter.complete(success=False)
            # if task_id:
            #     self.mysql_client.update_task_status_error(task_id)
            return {"code": 500, "message": "暂不支持解析"}
        docs = file_json.get("docs")
        flag = file_json.get("flag")
        for doc in docs:
            # ===== Checkpoint 1: cancellation check before processing each document =====
            if task_ctx and task_ctx.is_cancelled:
                logger.info(f"任务 {task_id} 在文档处理前被取消")
                raise TaskCancelledException("任务已被用户取消")
            url = doc.get("url")
            name = doc.get("name", "").replace(" ", "")
            doc_id = doc.get("document_id")
            # ===== Document-copy mode =====
            old_document_id = doc.get("oldDocumentId")
            old_knowledge_id = doc.get("oldKnowledgeId")
            tenant_id = file_json.get("tenantId", "000000")
            # When both old ids are supplied the document already exists and is copied directly.
            if old_document_id and old_knowledge_id:
                logger.info(f"进入文档复制模式: oldDocumentId={old_document_id}, oldKnowledgeId={old_knowledge_id}, newDocId={doc_id}, newKnowledgeId={self.knowledge_id}")
                try:
                    # 1. Copy the Milvus vectors (under the doc_id supplied by the frontend).
                    old_milvus_client = MilvusOperate(collection_name=old_knowledge_id, embedding_name=self.embedding_name)
                    milvus_resp = old_milvus_client._copy_single_doc_to_collection(
                        new_collection_name=self.knowledge_id,
                        old_doc_id=old_document_id,
                        new_doc_id=doc_id,
                        embedding_name=self.embedding_name
                    )
                    if milvus_resp.get("code") != 200:
                        logger.error(f"[文档复制模式] Milvus数据复制失败: {milvus_resp.get('message')}")
                        self.send_post_request_sync(
                            f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
                            {"status": "2", "documentId": doc_id}
                        )
                        if reporter:
                            reporter.complete(success=False)
                        return {"code": 500, "message": f"文档复制模式:Milvus复制失败 - {milvus_resp.get('message')}", "knowledge_id": self.knowledge_id, "doc_info": {}}
                    # Mapping from old chunk ids to the newly generated ones.
                    chunk_id_mapping = milvus_resp.get("chunk_id_mapping", {})
                    logger.info(f"[文档复制模式] Milvus数据复制成功,共 {milvus_resp.get('data', {}).get('total_records', 0)} 条记录")
                    # 2. Copy the MySQL metadata (slice_text and old_slice_text both reuse the old old_slice_text).
                    mysql_success, mysql_result = self.mysql_client.copy_single_doc_metadata_for_document_copy(
                        source_knowledge_id=old_knowledge_id,
                        source_doc_id=old_document_id,
                        new_knowledge_id=self.knowledge_id,
                        new_doc_id=doc_id,
                        chunk_id_mapping=chunk_id_mapping,
                        tenant_id=tenant_id
                    )
                    knowledge_id = file_json.get("knowledge_id", "")
                    enabled_kn_rs = self.mysql_client.query_knowledge_by_ids([knowledge_id])
                    # Collect the knowledge bases that have the knowledge-graph feature enabled.
                    enabled_kn_gp_ids = set()
                    for r in enabled_kn_rs:
                        if r["knowledge_graph"]:
                            enabled_kn_gp_ids.add(r["knowledge_id"])
                    if enabled_kn_gp_ids:
                        # if file_json.get("lightrag"):
                        # Feed the copied slices into LightRAG for graph building.
                        result = mysql_result.get("result", {})
                        knowledge_id = result.get("new_knowledge_id", "")
                        document_id = result.get("new_doc_id", "")
                        datas = result.get("lightrag_data", [])
                        # logger.info(f"11111111111111111111111{result}")
                        rag_mgr = AsyncLightRAGManager()
                        await rag_mgr.init_workspace(knowledge_id)
                        await rag_mgr.insert(knowledge_id, {"content": datas, "ids": [document_id]})
                        await rag_mgr.close()
                    if not mysql_success:
                        logger.error(f"[文档复制模式] MySQL元数据复制失败: {mysql_result.get('error')}")
                        # Roll back the Milvus copy.
                        self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                        self.send_post_request_sync(
                            f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
                            {"status": "2", "documentId": doc_id}
                        )
                        if reporter:
                            reporter.complete(success=False)
                        return {"code": 500, "message": f"文档复制模式:MySQL复制失败 - {mysql_result.get('error')}", "knowledge_id": self.knowledge_id, "doc_info": {}}
                    logger.info(f"[文档复制模式] MySQL元数据复制成功: {mysql_result}")
                    # 3. Send the success callback.
                    # slice_count = mysql_result.get("slice_count", 0)
                    self.send_post_request_sync(
                        f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
                        {
                            "knowledgeId": self.knowledge_id,
                            "documentId": doc_id,
                            # "sliceTotal": slice_count,
                            "status": "1"
                        }
                    )
                    if reporter:
                        reporter.complete(success=True)
                    logger.info(f"[文档复制模式] 文档复制完成: oldDocId={old_document_id} -> newDocId={doc_id}")
                    # NOTE(review): copy mode returns after the first copied document,
                    # so any remaining entries in file_json["docs"] are not processed -- confirm.
                    return {
                        "code": 200,
                        "message": "文档复制模式:复制成功",
                        "knowledge_id": self.knowledge_id,
                        "doc_info": {
                            # "slice_num": slice_count,
                            "old_document_id": old_document_id,
                            "new_document_id": doc_id
                        }
                    }
                except Exception as e:
                    logger.error(f"[文档复制模式] 处理异常: {e}", exc_info=True)
                    self.send_post_request_sync(
                        f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
                        {"status": "2", "documentId": doc_id}
                    )
                    if reporter:
                        reporter.complete(success=False)
                    return {"code": 500, "message": f"文档复制模式异常: {str(e)}", "knowledge_id": self.knowledge_id, "doc_info": {}}
            # ===== end of document-copy mode =====
            # Special per-file handling for standardClassification=3 (temporary).
            remark = doc.get("remark", "")
            standard_classification = doc.get("standardClassification", "")
            # For standardClassification == '3': no parsing, no chunking -- store the
            # concatenated text directly.
            if standard_classification == "3":
                logger.info(f"standardClassification=3,跳过解析,直接使用文件名+URL+remark入库: {name}")
                # Combined text: file name + URL + remark.
                combined_text = f"{name}\n {url}\n {remark}"
                text_lists = [combined_text]
                # Placeholder values for the variables the shared tail expects.
                flag_img_info = {}
                full_md_url = ""
                full_pdf_url = ""
                oss_id = ""
                file_size = None
                path_title_list = [""]
                father_path_title_list = [""]
                bbox_list = [None]
                page_list = [None]
                milvus_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name, path_title_list, father_path_title_list, bbox_list, page_list)
                logger.info(f"standardClassification=3,存储到milvus的文本数据:{milvus_docs}")
                # ===== Checkpoint 2: cancellation check before persisting =====
                if task_ctx and task_ctx.is_cancelled:
                    logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库")
                    raise TaskCancelledException("任务已被用户取消")
                if flag == "upload":
                    insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id, tenant_id, user_id)
                    if insert_slice_flag:
                        insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
                    else:
                        insert_img_flag = False
                        parse_file_status = False
                    if insert_img_flag:
                        insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs)
                    else:
                        insert_milvus_flag = False
                        parse_file_status = False
                    if insert_milvus_flag:
                        parse_file_status = True
                        # Cancellation after insertion -> undo the inserts for this document.
                        if task_ctx and task_ctx.is_cancelled:
                            logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
                            self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            self.mysql_client.delete_image_url(doc_id=doc_id)
                            raise TaskCancelledException("任务已被用户取消")
                    else:
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        self.mysql_client.delete_image_url(doc_id=doc_id)
                        parse_file_status = False
                elif flag == "update":
                    # Remove the existing rows/vectors first, then re-insert.
                    self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                    self.mysql_client.delete_to_slice(doc_id=doc_id)
                    insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id, tenant_id, user_id)
                    if insert_slice_flag:
                        insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs)
                    else:
                        insert_milvus_flag = False
                        parse_file_status = False
                    if insert_milvus_flag:
                        parse_file_status = True
                        if task_ctx and task_ctx.is_cancelled:
                            logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
                            self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            raise TaskCancelledException("任务已被用户取消")
                    else:
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        parse_file_status = False
                # NOTE(review): parse_file_status is unbound here when flag is neither
                # "upload" nor "update" -- confirm callers always send one of the two.
                if parse_file_status:
                    success_doc.append(doc_id)
                else:
                    if flag == "upload":
                        # Roll back every document ingested earlier in this batch.
                        for del_id in success_doc:
                            self.milvus_client._delete_by_doc_id(doc_id=del_id)
                            self.mysql_client.delete_image_url(doc_id=del_id)
                            self.mysql_client.delete_to_slice(doc_id=del_id)
                    if reporter:
                        reporter.complete(success=False)
                    # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
                    self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"), "tenantId": tenant_id})
                    return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}}
                # Proceed to the next document.
                continue
            ## end of the file mode (standardClassification=3) branch
            async with aiohttp.ClientSession() as session:
                # Download and persist the file locally.
                down_file_name, file_size = await self.save_file_temp(session, url, name)
                excel_file_path = down_file_name  # keep the original Excel file path
                # Only txt and pdf are accepted as-is; everything else is converted to
                # pdf (unsupported formats are rejected inside convert_to_pdf).
                if not name.endswith(".txt") and not name.endswith('.pdf') and not file_json.get("markDownFlg"):
                    # Convert to a pdf file.
                    down_file_name = convert_to_pdf(down_file_name)
                    name = os.path.basename(down_file_name)
                    if down_file_name:
                        # Upload the PDF to MinIO.
                        pdf_file_name = os.path.basename(down_file_name)
                        minio_pdf_path = f"/pdf/{self.knowledge_id}/{doc_id}/{pdf_file_name}"
                        upload_success = self.minio_client.upload_file(down_file_name, minio_pdf_path)
                        pdf_file_size = os.path.getsize(down_file_name)
                        # Record the OSS entry in the database.
                        if upload_success:
                            minio_url = minio_config.get("minio_url")
                            minio_bucket = minio_config.get("minio_bucket")
                            full_pdf_url = f"{minio_url}/{minio_bucket}{minio_pdf_path}"
                            _, oss_id = self.mysql_client.insert_oss_record(pdf_file_name, full_pdf_url, user_id, ".pdf", pdf_file_size)
                            logger.info(f"PDF文件已上传至MinIO并记录OSS: {full_pdf_url}")
                # Excel special parsing mode: every sheet/table block becomes a chunk.
                excel_except = file_json.get("excel_parsing", False)
                if excel_except and excel_file_path.endswith((".xls", ".xlsx")):
                    logger.info(f"使用 Excel 特殊解析模式: {excel_file_path}")
                    text_lists = self.parse_excel_to_text_lists(excel_file_path)
                    flag_img_info = {}
                    path_title_list = [""] * len(text_lists)
                    father_path_title_list = [""] * len(text_lists)
                    bbox_list = [None] * len(text_lists)
                    page_list = [None] * len(text_lists)
                    # Generate and upload the MD file (best effort -- failure only clears full_md_url).
                    full_md_url = ""
                    try:
                        # Merge all tables into a single MD file.
                        md_content = "\n\n".join(text_lists)
                        # Save the MD file into the temp directory.
                        excel_base_name = os.path.splitext(os.path.basename(excel_file_path))[0]
                        md_file_name = f"{excel_base_name}_tables.md"
                        temp_md_dir = f"./tmp_file/{self.knowledge_id}"
                        os.makedirs(temp_md_dir, exist_ok=True)
                        temp_md_path = os.path.join(temp_md_dir, md_file_name)
                        # Write the MD file.
                        with open(temp_md_path, 'w', encoding='utf-8') as f:
                            f.write(md_content)
                        logger.info(f"Excel MD 文件已保存: {temp_md_path}")
                        # Upload to MinIO.
                        minio_md_path = f"/md/{self.knowledge_id}/{doc_id}/{md_file_name}"
                        upload_success = self.minio_client.upload_file(temp_md_path, minio_md_path)
                        if upload_success:
                            md_file_size = os.path.getsize(temp_md_path)
                            minio_url = minio_config.get("minio_url")
                            minio_bucket = minio_config.get("minio_bucket")
                            full_md_url = f"{minio_url}/{minio_bucket}{minio_md_path}"
                            # Record the OSS entry in the database.
                            _, oss_id = self.mysql_client.insert_oss_record(
                                md_file_name,
                                full_md_url,
                                user_id,
                                ".md",
                                md_file_size
                            )
                            logger.info(f"Excel MD 文件已上传至 MinIO 并记录 OSS: {full_md_url}")
                        else:
                            logger.warning(f"Excel MD 文件上传失败: {temp_md_path}")
                    except Exception as e:
                        logger.error(f"Excel MD 文件生成或上传失败: {e}", exc_info=True)
                        full_md_url = ""
                # Markdown parsing mode.
                elif file_json.get("markDownFlg"):
                    # customSeparator "3" -> finest split, "2" -> coarsest split.
                    split_mode = file_json.get("customSeparator", "2")
                    if split_mode == "3":
                        split_mode = "min"
                    elif split_mode == "2":
                        split_mode = "max"
                    logger.info(f"使用MD解析规则,切分方式:{split_mode}")
                    text_lists = await MarkdownSplitter(max_chunk_size=self.max_cut_len).split_markdown(down_file_name, split_mode)
                    logger.info(f"MD解析,存入mlivus的数据{text_lists}")
                    flag_img_info = {}
                    path_title_list = [""] * len(text_lists)
                    father_path_title_list = [""] * len(text_lists)
                    bbox_list = [None] * len(text_lists)
                    page_list = [None] * len(text_lists)
                    full_md_url = url
                    full_pdf_url = ""
                    oss_id = ""
                    # from rag.document_load.md_to_html_to_pdf import AsyncMdToPdf
                    # md_to_pdf_file_name = await AsyncMdToPdf().convert_md_to_pdf(down_file_name)
                    # if md_to_pdf_file_name:
                    #     down_file_name = md_to_pdf_file_name
                    #     name = os.path.basename(down_file_name)
                else:
                    # External parser mode (default mineru, parsingType "0").
                    parser_type = file_json.get("parsingType", "0")
                    self.parser_type = parser_type  # keep the parser type for later use
                    logger.info(f"使用解析器: {parser_type}")
                    file_parse = self._get_file_type(name, parser_type)
                    # Retry the (remote) parse up to max_retries times with a fixed delay.
                    max_retries = 3
                    retry_delay = 5
                    last_error = None
                    for attempt in range(max_retries):
                        try:
                            if parser_type == "dots":
                                file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name, doc_id)
                            elif parser_type == "2":
                                file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name, doc_id)
                            else:
                                # MinerU parsing.
                                file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name)
                            break  # success: leave the retry loop
                        except Exception as e:
                            last_error = e
                            logger.error(f"解析失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
                            if attempt < max_retries - 1:
                                logger.info(f"等待 {retry_delay} 秒后重试...")
                                await asyncio.sleep(retry_delay)
                            else:
                                logger.error(f"解析失败,已重试 {max_retries} 次")
                                raise RuntimeError(f"文档解析失败: {str(last_error)}")
                    # Upload the parser-produced MD file to MinIO.
                    full_md_url = ""
                    if path_md:
                        md_file_name = os.path.basename(path_md)
                        file_extension = os.path.splitext(md_file_name)[1]
                        minio_md_path = f"/md/{self.knowledge_id}/{doc_id}/{md_file_name}"
                        upload_success = self.minio_client.upload_file(path_md, minio_md_path)
                        md_file_size = os.path.getsize(path_md)
                        # Record the OSS entry in the database.
                        if upload_success:
                            minio_url = minio_config.get("minio_url")
                            minio_bucket = minio_config.get("minio_bucket")
                            full_md_url = f"{minio_url}/{minio_bucket}{minio_md_path}"
                            _, oss_id = self.mysql_client.insert_oss_record(md_file_name, full_md_url, user_id, file_extension, md_file_size)
                            logger.info(f"MD 文件已上传至 minio 并记录 OSS: {full_md_url}")
                    logger.info(f"mineru解析的pdf数据:{file_content_list}")
                    split_result = self.file_split(file_content_list, doc_id, pdf_file_name)
                    # Returns: text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list
                    text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list = split_result
                # ----- shared tail for all three parse modes -----
                # NOTE(review): this rebinds the name `docs` (the batch list being
                # iterated); the for-loop iterator is unaffected, but from here on
                # `docs` means "milvus records of the current document".
                docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name, path_title_list, father_path_title_list, bbox_list, page_list)
                logger.info(f"存储到milvus的文本数据:{docs}")
                # lightrag_sign = file_json.get("lightrag", False)
                knowledge_id = file_json.get("knowledge_id", "")
                enabled_kn_rs = self.mysql_client.query_knowledge_by_ids([knowledge_id])
                # Collect the knowledge bases with the knowledge-graph feature enabled.
                enabled_kn_gp_ids = set()
                for r in enabled_kn_rs:
                    if r["knowledge_graph"]:
                        enabled_kn_gp_ids.add(r["knowledge_id"])
                if enabled_kn_gp_ids:
                    # if lightrag_sign:
                    lightrag_data = {}
                    contents = []
                    # NOTE(review): `doc` here shadows the outer per-document loop variable.
                    for doc in docs:
                        contents.append(doc.get("content", ""))
                    # lightrag_data[knowledge_id] = {"content": contents, "ids": [docs[0].get("doc_id", "")]}
                    # logger.info(f"contents:{lightrag_data}")
                    logger.info(f"LightRAG_Ids:{enabled_kn_gp_ids}")
                    rag_mgr = AsyncLightRAGManager()
                    await rag_mgr.init_workspace(knowledge_id)
                    await rag_mgr.insert(knowledge_id, {"content": contents, "ids": [docs[0].get("doc_id", "")]})
                    await rag_mgr.close()
                # ===== Checkpoint 2: parse finished -- cancellation check before persisting =====
                if task_ctx and task_ctx.is_cancelled:
                    logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库")
                    raise TaskCancelledException("任务已被用户取消")
                if flag == "upload":
                    # Insert slice rows into MySQL first.
                    insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id, tenant_id, user_id)
                    if insert_slice_flag:
                        # Insert the image-url mapping rows.
                        insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
                    else:
                        insert_img_flag = False
                        parse_file_status = False
                    if insert_img_flag:
                        # insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs)
                        # Batch insert into Milvus.
                        text_lists = [doc.get("content") for doc in docs]
                        insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs, text_lists)
                        # (bm_media_replacement rows are inserted elsewhere)
                    else:
                        # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                        insert_milvus_flag = False
                        # return resp
                        parse_file_status = False
                    if insert_milvus_flag:
                        parse_file_status = True
                        # ===== Checkpoint 3: cancellation after insertion -> clean up inserted data =====
                        if task_ctx and task_ctx.is_cancelled:
                            logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
                            self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            self.mysql_client.delete_image_url(doc_id=doc_id)
                            raise TaskCancelledException("任务已被用户取消")
                    else:
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                        self.mysql_client.delete_image_url(doc_id=doc_id)
                        # resp = {"code": 500, "message": insert_mysql_info}
                        parse_file_status = False
                        # return resp
                elif flag == "update":  # re-slice an existing document
                    # Delete the existing rows/vectors first.
                    self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                    self.mysql_client.delete_to_slice(doc_id=doc_id)
                    insert_milvus_start_time = time.time()
                    insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id, tenant_id, user_id)
                    # insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs,text_lists)
                    insert_milvus_end_time = time.time()
                    logger.info(f"插入milvus数据库耗时:{insert_milvus_end_time - insert_milvus_start_time}")
                    if insert_slice_flag:
                        # Insert into Milvus (timed).
                        insert_mysql_start_time = time.time()
                        insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs)
                        insert_mysql_end_time = time.time()
                        logger.info(f"插入mysql数据库耗时:{insert_mysql_end_time - insert_mysql_start_time}")
                    else:
                        # resp = {"code": 500, "message": insert_milvus_str}
                        # return resp
                        insert_milvus_flag = False
                        parse_file_status = False
                    if insert_milvus_flag:
                        # resp = {"code": 200, "message": "切片修改成功"}
                        parse_file_status = True
                        # ===== Checkpoint 3: cancellation after insertion -> clean up inserted data =====
                        if task_ctx and task_ctx.is_cancelled:
                            logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
                            self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                            self.mysql_client.delete_to_slice(doc_id=doc_id)
                            raise TaskCancelledException("任务已被用户取消")
                    else:
                        self.mysql_client.delete_to_slice(doc_id=doc_id)
                        # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
                        # resp = {"code":500, "message": insert_mysql_info}
                        parse_file_status = False
                        # return resp
                # NOTE(review): parse_file_status is unbound here when flag is neither
                # "upload" nor "update" -- confirm callers always send one of the two.
                if parse_file_status:
                    success_doc.append(doc_id)
                else:
                    if flag == "upload":
                        # Roll back every document ingested earlier in this batch.
                        for del_id in success_doc:
                            self.milvus_client._delete_by_doc_id(doc_id=del_id)
                            self.mysql_client.delete_image_url(doc_id=del_id)
                            self.mysql_client.delete_to_slice(doc_id=del_id)
                    if reporter:
                        reporter.complete(success=False)
                    # if task_id:
                    #     self.mysql_client.update_task_status_error(task_id)
                    self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"), "tenantId": tenant_id})
                    # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
                    return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}}
        # Task done: send the final success callback (status=1) with the stats of
        # the last processed document, then report 100% progress.
        # if task_id and user_id:
        #     self.mysql_client.update_task_status_complete(task_id, user_id)
        self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1", "tenantId": tenant_id})
        # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"})
        if reporter:
            reporter.complete(success=True)
        # await self.send_post_request("http://10.10.10.3:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists)})
        # self.send_post_request_sync("http://10.10.10.2:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"})
        return {"code": 200, "message": "解析成功", "knowledge_id" : self.knowledge_id, "doc_info": {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}}
    except TaskCancelledException as e:
        # ===== The task was cancelled by the user =====
        # NOTE(review): tenant_id is only assigned inside the per-document loop; an
        # exception raised before that assignment would NameError below -- confirm.
        logger.info(f"任务被取消 [task_id={task_id}]: {e}")
        # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
        self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id})
        # 1. Delete the bm_document record first.
        if task_id:
            del_success, del_info = self.mysql_client.delete_document(task_id)
            if del_success:
                logger.info(f"已删除 bm_document 记录: task_id={task_id}")
            else:
                logger.warning(f"删除 bm_document 记录失败: {del_info}")
        # 2. Clean up the documents already ingested (slice, image, milvus).
        for del_id in success_doc:
            logger.info(f"清理已入库的文档数据: doc_id={del_id}")
            self.milvus_client._delete_by_doc_id(doc_id=del_id)
            self.mysql_client.delete_image_url(doc_id=del_id)
            self.mysql_client.delete_to_slice(doc_id=del_id)
        if reporter:
            reporter.complete(success=False)
        return {"code": 499, "message": "任务已取消", "knowledge_id": self.knowledge_id, "doc_info": {}}
    except VLMRetryExhaustedError as e:
        # VLM parsing exhausted its retries: report failure to the Java side.
        self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id})
        if reporter:
            reporter.complete(success=False)
        # if task_id:
        #     self.mysql_client.update_task_status_error(task_id)
        return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
    except Exception as e:
        # Catch-all: log with traceback, notify the Java side, report failure.
        logger.error(f"文件处理异常 [task_id={task_id}]: {e}", exc_info=True)
        # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
        self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id})
        if reporter:
            reporter.complete(success=False)
        # if task_id:
        #     self.mysql_client.update_task_status_error(task_id)
        return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
    finally:
        # ===== Always unregister the task from the registry =====
        if task_id:
            task_registry.unregister(task_id)