retryJob-11.js 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. const content = `--[[
  2. Retries a failed job by moving it back to the wait queue.
  3. Input:
  4. KEYS[1] 'active',
  5. KEYS[2] 'wait'
  6. KEYS[3] 'paused'
  7. KEYS[4] job key
  8. KEYS[5] 'meta'
  9. KEYS[6] events stream
  10. KEYS[7] delayed key
  11. KEYS[8] prioritized key
  12. KEYS[9] 'pc' priority counter
  13. KEYS[10] 'marker'
  14. KEYS[11] 'stalled'
  15. ARGV[1] key prefix
  16. ARGV[2] timestamp
  17. ARGV[3] pushCmd
  18. ARGV[4] jobId
  19. ARGV[5] token
  20. ARGV[6] optional job fields to update
  21. Events:
  22. 'waiting'
  23. Output:
  24. 0 - OK
  25. -1 - Missing key
  26. -2 - Missing lock
  27. -3 - Job not in active set
  28. ]]
  29. local rcall = redis.call
  30. -- Includes
  31. --[[
  32. Function to add job in target list and add marker if needed.
  33. ]]
  34. -- Includes
  35. --[[
  36. Add marker if needed when a job is available.
  37. ]]
  38. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  39. if not isPausedOrMaxed then
  40. rcall("ZADD", markerKey, 0, "0")
  41. end
  42. end
  43. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  44. rcall(pushCmd, targetKey, jobId)
  45. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  46. end
  47. --[[
  48. Function to add job considering priority.
  49. ]]
  50. -- Includes
  51. --[[
  52. Function to get priority score.
  53. ]]
  54. local function getPriorityScore(priority, priorityCounterKey)
  55. local prioCounter = rcall("INCR", priorityCounterKey)
  56. return priority * 0x100000000 + prioCounter % 0x100000000
  57. end
  58. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  59. isPausedOrMaxed)
  60. local score = getPriorityScore(priority, priorityCounterKey)
  61. rcall("ZADD", prioritizedKey, score, jobId)
  62. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  63. end
  64. --[[
  65. Function to get max events value or set by default 10000.
  66. ]]
  67. local function getOrSetMaxEvents(metaKey)
  68. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  69. if not maxEvents then
  70. maxEvents = 10000
  71. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  72. end
  73. return maxEvents
  74. end
  75. --[[
  76. Function to check for the meta.paused key to decide if we are paused or not
  77. (since an empty list and !EXISTS are not really the same).
  78. ]]
  79. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  80. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  81. if queueAttributes[1] then
  82. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  83. else
  84. if queueAttributes[2] then
  85. local activeCount = rcall("LLEN", activeKey)
  86. if activeCount >= tonumber(queueAttributes[2]) then
  87. return waitKey, true, queueAttributes[3], queueAttributes[4]
  88. else
  89. return waitKey, false, queueAttributes[3], queueAttributes[4]
  90. end
  91. end
  92. end
  93. return waitKey, false, queueAttributes[3], queueAttributes[4]
  94. end
  95. --[[
  96. Function to check if queue is paused or maxed
  97. (since an empty list and !EXISTS are not really the same).
  98. ]]
  99. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  100. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  101. if queueAttributes[1] then
  102. return true
  103. else
  104. if queueAttributes[2] then
  105. local activeCount = rcall("LLEN", activeKey)
  106. return activeCount >= tonumber(queueAttributes[2])
  107. end
  108. end
  109. return false
  110. end
  111. --[[
  112. Updates the delay set, by moving delayed jobs that should
  113. be processed now to "wait".
  114. Events:
  115. 'waiting'
  116. ]]
  117. -- Includes
  118. -- Try to get as much as 1000 jobs at once
  119. local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
  120. eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  121. local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)
  122. if (#jobs > 0) then
  123. rcall("ZREM", delayedKey, unpack(jobs))
  124. for _, jobId in ipairs(jobs) do
  125. local jobKey = prefix .. jobId
  126. local priority =
  127. tonumber(rcall("HGET", jobKey, "priority")) or 0
  128. if priority == 0 then
  129. -- LIFO or FIFO
  130. rcall("LPUSH", targetKey, jobId)
  131. else
  132. local score = getPriorityScore(priority, priorityCounterKey)
  133. rcall("ZADD", prioritizedKey, score, jobId)
  134. end
  135. -- Emit waiting event
  136. rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
  137. jobId, "prev", "delayed")
  138. rcall("HSET", jobKey, "delay", 0)
  139. end
  140. addBaseMarkerIfNeeded(markerKey, isPaused)
  141. end
  142. end
  143. local function removeLock(jobKey, stalledKey, token, jobId)
  144. if token ~= "0" then
  145. local lockKey = jobKey .. ':lock'
  146. local lockToken = rcall("GET", lockKey)
  147. if lockToken == token then
  148. rcall("DEL", lockKey)
  149. rcall("SREM", stalledKey, jobId)
  150. else
  151. if lockToken then
  152. -- Lock exists but token does not match
  153. return -6
  154. else
  155. -- Lock is missing completely
  156. return -2
  157. end
  158. end
  159. end
  160. return 0
  161. end
  162. --[[
  163. Function to update a bunch of fields in a job.
  164. ]]
  165. local function updateJobFields(jobKey, msgpackedFields)
  166. if msgpackedFields and #msgpackedFields > 0 then
  167. local fieldsToUpdate = cmsgpack.unpack(msgpackedFields)
  168. if fieldsToUpdate then
  169. rcall("HMSET", jobKey, unpack(fieldsToUpdate))
  170. end
  171. end
  172. end
  173. local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3])
  174. local markerKey = KEYS[10]
  175. -- Check if there are delayed jobs that we can move to wait.
  176. -- test example: when there are delayed jobs between retries
  177. promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed)
  178. local jobKey = KEYS[4]
  179. if rcall("EXISTS", jobKey) == 1 then
  180. local errorCode = removeLock(jobKey, KEYS[11], ARGV[5], ARGV[4])
  181. if errorCode < 0 then
  182. return errorCode
  183. end
  184. updateJobFields(jobKey, ARGV[6])
  185. local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[4])
  186. if (numRemovedElements < 1) then return -3 end
  187. local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
  188. --need to re-evaluate after removing job from active
  189. isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1])
  190. -- Standard or priority add
  191. if priority == 0 then
  192. addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4])
  193. else
  194. addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed)
  195. end
  196. rcall("HINCRBY", jobKey, "atm", 1)
  197. local maxEvents = getOrSetMaxEvents(KEYS[5])
  198. -- Emit waiting event
  199. rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  200. "jobId", ARGV[4], "prev", "active")
  201. return 0
  202. else
  203. return -1
  204. end
  205. `;
  206. export const retryJob = {
  207. name: 'retryJob',
  208. content,
  209. keys: 11,
  210. };
  211. //# sourceMappingURL=retryJob-11.js.map