| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.moveToActive = void 0;
- const content = `--[[
- Move next job to be processed to active, lock it and fetch its data. The job
- may be delayed, in that case we need to move it to the delayed set instead.
- This operation guarantees that the worker owns the job during the lock
- expiration time. The worker is responsible for keeping the lock fresh
- so that no other worker picks this job again.
- Input:
- KEYS[1] wait key
- KEYS[2] active key
- KEYS[3] prioritized key
- KEYS[4] stream events key
- KEYS[5] stalled key
- -- Rate limiting
- KEYS[6] rate limiter key
- KEYS[7] delayed key
- -- Delayed jobs
- KEYS[8] paused key
- KEYS[9] meta key
- KEYS[10] pc priority counter
- -- Marker
- KEYS[11] marker key
- -- Arguments
- ARGV[1] key prefix
- ARGV[2] timestamp
- ARGV[3] opts
- opts - token - lock token
- opts - lockDuration
- opts - limiter
- opts - name - worker name
- ]]
-- Shorthand used by every helper below to issue Redis commands.
local rcall = redis.call

-- Keys that the script refers to by name; the remaining KEYS entries are
-- read directly at their point of use.
local waitKey, activeKey = KEYS[1], KEYS[2]
local eventStreamKey = KEYS[4]
local rateLimiterKey, delayedKey = KEYS[6], KEYS[7]

-- Options arrive msgpack-encoded in ARGV[3] (token, lockDuration, limiter, name).
local opts = cmsgpack.unpack(ARGV[3])
- -- Includes
--[[
  Function to return the next delayed job timestamp.

  Fetches the lowest-scored entry of the delayed sorted set together with
  its score and converts that score to a timestamp by dividing it by 0x1000.

  Input:
    delayedKey - key of the delayed sorted set
  Returns:
    score / 0x1000 of the first delayed entry, or nothing (nil) when the
    set is empty or the score is not numeric.
]]
local function getNextDelayedTimestamp(delayedKey)
  local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")

  -- The reply is {member, score}, or an empty table when nothing is delayed.
  -- The length must be compared explicitly: a bare "#result" is always
  -- truthy in Lua because 0 is not a false value.
  if #result == 0 then
    return
  end

  local nextTimestamp = tonumber(result[2])
  if nextTimestamp == nil then
    return
  end

  return nextTimestamp / 0x1000
end
--[[
  Function to get current rate limit ttl.

  Input:
    maxJobs        - maximum number of jobs allowed per window (nil or false
                     when no rate limit applies)
    rateLimiterKey - key holding the job counter of the current window
  Returns:
    the remaining PTTL of the rate limiter key when the counter has reached
    maxJobs and the key still has a positive ttl, otherwise 0.
]]
local function getRateLimitTTL(maxJobs, rateLimiterKey)
  if not maxJobs then
    return 0
  end

  -- A missing counter counts as zero jobs.
  local jobCount = tonumber(rcall("GET", rateLimiterKey) or 0)
  if not (maxJobs <= jobCount) then
    return 0
  end

  local remaining = rcall("PTTL", rateLimiterKey)
  if remaining > 0 then
    return remaining
  end

  -- A ttl of exactly 0 means the window is over: drop the stale counter.
  if remaining == 0 then
    rcall("DEL", rateLimiterKey)
  end
  return 0
end
--[[
  Function to check for the meta.paused key to decide if we are paused or not
  (since an empty list and !EXISTS are not really the same).

  Reads "paused", "concurrency", "max" and "duration" from the queue meta hash.

  Returns four values:
    1. target list key: pausedKey when the queue is paused, waitKey otherwise
    2. true when the queue is paused or the active list length has reached
       the configured concurrency, false otherwise
    3. the "max" meta field, exactly as returned by HMGET
    4. the "duration" meta field, exactly as returned by HMGET
]]
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  local paused, concurrency = meta[1], meta[2]
  local limitMax, limitDuration = meta[3], meta[4]

  if paused then
    return pausedKey, true, limitMax, limitDuration
  end

  -- Without a concurrency setting the queue can never be "maxed".
  if not concurrency then
    return waitKey, false, limitMax, limitDuration
  end

  local isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
  return waitKey, isMaxed, limitMax, limitDuration
end
--[[
  Function to move job from prioritized state to active.

  Pops the lowest-scored member of the prioritized set and pushes it onto
  the active list. When the set is empty the priority counter key is
  deleted instead.

  Returns:
    the id of the moved job, or nothing (nil) when no prioritized job exists.
]]
local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
  local popped = rcall("ZPOPMIN", priorityKey)

  if #popped == 0 then
    -- No prioritized jobs are left, so the counter is removed.
    rcall("DEL", priorityCounterKey)
    return
  end

  local jobId = popped[1]
  rcall("LPUSH", activeKey, jobId)
  return jobId
end
- --[[
- Function to move job from wait state to active.
- Input:
- opts - token - lock token
- opts - lockDuration
- opts - limiter
- ]]
- -- Includes
--[[
  Add marker if needed when a job is available.

  Adds the base marker (member "0" with score 0) to the marker sorted set,
  unless the queue is paused or maxed, in which case nothing is written.
]]
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  if isPausedOrMaxed then
    return
  end
  rcall("ZADD", markerKey, 0, "0")
end
--[[
  Prepares a job that has just been moved to the active list.

  In order: counts the job against the rate limiter (when maxJobs is set),
  takes the job lock (unless the token is "0"), emits the "active" event,
  stores processedOn (plus the worker name under "pb" when given), bumps
  the "ats" counter of the job hash and adds the base marker.

  Input:
    opts - token - lock token
    opts - lockDuration
    opts - name - worker name (optional)
  Returns:
    {job hash as returned by HGETALL, jobId, 0, 0}
]]
local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
    jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
  local jobKey = keyPrefix .. jobId

  -- Rate limiting: the first job counted in a window starts the expiration
  -- of the counter, using the limiter duration as a non-negative integer.
  if maxJobs and tonumber(rcall("INCR", rateLimiterKey)) == 1 then
    rcall("PEXPIRE", rateLimiterKey, math.floor(math.abs(limiterDuration)))
  end

  -- Take the lock; the token "0" means no lock is wanted.
  local token = opts['token']
  if token ~= "0" then
    rcall("SET", jobKey .. ':lock', token, "PX", opts['lockDuration'])
  end

  -- Set "processedBy" field to the worker name, when one was provided.
  local workerName = opts['name']
  local optionalValues = workerName and {"pb", workerName} or {}

  rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
  rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
  rcall("HINCRBY", jobKey, "ats", 1)

  addBaseMarkerIfNeeded(markerKey, false)

  -- The rate limit delay must be 0 here so that no extra delay is added
  -- for a job that is already on its way to being processed.
  local jobData = rcall("HGETALL", jobKey)
  return {jobData, jobId, 0, 0}
end
- --[[
- Updates the delay set, by moving delayed jobs that should
- be processed now to "wait".
- Events:
- 'waiting'
- ]]
- -- Includes
- --[[
- Function to add job in target list and add marker if needed.
- ]]
- -- Includes
-- Pushes jobId onto targetKey with the given push command (pushCmd is used
-- verbatim as the Redis command name), then adds the base marker unless the
-- queue is paused or maxed.
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  rcall(pushCmd, targetKey, jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
- --[[
- Function to add job considering priority.
- ]]
- -- Includes
--[[
  Function to get priority score.

  Increments the priority counter and combines it with the priority: the
  priority is scaled by 0x100000000 and the counter, wrapped to that same
  range, is added on top.
]]
local function getPriorityScore(priority, priorityCounterKey)
  local PRIORITY_STEP = 0x100000000
  local sequence = rcall("INCR", priorityCounterKey) % PRIORITY_STEP
  return priority * PRIORITY_STEP + sequence
end
-- Adds jobId to the prioritized sorted set under a freshly computed priority
-- score, then adds the base marker unless the queue is paused or maxed.
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
  rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
-- Moves delayed jobs that are due at the given timestamp out of the delayed
-- set: jobs without priority go to the target list, prioritized ones to the
-- prioritized set. A "waiting" event is emitted for every promoted job and
-- its "delay" field is reset to 0.
-- Tries to get as many as 1000 jobs at once.
local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
    eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  -- Highest delayed-set score that still belongs to the given timestamp.
  local maxScore = (timestamp + 1) * 0x1000 - 1
  local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, maxScore, "LIMIT", 0, 1000)

  if #jobs == 0 then
    return
  end

  rcall("ZREM", delayedKey, unpack(jobs))

  for index = 1, #jobs do
    local jobId = jobs[index]
    local jobKey = prefix .. jobId
    local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

    if priority ~= 0 then
      rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
    else
      -- LIFO or FIFO
      rcall("LPUSH", targetKey, jobId)
    end

    -- Emit waiting event
    rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, "prev", "delayed")
    rcall("HSET", jobKey, "delay", 0)
  end

  addBaseMarkerIfNeeded(markerKey, isPaused)
end
-- Main script body.
-- Every return value has the shape {jobData, jobId, rateLimitTtl, nextDelayedTimestamp};
-- positions that do not apply are filled with 0.
local prioritizedKey, priorityCounterKey, markerKey = KEYS[3], KEYS[10], KEYS[11]
local keyPrefix, timestamp = ARGV[1], ARGV[2]

local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration =
  getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8])

-- Check if there are delayed jobs that we can move to wait.
promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey, eventStreamKey, keyPrefix,
  timestamp, priorityCounterKey, isPausedOrMaxed)

-- For the maximum, the value stored in the queue meta takes precedence over
-- the limiter passed in the worker options.
local limiterOpts = opts['limiter']
local maxJobs = tonumber(rateLimitMax or (limiterOpts and limiterOpts['max']))
local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)

-- Check if we are rate limited first.
if expireTime > 0 then
  return {0, 0, expireTime, 0}
end

-- paused or maxed queue
if isPausedOrMaxed then
  return {0, 0, 0, 0}
end

-- For the duration, the worker options take precedence over the queue meta.
local limiterDuration = (limiterOpts and limiterOpts['duration']) or rateLimitDuration

-- no job ID, try non-blocking move from wait to active
local jobId = rcall("RPOPLPUSH", waitKey, activeKey)

-- Markers in waitlist DEPRECATED in v5: Will be completely removed in v6.
-- An entry starting with "0:" is a marker, not a job: take it back out of
-- the active list and pop the next entry instead.
if jobId and string.sub(jobId, 1, 2) == "0:" then
  rcall("LREM", activeKey, 1, jobId)
  jobId = rcall("RPOPLPUSH", waitKey, activeKey)
end

-- Nothing in the wait list: fall back to the prioritized set.
if not jobId then
  jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, priorityCounterKey)
end

if jobId then
  return prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey, jobId, timestamp,
    maxJobs, limiterDuration, markerKey, opts)
end

-- Return the timestamp for the next delayed job if any.
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp == nil then
  return {0, 0, 0, 0}
end
return {0, 0, 0, nextTimestamp}
- `;
- exports.moveToActive = {
- name: 'moveToActive',
- content,
- keys: 11,
- };
- //# sourceMappingURL=moveToActive-11.js.map
|