import asyncio
from queue import Queue
from typing import Dict, Callable, Any, Optional
from utils.get_logger import setup_logger
import time

logger = setup_logger(__name__)


class UploadQueueManager:
    """
    Queue manager for document-upload tasks.

    Features:
    1. Runs at most ``max_concurrent`` tasks at the same time (default 10).
    2. As soon as one task finishes, the next queued task starts.
    3. Exposes queue / task status for monitoring.
    """

    def __init__(self, max_concurrent: int = 10):
        """
        Initialise the queue manager.

        Args:
            max_concurrent: Maximum number of tasks executed concurrently
                (also the number of worker coroutines). Defaults to 10.
        """
        self.max_concurrent = max_concurrent
        self.task_queue = asyncio.Queue()  # pending tasks (unbounded FIFO)
        self.semaphore = asyncio.Semaphore(max_concurrent)  # concurrency cap
        # Running tasks: {task_id: task record}
        self.active_tasks: Dict[str, Dict[str, Any]] = {}
        # Tasks that finished successfully: {task_id: task record}
        self.completed_tasks: Dict[str, Dict[str, Any]] = {}
        # Tasks that did not finish successfully: {task_id: task record}
        self.failed_tasks: Dict[str, Dict[str, Any]] = {}
        self.workers = []  # asyncio.Task objects running _worker()
        self.is_running = False
        self._lock = asyncio.Lock()  # guards the shared bookkeeping dicts

        logger.info(f"队列管理器初始化完成,最大并发数: {max_concurrent}")

    async def start(self):
        """Spawn the worker coroutines; does nothing if already running."""
        if self.is_running:
            logger.warning("队列管理器已经在运行中")
            return

        self.is_running = True

        # One worker per allowed concurrent task.
        for i in range(self.max_concurrent):
            worker = asyncio.create_task(self._worker(i))
            self.workers.append(worker)

        logger.info(f"队列管理器已启动,工作协程数: {self.max_concurrent}")

    async def stop(self):
        """
        Stop the queue manager.

        Clears the running flag and waits for every worker to exit. A worker
        that is executing a task finishes that task first; tasks still waiting
        in the queue are left there.
        """
        self.is_running = False

        if self.workers:
            await asyncio.gather(*self.workers, return_exceptions=True)
            self.workers.clear()

        logger.info("队列管理器已停止")

    async def _worker(self, worker_id: int):
        """
        Worker loop: take tasks from the queue and execute them.

        Args:
            worker_id: Index of this worker, used for logging and stored in
                each task record.
        """
        logger.info(f"工作协程 {worker_id} 已启动")

        while self.is_running:
            # Poll with a timeout so the loop re-checks is_running regularly
            # instead of blocking forever on an empty queue.
            try:
                task_info = await asyncio.wait_for(
                    self.task_queue.get(),
                    timeout=1.0
                )
            except asyncio.TimeoutError:
                continue

            try:
                await self._process_task(worker_id, task_info)
            except Exception as e:
                logger.error(f"工作协程 {worker_id} 发生异常: {e}", exc_info=True)
            finally:
                # Every successful get() must be matched by exactly one
                # task_done(), even on error or cancellation; otherwise
                # wait_for_completion() would block forever.
                self.task_queue.task_done()

        logger.info(f"工作协程 {worker_id} 已停止")

    async def _process_task(self, worker_id: int, task_info: Dict[str, Any]) -> None:
        """Run one queued task and record its outcome in the bookkeeping dicts."""
        task_id = task_info["task_id"]
        task_func = task_info["task_func"]
        task_args = task_info["task_args"]
        task_kwargs = task_info["task_kwargs"]

        logger.info(f"工作协程 {worker_id} 开始处理任务: {task_id}")

        async with self.semaphore:
            record = {
                "task_id": task_id,
                "worker_id": worker_id,
                "start_time": time.time(),
                "status": "running"
            }
            async with self._lock:
                self.active_tasks[task_id] = record

            try:
                result = await task_func(*task_args, **task_kwargs)
            except Exception as e:
                logger.error(f"工作协程 {worker_id} 执行任务 {task_id} 失败: {e}", exc_info=True)
                async with self._lock:
                    self._finalize(record, self.failed_tasks, "failed", error=str(e))
            except BaseException as exc:
                # Cancellation (and other non-Exception exits) must not leave
                # a stale entry in active_tasks. Updated synchronously so the
                # cleanup itself cannot be interrupted at an await point.
                self._finalize(
                    record, self.failed_tasks, "failed", error=type(exc).__name__
                )
                raise
            else:
                async with self._lock:
                    self._finalize(
                        record, self.completed_tasks, "completed", result=result
                    )
                logger.info(f"工作协程 {worker_id} 完成任务: {task_id}, 耗时: {record['duration']:.2f}秒")

    def _finalize(self, record: Dict[str, Any], destination: Dict[str, Dict[str, Any]],
                  status: str, **extra: Any) -> None:
        """Move *record* out of ``active_tasks`` into *destination* with timing info."""
        task_id = record["task_id"]
        # Only drop the active entry if it is this worker's own record; a
        # duplicate task_id being run by another worker stays untouched.
        if self.active_tasks.get(task_id) is record:
            del self.active_tasks[task_id]
        record["end_time"] = time.time()
        record["duration"] = record["end_time"] - record["start_time"]
        record["status"] = status
        record.update(extra)
        destination[task_id] = record

    async def submit_task(self, task_id: str, task_func: Callable, *args, **kwargs) -> None:
        """
        Put a task on the queue.

        Args:
            task_id: Task identifier (typically a document_id).
            task_func: Async callable to run; it is awaited by a worker.
            *args: Positional arguments forwarded to ``task_func``.
            **kwargs: Keyword arguments forwarded to ``task_func``.
        """
        task_info = {
            "task_id": task_id,
            "task_func": task_func,
            "task_args": args,
            "task_kwargs": kwargs,
            "submit_time": time.time()
        }

        await self.task_queue.put(task_info)

        queue_size = self.task_queue.qsize()
        active_count = len(self.active_tasks)

        logger.info(f"任务已提交到队列: {task_id}, 队列长度: {queue_size}, 活跃任务数: {active_count}")

    def get_queue_status(self) -> Dict[str, Any]:
        """
        Return a snapshot of the queue state.

        Returns:
            Dict with the running flag, concurrency limit, queue size, counts
            of active / completed / failed tasks, the ids of the active tasks
            and the number of workers.
        """
        return {
            "is_running": self.is_running,
            "max_concurrent": self.max_concurrent,
            "queue_size": self.task_queue.qsize(),
            "active_tasks_count": len(self.active_tasks),
            "completed_tasks_count": len(self.completed_tasks),
            "failed_tasks_count": len(self.failed_tasks),
            "active_tasks": list(self.active_tasks.keys()),
            "worker_count": len(self.workers)
        }

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """
        Return the status of a single task.

        Args:
            task_id: Task identifier.

        Returns:
            A copy of the task record with ``status`` set to ``"running"``,
            ``"completed"`` or ``"failed"``. If the id is in none of the
            bookkeeping dicts (unknown, or still waiting in the queue), the
            result is ``{"status": "not_found"}``.
        """
        if task_id in self.active_tasks:
            return {**self.active_tasks[task_id], "status": "running"}
        elif task_id in self.completed_tasks:
            return {**self.completed_tasks[task_id], "status": "completed"}
        elif task_id in self.failed_tasks:
            return {**self.failed_tasks[task_id], "status": "failed"}
        else:
            return {"status": "not_found"}

    async def wait_for_completion(self):
        """Block until every task that was put on the queue has been processed."""
        await self.task_queue.join()
        logger.info("队列中的所有任务已完成")

    async def remove_from_queue(self, task_id: str) -> bool:
        """
        Remove a task that is still waiting in the queue.

        Every queued entry whose ``task_id`` matches is dropped; the remaining
        entries keep their original order. Tasks already picked up by a worker
        are not affected.

        Args:
            task_id: Task identifier.

        Returns:
            bool: True if at least one matching entry was removed.
        """
        async with self._lock:
            # Drain the queue. There is no await between here and the end of
            # the method, so workers cannot observe the intermediate state.
            drained = []
            while True:
                try:
                    drained.append(self.task_queue.get_nowait())
                except asyncio.QueueEmpty:
                    break

            found = False
            for task_info in drained:
                if task_info["task_id"] == task_id:
                    found = True
                    logger.info(f"从队列中找到并删除任务: {task_id}")
                else:
                    # The queue is unbounded and was just drained, so
                    # put_nowait cannot raise QueueFull here.
                    self.task_queue.put_nowait(task_info)

            # Re-inserting an item counts it as unfinished a second time, so
            # every drained item needs one task_done() to keep join() correct.
            # Doing this after the re-insert keeps the counter from touching
            # zero in between, which would wake join() waiters too early.
            for _ in drained:
                self.task_queue.task_done()

            return found

    def clear_history(self):
        """Discard the records of all completed and failed tasks."""
        cleared_completed = len(self.completed_tasks)
        cleared_failed = len(self.failed_tasks)

        self.completed_tasks.clear()
        self.failed_tasks.clear()

        logger.info(f"已清空历史记录: 完成任务 {cleared_completed} 个, 失败任务 {cleared_failed} 个")


# Process-wide queue manager instance (singleton), created lazily.
_global_queue_manager: Optional[UploadQueueManager] = None


def get_queue_manager(max_concurrent: int = 10) -> UploadQueueManager:
    """
    Return the process-wide queue manager, creating it on first use.

    Args:
        max_concurrent: Maximum concurrency, default 10. Only used by the call
            that creates the instance; later calls return the existing
            instance and ignore this argument.

    Returns:
        The shared UploadQueueManager instance.
    """
    global _global_queue_manager

    if _global_queue_manager is None:
        _global_queue_manager = UploadQueueManager(max_concurrent)
        logger.info("创建全局队列管理器实例")

    return _global_queue_manager