fetchNextJob.lua 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. --[[
  2. Function to fetch the next job to process.
  3. Tries to get the next job to avoid an extra roundtrip if the queue is
  4. not closing and not rate limited.
  5. Input:
  6. waitKey - wait list key
  7. activeKey - active list key
  8. prioritizedKey - prioritized sorted set key
  9. eventStreamKey - event stream key
  10. rateLimiterKey - rate limiter key
  11. delayedKey - delayed sorted set key
  12. pausedKey - paused list key
  13. metaKey - meta hash key
  14. pcKey - priority counter key
  15. markerKey - marker key
  16. prefix - keys prefix
  17. timestamp - current timestamp
  18. opts - options table:
  19. token (required) - lock token used when locking jobs
  20. lockDuration (required) - lock duration for acquired jobs
  21. limiter (optional) - rate limiter options table (e.g. { max = number })
  22. ]]
  23. -- Includes
  24. --- @include "getNextDelayedTimestamp"
  25. --- @include "getRateLimitTTL"
  26. --- @include "getTargetQueueList"
  27. --- @include "moveJobFromPrioritizedToActive"
  28. --- @include "prepareJobForProcessing"
  29. --- @include "promoteDelayedJobs"
  30. local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey,
  31. rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix,
  32. timestamp, opts)
  33. local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration =
  34. getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
  35. -- Check if there are delayed jobs that can be promoted
  36. promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey,
  37. eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed)
  38. local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
  39. -- Check if we are rate limited first.
  40. local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
  41. if expireTime > 0 then
  42. return {0, 0, expireTime, 0}
  43. end
  44. -- paused or maxed queue
  45. if isPausedOrMaxed then
  46. return {0, 0, 0, 0}
  47. end
  48. local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
  49. local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
  50. if jobId then
  51. -- Markers in waitlist DEPRECATED in v5: Remove in v6.
  52. if string.sub(jobId, 1, 2) == "0:" then
  53. rcall("LREM", activeKey, 1, jobId)
  54. -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process
  55. -- but if ID is 0:0, then there is at least 1 prioritized job to process
  56. if jobId == "0:0" then
  57. jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
  58. return prepareJobForProcessing(prefix, rateLimiterKey,
  59. eventStreamKey, jobId, timestamp, maxJobs,
  60. limiterDuration, markerKey, opts)
  61. end
  62. else
  63. return prepareJobForProcessing(prefix, rateLimiterKey,
  64. eventStreamKey, jobId, timestamp, maxJobs,
  65. limiterDuration, markerKey, opts)
  66. end
  67. else
  68. jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
  69. if jobId then
  70. return prepareJobForProcessing(prefix, rateLimiterKey,
  71. eventStreamKey, jobId, timestamp, maxJobs,
  72. limiterDuration, markerKey, opts)
  73. end
  74. end
  75. -- Return the timestamp for the next delayed job if any.
  76. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  77. if nextTimestamp ~= nil then
  78. -- The result is guaranteed to be positive, since the
  79. -- ZRANGEBYSCORE command would have return a job otherwise.
  80. return {0, 0, 0, nextTimestamp}
  81. end
  82. end