| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- # app.py
- import os
- from datetime import datetime
- from typing import Optional
- from fastapi import FastAPI, HTTPException
- from pydantic import BaseModel
- from data_cleanup_script import VectorDataCleaner # 假设你的类放在vector_data_cleaner.py里
- import logging
# Logging: use the logger registered under the name "uvicorn.error".
logger: logging.Logger = logging.getLogger("uvicorn.error")

# FastAPI application instance.
app = FastAPI(
    title="Milvus 数据清理",
)
# Request body model for the cleanup endpoint.
# (Documented with comments rather than a class docstring so the generated
# API schema is left unchanged.)
class CleanupRequest(BaseModel):
    # Required identifier supplied by the client.
    knowledge_id: str

    # Optional document identifier; None when omitted.
    document_id: Optional[str] = None

    # Defaults to a trial (dry) run.
    dry_run: Optional[bool] = True
# Response body model (optional; could be made more detailed).
# (Documented with comments rather than a class docstring so the generated
# API schema is left unchanged.)
class CleanupResponse(BaseModel):
    # Identifiers describing what was processed.
    knowledge_id: str
    document_id: Optional[str]

    # Slice counts reported for MySQL and Milvus, plus the dirty/missing totals.
    mysql_slice_count: int
    milvus_slice_count: int
    dirty_slice_count: int
    missing_slice_count: int

    # The slice IDs behind the dirty/missing counts.
    dirty_slice_ids: list
    missing_slice_ids: list

    # Whether the run was a dry run.
    dry_run: bool

    # Deletion outcome fields; default to None when not provided.
    delete_success: Optional[bool] = None
    delete_message: Optional[str] = None

    # Optional sample of dirty slices.
    dirty_slice_samples: Optional[list] = None

    # Text report attached to the result by the endpoint.
    report: str
# Initialize the data cleanup tool once at module level (singleton, so that
# requests do not each reconnect to the database).
cleanup_tool: VectorDataCleaner = VectorDataCleaner()
@app.post("/cleanup", response_model=CleanupResponse)
def cleanup_data_endpoint(request: CleanupRequest):
    """Clean up dirty data in the Milvus vector database.

    Runs ``cleanup_tool.cleanup_data`` with the knowledge ID, document ID and
    dry-run flag taken from the request, generates a text report, writes the
    dirty/missing slice IDs and the report under ``./data_clean``, and returns
    the cleanup result with the report attached under ``"report"``.

    Args:
        request: Validated request body.

    Returns:
        The result dict produced by ``cleanup_tool.cleanup_data`` plus the
        ``"report"`` key; FastAPI validates it against ``CleanupResponse``.

    Raises:
        HTTPException: Status 500 when the cleaner result contains an
            ``"error"`` key, or when any other exception escapes the cleanup,
            report-generation or ID-file steps.
    """
    # An explicit null falls back to the declared default (dry run) so that it
    # can never be interpreted as "really delete".
    dry_run = True if request.dry_run is None else request.dry_run

    try:
        result = cleanup_tool.cleanup_data(
            knowledge_id=request.knowledge_id,
            document_id=request.document_id,
            dry_run=dry_run,
        )

        # The cleaner signals failure through an "error" key in its result.
        if "error" in result:
            raise HTTPException(status_code=500, detail=result["error"])

        report = cleanup_tool.generate_report(result)
        result["report"] = report

        # The output directory must exist before any file below is opened.
        os.makedirs("./data_clean", exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

        # knowledge_id comes from the client; keep only filename-safe
        # characters so it cannot escape ./data_clean.
        safe_id = "".join(
            ch if ch.isalnum() or ch in "-_" else "_"
            for ch in request.knowledge_id
        )

        # Save the dirty and the missing slice IDs, one ID per line.
        id_outputs = (
            ("dirty_slice_ids", "dirty_ids", "脏数据ID"),
            ("missing_slice_ids", "missing_ids", "缺失数据ID"),
        )
        for result_key, file_prefix, label in id_outputs:
            slice_ids = result.get(result_key)
            if not slice_ids:
                continue
            id_file = f"./data_clean/{file_prefix}_{safe_id}_{timestamp}.txt"
            with open(id_file, 'w', encoding='utf-8') as f:
                # str() guards against non-string IDs, which join() rejects.
                f.write("\n".join(str(slice_id) for slice_id in slice_ids))
            logger.info(f"{label}已保存到: {id_file}")

        # Saving the report is best-effort: a failure is logged, not raised.
        report_filename = (
            f"./data_clean/data_cleanup_report_{safe_id}_{timestamp}.txt"
        )
        try:
            with open(report_filename, 'w', encoding='utf-8') as f:
                f.write(report)
            logger.info(f"清理报告已保存到: {report_filename}")
        except Exception as e:
            logger.error(f"保存报告失败: {e}")

        return result
    except HTTPException:
        # Already a well-formed HTTP error; do not re-wrap it.
        raise
    except Exception as e:
        logger.error(f"清理请求失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.on_event("shutdown")
def shutdown_event() -> None:
    """Close the database connections when the application shuts down."""
    cleanup_tool.close_connections()
if __name__ == "__main__":
    import uvicorn

    # Create the report directory up front.
    os.makedirs("./data_clean", exist_ok=True)

    # Start uvicorn with the app object itself rather than an import string:
    # an import string makes uvicorn import this module a second time (which
    # constructs a second VectorDataCleaner) and only works while the file
    # keeps one specific name. Neither reload nor workers is used here, so
    # passing the instance is supported.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=9090,
        log_level="info",
    )
|