progress_reporter.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. # import threading
  2. # import time
  3. # import random
  4. # import requests
  5. # import json
  6. # from datetime import datetime
  7. # from utils.get_logger import setup_logger
  8. # logger = setup_logger(__name__)
  9. # class ProgressReporter:
  10. # """文件处理进度 - 独立线程防止阻塞"""
  11. # def __init__(self, task_id: str, callback_url: str, estimate_seconds: int = 60, user_id: str = ""):
  12. # """
  13. # 输入:
  14. # task_id: 任务唯一标识
  15. # callback_url: Java中转服务器地址
  16. # estimate_seconds: 预估完成时间(秒) (弃用)
  17. # """
  18. # self.task_id = task_id
  19. # self.callback_url = callback_url
  20. # self.estimate_seconds = estimate_seconds
  21. # self.current_progress = 0
  22. # self.is_completed = False
  23. # self.thread = None
  24. # self.start_time = None
  25. # self.user_id = user_id
  26. # def start(self):
  27. # """启动后台进度任务(独立线程)"""
  28. # self.start_time = datetime.now()
  29. # self.thread = threading.Thread(target=self._progress_loop, daemon=True)
  30. # self.thread.start()
  31. # logger.info(f"[{self.task_id}] 进度任务已启动(独立线程)")
  32. # def complete(self, success: bool = True):
  33. # """
  34. # 标记完成,立即发送最终状态
  35. # 输入:
  36. # success: True=成功(status=1), False=失败(status=2)
  37. # """
  38. # self.is_completed = True
  39. # # 等待线程结束(2秒)
  40. # if self.thread and self.thread.is_alive():
  41. # self.thread.join(timeout=2)
  42. # # 发送最终进度:成功=100%, 失败=当前进度
  43. # final_progress = 100 if success else self.current_progress
  44. # # status: 0=处理中, 1=完成, 2=失败
  45. # message = "success" if success else "error"
  46. # status = "1" if success else "2"
  47. # self._send_progress(final_progress, message, status)
  48. # logger.info(f"[{self.task_id}] 任务完成,状态: {'成功' if success else '失败'}")
  49. # def _progress_loop(self):
  50. # """后台循环任务:随机递增进度到99%"""
  51. # try:
  52. # while not self.is_completed:
  53. # time.sleep(1)
  54. # if self.is_completed:
  55. # break
  56. # # 随机增量
  57. # if self.current_progress < 30:
  58. # increment = random.randint(3, 8) # 前期快
  59. # elif self.current_progress < 60:
  60. # increment = random.randint(2, 5) # 中期中速
  61. # elif self.current_progress < 85:
  62. # increment = random.randint(1, 3) # 后期慢
  63. # else:
  64. # increment = random.randint(0, 1) # 接近完成时很慢
  65. # self.current_progress = min(99, self.current_progress + increment)
  66. # self._send_progress(self.current_progress, "处理中", "0") # 0=处理中
  67. # except Exception as e:
  68. # logger.error(f"[{self.task_id}] 进度任务异常: {e}")
  69. # # 异常时也发送失败通知
  70. # self._send_progress(self.current_progress, "error", "2") # 2=失败
  71. # def _send_progress(self, progress: int, message: str = "", status: str = "0"):
  72. # """
  73. # 发送HTTP请求到Java中转
  74. # 输入:
  75. # progress: 进度百分比 (0-100)
  76. # status: 状态码 (0=处理中, 1=完成, 2=失败)
  77. # """
  78. # payload = {
  79. # "documentId": self.task_id,
  80. # "progress": progress,
  81. # "message": message,
  82. # "status": status,
  83. # "timestamp": datetime.now().isoformat()
  84. # }
  85. # body = {
  86. # "userIds": [int(self.user_id)],
  87. # "message": json.dumps(payload)
  88. # }
  89. # try:
  90. # response = requests.post(
  91. # self.callback_url,
  92. # json=body,
  93. # timeout=5
  94. # )
  95. # if response.status_code == 200:
  96. # logger.debug(f"[{self.task_id}] 进度发送成功: {progress}% status: {status}")
  97. # else:
  98. # logger.warning(f"[{self.task_id}] 进度发送失败: HTTP {response.status_code}")
  99. # except requests.exceptions.Timeout:
  100. # logger.warning(f"[{self.task_id}] 进度发送超时")
  101. # except Exception as e:
  102. # logger.warning(f"[{self.task_id}] 进度发送异常: {e}")
  103. import threading
  104. import time
  105. import random
  106. import requests
  107. import json
  108. from datetime import datetime
  109. from utils.get_logger import setup_logger
  110. logger = setup_logger(__name__)
  111. class ProgressReporter:
  112. """文件处理进度 - 独立线程防止阻塞"""
  113. def __init__(self, task_id: str, callback_url: str, estimate_seconds: int = 60, user_id: str = ""):
  114. """
  115. 输入:
  116. task_id: 任务唯一标识
  117. callback_url: Java 中转服务器地址
  118. estimate_seconds: 预估完成时间(保留兼容性,内部不用)
  119. user_id: 用户 ID,用于 Java 推送
  120. """
  121. self.task_id = task_id
  122. self.callback_url = callback_url
  123. self.estimate_seconds = estimate_seconds
  124. self.user_id = user_id
  125. self.current_progress = 0
  126. self.is_completed = False
  127. self.thread = None
  128. self.start_time = None
  129. def start(self):
  130. """启动后台进度任务(独立线程)"""
  131. self.start_time = datetime.now()
  132. self.thread = threading.Thread(target=self._progress_loop, daemon=True)
  133. self.thread.start()
  134. logger.info(f"[{self.task_id}] 进度任务已启动(线程)")
  135. def complete(self, success: bool = True):
  136. """标记完成,发送最终状态"""
  137. self.is_completed = True
  138. # 等待线程结束(不阻塞主流程)
  139. if self.thread and self.thread.is_alive():
  140. self.thread.join(timeout=2)
  141. final_progress = 100 if success else self.current_progress
  142. message = "success" if success else "error"
  143. status = "1" if success else "2" # 1=成功, 2=失败
  144. self._send_progress(final_progress, message, status)
  145. logger.info(f"[{self.task_id}] 任务完成,状态: {'成功' if success else '失败'}")
  146. # ----------------------------
  147. # 后台持续推送进度的线程逻辑
  148. # ----------------------------
  149. def _progress_loop(self):
  150. try:
  151. while not self.is_completed:
  152. time.sleep(1)
  153. if self.is_completed:
  154. break
  155. # 随机加速段(与你旧版本完全一致)
  156. if self.current_progress < 30:
  157. increment = random.randint(3, 8)
  158. elif self.current_progress < 60:
  159. increment = random.randint(2, 5)
  160. elif self.current_progress < 85:
  161. increment = random.randint(1, 3)
  162. else:
  163. increment = random.randint(0, 1)
  164. self.current_progress = min(99, self.current_progress + increment)
  165. self._send_progress(self.current_progress, "处理中", "0") # 0 = 处理中
  166. except Exception as e:
  167. logger.error(f"[{self.task_id}] 进度任务异常: {e}")
  168. self._send_progress(self.current_progress, "error", "2")
  169. # ----------------------------
  170. # 对接 Java 推送服务
  171. # ----------------------------
  172. def _send_progress(self, progress: int, message: str = "", status: str = "0"):
  173. payload = {
  174. "documentId": self.task_id,
  175. "progress": progress,
  176. "message": message,
  177. "status": status,
  178. "timestamp": datetime.now().isoformat()
  179. }
  180. body = {
  181. "userIds": [self.user_id],
  182. "message": json.dumps(payload)
  183. }
  184. try:
  185. resp = requests.post(
  186. self.callback_url,
  187. json=body,
  188. timeout=5
  189. )
  190. if resp.status_code == 200:
  191. logger.debug(f"[{self.task_id}] 进度发送成功: {progress}% status:{status}")
  192. else:
  193. logger.warning(f"[{self.task_id}] 进度发送失败: HTTP {resp.status_code}")
  194. except requests.exceptions.Timeout:
  195. logger.warning(f"[{self.task_id}] 进度发送超时")
  196. except Exception as e:
  197. logger.warning(f"[{self.task_id}] 进度发送异常: {e}")