data_cleanup_script.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
import sys
import os
import logging
from datetime import datetime
from typing import List, Dict, Set, Tuple, Optional
import mysql.connector
from mysql.connector import Error
from pymilvus import MilvusClient

# Add this file's directory to sys.path so the project-local `config` and
# `utils` modules resolve when the script is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from config import milvus_uri, mysql_config
from utils.get_logger import setup_logger

# Module-level logger, configured by the project's setup_logger helper.
logger = setup_logger(__name__)
  15. class VectorDataCleaner:
  16. """向量数据清理工具类"""
  17. def __init__(self):
  18. """初始化数据清理工具"""
  19. self.mysql_config = mysql_config
  20. self.milvus_uri = milvus_uri
  21. self.milvus_client = None
  22. self.mysql_conn = None
  23. self._connect_databases()
  24. def _connect_databases(self):
  25. """连接数据库"""
  26. # 连接MySQL
  27. try:
  28. self.mysql_conn = mysql.connector.connect(**self.mysql_config)
  29. logger.info("MySQL数据库连接成功")
  30. except Error as e:
  31. logger.error(f"MySQL数据库连接失败: {e}")
  32. raise
  33. # 连接Milvus
  34. try:
  35. self.milvus_client = MilvusClient(uri=self.milvus_uri)
  36. logger.info("Milvus向量数据库连接成功")
  37. except Exception as e:
  38. logger.error(f"Milvus向量数据库连接失败: {e}")
  39. raise
  40. def close_connections(self):
  41. """关闭数据库连接"""
  42. if self.mysql_conn:
  43. self.mysql_conn.close()
  44. logger.info("MySQL连接已关闭")
  45. # Milvus客户端会自动管理连接
  46. def get_mysql_slices(self, knowledge_id: str, document_id: str = None) -> Set[str]:
  47. """
  48. 从MySQL获取指定知识库/文件的所有切片ID
  49. Args:
  50. knowledge_id: 知识库ID
  51. document_id: 文档ID,可选。如果不提供则获取整个知识库的切片
  52. Returns:
  53. Set[str]: 切片ID集合
  54. """
  55. if not self.mysql_conn:
  56. logger.error("MySQL连接未建立")
  57. return set()
  58. cursor = None
  59. try:
  60. cursor = self.mysql_conn.cursor()
  61. if document_id:
  62. # 查询指定文档的切片
  63. query = """
  64. SELECT slice_id FROM slice_info
  65. WHERE knowledge_id = %s AND document_id = %s
  66. """
  67. cursor.execute(query, (knowledge_id, document_id))
  68. logger.info(f"查询知识库 {knowledge_id} 中文档 {document_id} 的切片")
  69. else:
  70. # 查询整个知识库的切片
  71. query = """
  72. SELECT slice_id FROM slice_info
  73. WHERE knowledge_id = %s
  74. """
  75. cursor.execute(query, (knowledge_id,))
  76. logger.info(f"查询知识库 {knowledge_id} 的所有切片")
  77. results = cursor.fetchall()
  78. slice_ids = {row[0] for row in results}
  79. logger.info(f"MySQL中找到 {len(slice_ids)} 个切片")
  80. return slice_ids
  81. except Error as e:
  82. logger.error(f"查询MySQL切片数据失败: {e}")
  83. return set()
  84. finally:
  85. if cursor:
  86. cursor.close()
  87. def get_milvus_slices(self, knowledge_id: str, document_id: str = None) -> Set[str]:
  88. """
  89. 从Milvus获取指定知识库/文件的所有切片ID
  90. Args:
  91. knowledge_id: 知识库ID(也是collection名称)
  92. document_id: 文档ID,可选。如果不提供则获取整个知识库的切片
  93. Returns:
  94. Set[str]: 切片ID集合
  95. """
  96. if not self.milvus_client:
  97. logger.error("Milvus连接未建立")
  98. return set()
  99. try:
  100. # 检查collection是否存在
  101. if not self.milvus_client.has_collection(collection_name=knowledge_id):
  102. logger.warning(f"Milvus中不存在知识库 {knowledge_id} 对应的collection")
  103. return set()
  104. # 构建查询条件
  105. if document_id:
  106. filter_expr = f"doc_id == '{document_id}'"
  107. logger.info(f"查询Milvus知识库 {knowledge_id} 中文档 {document_id} 的切片")
  108. else:
  109. filter_expr = None
  110. logger.info(f"查询Milvus知识库 {knowledge_id} 的所有切片")
  111. # 查询所有切片ID(分批查询以避免限制)
  112. all_results = []
  113. offset = 0
  114. limit = 10000 # 每次查询10000条
  115. while True:
  116. results = self.milvus_client.query(
  117. collection_name=knowledge_id,
  118. filter=filter_expr,
  119. output_fields=["chunk_id"],
  120. limit=limit,
  121. offset=offset
  122. )
  123. if not results:
  124. break
  125. all_results.extend(results)
  126. # 如果返回的结果少于limit,说明已经查询完毕
  127. if len(results) < limit:
  128. break
  129. offset += limit
  130. results = all_results
  131. slice_ids = {result["chunk_id"] for result in results if "chunk_id" in result}
  132. logger.info(f"Milvus中找到 {len(slice_ids)} 个切片")
  133. return slice_ids
  134. except Exception as e:
  135. logger.error(f"查询Milvus切片数据失败: {e}")
  136. return set()
  137. def get_milvus_slice_details(self, knowledge_id: str, chunk_ids: List[str]) -> List[Dict]:
  138. """
  139. 获取Milvus中指定切片的详细信息
  140. Args:
  141. knowledge_id: 知识库ID
  142. chunk_ids: 切片ID列表
  143. Returns:
  144. List[Dict]: 切片详细信息列表
  145. """
  146. if not self.milvus_client or not chunk_ids:
  147. return []
  148. try:
  149. # 构建查询条件
  150. chunk_ids_str = "', '".join(chunk_ids)
  151. filter_expr = f"chunk_id in ['{chunk_ids_str}']"
  152. results = self.milvus_client.query(
  153. collection_name=knowledge_id,
  154. filter=filter_expr,
  155. output_fields=["pk", "chunk_id", "doc_id", "content"],
  156. limit=len(chunk_ids)
  157. )
  158. return results
  159. except Exception as e:
  160. logger.error(f"获取Milvus切片详细信息失败: {e}")
  161. return []
  162. def delete_milvus_slices(self, knowledge_id: str, chunk_ids: List[str]) -> Tuple[bool, str]:
  163. """
  164. 删除Milvus中的指定切片
  165. Args:
  166. knowledge_id: 知识库ID
  167. chunk_ids: 要删除的切片ID列表
  168. Returns:
  169. Tuple[bool, str]: (是否成功, 消息)
  170. """
  171. if not self.milvus_client or not chunk_ids:
  172. return True, "没有需要删除的切片"
  173. try:
  174. # 分批删除,避免一次删除太多数据
  175. batch_size = 100
  176. total_deleted = 0
  177. for i in range(0, len(chunk_ids), batch_size):
  178. batch_chunk_ids = chunk_ids[i:i + batch_size]
  179. # 先查询获取主键
  180. chunk_ids_str = "', '".join(batch_chunk_ids)
  181. filter_expr = f"chunk_id in ['{chunk_ids_str}']"
  182. results = self.milvus_client.query(
  183. collection_name=knowledge_id,
  184. filter=filter_expr,
  185. output_fields=["pk"],
  186. limit=len(batch_chunk_ids)
  187. )
  188. if not results:
  189. continue
  190. # 提取主键
  191. primary_keys = [result["pk"] for result in results]
  192. # 执行删除
  193. delete_result = self.milvus_client.delete(
  194. collection_name=knowledge_id,
  195. ids=primary_keys
  196. )
  197. total_deleted += len(primary_keys)
  198. logger.info(f"删除了 {len(primary_keys)} 个切片,累计删除 {total_deleted} 个")
  199. # 执行flush和compact操作
  200. self.milvus_client.flush(collection_name=knowledge_id)
  201. logger.info(f"成功删除 {total_deleted} 个脏数据切片")
  202. return True, f"成功删除 {total_deleted} 个脏数据切片"
  203. except Exception as e:
  204. logger.error(f"删除Milvus切片失败: {e}")
  205. return False, f"删除失败: {str(e)}"
  206. def cleanup_data(self, knowledge_id: str, document_id: str = None, dry_run: bool = True) -> Dict:
  207. """
  208. 执行数据清理
  209. Args:
  210. knowledge_id: 知识库ID
  211. document_id: 文档ID,可选
  212. dry_run: 是否为试运行模式(不实际删除数据)
  213. Returns:
  214. Dict: 清理结果统计
  215. """
  216. logger.info(f"开始数据清理 - 知识库: {knowledge_id}, 文档: {document_id or '全部'}, 试运行: {dry_run}")
  217. # 检查数据库连接
  218. if not self.mysql_conn:
  219. return {"error": "MySQL连接失败"}
  220. if not self.milvus_client:
  221. return {"error": "Milvus连接失败"}
  222. try:
  223. # 获取MySQL中的切片ID
  224. mysql_slice_ids = self.get_mysql_slices(knowledge_id, document_id)
  225. # 获取Milvus中的切片ID
  226. milvus_slice_ids = self.get_milvus_slices(knowledge_id, document_id)
  227. # 找出需要删除的切片(在Milvus中但不在MySQL中)
  228. dirty_slice_ids = milvus_slice_ids - mysql_slice_ids
  229. # 找出缺失的切片(在MySQL中但不在Milvus中)
  230. missing_slice_ids = mysql_slice_ids - milvus_slice_ids
  231. result = {
  232. "knowledge_id": knowledge_id,
  233. "document_id": document_id,
  234. "mysql_slice_count": len(mysql_slice_ids),
  235. "milvus_slice_count": len(milvus_slice_ids),
  236. "dirty_slice_count": len(dirty_slice_ids),
  237. "missing_slice_count": len(missing_slice_ids),
  238. "dirty_slice_ids": list(dirty_slice_ids),
  239. "missing_slice_ids": list(missing_slice_ids),
  240. "dry_run": dry_run
  241. }
  242. logger.info(f"数据统计 - MySQL切片: {len(mysql_slice_ids)}, "
  243. f"Milvus切片: {len(milvus_slice_ids)}, "
  244. f"脏数据: {len(dirty_slice_ids)}, "
  245. f"缺失数据: {len(missing_slice_ids)}")
  246. if dirty_slice_ids:
  247. if dry_run:
  248. logger.info(f"试运行模式 - 发现 {len(dirty_slice_ids)} 个脏数据切片,但不会实际删除")
  249. # 获取脏数据的详细信息用于日志记录
  250. dirty_details = self.get_milvus_slice_details(knowledge_id, list(dirty_slice_ids)[:10]) # 只获取前10个的详情
  251. result["dirty_slice_samples"] = dirty_details
  252. else:
  253. logger.info(f"开始删除 {len(dirty_slice_ids)} 个脏数据切片")
  254. success, message = self.delete_milvus_slices(knowledge_id, list(dirty_slice_ids))
  255. result["delete_success"] = success
  256. result["delete_message"] = message
  257. else:
  258. logger.info("没有发现脏数据,数据一致性良好")
  259. result["delete_success"] = True
  260. result["delete_message"] = "没有脏数据需要删除"
  261. if missing_slice_ids:
  262. logger.warning(f"发现 {len(missing_slice_ids)} 个切片在MySQL中存在但在Milvus中缺失")
  263. return result
  264. except Exception as e:
  265. logger.error(f"数据清理过程中发生错误: {e}")
  266. return {"error": str(e)}
  267. finally:
  268. # 注意:不在这里关闭连接,因为可能还需要使用
  269. pass
  270. def generate_report(self, result: Dict) -> str:
  271. """
  272. 生成清理报告
  273. Args:
  274. result: 清理结果
  275. Returns:
  276. str: 格式化的报告
  277. """
  278. if "error" in result:
  279. return f"清理失败: {result['error']}"
  280. report = f"""
  281. 数据清理报告
  282. {'='*50}
  283. 知识库ID: {result['knowledge_id']}
  284. 文档ID: {result.get('document_id', '全部文档')}
  285. 执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
  286. 试运行模式: {'是' if result['dry_run'] else '否'}
  287. 数据统计:
  288. - MySQL中的切片数量: {result['mysql_slice_count']}
  289. - Milvus中的切片数量: {result['milvus_slice_count']}
  290. - 脏数据切片数量: {result['dirty_slice_count']}
  291. - 缺失切片数量: {result['missing_slice_count']}
  292. 清理结果:
  293. - 删除状态: {'成功' if result.get('delete_success', False) else '失败'}
  294. - 删除消息: {result.get('delete_message', 'N/A')}
  295. 建议:
  296. """
  297. if result['dirty_slice_count'] > 0:
  298. report += f"- 发现 {result['dirty_slice_count']} 个脏数据切片"
  299. if result['dry_run']:
  300. report += ",建议在非试运行模式下执行清理\n"
  301. else:
  302. report += ",已执行清理\n"
  303. else:
  304. report += "- 数据一致性良好,无需清理\n"
  305. if result['missing_slice_count'] > 0:
  306. report += f"- 发现 {result['missing_slice_count']} 个切片在Milvus中缺失,建议检查数据同步\n"
  307. return report
  308. def main():
  309. """主函数"""
  310. import argparse
  311. parser = argparse.ArgumentParser(description='数据清理脚本 - 清理向量库中的脏数据')
  312. parser.add_argument('knowledge_id', help='知识库ID')
  313. parser.add_argument('--document_id', help='文档ID(可选,不提供则清理整个知识库)')
  314. parser.add_argument('--dry-run', action='store_true', default=True,
  315. help='试运行模式(默认开启,不实际删除数据)')
  316. parser.add_argument('--execute', action='store_true',
  317. help='实际执行删除操作(关闭运行模式)')
  318. args = parser.parse_args()
  319. # 如果指定了--execute,则关闭运行模式
  320. dry_run = args.execute
  321. # 创建清理工具
  322. cleanup_tool = VectorDataCleaner()
  323. try:
  324. # 执行清理
  325. result = cleanup_tool.cleanup_data(
  326. knowledge_id=args.knowledge_id,
  327. document_id=args.document_id,
  328. dry_run=dry_run
  329. )
  330. # 生成并保存报告
  331. report = cleanup_tool.generate_report(result)
  332. print(report)
  333. finally:
  334. # 关闭连接
  335. cleanup_tool.close_connections()
  336. # 保存报告到文件
  337. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  338. os.makedirs("./data_clean", exist_ok=True)
  339. report_filename = f"./data_clean/data_cleanup_report_{args.knowledge_id}_{timestamp}.txt"
  340. try:
  341. with open(report_filename, 'w', encoding='utf-8') as f:
  342. f.write(report)
  343. print(f"\n报告已保存到: {report_filename}")
  344. except Exception as e:
  345. logger.error(f"保存报告失败: {e}")
  346. if __name__ == "__main__":
  347. main()