# import threading # import time # import random # import requests # import json # from datetime import datetime # from utils.get_logger import setup_logger # logger = setup_logger(__name__) # class ProgressReporter: # """文件处理进度 - 独立线程防止阻塞""" # def __init__(self, task_id: str, callback_url: str, estimate_seconds: int = 60, user_id: str = ""): # """ # 输入: # task_id: 任务唯一标识 # callback_url: Java中转服务器地址 # estimate_seconds: 预估完成时间(秒) (弃用) # """ # self.task_id = task_id # self.callback_url = callback_url # self.estimate_seconds = estimate_seconds # self.current_progress = 0 # self.is_completed = False # self.thread = None # self.start_time = None # self.user_id = user_id # def start(self): # """启动后台进度任务(独立线程)""" # self.start_time = datetime.now() # self.thread = threading.Thread(target=self._progress_loop, daemon=True) # self.thread.start() # logger.info(f"[{self.task_id}] 进度任务已启动(独立线程)") # def complete(self, success: bool = True): # """ # 标记完成,立即发送最终状态 # 输入: # success: True=成功(status=1), False=失败(status=2) # """ # self.is_completed = True # # 等待线程结束(2秒) # if self.thread and self.thread.is_alive(): # self.thread.join(timeout=2) # # 发送最终进度:成功=100%, 失败=当前进度 # final_progress = 100 if success else self.current_progress # # status: 0=处理中, 1=完成, 2=失败 # message = "success" if success else "error" # status = "1" if success else "2" # self._send_progress(final_progress, message, status) # logger.info(f"[{self.task_id}] 任务完成,状态: {'成功' if success else '失败'}") # def _progress_loop(self): # """后台循环任务:随机递增进度到99%""" # try: # while not self.is_completed: # time.sleep(1) # if self.is_completed: # break # # 随机增量 # if self.current_progress < 30: # increment = random.randint(3, 8) # 前期快 # elif self.current_progress < 60: # increment = random.randint(2, 5) # 中期中速 # elif self.current_progress < 85: # increment = random.randint(1, 3) # 后期慢 # else: # increment = random.randint(0, 1) # 接近完成时很慢 # self.current_progress = min(99, self.current_progress + increment) # self._send_progress(self.current_progress, "处理中", "0") # 0=处理中 # except Exception as e: # logger.error(f"[{self.task_id}] 进度任务异常: {e}") # # 异常时也发送失败通知 # self._send_progress(self.current_progress, "error", "2") # 2=失败 # def _send_progress(self, progress: int, message: str = "", status: str = "0"): # """ # 发送HTTP请求到Java中转 # 输入: # progress: 进度百分比 (0-100) # status: 状态码 (0=处理中, 1=完成, 2=失败) # """ # payload = { # "documentId": self.task_id, # "progress": progress, # "message": message, # "status": status, # "timestamp": datetime.now().isoformat() # } # body = { # "userIds": [int(self.user_id)], # "message": json.dumps(payload) # } # try: # response = requests.post( # self.callback_url, # json=body, # timeout=5 # ) # if response.status_code == 200: # logger.debug(f"[{self.task_id}] 进度发送成功: {progress}% status: {status}") # else: # logger.warning(f"[{self.task_id}] 进度发送失败: HTTP {response.status_code}") # except requests.exceptions.Timeout: # logger.warning(f"[{self.task_id}] 进度发送超时") # except Exception as e: # logger.warning(f"[{self.task_id}] 进度发送异常: {e}") import threading import time import random import requests import json from datetime import datetime from utils.get_logger import setup_logger logger = setup_logger(__name__) class ProgressReporter: """文件处理进度 - 独立线程防止阻塞""" def __init__(self, task_id: str, callback_url: str, estimate_seconds: int = 60, user_id: str = ""): """ 输入: task_id: 任务唯一标识 callback_url: Java 中转服务器地址 estimate_seconds: 预估完成时间(保留兼容性,内部不用) user_id: 用户 ID,用于 Java 推送 """ self.task_id = task_id self.callback_url = callback_url self.estimate_seconds = estimate_seconds self.user_id = user_id self.current_progress = 0 self.is_completed = False self.thread = None self.start_time = None def start(self): """启动后台进度任务(独立线程)""" self.start_time = datetime.now() self.thread = threading.Thread(target=self._progress_loop, daemon=True) self.thread.start() logger.info(f"[{self.task_id}] 进度任务已启动(线程)") def complete(self, success: bool = True): """标记完成,发送最终状态""" self.is_completed = True # 等待线程结束(不阻塞主流程) if self.thread and self.thread.is_alive(): self.thread.join(timeout=2) final_progress = 100 if success else self.current_progress message = "success" if success else "error" status = "1" if success else "2" # 1=成功, 2=失败 self._send_progress(final_progress, message, status) logger.info(f"[{self.task_id}] 任务完成,状态: {'成功' if success else '失败'}") # ---------------------------- # 后台持续推送进度的线程逻辑 # ---------------------------- def _progress_loop(self): try: while not self.is_completed: time.sleep(1) if self.is_completed: break # 随机加速段(与你旧版本完全一致) if self.current_progress < 30: increment = random.randint(3, 8) elif self.current_progress < 60: increment = random.randint(2, 5) elif self.current_progress < 85: increment = random.randint(1, 3) else: increment = random.randint(0, 1) self.current_progress = min(99, self.current_progress + increment) self._send_progress(self.current_progress, "处理中", "0") # 0 = 处理中 except Exception as e: logger.error(f"[{self.task_id}] 进度任务异常: {e}") self._send_progress(self.current_progress, "error", "2") # ---------------------------- # 对接 Java 推送服务 # ---------------------------- def _send_progress(self, progress: int, message: str = "", status: str = "0"): payload = { "documentId": self.task_id, "progress": progress, "message": message, "status": status, "timestamp": datetime.now().isoformat() } body = { "userIds": [self.user_id], "message": json.dumps(payload) } try: resp = requests.post( self.callback_url, json=body, timeout=5 ) if resp.status_code == 200: logger.debug(f"[{self.task_id}] 进度发送成功: {progress}% status:{status}") else: logger.warning(f"[{self.task_id}] 进度发送失败: HTTP {resp.status_code}") except requests.exceptions.Timeout: logger.warning(f"[{self.task_id}] 进度发送超时") except Exception as e: logger.warning(f"[{self.task_id}] 进度发送异常: {e}")