"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.moveToDelayed = void 0; const content = `--[[ Moves job from active to delayed set. Input: KEYS[1] marker key KEYS[2] active key KEYS[3] prioritized key KEYS[4] delayed key KEYS[5] job key KEYS[6] events stream KEYS[7] meta key KEYS[8] stalled key KEYS[9] wait key KEYS[10] rate limiter key KEYS[11] paused key KEYS[12] pc priority counter ARGV[1] key prefix ARGV[2] timestamp ARGV[3] the id of the job ARGV[4] queue token ARGV[5] delay value ARGV[6] skip attempt ARGV[7] optional job fields to update ARGV[8] fetch next? ARGV[9] opts Output: 0 - OK -1 - Missing job. -3 - Job not in active set. Events: - delayed key. ]] local rcall = redis.call -- Includes --[[ Add delay marker if needed. ]] -- Includes --[[ Function to return the next delayed job timestamp. ]] local function getNextDelayedTimestamp(delayedKey) local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES") if #result then local nextTimestamp = tonumber(result[2]) if nextTimestamp ~= nil then return nextTimestamp / 0x1000 end end end local function addDelayMarkerIfNeeded(markerKey, delayedKey) local nextTimestamp = getNextDelayedTimestamp(delayedKey) if nextTimestamp ~= nil then -- Replace the score of the marker with the newest known -- next timestamp. rcall("ZADD", markerKey, nextTimestamp, "1") end end --[[ Function to fetch the next job to process. Tries to get the next job to avoid an extra roundtrip if the queue is not closing and not rate limited. Input: waitKey - wait list key activeKey - active list key prioritizedKey - prioritized sorted set key eventStreamKey - event stream key rateLimiterKey - rate limiter key delayedKey - delayed sorted set key pausedKey - paused list key metaKey - meta hash key pcKey - priority counter key markerKey - marker key prefix - keys prefix timestamp - current timestamp opts - options table: token (required) - lock token used when locking jobs lockDuration (required) - lock duration for acquired jobs limiter (optional) - rate limiter options table (e.g. { max = number }) ]] -- Includes --[[ Function to get current rate limit ttl. ]] local function getRateLimitTTL(maxJobs, rateLimiterKey) if maxJobs and maxJobs <= tonumber(rcall("GET", rateLimiterKey) or 0) then local pttl = rcall("PTTL", rateLimiterKey) if pttl == 0 then rcall("DEL", rateLimiterKey) end if pttl > 0 then return pttl end end return 0 end --[[ Function to check for the meta.paused key to decide if we are paused or not (since an empty list and !EXISTS are not really the same). ]] local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration") if queueAttributes[1] then return pausedKey, true, queueAttributes[3], queueAttributes[4] else if queueAttributes[2] then local activeCount = rcall("LLEN", activeKey) if activeCount >= tonumber(queueAttributes[2]) then return waitKey, true, queueAttributes[3], queueAttributes[4] else return waitKey, false, queueAttributes[3], queueAttributes[4] end end end return waitKey, false, queueAttributes[3], queueAttributes[4] end --[[ Function to move job from prioritized state to active. ]] local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey) local prioritizedJob = rcall("ZPOPMIN", priorityKey) if #prioritizedJob > 0 then rcall("LPUSH", activeKey, prioritizedJob[1]) return prioritizedJob[1] else rcall("DEL", priorityCounterKey) end end --[[ Function to move job from wait state to active. Input: opts - token - lock token opts - lockDuration opts - limiter ]] -- Includes --[[ Add marker if needed when a job is available. ]] local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) if not isPausedOrMaxed then rcall("ZADD", markerKey, 0, "0") end end local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey, jobId, processedOn, maxJobs, limiterDuration, markerKey, opts) local jobKey = keyPrefix .. jobId -- Check if we need to perform rate limiting. if maxJobs then local jobCounter = tonumber(rcall("INCR", rateLimiterKey)) if jobCounter == 1 then local integerDuration = math.floor(math.abs(limiterDuration)) rcall("PEXPIRE", rateLimiterKey, integerDuration) end end -- get a lock if opts['token'] ~= "0" then local lockKey = jobKey .. ':lock' rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration']) end local optionalValues = {} if opts['name'] then -- Set "processedBy" field to the worker name table.insert(optionalValues, "pb") table.insert(optionalValues, opts['name']) end rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting") rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues)) rcall("HINCRBY", jobKey, "ats", 1) addBaseMarkerIfNeeded(markerKey, false) -- rate limit delay must be 0 in this case to prevent adding more delay -- when job that is moved to active needs to be processed return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data end --[[ Updates the delay set, by moving delayed jobs that should be processed now to "wait". Events: 'waiting' ]] -- Includes --[[ Function to add job in target list and add marker if needed. ]] -- Includes local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) rcall(pushCmd, targetKey, jobId) addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end --[[ Function to add job considering priority. ]] -- Includes --[[ Function to get priority score. ]] local function getPriorityScore(priority, priorityCounterKey) local prioCounter = rcall("INCR", priorityCounterKey) return priority * 0x100000000 + prioCounter % 0x100000000 end local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPausedOrMaxed) local score = getPriorityScore(priority, priorityCounterKey) rcall("ZADD", prioritizedKey, score, jobId) addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end -- Try to get as much as 1000 jobs at once local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey, eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused) local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000) if (#jobs > 0) then rcall("ZREM", delayedKey, unpack(jobs)) for _, jobId in ipairs(jobs) do local jobKey = prefix .. jobId local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 if priority == 0 then -- LIFO or FIFO rcall("LPUSH", targetKey, jobId) else local score = getPriorityScore(priority, priorityCounterKey) rcall("ZADD", prioritizedKey, score, jobId) end -- Emit waiting event rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, "prev", "delayed") rcall("HSET", jobKey, "delay", 0) end addBaseMarkerIfNeeded(markerKey, isPaused) end end local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey, rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix, timestamp, opts) local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) -- Check if there are delayed jobs that can be promoted promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey, eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed) local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max'])) -- Check if we are rate limited first. local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) if expireTime > 0 then return {0, 0, expireTime, 0} end -- paused or maxed queue if isPausedOrMaxed then return {0, 0, 0, 0} end local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration local jobId = rcall("RPOPLPUSH", waitKey, activeKey) if jobId then -- Markers in waitlist DEPRECATED in v5: Remove in v6. if string.sub(jobId, 1, 2) == "0:" then rcall("LREM", activeKey, 1, jobId) -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process -- but if ID is 0:0, then there is at least 1 prioritized job to process if jobId == "0:0" then jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey) return prepareJobForProcessing(prefix, rateLimiterKey, eventStreamKey, jobId, timestamp, maxJobs, limiterDuration, markerKey, opts) end else return prepareJobForProcessing(prefix, rateLimiterKey, eventStreamKey, jobId, timestamp, maxJobs, limiterDuration, markerKey, opts) end else jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey) if jobId then return prepareJobForProcessing(prefix, rateLimiterKey, eventStreamKey, jobId, timestamp, maxJobs, limiterDuration, markerKey, opts) end end -- Return the timestamp for the next delayed job if any. local nextTimestamp = getNextDelayedTimestamp(delayedKey) if nextTimestamp ~= nil then -- The result is guaranteed to be positive, since the -- ZRANGEBYSCORE command would have return a job otherwise. return {0, 0, 0, nextTimestamp} end end --[[ Bake in the job id first 12 bits into the timestamp to guarantee correct execution order of delayed jobs (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp) WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail ]] local function getDelayedScore(delayedKey, timestamp, delay) local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp) local minScore = delayedTimestamp * 0x1000 local maxScore = (delayedTimestamp + 1 ) * 0x1000 - 1 local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore, minScore, "WITHSCORES","LIMIT", 0, 1) if #result then local currentMaxScore = tonumber(result[2]) if currentMaxScore ~= nil then if currentMaxScore >= maxScore then return maxScore, delayedTimestamp else return currentMaxScore + 1, delayedTimestamp end end end return minScore, delayedTimestamp end --[[ Function to get max events value or set by default 10000. ]] local function getOrSetMaxEvents(metaKey) local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") if not maxEvents then maxEvents = 10000 rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents) end return maxEvents end local function removeLock(jobKey, stalledKey, token, jobId) if token ~= "0" then local lockKey = jobKey .. ':lock' local lockToken = rcall("GET", lockKey) if lockToken == token then rcall("DEL", lockKey) rcall("SREM", stalledKey, jobId) else if lockToken then -- Lock exists but token does not match return -6 else -- Lock is missing completely return -2 end end end return 0 end --[[ Function to update a bunch of fields in a job. ]] local function updateJobFields(jobKey, msgpackedFields) if msgpackedFields and #msgpackedFields > 0 then local fieldsToUpdate = cmsgpack.unpack(msgpackedFields) if fieldsToUpdate then rcall("HMSET", jobKey, unpack(fieldsToUpdate)) end end end local jobKey = KEYS[5] local markerKey = KEYS[1] local metaKey = KEYS[7] local token = ARGV[4] if rcall("EXISTS", jobKey) == 1 then local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[3]) if errorCode < 0 then return errorCode end updateJobFields(jobKey, ARGV[7]) local delayedKey = KEYS[4] local jobId = ARGV[3] local delay = tonumber(ARGV[5]) local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId) if numRemovedElements < 1 then return -3 end local score, delayedTimestamp = getDelayedScore(delayedKey, ARGV[2], delay) if ARGV[6] == "0" then rcall("HINCRBY", jobKey, "atm", 1) end rcall("HSET", jobKey, "delay", ARGV[5]) local maxEvents = getOrSetMaxEvents(metaKey) rcall("ZADD", delayedKey, score, jobId) rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp) -- Try to get next job to avoid an extra roundtrip if the queue is not closing, -- and not rate limited. if (ARGV[8] == "1") then local opts = cmsgpack.unpack(ARGV[9]) local result = fetchNextJob(KEYS[9], KEYS[2], KEYS[3], KEYS[6], KEYS[10], KEYS[4], KEYS[11], metaKey, KEYS[12], markerKey, ARGV[1], ARGV[2], opts) if result and type(result[1]) == "table" then return result end end -- Check if we need to push a marker job to wake up sleeping workers. addDelayMarkerIfNeeded(markerKey, delayedKey) return 0 else return -1 end `; exports.moveToDelayed = { name: 'moveToDelayed', content, keys: 12, }; //# sourceMappingURL=moveToDelayed-12.js.map