moveStalledJobsToWait-8.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. const content = `--[[
  2. Move stalled jobs to wait.
  3. Input:
  4. KEYS[1] 'stalled' (SET)
  5. KEYS[2] 'wait', (LIST)
  6. KEYS[3] 'active', (LIST)
  7. KEYS[4] 'stalled-check', (KEY)
  8. KEYS[5] 'meta', (KEY)
  9. KEYS[6] 'paused', (LIST)
  10. KEYS[7] 'marker'
  11. KEYS[8] 'event stream' (STREAM)
  12. ARGV[1] Max stalled job count
  13. ARGV[2] queue.toKey('')
  14. ARGV[3] timestamp
  15. ARGV[4] max check time
Events:
'stalled' with stalled job id.
'waiting' with the job id and prev 'active', for each job moved back to wait/paused.
  18. ]]
-- Local alias for redis.call; every helper and the script body issue commands through it.
local rcall = redis.call
  20. -- Includes
  21. --[[
  22. Function to add job in target list and add marker if needed.
  23. ]]
  24. -- Includes
  25. --[[
  26. Add marker if needed when a job is available.
  27. ]]
  28. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  29. if not isPausedOrMaxed then
  30. rcall("ZADD", markerKey, 0, "0")
  31. end
  32. end
  33. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  34. rcall(pushCmd, targetKey, jobId)
  35. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  36. end
  37. --[[
  38. Function to loop in batches.
  39. Just a bit of warning, some commands as ZREM
  40. could receive a maximum of 7000 parameters per call.
  41. ]]
  42. local function batches(n, batchSize)
  43. local i = 0
  44. return function()
  45. local from = i * batchSize + 1
  46. i = i + 1
  47. if (from <= n) then
  48. local to = math.min(from + batchSize - 1, n)
  49. return from, to
  50. end
  51. end
  52. end
  53. --[[
  54. Function to move job to wait to be picked up by a waiting worker.
  55. ]]
  56. -- Includes
  57. --[[
  58. Function to check for the meta.paused key to decide if we are paused or not
  59. (since an empty list and !EXISTS are not really the same).
  60. ]]
  61. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  62. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  63. if queueAttributes[1] then
  64. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  65. else
  66. if queueAttributes[2] then
  67. local activeCount = rcall("LLEN", activeKey)
  68. if activeCount >= tonumber(queueAttributes[2]) then
  69. return waitKey, true, queueAttributes[3], queueAttributes[4]
  70. else
  71. return waitKey, false, queueAttributes[3], queueAttributes[4]
  72. end
  73. end
  74. end
  75. return waitKey, false, queueAttributes[3], queueAttributes[4]
  76. end
  77. local function moveJobToWait(metaKey, activeKey, waitKey, pausedKey, markerKey, eventStreamKey,
  78. jobId, pushCmd)
  79. local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
  80. addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
  81. rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, 'prev', 'active')
  82. end
  83. --[[
  84. Function to trim events, default 10000.
  85. ]]
  86. -- Includes
  87. --[[
  88. Function to get max events value or set by default 10000.
  89. ]]
  90. local function getOrSetMaxEvents(metaKey)
  91. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  92. if not maxEvents then
  93. maxEvents = 10000
  94. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  95. end
  96. return maxEvents
  97. end
  98. local function trimEvents(metaKey, eventStreamKey)
  99. local maxEvents = getOrSetMaxEvents(metaKey)
  100. if maxEvents then
  101. rcall("XTRIM", eventStreamKey, "MAXLEN", "~", maxEvents)
  102. else
  103. rcall("XTRIM", eventStreamKey, "MAXLEN", "~", 10000)
  104. end
  105. end
  106. local stalledKey = KEYS[1]
  107. local waitKey = KEYS[2]
  108. local activeKey = KEYS[3]
  109. local stalledCheckKey = KEYS[4]
  110. local metaKey = KEYS[5]
  111. local pausedKey = KEYS[6]
  112. local markerKey = KEYS[7]
  113. local eventStreamKey = KEYS[8]
  114. local maxStalledJobCount = tonumber(ARGV[1])
  115. local queueKeyPrefix = ARGV[2]
  116. local timestamp = ARGV[3]
  117. local maxCheckTime = ARGV[4]
  118. if rcall("EXISTS", stalledCheckKey) == 1 then
  119. return {}
  120. end
  121. rcall("SET", stalledCheckKey, timestamp, "PX", maxCheckTime)
  122. -- Trim events before emiting them to avoid trimming events emitted in this script
  123. trimEvents(metaKey, eventStreamKey)
  124. -- Move all stalled jobs to wait
  125. local stalling = rcall('SMEMBERS', stalledKey)
  126. local stalled = {}
  127. if (#stalling > 0) then
  128. rcall('DEL', stalledKey)
  129. -- Remove from active list
  130. for i, jobId in ipairs(stalling) do
  131. -- Markers in waitlist DEPRECATED in v5: Remove in v6.
  132. if string.sub(jobId, 1, 2) == "0:" then
  133. -- If the jobId is a delay marker ID we just remove it.
  134. rcall("LREM", activeKey, 1, jobId)
  135. else
  136. local jobKey = queueKeyPrefix .. jobId
  137. -- Check that the lock is also missing, then we can handle this job as really stalled.
  138. if (rcall("EXISTS", jobKey .. ":lock") == 0) then
  139. -- Remove from the active queue.
  140. local removed = rcall("LREM", activeKey, 1, jobId)
  141. if (removed > 0) then
  142. -- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
  143. local stalledCount = rcall("HINCRBY", jobKey, "stc", 1)
  144. -- Check if this is a repeatable job by looking at job options
  145. local jobOpts = rcall("HGET", jobKey, "opts")
  146. local isRepeatableJob = false
  147. if jobOpts then
  148. local opts = cjson.decode(jobOpts)
  149. if opts and opts["repeat"] then
  150. isRepeatableJob = true
  151. end
  152. end
  153. -- Only fail job if it exceeds stall limit AND is not a repeatable job
  154. if stalledCount > maxStalledJobCount and not isRepeatableJob then
  155. local failedReason = "job stalled more than allowable limit"
  156. rcall("HSET", jobKey, "defa", failedReason)
  157. end
  158. moveJobToWait(metaKey, activeKey, waitKey, pausedKey, markerKey, eventStreamKey, jobId,
  159. "RPUSH")
  160. -- Emit the stalled event
  161. rcall("XADD", eventStreamKey, "*", "event", "stalled", "jobId", jobId)
  162. table.insert(stalled, jobId)
  163. end
  164. end
  165. end
  166. end
  167. end
  168. -- Mark potentially stalled jobs
  169. local active = rcall('LRANGE', activeKey, 0, -1)
  170. if (#active > 0) then
  171. for from, to in batches(#active, 7000) do
  172. rcall('SADD', stalledKey, unpack(active, from, to))
  173. end
  174. end
  175. return stalled
  176. `;
  177. export const moveStalledJobsToWait = {
  178. name: 'moveStalledJobsToWait',
  179. content,
  180. keys: 8,
  181. };
  182. //# sourceMappingURL=moveStalledJobsToWait-8.js.map