| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535 |
- from rag.vector_db.milvus_vector import HybridRetriever
- from response_info import generate_message, generate_response
- from utils.get_logger import setup_logger
- from datetime import datetime
- from uuid import uuid1
- import mysql.connector
- from mysql.connector import pooling, Error, errors
- import threading
- from concurrent.futures import ThreadPoolExecutor, TimeoutError
- from config import milvus_uri, mysql_config
- import time
- logger = setup_logger(__name__)
- # uri = "http://localhost:19530"
- # if 'POOL' not in globals():
- # try:
- # POOL = pooling.MySQLConnectionPool(
- # pool_name="mysql_pool",
- # pool_size=10,
- # **mysql_config
- # )
- # logger.info("MySQL 连接池初始化成功")
- # except Error as e:
- # logger.info(f"初始化 MySQL 连接池失败: {e}")
- # POOL = None
class MilvusOperate:
    """Facade that keeps a Milvus collection (via HybridRetriever) and its
    MySQL bookkeeping (via MysqlOperate) in sync for one knowledge base."""

    def __init__(self, collection_name: str = "default", embedding_name: str = "e5"):
        # collection_name doubles as the knowledge-base identifier elsewhere in this file.
        self.collection = collection_name
        self.hybrid_retriever = HybridRetriever(uri=milvus_uri, embedding_name=embedding_name, collection_name=collection_name)
        # NOTE(review): MysqlOperate is commented out in this chunk — presumably
        # defined elsewhere in the file; confirm it is in scope at runtime.
        self.mysql_client = MysqlOperate()
- def _has_collection(self):
- is_collection = self.hybrid_retriever.has_collection()
- return is_collection
-
- def _create_collection(self):
- if self._has_collection():
- resp = {"code": 400, "message": "数据库已存在"}
- else:
- create_result = self.hybrid_retriever.build_collection()
- resp = generate_message(create_result)
- return resp
-
- def _delete_collection(self):
- delete_result = self.hybrid_retriever.delete_collection(self.collection)
- resp = generate_message(delete_result)
- return resp
-
- def _put_by_id(self, slice_json):
- slice_id = slice_json.get("slice_id", None)
- slice_text = slice_json.get("slice_text", None)
- update_result, chunk_len = self.hybrid_retriever.update_data(chunk_id=slice_id, chunk=slice_text)
- if update_result.endswith("success"):
- # 如果成功,更新mysql中知识库总长度和文档长度
- update_json = {}
- update_json["knowledge_id"] = slice_json.get("knowledge_id")
- update_json["doc_id"] = slice_json.get("document_id")
- update_json["chunk_len"] = chunk_len
- update_json["operate"] = "update"
- update_json["chunk_id"] = slice_id
- update_json["chunk_text"] = slice_text
- update_flag, update_str = self.mysql_client.update_total_doc_len(update_json)
- else:
- update_flag = False
-
- if not update_flag:
- update_result = "update_error"
-
- resp = generate_message(update_result)
- return resp
-
- def _insert_slice(self, slice_json):
- slice_id = str(uuid1())
- knowledge_id = slice_json.get("knowledge_id")
- doc_id = slice_json.get("document_id")
- slice_text = slice_json.get("slice_text", None)
- doc_name = slice_json.get("doc_name")
- chunk_len = len(slice_text)
- metadata = {
- "content": slice_text,
- "doc_id": doc_id,
- "chunk_id": slice_id,
- "metadata": {"source": doc_name, "chunk_len": chunk_len},
- "Chapter": slice_json.get("Chapter",""),
- "Father_Chapter": slice_json.get("Father_Chapter",""),
- }
- insert_flag, insert_str = self.hybrid_retriever.insert_data(slice_text, metadata)
- if insert_flag:
- # 如果成功,更新mysql中知识库总长度和文档长度
- update_json = {}
- update_json["knowledge_id"] = slice_json.get("knowledge_id")
- update_json["doc_id"] = slice_json.get("document_id")
- update_json["chunk_len"] = chunk_len
- update_json["operate"] = "insert"
- update_json["chunk_id"] = slice_id
- update_json["chunk_text"] = slice_text
- update_json["slice_index"] = slice_json.get("slice_index")
- update_flag, update_str = self.mysql_client.update_total_doc_len(update_json)
- else:
- logger.error(f"插入向量库出错:{insert_str}")
- update_flag = False
- update_str = "向量库写入出错"
- # pass
-
- if not update_flag:
- logger.error(f"新增切片中mysql数据库出错:{update_str}")
- # insert_result = "insert_error"
- success = "insert_error"
- else:
- # insert_result = "insert_success"
- success = "insert_success"
-
- # resp = generate_message(insert_result)
- resp = {"status": success, "slice_id":slice_id}
- return resp
-
- def _delete_by_chunk_id(self, chunk_id, knowledge_id, document_id):
- logger.info(f"删除的切片id:{chunk_id}")
- delete_result, delete_chunk_len = self.hybrid_retriever.delete_by_chunk_id(chunk_id=chunk_id)
- if delete_result.endswith("success"):
- chunk_len = delete_chunk_len[0]
- update_json = {
- "knowledge_id": knowledge_id,
- "doc_id": document_id,
- "chunk_len": -chunk_len,
- "operate": "delete",
- "chunk_id": chunk_id
- }
- update_flag, update_str = self.mysql_client.update_total_doc_len(update_json)
- else:
- logger.error("根据chunk id删除向量库失败")
- update_flag = False
- update_str = "根据chunk id删除失败"
-
- if not update_flag:
- logger.error(update_str)
- delete_result = "delete_error"
-
- resp = generate_message(delete_result)
- return resp
-
- def _batch_delete_by_chunk_ids(self, chunk_ids: list, knowledge_id: str, document_id: str):
- """
- 批量删除切片
-
- 参数:
- chunk_ids: 切片ID列表
- knowledge_id: 知识库ID
- document_id: 文档ID
- 返回:
- 响应字典
- """
- logger.info(f"批量删除 {len(chunk_ids)} 个切片")
-
- # 1. 批量删除向量库数据
- delete_result, chunk_lens_dict = self.hybrid_retriever.batch_delete_by_chunk_ids(chunk_ids)
-
- if not delete_result.endswith("success"):
- logger.error("批量删除向量库失败")
- return generate_message("delete_error")
-
- # 2. 批量更新 MySQL (标记为删除)
- update_flag, update_str = self.mysql_client.batch_update_slice_delete(chunk_ids)
-
- if not update_flag:
- logger.error(f"批量更新 MySQL 失败: {update_str}")
- return generate_message("delete_error")
-
- logger.info(f"批量删除成功: {len(chunk_ids)} 个切片")
- return generate_message("delete_success")
-
- def _delete_by_doc_id(self, doc_id: str = None):
- logger.info(f"删除数据的id:{doc_id}")
- delete_result = self.hybrid_retriever.delete_by_doc_id(doc_id=doc_id)
- resp = generate_message(delete_result)
- return resp
-
- def _search_by_chunk_id(self, chunk_id):
- if self._has_collection():
- query_result = self.hybrid_retriever.query_chunk_id(chunk_id=chunk_id)
- else:
- query_result = []
- logger.info(f"根据切片查询到的信息:{query_result}")
- resp = generate_response(query_result)
- return resp
-
- def _search_by_chunk_id_list(self, chunk_id_list):
- if self._has_collection():
- query_result = self.hybrid_retriever.query_chunk_id_list(chunk_id_list)
- else:
- query_result = []
- logger.info(f"召回的切片列表查询切片信息:{query_result}")
-
- chunk_content_list = []
- for chunk_dict in query_result:
- chunk_content = chunk_dict.get("content")
- chunk_content_list.append(chunk_content)
- return chunk_content_list
-
-
- def _search_by_key_word(self, search_json):
- if self._has_collection():
- doc_id = search_json.get("document_id", None)
- text = search_json.get("text", None)
- page_num = search_json.get("pageNum", 1)
- page_size = search_json.get("pageSize", 10)
- page_num = search_json.get("pageNum") # 根据传过来的id处理对应知识库
- query_result = self.hybrid_retriever.query_filter(doc_id=doc_id, filter_field=text)
- else:
- query_result = []
- resp = generate_response(query_result,page_num,page_size)
- return resp
-
- def _insert_data(self, docs):
- insert_flag = ""
- insert_info = ""
- for doc in docs:
- chunk = doc.get("content")
- insert_flag, insert_info = self.hybrid_retriever.insert_data(chunk, doc)
- if not insert_flag:
- break
- resp = insert_flag if insert_flag else "insert_error"
- return resp, insert_info
-
- def _batch_insert_data(self, docs, text_lists):
- insert_flag, insert_info = self.hybrid_retriever.batch_insert_data(text_lists, docs)
- resp = insert_flag
- return resp, insert_info
- def _search(self, query, k, mode):
- search_result = self.hybrid_retriever.search(query, k, mode)
- return search_result
-
- def _query_by_scalar_field(self, doc_id: str, field_name: str, field_value: str):
- """
- 根据标量字段查询数据
-
- 参数:
- doc_id: 文档ID
- field_name: 字段名(如 Father_Chapter)
- field_value: 字段值
- 返回:
- 查询结果列表
- """
- return self.hybrid_retriever.query_by_scalar_field(doc_id, field_name, field_value)
-
    def _copy_docs_to_new_collection(self, new_collection_name, doc_ids, embedding_name="e5"):
        """
        Copy the given documents' data into a new (or existing) collection,
        generating new doc_id / chunk_id values with the snowflake algorithm.

        Parameters:
            new_collection_name: target collection name (also the new knowledge_id)
            doc_ids: list of document ids to copy
            embedding_name: embedding model name

        Returns:
            Response dict, including the doc_id_mapping and chunk_id_mapping relations.

        NOTE(review): relies on generate_snowflake_id, which is not imported in
        this chunk — presumably defined elsewhere in the file; verify.
        """
        try:
            # 1. Query the source collection for the documents' rows.
            logger.info(f"从集合 {self.collection} 查询文档: {doc_ids}")
            query_results = self.hybrid_retriever.query_by_doc_ids(doc_ids)

            if not query_results:
                return {"code": 404, "message": "未找到匹配的文档数据", "doc_id_mapping": {}, "chunk_id_mapping": {}}

            logger.info(f"查询到 {len(query_results)} 条数据")

            # 2. Ensure the target collection exists; create it if missing.
            target_milvus_client = MilvusOperate(collection_name=new_collection_name, embedding_name=embedding_name)

            collection_exists = target_milvus_client._has_collection()
            if not collection_exists:
                logger.info(f"创建新集合: {new_collection_name}")
                create_result = target_milvus_client._create_collection()
                if create_result.get("code") != 200:
                    create_result["doc_id_mapping"] = {}
                    return create_result
            else:
                logger.info(f"集合 {new_collection_name} 已存在,直接插入数据")

            # 3. Mint a fresh snowflake doc_id for every source doc_id.
            doc_id_mapping = {}  # {old_doc_id: new_doc_id}
            for old_doc_id in doc_ids:
                doc_id_mapping[old_doc_id] = generate_snowflake_id()

            # 4. Build insert payloads (drop the pk field; use the new doc/chunk ids).
            insert_data = []
            chunk_id_mapping = {}  # {old_chunk_id: new_chunk_id}
            for item in query_results:
                old_doc_id = item.get("doc_id")
                old_chunk_id = item.get("chunk_id")
                new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                new_chunk_id = generate_snowflake_id()  # fresh chunk_id per slice
                chunk_id_mapping[old_chunk_id] = new_chunk_id  # remember the mapping

                new_item = {
                    "content": item.get("content"),
                    "dense_vector": item.get("dense_vector"),
                    "doc_id": new_doc_id,
                    "chunk_id": new_chunk_id,
                    "Father_Chapter": item.get("Father_Chapter"),
                    "Chapter": item.get("Chapter"),
                    "metadata": item.get("metadata")
                }
                insert_data.append(new_item)

            # 5. Bulk-insert into the target collection.
            logger.info(f"开始向集合 {new_collection_name} 插入 {len(insert_data)} 条数据")
            try:
                target_milvus_client.hybrid_retriever.client.insert(
                    collection_name=new_collection_name,
                    data=insert_data
                )
                logger.info(f"成功向集合插入数据")

                return {
                    "code": 200,
                    "message": "复制成功",
                    "data": {
                        "source_collection": self.collection,
                        "target_collection": new_collection_name,
                        "doc_ids": doc_ids,
                        "total_records": len(insert_data),
                        "collection_existed": collection_exists
                    },
                    "doc_id_mapping": doc_id_mapping,
                    "chunk_id_mapping": chunk_id_mapping
                }
            except Exception as e:
                logger.error(f"插入数据到集合失败: {e}")
                # Roll back: drop the collection only if this call created it.
                if not collection_exists:
                    target_milvus_client._delete_collection()
                return {"code": 500, "message": f"插入数据失败: {str(e)}", "doc_id_mapping": {}, "chunk_id_mapping": {}}

        except Exception as e:
            logger.error(f"复制文档到新集合失败: {e}")
            return {"code": 500, "message": f"复制失败: {str(e)}", "doc_id_mapping": {}, "chunk_id_mapping": {}}
    def _copy_single_doc_to_collection(self, new_collection_name, old_doc_id, new_doc_id, embedding_name="e5"):
        """
        Copy one document's data into a target collection, under a caller-chosen new doc_id.

        Parameters:
            new_collection_name: target collection name
            old_doc_id: source document id
            new_doc_id: new document id (specified by the frontend)
            embedding_name: embedding model name

        Returns:
            Response dict, including the chunk_id_mapping relation.

        NOTE(review): relies on generate_snowflake_id, which is not imported in
        this chunk — presumably defined elsewhere in the file; verify.
        """
        try:
            # 1. Query the source collection.
            logger.info(f"[单文档复制] 从集合 {self.collection} 查询文档: {old_doc_id}")
            query_results = self.hybrid_retriever.query_by_doc_ids([old_doc_id])

            if not query_results:
                return {"code": 404, "message": "未找到匹配的文档数据", "chunk_id_mapping": {}}

            logger.info(f"[单文档复制] 查询到 {len(query_results)} 条数据")

            # 2. Ensure the target collection exists; create it if missing.
            target_milvus_client = MilvusOperate(collection_name=new_collection_name, embedding_name=embedding_name)

            collection_exists = target_milvus_client._has_collection()
            if not collection_exists:
                logger.info(f"[单文档复制] 创建新集合: {new_collection_name}")
                create_result = target_milvus_client._create_collection()
                if create_result.get("code") != 200:
                    create_result["chunk_id_mapping"] = {}
                    return create_result
            else:
                logger.info(f"[单文档复制] 集合 {new_collection_name} 已存在,直接插入数据")

            # 3. Build insert payloads (keep the supplied new_doc_id; mint new chunk_ids).
            insert_data = []
            chunk_id_mapping = {}  # {old_chunk_id: new_chunk_id}
            for item in query_results:
                old_chunk_id = item.get("chunk_id")
                new_chunk_id = generate_snowflake_id()
                chunk_id_mapping[old_chunk_id] = new_chunk_id

                new_item = {
                    "content": item.get("content"),
                    "dense_vector": item.get("dense_vector"),
                    "doc_id": new_doc_id,  # caller-specified new doc_id
                    "chunk_id": new_chunk_id,
                    "Father_Chapter": item.get("Father_Chapter"),
                    "Chapter": item.get("Chapter"),
                    "metadata": item.get("metadata")
                }
                insert_data.append(new_item)

            # 4. Bulk-insert into the target collection.
            logger.info(f"[单文档复制] 开始向集合 {new_collection_name} 插入 {len(insert_data)} 条数据")
            try:
                target_milvus_client.hybrid_retriever.client.insert(
                    collection_name=new_collection_name,
                    data=insert_data
                )
                logger.info(f"[单文档复制] 成功向集合插入数据")

                return {
                    "code": 200,
                    "message": "复制成功",
                    "data": {
                        "source_collection": self.collection,
                        "target_collection": new_collection_name,
                        "old_doc_id": old_doc_id,
                        "new_doc_id": new_doc_id,
                        "total_records": len(insert_data),
                        "collection_existed": collection_exists
                    },
                    "chunk_id_mapping": chunk_id_mapping
                }
            except Exception as e:
                logger.error(f"[单文档复制] 插入数据到集合失败: {e}")
                # Roll back: drop the collection only if this call created it.
                if not collection_exists:
                    target_milvus_client._delete_collection()
                return {"code": 500, "message": f"插入数据失败: {str(e)}", "chunk_id_mapping": {}}

        except Exception as e:
            logger.error(f"[单文档复制] 复制文档到新集合失败: {e}")
            return {"code": 500, "message": f"复制失败: {str(e)}", "chunk_id_mapping": {}}
