| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274 |
- import asyncio
- import aiohttp
- import aiofiles
- import requests
- from rag.db import MilvusOperate, MysqlOperate
- from rag.task_registry import task_registry, TaskCancelledException
- from rag.document_load.pdf_load import MinerUParsePdf, MinerUParsePdfClient
- # from rag.document_load.dots_pdf_load import DotsPDFLoader
- from rag.document_load.paddleocr_load import PaddleOCRLoader
- # from rag.document_load.office_load import MinerUParseOffice
- from rag.document_load.txt_load import TextLoad
- # from rag.document_load.image_load import MinerUParseImage
- from rag.document_load.excel_load import parse_excel
- from utils.upload_file_to_oss import UploadMinio
- from utils.get_logger import setup_logger
- from config import minio_config
- import os
- import time
- from uuid import uuid1
- from langchain_text_splitters import RecursiveCharacterTextSplitter
- from rag.document_load.document_format_conversion import *
- import re
- from typing import List, Dict
- from config import progress_callback_config
- from rag.document_load.md_splitter import MarkdownSplitter
- import sys
- sys.path.append("/opt/lightRAG_dir")
- from lightRAG import AsyncLightRAGManager
class VLMRetryExhaustedError(RuntimeError):
    """Raised when repeated VLM (vision-language model) calls have all failed."""
    pass
# API-client mode for the MinerU PDF parser (shared by all instances).
pdf_parse = MinerUParsePdfClient()
# Fallback: direct-call mode.
# pdf_parse = MinerUParsePdf()
# office_parse = MinerUParseOffice()
text_parse = TextLoad()
# image_parse = MinerUParseImage()
logger = setup_logger(__name__)
# Max chunk length (characters) supported per embedding model family;
# looked up by embedding name in ProcessDocuments.__init__.
embedding_model_mapping_sequence_len = {
    "bge-m3": 5000,
    "Qwen3-Embedding": 30000,
}
- class ProcessDocuments():
    def __init__(self, file_json):
        """
        Set up the processing pipeline for one document-ingestion request.

        Args:
            file_json: Request payload dict; must contain "knowledge_id" and
                may contain "embedding_id" (defaults to "e5") plus splitting
                options read later (e.g. "customSeparator", "slice_value").
        """
        self.file_json = file_json
        self.knowledge_id = self.file_json.get("knowledge_id")
        self.parser_type = "mineru"  # default parser type; updated in process_documents
        self.mysql_client = MysqlOperate()
        self.minio_client = UploadMinio()
        self.embedding_name = file_json.get("embedding_id", "e5")
        # One Milvus collection per knowledge base.
        self.milvus_client = MilvusOperate(collection_name=self.knowledge_id, embedding_name=self.embedding_name)
        # Per-model chunk-size ceiling; falls back to 5000 chars.
        self.max_cut_len = embedding_model_mapping_sequence_len.get(self.embedding_name, 5000)
