moveToActive-11.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. const content = `--[[
  2. Move next job to be processed to active, lock it and fetch its data. The job
  3. may be delayed, in that case we need to move it to the delayed set instead.
  4. This operation guarantees that the worker owns the job during the lock
  5. expiration time. The worker is responsible of keeping the lock fresh
  6. so that no other worker picks this job again.
  7. Input:
  8. KEYS[1] wait key
  9. KEYS[2] active key
  10. KEYS[3] prioritized key
  11. KEYS[4] stream events key
  12. KEYS[5] stalled key
  13. -- Rate limiting
  14. KEYS[6] rate limiter key
  15. KEYS[7] delayed key
  16. -- Delayed jobs
  17. KEYS[8] paused key
  18. KEYS[9] meta key
  19. KEYS[10] pc priority counter
  20. -- Marker
  21. KEYS[11] marker key
  22. -- Arguments
  23. ARGV[1] key prefix
  24. ARGV[2] timestamp
  25. ARGV[3] opts
  26. opts - token - lock token
  27. opts - lockDuration
  28. opts - limiter
  29. opts - name - worker name
  30. ]]
  31. local rcall = redis.call
  32. local waitKey = KEYS[1]
  33. local activeKey = KEYS[2]
  34. local eventStreamKey = KEYS[4]
  35. local rateLimiterKey = KEYS[6]
  36. local delayedKey = KEYS[7]
  37. local opts = cmsgpack.unpack(ARGV[3])
  38. -- Includes
  39. --[[
  40. Function to return the next delayed job timestamp.
  41. ]]
  42. local function getNextDelayedTimestamp(delayedKey)
  43. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  44. if #result then
  45. local nextTimestamp = tonumber(result[2])
  46. if nextTimestamp ~= nil then
  47. return nextTimestamp / 0x1000
  48. end
  49. end
  50. end
  51. --[[
  52. Function to get current rate limit ttl.
  53. ]]
  54. local function getRateLimitTTL(maxJobs, rateLimiterKey)
  55. if maxJobs and maxJobs <= tonumber(rcall("GET", rateLimiterKey) or 0) then
  56. local pttl = rcall("PTTL", rateLimiterKey)
  57. if pttl == 0 then
  58. rcall("DEL", rateLimiterKey)
  59. end
  60. if pttl > 0 then
  61. return pttl
  62. end
  63. end
  64. return 0
  65. end
  66. --[[
  67. Function to check for the meta.paused key to decide if we are paused or not
  68. (since an empty list and !EXISTS are not really the same).
  69. ]]
  70. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  71. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  72. if queueAttributes[1] then
  73. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  74. else
  75. if queueAttributes[2] then
  76. local activeCount = rcall("LLEN", activeKey)
  77. if activeCount >= tonumber(queueAttributes[2]) then
  78. return waitKey, true, queueAttributes[3], queueAttributes[4]
  79. else
  80. return waitKey, false, queueAttributes[3], queueAttributes[4]
  81. end
  82. end
  83. end
  84. return waitKey, false, queueAttributes[3], queueAttributes[4]
  85. end
  86. --[[
  87. Function to move job from prioritized state to active.
  88. ]]
  89. local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
  90. local prioritizedJob = rcall("ZPOPMIN", priorityKey)
  91. if #prioritizedJob > 0 then
  92. rcall("LPUSH", activeKey, prioritizedJob[1])
  93. return prioritizedJob[1]
  94. else
  95. rcall("DEL", priorityCounterKey)
  96. end
  97. end
  98. --[[
  99. Function to move job from wait state to active.
  100. Input:
  101. opts - token - lock token
  102. opts - lockDuration
  103. opts - limiter
  104. ]]
  105. -- Includes
  106. --[[
  107. Add marker if needed when a job is available.
  108. ]]
  109. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  110. if not isPausedOrMaxed then
  111. rcall("ZADD", markerKey, 0, "0")
  112. end
  113. end
  114. local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
  115. jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
  116. local jobKey = keyPrefix .. jobId
  117. -- Check if we need to perform rate limiting.
  118. if maxJobs then
  119. local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
  120. if jobCounter == 1 then
  121. local integerDuration = math.floor(math.abs(limiterDuration))
  122. rcall("PEXPIRE", rateLimiterKey, integerDuration)
  123. end
  124. end
  125. -- get a lock
  126. if opts['token'] ~= "0" then
  127. local lockKey = jobKey .. ':lock'
  128. rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
  129. end
  130. local optionalValues = {}
  131. if opts['name'] then
  132. -- Set "processedBy" field to the worker name
  133. table.insert(optionalValues, "pb")
  134. table.insert(optionalValues, opts['name'])
  135. end
  136. rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
  137. rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
  138. rcall("HINCRBY", jobKey, "ats", 1)
  139. addBaseMarkerIfNeeded(markerKey, false)
  140. -- rate limit delay must be 0 in this case to prevent adding more delay
  141. -- when job that is moved to active needs to be processed
  142. return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
  143. end
  144. --[[
  145. Updates the delay set, by moving delayed jobs that should
  146. be processed now to "wait".
  147. Events:
  148. 'waiting'
  149. ]]
  150. -- Includes
  151. --[[
  152. Function to add job in target list and add marker if needed.
  153. ]]
  154. -- Includes
  155. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  156. rcall(pushCmd, targetKey, jobId)
  157. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  158. end
  159. --[[
  160. Function to add job considering priority.
  161. ]]
  162. -- Includes
  163. --[[
  164. Function to get priority score.
  165. ]]
  166. local function getPriorityScore(priority, priorityCounterKey)
  167. local prioCounter = rcall("INCR", priorityCounterKey)
  168. return priority * 0x100000000 + prioCounter % 0x100000000
  169. end
  170. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  171. isPausedOrMaxed)
  172. local score = getPriorityScore(priority, priorityCounterKey)
  173. rcall("ZADD", prioritizedKey, score, jobId)
  174. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  175. end
  176. -- Try to get as much as 1000 jobs at once
  177. local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
  178. eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  179. local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)
  180. if (#jobs > 0) then
  181. rcall("ZREM", delayedKey, unpack(jobs))
  182. for _, jobId in ipairs(jobs) do
  183. local jobKey = prefix .. jobId
  184. local priority =
  185. tonumber(rcall("HGET", jobKey, "priority")) or 0
  186. if priority == 0 then
  187. -- LIFO or FIFO
  188. rcall("LPUSH", targetKey, jobId)
  189. else
  190. local score = getPriorityScore(priority, priorityCounterKey)
  191. rcall("ZADD", prioritizedKey, score, jobId)
  192. end
  193. -- Emit waiting event
  194. rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
  195. jobId, "prev", "delayed")
  196. rcall("HSET", jobKey, "delay", 0)
  197. end
  198. addBaseMarkerIfNeeded(markerKey, isPaused)
  199. end
  200. end
  201. local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration = getTargetQueueList(KEYS[9],
  202. activeKey, waitKey, KEYS[8])
  203. -- Check if there are delayed jobs that we can move to wait.
  204. local markerKey = KEYS[11]
  205. promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
  206. ARGV[2], KEYS[10], isPausedOrMaxed)
  207. local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
  208. local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
  209. -- Check if we are rate limited first.
  210. if expireTime > 0 then return {0, 0, expireTime, 0} end
  211. -- paused or maxed queue
  212. if isPausedOrMaxed then return {0, 0, 0, 0} end
  213. local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
  214. -- no job ID, try non-blocking move from wait to active
  215. local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
  216. -- Markers in waitlist DEPRECATED in v5: Will be completely removed in v6.
  217. if jobId and string.sub(jobId, 1, 2) == "0:" then
  218. rcall("LREM", activeKey, 1, jobId)
  219. jobId = rcall("RPOPLPUSH", waitKey, activeKey)
  220. end
  221. if jobId then
  222. return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
  223. maxJobs, limiterDuration, markerKey, opts)
  224. else
  225. jobId = moveJobFromPrioritizedToActive(KEYS[3], activeKey, KEYS[10])
  226. if jobId then
  227. return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
  228. maxJobs, limiterDuration, markerKey, opts)
  229. end
  230. end
  231. -- Return the timestamp for the next delayed job if any.
  232. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  233. if nextTimestamp ~= nil then return {0, 0, 0, nextTimestamp} end
  234. return {0, 0, 0, 0}
  235. `;
  236. export const moveToActive = {
  237. name: 'moveToActive',
  238. content,
  239. keys: 11,
  240. };
  241. //# sourceMappingURL=moveToActive-11.js.map