- # class MysqlOperate:
- # def get_connection(self):
- # """
- # 从连接池中获取一个连接
- # :return: 数据库连接对象
- # """
- # # try:
- # # with ThreadPoolExecutor() as executor:
- # # future = executor.submit(POOL.get_connection)
- # # connection = future.result(timeout=5.0) # 设置超时时间为5秒
- # # logger.info("成功从连接池获取连接")
- # # return connection, "success"
- # # except TimeoutError:
- # # logger.error("获取mysql数据库连接池超时")
- # # return None, "mysql获取连接池超时"
- # # except errors.InterfaceError as e:
- # # logger.error(f"MySQL 接口异常:{e}")
- # # return None, "mysql接口异常"
- # # except errors.OperationalError as e:
- # # logger.error(f"MySQL 操作错误:{e}")
- # # return None, "mysql 操作错误"
- # # except Error as e:
- # # logger.error(f"无法从连接池获取连接: {e}")
- # # return None, str(e)
- # connection = None
- # event = threading.Event()
- # def target():
- # nonlocal connection
- # try:
- # connection = POOL.get_connection()
- # finally:
- # event.set()
- # thread = threading.Thread(target=target)
- # thread.start()
- # event.wait(timeout=5)
- # if thread.is_alive():
- # # 超时处理
- # logger.error("获取连接超时")
- # return None, "获取连接超时"
- # else:
- # if connection:
- # return connection, "success"
- # else:
- # logger.error("获取连接失败")
- # return None, "获取连接失败"
- # def insert_to_slice(self, docs, knowledge_id, doc_id):
- # """
- # 插入数据到切片信息表中 slice_info
- # """
- # connection = None
- # cursor = None
- # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # values = []
- # connection, connection_info = self.get_connection()
- # if not connection:
- # return False, connection_info
-
- # for chunk in docs:
- # slice_id = chunk.get("chunk_id")
- # slice_text = chunk.get("content")
- # chunk_index = chunk.get("metadata").get("chunk_index")
- # values.append((slice_id, knowledge_id, doc_id, slice_text, date_now, chunk_index))
- # try:
- # cursor = connection.cursor()
- # # insert_sql = """
- # # INSERT INTO slice_info (
- # # slice_id,
- # # knowledge_id,
- # # document_id,
- # # slice_text,
- # # create_time,
- # # slice_index
- # # ) VALUES (%s, %s, %s, %s, %s,%s)
- # # """
-
- # # 容错“for key 'UK_ID_TYPE_KEY'”
- # insert_sql = """
- # INSERT INTO slice_info (
- # slice_id,
- # knowledge_id,
- # document_id,
- # slice_text,
- # create_time,
- # slice_index
- # ) VALUES (%s, %s, %s, %s, %s, %s)
- # ON DUPLICATE KEY UPDATE
- # slice_text = VALUES(slice_text),
- # create_time = VALUES(create_time),
- # slice_index = VALUES(slice_index)
- # """
-
- # cursor.executemany(insert_sql, values)
- # connection.commit()
- # logger.info(f"批量插入切片数据成功。")
- # return True, "success"
- # except Error as e:
- # logger.error(f"数据库操作出错:{e}")
- # connection.rollback()
- # return False, str(e)
- # finally:
- # # if cursor:
- # cursor.close()
- # # if connection and connection.is_connected():
- # connection.close()
- # def delete_to_slice(self, doc_id):
- # """
- # 删除 slice_info库中切片信息
- # """
- # connection = None
- # cursor = None
- # connection, connection_info = self.get_connection()
- # if not connection:
- # return False, connection_info
- # try:
- # cursor = connection.cursor()
- # delete_sql = f"DELETE FROM slice_info WHERE document_id = %s"
- # cursor.execute(delete_sql, (doc_id,))
- # connection.commit()
- # logger.info(f"删除数据成功")
- # return True, "success"
- # except Error as e:
- # logger.error(f"根据{doc_id}删除数据失败:{e}")
- # connection.rollback()
- # return False, str(e)
- # finally:
- # # if cursor:
- # cursor.close()
- # # if connection and connection.is_connected():
- # connection.close()
- # def insert_to_image_url(self, image_dict, knowledge_id, doc_id):
- # """
- # 批量插入数据到指定表
- # """
- # connection = None
- # cursor = None
- # connection, connection_info = self.get_connection()
- # if not connection:
- # return False, connection_info
-
- # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # values = []
- # for img_key, img_value in image_dict.items():
- # origin_text = img_key
- # media_url = img_value
- # values.append((knowledge_id, doc_id, origin_text, "image", media_url, date_now))
- # try:
- # cursor = connection.cursor()
- # # insert_sql = """
- # # INSERT INTO bm_media_replacement (
- # # knowledge_id,
- # # document_id,
- # # origin_text,
- # # media_type,
- # # media_url,
- # # create_time
- # # ) VALUES (%s, %s, %s, %s, %s, %s)
- # # """
-
- # # 容错“for key 'UK_ID_TYPE_KEY'”
- # insert_sql = """
- # INSERT INTO bm_media_replacement (
- # knowledge_id,
- # document_id,
- # origin_text,
- # media_type,
- # media_url,
- # create_time
- # ) VALUES (%s, %s, %s, %s, %s, %s)
- # ON DUPLICATE KEY UPDATE
- # origin_text = VALUES(origin_text),
- # media_type = VALUES(media_type),
- # media_url = VALUES(media_url),
- # create_time = VALUES(create_time)
- # """
- # cursor.executemany(insert_sql, values)
- # connection.commit()
- # logger.info(f"插入到bm_media_replacement表成功")
- # return True, "success"
- # except Error as e:
- # logger.error(f"数据库操作出错:{e}")
- # connection.rollback()
- # return False, str(e)
- # finally:
- # # if cursor:
- # cursor.close()
- # # if connection and connection.is_connected():
- # connection.close()
- # def delete_image_url(self, doc_id):
- # """
- # 根据doc id删除bm_media_replacement中的数据
- # """
- # connection = None
- # cursor = None
- # connection, connection_info = self.get_connection()
- # if not connection:
- # return False, connection_info
-
- # try:
- # cursor = connection.cursor()
- # delete_sql = f"DELETE FROM bm_media_replacement WHERE document_id = %s"
- # cursor.execute(delete_sql, (doc_id,))
- # connection.commit()
- # logger.info(f"根据{doc_id} 删除bm_media_replacement表中数据成功")
- # return True, "success"
- # except Error as e:
- # logger.error(f"根据{doc_id}删除 bm_media_replacement 数据库操作出错:{e}")
- # connection.rollback()
- # return False, str(e)
- # finally:
- # # if cursor:
- # cursor.close()
- # # if connection and connection.is_connected():
- # connection.close()
- # def update_total_doc_len(self, update_json):
- # """
- # 更新长度表和文档长度表,删除slice info表, 插入slice info 切片信息
- # """
- # knowledge_id = update_json.get("knowledge_id")
- # doc_id = update_json.get("doc_id")
- # chunk_len = update_json.get("chunk_len")
- # operate = update_json.get("operate")
- # chunk_id = update_json.get("chunk_id")
- # chunk_text = update_json.get("chunk_text")
- # connection = None
- # cursor = None
- # connection, connection_info = self.get_connection()
- # if not connection:
- # return False, connection_info
- # try:
- # cursor = connection.cursor()
- # query_doc_word_num_sql = f"select word_num,slice_total from bm_document where document_id = %s"
- # query_knowledge_word_num_sql = f"select word_num from bm_knowledge where knowledge_id = %s"
- # cursor.execute(query_doc_word_num_sql, (doc_id,))
- # doc_result = cursor.fetchone()
- # logger.info(f"查询到的文档长度信息:{doc_result}")
- # cursor.execute(query_knowledge_word_num_sql, (knowledge_id, ))
- # knowledge_result = cursor.fetchone()
- # logger.info(f"查询到的知识库总长度信息:{knowledge_result}")
- # if not doc_result:
- # new_word_num = 0
- # slice_total = 0
- # else:
- # old_word_num = doc_result[0]
- # slice_total = doc_result[1]
- # new_word_num = old_word_num + chunk_len
- # slice_total -= 1 if slice_total else 0
- # if not knowledge_result:
- # new_knowledge_word_num = 0
- # else:
- # old_knowledge_word_num = knowledge_result[0]
- # new_knowledge_word_num = old_knowledge_word_num + chunk_len
- # if operate == "update":
- # update_sql = f"UPDATE bm_document SET word_num = %s WHERE document_id = %s"
- # cursor.execute(update_sql, (new_word_num, doc_id))
- # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # update_slice_sql = f"UPDATE slice_info SET slice_text = %s, update_time = %s WHERE slice_id = %s"
- # cursor.execute(update_slice_sql, (chunk_text, date_now, chunk_id))
- # elif operate == "insert":
- # query_slice_info_index_sql = f"select MAX(slice_index) from slice_info where document_id = %s"
- # cursor.execute(query_slice_info_index_sql, (doc_id,))
- # chunk_index_result = cursor.fetchone()[0]
- # # logger.info(chunk_index_result)
- # if chunk_index_result:
- # chunk_max_index = int(chunk_index_result)
- # else:
- # chunk_max_index = 0
- # update_sql = f"UPDATE bm_document SET word_num = %s WHERE document_id = %s"
- # cursor.execute(update_sql, (new_word_num, doc_id))
- # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # insert_slice_sql = "INSERT INTO slice_info (slice_id,knowledge_id,document_id,slice_text,create_time, slice_index) VALUES (%s, %s, %s, %s, %s, %s)"
- # cursor.execute(insert_slice_sql, (chunk_id, knowledge_id, doc_id, chunk_text, date_now, chunk_max_index+1))
- # else:
- # update_sql = f"UPDATE bm_document SET word_num = %s, slice_total = %s WHERE document_id = %s"
- # cursor.execute(update_sql, (new_word_num, slice_total, doc_id))
- # # 删除切片id对应的切片
- # delete_slice_sql = f"DELETE FROM slice_info where slice_id = %s"
- # cursor.execute(delete_slice_sql, (chunk_id, ))
- # update_knowledge_sql = f"UPDATE bm_knowledge SET word_num = %s WHERE knowledge_id = %s"
- # cursor.execute(update_knowledge_sql, (new_knowledge_word_num, knowledge_id))
- # connection.commit()
- # logger.info("bm_document和bm_knowledge数据更新成功")
- # return True, "success"
- # except Error as e:
- # logger.error(f"数据库操作出错:{e}")
- # connection.rollback()
- # return False, str(e)
- # finally:
- # # if cursor:
- # cursor.close()
- # # if connection and connection.is_connected():
- # connection.close()
# Column/table names used for document task-status bookkeeping.
TABLE_NAME = "bm_document"
# NOTE(review): status appears to be stored in the 'update_by' column and the
# user id in 'remark' — column reuse; confirm against the actual schema.
STATUS_FIELD = "update_by"
TASK_ID_FIELD = "document_id"
USER_ID_FIELD = "remark"
import time  # redundant re-import: 'time' is already imported near the top of the file
- # ========== 全局初始化连接池(自动检测 + 超时保护) ==========
class SafeMySQLPool:
    """Wrapper around mysql.connector's MySQLConnectionPool that

    - tracks every handed-out connection in ``_active_conns``,
    - monkey-patches each connection's ``close()`` so releases are observed,
    - runs a daemon thread that force-closes connections held longer than
      ``idle_timeout`` seconds (leak protection).
    """

    def __init__(self, pool_size=50, conn_timeout=10, idle_timeout=60, **mysql_config):
        # Only install defaults the caller did not supply explicitly.
        mysql_config.setdefault("connect_timeout", conn_timeout)
        mysql_config.setdefault("pool_reset_session", True)
        self._pool = pooling.MySQLConnectionPool(
            pool_name="safe_mysql_pool",
            pool_size=pool_size,
            **mysql_config
        )
        # Re-entrant lock: _auto_reclaimer invokes the patched close(), which
        # re-acquires this same lock — a plain Lock would deadlock.
        self._lock = threading.RLock()
        self._active_conns = {}  # {id(conn): (conn, last_used_time)}
        self._idle_timeout = idle_timeout
        self._stop_event = threading.Event()
        threading.Thread(target=self._auto_reclaimer, daemon=True).start()

    def get_connection(self, timeout=10):
        """Borrow a live connection, polling while the pool is exhausted.

        Raises:
            TimeoutError: no connection became available within *timeout* s.
        """
        start = time.time()
        while True:
            try:
                conn = self._pool.get_connection()
                # Validate (and transparently re-establish) the connection.
                conn.ping(reconnect=True, attempts=3, delay=2)
                with self._lock:
                    self._active_conns[id(conn)] = (conn, time.time())
                return self._wrap_connection(conn)
            except errors.PoolError:
                # Pool exhausted: retry every 0.3s until timeout expires.
                if time.time() - start > timeout:
                    raise TimeoutError(f"获取 MySQL 连接超时(超过 {timeout}s)")
                time.sleep(0.3)

    def _wrap_connection(self, conn):
        """Patch conn.close so the tracking table is updated on release."""
        pool = self
        orig_close = conn.close
        def safe_close():
            try:
                orig_close()
            finally:
                # Drop from tracking even if the underlying close failed.
                with pool._lock:
                    pool._active_conns.pop(id(conn), None)
        conn.close = safe_close
        return conn

    def _auto_reclaimer(self):
        """Daemon loop: every 5s force-close connections held past idle_timeout.

        NOTE(review): last_used is set once at checkout and never refreshed,
        so long-running but still-active borrowers are also reclaimed after
        idle_timeout — confirm this is the intended behaviour.
        """
        while not self._stop_event.is_set():
            time.sleep(5)
            now = time.time()
            with self._lock:
                to_remove = []
                for cid, (conn, last_used) in list(self._active_conns.items()):
                    if now - last_used > self._idle_timeout:
                        try:
                            conn.close()
                            logger.warning(f"[回收] 已回收超时未关闭连接 (idle={int(now - last_used)}s)")
                        except Exception as e:
                            logger.error(f"[回收] 回收连接失败: {e}")
                        to_remove.append(cid)
                for cid in to_remove:
                    self._active_conns.pop(cid, None)

    def close_all(self):
        """Stop the reclaimer thread and close every tracked connection."""
        self._stop_event.set()
        with self._lock:
            for conn, _ in self._active_conns.values():
                try:
                    conn.close()
                except:
                    pass
            self._active_conns.clear()
