moveStalledJobsToWait-8.js 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.moveStalledJobsToWait = void 0;
  4. const content = `--[[
  5. Move stalled jobs to wait.
  6. Input:
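  KEYS[1] 'stalled' (SET)
  KEYS[2] 'wait' (LIST)
  KEYS[3] 'active' (LIST)
  KEYS[4] 'stalled-check' (STRING, written with a PX expiry)
  KEYS[5] 'meta' (HASH)
  KEYS[6] 'paused' (LIST)
  KEYS[7] 'marker' (ZSET)
  KEYS[8] 'event stream' (STREAM)
  ARGV[1] Max stalled job count
  ARGV[2] queue.toKey('') - key prefix prepended to job ids to build job keys
  ARGV[3] timestamp
  ARGV[4] max check time, in milliseconds (expiry of the stalled-check key)
Output:
  array with the ids of the jobs moved back to wait/paused
Events:
  'stalled' with stalled job id.
  'waiting' with job id (prev 'active') for each re-queued job.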
  21. ]]
  -- Alias redis.call once; the helpers in this script issue commands through it.
  local rcall = redis.call
  -- Includes
  --[[
    Function to add job in target list and add marker if needed.
  ]]
  -- Includes
  --[[
    Add the base marker (member "0" with score 0) to the marker sorted set
    when a job is available. Does nothing when isPausedOrMaxed is truthy.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Push jobId onto targetKey using the list command named by pushCmd
    (e.g. "RPUSH"), then add the base marker unless isPausedOrMaxed is truthy.
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd,
      targetKey,
      jobId)
    addBaseMarkerIfNeeded(markerKey,
      isPausedOrMaxed)
  end
  --[[
    Iterator over the index range 1..n in consecutive chunks of at most
    batchSize, yielding (first, last) pairs; yields nothing when n < 1.
    Used to split very large argument lists across several command calls
    (presumably because unpack()-ing a huge table into one call can exceed
    Lua's stack limit; 7000 per call is the size used in this script).
  ]]
  local function batches(n, batchSize)
    local nextStart = 1
    return function()
      local first = nextStart
      nextStart = nextStart + batchSize
      if first <= n then
        return first, math.min(first + batchSize - 1, n)
      end
    end
  end
  --[[
    Function to move job to wait to be picked up by a waiting worker.
  ]]
  -- Includes
  --[[
    Choose the list a job should be pushed to, based on the meta hash fields
    "paused", "concurrency", "max" and "duration" (a missing "paused" field
    is treated differently from an empty list).
    Returns: targetKey, isPausedOrMaxed, max, duration
      - pausedKey, true        when "paused" is set;
      - waitKey, true          when "concurrency" is set and the active list
                               length is >= that value;
      - waitKey, false         otherwise.
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local maxJobs, duration = attrs[3], attrs[4]

    if attrs[1] then
      return pausedKey, true, maxJobs, duration
    end

    local concurrency = attrs[2]
    local atCapacity = false
    if concurrency then
      atCapacity = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, atCapacity, maxJobs, duration
  end
  --[[
    Re-queue jobId: push it with pushCmd onto the list chosen by
    getTargetQueueList, add the base marker when not paused/maxed, and emit
    a "waiting" event (prev "active") on the event stream.
  ]]
  local function moveJobToWait(metaKey, activeKey, waitKey, pausedKey, markerKey, eventStreamKey,
    jobId, pushCmd)
    local targetKey, pausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    addJobInTargetList(targetKey, markerKey, pushCmd, pausedOrMaxed, jobId)
    rcall("XADD", eventStreamKey, "*",
      "event", "waiting",
      "jobId", jobId,
      'prev', 'active')
  end
  --[[
    Function to trim events, default 10000.
  ]]
  -- Includes
  --[[
    Read "opts.maxLenEvents" from the meta hash. When the field is missing,
    store 10000 there and return that default; otherwise return the stored
    value unchanged.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  --[[
    Approximately trim the event stream (XTRIM MAXLEN ~) to the configured
    maximum number of events, falling back to 10000 when no limit is returned.
  ]]
  local function trimEvents(metaKey, eventStreamKey)
    local limit = getOrSetMaxEvents(metaKey) or 10000
    rcall("XTRIM", eventStreamKey, "MAXLEN", "~", limit)
  end
  local stalledKey = KEYS[1]
  local waitKey = KEYS[2]
  local activeKey = KEYS[3]
  local stalledCheckKey = KEYS[4]
  local metaKey = KEYS[5]
  local pausedKey = KEYS[6]
  local markerKey = KEYS[7]
  local eventStreamKey = KEYS[8]
  local maxStalledJobCount = tonumber(ARGV[1])
  local queueKeyPrefix = ARGV[2]
  local timestamp = ARGV[3]
  local maxCheckTime = ARGV[4]

  --[[
    Returns true when the job hash's "opts" field decodes to a table with a
    truthy "repeat" entry, false otherwise.
    The payload is decoded under pcall on purpose: by the time this runs the
    stalled set has been deleted and the job removed from the active list,
    and Redis does not roll back writes made before a script error. An
    unprotected decode failure would therefore drop the job from active
    without re-queueing it. Undecodable or non-object payloads are treated
    as non-repeatable.
  ]]
  local function isRepeatableJob(jobKey)
    local rawOpts = rcall("HGET", jobKey, "opts")
    if not rawOpts then
      return false
    end
    local ok, opts = pcall(cjson.decode, rawOpts)
    if ok and type(opts) == "table" and opts["repeat"] then
      return true
    end
    return false
  end

  -- The stalled-check key still exists (it expires after maxCheckTime ms),
  -- so a check already ran recently: nothing to do.
  if rcall("EXISTS", stalledCheckKey) == 1 then
    return {}
  end

  rcall("SET", stalledCheckKey, timestamp, "PX", maxCheckTime)

  -- Trim events before emitting them to avoid trimming events emitted in this script
  trimEvents(metaKey, eventStreamKey)

  -- Move all stalled jobs to wait. The stalled set is filled (at least) by
  -- the marking step at the end of this script with the ids then in active.
  local stalling = rcall('SMEMBERS', stalledKey)
  local stalled = {}
  if #stalling > 0 then
    rcall('DEL', stalledKey)
    for _, jobId in ipairs(stalling) do
      -- Markers in waitlist DEPRECATED in v5: Remove in v6.
      if string.sub(jobId, 1, 2) == "0:" then
        -- If the jobId is a delay marker ID we just remove it.
        rcall("LREM", activeKey, 1, jobId)
      else
        local jobKey = queueKeyPrefix .. jobId
        -- Check that the lock is also missing, then we can handle this job as really stalled.
        if rcall("EXISTS", jobKey .. ":lock") == 0 then
          -- Remove from the active queue; ids that already left it are skipped.
          local removed = rcall("LREM", activeKey, 1, jobId)
          if removed > 0 then
            local stalledCount = rcall("HINCRBY", jobKey, "stc", 1)
            -- Past the stall limit: store the failure reason in the job's
            -- "defa" field unless the job is repeatable (presumably acted on
            -- when a worker next picks the job up). isRepeatableJob only runs
            -- when the limit is actually exceeded.
            if stalledCount > maxStalledJobCount and not isRepeatableJob(jobKey) then
              local failedReason = "job stalled more than allowable limit"
              rcall("HSET", jobKey, "defa", failedReason)
            end
            moveJobToWait(metaKey, activeKey, waitKey, pausedKey, markerKey, eventStreamKey, jobId,
              "RPUSH")
            -- Emit the stalled event
            rcall("XADD", eventStreamKey, "*", "event", "stalled", "jobId", jobId)
            stalled[#stalled + 1] = jobId
          end
        end
      end
    end
  end

  -- Mark potentially stalled jobs: every id currently in active is added to
  -- the stalled set, in chunks (batches yields nothing for an empty list).
  local active = rcall('LRANGE', activeKey, 0, -1)
  for from, to in batches(#active, 7000) do
    rcall('SADD', stalledKey, unpack(active, from, to))
  end

  return stalled
  179. `;
  180. exports.moveStalledJobsToWait = {
  181. name: 'moveStalledJobsToWait',
  182. content,
  183. keys: 8,
  184. };
  185. //# sourceMappingURL=moveStalledJobsToWait-8.js.map