- # def _get_file_type(self, name):
- # if name.endswith(".txt"):
- # return text_parse
- # elif name.endswith('.pdf'):
- # return pdf_parse
- # elif name.endswith((".doc", ".docx", "ppt", "pptx")):
- # return office_parse
- # elif name.endswith((".jpg", "png", "jpeg")):
- # return image_parse
- # else:
- # raise "不支持的文件格式"
- def _get_image_base_path(self, pdf_file_name):
- """
- 根据解析器类型返回图片基础路径
-
- 返回:
- str: 图片基础路径
- """
- if self.parser_type == "dots":
- return "./tmp_file/dots_parsed/" + pdf_file_name + "/images"
- elif self.parser_type == "2":
- return "./tmp_file/paddleocr_parsed/" + pdf_file_name + "/imgs"
- else:
- # mineru 默认路径
- return "./tmp_file/" + pdf_file_name + "/vlm/images"
-
- # 只接收txt和pdf格式
- def _get_file_type(self, name, parser_type="mineru"):
- """
- 根据文件类型和解析器类型返回对应的解析器
-
- 参数:
- name: 文件名
- parser_type: 解析器类型,"mineru" 或 "dots"
- """
- if name.endswith(".txt"):
- return text_parse
- elif name.endswith('.pdf'):
- if parser_type == "dots":
- # Dots 解析器需要 file_json,动态创建
- # return DotsPDFLoader(self.file_json)
- return
- elif parser_type == "2":
- # PaddleOCR-VL 解析器
- return PaddleOCRLoader(self.file_json)
- else:
- # 默认使用 MinerU 解析器
- return pdf_parse
-
- async def send_get_request(self, url, headers=None):
- """
- 发送GET请求
-
- 参数:
- url: 请求的URL地址
- headers: 可选的请求头字典
-
- 返回:
- dict: 包含状态码、响应数据等信息
- """
- try:
- async with aiohttp.ClientSession() as session:
- async with session.get(url, headers=headers, ssl=False) as resp:
- status_code = resp.status
- response_data = await resp.text()
- return {
- "code": 200,
- "status_code": status_code,
- "data": response_data,
- "message": "GET请求成功"
- }
- except Exception as e:
- logger.error(f"GET请求失败 [url={url}]: {e}")
- return {
- "code": 500,
- "message": f"GET请求失败: {str(e)}"
- }
-
- async def send_post_request(self, url, json_data=None, headers=None):
- """
- 发送POST请求
-
- 参数:
- url: 请求的URL地址
- json_data: JSON格式的请求体数据(字典类型)
- headers: 可选的请求头字典
-
- 返回:
- dict: 包含状态码、响应数据等信息
- """
- try:
- # 设置默认的Content-Type为application/json
- if headers is None:
- headers = {}
- if 'Content-Type' not in headers:
- headers['Content-Type'] = 'application/json'
-
- async with aiohttp.ClientSession() as session:
- async with session.post(url, json=json_data, headers=headers, ssl=False) as resp:
- status_code = resp.status
- response_data = await resp.text()
- logger.info(f"POST请求成功 [url={url}]: {response_data} json_data={json_data}")
- return {
- "code": 200,
- "status_code": status_code,
- "data": response_data,
- "message": "POST请求成功"
- }
- except Exception as e:
- logger.error(f"POST请求失败 [url={url}]: {e}")
- return {
- "code": 500,
- "message": f"POST请求失败: {str(e)}"
- }
- def send_post_request_sync(self, url, json_data=None, headers=None, timeout=30):
- """
- 同步发送POST请求
-
- 参数:
- url: 请求的URL地址
- json_data: JSON格式的请求体数据(字典类型)
- headers: 可选的请求头字典
- timeout: 请求超时时间(秒),默认30秒
-
- 返回:
- dict: 包含状态码、响应数据等信息
- """
- try:
- # 设置默认的Content-Type为application/json
- if headers is None:
- headers = {}
- if 'Content-Type' not in headers:
- headers['Content-Type'] = 'application/json'
-
- resp = requests.post(url, json=json_data, headers=headers, timeout=timeout, verify=False)
- status_code = resp.status_code
- response_data = resp.text
- logger.info(f"同步POST请求成功 [url={url}]: {response_data} json_data={json_data}")
- return {
- "code": 200,
- "status_code": status_code,
- "data": response_data,
- "message": "POST请求成功"
- }
- except requests.exceptions.Timeout as e:
- logger.error(f"同步POST请求超时 [url={url}]: {e}")
- return {
- "code": 500,
- "message": f"POST请求超时: {str(e)}"
- }
- except requests.exceptions.RequestException as e:
- logger.error(f"同步POST请求失败 [url={url}]: {e}")
- return {
- "code": 500,
- "message": f"POST请求失败: {str(e)}"
- }
- async def save_file_temp(self, session, url, name, max_retries=5):
- down_file_path = "./tmp_file" + f"/{self.knowledge_id}"
- # down_file_path = "./tmp_file"
- os.makedirs(down_file_path, exist_ok=True)
- down_file_name = down_file_path + f"/{name}"
- # if os.path.exists(down_file_name):
- # pass
- # else:
- headers = {
- "User-Agent": "Mozilla/5.0",
- "Accept": "*/*"
- }
- for i in range(max_retries):
- try:
- async with session.get(url, ssl=False, headers=headers) as resp:
- resp.raise_for_status()
- content_length = resp.headers.get('Content-Length')
- if content_length:
- file_size = int(content_length)
- else:
- file_size = 0
- async with aiofiles.open(down_file_name, 'wb') as f:
- async for chunk in resp.content.iter_chunked(1024):
- await f.write(chunk)
-
- return down_file_name, file_size
- except Exception as e:
- logger.info(f"文件下载失败:{e}")
- logger.info(f"准备重试:{i + 1}/{max_retries}")
- def file_split_by_len(self, file_text):
- split_map = {
- "0": ["#"], # 按标题段落切片
- "1": ["<page>"], # 按页切片
- "2": ["\n"] # 按问答对
- }
- separator_num = self.file_json.get("customSeparator")
- slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
- separator = split_map.get(separator_num) if split_map.get(separator_num) else [slice_value]
- logger.info(f"文本切分字符:{separator}")
- text_split = RecursiveCharacterTextSplitter(
- separators=separator,
- chunk_size=500,
- chunk_overlap=40,
- length_function=len
- )
- texts = text_split.split_text(file_text)
- return texts
-
- def split_text(self, file_text):
- text_split = RecursiveCharacterTextSplitter(
- separators=["\n\n", "\n"],
- chunk_size=500,
- chunk_overlap=40,
- length_function=len
- )
- texts = text_split.split_text(file_text)
- return texts
- def split_into_atoms(self, text: str) -> List[Dict]:
- """
- 将文本拆为不可拆分的结构原子
- """
- TABLE_PATTERN = re.compile(r"<table[\s\S]*?</table>", re.IGNORECASE)
- PLACEHOLDER_PATTERN = re.compile(r"【[^】]+】")
- atoms = []
- idx = 0
- matches = []
- for m in TABLE_PATTERN.finditer(text):
- matches.append((m.start(), m.end(), "table"))
- for m in PLACEHOLDER_PATTERN.finditer(text):
- matches.append((m.start(), m.end(), "placeholder"))
- matches.sort(key=lambda x: x[0])
- for start, end, typ in matches:
- if idx < start:
- atoms.append({
- "type": "text",
- "content": text[idx:start]
- })
- atoms.append({
- "type": typ,
- "content": text[start:end]
- })
- idx = end
- if idx < len(text):
- atoms.append({
- "type": "text",
- "content": text[idx:]
- })
- return atoms
- # def calc_overlap_atoms(self, prev_atoms, overlap_ratio=0.1):
- # total_len = sum(len(a["content"]) for a in prev_atoms)
- # target_len = int(total_len * overlap_ratio)
- # overlap_atoms = []
- # cur_len = 0
- # # 从后往前收集原子
- # for atom in reversed(prev_atoms):
- # overlap_atoms.insert(0, atom)
- # cur_len += len(atom["content"])
- # if cur_len >= target_len:
- # break
- # return overlap_atoms
- def calc_overlap_atoms(self, prev_atoms, overlap_ratio=0.1):
- total_len = sum(len(a["content"]) if a["type"] == "text" else 0 for a in prev_atoms)
- target_len = int(total_len * overlap_ratio)
- overlap_atoms = []
- cur_len = 0
- # 从后往前收集原子
- for atom in reversed(prev_atoms):
- if atom["type"] == "text":
- cur_len += len(atom["content"])
- if cur_len > target_len:
- atom["content"] = atom["content"][-(target_len - (cur_len - len(atom["content"]))):]
- overlap_atoms.insert(0, atom)
- else:
- overlap_atoms.insert(0, atom)
- if cur_len >= target_len:
- break
- return overlap_atoms
-
- def overlap_chunks(self, chunks, overlap_ratio=0.1):
- """
- chunks: 原始 chunk 列表
- """
- new_chunks = []
- prev_atoms = None
- for chunk in chunks:
- atoms = self.split_into_atoms(chunk)
- if prev_atoms:
- overlap_atoms = self.calc_overlap_atoms(prev_atoms, overlap_ratio)
- merged_atoms = overlap_atoms + atoms
- else:
- merged_atoms = atoms
- # new_chunk = chunk.copy()
- new_chunk = "".join(a["content"] for a in merged_atoms)
- new_chunks.append(new_chunk)
- prev_atoms = atoms
- return new_chunks
- def split_large_atom(self, atom: Dict, max_chunk_size: int) -> List[Dict]:
- """
- 切分超长的单个原子(表格、占位符、文本)
-
- 参数:
- atom: 原子字典 {"type": "table/placeholder/text", "content": "..."}
- max_chunk_size: 最大字符数限制
-
- 返回:
- List[Dict]: 切分后的原子列表
- """
- content = atom["content"]
- atom_type = atom["type"]
-
- # 如果不超长,直接返回
- if len(content) <= max_chunk_size:
- return [atom]
-
- logger.warning(f"检测到超长原子 [type={atom_type}, length={len(content)}],进行切分")
-
- result_atoms = []
-
- if atom_type == "table":
- # 表格按行切分
- # 提取表格标签
- table_match = re.match(r'(<table[^>]*>)(.*)(</table>)', content, re.DOTALL | re.IGNORECASE)
- if not table_match:
- # 无法解析表格结构,按字符强制切分
- logger.warning("无法解析表格结构,按字符强制切分")
- for i in range(0, len(content), max_chunk_size):
- chunk = content[i:i + max_chunk_size]
- result_atoms.append({"type": atom_type, "content": chunk})
- return result_atoms
-
- table_open = table_match.group(1)
- table_body = table_match.group(2)
- table_close = table_match.group(3)
-
- # 按 <tr> 行切分
- rows = re.findall(r'<tr[^>]*>.*?</tr>', table_body, re.DOTALL | re.IGNORECASE)
- if not rows:
- # 没有找到行,按字符强制切分
- for i in range(0, len(content), max_chunk_size):
- chunk = content[i:i + max_chunk_size]
- result_atoms.append({"type": atom_type, "content": chunk})
- return result_atoms
-
- # 组装表格块
- current_chunk = table_open
- for row in rows:
- # 检查加入这一行后是否超长
- if len(current_chunk) + len(row) + len(table_close) > max_chunk_size:
- # 当前块已满,保存并开始新块
- if current_chunk != table_open: # 确保不是空表格
- result_atoms.append({"type": atom_type, "content": current_chunk + table_close})
- current_chunk = table_open
- else:
- # 单行就超长,强制加入
- result_atoms.append({"type": atom_type, "content": current_chunk + row + table_close})
- current_chunk = table_open
- continue
-
- current_chunk += row
-
- # 添加最后一个块
- if current_chunk != table_open:
- result_atoms.append({"type": atom_type, "content": current_chunk + table_close})
-
- elif atom_type == "placeholder":
- # 占位符按字符切分(通常占位符不应该超长,但以防万一)
- logger.warning(f"占位符超长 [length={len(content)}],按字符切分")
- for i in range(0, len(content), max_chunk_size):
- chunk = content[i:i + max_chunk_size]
- result_atoms.append({"type": atom_type, "content": chunk})
-
- else: # text
- # 普通文本按字符切分
- for i in range(0, len(content), max_chunk_size):
- chunk = content[i:i + max_chunk_size]
- result_atoms.append({"type": atom_type, "content": chunk})
-
- logger.info(f"超长原子切分完成 [原长度={len(content)}, 切分为{len(result_atoms)}个块]")
- return result_atoms
- def smart_chunk_with_overlap(self, text: str, max_chunk_size: int = 5000, overlap_ratio: float = 0.1) -> List[str]:
- """
- 智能分割长文本并添加重叠
-
- 参数:
- text: 输入的长文本字符串
- max_chunk_size: 每块的最大字符数(所有内容),默认5000
- overlap_ratio: 重叠率,默认0.1(10%)
-
- 返回:
- List[str]: 分割后带重叠的文本块列表
-
- 处理流程:
- 1. 如果文本不超长,直接返回
- 2. 将文本拆分为原子(表格、占位符、普通文本)
- 3. 对超长原子进行切分
- 4. 按照max_chunk_size将原子组装成块(计算所有内容的总长度)
- 5. 对所有块添加重叠率处理(重叠时只计算纯文本长度)
- """
-
- if len(text) <= max_chunk_size:
- return [text]
-
- # 步骤1:将文本拆分为原子
- atoms = self.split_into_atoms(text)
-
- # 步骤2:对超长原子进行切分
- processed_atoms = []
- for atom in atoms:
- if len(atom["content"]) > max_chunk_size:
- # 超长原子需要切分
- split_atoms = self.split_large_atom(atom, max_chunk_size)
- processed_atoms.extend(split_atoms)
- else:
- processed_atoms.append(atom)
-
- # 步骤3:按照max_chunk_size组装原子成块(计算所有内容的总长度)
- chunks = []
- current_chunk_atoms = []
- current_total_len = 0 # 计算所有内容的总长度
-
- for atom in processed_atoms:
- atom_len = len(atom["content"]) # 所有类型都计算长度
-
- # 检查是否需要开始新块
- if current_total_len > 0 and current_total_len + atom_len > max_chunk_size:
- # 当前块已满,保存并开始新块
- chunk_text = "".join(a["content"] for a in current_chunk_atoms)
- chunks.append(chunk_text)
- current_chunk_atoms = []
- current_total_len = 0
-
- # 添加原子到当前块
- current_chunk_atoms.append(atom)
- current_total_len += atom_len
-
- # 添加最后一个块
- if current_chunk_atoms:
- chunk_text = "".join(a["content"] for a in current_chunk_atoms)
- chunks.append(chunk_text)
-
- # 步骤4:对所有块添加重叠率处理(重叠时只计算纯文本长度)
- if len(chunks) <= 1:
- return chunks
-
- return self.overlap_chunks(chunks, overlap_ratio)
-
- def chunk_text_for_rag(self, text: str, max_chars: int = 5000) -> List[str]:
- """
- 分割长文本(兼容旧接口,内部调用新方法)
-
- 参数:
- text: 输入的长文本字符串
- max_chars: 每块的最大字符数,默认5000
-
- 返回:
- List[str]: 分割后的文本块列表
- """
- return self.smart_chunk_with_overlap(text, max_chunk_size=max_chars, overlap_ratio=0.1)
- def find_split_by_table(self, chunk: str):
- """
- 在块中查找 </table> 标签作为分割点
- 优先在完整表格边界处分割,避免截断表格
-
- 返回:
- split_position: 分割位置(</table>标签之后),如果没找到返回 None
- """
- # 从后向前查找最后一个完整的 </table> 标签
- close_tag = '</table>'
- last_close_pos = chunk.rfind(close_tag)
-
- if last_close_pos == -1:
- return None
-
- # 分割点在 </table> 之后
- split_pos = last_close_pos + len(close_tag)
-
- # 确保分割点不是在块的最开始或最末尾(避免空块)
- if split_pos < 100 or split_pos >= len(chunk) - 100:
- return None
-
- return split_pos
- def find_split_point(self, chunk: str, full_text: str, chunk_start: int):
- """
- 在块中查找最近的x.x标签作为分割点
-
- 返回:
- (split_position, overlap_content): 分割位置和重叠内容
- """
- # 匹配x.x格式的标签(如1.1, 2.3, 10.5等)
- pattern = r'\d+\.\d+'
- # pattern = r'\b\d+\.\d+\b'
- matches = list(re.finditer(pattern, chunk))
-
- if not matches:
- return None, None
-
- # 找最后一个匹配的标签
- last_match = matches[-1]
- tag_start = last_match.start()
-
- # 提取该标签的完整内容(从标签到下一个标签或块尾)
- tag_content_start = chunk_start + tag_start
-
- # 在全文中查找下一个x.x标签
- remaining_text = full_text[tag_content_start:]
- next_tag_match = re.search(pattern, remaining_text[len(last_match.group()):])
-
- if next_tag_match:
- tag_content_end = tag_content_start + len(last_match.group()) + next_tag_match.start()
- else:
- tag_content_end = len(full_text)
-
- overlap_content = full_text[tag_content_start:tag_content_end]
-
- # 检查重叠内容长度
- if len(overlap_content) > self.max_cut_len:
- # 重叠内容本身超过限制,返回None使用句号分割
- return None, None
-
- return tag_start, overlap_content
- def find_split_by_period(self, chunk: str, full_text: str, chunk_start: int):
- """
- 按句号"。"查找分割点
-
- 返回:
- (split_position, overlap_content): 分割位置和重叠内容
- """
- # 从后向前查找句号
- last_period = chunk.rfind('。')
-
- if last_period == -1:
- return None, None
-
- # 分割点是句号之后
- split_pos = last_period + 1
-
- # 查找这个句子的开始位置作为重叠内容
- # 向前查找上一个句号
- prev_period = chunk.rfind('。', 0, last_period)
-
- if prev_period != -1:
- overlap_start = prev_period + 1
- overlap_content = chunk[overlap_start:split_pos]
- else:
- # 如果前面没有句号,则从块开始到当前句号
- overlap_content = chunk[:split_pos]
-
- # 检查重叠内容长度
- if len(overlap_content) > self.max_cut_len:
- overlap_content = ""
-
- return split_pos, overlap_content
    def process_table_images(self, table_body, doc_id, pdf_file_name, image_num, flag_img_info):
        """
        Rewrite <img> tags inside a table: upload each referenced local image
        to MinIO and point the tag's src at the public MinIO URL.

        Args:
            table_body: Table HTML content.
            doc_id: Document ID (used in the MinIO object path).
            pdf_file_name: Parsed-PDF folder name; locates images on disk via
                _get_image_base_path.
            image_num: Running image counter; incremented per uploaded image.
            flag_img_info: Placeholder -> URL map.  NOTE(review): currently
                not written to here (the write is commented out); kept for
                interface compatibility.

        Returns:
            tuple: (table HTML with rewritten <img> tags, updated image_num)
        """
        if not table_body:
            return table_body, image_num

        # Match <img ... src="..."> with single or double quotes.
        img_pattern = re.compile(r'<img\s+[^>]*src=["\']([^"\']+)["\'][^>]*>', re.IGNORECASE)

        def replace_img(match):
            # Closure over image_num so the counter survives across matches.
            nonlocal image_num
            img_src = match.group(1)

            try:
                # Image file name taken from the src path.
                image_name = img_src.split("/")[-1]
                # Resolve the on-disk image directory for the active parser.
                image_base_path = self._get_image_base_path(pdf_file_name)
                save_image_path = f"{image_base_path}/{image_name}"

                # Missing file: keep the original tag untouched.
                if not os.path.exists(save_image_path):
                    logger.warning(f"表格中的图片文件不存在: {save_image_path}")
                    return match.group(0)  # keep original tag

                # Build the public MinIO URL and swap it into the tag's src.
                # replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                minio_url = minio_config.get("minio_url")
                minio_bucket = minio_config.get("minio_bucket")
                minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{image_name}.jpg"
                full_img_url = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                original_img_tag = match.group(0)
                replace_text = re.sub(
                    r'src=["\']([^"\']+)["\']',
                    f'src="{full_img_url}"',
                    original_img_tag,
                    flags=re.IGNORECASE
                )

                # Upload the local file to MinIO.
                self.minio_client.upload_file(save_image_path, minio_file_path)
                # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"

                image_num += 1
                return replace_text

            except Exception as e:
                # Best effort: on any failure the tag is left as-is.
                logger.error(f"处理表格中的图片失败 [src={img_src}]: {e}")
                return match.group(0)  # keep original tag

        # Rewrite every <img> tag in the table body.
        processed_table = img_pattern.sub(replace_img, table_body)
        return processed_table, image_num