# ========== Initialise the shared connection pool at import time ==========
# Guarded so repeated execution of this module does not rebuild the pool.
if "POOL" not in globals():
    try:
        POOL = SafeMySQLPool(pool_size=10, idle_timeout=60, **mysql_config)
        logger.info("MySQL 连接池初始化成功")
    except Error as e:
        # On failure POOL is left as None; callers must check before use.
        logger.error(f"MySQL 连接池初始化失败: {e}")
        POOL = None
- """
- 雪花算法获取唯一id
- """
- import time
- import threading
class Snowflake:
    """Twitter-style Snowflake unique ID generator.

    Produces time-ordered IDs composed of
    ``timestamp | datacenter_id | worker_id | sequence`` and returns them as
    zero-padded 20-character decimal strings. ``generate_id`` is thread-safe.
    """

    def __init__(self, datacenter_id=1, worker_id=1):
        # Bit layout: 41-bit timestamp | 5-bit datacenter | 5-bit worker | 12-bit sequence
        self.worker_id_bits = 5
        self.datacenter_id_bits = 5
        self.sequence_bits = 12
        self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)            # 31
        self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)    # 31
        # Fix: these bounds were computed but never enforced; an out-of-range
        # id would silently overflow into neighbouring bit fields and break
        # uniqueness across workers.
        if not 0 <= worker_id <= self.max_worker_id:
            raise ValueError(f"worker_id must be in [0, {self.max_worker_id}]")
        if not 0 <= datacenter_id <= self.max_datacenter_id:
            raise ValueError(f"datacenter_id must be in [0, {self.max_datacenter_id}]")
        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = 0
        self.worker_id_shift = self.sequence_bits
        self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
        self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
        self.twepoch = 1288834974657  # custom epoch (2010-11-04), Twitter's original
        self.last_timestamp = -1
        self.lock = threading.Lock()

    def _timestamp(self):
        """Current wall-clock time in milliseconds."""
        return int(time.time() * 1000)

    def _wait_next_ms(self, last):
        """Busy-wait until the clock advances past *last* (milliseconds)."""
        ts = self._timestamp()
        while ts <= last:
            ts = self._timestamp()
        return ts

    def generate_id(self):
        """Return a new unique, monotonically increasing 20-digit ID string.

        Raises:
            Exception: if the system clock moved backwards since the last id.
        """
        with self.lock:
            timestamp = self._timestamp()
            if timestamp < self.last_timestamp:
                raise Exception("Clock moved backwards, refusing to generate id")
            if timestamp == self.last_timestamp:
                # Same millisecond: bump the 12-bit sequence; when it wraps,
                # spin until the next millisecond.
                self.sequence = (self.sequence + 1) & ((1 << self.sequence_bits) - 1)
                if self.sequence == 0:
                    timestamp = self._wait_next_ms(timestamp)
            else:
                self.sequence = 0
            self.last_timestamp = timestamp
            snowflake_id = (
                ((timestamp - self.twepoch) << self.timestamp_left_shift) |
                (self.datacenter_id << self.datacenter_id_shift) |
                (self.worker_id << self.worker_id_shift) |
                self.sequence
            )
            # Zero-pad to a fixed 20 digits so ids sort lexicographically.
            return str(snowflake_id).zfill(20)
