# db.py
  1. from rag.vector_db.milvus_vector import HybridRetriever
  2. from response_info import generate_message, generate_response
  3. from utils.get_logger import setup_logger
  4. from datetime import datetime
  5. from uuid import uuid1
  6. import mysql.connector
  7. from mysql.connector import pooling, Error, errors
  8. import threading
  9. from concurrent.futures import ThreadPoolExecutor, TimeoutError
  10. from config import milvus_uri, mysql_config
  11. import time
# Module-level logger; every Milvus/MySQL operation in this file logs through it.
logger = setup_logger(__name__)
  13. # uri = "http://localhost:19530"
  14. # if 'POOL' not in globals():
  15. # try:
  16. # POOL = pooling.MySQLConnectionPool(
  17. # pool_name="mysql_pool",
  18. # pool_size=10,
  19. # **mysql_config
  20. # )
  21. # logger.info("MySQL 连接池初始化成功")
  22. # except Error as e:
  23. # logger.info(f"初始化 MySQL 连接池失败: {e}")
  24. # POOL = None
  25. class MilvusOperate:
  26. def __init__(self, collection_name: str = "default", embedding_name:str = "e5"):
  27. self.collection = collection_name
  28. self.hybrid_retriever = HybridRetriever(uri=milvus_uri, embedding_name=embedding_name, collection_name=collection_name)
  29. self.mysql_client = MysqlOperate()
  30. def _has_collection(self):
  31. is_collection = self.hybrid_retriever.has_collection()
  32. return is_collection
  33. def _create_collection(self):
  34. if self._has_collection():
  35. resp = {"code": 400, "message": "数据库已存在"}
  36. else:
  37. create_result = self.hybrid_retriever.build_collection()
  38. resp = generate_message(create_result)
  39. return resp
  40. def _delete_collection(self):
  41. delete_result = self.hybrid_retriever.delete_collection(self.collection)
  42. resp = generate_message(delete_result)
  43. return resp
  44. def _put_by_id(self, slice_json):
  45. slice_id = slice_json.get("slice_id", None)
  46. slice_text = slice_json.get("slice_text", None)
  47. update_result, chunk_len = self.hybrid_retriever.update_data(chunk_id=slice_id, chunk=slice_text)
  48. if update_result.endswith("success"):
  49. # 如果成功,更新mysql中知识库总长度和文档长度
  50. update_json = {}
  51. update_json["knowledge_id"] = slice_json.get("knowledge_id")
  52. update_json["doc_id"] = slice_json.get("document_id")
  53. update_json["chunk_len"] = chunk_len
  54. update_json["operate"] = "update"
  55. update_json["chunk_id"] = slice_id
  56. update_json["chunk_text"] = slice_text
  57. update_flag, update_str = self.mysql_client.update_total_doc_len(update_json)
  58. else:
  59. update_flag = False
  60. if not update_flag:
  61. update_result = "update_error"
  62. resp = generate_message(update_result)
  63. return resp
  64. def _insert_slice(self, slice_json):
  65. slice_id = str(uuid1())
  66. knowledge_id = slice_json.get("knowledge_id")
  67. doc_id = slice_json.get("document_id")
  68. slice_text = slice_json.get("slice_text", None)
  69. doc_name = slice_json.get("doc_name")
  70. chunk_len = len(slice_text)
  71. metadata = {
  72. "content": slice_text,
  73. "doc_id": doc_id,
  74. "chunk_id": slice_id,
  75. "metadata": {"source": doc_name, "chunk_len": chunk_len},
  76. "Chapter": slice_json.get("Chapter",""),
  77. "Father_Chapter": slice_json.get("Father_Chapter",""),
  78. }
  79. insert_flag, insert_str = self.hybrid_retriever.insert_data(slice_text, metadata)
  80. if insert_flag:
  81. # 如果成功,更新mysql中知识库总长度和文档长度
  82. update_json = {}
  83. update_json["knowledge_id"] = slice_json.get("knowledge_id")
  84. update_json["doc_id"] = slice_json.get("document_id")
  85. update_json["chunk_len"] = chunk_len
  86. update_json["operate"] = "insert"
  87. update_json["chunk_id"] = slice_id
  88. update_json["chunk_text"] = slice_text
  89. update_json["slice_index"] = slice_json.get("slice_index")
  90. update_flag, update_str = self.mysql_client.update_total_doc_len(update_json)
  91. else:
  92. logger.error(f"插入向量库出错:{insert_str}")
  93. update_flag = False
  94. update_str = "向量库写入出错"
  95. # pass
  96. if not update_flag:
  97. logger.error(f"新增切片中mysql数据库出错:{update_str}")
  98. # insert_result = "insert_error"
  99. success = "insert_error"
  100. else:
  101. # insert_result = "insert_success"
  102. success = "insert_success"
  103. # resp = generate_message(insert_result)
  104. resp = {"status": success, "slice_id":slice_id}
  105. return resp
  106. def _delete_by_chunk_id(self, chunk_id, knowledge_id, document_id):
  107. logger.info(f"删除的切片id:{chunk_id}")
  108. delete_result, delete_chunk_len = self.hybrid_retriever.delete_by_chunk_id(chunk_id=chunk_id)
  109. if delete_result.endswith("success"):
  110. chunk_len = delete_chunk_len[0]
  111. update_json = {
  112. "knowledge_id": knowledge_id,
  113. "doc_id": document_id,
  114. "chunk_len": -chunk_len,
  115. "operate": "delete",
  116. "chunk_id": chunk_id
  117. }
  118. update_flag, update_str = self.mysql_client.update_total_doc_len(update_json)
  119. else:
  120. logger.error("根据chunk id删除向量库失败")
  121. update_flag = False
  122. update_str = "根据chunk id删除失败"
  123. if not update_flag:
  124. logger.error(update_str)
  125. delete_result = "delete_error"
  126. resp = generate_message(delete_result)
  127. return resp
  128. def _batch_delete_by_chunk_ids(self, chunk_ids: list, knowledge_id: str, document_id: str):
  129. """
  130. 批量删除切片
  131. 参数:
  132. chunk_ids: 切片ID列表
  133. knowledge_id: 知识库ID
  134. document_id: 文档ID
  135. 返回:
  136. 响应字典
  137. """
  138. logger.info(f"批量删除 {len(chunk_ids)} 个切片")
  139. # 1. 批量删除向量库数据
  140. delete_result, chunk_lens_dict = self.hybrid_retriever.batch_delete_by_chunk_ids(chunk_ids)
  141. if not delete_result.endswith("success"):
  142. logger.error("批量删除向量库失败")
  143. return generate_message("delete_error")
  144. # 2. 批量更新 MySQL (标记为删除)
  145. update_flag, update_str = self.mysql_client.batch_update_slice_delete(chunk_ids)
  146. if not update_flag:
  147. logger.error(f"批量更新 MySQL 失败: {update_str}")
  148. return generate_message("delete_error")
  149. logger.info(f"批量删除成功: {len(chunk_ids)} 个切片")
  150. return generate_message("delete_success")
  151. def _delete_by_doc_id(self, doc_id: str = None):
  152. logger.info(f"删除数据的id:{doc_id}")
  153. delete_result = self.hybrid_retriever.delete_by_doc_id(doc_id=doc_id)
  154. resp = generate_message(delete_result)
  155. return resp
  156. def _search_by_chunk_id(self, chunk_id):
  157. if self._has_collection():
  158. query_result = self.hybrid_retriever.query_chunk_id(chunk_id=chunk_id)
  159. else:
  160. query_result = []
  161. logger.info(f"根据切片查询到的信息:{query_result}")
  162. resp = generate_response(query_result)
  163. return resp
  164. def _search_by_chunk_id_list(self, chunk_id_list):
  165. if self._has_collection():
  166. query_result = self.hybrid_retriever.query_chunk_id_list(chunk_id_list)
  167. else:
  168. query_result = []
  169. logger.info(f"召回的切片列表查询切片信息:{query_result}")
  170. chunk_content_list = []
  171. for chunk_dict in query_result:
  172. chunk_content = chunk_dict.get("content")
  173. chunk_content_list.append(chunk_content)
  174. return chunk_content_list
  175. def _search_by_key_word(self, search_json):
  176. if self._has_collection():
  177. doc_id = search_json.get("document_id", None)
  178. text = search_json.get("text", None)
  179. page_num = search_json.get("pageNum", 1)
  180. page_size = search_json.get("pageSize", 10)
  181. page_num = search_json.get("pageNum") # 根据传过来的id处理对应知识库
  182. query_result = self.hybrid_retriever.query_filter(doc_id=doc_id, filter_field=text)
  183. else:
  184. query_result = []
  185. resp = generate_response(query_result,page_num,page_size)
  186. return resp
  187. def _insert_data(self, docs):
  188. insert_flag = ""
  189. insert_info = ""
  190. for doc in docs:
  191. chunk = doc.get("content")
  192. insert_flag, insert_info = self.hybrid_retriever.insert_data(chunk, doc)
  193. if not insert_flag:
  194. break
  195. resp = insert_flag if insert_flag else "insert_error"
  196. return resp, insert_info
  197. def _batch_insert_data(self, docs, text_lists):
  198. insert_flag, insert_info = self.hybrid_retriever.batch_insert_data(text_lists, docs)
  199. resp = insert_flag
  200. return resp, insert_info
  201. def _search(self, query, k, mode):
  202. search_result = self.hybrid_retriever.search(query, k, mode)
  203. return search_result
  204. def _query_by_scalar_field(self, doc_id: str, field_name: str, field_value: str):
  205. """
  206. 根据标量字段查询数据
  207. 参数:
  208. doc_id: 文档ID
  209. field_name: 字段名(如 Father_Chapter)
  210. field_value: 字段值
  211. 返回:
  212. 查询结果列表
  213. """
  214. return self.hybrid_retriever.query_by_scalar_field(doc_id, field_name, field_value)
    def _copy_docs_to_new_collection(self, new_collection_name, doc_ids, embedding_name="e5"):
        """
        Copy the data of the given documents into a new or existing collection,
        generating fresh doc_ids and chunk_ids with the snowflake generator.

        Parameters:
            new_collection_name: target collection name (also the new knowledge_id)
            doc_ids: list of source document ids to copy
            embedding_name: embedding model name for the target retriever
        Returns:
            response dict, including doc_id_mapping and chunk_id_mapping
        """
        try:
            # 1. Query the source collection for all chunks of the requested docs.
            logger.info(f"从集合 {self.collection} 查询文档: {doc_ids}")
            query_results = self.hybrid_retriever.query_by_doc_ids(doc_ids)
            if not query_results:
                return {"code": 404, "message": "未找到匹配的文档数据", "doc_id_mapping": {}, "chunk_id_mapping": {}}
            logger.info(f"查询到 {len(query_results)} 条数据")
            # 2. Ensure the target collection exists; create it when it doesn't.
            target_milvus_client = MilvusOperate(collection_name=new_collection_name, embedding_name=embedding_name)
            collection_exists = target_milvus_client._has_collection()
            if not collection_exists:
                logger.info(f"创建新集合: {new_collection_name}")
                create_result = target_milvus_client._create_collection()
                if create_result.get("code") != 200:
                    create_result["doc_id_mapping"] = {}
                    return create_result
            else:
                logger.info(f"集合 {new_collection_name} 已存在,直接插入数据")
            # 3. One new snowflake id per source doc_id.
            #    NOTE(review): generate_snowflake_id is not imported in this chunk —
            #    presumably defined/imported elsewhere in the file; confirm.
            doc_id_mapping = {}  # {old_doc_id: new_doc_id}
            for old_doc_id in doc_ids:
                doc_id_mapping[old_doc_id] = generate_snowflake_id()
            # 4. Rebuild rows without the pk field, using the new doc/chunk ids.
            insert_data = []
            chunk_id_mapping = {}  # {old_chunk_id: new_chunk_id}
            for item in query_results:
                old_doc_id = item.get("doc_id")
                old_chunk_id = item.get("chunk_id")
                new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                new_chunk_id = generate_snowflake_id()  # fresh chunk_id per slice
                chunk_id_mapping[old_chunk_id] = new_chunk_id  # record the mapping
                new_item = {
                    "content": item.get("content"),
                    "dense_vector": item.get("dense_vector"),
                    "doc_id": new_doc_id,
                    "chunk_id": new_chunk_id,
                    "Father_Chapter": item.get("Father_Chapter"),
                    "Chapter": item.get("Chapter"),
                    "metadata": item.get("metadata")
                }
                insert_data.append(new_item)
            # 5. Bulk-insert into the target collection.
            logger.info(f"开始向集合 {new_collection_name} 插入 {len(insert_data)} 条数据")
            try:
                target_milvus_client.hybrid_retriever.client.insert(
                    collection_name=new_collection_name,
                    data=insert_data
                )
                logger.info(f"成功向集合插入数据")
                return {
                    "code": 200,
                    "message": "复制成功",
                    "data": {
                        "source_collection": self.collection,
                        "target_collection": new_collection_name,
                        "doc_ids": doc_ids,
                        "total_records": len(insert_data),
                        "collection_existed": collection_exists
                    },
                    "doc_id_mapping": doc_id_mapping,
                    "chunk_id_mapping": chunk_id_mapping
                }
            except Exception as e:
                logger.error(f"插入数据到集合失败: {e}")
                # Roll back: drop the collection only when we just created it.
                if not collection_exists:
                    target_milvus_client._delete_collection()
                return {"code": 500, "message": f"插入数据失败: {str(e)}", "doc_id_mapping": {}, "chunk_id_mapping": {}}
        except Exception as e:
            logger.error(f"复制文档到新集合失败: {e}")
            return {"code": 500, "message": f"复制失败: {str(e)}", "doc_id_mapping": {}, "chunk_id_mapping": {}}
    def _copy_single_doc_to_collection(self, new_collection_name, old_doc_id, new_doc_id, embedding_name="e5"):
        """
        Copy one document's data into a target collection under a caller-supplied
        new doc_id (fresh chunk_ids are still generated per slice).

        Parameters:
            new_collection_name: target collection name
            old_doc_id: source document id
            new_doc_id: new document id (specified by the frontend)
            embedding_name: embedding model name for the target retriever
        Returns:
            response dict, including the chunk_id_mapping
        """
        try:
            # 1. Query the source collection for all chunks of the document.
            logger.info(f"[单文档复制] 从集合 {self.collection} 查询文档: {old_doc_id}")
            query_results = self.hybrid_retriever.query_by_doc_ids([old_doc_id])
            if not query_results:
                return {"code": 404, "message": "未找到匹配的文档数据", "chunk_id_mapping": {}}
            logger.info(f"[单文档复制] 查询到 {len(query_results)} 条数据")
            # 2. Ensure the target collection exists; create it when it doesn't.
            target_milvus_client = MilvusOperate(collection_name=new_collection_name, embedding_name=embedding_name)
            collection_exists = target_milvus_client._has_collection()
            if not collection_exists:
                logger.info(f"[单文档复制] 创建新集合: {new_collection_name}")
                create_result = target_milvus_client._create_collection()
                if create_result.get("code") != 200:
                    create_result["chunk_id_mapping"] = {}
                    return create_result
            else:
                logger.info(f"[单文档复制] 集合 {new_collection_name} 已存在,直接插入数据")
            # 3. Rebuild rows with the caller's new_doc_id and fresh chunk_ids.
            #    NOTE(review): generate_snowflake_id is not imported in this chunk —
            #    presumably defined/imported elsewhere in the file; confirm.
            insert_data = []
            chunk_id_mapping = {}  # {old_chunk_id: new_chunk_id}
            for item in query_results:
                old_chunk_id = item.get("chunk_id")
                new_chunk_id = generate_snowflake_id()
                chunk_id_mapping[old_chunk_id] = new_chunk_id
                new_item = {
                    "content": item.get("content"),
                    "dense_vector": item.get("dense_vector"),
                    "doc_id": new_doc_id,  # caller-specified new doc_id
                    "chunk_id": new_chunk_id,
                    "Father_Chapter": item.get("Father_Chapter"),
                    "Chapter": item.get("Chapter"),
                    "metadata": item.get("metadata")
                }
                insert_data.append(new_item)
            # 4. Bulk-insert into the target collection.
            logger.info(f"[单文档复制] 开始向集合 {new_collection_name} 插入 {len(insert_data)} 条数据")
            try:
                target_milvus_client.hybrid_retriever.client.insert(
                    collection_name=new_collection_name,
                    data=insert_data
                )
                logger.info(f"[单文档复制] 成功向集合插入数据")
                return {
                    "code": 200,
                    "message": "复制成功",
                    "data": {
                        "source_collection": self.collection,
                        "target_collection": new_collection_name,
                        "old_doc_id": old_doc_id,
                        "new_doc_id": new_doc_id,
                        "total_records": len(insert_data),
                        "collection_existed": collection_exists
                    },
                    "chunk_id_mapping": chunk_id_mapping
                }
            except Exception as e:
                logger.error(f"[单文档复制] 插入数据到集合失败: {e}")
                # Roll back: drop the collection only when we just created it.
                if not collection_exists:
                    target_milvus_client._delete_collection()
                return {"code": 500, "message": f"插入数据失败: {str(e)}", "chunk_id_mapping": {}}
        except Exception as e:
            logger.error(f"[单文档复制] 复制文档到新集合失败: {e}")
            return {"code": 500, "message": f"复制失败: {str(e)}", "chunk_id_mapping": {}}
  372. # class MysqlOperate:
  373. # def get_connection(self):
  374. # """
  375. # 从连接池中获取一个连接
  376. # :return: 数据库连接对象
  377. # """
  378. # # try:
  379. # # with ThreadPoolExecutor() as executor:
  380. # # future = executor.submit(POOL.get_connection)
  381. # # connection = future.result(timeout=5.0) # 设置超时时间为5秒
  382. # # logger.info("成功从连接池获取连接")
  383. # # return connection, "success"
  384. # # except TimeoutError:
  385. # # logger.error("获取mysql数据库连接池超时")
  386. # # return None, "mysql获取连接池超时"
  387. # # except errors.InterfaceError as e:
  388. # # logger.error(f"MySQL 接口异常:{e}")
  389. # # return None, "mysql接口异常"
  390. # # except errors.OperationalError as e:
  391. # # logger.error(f"MySQL 操作错误:{e}")
  392. # # return None, "mysql 操作错误"
  393. # # except Error as e:
  394. # # logger.error(f"无法从连接池获取连接: {e}")
  395. # # return None, str(e)
  396. # connection = None
  397. # event = threading.Event()
  398. # def target():
  399. # nonlocal connection
  400. # try:
  401. # connection = POOL.get_connection()
  402. # finally:
  403. # event.set()
  404. # thread = threading.Thread(target=target)
  405. # thread.start()
  406. # event.wait(timeout=5)
  407. # if thread.is_alive():
  408. # # 超时处理
  409. # logger.error("获取连接超时")
  410. # return None, "获取连接超时"
  411. # else:
  412. # if connection:
  413. # return connection, "success"
  414. # else:
  415. # logger.error("获取连接失败")
  416. # return None, "获取连接失败"
  417. # def insert_to_slice(self, docs, knowledge_id, doc_id):
  418. # """
  419. # 插入数据到切片信息表中 slice_info
  420. # """
  421. # connection = None
  422. # cursor = None
  423. # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  424. # values = []
  425. # connection, cennction_info = self.get_connection()
  426. # if not connection:
  427. # return False, cennction_info
  428. # for chunk in docs:
  429. # slice_id = chunk.get("chunk_id")
  430. # slice_text = chunk.get("content")
  431. # chunk_index = chunk.get("metadata").get("chunk_index")
  432. # values.append((slice_id, knowledge_id, doc_id, slice_text, date_now, chunk_index))
  433. # try:
  434. # cursor = connection.cursor()
  435. # # insert_sql = """
  436. # # INSERT INTO slice_info (
  437. # # slice_id,
  438. # # knowledge_id,
  439. # # document_id,
  440. # # slice_text,
  441. # # create_time,
  442. # # slice_index
  443. # # ) VALUES (%s, %s, %s, %s, %s,%s)
  444. # # """
  445. # # 容错“for key 'UK_ID_TYPE_KEY'”
  446. # insert_sql = """
  447. # INSERT INTO slice_info (
  448. # slice_id,
  449. # knowledge_id,
  450. # document_id,
  451. # slice_text,
  452. # create_time,
  453. # slice_index
  454. # ) VALUES (%s, %s, %s, %s, %s, %s)
  455. # ON DUPLICATE KEY UPDATE
  456. # slice_text = VALUES(slice_text),
  457. # create_time = VALUES(create_time),
  458. # slice_index = VALUES(slice_index)
  459. # """
  460. # cursor.executemany(insert_sql, values)
  461. # connection.commit()
  462. # logger.info(f"批量插入切片数据成功。")
  463. # return True, "success"
  464. # except Error as e:
  465. # logger.error(f"数据库操作出错:{e}")
  466. # connection.rollback()
  467. # return False, str(e)
  468. # finally:
  469. # # if cursor:
  470. # cursor.close()
  471. # # if connection and connection.is_connected():
  472. # connection.close()
  473. # def delete_to_slice(self, doc_id):
  474. # """
  475. # 删除 slice_info库中切片信息
  476. # """
  477. # connection = None
  478. # cursor = None
  479. # connection, connection_info = self.get_connection()
  480. # if not connection:
  481. # return False, connection_info
  482. # try:
  483. # cursor = connection.cursor()
  484. # delete_sql = f"DELETE FROM slice_info WHERE document_id = %s"
  485. # cursor.execute(delete_sql, (doc_id,))
  486. # connection.commit()
  487. # logger.info(f"删除数据成功")
  488. # return True, "success"
  489. # except Error as e:
  490. # logger.error(f"根据{doc_id}删除数据失败:{e}")
  491. # connection.rollback()
  492. # return False, str(e)
  493. # finally:
  494. # # if cursor:
  495. # cursor.close()
  496. # # if connection and connection.is_connected():
  497. # connection.close()
  498. # def insert_to_image_url(self, image_dict, knowledge_id, doc_id):
  499. # """
  500. # 批量插入数据到指定表
  501. # """
  502. # connection = None
  503. # cursor = None
  504. # connection, connection_info = self.get_connection()
  505. # if not connection:
  506. # return False, connection_info
  507. # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  508. # values = []
  509. # for img_key, img_value in image_dict.items():
  510. # origin_text = img_key
  511. # media_url = img_value
  512. # values.append((knowledge_id, doc_id, origin_text, "image", media_url, date_now))
  513. # try:
  514. # cursor = connection.cursor()
  515. # # insert_sql = """
  516. # # INSERT INTO bm_media_replacement (
  517. # # knowledge_id,
  518. # # document_id,
  519. # # origin_text,
  520. # # media_type,
  521. # # media_url,
  522. # # create_time
  523. # # ) VALUES (%s, %s, %s, %s, %s, %s)
  524. # # """
  525. # # 容错“for key 'UK_ID_TYPE_KEY'”
  526. # insert_sql = """
  527. # INSERT INTO bm_media_replacement (
  528. # knowledge_id,
  529. # document_id,
  530. # origin_text,
  531. # media_type,
  532. # media_url,
  533. # create_time
  534. # ) VALUES (%s, %s, %s, %s, %s, %s)
  535. # ON DUPLICATE KEY UPDATE
  536. # origin_text = VALUES(origin_text),
  537. # media_type = VALUES(media_type),
  538. # media_url = VALUES(media_url),
  539. # create_time = VALUES(create_time)
  540. # """
  541. # cursor.executemany(insert_sql, values)
  542. # connection.commit()
  543. # logger.info(f"插入到bm_media_replacement表成功")
  544. # return True, "success"
  545. # except Error as e:
  546. # logger.error(f"数据库操作出错:{e}")
  547. # connection.rollback()
  548. # return False, str(e)
  549. # finally:
  550. # # if cursor:
  551. # cursor.close()
  552. # # if connection and connection.is_connected():
  553. # connection.close()
  554. # def delete_image_url(self, doc_id):
  555. # """
  556. # 根据doc id删除bm_media_replacement中的数据
  557. # """
  558. # connection = None
  559. # cursor = None
  560. # connection, connection_info = self.get_connection()
  561. # if not connection:
  562. # return False, connection_info
  563. # try:
  564. # cursor = connection.cursor()
  565. # delete_sql = f"DELETE FROM bm_media_replacement WHERE document_id = %s"
  566. # cursor.execute(delete_sql, (doc_id,))
  567. # connection.commit()
  568. # logger.info(f"根据{doc_id} 删除bm_media_replacement表中数据成功")
  569. # return True, "success"
  570. # except Error as e:
  571. # logger.error(f"根据{doc_id}删除 bm_media_replacement 数据库操作出错:{e}")
  572. # connection.rollback()
  573. # return False, str(e)
  574. # finally:
  575. # # if cursor:
  576. # cursor.close()
  577. # # if connection and connection.is_connected():
  578. # connection.close()
  579. # def update_total_doc_len(self, update_json):
  580. # """
  581. # 更新长度表和文档长度表,删除slice info表, 插入slice info 切片信息
  582. # """
  583. # knowledge_id = update_json.get("knowledge_id")
  584. # doc_id = update_json.get("doc_id")
  585. # chunk_len = update_json.get("chunk_len")
  586. # operate = update_json.get("operate")
  587. # chunk_id = update_json.get("chunk_id")
  588. # chunk_text = update_json.get("chunk_text")
  589. # connection = None
  590. # cursor = None
  591. # connection, connection_info = self.get_connection()
  592. # if not connection:
  593. # return False, connection_info
  594. # try:
  595. # cursor = connection.cursor()
  596. # query_doc_word_num_sql = f"select word_num,slice_total from bm_document where document_id = %s"
  597. # query_knowledge_word_num_sql = f"select word_num from bm_knowledge where knowledge_id = %s"
  598. # cursor.execute(query_doc_word_num_sql, (doc_id,))
  599. # doc_result = cursor.fetchone()
  600. # logger.info(f"查询到的文档长度信息:{doc_result}")
  601. # cursor.execute(query_knowledge_word_num_sql, (knowledge_id, ))
  602. # knowledge_result = cursor.fetchone()
  603. # logger.info(f"查询到的知识库总长度信息:{knowledge_result}")
  604. # if not doc_result:
  605. # new_word_num = 0
  606. # slice_total = 0
  607. # else:
  608. # old_word_num = doc_result[0]
  609. # slice_total = doc_result[1]
  610. # new_word_num = old_word_num + chunk_len
  611. # slice_total -= 1 if slice_total else 0
  612. # if not knowledge_result:
  613. # new_knowledge_word_num = 0
  614. # else:
  615. # old_knowledge_word_num = knowledge_result[0]
  616. # new_knowledge_word_num = old_knowledge_word_num + chunk_len
  617. # if operate == "update":
  618. # update_sql = f"UPDATE bm_document SET word_num = %s WHERE document_id = %s"
  619. # cursor.execute(update_sql, (new_word_num, doc_id))
  620. # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  621. # update_slice_sql = f"UPDATE slice_info SET slice_text = %s, update_time = %s WHERE slice_id = %s"
  622. # cursor.execute(update_slice_sql, (chunk_text, date_now, chunk_id))
  623. # elif operate == "insert":
  624. # query_slice_info_index_sql = f"select MAX(slice_index) from slice_info where document_id = %s"
  625. # cursor.execute(query_slice_info_index_sql, (doc_id,))
  626. # chunk_index_result = cursor.fetchone()[0]
  627. # # logger.info(chunk_index_result)
  628. # if chunk_index_result:
  629. # chunk_max_index = int(chunk_index_result)
  630. # else:
  631. # chunk_max_index = 0
  632. # update_sql = f"UPDATE bm_document SET word_num = %s WHERE document_id = %s"
  633. # cursor.execute(update_sql, (new_word_num, doc_id))
  634. # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  635. # insert_slice_sql = "INSERT INTO slice_info (slice_id,knowledge_id,document_id,slice_text,create_time, slice_index) VALUES (%s, %s, %s, %s, %s, %s)"
  636. # cursor.execute(insert_slice_sql, (chunk_id, knowledge_id, doc_id, chunk_text, date_now, chunk_max_index+1))
  637. # else:
  638. # update_sql = f"UPDATE bm_document SET word_num = %s, slice_total = %s WHERE document_id = %s"
  639. # cursor.execute(update_sql, (new_word_num, slice_total, doc_id))
  640. # # 删除切片id对应的切片
  641. # delete_slice_sql = f"DELETE FROM slice_info where slice_id = %s"
  642. # cursor.execute(delete_slice_sql, (chunk_id, ))
  643. # update_knowledge_sql = f"UPDATE bm_knowledge SET word_num = %s WHERE knowledge_id = %s"
  644. # cursor.execute(update_knowledge_sql, (new_knowledge_word_num, knowledge_id))
  645. # connection.commit()
  646. # logger.info("bm_document和bm_knowledge数据更新成功")
  647. # return True, "success"
  648. # except Error as e:
  649. # logger.error(f"数据库操作出错:{e}")
  650. # connection.rollback()
  651. # return False, str(e)
  652. # finally:
  653. # # if cursor:
  654. # cursor.close()
  655. # # if connection and connection.is_connected():
  656. # connection.close()