-
    def process_table_content(self, table_caption, table_body, max_table_chars=8000):
        """
        Combine a table's caption and HTML body, truncating oversized tables
        so the result fits database field limits.

        Args:
            table_caption: List of caption strings (may be empty/None).
            table_body: Table HTML content.
            max_table_chars: Character budget for the combined output.

        Returns:
            str: "<caption>\\n<body>\\n" (possibly truncated with a marker),
            or "" when table_body is empty.
        """
        if not table_body:
            return ""

        # Join caption fragments and prepend them to the body.
        caption_text = " ".join(table_caption) if table_caption else ""
        full_table_text = f"{caption_text}\n{table_body}\n" if caption_text else f"{table_body}\n"

        # Within budget: return unchanged.
        if len(full_table_text) <= max_table_chars:
            return full_table_text

        logger.warning(f"表格内容过长({len(full_table_text)}字符),进行截断处理")

        # Preferred path: truncate at a row boundary and re-close the table so
        # the HTML structure survives.
        if "</table>" in table_body:
            table_start = table_body.find("<table")
            if table_start != -1:
                # Budget left for the body after the caption and newlines.
                available_chars = max_table_chars - len(caption_text) - 2
                if available_chars > 100:  # only worthwhile with real room left
                    truncated_body = table_body[:available_chars]
                    # Prefer cutting right after the last complete row.
                    last_tr_end = truncated_body.rfind("</tr>")
                    if last_tr_end != -1 and last_tr_end > available_chars // 2:
                        truncated_body = truncated_body[:last_tr_end + 5]  # include "</tr>"

                    # Re-close the table tag if truncation removed it.
                    if "<table" in truncated_body and "</table>" not in truncated_body:
                        truncated_body += "</table>"

                    result = f"{caption_text}\n{truncated_body}\n[表格内容已截断]\n" if caption_text else f"{truncated_body}\n[表格内容已截断]\n"
                    return result

        # Fallback: plain character truncation with a marker.
        max_content_chars = max_table_chars - len("[内容已截断]\n")
        if caption_text:
            max_content_chars -= len(caption_text) + 1  # caption + newline
            truncated_content = table_body[:max_content_chars] if max_content_chars > 0 else ""
            return f"{caption_text}\n{truncated_content}[内容已截断]\n"
        else:
            truncated_content = table_body[:max_content_chars] if max_content_chars > 0 else ""
            return f"{truncated_content}[内容已截断]\n"