# ========== Module-wide Snowflake ID generator (shared by all callers) ==========
snowflake_id_generator = Snowflake(datacenter_id=1, worker_id=1)
def generate_snowflake_id():
    """Produce one globally unique ID via the shared Snowflake generator.

    Returns:
        str: a zero-padded 20-digit numeric string.
    """
    unique_id = snowflake_id_generator.generate_id()
    return unique_id
- # ========== MysqlOperate 类 ==========
- class MysqlOperate:
- def get_connection(self):
- """安全获取连接"""
- if not POOL:
- return None, "连接池未初始化"
- try:
- connection = POOL.get_connection(timeout=5)
- return connection, "success"
- except TimeoutError as e:
- logger.error(str(e))
- return None, "获取连接超时"
- except Error as e:
- logger.error(f"MySQL 获取连接失败: {e}")
- return None, str(e)
    def _execute_many(self, sql, values, success_msg, err_msg):
        """Generic batch-execution template: run ``executemany`` inside a
        commit/rollback envelope and always release the connection.

        Args:
            sql: parameterised SQL statement.
            values: sequence of parameter tuples.
            success_msg: message logged on success.
            err_msg: message prefix logged on failure.

        Returns:
            tuple: (True, "success") or (False, <error string>).
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            cursor.executemany(sql, values)
            connection.commit()
            logger.info(f"*******************************\n\n{success_msg}\n\n*******************************")
            return True, "success"
        except Error as e:
            connection.rollback()
            logger.error(f"{err_msg}: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
    def insert_to_slice(self, docs, knowledge_id, doc_id, tenant_id, user_id):
        """Batch-upsert slice rows (stores both slice_text and old_slice_text).

        Each element of *docs* is a chunk dict; ``content`` is written to both
        slice_text and old_slice_text. Note the ON DUPLICATE KEY clause below
        deliberately does NOT update old_slice_text, so the original text
        survives re-inserts.
        """
        date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        values = [
            (
                chunk.get("chunk_id"),
                knowledge_id,
                doc_id,
                chunk.get("content"),
                chunk.get("content"),  # old_slice_text keeps the original text
                date_now,
                chunk.get("metadata", {}).get("chunk_index"),
                chunk.get("Chapter"),
                chunk.get("Father_Chapter", ""),
                chunk.get("bbox"),
                chunk.get("page"),
                tenant_id,
                user_id
            )
            for chunk in docs
        ]

        sql = """
            INSERT INTO slice_info (
                slice_id, knowledge_id, document_id, slice_text, old_slice_text, create_time, slice_index, section, parent_section, bbox, page, tenant_id, create_by
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                slice_text = VALUES(slice_text),
                create_time = VALUES(create_time),
                slice_index = VALUES(slice_index),
                section = VALUES(section),
                parent_section = VALUES(parent_section),
                bbox = VALUES(bbox),
                page = VALUES(page),
                tenant_id = VALUES(tenant_id),
                create_by = VALUES(create_by)
        """
        return self._execute_many(sql, values, "批量插入切片数据成功", "插入 slice_info 出错")
    def delete_to_slice(self, doc_id):
        """Hard-delete all slice_info rows belonging to *doc_id*.

        Returns:
            tuple: (True, "success") or (False, <error string>).
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            cursor.execute("DELETE FROM slice_info WHERE document_id = %s", (doc_id,))
            connection.commit()
            logger.info(f"删除 slice_info 数据成功")
            return True, "success"
        except Error as e:
            connection.rollback()
            logger.error(f"删除 slice_info 出错: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
- def insert_to_image_url(self, image_dict, knowledge_id, doc_id):
- """插入图片映射表"""
- date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- import uuid
- values = [
- (knowledge_id, doc_id, k, "image", v, date_now, generate_snowflake_id())
- for k, v in image_dict.items()
- ]
- sql = """
- INSERT INTO bm_media_replacement (
- knowledge_id, document_id, origin_text, media_type, media_url, create_time, oss_id
- ) VALUES (%s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- origin_text = VALUES(origin_text),
- media_type = VALUES(media_type),
- media_url = VALUES(media_url),
- create_time = VALUES(create_time),
- oss_id = VALUES(oss_id)
- """
- return self._execute_many(sql, values, "插入 bm_media_replacement 成功", "插入 bm_media_replacement 出错")
    def delete_image_url(self, doc_id):
        """Hard-delete all bm_media_replacement rows belonging to *doc_id*.

        Returns:
            tuple: (True, "success") or (False, <error string>).
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            cursor.execute("DELETE FROM bm_media_replacement WHERE document_id = %s", (doc_id,))
            connection.commit()
            logger.info(f"删除 bm_media_replacement 成功")
            return True, "success"
        except Error as e:
            connection.rollback()
            logger.error(f"删除 bm_media_replacement 出错: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
    def update_task_status_start(self, task_id):
        """Mark task *task_id* as started: status column (update_by) := 1.

        Returns:
            tuple: (True, "success") or (False, <error string>).
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            sql = f"UPDATE {TABLE_NAME} SET {STATUS_FIELD} = %s WHERE {TASK_ID_FIELD} = %s"
            cursor.execute(sql, (1, task_id))
            connection.commit()
            logger.info(f"任务 {task_id} 状态更新为开始(1)")
            return True, "success"
        except Error as e:
            connection.rollback()
            logger.error(f"更新任务状态为开始失败: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
    def update_task_status_complete(self, task_id, user_id):
        """Mark task *task_id* as complete: status (update_by) := 2 and store
        *user_id* in the repurposed remark column.

        Returns:
            tuple: (True, "success") or (False, <error string>).
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            sql = f"UPDATE {TABLE_NAME} SET {STATUS_FIELD} = %s, {USER_ID_FIELD} = %s WHERE {TASK_ID_FIELD} = %s"
            cursor.execute(sql, (2, user_id, task_id))
            connection.commit()
            logger.info(f"任务 {task_id} 状态更新为完成(2),用户ID: {user_id}")
            return True, "success"
        except Error as e:
            connection.rollback()
            logger.error(f"更新任务状态为完成失败: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
    def update_task_status_error(self, task_id):
        """Mark task *task_id* as failed: status column (update_by) := 0.

        Returns:
            tuple: (True, "success") or (False, <error string>).
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            sql = f"UPDATE {TABLE_NAME} SET {STATUS_FIELD} = %s WHERE {TASK_ID_FIELD} = %s"
            cursor.execute(sql, (0, task_id))
            connection.commit()
            logger.info(f"任务 {task_id} 状态更新为错误(0)")
            return True, "success"
        except Error as e:
            connection.rollback()
            logger.error(f"更新任务状态为错误失败: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
    def delete_document(self, task_id):
        """Delete the bm_document row for *task_id* (cleanup before cancelling
        a task).

        Returns:
            tuple: (True, <affected row count>) on success — note the second
            element is an int here, unlike the (True, "success") convention of
            the other methods — or (False, <error string>) on failure.
        """
        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor()
            sql = f"DELETE FROM {TABLE_NAME} WHERE {TASK_ID_FIELD} = %s"
            cursor.execute(sql, (task_id,))
            affected_rows = cursor.rowcount
            connection.commit()
            logger.info(f"删除 bm_document 记录成功: task_id={task_id}, 影响行数={affected_rows}")
            return True, affected_rows
        except Error as e:
            connection.rollback()
            logger.error(f"删除 bm_document 记录失败: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
    def query_parent_generation_enabled(self, doc_ids: list):
        """Return the subset of *doc_ids* whose bm_document row has
        parent_generation_enabled = 1.

        Returns:
            set: matching document ids; empty set on empty input, connection
            failure, or DB error (errors are logged, never raised).
        """
        if not doc_ids:
            return set()
        connection, info = self.get_connection()
        if not connection:
            return set()
        cursor = None
        try:
            cursor = connection.cursor()
            placeholders = ", ".join(["%s"] * len(doc_ids))
            sql = f"SELECT document_id FROM bm_document WHERE document_id IN ({placeholders}) AND parent_generation_enabled = 1"
            cursor.execute(sql, tuple(doc_ids))
            results = cursor.fetchall()
            return {row[0] for row in results}
        except Error as e:
            logger.error(f"查询 parent_generation_enabled 失败: {e}")
            return set()
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
    def query_knowledge_by_ids(self, knowledge_ids: list):
        """Fetch the bm_knowledge rows matching *knowledge_ids*.

        Args:
            knowledge_ids: knowledge base id list.

        Returns:
            list[dict]: one dict per row (all columns); empty list on empty
            input, connection failure, or DB error (errors logged, not raised).
        """
        if not knowledge_ids:
            return []
        connection, info = self.get_connection()
        if not connection:
            logger.error(f"获取数据库连接失败: {info}")
            return []
        cursor = None
        try:
            # dictionary=True: rows come back as {column: value} dicts
            cursor = connection.cursor(dictionary=True)
            placeholders = ", ".join(["%s"] * len(knowledge_ids))
            sql = f"SELECT * FROM bm_knowledge WHERE knowledge_id IN ({placeholders})"
            cursor.execute(sql, tuple(knowledge_ids))
            results = cursor.fetchall()
            logger.info(f"查询 bm_knowledge 成功,共 {len(results)} 条记录")
            return results
        except Error as e:
            logger.error(f"查询 bm_knowledge 失败: {e}")
            return []
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
- def insert_oss_record(self, file_name, url, tenant_id="000000", file_extension="", file_size = 0):
- """插入 OSS 记录到 sys_oss 表"""
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- # 生成类似 "a2922173479520702464" 的 oss_id 20位
- # import random
- # oss_id = f"a{int(time.time() * 1000)}{random.randint(1000, 9999)}"
- oss_id = Snowflake().generate_id()
- create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- cursor = connection.cursor()
- sql = """
- INSERT INTO sys_oss (oss_id, tenant_id, file_name, original_name, url, create_time, file_suffix, size)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
- """
- cursor.execute(sql, (oss_id, tenant_id, file_name, file_name, url, create_time, file_extension, file_size))
- connection.commit()
- logger.info(f"OSS 记录插入成功: {oss_id}")
- return True, oss_id
- except Error as e:
- connection.rollback()
- logger.error(f"插入 OSS 记录失败: {e}")
- return False, str(e)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
-
- # def update_total_doc_len(self, update_json, max_retries=3):
- # """
- # 更新长度表和文档长度表,删除/更新/插入 slice_info 表
- # 支持锁冲突时自动重试
- # """
- # knowledge_id = update_json.get("knowledge_id")
- # doc_id = update_json.get("doc_id")
- # chunk_len = update_json.get("chunk_len")
- # operate = update_json.get("operate")
- # chunk_id = update_json.get("chunk_id")
- # chunk_text = update_json.get("chunk_text", "")
- # slice_index = update_json.get("slice_index")
-
- # last_error = None
- # for attempt in range(max_retries):
- # connection, info = self.get_connection()
- # if not connection:
- # return False, info
-
- # cursor = None
- # try:
- # cursor = connection.cursor()
-
- # # 设置较短的锁等待超时,避免长时间阻塞(10秒)
- # cursor.execute("SET SESSION innodb_lock_wait_timeout = 10")
-
- # # 查询文档当前信息
- # cursor.execute(
- # "SELECT word_num, slice_total FROM bm_document WHERE document_id = %s",
- # (doc_id,)
- # )
- # doc_result = cursor.fetchone()
- # logger.info(f"查询到的文档长度信息:{doc_result}")
-
- # # 查询知识库当前信息
- # cursor.execute(
- # "SELECT word_num FROM bm_knowledge WHERE knowledge_id = %s",
- # (knowledge_id,)
- # )
- # knowledge_result = cursor.fetchone()
- # logger.info(f"查询到的知识库总长度信息:{knowledge_result}")
-
- # # 计算新的文档长度
- # if not doc_result:
- # new_word_num = chunk_len if chunk_len > 0 else 0
- # slice_total = 0
- # else:
- # old_word_num = doc_result[0] or 0
- # slice_total = doc_result[1] or 0
- # new_word_num = old_word_num + chunk_len
- # if operate == "delete":
- # slice_total = max(0, slice_total - 1)
-
- # # 计算新的知识库长度
- # if not knowledge_result:
- # new_knowledge_word_num = chunk_len if chunk_len > 0 else 0
- # else:
- # old_knowledge_word_num = knowledge_result[0] or 0
- # new_knowledge_word_num = old_knowledge_word_num + chunk_len
-
- # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-
- # # 根据操作类型执行不同的 SQL
- # if operate == "update":
- # # # 更新文档长度
- # # cursor.execute(
- # # "UPDATE bm_document SET word_num = %s WHERE document_id = %s",
- # # (new_word_num, doc_id)
- # # )
- # # 更新切片内容
- # cursor.execute(
- # "UPDATE slice_info SET slice_text = %s, update_time = %s WHERE slice_id = %s",
- # (chunk_text, date_now, chunk_id)
- # )
-
- # elif operate == "insert":
- # # 获取最大切片索引
- # # cursor.execute(
- # # "SELECT MAX(slice_index) FROM slice_info WHERE document_id = %s",
- # # (doc_id,)
- # # )
- # # chunk_index_result = cursor.fetchone()[0]
- # # chunk_max_index = int(chunk_index_result) if chunk_index_result else 0
-
- # # # 更新文档长度
- # # cursor.execute(
- # # "UPDATE bm_document SET word_num = %s WHERE document_id = %s",
- # # (new_word_num, doc_id)
- # # )
- # # 插入新切片
- # cursor.execute(
- # """INSERT INTO slice_info
- # (slice_id, knowledge_id, document_id, slice_text, create_time, slice_index, old_slice_text)
- # VALUES (%s, %s, %s, %s, %s, %s, %s)""",
- # (chunk_id, knowledge_id, doc_id, chunk_text, date_now, int(slice_index), chunk_text)
- # )
-
- # elif operate == "delete":
- # # # 更新文档长度和切片总数
- # # cursor.execute(
- # # "UPDATE bm_document SET word_num = %s, slice_total = %s WHERE document_id = %s",
- # # (new_word_num, slice_total, doc_id)
- # # )
- # # 删除切片
- # # cursor.execute(
- # # "DELETE FROM slice_info WHERE slice_id = %s",
- # # (chunk_id,)
- # # )
- # cursor.execute(
- # "UPDATE slice_info SET del_flag = 1 WHERE slice_id = %s",
- # (chunk_id,)
- # )
-
- # # # 更新知识库总长度
- # # cursor.execute(
- # # "UPDATE bm_knowledge SET word_num = %s WHERE knowledge_id = %s",
- # # (new_knowledge_word_num, knowledge_id)
- # # )
-
- # connection.commit()
- # logger.info("bm_document 和 bm_knowledge 数据更新成功")
- # return True, "success"
-
- # except Error as e:
- # connection.rollback()
- # last_error = str(e)
- # error_code = e.errno if hasattr(e, 'errno') else None
-
- # # 1205: Lock wait timeout exceeded
- # # 1213: Deadlock found
- # if error_code in (1205, 1213) and attempt < max_retries - 1:
- # wait_time = (attempt + 1) * 2 # 逐渐增加等待时间:2s, 4s, 6s
- # logger.warning(f"update_total_doc_len 锁冲突,第 {attempt + 1} 次重试,等待 {wait_time} 秒")
- # time.sleep(wait_time)
- # continue
- # else:
- # logger.error(f"update_total_doc_len 数据库操作出错:{e}")
- # return False, last_error
- # finally:
- # if cursor:
- # cursor.close()
- # if connection:
- # connection.close()
-
- # logger.error(f"update_total_doc_len 重试 {max_retries} 次后仍然失败")
- # return False, last_error
- ##== update_total_doc_len ==##
- # def _execute_with_retry(self, sql_func, max_retries=3):
- # last_error = None
- # for attempt in range(max_retries):
- # conn, info = self.get_connection()
- # if not conn:
- # return False, info
- # cursor = None
- # try:
- # cursor = conn.cursor()
- # cursor.execute("SET SESSION innodb_lock_wait_timeout = 10")
- # sql_func(cursor)
- # conn.commit()
- # return True, "success"
- # except Error as e:
- # conn.rollback()
- # last_error = str(e)
- # errno = getattr(e, "errno", None)
- # if errno in (1205, 1213) and attempt < max_retries - 1:
- # wait = (attempt + 1) * 2
- # logger.warning(f"锁冲突重试 {attempt + 1} 次,等待 {wait}s")
- # time.sleep(wait)
- # continue
- # else:
- # logger.error(f"SQL 执行失败: {e}")
- # return False, last_error
- # finally:
- # if cursor:
- # cursor.close()
- # conn.close()
- # return False, last_error
- # def update_slice_insert(self, data):
- # def _sql(cursor):
- # cursor.execute(
- # """
- # INSERT INTO slice_info
- # (slice_id, knowledge_id, document_id, slice_text,
- # create_time, slice_index, old_slice_text)
- # VALUES (%s,%s,%s,%s,%s,%s,%s)
- # """,
- # (
- # data["chunk_id"],
- # data["knowledge_id"],
- # data["doc_id"],
- # data["chunk_text"],
- # data["date_now"],
- # int(data["slice_index"]),
- # data["chunk_text"],
- # ),
- # )
- # return self._execute_with_retry(_sql)
- # def update_slice_update(self, data):
- # def _sql(cursor):
- # cursor.execute(
- # """
- # UPDATE slice_info
- # SET slice_text = %s, update_time = %s
- # WHERE slice_id = %s
- # """,
- # (data["chunk_text"], data["date_now"], data["chunk_id"]),
- # )
- # return self._execute_with_retry(_sql)
- # def update_slice_delete(self, data):
- # def _sql(cursor):
- # cursor.execute(
- # "UPDATE slice_info SET del_flag = 1 WHERE slice_id = %s",
- # (data["chunk_id"],),
- # )
- # return self._execute_with_retry(_sql)
- # def update_document_word_num(self, doc_id, delta_len, slice_delta=0):
- # def _sql(cursor):
- # cursor.execute(
- # """
- # UPDATE bm_document
- # SET word_num = word_num + %s,
- # slice_total = slice_total + %s
- # WHERE document_id = %s
- # """,
- # (delta_len, slice_delta, doc_id),
- # )
- # return self._execute_with_retry(_sql)
- # def update_knowledge_word_num(self, knowledge_id, delta_len):
- # def _sql(cursor):
- # cursor.execute(
- # """
- # UPDATE bm_knowledge
- # SET word_num = word_num + %s
- # WHERE knowledge_id = %s
- # """,
- # (delta_len, knowledge_id),
- # )
- # return self._execute_with_retry(_sql)
- # def update_total_doc_len(self, update_json):
- # operate = update_json["operate"]
- # delta_len = update_json["chunk_len"]
- # update_json["date_now"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- # if operate == "insert":
- # self.update_slice_insert(update_json)
- # # self.update_document_word_num(update_json["doc_id"], delta_len, 1)
- # # self.update_knowledge_word_num(update_json["knowledge_id"], delta_len)
- # elif operate == "update":
- # self.update_slice_update(update_json)
- # elif operate == "delete":
- # self.update_slice_delete(update_json)
- # # self.update_document_word_num(update_json["doc_id"], -delta_len, -1)
- # # self.update_knowledge_word_num(update_json["knowledge_id"], -delta_len)
- # return True, "success"
- # logger = logging.getLogger(__name__)
- # class DBOperator:
    def _execute_with_retry(self, sql_func, max_retries=3):
        """Run *sql_func(cursor)* inside its own short explicit transaction,
        retrying on lock conflicts.

        Retries with 2s/4s/... back-off only on MySQL errno 1205 (lock wait
        timeout) and 1213 (deadlock); any other Error fails immediately.

        Args:
            sql_func: callable receiving a cursor and issuing the SQL.
            max_retries: number of attempts before giving up.

        Returns:
            tuple: (True, "success") or (False, <last error string>).
        """
        last_error = None
        for attempt in range(max_retries):
            conn, info = self.get_connection()
            if not conn:
                return False, info
            cursor = None
            try:
                # Enable autocommit first to avoid an implicit open transaction.
                conn.autocommit = True
                cursor = conn.cursor()
                # Session parameter is set outside the transaction.
                cursor.execute("SET SESSION innodb_lock_wait_timeout = 10")
                # Open an explicit transaction kept as small as possible.
                conn.start_transaction()
                sql_func(cursor)
                conn.commit()
                return True, "success"
            except Error as e:
                last_error = str(e)
                errno = getattr(e, "errno", None)
                # Roll back immediately on failure.
                try:
                    conn.rollback()
                except Exception:
                    pass
                # Release cursor and connection before any retry.
                try:
                    cursor and cursor.close()
                finally:
                    conn.close()
                if errno in (1205, 1213) and attempt < max_retries - 1:
                    wait = (attempt + 1) * 2
                    logger.warning(
                        f"锁冲突重试 {attempt + 1}/{max_retries},{wait}s 后重试"
                    )
                    time.sleep(wait)
                    continue
                else:
                    logger.error(f"SQL 执行失败: {e}")
                    return False, last_error
            finally:
                # Best-effort cleanup for the success path (the error path has
                # already closed both handles; double-close is swallowed here).
                try:
                    cursor and cursor.close()
                except Exception:
                    pass
                try:
                    conn.close()
                except Exception:
                    pass
        return False, last_error
- # ------------------ 业务 SQL ------------------
- def update_slice_insert(self, data):
- def _sql(cursor):
- cursor.execute(
- """
- INSERT INTO slice_info
- (slice_id, knowledge_id, document_id, slice_text,
- create_time, slice_index, old_slice_text)
- VALUES (%s,%s,%s,%s,%s,%s,%s)
- """,
- (
- data["chunk_id"],
- data["knowledge_id"],
- data["doc_id"],
- data["chunk_text"],
- data["date_now"],
- int(data["slice_index"]),
- data["chunk_text"],
- ),
- )
- return self._execute_with_retry(_sql)
- def update_slice_update(self, data):
- def _sql(cursor):
- cursor.execute(
- """
- UPDATE slice_info
- SET slice_text = %s,
- update_time = %s
- WHERE slice_id = %s
- """,
- (
- data["chunk_text"],
- data["date_now"],
- data["chunk_id"],
- ),
- )
- return self._execute_with_retry(_sql)
- def update_slice_delete(self, data):
- def _sql(cursor):
- cursor.execute(
- """
- UPDATE slice_info
- SET del_flag = 1
- WHERE slice_id = %s
- """,
- (data["chunk_id"],),
- )
- return self._execute_with_retry(_sql)
-
- def batch_update_slice_delete(self, chunk_ids: list):
- """
- 批量标记切片为删除状态
-
- 参数:
- chunk_ids: 切片ID列表
- 返回:
- (success, message)
- """
- if not chunk_ids:
- return True, "success"
-
- def _sql(cursor):
- # 构造 IN 表达式
- placeholders = ",".join(["%s"] * len(chunk_ids))
- cursor.execute(
- f"""
- UPDATE slice_info
- SET del_flag = 1
- WHERE slice_id IN ({placeholders})
- """,
- tuple(chunk_ids)
- )
-
- return self._execute_with_retry(_sql)
- def update_document_word_num(self, doc_id, delta_len, slice_delta=0):
- def _sql(cursor):
- cursor.execute(
- """
- UPDATE bm_document
- SET word_num = word_num + %s,
- slice_total = slice_total + %s
- WHERE document_id = %s
- """,
- (delta_len, slice_delta, doc_id),
- )
- return self._execute_with_retry(_sql)
- def update_knowledge_word_num(self, knowledge_id, delta_len):
- def _sql(cursor):
- cursor.execute(
- """
- UPDATE bm_knowledge
- SET word_num = word_num + %s
- WHERE knowledge_id = %s
- """,
- (delta_len, knowledge_id),
- )
- return self._execute_with_retry(_sql)
- def update_total_doc_len(self, update_json):
- operate = update_json["operate"]
- delta_len = update_json["chunk_len"]
- update_json["date_now"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- if operate == "insert":
- return self.update_slice_insert(update_json)
- elif operate == "update":
- return self.update_slice_update(update_json)
- elif operate == "delete":
- return self.update_slice_delete(update_json)
- return True, "success"
    def query_slice_info_by_doc_ids(self, knowledge_id: str, doc_ids: list):
        """Fetch slice_info rows for *knowledge_id* restricted to *doc_ids*.

        Args:
            knowledge_id: knowledge base id.
            doc_ids: document id list; empty list short-circuits to (True, []).

        Returns:
            tuple: (True, list-of-row-dicts) on success,
            (False, <error string>) on failure.
        """
        if not doc_ids:
            return True, []

        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)
            placeholders = ','.join(['%s'] * len(doc_ids))
            sql = f"""
                SELECT slice_id, knowledge_id, document_id, slice_text, old_slice_text,
                       create_time, update_time, slice_index
                FROM slice_info
                WHERE knowledge_id = %s AND document_id IN ({placeholders})
            """
            cursor.execute(sql, (knowledge_id, *doc_ids))
            results = cursor.fetchall()
            logger.info(f"查询 slice_info 成功,共 {len(results)} 条记录")
            return True, results
        except Error as e:
            logger.error(f"查询 slice_info 出错: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
-
    def query_media_replacement_by_doc_ids(self, knowledge_id: str, doc_ids: list):
        """Fetch bm_media_replacement rows for *knowledge_id* restricted to
        *doc_ids*.

        Args:
            knowledge_id: knowledge base id.
            doc_ids: document id list; empty list short-circuits to (True, []).

        Returns:
            tuple: (True, list-of-row-dicts) on success,
            (False, <error string>) on failure.
        """
        if not doc_ids:
            return True, []

        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)
            placeholders = ','.join(['%s'] * len(doc_ids))
            sql = f"""
                SELECT knowledge_id, document_id, origin_text, media_type, media_url, create_time
                FROM bm_media_replacement
                WHERE knowledge_id = %s AND document_id IN ({placeholders})
            """
            cursor.execute(sql, (knowledge_id, *doc_ids))
            results = cursor.fetchall()
            logger.info(f"查询 bm_media_replacement 成功,共 {len(results)} 条记录")
            return True, results
        except Error as e:
            logger.error(f"查询 bm_media_replacement 出错: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
    def query_bm_document_by_doc_ids(self, knowledge_id: str, doc_ids: list):
        """Fetch bm_document rows for *knowledge_id* restricted to *doc_ids*.

        Args:
            knowledge_id: knowledge base id.
            doc_ids: document id list; empty list short-circuits to (True, []).

        Returns:
            tuple: (True, list-of-row-dicts) on success,
            (False, <error string>) on failure.
        """
        if not doc_ids:
            return True, []

        connection, info = self.get_connection()
        if not connection:
            return False, info
        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)
            placeholders = ','.join(['%s'] * len(doc_ids))
            sql = f"""
                SELECT * FROM bm_document
                WHERE knowledge_id = %s AND document_id IN ({placeholders})
            """
            cursor.execute(sql, (knowledge_id, *doc_ids))
            results = cursor.fetchall()
            logger.info(f"查询 bm_document 成功,共 {len(results)} 条记录")
            return True, results
        except Error as e:
            logger.error(f"查询 bm_document 出错: {e}")
            return False, str(e)
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
    def copy_docs_metadata_to_new_knowledge(self, source_knowledge_id: str, source_doc_ids: list, new_knowledge_id: str, doc_id_mapping: dict = None, chunk_id_mapping: dict = None, tenant_id: str = "000000"):
        """Copy document metadata into a new knowledge base (bm_document,
        slice_info and bm_media_replacement).

        Args:
            source_knowledge_id: source knowledge base id.
            source_doc_ids: list of source document ids.
            new_knowledge_id: target knowledge base id.
            doc_id_mapping: {old_doc_id: new_doc_id}; when omitted, a fresh
                snowflake id is minted per document.
            chunk_id_mapping: {old_chunk_id: new_chunk_id}, used for slice_id
                so it stays consistent with the vector store.
            tenant_id: tenant id (supplied by the frontend).

        Returns:
            tuple: (True, result_dict) with copy counts and the final
            doc_id_mapping, or (False, {"error": ...}) on failure.

        NOTE(review): the three SELECTs below run on separate connections from
        the INSERT transaction, so the copy is not atomic with respect to
        concurrent writes on the source rows.
        """
        if not source_doc_ids:
            return True, {"message": "无需复制的文档", "doc_id_mapping": {}}

        # When no mapping is given, mint a new doc_id per source document.
        if doc_id_mapping is None:
            doc_id_mapping = {}
            for old_doc_id in source_doc_ids:
                doc_id_mapping[old_doc_id] = generate_snowflake_id()

        # 1. Fetch the source slice_info rows
        success, slice_data = self.query_slice_info_by_doc_ids(source_knowledge_id, source_doc_ids)
        if not success:
            return False, {"error": f"查询切片数据失败: {slice_data}"}

        # 2. Fetch the source bm_media_replacement rows
        success, media_data = self.query_media_replacement_by_doc_ids(source_knowledge_id, source_doc_ids)
        if not success:
            return False, {"error": f"查询媒体映射失败: {media_data}"}

        # 3. Fetch the source bm_document rows
        success, doc_data = self.query_bm_document_by_doc_ids(source_knowledge_id, source_doc_ids)
        if not success:
            return False, {"error": f"查询文档数据失败: {doc_data}"}

        connection, info = self.get_connection()
        if not connection:
            return False, {"error": info}

        cursor = None
        try:
            cursor = connection.cursor()
            date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            slice_count = 0
            media_count = 0
            doc_count = 0

            # 4. Insert into bm_document
            if doc_data:
                # Full column list; fields in `special` below are overridden,
                # everything else is copied verbatim from the source row.
                all_fields = [
                    'document_id', 'knowledge_id', 'custom_separator', 'sentence_size', 'length',
                    'word_num', 'slice_total', 'name', 'url', 'parse_image', 'tenant_id',
                    'create_dept', 'create_by', 'create_time', 'update_by', 'update_time',
                    'remark', 'parsing_type', 'oss_id', 'status', 'mark_oss_id', 'mark_url',
                    'ref_document_id', 'qa_checked', 'related_questions_enabled',
                    'summary_generation_enabled', 'parent_generation_enabled', 'suffix', 'pdf_url'
                ]
                doc_values = []
                for row in doc_data:
                    old_doc_id = row['document_id']
                    new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                    # Overrides: new ids/knowledge base, fresh create_time, and
                    # ref_document_id keeps a back-reference to the source doc.
                    special = {
                        'document_id': new_doc_id,
                        'knowledge_id': new_knowledge_id,
                        'tenant_id': tenant_id,
                        'create_time': date_now,
                        'ref_document_id': old_doc_id
                    }
                    doc_values.append(tuple(special.get(f, row.get(f)) for f in all_fields))

                placeholders = ','.join(['%s'] * len(all_fields))
                doc_sql = f"INSERT INTO bm_document ({','.join(all_fields)}) VALUES ({placeholders})"
                cursor.executemany(doc_sql, doc_values)
                doc_count = len(doc_values)
                logger.info(f"插入文档数据成功: {doc_count} 条")

            # 5. Insert into slice_info (old_slice_text keeps the original text)
            if slice_data:
                slice_values = []

                for row in slice_data:
                    old_doc_id = row['document_id']
                    old_slice_id = row['slice_id']
                    new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                    # Reuse the vector store's chunk_id as slice_id so both
                    # stores stay in sync.
                    new_slice_id = chunk_id_mapping.get(old_slice_id, generate_snowflake_id()) if chunk_id_mapping else generate_snowflake_id()

                    # old_slice_text falls back to slice_text when unset/empty.
                    old_slice_text = row.get('old_slice_text') or row['slice_text']

                    slice_values.append((
                        new_slice_id,
                        new_knowledge_id,
                        new_doc_id,
                        row['slice_text'],
                        old_slice_text,
                        date_now,
                        row.get('slice_index')
                    ))

                slice_sql = """
                    INSERT INTO slice_info (
                        slice_id, knowledge_id, document_id, slice_text, old_slice_text, create_time, slice_index
                    ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                """

                cursor.executemany(slice_sql, slice_values)
                slice_count = len(slice_values)
                logger.info(f"插入切片数据成功: {slice_count} 条")

            # 6. Insert into bm_media_replacement
            if media_data:
                media_values = []
                for row in media_data:
                    old_doc_id = row['document_id']
                    new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())

                    media_values.append((
                        new_knowledge_id,
                        new_doc_id,
                        row['origin_text'],
                        row['media_type'],
                        row['media_url'],
                        date_now
                    ))

                media_sql = """
                    INSERT INTO bm_media_replacement (
                        knowledge_id, document_id, origin_text, media_type, media_url, create_time
                    ) VALUES (%s, %s, %s, %s, %s, %s)
                """
                cursor.executemany(media_sql, media_values)
                media_count = len(media_values)
                logger.info(f"插入图片映射数据成功: {media_count} 条")

            connection.commit()

            return True, {
                "message": "复制元数据成功",
                "doc_id_mapping": doc_id_mapping,
                "doc_count": doc_count,
                "slice_count": slice_count,
                "media_count": media_count
            }

        except Error as e:
            connection.rollback()
            logger.error(f"复制文档元数据失败: {e}")
            return False, {"error": str(e)}
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
    def copy_single_doc_metadata_for_document_copy(self, source_knowledge_id: str, source_doc_id: str,
                                                   new_knowledge_id: str, new_doc_id: str,
                                                   chunk_id_mapping: dict = None, tenant_id: str = "000000"):
        """
        Single-document copy mode: copy one document's metadata into a new
        knowledge base (bm_document, slice_info and bm_media_replacement).

        Differences from copy_docs_metadata_to_new_knowledge:
        - operates on a single document only
        - slice_info's slice_text AND old_slice_text are both filled from the
          old document's old_slice_text (the original, unmodified text)

        Args:
            source_knowledge_id: source knowledge base ID
            source_doc_id: source document ID
            new_knowledge_id: target knowledge base ID
            new_doc_id: target document ID (supplied by the front end)
            chunk_id_mapping: {old_chunk_id: new_chunk_id} mapping so slice IDs
                stay in sync with the vector store; missing entries fall back
                to freshly generated snowflake IDs
            tenant_id: tenant ID (NOTE(review): accepted but unused in this body)

        Returns:
            (success, result_dict): result_dict carries the copy details,
            including the raw texts collected in ``lightrag_data``.
        """
        source_doc_ids = [source_doc_id]

        # 1. Fetch the source slice_info rows
        success, slice_data = self.query_slice_info_by_doc_ids(source_knowledge_id, source_doc_ids)
        if not success:
            return False, {"error": f"查询切片数据失败: {slice_data}"}

        # 2. Fetch the source bm_media_replacement rows
        success, media_data = self.query_media_replacement_by_doc_ids(source_knowledge_id, source_doc_ids)
        if not success:
            return False, {"error": f"查询媒体映射失败: {media_data}"}

        # 3. Fetch the source bm_document rows
        success, doc_data = self.query_bm_document_by_doc_ids(source_knowledge_id, source_doc_ids)
        if not success:
            return False, {"error": f"查询文档数据失败: {doc_data}"}

        connection, info = self.get_connection()
        if not connection:
            return False, {"error": info}

        cursor = None
        try:
            cursor = connection.cursor()
            date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

            slice_count = 0
            media_count = 0
            doc_count = 0

            # 4. Update bm_document in place (the row for new_doc_id is assumed
            #    to exist already; the source document's fields are copied onto it)
            if doc_data:
                # Fields copied from the source row (document_id itself excluded)
                update_fields = [
                    'custom_separator', 'sentence_size', 'length',
                    'word_num', 'slice_total', 'url', 'parse_image',
                    'remark', 'parsing_type', 'oss_id', 'mark_oss_id', 'mark_url',
                    'ref_document_id', 'qa_checked', 'related_questions_enabled',
                    'summary_generation_enabled', 'parent_generation_enabled', 'suffix', 'pdf_url',
                    'update_time'
                ]

                for row in doc_data:
                    old_doc_id = row['document_id']
                    # Build SET values: ref_document_id points back at the
                    # source document, update_time is stamped with "now"
                    update_values = []
                    set_clauses = []
                    for field in update_fields:
                        if field == 'ref_document_id':
                            update_values.append(old_doc_id)
                        elif field == 'update_time':
                            update_values.append(date_now)
                        else:
                            update_values.append(row.get(field))
                        set_clauses.append(f"{field} = %s")

                    # WHERE-clause value
                    update_values.append(new_doc_id)

                    update_sql = f"UPDATE bm_document SET {', '.join(set_clauses)} WHERE document_id = %s"
                    cursor.execute(update_sql, tuple(update_values))
                    # NOTE(review): overwritten each iteration — records only the
                    # last UPDATE's rowcount; harmless here since doc_data holds
                    # a single row in single-document mode
                    doc_count = cursor.rowcount

                logger.info(f"[文档复制模式] 更新文档数据成功: {doc_count} 条")

            # 5. Insert slice_info (key difference: both slice_text and
            #    old_slice_text take the old document's old_slice_text)
            lightrag_data = []
            if slice_data:
                slice_values = []

                for row in slice_data:
                    old_slice_id = row['slice_id']
                    new_slice_id = chunk_id_mapping.get(old_slice_id, generate_snowflake_id()) if chunk_id_mapping else generate_snowflake_id()

                    # Prefer old_slice_text; fall back to slice_text when empty
                    original_text = row.get('old_slice_text') or row['slice_text']

                    lightrag_data.append(original_text)
                    slice_values.append((
                        new_slice_id,
                        new_knowledge_id,
                        new_doc_id,
                        original_text,  # slice_text takes old_slice_text
                        original_text,  # old_slice_text takes old_slice_text
                        date_now,
                        row.get('slice_index')
                    ))

                slice_sql = """
                INSERT INTO slice_info (
                    slice_id, knowledge_id, document_id, slice_text, old_slice_text, create_time, slice_index
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                """

                cursor.executemany(slice_sql, slice_values)
                slice_count = len(slice_values)
                logger.info(f"[文档复制模式] 插入切片数据成功: {slice_count} 条")

            # 6. Insert bm_media_replacement
            if media_data:
                media_values = []
                for row in media_data:
                    media_values.append((
                        new_knowledge_id,
                        new_doc_id,
                        row['origin_text'],
                        row['media_type'],
                        row['media_url'],
                        date_now
                    ))

                media_sql = """
                INSERT INTO bm_media_replacement (
                    knowledge_id, document_id, origin_text, media_type, media_url, create_time
                ) VALUES (%s, %s, %s, %s, %s, %s)
                """
                cursor.executemany(media_sql, media_values)
                media_count = len(media_values)
                logger.info(f"[文档复制模式] 插入图片映射数据成功: {media_count} 条")

            connection.commit()
            result = {
                "new_knowledge_id": new_knowledge_id,
                "new_doc_id": new_doc_id,
                "lightrag_data": lightrag_data
            }

            return True, {
                "message": "文档复制模式:复制元数据成功",
                "doc_count": doc_count,
                "slice_count": slice_count,
                "media_count": media_count,
                "result": result
            }

        except Error as e:
            connection.rollback()
            logger.error(f"[文档复制模式] 复制文档元数据失败: {e}")
            return False, {"error": str(e)}
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()
- def query_slice_by_id(self, knowledge_id: str, slice_id: str):
- """
- 根据 knowledge_id 和 slice_id 查询单个切片数据
- """
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- cursor = connection.cursor(dictionary=True)
- sql = """
- SELECT slice_id, knowledge_id, document_id, slice_text, old_slice_text, slice_index
- FROM slice_info WHERE knowledge_id = %s AND slice_id = %s
- """
- cursor.execute(sql, (knowledge_id, slice_id))
- results = cursor.fetchall()
- return True, results
- except Error as e:
- logger.error(f"查询 slice_info 出错: {e}")
- return False, str(e)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
- def query_slice_revision_info_by_slice_ids(self, knowledge_id: str, slice_ids: list):
- """批量查询切片的修订/废弃信息及展示所需字段"""
- if not slice_ids:
- return True, []
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(slice_ids))
- sql = f"""
- SELECT slice_id, knowledge_id, document_id, slice_text, section, parent_section,
- revision_status, ref_slice_id
- FROM slice_info
- WHERE knowledge_id = %s AND slice_id IN ({placeholders})
- """
- cursor.execute(sql, (knowledge_id, *slice_ids))
- return True, cursor.fetchall()
- except Error as e:
- logger.error(f"批量查询 slice_info 修订字段出错: {e}")
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(slice_ids))
- sql = f"""
- SELECT slice_id, knowledge_id, document_id, slice_text,
- revision_status, ref_slice_id
- FROM slice_info
- WHERE knowledge_id = %s AND slice_id IN ({placeholders})
- """
- cursor.execute(sql, (knowledge_id, *slice_ids))
- rows = cursor.fetchall()
- for r in rows:
- r["section"] = None
- r["parent_section"] = None
- return True, rows
- except Error as e1:
- logger.error(f"批量查询 slice_info 修订字段(无section)也失败: {e1}")
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(slice_ids))
- sql = f"""
- SELECT slice_id, knowledge_id, document_id, slice_text
- FROM slice_info
- WHERE knowledge_id = %s AND slice_id IN ({placeholders})
- """
- cursor.execute(sql, (knowledge_id, *slice_ids))
- rows = cursor.fetchall()
- for r in rows:
- r["section"] = None
- r["parent_section"] = None
- r["revision_status"] = None
- r["ref_slice_id"] = None
- return True, rows
- except Error as e2:
- logger.error(f"批量查询 slice_info 兜底也失败: {e2}")
- return False, str(e2)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
- def query_slice_revision_info_by_slice_ids_any(self, slice_ids: list):
- if not slice_ids:
- return True, []
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(slice_ids))
- sql = f"""
- SELECT slice_id, knowledge_id, document_id, slice_text, section, parent_section,
- revision_status, ref_slice_id, revision_slice_text
- FROM slice_info
- WHERE slice_id IN ({placeholders})
- """
- cursor.execute(sql, tuple(slice_ids))
- return True, cursor.fetchall()
- except Error as e:
- logger.error(f"批量查询 slice_info 修订字段(不含knowledge)出错: {e}")
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(slice_ids))
- sql = f"""
- SELECT slice_id, knowledge_id, document_id, slice_text,
- revision_status, ref_slice_id, revision_slice_text
- FROM slice_info
- WHERE slice_id IN ({placeholders})
- """
- cursor.execute(sql, tuple(slice_ids))
- rows = cursor.fetchall()
- for r in rows:
- r["section"] = None
- r["parent_section"] = None
- r["revision_slice_text"] = None
- return True, rows
- except Error as e1:
- logger.error(f"批量查询 slice_info 修订字段(不含knowledge,无section)也失败: {e1}")
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(slice_ids))
- sql = f"""
- SELECT slice_id, knowledge_id, document_id, slice_text
- FROM slice_info
- WHERE slice_id IN ({placeholders})
- """
- cursor.execute(sql, tuple(slice_ids))
- rows = cursor.fetchall()
- for r in rows:
- r["section"] = None
- r["parent_section"] = None
- r["revision_status"] = None
- r["ref_slice_id"] = None
- r["revision_slice_text"] = None
- return True, rows
- except Error as e2:
- logger.error(f"批量查询 slice_info(不含knowledge)兜底也失败: {e2}")
- return False, str(e2)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
- def query_document_names_by_document_ids(self, document_ids: list):
- if not document_ids:
- return True, {}
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- cursor = connection.cursor(dictionary=True)
- placeholders = ",".join(["%s"] * len(document_ids))
- sql = f"""
- SELECT document_id, name
- FROM bm_document
- WHERE document_id IN ({placeholders})
- """
- cursor.execute(sql, tuple(document_ids))
- rows = cursor.fetchall()
- result = {}
- for r in rows:
- if r.get("document_id"):
- result[r["document_id"]] = r.get("name")
- return True, result
- except Error as e:
- logger.error(f"批量查询 bm_document.name 出错: {e}")
- return False, str(e)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
- def query_slice_by_knowledge_and_doc(self, knowledge_id: str, document_id: str):
- """
- 根据 knowledge_id 和 document_id 查询 slice_info 表中的切片数据
- """
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- cursor = connection.cursor(dictionary=True)
- sql = """
- SELECT slice_id, knowledge_id, document_id, slice_text, old_slice_text, slice_index
- FROM slice_info
- WHERE knowledge_id = %s AND document_id = %s
- """
- cursor.execute(sql, (knowledge_id, document_id))
- results = cursor.fetchall()
- logger.info(f"查询 slice_info 成功,共 {len(results)} 条记录")
- return True, results
- except Error as e:
- logger.error(f"查询 slice_info 出错: {e}")
- return False, str(e)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
    # NOTE(review): dead code — commented-out predecessor of the active
    # update_slice_llm_fields below (this version also filtered on knowledge_id).
    # Delete once the replacement is confirmed.
    # def update_slice_llm_fields(self, knowledge_id: str, slice_id: str, qa: str = None, question: str = None, summary: str = None):
