| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
--[[
  Pick the next job for a worker and move it into the active list.

  Steps, in execution order:
    1. Resolve the target list and the paused/maxed state (getTargetQueueList).
    2. Promote delayed jobs that can now be moved to wait (promoteDelayedJobs).
    3. If the rate limiter reports a positive TTL, return {0, 0, ttl, 0}.
    4. If the queue is paused or maxed, return {0, 0, 0, 0}.
    5. Pop a job id from wait into active (RPOPLPUSH); if none is available,
       fall back to the prioritized set. A job id found either way is handed
       to prepareJobForProcessing, whose result is returned as-is.
    6. With no job available, return {0, 0, 0, nextDelayedTimestamp} when a
       delayed job exists, otherwise {0, 0, 0, 0}.

  Contract stated by the original header: the worker owns the returned job
  for the lock duration and is responsible for keeping the lock fresh so
  that no other worker picks the job again.

  Input:
    KEYS[1]  wait key
    KEYS[2]  active key
    KEYS[3]  prioritized key
    KEYS[4]  stream events key
    KEYS[5]  stalled key (not referenced directly by this script body)
    KEYS[6]  rate limiter key
    KEYS[7]  delayed key
    KEYS[8]  paused key
    KEYS[9]  meta key
    KEYS[10] pc priority counter
    KEYS[11] marker key

    ARGV[1]  key prefix
    ARGV[2]  timestamp
    ARGV[3]  opts (msgpack encoded)
      opts.token        - lock token
      opts.lockDuration
      opts.limiter      - optional table; `max` and `duration` are read here
      opts.name         - worker name
    The opts table is also forwarded untouched to prepareJobForProcessing.
]]
-- NOTE: the locals below are declared before the includes on purpose, so
-- that inlined helpers can see them; keep their names and position.
local rcall = redis.call
local waitKey = KEYS[1]
local activeKey = KEYS[2]
local eventStreamKey = KEYS[4]
local rateLimiterKey = KEYS[6]
local delayedKey = KEYS[7]
local opts = cmsgpack.unpack(ARGV[3])

-- Includes
--- @include "includes/getNextDelayedTimestamp"
--- @include "includes/getRateLimitTTL"
--- @include "includes/getTargetQueueList"
--- @include "includes/moveJobFromPrioritizedToActive"
--- @include "includes/prepareJobForProcessing"
--- @include "includes/promoteDelayedJobs"

local prioritizedKey = KEYS[3]
local priorityCounterKey = KEYS[10]
local marker = KEYS[11]
local keyPrefix = ARGV[1]
local now = ARGV[2]
local limiterOpts = opts['limiter']

local targetList, pausedOrMaxed, metaMax, metaDuration =
    getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8])

-- Move any delayed jobs that are ready over to the wait side first.
promoteDelayedJobs(delayedKey, marker, targetList, prioritizedKey,
                   eventStreamKey, keyPrefix, now, priorityCounterKey,
                   pausedOrMaxed)

-- The limit from getTargetQueueList wins over the one supplied in opts.
local jobLimit = tonumber(metaMax or (limiterOpts and limiterOpts['max']))
local rateLimitTtl = getRateLimitTTL(jobLimit, rateLimiterKey)

-- Rate limited: report the remaining TTL and do not touch any job.
if rateLimitTtl > 0 then
    return {0, 0, rateLimitTtl, 0}
end

-- Paused or maxed queue: nothing may be picked up.
if pausedOrMaxed then
    return {0, 0, 0, 0}
end

-- For the duration, the opts value wins over the getTargetQueueList one.
local rateDuration = (limiterOpts and limiterOpts['duration']) or metaDuration

-- Non-blocking move from wait to active.
local nextJobId = rcall("RPOPLPUSH", waitKey, activeKey)

-- Markers in waitlist DEPRECATED in v5: Will be completely removed in v6.
-- A popped id starting with "0:" is such a marker: drop it from active and
-- pop once more.
if nextJobId and nextJobId:sub(1, 2) == "0:" then
    rcall("LREM", activeKey, 1, nextJobId)
    nextJobId = rcall("RPOPLPUSH", waitKey, activeKey)
end

-- Nothing in wait: fall back to the prioritized set.
if not nextJobId then
    nextJobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey,
                                               priorityCounterKey)
end

if nextJobId then
    return prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
                                   nextJobId, now, jobLimit, rateDuration,
                                   marker, opts)
end

-- No job available: report the timestamp of the next delayed job, if any.
local nextDelayedAt = getNextDelayedTimestamp(delayedKey)
if nextDelayedAt == nil then
    return {0, 0, 0, 0}
end

return {0, 0, 0, nextDelayedAt}
|