-
- # 0
- def split_by_title(self, file_content_list, set_table, doc_id, pdf_file_name):
- # TODO 处理根据标题切分逻辑 图片替换标识符,表格按照set table 0图片,1html数据
- stop_title_list = ['前言', '目录', '目次', 'context', 'CONTEXT', 'content', 'CONTENT', 'contents', 'CONTENTS']
- text_lists = []
- bbox_list = []
- page_list = []
- text = ""
- first_bbox = None
- first_page = None
- image_num = 1
- flag_img_info = {}
- first_level_1 = True
- level_1_text = ""
- level_2_text = ""
- for i, content_dict in enumerate(file_content_list):
- # 记录当前块的首bbox和首page
- if first_bbox is None and content_dict.get("bbox"):
- first_bbox = content_dict.get("bbox")
- first_page = content_dict.get("page_idx", 0) + 1
- text_type = content_dict.get("type")
- content_text = content_dict.get("text")
- if text_type == "text":
- text_level = content_dict.get("text_level", "")
- if text_level == 1:
- if not level_1_text and first_level_1:
- if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text):
- if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text).group(1).strip() not in stop_title_list:
- level_1_text = f"# {content_text}\n"
- # first_level_1 = True
- if first_level_1:
- text += f"# {content_text}\n"
- first_level_1 = False
- else:
- if len(text) > self.max_cut_len:
- logger.info(f"块长度过大:{len(text)}")
- text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
- logger.info(f"切分后的块:{text_chunks}")
- text_lists.extend(text_chunks)
- bbox_list.extend([first_bbox] * len(text_chunks))
- page_list.extend([first_page] * len(text_chunks))
- else:
- text_lists.append(text)
- bbox_list.append(first_bbox)
- page_list.append(first_page)
- text = f"# {content_text}\n"
- # 屏蔽常见干扰标题
- if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text):
- if re.match(r'^\d+(?:\.\d+)*\s*(.+)$', content_text).group(1).strip() not in stop_title_list:
- level_1_text = f"# {content_text}\n"
- else:
- level_1_text = ""
- level_2_text = ""
- first_bbox = content_dict.get("bbox")
- first_page = content_dict.get("page_idx", 0) + 1
- elif text_level == 2:
- if not level_2_text:
- text += f"## {content_text}\n"
- level_2_text = f"## {content_text}\n"
- else:
- if len(text) > self.max_cut_len:
- logger.info(f"块长度过大:{len(text)}")
- text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
- logger.info(f"切分后的块:{text_chunks}")
- text_lists.extend(text_chunks)
- bbox_list.extend([first_bbox] * len(text_chunks))
- page_list.extend([first_page] * len(text_chunks))
- else:
- text_lists.append(text)
- bbox_list.append(first_bbox)
- page_list.append(first_page)
- text = level_1_text + f"## {content_text}\n"
- first_bbox = content_dict.get("bbox")
- first_page = content_dict.get("page_idx", 0) + 1
- else:
- if text_level:
- text += text_level*"#" + " " + content_text + "\n"
- else:
- text += content_text
- elif text_type == "table" and set_table == "1":
- table_body = content_dict.get("table_body", "")
- # 先处理表格中的图片
- table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
- # 再处理表格内容
- table_text = self.process_table_content(
- content_dict.get("table_caption", []),
- table_body
- )
- text += table_text
- elif text_type in ("image", "table"):
- # image_path = content_dict.get("img_path")
- # if not image_path:
- # continue
- # image_name = image_path.split("/")[1]
- # # 根据解析器类型选择图片路径
- # image_base_path = self._get_image_base_path(pdf_file_name)
- # save_image_path = f"{image_base_path}/{image_name}"
- # replace_text = f"【示意图序号_{doc_id}_{image_num}】"
- # minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
- # self.minio_client.upload_file(save_image_path, minio_file_path)
- # minio_url = minio_config.get("minio_url")
- # minio_bucket = minio_config.get("minio_bucket")
- # flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
- # if text_type == "table":
- # text += " ".join(content_dict.get("table_caption")) + '\n' + content_dict.get("table_body", "") + '\n'
- # text += replace_text
- # image_num += 1
- try:
- image_path = content_dict.get("img_path")
- replace_text = ""
- if not image_path and text_type == "image":
- continue
- elif image_path:
- image_name = image_path.split("/")[-1]
- image_base_path = self._get_image_base_path(pdf_file_name)
- save_image_path = f"{image_base_path}/{image_name}"
- replace_text = f"【示意图序号_{doc_id}_{image_num}】"
- minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
- self.minio_client.upload_file(save_image_path, minio_file_path)
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
-
- if text_type == "image":
- image_head = content_dict.get("image_caption", "")
- image_tail = content_dict.get("image_footnote", "")
- replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)
- if text_type == "table":
- image_num += 1
- table_body = content_dict.get("table_body", "")
- # 先处理表格中的图片
- table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
- # 再处理表格内容
- table_text = self.process_table_content(
- content_dict.get("table_caption", []),
- table_body
- )
- # table_text = self.process_table_content(
- # content_dict.get("table_caption", []),
- # content_dict.get("table_body", "")
- # )
- text += table_text
- if replace_text:
- text += replace_text
- image_num += 1
- except Exception as e:
- logger.error(f"处理图片或表格失败: {e}")
- if i+1 == len(file_content_list):
- if len(text) > self.max_cut_len:
- logger.info(f"块长度过大(最后一个):{len(text)}")
- text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
- logger.info(f"切分后的块(最后一个):{text_chunks}")
- text_lists.extend(text_chunks)
- bbox_list.extend([first_bbox] * len(text_chunks))
- page_list.extend([first_page] * len(text_chunks))
- else:
- text_lists.append(text)
- bbox_list.append(first_bbox)
- page_list.append(first_page)
-
- elif text_type == "list":
- list_items = content_dict.get("list_items")
- if list_items:
- text += "\n".join(list_items) + "\n"
- return text_lists, flag_img_info, bbox_list, page_list
-
# Strategy 1: split by page.
def split_by_page(self, file_content_list, set_table, doc_id, pdf_file_name):
    """Split parsed file content into chunks, one chunk per PDF page.

    Images are replaced with placeholder tokens (the underlying files are
    uploaded to MinIO); tables are rendered as text when
    ``set_table == "1"``, otherwise they go through the image path.

    Args:
        file_content_list: list of parsed content dicts; keys seen here
            include ``type``, ``page_idx``, ``bbox``, ``text``,
            ``text_level``, ``img_path``, ``table_body``, ``table_caption``.
        set_table: "1" keeps table bodies as text/HTML, any other value
            handles tables via the image placeholder path.
        doc_id: document id, embedded in placeholder tokens and MinIO paths.
        pdf_file_name: source file name, used to locate extracted images.

    Returns:
        Tuple ``(text_lists, flag_img_info, bbox_list, page_list)`` where
        ``text_lists`` has been post-processed by ``self.overlap_chunks``
        and ``flag_img_info`` maps placeholder token -> uploaded image URL.
    """
    text_lists = []
    bbox_list = []
    page_list = []
    current_page = ""
    text = ""
    first_bbox = None   # bbox of the first element of the current page chunk
    first_page = None   # 1-based page number of the current chunk
    image_num = 1       # running counter embedded in image placeholder tokens
    flag_img_info = {}  # placeholder token -> uploaded image URL
    for i, content_dict in enumerate(file_content_list):
        page_index = content_dict.get("page_idx")
        if i == 0:
            current_page = page_index
            first_bbox = content_dict.get("bbox")
            first_page = page_index + 1 if page_index is not None else None
        elif page_index != current_page:
            # Page boundary: flush the accumulated text of the previous
            # page, splitting further if it exceeds the max chunk length.
            if len(text) > self.max_cut_len:
                text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                text_lists.extend(text_chunks)
                bbox_list.extend([first_bbox] * len(text_chunks))
                page_list.extend([first_page] * len(text_chunks))
            else:
                text_lists.append(text)
                bbox_list.append(first_bbox)
                page_list.append(first_page)
            text = ""
            current_page = page_index
            first_bbox = content_dict.get("bbox")
            first_page = page_index + 1 if page_index is not None else None
        text_type = content_dict.get("type")
        if text_type == "text":
            content_text = content_dict.get("text")
            text_level = content_dict.get("text_level")
            if text_level:
                # Headings are rendered as markdown ("#" per level).
                text += "#" * text_level + " " + content_text
            else:
                text += content_text
        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # First process images embedded inside the table body.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then render the table (caption + body) as text.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text
        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    continue
                elif image_path:
                    # Upload the extracted image to MinIO and record the
                    # placeholder token -> URL mapping.
                    image_name = image_path.split("/")[-1]
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"

                if text_type == "image":
                    # Surround the placeholder with caption/footnote lines.
                    # NOTE(review): defaults are "" but "\n".join expects a
                    # list of strings — confirm the parser always emits lists.
                    image_head = content_dict.get("image_caption", "")
                    image_tail = content_dict.get("image_footnote", "")
                    replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)
                if text_type == "table":
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # First process images embedded inside the table body.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then render the table (caption + body) as text.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")
        if i+1 == len(file_content_list):
            # Last element: flush whatever text is still buffered.
            if len(text) > self.max_cut_len:
                text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                text_lists.extend(text_chunks)
                bbox_list.extend([first_bbox] * len(text_chunks))
                page_list.extend([first_page] * len(text_chunks))
            else:
                text_lists.append(text)
                bbox_list.append(first_bbox)
                page_list.append(first_page)
    return self.overlap_chunks(text_lists), flag_img_info, bbox_list, page_list
