| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- --[[
- Function to fetch the next job to process.
- Tries to get the next job to avoid an extra roundtrip if the queue is
- not closing and not rate limited.
- Input:
- waitKey - wait list key
- activeKey - active list key
- prioritizedKey - prioritized sorted set key
- eventStreamKey - event stream key
- rateLimiterKey - rate limiter key
- delayedKey - delayed sorted set key
- pausedKey - paused list key
- metaKey - meta hash key
- pcKey - priority counter key
- markerKey - marker key
- prefix - keys prefix
- timestamp - current timestamp
- opts - options table:
- token (required) - lock token used when locking jobs
- lockDuration (required) - lock duration for acquired jobs
- limiter (optional) - rate limiter options table (e.g. { max = number })
- ]]
- -- Includes
- --- @include "getNextDelayedTimestamp"
- --- @include "getRateLimitTTL"
- --- @include "getTargetQueueList"
- --- @include "moveJobFromPrioritizedToActive"
- --- @include "prepareJobForProcessing"
- --- @include "promoteDelayedJobs"
- local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey,
- rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix,
- timestamp, opts)
- local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration =
- getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
- -- Check if there are delayed jobs that can be promoted
- promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey,
- eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed)
- local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
- -- Check if we are rate limited first.
- local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
- if expireTime > 0 then
- return {0, 0, expireTime, 0}
- end
- -- paused or maxed queue
- if isPausedOrMaxed then
- return {0, 0, 0, 0}
- end
- local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
- local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
- if jobId then
- -- Markers in waitlist DEPRECATED in v5: Remove in v6.
- if string.sub(jobId, 1, 2) == "0:" then
- rcall("LREM", activeKey, 1, jobId)
- -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process
- -- but if ID is 0:0, then there is at least 1 prioritized job to process
- if jobId == "0:0" then
- jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
- return prepareJobForProcessing(prefix, rateLimiterKey,
- eventStreamKey, jobId, timestamp, maxJobs,
- limiterDuration, markerKey, opts)
- end
- else
- return prepareJobForProcessing(prefix, rateLimiterKey,
- eventStreamKey, jobId, timestamp, maxJobs,
- limiterDuration, markerKey, opts)
- end
- else
- jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
- if jobId then
- return prepareJobForProcessing(prefix, rateLimiterKey,
- eventStreamKey, jobId, timestamp, maxJobs,
- limiterDuration, markerKey, opts)
- end
- end
- -- Return the timestamp for the next delayed job if any.
- local nextTimestamp = getNextDelayedTimestamp(delayedKey)
- if nextTimestamp ~= nil then
- -- The result is guaranteed to be positive, since the
- -- ZRANGEBYSCORE command would have return a job otherwise.
- return {0, 0, 0, nextTimestamp}
- end
- end
|