# Task-status bookkeeping is stored on existing bm_document columns rather
# than a dedicated task table.  Values written by the update_task_status_*
# methods below: 0 = error, 1 = started, 2 = completed.
TABLE_NAME = "bm_document"
STATUS_FIELD = "update_by"    # column repurposed to hold the task status
TASK_ID_FIELD = "document_id"
USER_ID_FIELD = "remark"      # column repurposed to hold the operator's user id
import time
  662. # ========== 全局初始化连接池(自动检测 + 超时保护) ==========
  663. class SafeMySQLPool:
  664. def __init__(self, pool_size=50, conn_timeout=10, idle_timeout=60, **mysql_config):
  665. mysql_config.setdefault("connect_timeout", conn_timeout)
  666. mysql_config.setdefault("pool_reset_session", True)
  667. self._pool = pooling.MySQLConnectionPool(
  668. pool_name="safe_mysql_pool",
  669. pool_size=pool_size,
  670. **mysql_config
  671. )
  672. # 使用 RLock 可重入锁,避免 _auto_reclaimer 调用 close 时死锁
  673. self._lock = threading.RLock()
  674. self._active_conns = {} # {id(conn): (conn, last_used_time)}
  675. self._idle_timeout = idle_timeout
  676. self._stop_event = threading.Event()
  677. threading.Thread(target=self._auto_reclaimer, daemon=True).start()
  678. def get_connection(self, timeout=10):
  679. """安全获取连接(带超时检测与追踪)"""
  680. start = time.time()
  681. while True:
  682. try:
  683. conn = self._pool.get_connection()
  684. conn.ping(reconnect=True, attempts=3, delay=2)
  685. with self._lock:
  686. self._active_conns[id(conn)] = (conn, time.time())
  687. return self._wrap_connection(conn)
  688. except errors.PoolError:
  689. if time.time() - start > timeout:
  690. raise TimeoutError(f"获取 MySQL 连接超时(超过 {timeout}s)")
  691. time.sleep(0.3)
  692. def _wrap_connection(self, conn):
  693. """包装连接对象以监控关闭事件"""
  694. pool = self
  695. orig_close = conn.close
  696. def safe_close():
  697. try:
  698. orig_close()
  699. finally:
  700. with pool._lock:
  701. pool._active_conns.pop(id(conn), None)
  702. conn.close = safe_close
  703. return conn
  704. def _auto_reclaimer(self):
  705. """后台线程自动回收超时未关闭连接"""
  706. while not self._stop_event.is_set():
  707. time.sleep(5)
  708. now = time.time()
  709. with self._lock:
  710. to_remove = []
  711. for cid, (conn, last_used) in list(self._active_conns.items()):
  712. if now - last_used > self._idle_timeout:
  713. try:
  714. conn.close()
  715. logger.warning(f"[回收] 已回收超时未关闭连接 (idle={int(now - last_used)}s)")
  716. except Exception as e:
  717. logger.error(f"[回收] 回收连接失败: {e}")
  718. to_remove.append(cid)
  719. for cid in to_remove:
  720. self._active_conns.pop(cid, None)
  721. def close_all(self):
  722. """停止守护线程并关闭所有连接"""
  723. self._stop_event.set()
  724. with self._lock:
  725. for conn, _ in self._active_conns.values():
  726. try:
  727. conn.close()
  728. except:
  729. pass
  730. self._active_conns.clear()