# Strategy 0 (and default "other"): split on a custom separator.
def split_by_self(self, file_content_list, set_table, slice_value, doc_id, pdf_file_name):
    """Split content on a user-supplied separator string.

    All content (text, tables, image placeholders) is first concatenated
    into one string, which is then split on ``slice_value``; any piece
    longer than ``self.max_cut_len`` is re-chunked with 10% overlap.

    Args:
        file_content_list: list of parsed content dicts.
        set_table: "1" keeps table bodies as text/HTML, else image path.
        slice_value: the custom separator string.
        doc_id: document id used in placeholder tokens and MinIO paths.
        pdf_file_name: source file name, used to locate extracted images.

    Returns:
        Tuple ``(text_lists, flag_img_info, bbox_list, page_list)``.
        Per-chunk positions are not tracked in this mode: every chunk
        shares the first bbox/page seen.
    """
    logger.info(f"自定义的分隔符:{slice_value}")
    text = ""
    first_bbox = None
    first_page = None
    image_num = 1       # running counter embedded in image placeholder tokens
    flag_img_info = {}  # placeholder token -> uploaded image URL
    for i, content_dict in enumerate(file_content_list):
        # Remember the first bbox/page seen; reused for every output chunk.
        if first_bbox is None and content_dict.get("bbox"):
            first_bbox = content_dict.get("bbox")
            first_page = content_dict.get("page_idx", 0) + 1
        text_type = content_dict.get("type")
        if text_type == "text":
            content_text = content_dict.get("text")
            text_level = content_dict.get("text_level")
            if text_level:
                # Headings are rendered as markdown ("#" per level).
                text += "#" * text_level + " " + content_text
            else:
                text += content_text
        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # First process images embedded inside the table body.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then render the table (caption + body) as text.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text
        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    continue
                elif image_path:
                    # Upload the image to MinIO and record token -> URL.
                    image_name = image_path.split("/")[-1]
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"

                if text_type == "image":
                    # Surround the placeholder with caption/footnote lines.
                    image_head = content_dict.get("image_caption", "")
                    image_tail = content_dict.get("image_footnote", "")
                    replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)

                if text_type == "table":
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # First process images embedded inside the table body.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then render the table (caption + body) as text.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")
    split_lists = text.split(slice_value)
    text_lists = []
    for split_text in split_lists:
        if not split_text:
            continue
        if len(split_text) > self.max_cut_len:
            # Oversized piece: re-chunk with 10% overlap.
            text_chunks = self.smart_chunk_with_overlap(split_text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
            text_lists.extend(text_chunks)
        else:
            text_lists.append(split_text)

    # Every chunk shares the first bbox/page seen.
    bbox_list = [first_bbox] * len(text_lists)
    page_list = [first_page] * len(text_lists)
    return text_lists, flag_img_info, bbox_list, page_list
# Strategy 3: split by smallest paragraph.
def split_by_min_paragraph(self, file_content_list, set_table, doc_id, pdf_file_name):
    """Split by smallest paragraph: each heading plus the non-heading
    content under it becomes one chunk.

    Details visible in this implementation:
      * every heading is registered in ``title_dict``
        (title_path -> rendered markdown line) and, when it starts with a
        number like "5.1.1", in ``number_title_dict``
        (numeric prefix -> rendered markdown line);
      * each new chunk is prefixed with the full parent-heading chain
        built by ``_build_title_chain``;
      * a heading ending with a colon enters "colon mode": deeper
        headings with a single-digit number are appended to the current
        chunk instead of starting new ones, until a shallower heading or
        a heading numbered outside 1..9 appears.

    Returns:
        ``(text_lists, flag_img_info, path_title_list,
        father_path_title_list, bbox_list, page_list)`` — the two path
        lists are per-chunk heading path and parent heading path.
    """
    text_lists = []
    bbox_list = []
    page_list = []
    text = ""
    first_bbox = None
    first_page = None
    pending_bbox = None  # first bbox buffered while accumulating level-1 headings
    pending_page = None  # first page buffered while accumulating level-1 headings
    image_num = 1        # running counter embedded in image placeholder tokens
    flag_img_info = {}   # placeholder token -> uploaded image URL
    path_title_list = []         # per-chunk heading path, e.g. "A->B->C"
    father_path_title_list = []  # per-chunk parent heading path
    title_dict = {}         # title_path -> rendered markdown heading line
    number_title_dict = {}  # numeric prefix (e.g. "5.1") -> rendered heading line
    coln_log = False        # True while in "colon mode" (heading ended with a colon)
    coln_log_level = 0      # heading level at which colon mode was entered

    for i, content_dict in enumerate(file_content_list):
        text_type = content_dict.get("type")
        content_text = content_dict.get("text", "")

        if text_type == "text":
            text_level = content_dict.get("text_level", 0)

            if text_level:
                # Register the heading so later chunks can rebuild chains.
                title_path = content_dict.get("title_path", "")
                if title_path:
                    title_content = "#" * text_level + f" {content_text}\n"
                    title_dict[title_path] = title_content
                    # Record the numeric prefix (e.g. "5.1.1") -> heading line.
                    number_match = re.match(r'^(\d+(?:\.\d+)*)', content_text)
                    if number_match:
                        number_title_dict[number_match.group(1)] = title_content

                if coln_log:
                    title_number = re.match(r'^(\d+(?:\.\d+)*)', content_text).group(1) if re.match(r'^(\d+(?:\.\d+)*)', content_text) else ""
                    if text_level < coln_log_level or title_number not in ["1","2","3","4","5","6","7","8","9"]:
                        # Leave colon mode; this heading starts a new chunk.
                        coln_log = False
                        coln_log_level = 0
                    else:
                        # Still in colon mode: append the heading to the
                        # current chunk instead of starting a new one.
                        text += "#" * text_level + f" {content_text}\n"
                        continue
                # Flush the current chunk (if any) before starting a new one.
                if text:
                    if len(text) > self.max_cut_len:
                        text_chunks = self.smart_chunk_with_overlap(text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                        text_lists.extend(text_chunks)
                        bbox_list.extend([first_bbox] * len(text_chunks))
                        page_list.extend([first_page] * len(text_chunks))
                        # Reuse the current chunk's heading path for each sub-chunk.
                        last_path = path_title_list[-1] if path_title_list else ""
                        last_father = father_path_title_list[-1] if father_path_title_list else ""
                        path_title_list.extend([last_path] * len(text_chunks))
                        father_path_title_list.extend([last_father] * len(text_chunks))
                    else:
                        text_lists.append(text)
                        bbox_list.append(first_bbox)
                        page_list.append(first_page)
                        # Reuse the current chunk's heading path.
                        path_title_list.append(path_title_list[-1] if path_title_list else "")
                        father_path_title_list.append(father_path_title_list[-1] if father_path_title_list else "")

                # New chunk: start with the full parent-heading chain.
                title_chain_text = self._build_title_chain(title_path, title_dict, number_title_dict)
                if not coln_log:
                    # NOTE(review): content_text[-1] raises IndexError on an
                    # empty heading text — confirm upstream never emits one.
                    if content_text[-1] in [":", ":"]:
                        coln_log = True
                        coln_log_level = text_level

                # Record this chunk's heading path (patched in place so it
                # is correct when the chunk is eventually flushed).
                if path_title_list:
                    path_title_list[-1] = title_path
                    parts = title_path.split("->") if title_path else []
                    father_path_title_list[-1] = "->".join(parts[:-1]) if len(parts) > 1 else ""
                else:
                    # First heading encountered: seed both lists.
                    path_title_list.append(title_path)
                    parts = title_path.split("->") if title_path else []
                    father_path_title_list.append("->".join(parts[:-1]) if len(parts) > 1 else "")

                text = title_chain_text
                # New chunk's bbox/page: prefer the buffered pending values.
                first_bbox = pending_bbox if pending_bbox else content_dict.get("bbox")
                first_page = pending_page if pending_page else (content_dict.get("page_idx", 0) + 1 if content_dict.get("page_idx") is not None else None)
                pending_bbox = None
                pending_page = None

            else:
                # Non-heading text: append to the current chunk.
                if first_bbox is None and content_dict.get("bbox"):
                    first_bbox = content_dict.get("bbox")
                    first_page = content_dict.get("page_idx", 0) + 1
                text += content_text

        elif text_type == "table" and set_table == "1":
            table_body = content_dict.get("table_body", "")
            # First process images embedded inside the table body.
            table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
            # Then render the table (caption + body) as text.
            table_text = self.process_table_content(
                content_dict.get("table_caption", []),
                table_body
            )
            text += table_text

        elif text_type in ("image", "table"):
            try:
                image_path = content_dict.get("img_path")
                replace_text = ""
                if not image_path and text_type == "image":
                    continue
                elif image_path:
                    # Upload the image to MinIO and record token -> URL.
                    image_name = image_path.split("/")[-1]
                    image_base_path = self._get_image_base_path(pdf_file_name)
                    save_image_path = f"{image_base_path}/{image_name}"
                    replace_text = f"【示意图序号_{doc_id}_{image_num}】"
                    minio_file_path = f"/pdf/{self.knowledge_id}/{doc_id}/{replace_text}.jpg"
                    self.minio_client.upload_file(save_image_path, minio_file_path)
                    minio_url = minio_config.get("minio_url")
                    minio_bucket = minio_config.get("minio_bucket")
                    flag_img_info[replace_text] = f"{minio_url}/{minio_bucket}/{minio_file_path}"
                if text_type == "image":
                    # Surround the placeholder with caption/footnote lines.
                    image_head = content_dict.get("image_caption", "")
                    image_tail = content_dict.get("image_footnote", "")
                    replace_text = "\n".join(image_head) + replace_text + "\n".join(image_tail)

                if text_type == "table":
                    image_num += 1
                    table_body = content_dict.get("table_body", "")
                    # First process images embedded inside the table body.
                    table_body, image_num = self.process_table_images(table_body, doc_id, pdf_file_name, image_num, flag_img_info)
                    # Then render the table (caption + body) as text.
                    table_text = self.process_table_content(
                        content_dict.get("table_caption", []),
                        table_body
                    )
                    text += table_text
                if replace_text:
                    text += replace_text
                    image_num += 1
            except Exception as e:
                logger.error(f"处理图片或表格失败: {e}")

        elif text_type == "list":
            list_items = content_dict.get("list_items")
            if list_items:
                text += "\n".join(list_items) + "\n"

    # Flush the final chunk.
    # NOTE(review): the final chunk's heading paths are recorded as "" —
    # confirm downstream consumers tolerate a pathless last chunk.
    if text:
        final_text = text
        if final_text:
            if len(final_text) > self.max_cut_len:
                text_chunks = self.smart_chunk_with_overlap(final_text, max_chunk_size=self.max_cut_len, overlap_ratio=0.1)
                text_lists.extend(text_chunks)
                final_bbox = pending_bbox if pending_bbox else first_bbox
                final_page = pending_page if pending_page else first_page
                bbox_list.extend([final_bbox] * len(text_chunks))
                page_list.extend([final_page] * len(text_chunks))
                path_title_list.extend([""] * len(text_chunks))
                father_path_title_list.extend([""] * len(text_chunks))
            else:
                text_lists.append(final_text)
                bbox_list.append(pending_bbox if pending_bbox else first_bbox)
                page_list.append(pending_page if pending_page else first_page)
                path_title_list.append("")
                father_path_title_list.append("")

    return text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list
-
- def _build_title_chain(self, title_path: str, title_dict: Dict[str, str], number_title_dict: Dict[str, str] = None) -> str:
- """
- 根据 title_path 构建完整的父标题链内容
-
- 参数:
- title_path: 标题路径,格式如 "1->1.1->1.1.1"
- title_dict: 标题字典,格式如 {"1": "1、xx\n", "1->1.1": "1.1、xx\n", ...}
- number_title_dict: 数字标题映射字典,格式如 {"5.1.1": "path", "5.1": "path", "5": "path"}
-
- 返回:
- 完整的标题链文本,如 "1、xx\n1.1、xx\n1.1.1、xx\n"
- """
- if not title_path:
- return ""
-
- # 分解 title_path,构建所有父路径
- parts = title_path.split("->")
- title_chain = []
-
- # 使用数字结构查找父级链
- if number_title_dict:
- # 获取当前标题路径对应的标题内容
- current_title = title_dict.get(title_path, "")
- # 使用正则提取数字编号
- number_match = re.match(r'^(#+\s+)?(\d+(?:\.\d+)*)', current_title)
- if number_match:
- number = number_match.group(2) # 提取数字部分,如 "5.1.1"
- # 按"."切分,逐层查找父标题
- number_parts = number.split(".")
- for i in range(len(number_parts)):
- parent_number = ".".join(number_parts[:i+1])
- if parent_number in number_title_dict:
- parent_path = number_title_dict[parent_number]
- # if parent_path in title_dict:
- # current_len = len(re.match(r'^(#+)', parent_path).group(1) if re.match(r'^(#+)', parent_path) else '')
- # if title_chain:
- # last_len = len(re.match(r'^(#+)', current_title).group(1) if re.match(r'^(#+)', title_chain[-1]) else '')
- # else:
- # last_len = 100
- # if current_len >= last_len:
- # continue
- title_chain.append(parent_path)
- return "".join(title_chain)
-
- # 如果不是数字结构或未找到,按标题路径
- for i in range(len(parts)):
- # 构建从根到当前层级的路径
- current_path = "->".join(parts[:i+1])
- if current_path in title_dict:
- title_chain.append(title_dict[current_path])
-
- return "".join(title_chain)
def file_split(self, file_content_list, doc_id, pdf_file_name):
    """Dispatch to the configured splitting strategy and normalize the
    return shape.

    ``customSeparator`` in ``self.file_json`` selects the strategy:
    "0" custom separator, "1" by page, "2" largest (by title),
    "3" smallest paragraph, "-1" Q/A pairs (not implemented).

    Returns:
        ``(text_lists, flag_img_info, path_title_list,
        father_path_title_list, bbox_list, page_list)``.

    NOTE(review): an unrecognized ``customSeparator`` value falls off the
    end of the elif chain and implicitly returns None — callers that
    unpack the result would raise; confirm the possible values upstream.
    """
    separator_num = self.file_json.get("customSeparator")
    set_table = self.file_json.get("set_table")
    if isinstance(file_content_list, str):
        # Plain-string input: simple text split; no bbox/page/title info.
        file_text = file_content_list
        text_lists = self.split_text(file_text)
        bbox_list = [None] * len(text_lists)
        page_list = [None] * len(text_lists)
        return text_lists, {}, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
    elif separator_num == "2":
        # Split by heading paragraphs (text_level 1/2, i.e. "#" vs "##").
        text_lists, flag_img_info, bbox_list, page_list = self.split_by_title(file_content_list, set_table, doc_id, pdf_file_name)
        return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
    elif separator_num == "1":
        # Split by page.
        text_lists, flag_img_info, bbox_list, page_list = self.split_by_page(file_content_list, set_table, doc_id, pdf_file_name)
        return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
    elif separator_num == "-1":
        # Q/A-pair splitting (Excel documents): not implemented yet.
        return [], {}, [], [], [], []
    elif separator_num == "3":
        # Smallest-paragraph split: each heading + its content is a chunk.
        text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list = self.split_by_min_paragraph(file_content_list, set_table, doc_id, pdf_file_name)
        return text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list
    elif separator_num == "0":
        # Custom separator; "\n" arrives escaped ("\\n") from the config.
        slice_value = self.file_json.get("slice_value", "").replace("\\n", "\n")
        text_lists, flag_img_info, bbox_list, page_list = self.split_by_self(file_content_list, set_table, slice_value, doc_id, pdf_file_name)
        return text_lists, flag_img_info, [""] * len(text_lists), [""] * len(text_lists), bbox_list, page_list
-
def parse_excel_to_text_lists(self, excel_file_path):
    """Parse an Excel file into a list of Markdown-table text blocks.

    The first row of each sheet's first block is used as the header for
    every block of that sheet.  Falls back to plain space-joined rows
    when the ``tabulate`` library is missing or table rendering fails.

    Args:
        excel_file_path: path to the Excel file handed to ``parse_excel``.

    Returns:
        List of text blocks (Markdown tables or plain-text fallbacks).
    """
    try:
        from tabulate import tabulate
    except ImportError:
        logger.error("缺少 tabulate 库,请安装: pip install tabulate")
        logger.info("降级使用纯文本格式")
        # Fallback: render every block as plain space-joined rows.
        result = parse_excel(excel_file_path)
        text_lists = []
        for sheet_name, blocks in result.items():
            for block in blocks:
                content = block.get("content", [])
                lines = [" ".join(str(cell) if cell is not None else "" for cell in row) for row in content]
                text = "\n".join(lines).strip()
                if text:
                    text_lists.append(text)
        return text_lists

    result = parse_excel(excel_file_path)
    text_lists = []

    for sheet_name, blocks in result.items():
        if not blocks:
            continue

        # Header for the whole sheet = first row of its first block.
        sheet_headers = None
        first_block_content = blocks[0].get("content", [])
        if first_block_content:
            sheet_headers = [str(cell).strip() if cell is not None else "" for cell in first_block_content[0]]

        if not sheet_headers:
            logger.warning(f"工作表 {sheet_name} 没有有效的表头,跳过")
            # continue  # intentionally disabled: header-less sheets are still processed

        logger.info(f"工作表 {sheet_name} 的表头: {sheet_headers}")

        for block_idx, block in enumerate(blocks):
            content = block.get("content", [])
            if not content:
                continue

            try:
                # First block: skip its header row; later blocks are all data.
                if block_idx == 0:
                    data_rows = content[1:]
                else:
                    data_rows = content

                # Stringify and trim every cell.
                rows = []
                for row in data_rows:
                    formatted_row = [str(cell).strip() if cell is not None else "" for cell in row]
                    rows.append(formatted_row)

                # Render the block as a GitHub-flavored Markdown table.
                if rows:
                    md_table = tabulate(rows, headers=sheet_headers, tablefmt="github")
                    text_lists.append(md_table)
                    logger.debug(f"成功生成 Markdown 表格 [sheet={sheet_name}, block={block_idx}],表头: {sheet_headers}, 数据行数: {len(rows)}")
                else:
                    logger.debug(f"跳过空数据块 [sheet={sheet_name}, block={block_idx}]")

            except Exception as e:
                logger.error(f"生成 Markdown 表格失败 [sheet={sheet_name}, block={block_idx}]: {e}")
                # Per-block fallback: plain space-joined rows.
                if block_idx == 0:
                    data_rows = content[1:]
                else:
                    data_rows = content
                lines = [" ".join(str(cell) if cell is not None else "" for cell in row) for row in data_rows]
                text = "\n".join(lines).strip()
                if text:
                    text_lists.append(text)
                logger.info(f"降级使用纯文本格式 [sheet={sheet_name}, block={block_idx}]")

    logger.info(f"Excel 解析完成,共生成 {len(text_lists)} 个文本块")
    return text_lists
def process_data_to_milvus_schema(self, text_lists, doc_id, name, path_title_list, father_path_title_list=None, bbox_list=None, page_list=None):
    """Build the Milvus document dicts for a list of text chunks.

    Each chunk becomes a dict of the form::

        {
            "content": <chunk text>,
            "doc_id": doc_id,
            "chunk_id": <uuid1 string>,
            "metadata": {"source": name, "chunk_index": i+1,
                         "chunk_len": len, "knowledge_id": ...},
            "Chapter": path_title_list[i],
            "Father_Chapter": father_path_title_list[i],
            "bbox": str(bbox) or None,
            "page": page or None,
        }

    Missing optional lists default to empty/None placeholders.

    Returns:
        Tuple ``(docs, total_len)`` — the schema dicts and the summed
        character length of all chunks.
    """
    chunk_count = len(text_lists)
    if father_path_title_list is None:
        father_path_title_list = [""] * chunk_count
    if bbox_list is None:
        bbox_list = [None] * chunk_count
    if page_list is None:
        page_list = [None] * chunk_count

    docs = []
    total_len = 0
    for idx, chunk_text in enumerate(text_lists):
        chunk_len = len(chunk_text)
        total_len += chunk_len
        # bbox is persisted as its string repr; falsy bbox becomes None.
        bbox_val = bbox_list[idx] if idx < len(bbox_list) else None
        page_val = page_list[idx] if idx < len(page_list) else None
        docs.append({
            "content": chunk_text,
            "doc_id": doc_id,
            "chunk_id": str(uuid1()),
            "metadata": {"source": name, "chunk_index": idx + 1, "chunk_len": chunk_len, "knowledge_id": self.knowledge_id},
            "Chapter": path_title_list[idx],
            "Father_Chapter": father_path_title_list[idx],
            "bbox": str(bbox_val) if bbox_val else None,
            "page": page_val,
        })
    return docs, total_len
-
- async def process_documents(self, file_json, task_id=None):
- # 提取用户ID
- user_id = file_json.get("userId", "")
- embedding_id = file_json.get("embedding_id", "bge-m3")
-
- # ===== 注册任务到全局任务表 =====
- task_ctx = None
- if task_id:
- task_ctx = task_registry.register(task_id, user_id, self.knowledge_id)
-
- # 启动进度(独立线程)
- reporter = None
- if task_id:
- from rag.progress_reporter import ProgressReporter
- # from config import progress_callback_config
- estimate_seconds = progress_callback_config.get("estimate_seconds", 120)
- callback_url = progress_callback_config.get("base_url")
- reporter = ProgressReporter(task_id, callback_url, estimate_seconds, user_id)
- reporter.start()
-
- # 关联 reporter 到任务上下文(用于取消时停止进度上报)
- if task_ctx:
- task_ctx.reporter = reporter
-
- # 更新任务状态为开始(1)
- # self.mysql_client.update_task_status_start(task_id)
-
- # 初始化成功文档列表(在 try 外面定义,确保异常处理时可访问)
- success_doc = []
-
- try:
- # 文档下载
- separator_num = file_json.get("customSeparator", "")
- if separator_num == "-1":
- if reporter:
- reporter.complete(success=False)
- # if task_id:
- # self.mysql_client.update_task_status_error(task_id)
- return {"code": 500, "message": "暂不支持解析"}
- docs = file_json.get("docs")
- flag = file_json.get("flag")
- for doc in docs:
- # ===== 检查点1:每个文档处理前检查取消标志 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在文档处理前被取消")
- raise TaskCancelledException("任务已被用户取消")
-
- url = doc.get("url")
- name = doc.get("name", "").replace(" ", "")
- doc_id = doc.get("document_id")
- # ===== 文档复制模式处理 =====
- old_document_id = doc.get("oldDocumentId")
- old_knowledge_id = doc.get("oldKnowledgeId")
- tenant_id = file_json.get("tenantId", "000000")
-
- # 如果提供old 文档id 和 old 知识库id 则表示文档存在直接复制
- if old_document_id and old_knowledge_id:
- logger.info(f"进入文档复制模式: oldDocumentId={old_document_id}, oldKnowledgeId={old_knowledge_id}, newDocId={doc_id}, newKnowledgeId={self.knowledge_id}")
-
- try:
- # 1. 复制 Milvus 向量数据(使用前端传入的 doc_id)
- old_milvus_client = MilvusOperate(collection_name=old_knowledge_id, embedding_name=self.embedding_name)
- milvus_resp = old_milvus_client._copy_single_doc_to_collection(
- new_collection_name=self.knowledge_id,
- old_doc_id=old_document_id,
- new_doc_id=doc_id,
- embedding_name=self.embedding_name
- )
-
- if milvus_resp.get("code") != 200:
- logger.error(f"[文档复制模式] Milvus数据复制失败: {milvus_resp.get('message')}")
- self.send_post_request_sync(
- f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
- {"status": "2", "documentId": doc_id}
- )
- if reporter:
- reporter.complete(success=False)
- return {"code": 500, "message": f"文档复制模式:Milvus复制失败 - {milvus_resp.get('message')}", "knowledge_id": self.knowledge_id, "doc_info": {}}
-
- # 获取 chunk_id 映射
- chunk_id_mapping = milvus_resp.get("chunk_id_mapping", {})
-
- logger.info(f"[文档复制模式] Milvus数据复制成功,共 {milvus_resp.get('data', {}).get('total_records', 0)} 条记录")
-
- # 2. 复制 MySQL 元数据(slice_text 和 old_slice_text 都用旧的 old_slice_text)
- mysql_success, mysql_result = self.mysql_client.copy_single_doc_metadata_for_document_copy(
- source_knowledge_id=old_knowledge_id,
- source_doc_id=old_document_id,
- new_knowledge_id=self.knowledge_id,
- new_doc_id=doc_id,
- chunk_id_mapping=chunk_id_mapping,
- tenant_id=tenant_id
- )
- knowledge_id = file_json.get("knowledge_id", "")
- enabled_kn_rs = self.mysql_client.query_knowledge_by_ids([knowledge_id])
- enabled_kn_gp_ids = set()
- for r in enabled_kn_rs:
- if r["knowledge_graph"]:
- enabled_kn_gp_ids.add(r["knowledge_id"])
- if enabled_kn_gp_ids:
- # if file_json.get("lightrag"):
- result = mysql_result.get("result", {})
- knowledge_id = result.get("new_knowledge_id", "")
- document_id = result.get("new_doc_id", "")
- datas = result.get("lightrag_data", [])
- # logger.info(f"11111111111111111111111{result}")
- rag_mgr = AsyncLightRAGManager()
- await rag_mgr.init_workspace(knowledge_id)
- await rag_mgr.insert(knowledge_id, {"content": datas, "ids": [document_id]})
- await rag_mgr.close()
-
- if not mysql_success:
- logger.error(f"[文档复制模式] MySQL元数据复制失败: {mysql_result.get('error')}")
- # 回滚 Milvus 数据
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.send_post_request_sync(
- f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
- {"status": "2", "documentId": doc_id}
- )
- if reporter:
- reporter.complete(success=False)
- return {"code": 500, "message": f"文档复制模式:MySQL复制失败 - {mysql_result.get('error')}", "knowledge_id": self.knowledge_id, "doc_info": {}}
-
- logger.info(f"[文档复制模式] MySQL元数据复制成功: {mysql_result}")
-
- # 3. 发送成功回调
- # slice_count = mysql_result.get("slice_count", 0)
- self.send_post_request_sync(
- f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
- {
- "knowledgeId": self.knowledge_id,
- "documentId": doc_id,
- # "sliceTotal": slice_count,
- "status": "1"
- }
- )
-
- if reporter:
- reporter.complete(success=True)
-
- logger.info(f"[文档复制模式] 文档复制完成: oldDocId={old_document_id} -> newDocId={doc_id}")
- return {
- "code": 200,
- "message": "文档复制模式:复制成功",
- "knowledge_id": self.knowledge_id,
- "doc_info": {
- # "slice_num": slice_count,
- "old_document_id": old_document_id,
- "new_document_id": doc_id
- }
- }
-
- except Exception as e:
- logger.error(f"[文档复制模式] 处理异常: {e}", exc_info=True)
- self.send_post_request_sync(
- f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",
- {"status": "2", "documentId": doc_id}
- )
- if reporter:
- reporter.complete(success=False)
- return {"code": 500, "message": f"文档复制模式异常: {str(e)}", "knowledge_id": self.knowledge_id, "doc_info": {}}
- # ===== 文档复制模式处理结束 =====
- # 文件单独处理(standardClassification=3)(临时)
- remark = doc.get("remark", "")
- standard_classification = doc.get("standardClassification", "")
-
- # standardClassification 为 '3' 时,不解析不分块,直接拼接数据入库
- if standard_classification == "3":
- logger.info(f"standardClassification=3,跳过解析,直接使用文件名+URL+remark入库: {name}")
- # 拼接文本:文件名 + URL + remark
- combined_text = f"{name}\n {url}\n {remark}"
- text_lists = [combined_text]
- # 空变量赋值
- flag_img_info = {}
- full_md_url = ""
- full_pdf_url = ""
- oss_id = ""
- file_size = None
- path_title_list = [""]
- father_path_title_list = [""]
- bbox_list = [None]
- page_list = [None]
-
- milvus_docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name, path_title_list, father_path_title_list, bbox_list, page_list)
- logger.info(f"standardClassification=3,存储到milvus的文本数据:{milvus_docs}")
-
- # ===== 检查点2:入库前检查取消标志 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库")
- raise TaskCancelledException("任务已被用户取消")
-
- if flag == "upload":
- insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id, tenant_id, user_id)
- if insert_slice_flag:
- insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
- else:
- insert_img_flag = False
- parse_file_status = False
- if insert_img_flag:
- insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs)
- else:
- insert_milvus_flag = False
- parse_file_status = False
- if insert_milvus_flag:
- parse_file_status = True
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- self.mysql_client.delete_image_url(doc_id=doc_id)
- raise TaskCancelledException("任务已被用户取消")
- else:
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- self.mysql_client.delete_image_url(doc_id=doc_id)
- parse_file_status = False
- elif flag == "update":
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(milvus_docs, self.knowledge_id, doc_id, tenant_id, user_id)
- if insert_slice_flag:
- insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(milvus_docs)
- else:
- insert_milvus_flag = False
- parse_file_status = False
- if insert_milvus_flag:
- parse_file_status = True
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- raise TaskCancelledException("任务已被用户取消")
- else:
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- parse_file_status = False
-
- if parse_file_status:
- success_doc.append(doc_id)
- else:
- if flag == "upload":
- for del_id in success_doc:
- self.milvus_client._delete_by_doc_id(doc_id=del_id)
- self.mysql_client.delete_image_url(doc_id=del_id)
- self.mysql_client.delete_to_slice(doc_id=del_id)
- if reporter:
- reporter.complete(success=False)
- # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"), "tenantId": tenant_id})
- return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}}
-
- # 继续下一个文档
- continue
- ## 文件模式(standardClassification=3) ↑
-
- async with aiohttp.ClientSession() as session:
- # 下载并保存文件
- down_file_name, file_size = await self.save_file_temp(session, url, name)
- excel_file_path = down_file_name # 保留原始 Excel 文件路径
- # 只接收txt和pdf格式,其它格式统一转换成pdf,不支持格式在convert_to_pdf中判断
- if not name.endswith(".txt") and not name.endswith('.pdf') and not file_json.get("markDownFlg"):
- # 转化为pdf文件
- down_file_name = convert_to_pdf(down_file_name)
- name = os.path.basename(down_file_name)
-
- if down_file_name:
- # 上传PDF文件到MinIO
- pdf_file_name = os.path.basename(down_file_name)
- minio_pdf_path = f"/pdf/{self.knowledge_id}/{doc_id}/{pdf_file_name}"
- upload_success = self.minio_client.upload_file(down_file_name, minio_pdf_path)
- pdf_file_size = os.path.getsize(down_file_name)
-
- # 记录OSS信息到数据库
- if upload_success:
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- full_pdf_url = f"{minio_url}/{minio_bucket}{minio_pdf_path}"
- _, oss_id = self.mysql_client.insert_oss_record(pdf_file_name, full_pdf_url, user_id, ".pdf", pdf_file_size)
- logger.info(f"PDF文件已上传至MinIO并记录OSS: {full_pdf_url}")
-
- # Excel 特殊解析模式
- excel_except = file_json.get("excel_parsing", False)
- if excel_except and excel_file_path.endswith((".xls", ".xlsx")):
- logger.info(f"使用 Excel 特殊解析模式: {excel_file_path}")
- text_lists = self.parse_excel_to_text_lists(excel_file_path)
- flag_img_info = {}
- path_title_list = [""] * len(text_lists)
- father_path_title_list = [""] * len(text_lists)
- bbox_list = [None] * len(text_lists)
- page_list = [None] * len(text_lists)
-
- # 生成并上传 MD 文件
- full_md_url = ""
- try:
- # 将所有表格合并为一个 MD 文件
- md_content = "\n\n".join(text_lists)
-
- # 保存 MD 文件到临时目录
- excel_base_name = os.path.splitext(os.path.basename(excel_file_path))[0]
- md_file_name = f"{excel_base_name}_tables.md"
- temp_md_dir = f"./tmp_file/{self.knowledge_id}"
- os.makedirs(temp_md_dir, exist_ok=True)
- temp_md_path = os.path.join(temp_md_dir, md_file_name)
-
- # 写入 MD 文件
- with open(temp_md_path, 'w', encoding='utf-8') as f:
- f.write(md_content)
-
- logger.info(f"Excel MD 文件已保存: {temp_md_path}")
-
- # 上传到 MinIO
- minio_md_path = f"/md/{self.knowledge_id}/{doc_id}/{md_file_name}"
- upload_success = self.minio_client.upload_file(temp_md_path, minio_md_path)
-
- if upload_success:
- md_file_size = os.path.getsize(temp_md_path)
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- full_md_url = f"{minio_url}/{minio_bucket}{minio_md_path}"
-
- # 记录 OSS 信息到数据库
- _, oss_id = self.mysql_client.insert_oss_record(
- md_file_name,
- full_md_url,
- user_id,
- ".md",
- md_file_size
- )
- logger.info(f"Excel MD 文件已上传至 MinIO 并记录 OSS: {full_md_url}")
- else:
- logger.warning(f"Excel MD 文件上传失败: {temp_md_path}")
-
- except Exception as e:
- logger.error(f"Excel MD 文件生成或上传失败: {e}", exc_info=True)
- full_md_url = ""
- # MD 解析模式
- elif file_json.get("markDownFlg"):
- split_mode = file_json.get("customSeparator", "2")
- if split_mode == "3":
- split_mode = "min"
- elif split_mode == "2":
- split_mode = "max"
- logger.info(f"使用MD解析规则,切分方式:{split_mode}")
- text_lists = await MarkdownSplitter(max_chunk_size=self.max_cut_len).split_markdown(down_file_name, split_mode)
- logger.info(f"MD解析,存入mlivus的数据{text_lists}")
- flag_img_info = {}
- path_title_list = [""] * len(text_lists)
- father_path_title_list = [""] * len(text_lists)
- bbox_list = [None] * len(text_lists)
- page_list = [None] * len(text_lists)
- full_md_url = url
- full_pdf_url = ""
- oss_id = ""
- # from rag.document_load.md_to_html_to_pdf import AsyncMdToPdf
- # md_to_pdf_file_name = await AsyncMdToPdf().convert_md_to_pdf(down_file_name)
- # if md_to_pdf_file_name:
- # down_file_name = md_to_pdf_file_name
- # name = os.path.basename(down_file_name)
- else:
- # 获取解析器类型(默认 mineru)
- parser_type = file_json.get("parsingType", "0")
- self.parser_type = parser_type # 保存解析器类型供后续使用
- logger.info(f"使用解析器: {parser_type}")
-
- file_parse = self._get_file_type(name, parser_type)
- max_retries = 3
- retry_delay = 5
- last_error = None
- for attempt in range(max_retries):
- try:
- if parser_type == "dots":
- file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name, doc_id)
- elif parser_type == "2":
- file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name, doc_id)
- else:
- # MinerU 解析
- file_content_list, path_md, pdf_file_name = await file_parse.extract_text(down_file_name)
- break # 成功则跳出重试循环
- except Exception as e:
- last_error = e
- logger.error(f"解析失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
- if attempt < max_retries - 1:
- logger.info(f"等待 {retry_delay} 秒后重试...")
- await asyncio.sleep(retry_delay)
- else:
- logger.error(f"解析失败,已重试 {max_retries} 次")
- raise RuntimeError(f"文档解析失败: {str(last_error)}")
- # 上传 MD 文件到 minio
- full_md_url = ""
- if path_md:
- md_file_name = os.path.basename(path_md)
- file_extension = os.path.splitext(md_file_name)[1]
- minio_md_path = f"/md/{self.knowledge_id}/{doc_id}/{md_file_name}"
- upload_success = self.minio_client.upload_file(path_md, minio_md_path)
- md_file_size = os.path.getsize(path_md)
-
- # 记录 OSS 信息到数据库
- if upload_success:
- minio_url = minio_config.get("minio_url")
- minio_bucket = minio_config.get("minio_bucket")
- full_md_url = f"{minio_url}/{minio_bucket}{minio_md_path}"
- _, oss_id = self.mysql_client.insert_oss_record(md_file_name, full_md_url, user_id, file_extension, md_file_size)
- logger.info(f"MD 文件已上传至 minio 并记录 OSS: {full_md_url}")
-
- logger.info(f"mineru解析的pdf数据:{file_content_list}")
- split_result = self.file_split(file_content_list, doc_id, pdf_file_name)
- # 返回值:text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list
- text_lists, flag_img_info, path_title_list, father_path_title_list, bbox_list, page_list = split_result
-
- docs, total_char_len = self.process_data_to_milvus_schema(text_lists, doc_id, name, path_title_list, father_path_title_list, bbox_list, page_list)
- logger.info(f"存储到milvus的文本数据:{docs}")
- # lightrag_sign = file_json.get("lightrag", False)
- knowledge_id = file_json.get("knowledge_id", "")
- enabled_kn_rs = self.mysql_client.query_knowledge_by_ids([knowledge_id])
- enabled_kn_gp_ids = set()
- for r in enabled_kn_rs:
- if r["knowledge_graph"]:
- enabled_kn_gp_ids.add(r["knowledge_id"])
- if enabled_kn_gp_ids:
- # if lightrag_sign:
- lightrag_data = {}
- contents = []
- for doc in docs:
- contents.append(doc.get("content", ""))
- # lightrag_data[knowledge_id] = {"content": contents, "ids": [docs[0].get("doc_id", "")]}
- # logger.info(f"contents:{lightrag_data}")
- logger.info(f"LightRAG_Ids:{enabled_kn_gp_ids}")
- rag_mgr = AsyncLightRAGManager()
- await rag_mgr.init_workspace(knowledge_id)
- await rag_mgr.insert(knowledge_id, {"content": contents, "ids": [docs[0].get("doc_id", "")]})
- await rag_mgr.close()
- # ===== 检查点2:解析完成、入库前检查取消标志 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库前被取消,无需清理数据库")
- raise TaskCancelledException("任务已被用户取消")
-
- if flag == "upload":
- # 插入到milvus库中
- insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id, tenant_id, user_id)
-
- if insert_slice_flag:
- # 插入到mysql的slice info数据库中
- insert_img_flag, insert_mysql_info = self.mysql_client.insert_to_image_url(flag_img_info, self.knowledge_id, doc_id)
- else:
- insert_img_flag = False
- parse_file_status = False
- if insert_img_flag:
- # insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs)
- # 批量插入
- text_lists = [doc.get("content") for doc in docs]
- insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs, text_lists)
- # 插入mysql中的bm_media_replacement表中
- else:
- # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- insert_milvus_flag = False
- # return resp
- parse_file_status = False
- if insert_milvus_flag:
- parse_file_status = True
-
- # ===== 检查点3:入库后检查取消标志,若取消则清理已插入数据 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- self.mysql_client.delete_image_url(doc_id=doc_id)
- raise TaskCancelledException("任务已被用户取消")
-
- else:
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_image_url(doc_id=doc_id)
- # resp = {"code": 500, "message": insert_mysql_info}
- parse_file_status = False
- # return resp
-
- elif flag == "update": # 更新切片方式
- # 先把库中的数据删除
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- insert_milvus_start_time = time.time()
- insert_slice_flag, insert_mysql_info = self.mysql_client.insert_to_slice(docs, self.knowledge_id, doc_id, tenant_id, user_id)
- # insert_milvus_flag, insert_milvus_str = self.milvus_client._batch_insert_data(docs,text_lists)
- insert_milvus_end_time = time.time()
- logger.info(f"插入milvus数据库耗时:{insert_milvus_end_time - insert_milvus_start_time}")
- if insert_slice_flag:
- # 插入到mysql的slice info数据库中
- insert_mysql_start_time = time.time()
- insert_milvus_flag, insert_milvus_str = self.milvus_client._insert_data(docs)
- insert_mysql_end_time = time.time()
- logger.info(f"插入mysql数据库耗时:{insert_mysql_end_time - insert_mysql_start_time}")
- else:
- # resp = {"code": 500, "message": insert_milvus_str}
- # return resp
- insert_milvus_flag = False
- parse_file_status = False
-
- if insert_milvus_flag:
- # resp = {"code": 200, "message": "切片修改成功"}
- parse_file_status = True
-
- # ===== 检查点3:入库后检查取消标志,若取消则清理已插入数据 =====
- if task_ctx and task_ctx.is_cancelled:
- logger.info(f"任务 {task_id} 在入库后被取消,清理已插入数据: doc_id={doc_id}")
- self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- raise TaskCancelledException("任务已被用户取消")
-
- else:
- self.mysql_client.delete_to_slice(doc_id=doc_id)
- # self.milvus_client._delete_by_doc_id(doc_id=doc_id)
- # resp = {"code":500, "message": insert_mysql_info}
- parse_file_status = False
- # return resp
- if parse_file_status:
- success_doc.append(doc_id)
- else:
- if flag == "upload":
- for del_id in success_doc:
- self.milvus_client._delete_by_doc_id(doc_id=del_id)
- self.mysql_client.delete_image_url(doc_id=del_id)
- self.mysql_client.delete_to_slice(doc_id=del_id)
-
- if reporter:
- reporter.complete(success=False)
- # if task_id:
- # self.mysql_client.update_task_status_error(task_id)
- self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"), "tenantId": tenant_id})
- # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- return {"code": 500, "message": "解析失败", "knowledge_id" : self.knowledge_id, "doc_info": {}}
-
- # 任务完成:先更新数据库状态为完成(2),再发送100%进度
- # if task_id and user_id:
- # self.mysql_client.update_task_status_complete(task_id, user_id)
-
- self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1", "tenantId": tenant_id})
- # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"})
- if reporter:
- reporter.complete(success=True)
-
- # await self.send_post_request("http://10.10.10.3:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists)})
- # self.send_post_request_sync("http://10.10.10.2:9080/deepseek/api/updateDocumentByPython", {"knowledgeId": self.knowledge_id,"documentId": file_json.get("docs")[0].get("document_id"), "mdMinIOUrl": full_md_url, "pdfUrl":full_pdf_url, "ossId": oss_id, "length": file_size, "wordNum": total_char_len, "sliceTotal": len(text_lists), "status":"1"})
- return {"code": 200, "message": "解析成功", "knowledge_id" : self.knowledge_id, "doc_info": {"file_size": file_size, "total_char_len": total_char_len, "slice_num": len(text_lists)}}
-
- except TaskCancelledException as e:
- # ===== 处理任务取消异常 =====
- logger.info(f"任务被取消 [task_id={task_id}]: {e}")
- # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id})
- # 1. 首先删除 bm_document 表中的记录
- if task_id:
- del_success, del_info = self.mysql_client.delete_document(task_id)
- if del_success:
- logger.info(f"已删除 bm_document 记录: task_id={task_id}")
- else:
- logger.warning(f"删除 bm_document 记录失败: {del_info}")
-
- # 2. 清理之前已成功处理的文档数据(slice、image、milvus)
- for del_id in success_doc:
- logger.info(f"清理已入库的文档数据: doc_id={del_id}")
- self.milvus_client._delete_by_doc_id(doc_id=del_id)
- self.mysql_client.delete_image_url(doc_id=del_id)
- self.mysql_client.delete_to_slice(doc_id=del_id)
-
- if reporter:
- reporter.complete(success=False)
-
- return {"code": 499, "message": "任务已取消", "knowledge_id": self.knowledge_id, "doc_info": {}}
- except VLMRetryExhaustedError as e:
- self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id})
- if reporter:
- reporter.complete(success=False)
- # if task_id:
- # self.mysql_client.update_task_status_error(task_id)
-
- return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
- except Exception as e:
- # 捕获所有异常,统一处理
- logger.error(f"文件处理异常 [task_id={task_id}]: {e}", exc_info=True)
-
- # self.send_post_request_sync("http://10.168.100.4:9080/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id")})
- self.send_post_request_sync(f"{progress_callback_config.get('base_url')}/deepseek/api/updateDocumentByPython",{"status":"2","documentId": file_json.get("docs")[0].get("document_id"),"errorMessage": str(e), "tenantId": tenant_id})
- if reporter:
- reporter.complete(success=False)
- # if task_id:
- # self.mysql_client.update_task_status_error(task_id)
-
- return {"code": 500, "message": "error", "knowledge_id": self.knowledge_id, "doc_info": {}}
-
- finally:
- # ===== 注销任务 =====
- if task_id:
- task_registry.unregister(task_id)
|