- # """
- # 更新 slice_info 表中的 qa、question、summary 字段,值为 None 的字段不更新
- # 基于 knowledge_id 和 slice_id 的包含关系进行更新
- # """
- # connection, info = self.get_connection()
- # if not connection:
- # return False, info
- # cursor = None
- # try:
- # # 动态构建 SET 子句,只更新非 None 字段
- # set_parts = []
- # params = []
- # if qa is not None:
- # set_parts.append("qa = %s")
- # params.append(qa)
- # if question is not None:
- # set_parts.append("question = %s")
- # params.append(question)
- # if summary is not None:
- # set_parts.append("summary = %s")
- # params.append(summary)
-
- # if not set_parts:
- # return True, "no fields to update"
-
- # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # set_parts.append("update_time = %s")
- # params.append(date_now)
- # params.append(knowledge_id)
- # params.append(slice_id)
-
- # cursor = connection.cursor()
- # sql = f"UPDATE slice_info SET {', '.join(set_parts)} WHERE knowledge_id = %s AND slice_id = %s"
- # cursor.execute(sql, tuple(params))
- # connection.commit()
- # logger.info(f"更新切片 {slice_id} 的LLM字段成功")
- # return True, "success"
- # except Error as e:
- # connection.rollback()
- # logger.error(f"更新 slice_info LLM字段出错: {e}")
- # return False, str(e)
- # finally:
- # if cursor:
- # cursor.close()
- # if connection:
- # connection.close()
- def update_slice_llm_fields(
- self,
- knowledge_id: str,
- slice_id: str,
- qa: str = None,
- question: str = None,
- summary: str = None
- ):
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- set_parts = []
- params = []
- if qa is not None:
- set_parts.append("qa = %s")
- params.append(qa)
- if question is not None:
- set_parts.append("question = %s")
- params.append(question)
- if summary is not None:
- set_parts.append("summary = %s")
- params.append(summary)
- if not set_parts:
- connection.rollback()
- return True, "no fields to update"
- set_parts.append("update_time = %s")
- params.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
- # params.append(knowledge_id)
- params.append(slice_id)
- cursor = connection.cursor()
- sql = f"""
- UPDATE slice_info
- SET {', '.join(set_parts)}
- WHERE slice_id = %s
- """
- cursor.execute(sql, tuple(params))
- connection.commit()
- return True, "success"
- except Exception as e:
- try:
- connection.rollback()
- except Exception:
- pass
- logger.error(f"更新 slice_info LLM字段出错: {e}")
- return False, str(e)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
- def query_doc_metadata_flags(self, document_id: str):
- """
- 根据 document_id 查询 bm_document 表的 qa_checked、related_questions_enabled、summary_generation_enabled 字段
- 返回: (success, dict) 其中 dict 包含 qa, question, summary 布尔值
- """
- connection, info = self.get_connection()
- if not connection:
- return False, info
- cursor = None
- try:
- cursor = connection.cursor(dictionary=True)
- sql = """
- SELECT qa_checked, related_questions_enabled, summary_generation_enabled
- FROM bm_document WHERE document_id = %s
- """
- cursor.execute(sql, (document_id,))
- result = cursor.fetchone()
- if not result:
- return True, {"qa": False, "question": False, "summary": False}
- return True, {
- "qa": result.get("qa_checked") == 1,
- "question": result.get("related_questions_enabled") == 1,
- "summary": result.get("summary_generation_enabled") == 1
- }
- except Error as e:
- logger.error(f"查询 bm_document 元数据字段出错: {e}")
- return False, str(e)
- finally:
- if cursor:
- cursor.close()
- if connection:
- connection.close()
|