| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241 |
- # import threading
- # import time
- # import random
- # import requests
- # import json
- # from datetime import datetime
- # from utils.get_logger import setup_logger
- # logger = setup_logger(__name__)
- # class ProgressReporter:
- # """文件处理进度 - 独立线程防止阻塞"""
-
- # def __init__(self, task_id: str, callback_url: str, estimate_seconds: int = 60, user_id: str = ""):
- # """
- # 输入:
- # task_id: 任务唯一标识
- # callback_url: Java中转服务器地址
- # estimate_seconds: 预估完成时间(秒) (弃用)
- # """
- # self.task_id = task_id
- # self.callback_url = callback_url
- # self.estimate_seconds = estimate_seconds
- # self.current_progress = 0
- # self.is_completed = False
- # self.thread = None
- # self.start_time = None
- # self.user_id = user_id
-
- # def start(self):
- # """启动后台进度任务(独立线程)"""
- # self.start_time = datetime.now()
- # self.thread = threading.Thread(target=self._progress_loop, daemon=True)
- # self.thread.start()
- # logger.info(f"[{self.task_id}] 进度任务已启动(独立线程)")
-
- # def complete(self, success: bool = True):
- # """
- # 标记完成,立即发送最终状态
-
- # 输入:
- # success: True=成功(status=1), False=失败(status=2)
- # """
- # self.is_completed = True
-
- # # 等待线程结束(2秒)
- # if self.thread and self.thread.is_alive():
- # self.thread.join(timeout=2)
-
- # # 发送最终进度:成功=100%, 失败=当前进度
- # final_progress = 100 if success else self.current_progress
- # # status: 0=处理中, 1=完成, 2=失败
- # message = "success" if success else "error"
- # status = "1" if success else "2"
- # self._send_progress(final_progress, message, status)
- # logger.info(f"[{self.task_id}] 任务完成,状态: {'成功' if success else '失败'}")
-
- # def _progress_loop(self):
- # """后台循环任务:随机递增进度到99%"""
- # try:
- # while not self.is_completed:
- # time.sleep(1)
-
- # if self.is_completed:
- # break
-
- # # 随机增量
- # if self.current_progress < 30:
- # increment = random.randint(3, 8) # 前期快
- # elif self.current_progress < 60:
- # increment = random.randint(2, 5) # 中期中速
- # elif self.current_progress < 85:
- # increment = random.randint(1, 3) # 后期慢
- # else:
- # increment = random.randint(0, 1) # 接近完成时很慢
-
- # self.current_progress = min(99, self.current_progress + increment)
- # self._send_progress(self.current_progress, "处理中", "0") # 0=处理中
-
- # except Exception as e:
- # logger.error(f"[{self.task_id}] 进度任务异常: {e}")
- # # 异常时也发送失败通知
- # self._send_progress(self.current_progress, "error", "2") # 2=失败
-
- # def _send_progress(self, progress: int, message: str = "", status: str = "0"):
- # """
- # 发送HTTP请求到Java中转
-
- # 输入:
- # progress: 进度百分比 (0-100)
- # status: 状态码 (0=处理中, 1=完成, 2=失败)
- # """
- # payload = {
- # "documentId": self.task_id,
- # "progress": progress,
- # "message": message,
- # "status": status,
- # "timestamp": datetime.now().isoformat()
- # }
- # body = {
- # "userIds": [int(self.user_id)],
- # "message": json.dumps(payload)
- # }
-
- # try:
- # response = requests.post(
- # self.callback_url,
- # json=body,
- # timeout=5
- # )
- # if response.status_code == 200:
- # logger.debug(f"[{self.task_id}] 进度发送成功: {progress}% status: {status}")
- # else:
- # logger.warning(f"[{self.task_id}] 进度发送失败: HTTP {response.status_code}")
- # except requests.exceptions.Timeout:
- # logger.warning(f"[{self.task_id}] 进度发送超时")
- # except Exception as e:
- # logger.warning(f"[{self.task_id}] 进度发送异常: {e}")
- import threading
- import time
- import random
- import requests
- import json
- from datetime import datetime
- from utils.get_logger import setup_logger
- logger = setup_logger(__name__)
- class ProgressReporter:
- """文件处理进度 - 独立线程防止阻塞"""
- def __init__(self, task_id: str, callback_url: str, estimate_seconds: int = 60, user_id: str = ""):
- """
- 输入:
- task_id: 任务唯一标识
- callback_url: Java 中转服务器地址
- estimate_seconds: 预估完成时间(保留兼容性,内部不用)
- user_id: 用户 ID,用于 Java 推送
- """
- self.task_id = task_id
- self.callback_url = callback_url
- self.estimate_seconds = estimate_seconds
- self.user_id = user_id
- self.current_progress = 0
- self.is_completed = False
- self.thread = None
- self.start_time = None
- def start(self):
- """启动后台进度任务(独立线程)"""
- self.start_time = datetime.now()
- self.thread = threading.Thread(target=self._progress_loop, daemon=True)
- self.thread.start()
- logger.info(f"[{self.task_id}] 进度任务已启动(线程)")
- def complete(self, success: bool = True):
- """标记完成,发送最终状态"""
- self.is_completed = True
- # 等待线程结束(不阻塞主流程)
- if self.thread and self.thread.is_alive():
- self.thread.join(timeout=2)
- final_progress = 100 if success else self.current_progress
- message = "success" if success else "error"
- status = "1" if success else "2" # 1=成功, 2=失败
- self._send_progress(final_progress, message, status)
- logger.info(f"[{self.task_id}] 任务完成,状态: {'成功' if success else '失败'}")
- # ----------------------------
- # 后台持续推送进度的线程逻辑
- # ----------------------------
- def _progress_loop(self):
- try:
- while not self.is_completed:
- time.sleep(1)
- if self.is_completed:
- break
- # 随机加速段(与你旧版本完全一致)
- if self.current_progress < 30:
- increment = random.randint(3, 8)
- elif self.current_progress < 60:
- increment = random.randint(2, 5)
- elif self.current_progress < 85:
- increment = random.randint(1, 3)
- else:
- increment = random.randint(0, 1)
- self.current_progress = min(99, self.current_progress + increment)
- self._send_progress(self.current_progress, "处理中", "0") # 0 = 处理中
- except Exception as e:
- logger.error(f"[{self.task_id}] 进度任务异常: {e}")
- self._send_progress(self.current_progress, "error", "2")
- # ----------------------------
- # 对接 Java 推送服务
- # ----------------------------
- def _send_progress(self, progress: int, message: str = "", status: str = "0"):
- payload = {
- "documentId": self.task_id,
- "progress": progress,
- "message": message,
- "status": status,
- "timestamp": datetime.now().isoformat()
- }
- body = {
- "userIds": [self.user_id],
- "message": json.dumps(payload)
- }
- try:
- resp = requests.post(
- self.callback_url,
- json=body,
- timeout=5
- )
- if resp.status_code == 200:
- logger.debug(f"[{self.task_id}] 进度发送成功: {progress}% status:{status}")
- else:
- logger.warning(f"[{self.task_id}] 进度发送失败: HTTP {resp.status_code}")
- except requests.exceptions.Timeout:
- logger.warning(f"[{self.task_id}] 进度发送超时")
- except Exception as e:
- logger.warning(f"[{self.task_id}] 进度发送异常: {e}")
|