# app.py
"""FastAPI service exposing an endpoint that cleans dirty vector data in Milvus.

Wraps the project-local ``VectorDataCleaner`` (MySQL vs. Milvus slice
reconciliation) behind a single POST /cleanup endpoint and persists ID lists
and a text report under ``./data_clean``.
"""
import logging
import os
from datetime import datetime
from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from data_cleanup_script import VectorDataCleaner  # project-local cleanup helper

# Reuse uvicorn's error logger so messages appear in the server log.
logger = logging.getLogger("uvicorn.error")

# FastAPI application instance.
app = FastAPI(title="Milvus 数据清理")

# All output files (ID lists, reports) go here.
REPORT_DIR = "./data_clean"


class CleanupRequest(BaseModel):
    """Request body: which knowledge base (and optionally which document) to clean."""
    knowledge_id: str
    document_id: Optional[str] = None
    # Default True: report what WOULD be deleted without deleting anything.
    dry_run: Optional[bool] = True


class CleanupResponse(BaseModel):
    """Response body mirroring the dict produced by VectorDataCleaner.cleanup_data."""
    knowledge_id: str
    document_id: Optional[str]
    mysql_slice_count: int
    milvus_slice_count: int
    dirty_slice_count: int
    missing_slice_count: int
    dirty_slice_ids: list
    missing_slice_ids: list
    dry_run: bool
    delete_success: Optional[bool] = None
    delete_message: Optional[str] = None
    dirty_slice_samples: Optional[list] = None
    report: str


# Single shared instance so each request does not reopen DB connections.
cleanup_tool = VectorDataCleaner()


@app.post("/cleanup", response_model=CleanupResponse)
def cleanup_data_endpoint(request: CleanupRequest):
    """Clean dirty vector data for one knowledge base (optionally one document).

    Honors ``request.dry_run`` (True = report only, nothing deleted).
    Side effects: writes dirty/missing ID lists and a text report under
    ``./data_clean``; file-write failures are logged, never fatal.

    Raises:
        HTTPException: 500 when the cleaner reports an error or anything
            unexpected goes wrong.
    """
    try:
        # BUG FIX: the previous code read ``request.knowledge_ids`` (a field
        # that does not exist on CleanupRequest) and hard-coded dry_run=False,
        # performing real deletes regardless of the caller's intent.
        result = cleanup_tool.cleanup_data(
            knowledge_id=request.knowledge_id,
            document_id=request.document_id,
            dry_run=request.dry_run,
        )

        # The cleaner signals failure via an "error" key in its result dict.
        if "error" in result:
            raise HTTPException(status_code=500, detail=result["error"])

        # Attach the human-readable report to the result.
        report = cleanup_tool.generate_report(result)
        result["report"] = report

        # BUG FIX: create the output directory BEFORE any file is written
        # (previously the ID files were written first → FileNotFoundError).
        os.makedirs(REPORT_DIR, exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # 保存脏数据ID
        if result.get("dirty_slice_ids"):
            dirty_id_file = f"{REPORT_DIR}/dirty_ids_{request.knowledge_id}_{timestamp}.txt"
            try:
                with open(dirty_id_file, 'w', encoding='utf-8') as f:
                    f.write("\n".join(result["dirty_slice_ids"]))
                logger.info(f"脏数据ID已保存到: {dirty_id_file}")
            except Exception as e:
                logger.error(f"保存报告失败: {e}")

        # 保存缺失数据ID
        if result.get("missing_slice_ids"):
            missing_id_file = f"{REPORT_DIR}/missing_ids_{request.knowledge_id}_{timestamp}.txt"
            try:
                with open(missing_id_file, 'w', encoding='utf-8') as f:
                    f.write("\n".join(result["missing_slice_ids"]))
                logger.info(f"缺失数据ID已保存到: {missing_id_file}")
            except Exception as e:
                logger.error(f"保存报告失败: {e}")

        # 保存报告到文件
        report_filename = f"{REPORT_DIR}/data_cleanup_report_{request.knowledge_id}_{timestamp}.txt"
        try:
            with open(report_filename, 'w', encoding='utf-8') as f:
                f.write(report)
            logger.info(f"清理报告已保存到: {report_filename}")
        except Exception as e:
            logger.error(f"保存报告失败: {e}")

        # BUG FIX: return the full result so it validates against
        # CleanupResponse (the previous ``{"code": 200}`` failed FastAPI's
        # response-model validation even on success).
        return result
    except HTTPException:
        # BUG FIX: let deliberate HTTP errors pass through unchanged instead
        # of being re-wrapped by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"清理请求失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.on_event("shutdown")
def shutdown_event():
    """关闭数据库连接"""
    cleanup_tool.close_connections()


if __name__ == "__main__":
    import uvicorn

    # 自动创建报告目录
    os.makedirs(REPORT_DIR, exist_ok=True)
    # NOTE(review): the module path below assumes this file is named
    # data_cleanup_script_api.py, but the header comment says app.py —
    # confirm the actual filename, otherwise uvicorn will fail to import.
    uvicorn.run(
        "data_cleanup_script_api:app",
        host="0.0.0.0",
        port=9090,
        log_level="info",
    )