moveToActive-11.lua 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. --[[
  2. Move next job to be processed to active, lock it and fetch its data. The job
  3. may be delayed; in that case we need to move it to the delayed set instead.
  4. This operation guarantees that the worker owns the job during the lock
  5. expiration time. The worker is responsible for keeping the lock fresh
  6. so that no other worker picks this job again.
  7. Input:
  8. KEYS[1] wait key
  9. KEYS[2] active key
  10. KEYS[3] prioritized key
  11. KEYS[4] stream events key
  12. KEYS[5] stalled key
  13. -- Rate limiting
  14. KEYS[6] rate limiter key
  15. KEYS[7] delayed key
  16. -- Delayed jobs
  17. KEYS[8] paused key
  18. KEYS[9] meta key
  19. KEYS[10] pc priority counter
  20. -- Marker
  21. KEYS[11] marker key
  22. -- Arguments
  23. ARGV[1] key prefix
  24. ARGV[2] timestamp
  25. ARGV[3] opts
  26. opts - token - lock token
  27. opts - lockDuration
  28. opts - limiter
  29. opts - name - worker name
  30. ]]
  local rcall = redis.call

-- Aliases declared ahead of the @include directives keep their original names.
-- NOTE(review): the included helpers are spliced in below and presumably
-- capture some of these (at least `rcall`) as upvalues -- confirm before
-- renaming any of them.
local waitKey = KEYS[1]
local activeKey = KEYS[2]
local eventStreamKey = KEYS[4]
local rateLimiterKey = KEYS[6]
local delayedKey = KEYS[7]
local opts = cmsgpack.unpack(ARGV[3])

-- Includes
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/getRateLimitTTL"
--- @include "includes/getTargetQueueList"
--- @include "includes/moveJobFromPrioritizedToActive"
--- @include "includes/prepareJobForProcessing"
--- @include "includes/promoteDelayedJobs"

-- Remaining inputs, named after their description in the header comment.
local prioritizedKey = KEYS[3]
local pausedKey = KEYS[8]
local metaKey = KEYS[9]
local priorityCounterKey = KEYS[10]
local markerKey = KEYS[11]
local keyPrefix = ARGV[1]
local timestamp = ARGV[2]

-- Resolve the list new jobs should land in, whether the queue is currently
-- paused/maxed, and any rate-limit settings reported alongside.
local targetList, queueBlocked, metaLimitMax, metaLimitDuration =
    getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)

-- Check if there are delayed jobs that we can move to wait.
promoteDelayedJobs(delayedKey, markerKey, targetList, prioritizedKey,
    eventStreamKey, keyPrefix, timestamp, priorityCounterKey, queueBlocked)

-- The value returned by getTargetQueueList takes precedence over the
-- worker-supplied limiter max.
local limiterOpts = opts['limiter']
local maxJobs = tonumber(metaLimitMax or (limiterOpts and limiterOpts['max']))

-- Check if we are rate limited first: a positive TTL is handed back in the
-- third slot of the reply and nothing is moved.
local rateLimitTTL = getRateLimitTTL(maxJobs, rateLimiterKey)
if rateLimitTTL > 0 then
    return {0, 0, rateLimitTTL, 0}
end

-- Paused or maxed queue: nothing to hand out.
if queueBlocked then
    return {0, 0, 0, 0}
end

-- For the duration the worker-supplied option wins, with the value from
-- getTargetQueueList as the fallback.
local limiterDuration = (limiterOpts and limiterOpts['duration']) or metaLimitDuration

-- Try a non-blocking move from wait to active.
local jobId = rcall("RPOPLPUSH", waitKey, activeKey)

-- Markers in the wait list are DEPRECATED in v5 and will be completely removed
-- in v6. An id starting with "0:" is such a marker: take it back out of the
-- active list and pop once more.
if jobId and string.sub(jobId, 1, 2) == "0:" then
    rcall("LREM", activeKey, 1, jobId)
    jobId = rcall("RPOPLPUSH", waitKey, activeKey)
end

-- Nothing came out of the wait list: fall back to the prioritized set.
if not jobId then
    jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, priorityCounterKey)
end

if jobId then
    return prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey, jobId,
        timestamp, maxJobs, limiterDuration, markerKey, opts)
end

-- No job available: return the timestamp for the next delayed job, if any,
-- in the fourth slot of the reply.
local nextDelayed = getNextDelayedTimestamp(delayedKey)
if nextDelayed == nil then
    return {0, 0, 0, 0}
end
return {0, 0, 0, nextDelayed}