| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
import asyncio
import time
from queue import Queue
from typing import Any, Callable, Dict, Optional

from utils.get_logger import setup_logger
# Module-level logger created with the project's setup_logger helper (utils.get_logger)
logger = setup_logger(__name__)
class UploadQueueManager:
    """
    Document upload queue manager.

    Features:
    1. Runs at most ``max_concurrent`` (default 10) tasks at the same time.
    2. When a task finishes, its worker automatically picks up the next queued task.
    3. Exposes queue-level and per-task status for monitoring.

    Tasks are coroutine functions submitted with ``submit_task``. ``start`` launches
    ``max_concurrent`` worker coroutines that pull them from an ``asyncio.Queue``.
    Finished tasks stay in ``completed_tasks`` / ``failed_tasks`` until
    ``clear_history`` is called.
    """

    def __init__(self, max_concurrent: int = 10):
        """
        Initialize the queue manager.

        Args:
            max_concurrent: maximum number of tasks running at once (default 10);
                also the number of worker coroutines created by ``start``.

        Raises:
            ValueError: if ``max_concurrent`` is less than 1 (no worker would ever
                exist, so submitted tasks would wait forever).
        """
        if max_concurrent < 1:
            raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")

        self.max_concurrent = max_concurrent
        self.task_queue = asyncio.Queue()  # pending task dicts built by submit_task (unbounded)
        # With exactly max_concurrent workers this never blocks; kept as an explicit cap
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.active_tasks: Dict[str, Dict[str, Any]] = {}  # running tasks {task_id: record}
        self.completed_tasks: Dict[str, Dict[str, Any]] = {}  # tasks that returned normally
        self.failed_tasks: Dict[str, Dict[str, Any]] = {}  # tasks that raised or were cancelled
        self.workers = []  # asyncio.Task objects created by start()
        self.is_running = False
        self._lock = asyncio.Lock()  # guards the record dicts and queue rewrites

        logger.info(f"队列管理器初始化完成,最大并发数: {max_concurrent}")

    async def start(self):
        """Launch ``max_concurrent`` worker coroutines; warn and do nothing if already running."""
        if self.is_running:
            logger.warning("队列管理器已经在运行中")
            return

        self.is_running = True
        for i in range(self.max_concurrent):
            self.workers.append(asyncio.create_task(self._worker(i)))

        logger.info(f"队列管理器已启动,工作协程数: {self.max_concurrent}")

    async def stop(self):
        """
        Stop the manager and wait for every worker to exit.

        Each worker finishes the task it is currently running, then notices that
        ``is_running`` is False within about one second (its queue-poll timeout).
        Tasks still waiting in the queue are left there and will run if
        ``start`` is called again.
        """
        self.is_running = False

        if self.workers:
            await asyncio.gather(*self.workers, return_exceptions=True)
            self.workers.clear()

        logger.info("队列管理器已停止")

    async def _worker(self, worker_id: int):
        """
        Worker loop: take tasks from the queue and run them until ``stop`` is called.

        Args:
            worker_id: index of this worker, used in log messages and task records.
        """
        logger.info(f"工作协程 {worker_id} 已启动")

        while self.is_running:
            # Poll with a timeout so is_running is re-checked at least once a second
            try:
                item = await asyncio.wait_for(self.task_queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                logger.error(f"工作协程 {worker_id} 发生异常: {e}", exc_info=True)
                continue

            try:
                await self._run_task(worker_id, item)
            except Exception as e:
                logger.error(f"工作协程 {worker_id} 发生异常: {e}", exc_info=True)
            finally:
                # Balance every successful get(), even on cancellation, so join() cannot hang
                self.task_queue.task_done()

        logger.info(f"工作协程 {worker_id} 已停止")

    async def _run_task(self, worker_id: int, item: Dict[str, Any]) -> None:
        """
        Run one queued task under the semaphore and file its outcome.

        Exceptions raised by the task are logged and recorded in ``failed_tasks``;
        they are not propagated. Cancellation is recorded as a failure and then
        re-raised so the worker stops.

        Args:
            worker_id: index of the worker running the task.
            item: task dict as built by ``submit_task``.
        """
        task_id = item["task_id"]
        logger.info(f"工作协程 {worker_id} 开始处理任务: {task_id}")

        async with self.semaphore:
            record = {
                "task_id": task_id,
                "worker_id": worker_id,
                "start_time": time.time(),
                "status": "running",
            }
            async with self._lock:
                self.active_tasks[task_id] = record

            try:
                result = await item["task_func"](*item["task_args"], **item["task_kwargs"])
            except asyncio.CancelledError:
                # CancelledError is not an Exception subclass on Python 3.8+, so without
                # this branch the task would stay "running" forever. The update has no
                # await, so it is atomic on the event loop even without taking the lock.
                self._record_outcome(task_id, record, "failed", error="cancelled")
                raise
            except Exception as e:
                logger.error(f"工作协程 {worker_id} 执行任务 {task_id} 失败: {e}", exc_info=True)
                async with self._lock:
                    self._record_outcome(task_id, record, "failed", error=str(e))
            else:
                async with self._lock:
                    self._record_outcome(task_id, record, "completed", result=result)
                logger.info(
                    f"工作协程 {worker_id} 完成任务: {task_id}, 耗时: {record['duration']:.2f}秒"
                )

    def _record_outcome(
        self, task_id: str, record: Dict[str, Any], status: str, **fields: Any
    ) -> None:
        """
        Move a finished task's record out of ``active_tasks`` into the history dicts.

        The active entry is removed only if it is still this run's record, so a
        concurrent run of the same ``task_id`` is not clobbered. The latest outcome
        wins: any older entry for the ID in the other history dict is dropped.

        Args:
            task_id: ID of the finished task.
            record: the record created when the task started; updated in place.
            status: "completed" or "failed".
            **fields: extra keys to store ("result" or "error").
        """
        if self.active_tasks.get(task_id) is record:
            del self.active_tasks[task_id]

        record["end_time"] = time.time()
        record["duration"] = record["end_time"] - record["start_time"]
        record["status"] = status
        record.update(fields)

        if status == "completed":
            self.failed_tasks.pop(task_id, None)
            self.completed_tasks[task_id] = record
        else:
            self.completed_tasks.pop(task_id, None)
            self.failed_tasks[task_id] = record

    async def submit_task(self, task_id: str, task_func: Callable, *args, **kwargs) -> None:
        """
        Add a task to the queue.

        Args:
            task_id: task ID (usually the document_id).
            task_func: async function to run; it is awaited as
                ``task_func(*args, **kwargs)``.
            *args: positional arguments for ``task_func``.
            **kwargs: keyword arguments for ``task_func``.
        """
        task_info = {
            "task_id": task_id,
            "task_func": task_func,
            "task_args": args,
            "task_kwargs": kwargs,
            "submit_time": time.time(),
        }

        await self.task_queue.put(task_info)

        queue_size = self.task_queue.qsize()
        active_count = len(self.active_tasks)

        logger.info(f"任务已提交到队列: {task_id}, 队列长度: {queue_size}, 活跃任务数: {active_count}")

    def get_queue_status(self) -> Dict[str, Any]:
        """
        Return a snapshot of the queue state.

        Returns:
            A dict with the running flag, configured concurrency, number of queued,
            active, completed and failed tasks, the active task IDs, and the
            current worker count.
        """
        return {
            "is_running": self.is_running,
            "max_concurrent": self.max_concurrent,
            "queue_size": self.task_queue.qsize(),
            "active_tasks_count": len(self.active_tasks),
            "completed_tasks_count": len(self.completed_tasks),
            "failed_tasks_count": len(self.failed_tasks),
            "active_tasks": list(self.active_tasks.keys()),
            "worker_count": len(self.workers),
        }

    def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """
        Return the status of a single task.

        Args:
            task_id: task ID.

        Returns:
            A copy of the task's record with "status" set to "running",
            "completed" or "failed". Returns ``{"status": "not_found"}`` for unknown
            IDs, including tasks still waiting in the queue (they are not tracked
            until a worker starts them).
        """
        if task_id in self.active_tasks:
            return {**self.active_tasks[task_id], "status": "running"}
        elif task_id in self.completed_tasks:
            return {**self.completed_tasks[task_id], "status": "completed"}
        elif task_id in self.failed_tasks:
            return {**self.failed_tasks[task_id], "status": "failed"}
        else:
            return {"status": "not_found"}

    async def wait_for_completion(self):
        """
        Wait until every submitted task has been processed.

        Only returns if workers are running (or the queue is empty); otherwise
        queued tasks are never marked done.
        """
        await self.task_queue.join()
        logger.info("队列中的所有任务已完成")

    async def remove_from_queue(self, task_id: str) -> bool:
        """
        Remove a task that is still waiting in the queue.

        Every queued entry with a matching ``task_id`` is dropped. The remaining
        entries keep their relative order. Tasks that are already running are
        not affected.

        Args:
            task_id: ID of the task to remove.

        Returns:
            True if at least one queued entry was removed, otherwise False.
        """
        async with self._lock:
            kept = []
            drained = 0
            found = False

            # The drain/refill below never awaits, so no worker can interleave with it
            while True:
                try:
                    item = self.task_queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                drained += 1
                if item["task_id"] == task_id:
                    found = True
                    logger.info(f"从队列中找到并删除任务: {task_id}")
                else:
                    kept.append(item)

            # The queue is unbounded and was just drained, so put_nowait cannot fail
            for item in kept:
                self.task_queue.put_nowait(item)

            # Every get_nowait() must be matched by task_done(), or join() never returns.
            # Doing this after the re-puts keeps the unfinished counter from touching
            # zero early and waking join() waiters while tasks are still pending.
            for _ in range(drained):
                self.task_queue.task_done()

        return found

    def clear_history(self):
        """Forget all completed and failed task records."""
        cleared_completed = len(self.completed_tasks)
        cleared_failed = len(self.failed_tasks)

        self.completed_tasks.clear()
        self.failed_tasks.clear()

        logger.info(f"已清空历史记录: 完成任务 {cleared_completed} 个, 失败任务 {cleared_failed} 个")
# Global queue manager instance (singleton); created lazily by get_queue_manager()
_global_queue_manager: Optional[UploadQueueManager] = None


def get_queue_manager(max_concurrent: int = 10) -> UploadQueueManager:
    """
    Return the module-wide UploadQueueManager, creating it on first use.

    Args:
        max_concurrent: maximum concurrency (default 10). Only used when the
            instance is first created; later calls return the existing instance
            unchanged, whatever value they pass.

    Returns:
        The shared UploadQueueManager instance.
    """
    global _global_queue_manager

    if _global_queue_manager is None:
        _global_queue_manager = UploadQueueManager(max_concurrent)
        logger.info("创建全局队列管理器实例")

    return _global_queue_manager
|