# data_cleanup_script_api.py — FastAPI service for cleaning dirty Milvus vector data
  1. # app.py
  2. import os
  3. from datetime import datetime
  4. from typing import Optional
  5. from fastapi import FastAPI, HTTPException
  6. from pydantic import BaseModel
  7. from data_cleanup_script import VectorDataCleaner # 假设你的类放在vector_data_cleaner.py里
  8. import logging
  9. # 日志
  10. logger = logging.getLogger("uvicorn.error")
  11. # FastAPI实例
  12. app = FastAPI(title="Milvus 数据清理")
  13. # 请求体模型
  14. class CleanupRequest(BaseModel):
  15. knowledge_id: str
  16. document_id: Optional[str] = None
  17. dry_run: Optional[bool] = True # 默认试运行
  18. # 响应体模型(可选,可更详细)
  19. class CleanupResponse(BaseModel):
  20. knowledge_id: str
  21. document_id: Optional[str]
  22. mysql_slice_count: int
  23. milvus_slice_count: int
  24. dirty_slice_count: int
  25. missing_slice_count: int
  26. dirty_slice_ids: list
  27. missing_slice_ids: list
  28. dry_run: bool
  29. delete_success: Optional[bool] = None
  30. delete_message: Optional[str] = None
  31. dirty_slice_samples: Optional[list] = None
  32. report: str
  33. # 初始化数据清理工具(单例,避免每次请求重新连接数据库)
  34. cleanup_tool = VectorDataCleaner()
  35. @app.post("/cleanup", response_model=CleanupResponse)
  36. def cleanup_data_endpoint(request: CleanupRequest):
  37. """
  38. 清理Milvus向量数据库中的脏数据
  39. """
  40. try:
  41. # # 执行清理
  42. # result = cleanup_tool.cleanup_data(
  43. # knowledge_id=request.knowledge_id,
  44. # document_id=request.document_id,
  45. # dry_run=request.dry_run
  46. # )
  47. knowledge_ids_list = request.knowledge_ids
  48. for knowledge_id in knowledge_ids_list:
  49. result = cleanup_tool.cleanup_data(
  50. knowledge_id=knowledge_id,
  51. document_id=None,
  52. dry_run=False
  53. )
  54. # 检查是否有错误
  55. if "error" in result:
  56. raise HTTPException(status_code=500, detail=result["error"])
  57. # 生成报告
  58. report = cleanup_tool.generate_report(result)
  59. result["report"] = report
  60. timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
  61. # 保存脏数据ID
  62. if result.get("dirty_slice_ids"):
  63. dirty_id_file = f"./data_clean/dirty_ids_{request.knowledge_id}_{timestamp}.txt"
  64. with open(dirty_id_file, 'w', encoding='utf-8') as f:
  65. f.write("\n".join(result["dirty_slice_ids"]))
  66. logger.info(f"脏数据ID已保存到: {dirty_id_file}")
  67. # 保存缺失数据ID
  68. if result.get("missing_slice_ids"):
  69. missing_id_file = f"./data_clean/missing_ids_{request.knowledge_id}_{timestamp}.txt"
  70. with open(missing_id_file, 'w', encoding='utf-8') as f:
  71. f.write("\n".join(result["missing_slice_ids"]))
  72. logger.info(f"缺失数据ID已保存到: {missing_id_file}")
  73. # 保存报告到文件
  74. os.makedirs("./data_clean", exist_ok=True)
  75. report_filename = f"./data_clean/data_cleanup_report_{request.knowledge_id}_{timestamp}.txt"
  76. try:
  77. with open(report_filename, 'w', encoding='utf-8') as f:
  78. f.write(report)
  79. logger.info(f"清理报告已保存到: {report_filename}")
  80. except Exception as e:
  81. logger.error(f"保存报告失败: {e}")
  82. # return result
  83. return {"code": 200}
  84. except Exception as e:
  85. logger.error(f"清理请求失败: {e}")
  86. raise HTTPException(status_code=500, detail=str(e))
  87. @app.on_event("shutdown")
  88. def shutdown_event():
  89. """关闭数据库连接"""
  90. cleanup_tool.close_connections()
  91. if __name__ == "__main__":
  92. import uvicorn
  93. # 自动创建报告目录
  94. os.makedirs("./data_clean", exist_ok=True)
  95. # 启动 uvicorn
  96. uvicorn.run(
  97. "data_cleanup_script_api:app",
  98. host="0.0.0.0",
  99. port=9090,
  100. log_level="info",
  101. )