| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.moveJobFromActiveToWait = void 0;
- const content = `--[[
- Function to move job from active state to wait.
- Input:
- KEYS[1] active key
- KEYS[2] wait key
- KEYS[3] stalled key
- KEYS[4] paused key
- KEYS[5] meta key
- KEYS[6] limiter key
- KEYS[7] prioritized key
- KEYS[8] marker key
- KEYS[9] event key
- ARGV[1] job id
- ARGV[2] lock token
- ARGV[3] job id key
- ]]
- local rcall = redis.call
- -- Includes
- --[[
- Function to add job in target list and add marker if needed.
- ]]
- -- Includes
- --[[
- Add marker if needed when a job is available.
- ]]
- local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
- if not isPausedOrMaxed then
- rcall("ZADD", markerKey, 0, "0")
- end
- end
- local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
- rcall(pushCmd, targetKey, jobId)
- addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
- end
- --[[
- Function to push back job considering priority in front of same prioritized jobs.
- ]]
- local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
- -- in order to put it at front of same prioritized jobs
- -- we consider prioritized counter as 0
- local score = priority * 0x100000000
- rcall("ZADD", prioritizedKey, score, jobId)
- end
- --[[
- Function to get max events value or set by default 10000.
- ]]
- local function getOrSetMaxEvents(metaKey)
- local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
- if not maxEvents then
- maxEvents = 10000
- rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
- end
- return maxEvents
- end
- --[[
- Function to check for the meta.paused key to decide if we are paused or not
- (since an empty list and !EXISTS are not really the same).
- ]]
- local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
- local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
- if queueAttributes[1] then
- return pausedKey, true, queueAttributes[3], queueAttributes[4]
- else
- if queueAttributes[2] then
- local activeCount = rcall("LLEN", activeKey)
- if activeCount >= tonumber(queueAttributes[2]) then
- return waitKey, true, queueAttributes[3], queueAttributes[4]
- else
- return waitKey, false, queueAttributes[3], queueAttributes[4]
- end
- end
- end
- return waitKey, false, queueAttributes[3], queueAttributes[4]
- end
- local function removeLock(jobKey, stalledKey, token, jobId)
- if token ~= "0" then
- local lockKey = jobKey .. ':lock'
- local lockToken = rcall("GET", lockKey)
- if lockToken == token then
- rcall("DEL", lockKey)
- rcall("SREM", stalledKey, jobId)
- else
- if lockToken then
- -- Lock exists but token does not match
- return -6
- else
- -- Lock is missing completely
- return -2
- end
- end
- end
- return 0
- end
- local jobId = ARGV[1]
- local token = ARGV[2]
- local jobKey = ARGV[3]
- if rcall("EXISTS", jobKey) == 0 then
- return -1
- end
- local errorCode = removeLock(jobKey, KEYS[3], token, jobId)
- if errorCode < 0 then
- return errorCode
- end
- local metaKey = KEYS[5]
- local removed = rcall("LREM", KEYS[1], 1, jobId)
- if removed > 0 then
- local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4])
- local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0
- if priority > 0 then
- pushBackJobWithPriority(KEYS[7], priority, jobId)
- else
- addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId)
- end
- local maxEvents = getOrSetMaxEvents(metaKey)
- -- Emit waiting event
- rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
- "jobId", jobId, "prev", "active")
- end
- local pttl = rcall("PTTL", KEYS[6])
- if pttl > 0 then
- return pttl
- else
- return 0
- end
- `;
- exports.moveJobFromActiveToWait = {
- name: 'moveJobFromActiveToWait',
- content,
- keys: 9,
- };
- //# sourceMappingURL=moveJobFromActiveToWait-9.js.map
|