retryJob-11.js 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.retryJob = void 0;
  4. const content = `--[[
  5. Retries a failed job by moving it back to the wait queue.
  6. Input:
  7. KEYS[1] 'active',
  8. KEYS[2] 'wait'
  9. KEYS[3] 'paused'
  10. KEYS[4] job key
  11. KEYS[5] 'meta'
  12. KEYS[6] events stream
  13. KEYS[7] delayed key
  14. KEYS[8] prioritized key
  15. KEYS[9] 'pc' priority counter
  16. KEYS[10] 'marker'
  17. KEYS[11] 'stalled'
  18. ARGV[1] key prefix
  19. ARGV[2] timestamp
  20. ARGV[3] pushCmd
  21. ARGV[4] jobId
  22. ARGV[5] token
  23. ARGV[6] optional job fields to update
  24. Events:
  25. 'waiting'
  Output:
  0 - OK
  -1 - Missing key
  -2 - Missing lock
  -3 - Job not in active set
  -6 - Lock exists but token does not match
  31. ]]
  32. local rcall = redis.call
  33. -- Includes
  34. --[[
  35. Function to add job in target list and add marker if needed.
  36. ]]
  37. -- Includes
  --[[
    Writes the base marker (member "0" with score 0) into the sorted set at
    markerKey to signal that a job is available.
    Nothing is written when isPausedOrMaxed is truthy.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Pushes jobId onto the list at targetKey using the supplied push command
    (pushCmd), then writes the base marker into markerKey unless
    isPausedOrMaxed is truthy.
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    -- Same marker write that addBaseMarkerIfNeeded performs.
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  50. --[[
  51. Function to add job considering priority.
  52. ]]
  53. -- Includes
  --[[
    Computes the sorted-set score for a prioritized job.
    The priority is multiplied by 2^32 and the freshly incremented value of
    priorityCounterKey, wrapped modulo 2^32, is added on top.
    Side effect: increments the counter stored at priorityCounterKey.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local SLOT_SIZE = 0x100000000
    local counter = rcall("INCR", priorityCounterKey)
    return priority * SLOT_SIZE + counter % SLOT_SIZE
  end
  --[[
    Adds jobId to the prioritized sorted set with a score derived from its
    priority and the priority counter, then writes the base marker unless
    isPausedOrMaxed is truthy.
  ]]
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
    -- The score (and its INCR side effect) is evaluated before ZADD runs.
    rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Returns the "opts.maxLenEvents" field of the meta hash.
    When the field is absent it is initialised to 10000 and that default is
    returned instead.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  --[[
    Decides which list a job should be pushed to, based on the queue meta hash.
    Reads the "paused", "concurrency", "max" and "duration" fields in one HMGET.
    Returns four values:
      1. pausedKey when "paused" is set, otherwise waitKey
      2. true when the queue is paused, or when "concurrency" is set and the
         active list length has reached it; false otherwise
      3. the raw "max" field
      4. the raw "duration" field
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, max, duration = attrs[1], attrs[2], attrs[3], attrs[4]
    if paused then
      return pausedKey, true, max, duration
    end
    local maxed = false
    if concurrency then
      -- The active list is only measured when a concurrency limit exists.
      maxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, maxed, max, duration
  end
  --[[
    Reports whether the queue cannot take another job right now.
    Returns true when the meta hash has "paused" set, or when "concurrency"
    is set and the active list already holds at least that many entries.
    Returns false otherwise.
  ]]
  local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency")
    local paused, concurrency = attrs[1], attrs[2]
    if paused then
      return true
    end
    if not concurrency then
      return false
    end
    return rcall("LLEN", activeKey) >= tonumber(concurrency)
  end
  --[[
    Moves delayed jobs that are due into the target list or the prioritized set.
    At most 1000 jobs are promoted per call.
    For each promoted job:
      - it is removed from the delayed set
      - priority 0 (or missing) jobs are LPUSHed to targetKey, others are
        added to prioritizedKey with a priority score
      - a 'waiting' event with prev = "delayed" is appended to the event stream
      - the job hash field "delay" is reset to 0
    The base marker is written once at the end unless isPaused is truthy.
    Events:
      'waiting'
  ]]
  local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
    eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
    -- NOTE(review): the upper bound presumably matches a delayed-score encoding
    -- of timestamp * 0x1000 plus a 12-bit suffix; confirm against the writer.
    local maxScore = (timestamp + 1) * 0x1000 - 1
    local dueJobIds = rcall("ZRANGEBYSCORE", delayedKey, 0, maxScore, "LIMIT", 0, 1000)
    if #dueJobIds == 0 then
      return
    end
    rcall("ZREM", delayedKey, unpack(dueJobIds))
    for i = 1, #dueJobIds do
      local jobId = dueJobIds[i]
      local jobKey = prefix .. jobId
      local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
      if priority ~= 0 then
        local score = getPriorityScore(priority, priorityCounterKey)
        rcall("ZADD", prioritizedKey, score, jobId)
      else
        -- Non-prioritized jobs go to the target list.
        rcall("LPUSH", targetKey, jobId)
      end
      -- Emit waiting event
      rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
        jobId, "prev", "delayed")
      rcall("HSET", jobKey, "delay", 0)
    end
    addBaseMarkerIfNeeded(markerKey, isPaused)
  end
  --[[
    Releases the lock of a job when the caller holds it.
    A token of "0" skips the lock check entirely.
    Returns:
       0 - lock released (and jobId removed from the stalled set), or check skipped
      -2 - the lock key does not exist
      -6 - the lock key exists but holds a different token
  ]]
  local function removeLock(jobKey, stalledKey, token, jobId)
    if token == "0" then
      return 0
    end
    local lockKey = jobKey .. ':lock'
    local currentToken = rcall("GET", lockKey)
    if currentToken == token then
      rcall("DEL", lockKey)
      rcall("SREM", stalledKey, jobId)
      return 0
    end
    if currentToken then
      -- Lock exists but token does not match
      return -6
    end
    -- Lock is missing completely
    return -2
  end
  --[[
    Updates a bunch of fields in a job hash.
    jobKey          - key of the job hash.
    msgpackedFields - optional msgpack-encoded flat array of
                      field, value, field, value, ...
    Nothing is written when msgpackedFields is missing or empty, or when the
    decoded payload is not a non-empty array. Calling HMSET without any
    field/value pair raises an arity error, which would abort the script
    after earlier commands have already been applied.
  ]]
  local function updateJobFields(jobKey, msgpackedFields)
    if not msgpackedFields or #msgpackedFields == 0 then
      return
    end
    local fieldsToUpdate = cmsgpack.unpack(msgpackedFields)
    -- Only a table with at least one array entry can be expanded into HMSET arguments.
    if type(fieldsToUpdate) == "table" and #fieldsToUpdate > 0 then
      rcall("HMSET", jobKey, unpack(fieldsToUpdate))
    end
  end
  -- Main script body.
  -- Returns 0 on success, -1 when the job hash is missing, -3 when the job is
  -- not in the active list, or the negative code reported by removeLock.
  local activeKey, metaKey, eventsKey = KEYS[1], KEYS[5], KEYS[6]
  local prioritizedKey, priorityCounterKey = KEYS[8], KEYS[9]
  local markerKey = KEYS[10]
  local jobId = ARGV[4]
  local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, KEYS[2], KEYS[3])
  -- Check if there are delayed jobs that we can move to wait.
  -- test example: when there are delayed jobs between retries
  promoteDelayedJobs(KEYS[7], markerKey, target, prioritizedKey, eventsKey, ARGV[1], ARGV[2],
    priorityCounterKey, isPausedOrMaxed)
  local jobKey = KEYS[4]
  if rcall("EXISTS", jobKey) ~= 1 then
    return -1
  end
  local lockResult = removeLock(jobKey, KEYS[11], ARGV[5], jobId)
  if lockResult < 0 then
    return lockResult
  end
  updateJobFields(jobKey, ARGV[6])
  -- Remove one occurrence of the job, searching from the tail of the active list.
  if rcall("LREM", activeKey, -1, jobId) < 1 then
    return -3
  end
  local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
  -- Need to re-evaluate after removing job from active
  isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
  -- Priority or standard add
  if priority ~= 0 then
    addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)
  else
    addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, jobId)
  end
  rcall("HINCRBY", jobKey, "atm", 1)
  local maxEvents = getOrSetMaxEvents(metaKey)
  -- Emit waiting event
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
    "jobId", jobId, "prev", "active")
  return 0
  208. `;
  // Script descriptor exported by this module: the command name, the Lua
  // source held in `content`, and the key count (the Lua header lists
  // KEYS[1] through KEYS[11]).
  const retryJobDescriptor = {
      name: 'retryJob',
      content: content,
      keys: 11,
  };
  exports.retryJob = retryJobDescriptor;
  214. //# sourceMappingURL=retryJob-11.js.map