# ========== 初始化连接池 ==========
# Create the process-wide pool exactly once; the globals() guard protects
# against double initialization if this module body runs a second time.
if "POOL" not in globals():
    try:
        # mysql_config is defined earlier in this file (not shown here).
        POOL = SafeMySQLPool(pool_size=10, idle_timeout=60, **mysql_config)
        logger.info("MySQL 连接池初始化成功")
    except Error as e:
        logger.error(f"MySQL 连接池初始化失败: {e}")
        # Sentinel so callers can detect "pool unavailable" and fail softly.
        POOL = None
  739. """
  740. 雪花算法获取唯一id
  741. """
  742. import time
  743. import threading
  744. class Snowflake:
  745. def __init__(self, datacenter_id=1, worker_id=1):
  746. self.worker_id_bits = 5
  747. self.datacenter_id_bits = 5
  748. self.sequence_bits = 12
  749. self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
  750. self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
  751. self.worker_id = worker_id
  752. self.datacenter_id = datacenter_id
  753. self.sequence = 0
  754. self.worker_id_shift = self.sequence_bits
  755. self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
  756. self.timestamp_left_shift = self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
  757. self.twepoch = 1288834974657
  758. self.last_timestamp = -1
  759. self.lock = threading.Lock()
  760. def _timestamp(self):
  761. return int(time.time() * 1000)
  762. def _wait_next_ms(self, last):
  763. ts = self._timestamp()
  764. while ts <= last:
  765. ts = self._timestamp()
  766. return ts
  767. def generate_id(self):
  768. with self.lock:
  769. timestamp = self._timestamp()
  770. if timestamp < self.last_timestamp:
  771. raise Exception("Clock moved backwards, refusing to generate id")
  772. if timestamp == self.last_timestamp:
  773. self.sequence = (self.sequence + 1) & ((1 << self.sequence_bits) - 1)
  774. if self.sequence == 0:
  775. timestamp = self._wait_next_ms(timestamp)
  776. else:
  777. self.sequence = 0
  778. self.last_timestamp = timestamp
  779. snowflake_id = (
  780. ((timestamp - self.twepoch) << self.timestamp_left_shift) |
  781. (self.datacenter_id << self.datacenter_id_shift) |
  782. (self.worker_id << self.worker_id_shift) |
  783. self.sequence
  784. )
  785. # 转成固定 20 位数字
  786. return str(snowflake_id).zfill(20)
  787. # ========== 全局雪花算法 ID 生成器 ==========
  788. snowflake_id_generator = Snowflake(datacenter_id=1, worker_id=1)
  789. def generate_snowflake_id():
  790. """
  791. 全局方法:使用雪花算法生成唯一ID
  792. 返回20位数字字符串
  793. """
  794. return snowflake_id_generator.generate_id()
  795. # ========== MysqlOperate 类 ==========
  796. class MysqlOperate:
  797. def get_connection(self):
  798. """安全获取连接"""
  799. if not POOL:
  800. return None, "连接池未初始化"
  801. try:
  802. connection = POOL.get_connection(timeout=5)
  803. return connection, "success"
  804. except TimeoutError as e:
  805. logger.error(str(e))
  806. return None, "获取连接超时"
  807. except Error as e:
  808. logger.error(f"MySQL 获取连接失败: {e}")
  809. return None, str(e)
  810. def _execute_many(self, sql, values, success_msg, err_msg):
  811. """通用批量执行模板"""
  812. connection, info = self.get_connection()
  813. if not connection:
  814. return False, info
  815. cursor = None
  816. try:
  817. cursor = connection.cursor()
  818. cursor.executemany(sql, values)
  819. connection.commit()
  820. logger.info(f"*******************************\n\n{success_msg}\n\n*******************************")
  821. return True, "success"
  822. except Error as e:
  823. connection.rollback()
  824. logger.error(f"{err_msg}: {e}")
  825. return False, str(e)
  826. finally:
  827. if cursor:
  828. cursor.close()
  829. if connection:
  830. connection.close()
  831. def insert_to_slice(self, docs, knowledge_id, doc_id, tenant_id, user_id):
  832. """批量插入切片信息(同时存储 slice_text 和 old_slice_text)"""
  833. date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  834. values = [
  835. (
  836. chunk.get("chunk_id"),
  837. knowledge_id,
  838. doc_id,
  839. chunk.get("content"),
  840. chunk.get("content"), # old_slice_text 存储原始文本
  841. date_now,
  842. chunk.get("metadata", {}).get("chunk_index"),
  843. chunk.get("Chapter"),
  844. chunk.get("Father_Chapter", ""),
  845. chunk.get("bbox"),
  846. chunk.get("page"),
  847. tenant_id,
  848. user_id
  849. )
  850. for chunk in docs
  851. ]
  852. sql = """
  853. INSERT INTO slice_info (
  854. slice_id, knowledge_id, document_id, slice_text, old_slice_text, create_time, slice_index, section, parent_section, bbox, page, tenant_id, create_by
  855. ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  856. ON DUPLICATE KEY UPDATE
  857. slice_text = VALUES(slice_text),
  858. create_time = VALUES(create_time),
  859. slice_index = VALUES(slice_index),
  860. section = VALUES(section),
  861. parent_section = VALUES(parent_section),
  862. bbox = VALUES(bbox),
  863. page = VALUES(page),
  864. tenant_id = VALUES(tenant_id),
  865. create_by = VALUES(create_by)
  866. """
  867. return self._execute_many(sql, values, "批量插入切片数据成功", "插入 slice_info 出错")
  868. def delete_to_slice(self, doc_id):
  869. """删除切片"""
  870. connection, info = self.get_connection()
  871. if not connection:
  872. return False, info
  873. cursor = None
  874. try:
  875. cursor = connection.cursor()
  876. cursor.execute("DELETE FROM slice_info WHERE document_id = %s", (doc_id,))
  877. connection.commit()
  878. logger.info(f"删除 slice_info 数据成功")
  879. return True, "success"
  880. except Error as e:
  881. connection.rollback()
  882. logger.error(f"删除 slice_info 出错: {e}")
  883. return False, str(e)
  884. finally:
  885. if cursor:
  886. cursor.close()
  887. if connection:
  888. connection.close()
  889. def insert_to_image_url(self, image_dict, knowledge_id, doc_id):
  890. """插入图片映射表"""
  891. date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  892. import uuid
  893. values = [
  894. (knowledge_id, doc_id, k, "image", v, date_now, generate_snowflake_id())
  895. for k, v in image_dict.items()
  896. ]
  897. sql = """
  898. INSERT INTO bm_media_replacement (
  899. knowledge_id, document_id, origin_text, media_type, media_url, create_time, oss_id
  900. ) VALUES (%s, %s, %s, %s, %s, %s, %s)
  901. ON DUPLICATE KEY UPDATE
  902. origin_text = VALUES(origin_text),
  903. media_type = VALUES(media_type),
  904. media_url = VALUES(media_url),
  905. create_time = VALUES(create_time),
  906. oss_id = VALUES(oss_id)
  907. """
  908. return self._execute_many(sql, values, "插入 bm_media_replacement 成功", "插入 bm_media_replacement 出错")
  909. def delete_image_url(self, doc_id):
  910. """删除图片映射"""
  911. connection, info = self.get_connection()
  912. if not connection:
  913. return False, info
  914. cursor = None
  915. try:
  916. cursor = connection.cursor()
  917. cursor.execute("DELETE FROM bm_media_replacement WHERE document_id = %s", (doc_id,))
  918. connection.commit()
  919. logger.info(f"删除 bm_media_replacement 成功")
  920. return True, "success"
  921. except Error as e:
  922. connection.rollback()
  923. logger.error(f"删除 bm_media_replacement 出错: {e}")
  924. return False, str(e)
  925. finally:
  926. if cursor:
  927. cursor.close()
  928. if connection:
  929. connection.close()
  930. def update_task_status_start(self, task_id):
  931. """更新任务状态为开始(1)"""
  932. connection, info = self.get_connection()
  933. if not connection:
  934. return False, info
  935. cursor = None
  936. try:
  937. cursor = connection.cursor()
  938. sql = f"UPDATE {TABLE_NAME} SET {STATUS_FIELD} = %s WHERE {TASK_ID_FIELD} = %s"
  939. cursor.execute(sql, (1, task_id))
  940. connection.commit()
  941. logger.info(f"任务 {task_id} 状态更新为开始(1)")
  942. return True, "success"
  943. except Error as e:
  944. connection.rollback()
  945. logger.error(f"更新任务状态为开始失败: {e}")
  946. return False, str(e)
  947. finally:
  948. if cursor:
  949. cursor.close()
  950. if connection:
  951. connection.close()
  952. def update_task_status_complete(self, task_id, user_id):
  953. """更新任务状态为完成(2)并更新用户ID"""
  954. connection, info = self.get_connection()
  955. if not connection:
  956. return False, info
  957. cursor = None
  958. try:
  959. cursor = connection.cursor()
  960. sql = f"UPDATE {TABLE_NAME} SET {STATUS_FIELD} = %s, {USER_ID_FIELD} = %s WHERE {TASK_ID_FIELD} = %s"
  961. cursor.execute(sql, (2, user_id, task_id))
  962. connection.commit()
  963. logger.info(f"任务 {task_id} 状态更新为完成(2),用户ID: {user_id}")
  964. return True, "success"
  965. except Error as e:
  966. connection.rollback()
  967. logger.error(f"更新任务状态为完成失败: {e}")
  968. return False, str(e)
  969. finally:
  970. if cursor:
  971. cursor.close()
  972. if connection:
  973. connection.close()
  974. def update_task_status_error(self, task_id):
  975. """更新任务状态为错误(0)"""
  976. connection, info = self.get_connection()
  977. if not connection:
  978. return False, info
  979. cursor = None
  980. try:
  981. cursor = connection.cursor()
  982. sql = f"UPDATE {TABLE_NAME} SET {STATUS_FIELD} = %s WHERE {TASK_ID_FIELD} = %s"
  983. cursor.execute(sql, (0, task_id))
  984. connection.commit()
  985. logger.info(f"任务 {task_id} 状态更新为错误(0)")
  986. return True, "success"
  987. except Error as e:
  988. connection.rollback()
  989. logger.error(f"更新任务状态为错误失败: {e}")
  990. return False, str(e)
  991. finally:
  992. if cursor:
  993. cursor.close()
  994. if connection:
  995. connection.close()
  996. def delete_document(self, task_id):
  997. """删除 bm_document 表中的记录(取消任务前清理)"""
  998. connection, info = self.get_connection()
  999. if not connection:
  1000. return False, info
  1001. cursor = None
  1002. try:
  1003. cursor = connection.cursor()
  1004. sql = f"DELETE FROM {TABLE_NAME} WHERE {TASK_ID_FIELD} = %s"
  1005. cursor.execute(sql, (task_id,))
  1006. affected_rows = cursor.rowcount
  1007. connection.commit()
  1008. logger.info(f"删除 bm_document 记录成功: task_id={task_id}, 影响行数={affected_rows}")
  1009. return True, affected_rows
  1010. except Error as e:
  1011. connection.rollback()
  1012. logger.error(f"删除 bm_document 记录失败: {e}")
  1013. return False, str(e)
  1014. finally:
  1015. if cursor:
  1016. cursor.close()
  1017. if connection:
  1018. connection.close()
  1019. def query_parent_generation_enabled(self, doc_ids: list):
  1020. """
  1021. 查询文档的 parent_generation_enabled 字段
  1022. 返回 parent_generation_enabled=1 的 doc_id 集合
  1023. """
  1024. if not doc_ids:
  1025. return set()
  1026. connection, info = self.get_connection()
  1027. if not connection:
  1028. return set()
  1029. cursor = None
  1030. try:
  1031. cursor = connection.cursor()
  1032. placeholders = ", ".join(["%s"] * len(doc_ids))
  1033. sql = f"SELECT document_id FROM bm_document WHERE document_id IN ({placeholders}) AND parent_generation_enabled = 1"
  1034. cursor.execute(sql, tuple(doc_ids))
  1035. results = cursor.fetchall()
  1036. return {row[0] for row in results}
  1037. except Error as e:
  1038. logger.error(f"查询 parent_generation_enabled 失败: {e}")
  1039. return set()
  1040. finally:
  1041. if cursor:
  1042. cursor.close()
  1043. if connection:
  1044. connection.close()
  1045. def query_knowledge_by_ids(self, knowledge_ids: list):
  1046. """
  1047. 根据 knowledge_ids 列表查询 bm_knowledge 表中匹配的行内容
  1048. 参数:
  1049. knowledge_ids: 知识库ID列表
  1050. 返回:
  1051. list: 查询结果列表,每个元素为包含所有字段的字典
  1052. """
  1053. if not knowledge_ids:
  1054. return []
  1055. connection, info = self.get_connection()
  1056. if not connection:
  1057. logger.error(f"获取数据库连接失败: {info}")
  1058. return []
  1059. cursor = None
  1060. try:
  1061. # 返回字典
  1062. cursor = connection.cursor(dictionary=True)
  1063. placeholders = ", ".join(["%s"] * len(knowledge_ids))
  1064. sql = f"SELECT * FROM bm_knowledge WHERE knowledge_id IN ({placeholders})"
  1065. cursor.execute(sql, tuple(knowledge_ids))
  1066. results = cursor.fetchall()
  1067. logger.info(f"查询 bm_knowledge 成功,共 {len(results)} 条记录")
  1068. return results
  1069. except Error as e:
  1070. logger.error(f"查询 bm_knowledge 失败: {e}")
  1071. return []
  1072. finally:
  1073. if cursor:
  1074. cursor.close()
  1075. if connection:
  1076. connection.close()
  1077. def insert_oss_record(self, file_name, url, tenant_id="000000", file_extension="", file_size = 0):
  1078. """插入 OSS 记录到 sys_oss 表"""
  1079. connection, info = self.get_connection()
  1080. if not connection:
  1081. return False, info
  1082. cursor = None
  1083. try:
  1084. # 生成类似 "a2922173479520702464" 的 oss_id 20位
  1085. # import random
  1086. # oss_id = f"a{int(time.time() * 1000)}{random.randint(1000, 9999)}"
  1087. oss_id = Snowflake().generate_id()
  1088. create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  1089. cursor = connection.cursor()
  1090. sql = """
  1091. INSERT INTO sys_oss (oss_id, tenant_id, file_name, original_name, url, create_time, file_suffix, size)
  1092. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
  1093. """
  1094. cursor.execute(sql, (oss_id, tenant_id, file_name, file_name, url, create_time, file_extension, file_size))
  1095. connection.commit()
  1096. logger.info(f"OSS 记录插入成功: {oss_id}")
  1097. return True, oss_id
  1098. except Error as e:
  1099. connection.rollback()
  1100. logger.error(f"插入 OSS 记录失败: {e}")
  1101. return False, str(e)
  1102. finally:
  1103. if cursor:
  1104. cursor.close()
  1105. if connection:
  1106. connection.close()
  1107. # def update_total_doc_len(self, update_json, max_retries=3):
  1108. # """
  1109. # 更新长度表和文档长度表,删除/更新/插入 slice_info 表
  1110. # 支持锁冲突时自动重试
  1111. # """
  1112. # knowledge_id = update_json.get("knowledge_id")
  1113. # doc_id = update_json.get("doc_id")
  1114. # chunk_len = update_json.get("chunk_len")
  1115. # operate = update_json.get("operate")
  1116. # chunk_id = update_json.get("chunk_id")
  1117. # chunk_text = update_json.get("chunk_text", "")
  1118. # slice_index = update_json.get("slice_index")
  1119. # last_error = None
  1120. # for attempt in range(max_retries):
  1121. # connection, info = self.get_connection()
  1122. # if not connection:
  1123. # return False, info
  1124. # cursor = None
  1125. # try:
  1126. # cursor = connection.cursor()
  1127. # # 设置较短的锁等待超时,避免长时间阻塞(10秒)
  1128. # cursor.execute("SET SESSION innodb_lock_wait_timeout = 10")
  1129. # # 查询文档当前信息
  1130. # cursor.execute(
  1131. # "SELECT word_num, slice_total FROM bm_document WHERE document_id = %s",
  1132. # (doc_id,)
  1133. # )
  1134. # doc_result = cursor.fetchone()
  1135. # logger.info(f"查询到的文档长度信息:{doc_result}")
  1136. # # 查询知识库当前信息
  1137. # cursor.execute(
  1138. # "SELECT word_num FROM bm_knowledge WHERE knowledge_id = %s",
  1139. # (knowledge_id,)
  1140. # )
  1141. # knowledge_result = cursor.fetchone()
  1142. # logger.info(f"查询到的知识库总长度信息:{knowledge_result}")
  1143. # # 计算新的文档长度
  1144. # if not doc_result:
  1145. # new_word_num = chunk_len if chunk_len > 0 else 0
  1146. # slice_total = 0
  1147. # else:
  1148. # old_word_num = doc_result[0] or 0
  1149. # slice_total = doc_result[1] or 0
  1150. # new_word_num = old_word_num + chunk_len
  1151. # if operate == "delete":
  1152. # slice_total = max(0, slice_total - 1)
  1153. # # 计算新的知识库长度
  1154. # if not knowledge_result:
  1155. # new_knowledge_word_num = chunk_len if chunk_len > 0 else 0
  1156. # else:
  1157. # old_knowledge_word_num = knowledge_result[0] or 0
  1158. # new_knowledge_word_num = old_knowledge_word_num + chunk_len
  1159. # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  1160. # # 根据操作类型执行不同的 SQL
  1161. # if operate == "update":
  1162. # # # 更新文档长度
  1163. # # cursor.execute(
  1164. # # "UPDATE bm_document SET word_num = %s WHERE document_id = %s",
  1165. # # (new_word_num, doc_id)
  1166. # # )
  1167. # # 更新切片内容
  1168. # cursor.execute(
  1169. # "UPDATE slice_info SET slice_text = %s, update_time = %s WHERE slice_id = %s",
  1170. # (chunk_text, date_now, chunk_id)
  1171. # )
  1172. # elif operate == "insert":
  1173. # # 获取最大切片索引
  1174. # # cursor.execute(
  1175. # # "SELECT MAX(slice_index) FROM slice_info WHERE document_id = %s",
  1176. # # (doc_id,)
  1177. # # )
  1178. # # chunk_index_result = cursor.fetchone()[0]
  1179. # # chunk_max_index = int(chunk_index_result) if chunk_index_result else 0
  1180. # # # 更新文档长度
  1181. # # cursor.execute(
  1182. # # "UPDATE bm_document SET word_num = %s WHERE document_id = %s",
  1183. # # (new_word_num, doc_id)
  1184. # # )
  1185. # # 插入新切片
  1186. # cursor.execute(
  1187. # """INSERT INTO slice_info
  1188. # (slice_id, knowledge_id, document_id, slice_text, create_time, slice_index, old_slice_text)
  1189. # VALUES (%s, %s, %s, %s, %s, %s, %s)""",
  1190. # (chunk_id, knowledge_id, doc_id, chunk_text, date_now, int(slice_index), chunk_text)
  1191. # )
  1192. # elif operate == "delete":
  1193. # # # 更新文档长度和切片总数
  1194. # # cursor.execute(
  1195. # # "UPDATE bm_document SET word_num = %s, slice_total = %s WHERE document_id = %s",
  1196. # # (new_word_num, slice_total, doc_id)
  1197. # # )
  1198. # # 删除切片
  1199. # # cursor.execute(
  1200. # # "DELETE FROM slice_info WHERE slice_id = %s",
  1201. # # (chunk_id,)
  1202. # # )
  1203. # cursor.execute(
  1204. # "UPDATE slice_info SET del_flag = 1 WHERE slice_id = %s",
  1205. # (chunk_id,)
  1206. # )
  1207. # # # 更新知识库总长度
  1208. # # cursor.execute(
  1209. # # "UPDATE bm_knowledge SET word_num = %s WHERE knowledge_id = %s",
  1210. # # (new_knowledge_word_num, knowledge_id)
  1211. # # )
  1212. # connection.commit()
  1213. # logger.info("bm_document 和 bm_knowledge 数据更新成功")
  1214. # return True, "success"
  1215. # except Error as e:
  1216. # connection.rollback()
  1217. # last_error = str(e)
  1218. # error_code = e.errno if hasattr(e, 'errno') else None
  1219. # # 1205: Lock wait timeout exceeded
  1220. # # 1213: Deadlock found
  1221. # if error_code in (1205, 1213) and attempt < max_retries - 1:
  1222. # wait_time = (attempt + 1) * 2 # 逐渐增加等待时间:2s, 4s, 6s
  1223. # logger.warning(f"update_total_doc_len 锁冲突,第 {attempt + 1} 次重试,等待 {wait_time} 秒")
  1224. # time.sleep(wait_time)
  1225. # continue
  1226. # else:
  1227. # logger.error(f"update_total_doc_len 数据库操作出错:{e}")
  1228. # return False, last_error
  1229. # finally:
  1230. # if cursor:
  1231. # cursor.close()
  1232. # if connection:
  1233. # connection.close()
  1234. # logger.error(f"update_total_doc_len 重试 {max_retries} 次后仍然失败")
  1235. # return False, last_error
  1236. ##== update_total_doc_len ==##
  1237. # def _execute_with_retry(self, sql_func, max_retries=3):
  1238. # last_error = None
  1239. # for attempt in range(max_retries):
  1240. # conn, info = self.get_connection()
  1241. # if not conn:
  1242. # return False, info
  1243. # cursor = None
  1244. # try:
  1245. # cursor = conn.cursor()
  1246. # cursor.execute("SET SESSION innodb_lock_wait_timeout = 10")
  1247. # sql_func(cursor)
  1248. # conn.commit()
  1249. # return True, "success"
  1250. # except Error as e:
  1251. # conn.rollback()
  1252. # last_error = str(e)
  1253. # errno = getattr(e, "errno", None)
  1254. # if errno in (1205, 1213) and attempt < max_retries - 1:
  1255. # wait = (attempt + 1) * 2
  1256. # logger.warning(f"锁冲突重试 {attempt + 1} 次,等待 {wait}s")
  1257. # time.sleep(wait)
  1258. # continue
  1259. # else:
  1260. # logger.error(f"SQL 执行失败: {e}")
  1261. # return False, last_error
  1262. # finally:
  1263. # if cursor:
  1264. # cursor.close()
  1265. # conn.close()
  1266. # return False, last_error
  1267. # def update_slice_insert(self, data):
  1268. # def _sql(cursor):
  1269. # cursor.execute(
  1270. # """
  1271. # INSERT INTO slice_info
  1272. # (slice_id, knowledge_id, document_id, slice_text,
  1273. # create_time, slice_index, old_slice_text)
  1274. # VALUES (%s,%s,%s,%s,%s,%s,%s)
  1275. # """,
  1276. # (
  1277. # data["chunk_id"],
  1278. # data["knowledge_id"],
  1279. # data["doc_id"],
  1280. # data["chunk_text"],
  1281. # data["date_now"],
  1282. # int(data["slice_index"]),
  1283. # data["chunk_text"],
  1284. # ),
  1285. # )
  1286. # return self._execute_with_retry(_sql)
  1287. # def update_slice_update(self, data):
  1288. # def _sql(cursor):
  1289. # cursor.execute(
  1290. # """
  1291. # UPDATE slice_info
  1292. # SET slice_text = %s, update_time = %s
  1293. # WHERE slice_id = %s
  1294. # """,
  1295. # (data["chunk_text"], data["date_now"], data["chunk_id"]),
  1296. # )
  1297. # return self._execute_with_retry(_sql)
  1298. # def update_slice_delete(self, data):
  1299. # def _sql(cursor):
  1300. # cursor.execute(
  1301. # "UPDATE slice_info SET del_flag = 1 WHERE slice_id = %s",
  1302. # (data["chunk_id"],),
  1303. # )
  1304. # return self._execute_with_retry(_sql)
  1305. # def update_document_word_num(self, doc_id, delta_len, slice_delta=0):
  1306. # def _sql(cursor):
  1307. # cursor.execute(
  1308. # """
  1309. # UPDATE bm_document
  1310. # SET word_num = word_num + %s,
  1311. # slice_total = slice_total + %s
  1312. # WHERE document_id = %s
  1313. # """,
  1314. # (delta_len, slice_delta, doc_id),
  1315. # )
  1316. # return self._execute_with_retry(_sql)
  1317. # def update_knowledge_word_num(self, knowledge_id, delta_len):
  1318. # def _sql(cursor):
  1319. # cursor.execute(
  1320. # """
  1321. # UPDATE bm_knowledge
  1322. # SET word_num = word_num + %s
  1323. # WHERE knowledge_id = %s
  1324. # """,
  1325. # (delta_len, knowledge_id),
  1326. # )
  1327. # return self._execute_with_retry(_sql)
  1328. # def update_total_doc_len(self, update_json):
  1329. # operate = update_json["operate"]
  1330. # delta_len = update_json["chunk_len"]
  1331. # update_json["date_now"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  1332. # if operate == "insert":
  1333. # self.update_slice_insert(update_json)
  1334. # # self.update_document_word_num(update_json["doc_id"], delta_len, 1)
  1335. # # self.update_knowledge_word_num(update_json["knowledge_id"], delta_len)
  1336. # elif operate == "update":
  1337. # self.update_slice_update(update_json)
  1338. # elif operate == "delete":
  1339. # self.update_slice_delete(update_json)
  1340. # # self.update_document_word_num(update_json["doc_id"], -delta_len, -1)
  1341. # # self.update_knowledge_word_num(update_json["knowledge_id"], -delta_len)
  1342. # return True, "success"
  1343. # logger = logging.getLogger(__name__)
  1344. # class DBOperator:
    def _execute_with_retry(self, sql_func, max_retries=3):
        """Run ``sql_func(cursor)`` in a short explicit transaction, retrying
        on lock contention.

        Retries up to ``max_retries`` attempts on MySQL errno 1205 (lock wait
        timeout) and 1213 (deadlock), backing off 2s, 4s, ... between tries.
        A fresh pooled connection is used for every attempt.

        Returns:
            (True, "success") on success, (False, last_error) on failure.
        """
        last_error = None
        for attempt in range(max_retries):
            conn, info = self.get_connection()
            if not conn:
                return False, info
            cursor = None
            try:
                # Enable autocommit explicitly so the session-variable
                # statement below does not open an implicit transaction.
                conn.autocommit = True
                cursor = conn.cursor()
                # Session parameter is set outside the transaction; cap lock
                # waits at 10s so a blocked statement fails fast and retries.
                cursor.execute("SET SESSION innodb_lock_wait_timeout = 10")
                # Open the transaction explicitly, keeping its span minimal.
                conn.start_transaction()
                sql_func(cursor)
                conn.commit()
                return True, "success"
            except Error as e:
                last_error = str(e)
                errno = getattr(e, "errno", None)
                # Roll back first thing on error.
                try:
                    conn.rollback()
                except Exception:
                    pass
                # Close cursor/connection before any retry so the next
                # attempt gets a fresh connection from the pool.
                try:
                    cursor and cursor.close()
                finally:
                    conn.close()
                if errno in (1205, 1213) and attempt < max_retries - 1:
                    wait = (attempt + 1) * 2
                    logger.warning(
                        f"锁冲突重试 {attempt + 1}/{max_retries},{wait}s 后重试"
                    )
                    time.sleep(wait)
                    continue
                else:
                    logger.error(f"SQL 执行失败: {e}")
                    return False, last_error
            finally:
                # Fallback cleanup for the success path (the error path has
                # already closed these; a double close is suppressed here).
                try:
                    cursor and cursor.close()
                except Exception:
                    pass
                try:
                    conn.close()
                except Exception:
                    pass
        return False, last_error
  1397. # ------------------ 业务 SQL ------------------
  1398. def update_slice_insert(self, data):
  1399. def _sql(cursor):
  1400. cursor.execute(
  1401. """
  1402. INSERT INTO slice_info
  1403. (slice_id, knowledge_id, document_id, slice_text,
  1404. create_time, slice_index, old_slice_text)
  1405. VALUES (%s,%s,%s,%s,%s,%s,%s)
  1406. """,
  1407. (
  1408. data["chunk_id"],
  1409. data["knowledge_id"],
  1410. data["doc_id"],
  1411. data["chunk_text"],
  1412. data["date_now"],
  1413. int(data["slice_index"]),
  1414. data["chunk_text"],
  1415. ),
  1416. )
  1417. return self._execute_with_retry(_sql)
  1418. def update_slice_update(self, data):
  1419. def _sql(cursor):
  1420. cursor.execute(
  1421. """
  1422. UPDATE slice_info
  1423. SET slice_text = %s,
  1424. update_time = %s
  1425. WHERE slice_id = %s
  1426. """,
  1427. (
  1428. data["chunk_text"],
  1429. data["date_now"],
  1430. data["chunk_id"],
  1431. ),
  1432. )
  1433. return self._execute_with_retry(_sql)
  1434. def update_slice_delete(self, data):
  1435. def _sql(cursor):
  1436. cursor.execute(
  1437. """
  1438. UPDATE slice_info
  1439. SET del_flag = 1
  1440. WHERE slice_id = %s
  1441. """,
  1442. (data["chunk_id"],),
  1443. )
  1444. return self._execute_with_retry(_sql)
  1445. def batch_update_slice_delete(self, chunk_ids: list):
  1446. """
  1447. 批量标记切片为删除状态
  1448. 参数:
  1449. chunk_ids: 切片ID列表
  1450. 返回:
  1451. (success, message)
  1452. """
  1453. if not chunk_ids:
  1454. return True, "success"
  1455. def _sql(cursor):
  1456. # 构造 IN 表达式
  1457. placeholders = ",".join(["%s"] * len(chunk_ids))
  1458. cursor.execute(
  1459. f"""
  1460. UPDATE slice_info
  1461. SET del_flag = 1
  1462. WHERE slice_id IN ({placeholders})
  1463. """,
  1464. tuple(chunk_ids)
  1465. )
  1466. return self._execute_with_retry(_sql)
  1467. def update_document_word_num(self, doc_id, delta_len, slice_delta=0):
  1468. def _sql(cursor):
  1469. cursor.execute(
  1470. """
  1471. UPDATE bm_document
  1472. SET word_num = word_num + %s,
  1473. slice_total = slice_total + %s
  1474. WHERE document_id = %s
  1475. """,
  1476. (delta_len, slice_delta, doc_id),
  1477. )
  1478. return self._execute_with_retry(_sql)
  1479. def update_knowledge_word_num(self, knowledge_id, delta_len):
  1480. def _sql(cursor):
  1481. cursor.execute(
  1482. """
  1483. UPDATE bm_knowledge
  1484. SET word_num = word_num + %s
  1485. WHERE knowledge_id = %s
  1486. """,
  1487. (delta_len, knowledge_id),
  1488. )
  1489. return self._execute_with_retry(_sql)
  1490. def update_total_doc_len(self, update_json):
  1491. operate = update_json["operate"]
  1492. delta_len = update_json["chunk_len"]
  1493. update_json["date_now"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  1494. if operate == "insert":
  1495. return self.update_slice_insert(update_json)
  1496. elif operate == "update":
  1497. return self.update_slice_update(update_json)
  1498. elif operate == "delete":
  1499. return self.update_slice_delete(update_json)
  1500. return True, "success"
  1501. def query_slice_info_by_doc_ids(self, knowledge_id: str, doc_ids: list):
  1502. """
  1503. 根据 knowledge_id 和 doc_ids 列表查询 slice_info 表中的数据
  1504. 参数:
  1505. knowledge_id: 知识库ID
  1506. doc_ids: 文档ID列表
  1507. 返回:
  1508. (success, data_or_error): 成功时返回数据列表,失败时返回错误信息
  1509. """
  1510. if not doc_ids:
  1511. return True, []
  1512. connection, info = self.get_connection()
  1513. if not connection:
  1514. return False, info
  1515. cursor = None
  1516. try:
  1517. cursor = connection.cursor(dictionary=True)
  1518. placeholders = ','.join(['%s'] * len(doc_ids))
  1519. sql = f"""
  1520. SELECT slice_id, knowledge_id, document_id, slice_text, old_slice_text,
  1521. create_time, update_time, slice_index
  1522. FROM slice_info
  1523. WHERE knowledge_id = %s AND document_id IN ({placeholders})
  1524. """
  1525. cursor.execute(sql, (knowledge_id, *doc_ids))
  1526. results = cursor.fetchall()
  1527. logger.info(f"查询 slice_info 成功,共 {len(results)} 条记录")
  1528. return True, results
  1529. except Error as e:
  1530. logger.error(f"查询 slice_info 出错: {e}")
  1531. return False, str(e)
  1532. finally:
  1533. if cursor:
  1534. cursor.close()
  1535. if connection:
  1536. connection.close()
  1537. def query_media_replacement_by_doc_ids(self, knowledge_id: str, doc_ids: list):
  1538. """
  1539. 根据 knowledge_id 和 doc_ids 列表查询 bm_media_replacement 表中的数据
  1540. 参数:
  1541. knowledge_id: 知识库ID
  1542. doc_ids: 文档ID列表
  1543. 返回:
  1544. (success, data_or_error): 成功时返回数据列表,失败时返回错误信息
  1545. """
  1546. if not doc_ids:
  1547. return True, []
  1548. connection, info = self.get_connection()
  1549. if not connection:
  1550. return False, info
  1551. cursor = None
  1552. try:
  1553. cursor = connection.cursor(dictionary=True)
  1554. placeholders = ','.join(['%s'] * len(doc_ids))
  1555. sql = f"""
  1556. SELECT knowledge_id, document_id, origin_text, media_type, media_url, create_time
  1557. FROM bm_media_replacement
  1558. WHERE knowledge_id = %s AND document_id IN ({placeholders})
  1559. """
  1560. cursor.execute(sql, (knowledge_id, *doc_ids))
  1561. results = cursor.fetchall()
  1562. logger.info(f"查询 bm_media_replacement 成功,共 {len(results)} 条记录")
  1563. return True, results
  1564. except Error as e:
  1565. logger.error(f"查询 bm_media_replacement 出错: {e}")
  1566. return False, str(e)
  1567. finally:
  1568. if cursor:
  1569. cursor.close()
  1570. if connection:
  1571. connection.close()
  1572. def query_bm_document_by_doc_ids(self, knowledge_id: str, doc_ids: list):
  1573. """
  1574. 根据 knowledge_id 和 doc_ids 列表查询 bm_document 表中的数据
  1575. 参数:
  1576. knowledge_id: 知识库ID
  1577. doc_ids: 文档ID列表
  1578. 返回:
  1579. (success, data_or_error): 成功时返回数据列表,失败时返回错误信息
  1580. """
  1581. if not doc_ids:
  1582. return True, []
  1583. connection, info = self.get_connection()
  1584. if not connection:
  1585. return False, info
  1586. cursor = None
  1587. try:
  1588. cursor = connection.cursor(dictionary=True)
  1589. placeholders = ','.join(['%s'] * len(doc_ids))
  1590. sql = f"""
  1591. SELECT * FROM bm_document
  1592. WHERE knowledge_id = %s AND document_id IN ({placeholders})
  1593. """
  1594. cursor.execute(sql, (knowledge_id, *doc_ids))
  1595. results = cursor.fetchall()
  1596. logger.info(f"查询 bm_document 成功,共 {len(results)} 条记录")
  1597. return True, results
  1598. except Error as e:
  1599. logger.error(f"查询 bm_document 出错: {e}")
  1600. return False, str(e)
  1601. finally:
  1602. if cursor:
  1603. cursor.close()
  1604. if connection:
  1605. connection.close()
def copy_docs_metadata_to_new_knowledge(self, source_knowledge_id: str, source_doc_ids: list, new_knowledge_id: str, doc_id_mapping: dict = None, chunk_id_mapping: dict = None, tenant_id: str = "000000"):
    """Copy document metadata into a new knowledge base.

    Copies the bm_document, slice_info and bm_media_replacement rows that
    belong to the given source documents, rewriting ids and ownership
    fields, and commits all inserts in a single transaction.

    Args:
        source_knowledge_id: id of the knowledge base being copied from.
        source_doc_ids: ids of the documents to copy.
        new_knowledge_id: id of the destination knowledge base.
        doc_id_mapping: optional {old_doc_id: new_doc_id}; when omitted, a
            new snowflake id is generated for every source document.
        chunk_id_mapping: optional {old_chunk_id: new_chunk_id}, used as the
            new slice_id so rows stay consistent with the vector store.
        tenant_id: tenant id supplied by the caller (default "000000").

    Returns:
        (success, result_dict): on success result_dict holds the
        doc_id_mapping actually used plus per-table insert counts; on
        failure it holds an "error" message.
    """
    if not source_doc_ids:
        return True, {"message": "无需复制的文档", "doc_id_mapping": {}}
    # When no mapping was supplied, mint a fresh doc_id per source document.
    if doc_id_mapping is None:
        doc_id_mapping = {}
        for old_doc_id in source_doc_ids:
            doc_id_mapping[old_doc_id] = generate_snowflake_id()
    # 1. Load the source slice_info rows.
    success, slice_data = self.query_slice_info_by_doc_ids(source_knowledge_id, source_doc_ids)
    if not success:
        return False, {"error": f"查询切片数据失败: {slice_data}"}
    # 2. Load the source bm_media_replacement rows.
    success, media_data = self.query_media_replacement_by_doc_ids(source_knowledge_id, source_doc_ids)
    if not success:
        return False, {"error": f"查询媒体映射失败: {media_data}"}
    # 3. Load the source bm_document rows.
    success, doc_data = self.query_bm_document_by_doc_ids(source_knowledge_id, source_doc_ids)
    if not success:
        return False, {"error": f"查询文档数据失败: {doc_data}"}
    connection, info = self.get_connection()
    if not connection:
        return False, {"error": info}
    cursor = None
    try:
        cursor = connection.cursor()
        date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        slice_count = 0
        media_count = 0
        doc_count = 0
        # 4. Insert the copied bm_document rows.
        if doc_data:
            # Full column list; a handful of columns are overridden per row
            # via `special` below instead of being copied verbatim.
            all_fields = [
                'document_id', 'knowledge_id', 'custom_separator', 'sentence_size', 'length',
                'word_num', 'slice_total', 'name', 'url', 'parse_image', 'tenant_id',
                'create_dept', 'create_by', 'create_time', 'update_by', 'update_time',
                'remark', 'parsing_type', 'oss_id', 'status', 'mark_oss_id', 'mark_url',
                'ref_document_id', 'qa_checked', 'related_questions_enabled',
                'summary_generation_enabled', 'parent_generation_enabled', 'suffix', 'pdf_url'
            ]
            doc_values = []
            for row in doc_data:
                old_doc_id = row['document_id']
                new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                # Overridden columns: new ids, destination knowledge base,
                # caller's tenant, fresh create_time, and ref_document_id
                # kept as a back-reference to the source document.
                special = {
                    'document_id': new_doc_id,
                    'knowledge_id': new_knowledge_id,
                    'tenant_id': tenant_id,
                    'create_time': date_now,
                    'ref_document_id': old_doc_id
                }
                doc_values.append(tuple(special.get(f, row.get(f)) for f in all_fields))
            placeholders = ','.join(['%s'] * len(all_fields))
            doc_sql = f"INSERT INTO bm_document ({','.join(all_fields)}) VALUES ({placeholders})"
            cursor.executemany(doc_sql, doc_values)
            doc_count = len(doc_values)
            logger.info(f"插入文档数据成功: {doc_count} 条")
        # 5. Insert the copied slice_info rows (old_slice_text keeps the
        #    original text).
        if slice_data:
            slice_values = []
            for row in slice_data:
                old_doc_id = row['document_id']
                old_slice_id = row['slice_id']
                new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                # Reuse the vector store's chunk_id as slice_id when a
                # mapping is available, so both stores stay consistent.
                new_slice_id = chunk_id_mapping.get(old_slice_id, generate_snowflake_id()) if chunk_id_mapping else generate_snowflake_id()
                # old_slice_text falls back to slice_text when the source
                # row has no original text stored.
                old_slice_text = row.get('old_slice_text') or row['slice_text']
                slice_values.append((
                    new_slice_id,
                    new_knowledge_id,
                    new_doc_id,
                    row['slice_text'],
                    old_slice_text,
                    date_now,
                    row.get('slice_index')
                ))
            slice_sql = """
                INSERT INTO slice_info (
                    slice_id, knowledge_id, document_id, slice_text, old_slice_text, create_time, slice_index
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
            """
            cursor.executemany(slice_sql, slice_values)
            slice_count = len(slice_values)
            logger.info(f"插入切片数据成功: {slice_count} 条")
        # 6. Insert the copied bm_media_replacement rows.
        if media_data:
            media_values = []
            for row in media_data:
                old_doc_id = row['document_id']
                new_doc_id = doc_id_mapping.get(old_doc_id, generate_snowflake_id())
                media_values.append((
                    new_knowledge_id,
                    new_doc_id,
                    row['origin_text'],
                    row['media_type'],
                    row['media_url'],
                    date_now
                ))
            media_sql = """
                INSERT INTO bm_media_replacement (
                    knowledge_id, document_id, origin_text, media_type, media_url, create_time
                ) VALUES (%s, %s, %s, %s, %s, %s)
            """
            cursor.executemany(media_sql, media_values)
            media_count = len(media_values)
            logger.info(f"插入图片映射数据成功: {media_count} 条")
        connection.commit()
        return True, {
            "message": "复制元数据成功",
            "doc_id_mapping": doc_id_mapping,
            "doc_count": doc_count,
            "slice_count": slice_count,
            "media_count": media_count
        }
    except Error as e:
        # Any failed insert rolls back the whole copy.
        connection.rollback()
        logger.error(f"复制文档元数据失败: {e}")
        return False, {"error": str(e)}
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()
def copy_single_doc_metadata_for_document_copy(self, source_knowledge_id: str, source_doc_id: str,
                                               new_knowledge_id: str, new_doc_id: str,
                                               chunk_id_mapping: dict = None, tenant_id: str = "000000"):
    """Single-document copy: duplicate one document's metadata into a new
    knowledge base (bm_document, slice_info and bm_media_replacement).

    Differences from copy_docs_metadata_to_new_knowledge:
    - operates on exactly one document whose new id is supplied by the caller;
    - the destination bm_document row is UPDATEd in place (it already
      exists) rather than inserted;
    - both slice_text and old_slice_text of the new slices take the source
      row's old_slice_text (falling back to slice_text).

    Args:
        source_knowledge_id: id of the source knowledge base.
        source_doc_id: id of the source document.
        new_knowledge_id: id of the destination knowledge base.
        new_doc_id: id of the destination document (supplied by the caller).
        chunk_id_mapping: optional {old_chunk_id: new_chunk_id} used for the
            new slice ids.
        tenant_id: tenant id (accepted for interface symmetry; not written
            by this method).

    Returns:
        (success, result_dict): on success result_dict carries per-table
        counts plus a "result" payload including the copied original texts
        (lightrag_data); on failure it carries an "error" message.
    """
    source_doc_ids = [source_doc_id]
    # 1. Load the source slice_info rows.
    success, slice_data = self.query_slice_info_by_doc_ids(source_knowledge_id, source_doc_ids)
    if not success:
        return False, {"error": f"查询切片数据失败: {slice_data}"}
    # 2. Load the source bm_media_replacement rows.
    success, media_data = self.query_media_replacement_by_doc_ids(source_knowledge_id, source_doc_ids)
    if not success:
        return False, {"error": f"查询媒体映射失败: {media_data}"}
    # 3. Load the source bm_document row.
    success, doc_data = self.query_bm_document_by_doc_ids(source_knowledge_id, source_doc_ids)
    if not success:
        return False, {"error": f"查询文档数据失败: {doc_data}"}
    connection, info = self.get_connection()
    if not connection:
        return False, {"error": info}
    cursor = None
    try:
        cursor = connection.cursor()
        date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        slice_count = 0
        media_count = 0
        doc_count = 0
        # 4. Update the existing destination bm_document row in place.
        if doc_data:
            # Columns to copy over (document_id itself is excluded).
            update_fields = [
                'custom_separator', 'sentence_size', 'length',
                'word_num', 'slice_total', 'url', 'parse_image',
                'remark', 'parsing_type', 'oss_id', 'mark_oss_id', 'mark_url',
                'ref_document_id', 'qa_checked', 'related_questions_enabled',
                'summary_generation_enabled', 'parent_generation_enabled', 'suffix', 'pdf_url',
                'update_time'
            ]
            for row in doc_data:
                old_doc_id = row['document_id']
                # Build SET clause values; ref_document_id points back at the
                # source document and update_time is refreshed.
                update_values = []
                set_clauses = []
                for field in update_fields:
                    if field == 'ref_document_id':
                        update_values.append(old_doc_id)
                    elif field == 'update_time':
                        update_values.append(date_now)
                    else:
                        update_values.append(row.get(field))
                    set_clauses.append(f"{field} = %s")
                # WHERE-clause value comes last.
                update_values.append(new_doc_id)
                update_sql = f"UPDATE bm_document SET {', '.join(set_clauses)} WHERE document_id = %s"
                cursor.execute(update_sql, tuple(update_values))
                doc_count = cursor.rowcount
            logger.info(f"[文档复制模式] 更新文档数据成功: {doc_count} 条")
        # 5. Insert new slice_info rows; both text columns use the source
        #    row's original text.
        lightrag_data = []
        if slice_data:
            slice_values = []
            for row in slice_data:
                old_slice_id = row['slice_id']
                new_slice_id = chunk_id_mapping.get(old_slice_id, generate_snowflake_id()) if chunk_id_mapping else generate_snowflake_id()
                # Key difference from the multi-doc copy: the original text
                # (old_slice_text, falling back to slice_text) is used for
                # BOTH text columns.
                original_text = row.get('old_slice_text') or row['slice_text']
                lightrag_data.append(original_text)
                slice_values.append((
                    new_slice_id,
                    new_knowledge_id,
                    new_doc_id,
                    original_text,  # slice_text takes the original text
                    original_text,  # old_slice_text takes the original text
                    date_now,
                    row.get('slice_index')
                ))
            slice_sql = """
                INSERT INTO slice_info (
                    slice_id, knowledge_id, document_id, slice_text, old_slice_text, create_time, slice_index
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
            """
            cursor.executemany(slice_sql, slice_values)
            slice_count = len(slice_values)
            logger.info(f"[文档复制模式] 插入切片数据成功: {slice_count} 条")
        # 6. Insert bm_media_replacement rows for the new document.
        if media_data:
            media_values = []
            for row in media_data:
                media_values.append((
                    new_knowledge_id,
                    new_doc_id,
                    row['origin_text'],
                    row['media_type'],
                    row['media_url'],
                    date_now
                ))
            media_sql = """
                INSERT INTO bm_media_replacement (
                    knowledge_id, document_id, origin_text, media_type, media_url, create_time
                ) VALUES (%s, %s, %s, %s, %s, %s)
            """
            cursor.executemany(media_sql, media_values)
            media_count = len(media_values)
            logger.info(f"[文档复制模式] 插入图片映射数据成功: {media_count} 条")
        connection.commit()
        result = {
            "new_knowledge_id": new_knowledge_id,
            "new_doc_id": new_doc_id,
            "lightrag_data": lightrag_data
        }
        return True, {
            "message": "文档复制模式:复制元数据成功",
            "doc_count": doc_count,
            "slice_count": slice_count,
            "media_count": media_count,
            "result": result
        }
    except Error as e:
        # Roll back the whole copy on any failure.
        connection.rollback()
        logger.error(f"[文档复制模式] 复制文档元数据失败: {e}")
        return False, {"error": str(e)}
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()
  1886. def query_slice_by_id(self, knowledge_id: str, slice_id: str):
  1887. """
  1888. 根据 knowledge_id 和 slice_id 查询单个切片数据
  1889. """
  1890. connection, info = self.get_connection()
  1891. if not connection:
  1892. return False, info
  1893. cursor = None
  1894. try:
  1895. cursor = connection.cursor(dictionary=True)
  1896. sql = """
  1897. SELECT slice_id, knowledge_id, document_id, slice_text, old_slice_text, slice_index
  1898. FROM slice_info WHERE knowledge_id = %s AND slice_id = %s
  1899. """
  1900. cursor.execute(sql, (knowledge_id, slice_id))
  1901. results = cursor.fetchall()
  1902. return True, results
  1903. except Error as e:
  1904. logger.error(f"查询 slice_info 出错: {e}")
  1905. return False, str(e)
  1906. finally:
  1907. if cursor:
  1908. cursor.close()
  1909. if connection:
  1910. connection.close()
  1911. def query_slice_revision_info_by_slice_ids(self, knowledge_id: str, slice_ids: list):
  1912. """批量查询切片的修订/废弃信息及展示所需字段"""
  1913. if not slice_ids:
  1914. return True, []
  1915. connection, info = self.get_connection()
  1916. if not connection:
  1917. return False, info
  1918. cursor = None
  1919. try:
  1920. cursor = connection.cursor(dictionary=True)
  1921. placeholders = ",".join(["%s"] * len(slice_ids))
  1922. sql = f"""
  1923. SELECT slice_id, knowledge_id, document_id, slice_text, section, parent_section,
  1924. revision_status, ref_slice_id
  1925. FROM slice_info
  1926. WHERE knowledge_id = %s AND slice_id IN ({placeholders})
  1927. """
  1928. cursor.execute(sql, (knowledge_id, *slice_ids))
  1929. return True, cursor.fetchall()
  1930. except Error as e:
  1931. logger.error(f"批量查询 slice_info 修订字段出错: {e}")
  1932. try:
  1933. cursor = connection.cursor(dictionary=True)
  1934. placeholders = ",".join(["%s"] * len(slice_ids))
  1935. sql = f"""
  1936. SELECT slice_id, knowledge_id, document_id, slice_text,
  1937. revision_status, ref_slice_id
  1938. FROM slice_info
  1939. WHERE knowledge_id = %s AND slice_id IN ({placeholders})
  1940. """
  1941. cursor.execute(sql, (knowledge_id, *slice_ids))
  1942. rows = cursor.fetchall()
  1943. for r in rows:
  1944. r["section"] = None
  1945. r["parent_section"] = None
  1946. return True, rows
  1947. except Error as e1:
  1948. logger.error(f"批量查询 slice_info 修订字段(无section)也失败: {e1}")
  1949. try:
  1950. cursor = connection.cursor(dictionary=True)
  1951. placeholders = ",".join(["%s"] * len(slice_ids))
  1952. sql = f"""
  1953. SELECT slice_id, knowledge_id, document_id, slice_text
  1954. FROM slice_info
  1955. WHERE knowledge_id = %s AND slice_id IN ({placeholders})
  1956. """
  1957. cursor.execute(sql, (knowledge_id, *slice_ids))
  1958. rows = cursor.fetchall()
  1959. for r in rows:
  1960. r["section"] = None
  1961. r["parent_section"] = None
  1962. r["revision_status"] = None
  1963. r["ref_slice_id"] = None
  1964. return True, rows
  1965. except Error as e2:
  1966. logger.error(f"批量查询 slice_info 兜底也失败: {e2}")
  1967. return False, str(e2)
  1968. finally:
  1969. if cursor:
  1970. cursor.close()
  1971. if connection:
  1972. connection.close()
  1973. def query_slice_revision_info_by_slice_ids_any(self, slice_ids: list):
  1974. if not slice_ids:
  1975. return True, []
  1976. connection, info = self.get_connection()
  1977. if not connection:
  1978. return False, info
  1979. cursor = None
  1980. try:
  1981. cursor = connection.cursor(dictionary=True)
  1982. placeholders = ",".join(["%s"] * len(slice_ids))
  1983. sql = f"""
  1984. SELECT slice_id, knowledge_id, document_id, slice_text, section, parent_section,
  1985. revision_status, ref_slice_id, revision_slice_text
  1986. FROM slice_info
  1987. WHERE slice_id IN ({placeholders})
  1988. """
  1989. cursor.execute(sql, tuple(slice_ids))
  1990. return True, cursor.fetchall()
  1991. except Error as e:
  1992. logger.error(f"批量查询 slice_info 修订字段(不含knowledge)出错: {e}")
  1993. try:
  1994. cursor = connection.cursor(dictionary=True)
  1995. placeholders = ",".join(["%s"] * len(slice_ids))
  1996. sql = f"""
  1997. SELECT slice_id, knowledge_id, document_id, slice_text,
  1998. revision_status, ref_slice_id, revision_slice_text
  1999. FROM slice_info
  2000. WHERE slice_id IN ({placeholders})
  2001. """
  2002. cursor.execute(sql, tuple(slice_ids))
  2003. rows = cursor.fetchall()
  2004. for r in rows:
  2005. r["section"] = None
  2006. r["parent_section"] = None
  2007. r["revision_slice_text"] = None
  2008. return True, rows
  2009. except Error as e1:
  2010. logger.error(f"批量查询 slice_info 修订字段(不含knowledge,无section)也失败: {e1}")
  2011. try:
  2012. cursor = connection.cursor(dictionary=True)
  2013. placeholders = ",".join(["%s"] * len(slice_ids))
  2014. sql = f"""
  2015. SELECT slice_id, knowledge_id, document_id, slice_text
  2016. FROM slice_info
  2017. WHERE slice_id IN ({placeholders})
  2018. """
  2019. cursor.execute(sql, tuple(slice_ids))
  2020. rows = cursor.fetchall()
  2021. for r in rows:
  2022. r["section"] = None
  2023. r["parent_section"] = None
  2024. r["revision_status"] = None
  2025. r["ref_slice_id"] = None
  2026. r["revision_slice_text"] = None
  2027. return True, rows
  2028. except Error as e2:
  2029. logger.error(f"批量查询 slice_info(不含knowledge)兜底也失败: {e2}")
  2030. return False, str(e2)
  2031. finally:
  2032. if cursor:
  2033. cursor.close()
  2034. if connection:
  2035. connection.close()
  2036. def query_document_names_by_document_ids(self, document_ids: list):
  2037. if not document_ids:
  2038. return True, {}
  2039. connection, info = self.get_connection()
  2040. if not connection:
  2041. return False, info
  2042. cursor = None
  2043. try:
  2044. cursor = connection.cursor(dictionary=True)
  2045. placeholders = ",".join(["%s"] * len(document_ids))
  2046. sql = f"""
  2047. SELECT document_id, name
  2048. FROM bm_document
  2049. WHERE document_id IN ({placeholders})
  2050. """
  2051. cursor.execute(sql, tuple(document_ids))
  2052. rows = cursor.fetchall()
  2053. result = {}
  2054. for r in rows:
  2055. if r.get("document_id"):
  2056. result[r["document_id"]] = r.get("name")
  2057. return True, result
  2058. except Error as e:
  2059. logger.error(f"批量查询 bm_document.name 出错: {e}")
  2060. return False, str(e)
  2061. finally:
  2062. if cursor:
  2063. cursor.close()
  2064. if connection:
  2065. connection.close()
  2066. def query_slice_by_knowledge_and_doc(self, knowledge_id: str, document_id: str):
  2067. """
  2068. 根据 knowledge_id 和 document_id 查询 slice_info 表中的切片数据
  2069. """
  2070. connection, info = self.get_connection()
  2071. if not connection:
  2072. return False, info
  2073. cursor = None
  2074. try:
  2075. cursor = connection.cursor(dictionary=True)
  2076. sql = """
  2077. SELECT slice_id, knowledge_id, document_id, slice_text, old_slice_text, slice_index
  2078. FROM slice_info
  2079. WHERE knowledge_id = %s AND document_id = %s
  2080. """
  2081. cursor.execute(sql, (knowledge_id, document_id))
  2082. results = cursor.fetchall()
  2083. logger.info(f"查询 slice_info 成功,共 {len(results)} 条记录")
  2084. return True, results
  2085. except Error as e:
  2086. logger.error(f"查询 slice_info 出错: {e}")
  2087. return False, str(e)
  2088. finally:
  2089. if cursor:
  2090. cursor.close()
  2091. if connection:
  2092. connection.close()
  2093. # def update_slice_llm_fields(self, knowledge_id: str, slice_id: str, qa: str = None, question: str = None, summary: str = None):
  2094. # """
  2095. # 更新 slice_info 表中的 qa、question、summary 字段,值为 None 的字段不更新
  2096. # 基于 knowledge_id 和 slice_id 的包含关系进行更新
  2097. # """
  2098. # connection, info = self.get_connection()
  2099. # if not connection:
  2100. # return False, info
  2101. # cursor = None
  2102. # try:
  2103. # # 动态构建 SET 子句,只更新非 None 字段
  2104. # set_parts = []
  2105. # params = []
  2106. # if qa is not None:
  2107. # set_parts.append("qa = %s")
  2108. # params.append(qa)
  2109. # if question is not None:
  2110. # set_parts.append("question = %s")
  2111. # params.append(question)
  2112. # if summary is not None:
  2113. # set_parts.append("summary = %s")
  2114. # params.append(summary)
  2115. # if not set_parts:
  2116. # return True, "no fields to update"
  2117. # date_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  2118. # set_parts.append("update_time = %s")
  2119. # params.append(date_now)
  2120. # params.append(knowledge_id)
  2121. # params.append(slice_id)
  2122. # cursor = connection.cursor()
  2123. # sql = f"UPDATE slice_info SET {', '.join(set_parts)} WHERE knowledge_id = %s AND slice_id = %s"
  2124. # cursor.execute(sql, tuple(params))
  2125. # connection.commit()
  2126. # logger.info(f"更新切片 {slice_id} 的LLM字段成功")
  2127. # return True, "success"
  2128. # except Error as e:
  2129. # connection.rollback()
  2130. # logger.error(f"更新 slice_info LLM字段出错: {e}")
  2131. # return False, str(e)
  2132. # finally:
  2133. # if cursor:
  2134. # cursor.close()
  2135. # if connection:
  2136. # connection.close()
  2137. def update_slice_llm_fields(
  2138. self,
  2139. knowledge_id: str,
  2140. slice_id: str,
  2141. qa: str = None,
  2142. question: str = None,
  2143. summary: str = None
  2144. ):
  2145. connection, info = self.get_connection()
  2146. if not connection:
  2147. return False, info
  2148. cursor = None
  2149. try:
  2150. set_parts = []
  2151. params = []
  2152. if qa is not None:
  2153. set_parts.append("qa = %s")
  2154. params.append(qa)
  2155. if question is not None:
  2156. set_parts.append("question = %s")
  2157. params.append(question)
  2158. if summary is not None:
  2159. set_parts.append("summary = %s")
  2160. params.append(summary)
  2161. if not set_parts:
  2162. connection.rollback()
  2163. return True, "no fields to update"
  2164. set_parts.append("update_time = %s")
  2165. params.append(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
  2166. # params.append(knowledge_id)
  2167. params.append(slice_id)
  2168. cursor = connection.cursor()
  2169. sql = f"""
  2170. UPDATE slice_info
  2171. SET {', '.join(set_parts)}
  2172. WHERE slice_id = %s
  2173. """
  2174. cursor.execute(sql, tuple(params))
  2175. connection.commit()
  2176. return True, "success"
  2177. except Exception as e:
  2178. try:
  2179. connection.rollback()
  2180. except Exception:
  2181. pass
  2182. logger.error(f"更新 slice_info LLM字段出错: {e}")
  2183. return False, str(e)
  2184. finally:
  2185. if cursor:
  2186. cursor.close()
  2187. if connection:
  2188. connection.close()
  2189. def query_doc_metadata_flags(self, document_id: str):
  2190. """
  2191. 根据 document_id 查询 bm_document 表的 qa_checked、related_questions_enabled、summary_generation_enabled 字段
  2192. 返回: (success, dict) 其中 dict 包含 qa, question, summary 布尔值
  2193. """
  2194. connection, info = self.get_connection()
  2195. if not connection:
  2196. return False, info
  2197. cursor = None
  2198. try:
  2199. cursor = connection.cursor(dictionary=True)
  2200. sql = """
  2201. SELECT qa_checked, related_questions_enabled, summary_generation_enabled
  2202. FROM bm_document WHERE document_id = %s
  2203. """
  2204. cursor.execute(sql, (document_id,))
  2205. result = cursor.fetchone()
  2206. if not result:
  2207. return True, {"qa": False, "question": False, "summary": False}
  2208. return True, {
  2209. "qa": result.get("qa_checked") == 1,
  2210. "question": result.get("related_questions_enabled") == 1,
  2211. "summary": result.get("summary_generation_enabled") == 1
  2212. }
  2213. except Error as e:
  2214. logger.error(f"查询 bm_document 元数据字段出错: {e}")
  2215. return False, str(e)
  2216. finally:
  2217. if cursor:
  2218. cursor.close()
  2219. if connection:
  2220. connection.close()