| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
- const content = `--[[
- Adds a job to the queue by doing the following:
- - Increases the job counter if needed.
- - Creates a new job key with the job data.
- - if delayed:
- - computes timestamp.
- - adds to delayed zset.
- - Emits a global event 'delayed' if the job is delayed.
- - if not delayed
- - Adds the jobId to the wait/paused list in one of three ways:
- - LIFO
- - FIFO
- - prioritized.
- - Adds the job to the "added" list so that workers get notified.
- Input:
- KEYS[1] 'wait',
- KEYS[2] 'paused'
- KEYS[3] 'meta'
- KEYS[4] 'id'
- KEYS[5] 'completed'
- KEYS[6] 'delayed'
- KEYS[7] 'active'
- KEYS[8] events stream key
- KEYS[9] marker key
- ARGV[1] msgpacked arguments array
- [1] key prefix,
- [2] custom id (will not generate one automatically)
- [3] name
- [4] timestamp
- [5] parentKey?
- [6] parent dependencies key.
- [7] parent? {id, queueKey}
- [8] repeat job key
- [9] deduplication key
- ARGV[2] Json stringified job data
- ARGV[3] msgpacked options
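- Output:
- jobId - OK
- -5 - Missing parent key
- -7 - A job with this id already exists under a different parent key, and that parent still exists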
- ]]
-- Shorthand used for every Redis command issued by this script.
local rcall = redis.call

-- Script inputs: packed positional arguments, raw job payload, packed options.
local args = cmsgpack.unpack(ARGV[1])
local data = ARGV[2]
local opts = cmsgpack.unpack(ARGV[3])

-- Events stream key.
local eventsKey = KEYS[8]

-- Optional values carried inside the packed arguments array.
local parentKey, parent = args[5], args[7]
local repeatJobKey, deduplicationKey = args[8], args[9]

-- Assigned further down: the job id and key once the id is resolved, and
-- the encoded parent once the parent key has been validated.
local jobId, jobIdKey
local parentData
- -- Includes
- --[[
- Function to add job in target list and add marker if needed.
- ]]
- -- Includes
--[[
  Add the base marker (member "0" with score 0) to the marker zset when a
  job becomes available. Nothing is written when the queue is paused or
  maxed.
]]
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  if isPausedOrMaxed then
    return
  end
  rcall("ZADD", markerKey, 0, "0")
end
--[[
  Push a job id onto the target list using the supplied push command
  (the callers in this script pass LPUSH or RPUSH), then add the base
  marker unless the queue is paused or maxed.
]]
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  -- The push must happen before the marker is written.
  rcall(pushCmd, targetKey, jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
- --[[
- Function to debounce a job.
- ]]
- -- Includes
- --[[
- Function to deduplicate a job.
- ]]
--[[
  Point the deduplication key at the given job id.
  When the deduplication options carry a positive ttl the key is written
  with that expiry in milliseconds (PX); otherwise it is written without
  an expiry.
]]
local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  local ttl = deduplicationOpts and deduplicationOpts['ttl']
  if not (ttl and ttl > 0) then
    rcall('SET', deduplicationKey, jobId)
    return
  end
  rcall('SET', deduplicationKey, jobId, 'PX', ttl)
end
- --[[
- Function to store a deduplicated next job if the existing job is active
- and keepLastIfActive is set. When the active job finishes, the stored
- proto-job is used to create a real job in the queue.
- Returns true if the proto-job was stored, false otherwise.
- ]]
--[[
  Check whether an item is one of the values of a list.
  Returns 1 when a value equal to the item is found, nil otherwise.
]]
local function checkItemInList(list, item)
  local found = nil
  for _, candidate in pairs(list) do
    if candidate == item then
      found = 1
      break
    end
  end
  return found
end
--[[
  Store the incoming job as a "next" proto-job when keepLastIfActive is
  set and the job currently holding the deduplication key is in the
  active list.

  The proto-job is written as a hash at prefix .. "dn:" .. deduplicationId
  with the fields name, data and opts, plus pk, pd, pdk and rjk when the
  matching argument is set. The deduplication key is then persisted and
  the debounced and deduplicated events are emitted.

  Returns true if the proto-job was stored, false otherwise.
]]
local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  parentKey, parentData, parentDependenciesKey, repeatJobKey)
  if not (deduplicationOpts['keepLastIfActive'] and currentDebounceJobId) then
    return false
  end

  local activeJobIds = rcall('LRANGE', prefix .. "active", 0, -1)
  if not checkItemInList(activeJobIds, currentDebounceJobId) then
    return false
  end

  local nextJobKey = prefix .. "dn:" .. deduplicationId
  local protoJob = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}

  -- Append a field/value pair only when the value is set.
  local function appendIfSet(field, value)
    if value then
      protoJob[#protoJob + 1] = field
      protoJob[#protoJob + 1] = value
    end
  end
  appendIfSet('pk', parentKey)
  appendIfSet('pd', parentData)
  appendIfSet('pdk', parentDependenciesKey)
  appendIfSet('rjk', repeatJobKey)

  rcall('HSET', nextJobKey, unpack(protoJob))

  -- Ensure the dedup key does not expire while the job is active,
  -- so subsequent adds always hit the dedup path and never bypass
  -- the active-check because of a TTL expiry.
  rcall('PERSIST', prefix .. "de:" .. deduplicationId)

  -- TODO remove debounced event in next breaking change
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
    currentDebounceJobId, "debounceId", deduplicationId)
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
    currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  return true
end
--[[
  Deduplicate a job when the replace option is not set.

  With a positive ttl and the extend option, an existing deduplication key
  is re-written (refreshing its expiry unless keepLastIfActive is set) and
  the incoming job is reported as deduplicated; when no key exists yet the
  incoming job takes it.

  Otherwise the key is claimed with SET ... NX (with a PX expiry only when a
  positive ttl is given and keepLastIfActive is not set). If the key was
  already held, the incoming job is either stored as the next proto-job or
  reported as deduplicated.

  Returns the id of the job already holding the key when the incoming job
  must not be added, and nothing when the incoming job now owns the key.
]]
local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  parentKey, parentData, parentDependenciesKey, repeatJobKey)
  local ttl = deduplicationOpts['ttl']
  local hasTtl = ttl and ttl > 0
  local keepLastIfActive = deduplicationOpts['keepLastIfActive']

  -- Emit the pair of events telling listeners the incoming job was dropped.
  local function emitDeduplicated(existingJobId)
    -- TODO remove debounced event in next breaking change
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
      existingJobId, "debounceId", deduplicationId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
      existingJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  end

  -- Try to park the incoming job as the next proto-job.
  local function storeAsNext(existingJobId)
    return storeDeduplicatedNextJob(deduplicationOpts, existingJobId, prefix,
      deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
      parentKey, parentData, parentDependenciesKey, repeatJobKey)
  end

  if hasTtl and deduplicationOpts['extend'] then
    local existingJobId = rcall('GET', deduplicationKey)
    if existingJobId and storeAsNext(existingJobId) then
      return existingJobId
    end

    -- Re-write the key for whichever job owns it from now on.
    local owner = existingJobId or jobId
    if keepLastIfActive then
      rcall('SET', deduplicationKey, owner)
    else
      setDeduplicationKey(deduplicationKey, owner, deduplicationOpts)
    end

    if not existingJobId then
      return
    end
    emitDeduplicated(existingJobId)
    return existingJobId
  end

  -- Claim the key only if nobody holds it.
  local claimed
  if hasTtl and not keepLastIfActive then
    claimed = rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  else
    claimed = rcall('SET', deduplicationKey, jobId, 'NX')
  end
  if claimed then
    return
  end

  local existingJobId = rcall('GET', deduplicationKey)
  if not storeAsNext(existingJobId) then
    emitDeduplicated(existingJobId)
  end
  return existingJobId
end
--[[
  Delete a job hash together with its auxiliary keys (logs, dependencies,
  processed, failed and unsuccessful) in a single DEL call.
  Returns the reply of that DEL call.
]]
local function removeJobKeys(jobKey)
  local keys = {jobKey}
  for _, suffix in ipairs({':logs', ':dependencies', ':processed', ':failed', ':unsuccessful'}) do
    keys[#keys + 1] = jobKey .. suffix
  end
  return rcall("DEL", unpack(keys))
end
--[[
  Remove the job currently holding the deduplication slot from the delayed
  set so the incoming job can take its place.

  When the job was in the delayed set its keys are deleted, a removed
  event is emitted for it, and debounced / deduplicated events are emitted
  for the incoming job.

  Returns true when the job was removed from the delayed set, false
  otherwise. The deduplicationKey parameter is accepted but not used here.
]]
local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
  jobId, deduplicationId, prefix)
  local removedCount = rcall("ZREM", delayedKey, currentDeduplicatedJobId)
  if not (removedCount > 0) then
    return false
  end

  removeJobKeys(prefix .. currentDeduplicatedJobId)

  rcall("XADD", eventsKey, "*", "event", "removed", "jobId", currentDeduplicatedJobId,
    "prev", "delayed")

  -- TODO remove debounced event in next breaking change
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
    jobId, "debounceId", deduplicationId)
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
    jobId, "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)

  return true
end
--[[
  Entry point for job deduplication.

  Does nothing when the options carry no deduplication id. Without the
  replace option the work is delegated to deduplicateJobWithoutReplace.

  With the replace option:
    - if a job already holds the key and it could not be removed from the
      delayed set, the incoming job may be stored as the next proto-job
      and the id of the existing job is returned;
    - otherwise (no holder, or the holder was removed) the key is pointed
      at the incoming job and nothing is returned.

  A returned job id means the incoming job must not be added.
]]
local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
  prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
  local deduplicationId = deduplicationOpts and deduplicationOpts['id']
  if not deduplicationId then
    return
  end

  if not deduplicationOpts['replace'] then
    return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
      jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
      parentKey, parentData, parentDependenciesKey, repeatJobKey)
  end

  local existingJobId = rcall('GET', deduplicationKey)
  if existingJobId then
    local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
      existingJobId, jobId, deduplicationId, prefix)
    if not isRemoved then
      storeDeduplicatedNextJob(deduplicationOpts, existingJobId, prefix,
        deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
        parentKey, parentData, parentDependenciesKey, repeatJobKey)
      return existingJobId
    end
  end

  -- From here on the incoming job takes over the deduplication key.
  local ttl = deduplicationOpts['ttl']
  if deduplicationOpts['keepLastIfActive'] then
    rcall('SET', deduplicationKey, jobId)
  elseif existingJobId and not deduplicationOpts['extend'] and ttl and ttl > 0 then
    -- A replaced job without extend keeps the remaining expiry of the key.
    rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
  else
    setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  end
  return
end
--[[
  Read the maximum events-stream length from the queue meta hash.
  When the field is missing it is initialised to 10000 and that default
  is returned.
]]
local function getOrSetMaxEvents(metaKey)
  local configured = rcall("HGET", metaKey, "opts.maxLenEvents")
  if configured then
    return configured
  end

  local defaultMaxEvents = 10000
  rcall("HSET", metaKey, "opts.maxLenEvents", defaultMaxEvents)
  return defaultMaxEvents
end
--[[
  Decide which list a new job id must be pushed to, based on the queue
  meta hash (an empty list and a missing key are not the same thing, so
  the paused flag is read from meta).

  Returns four values:
    1. the paused key when the queue is paused, the wait key otherwise;
    2. true when the queue is paused, or when a concurrency is configured
       and the active list already holds at least that many jobs;
    3. the raw "max" meta field;
    4. the raw "duration" meta field.
]]
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  local paused, concurrency = attrs[1], attrs[2]
  local max, duration = attrs[3], attrs[4]

  if paused then
    return pausedKey, true, max, duration
  end

  local isMaxed = false
  if concurrency then
    isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
  end
  return waitKey, isMaxed, max, duration
end
- --[[
- Function to handle the case when job is duplicated.
- ]]
- -- Includes
- --[[
- This function is used to update the parent's dependencies if the job
- is already completed and about to be ignored. The parent must get its
- dependencies updated to avoid the parent job being stuck forever in
- the waiting-children state.
- ]]
- -- Includes
- --[[
- Validate and move or add dependencies to parent.
- ]]
- -- Includes
- --[[
- Validate and move parent to a wait status (waiting, delayed or prioritized)
- if no pending dependencies.
- ]]
- -- Includes
- --[[
- Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
- ]]
- -- Includes
- --[[
- Move parent to a wait status (wait, prioritized or delayed)
- ]]
- -- Includes
- --[[
- Add delay marker if needed.
- ]]
- -- Includes
--[[
  Return the timestamp of the next delayed job, or nil when the delayed
  set is empty or its first score is not numeric.

  The first entry of the delayed zset is read together with its score.
  Delayed scores are written elsewhere in this script as
  timestamp * 0x1000, so the score is divided by 0x1000 to get the
  timestamp back.
]]
local function getNextDelayedTimestamp(delayedKey)
  local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")

  -- The length must be compared explicitly: 0 is truthy in Lua, so a bare
  -- "if #result" would never detect an empty reply.
  if #result == 0 then
    return nil
  end

  local nextTimestamp = tonumber(result[2])
  if nextTimestamp == nil then
    return nil
  end
  return nextTimestamp / 0x1000
end
--[[
  Add the delay marker (member "1") to the marker zset, scored with the
  timestamp of the next delayed job. Nothing is written when there is no
  next delayed timestamp.
]]
local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  local upcoming = getNextDelayedTimestamp(delayedKey)
  if upcoming == nil then
    return
  end
  -- ZADD on an existing member replaces its score with the newest known
  -- next timestamp.
  rcall("ZADD", markerKey, upcoming, "1")
end
- --[[
- Function to add job considering priority.
- ]]
- -- Includes
--[[
  Compute the zset score of a prioritized job.
  The priority is multiplied by 0x100000000 and the value of the priority
  counter (incremented on every call), taken modulo 0x100000000, is added
  to it.
]]
local function getPriorityScore(priority, priorityCounterKey)
  local counterSpan = 0x100000000
  local sequence = rcall("INCR", priorityCounterKey) % counterSpan
  return priority * counterSpan + sequence
end
--[[
  Add a job id to the prioritized zset, scored by getPriorityScore, then
  add the base marker unless the queue is paused or maxed.
]]
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  isPausedOrMaxed)
  rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
--[[
  Tell whether a queue is paused or has reached its concurrency limit.
  The paused flag and the concurrency are read from the queue meta hash
  (an empty list and a missing key are not the same thing).

  Returns true when the queue is paused, or when a concurrency is
  configured and the active list holds at least that many jobs;
  false otherwise.
]]
local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  local paused, concurrency = attrs[1], attrs[2]

  if paused then
    return true
  end
  if not concurrency then
    return false
  end
  return rcall("LLEN", activeKey) >= tonumber(concurrency)
end
--[[
  Move a parent job into the delayed set, the wait/paused list or the
  prioritized set of its own queue, depending on the priority and delay
  fields stored on the parent job hash.

    - delay > 0: the parent is added to the delayed zset with score
      (timestamp + delay) * 0x1000, a delayed event is emitted and the
      delay marker is refreshed.
    - priority == 0: the parent id is pushed with RPUSH onto the list
      chosen by getTargetQueueList.
    - otherwise: the parent is added to the prioritized zset.

  In the last two cases a waiting event with prev "waiting-children" is
  emitted on the parent queue events stream.
]]
local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  local parentMarkerKey = parentQueueKey .. ":marker"
  local parentEventsKey = parentQueueKey .. ":events"
  local parentMetaKey = parentQueueKey .. ":meta"
  local parentActiveKey = parentQueueKey .. ":active"

  local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  local priority = tonumber(jobAttributes[1]) or 0
  local delay = tonumber(jobAttributes[2]) or 0

  if delay > 0 then
    local parentDelayedKey = parentQueueKey .. ":delayed"
    local delayedTimestamp = tonumber(timestamp) + delay
    rcall("ZADD", parentDelayedKey, delayedTimestamp * 0x1000, parentId)
    rcall("XADD", parentEventsKey, "*", "event", "delayed", "jobId", parentId, "delay",
      delayedTimestamp)
    addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
    return
  end

  if priority == 0 then
    local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
      parentQueueKey .. ":wait", parentQueueKey .. ":paused")
    addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  else
    local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
    addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
      parentQueueKey .. ":pc", isPausedOrMaxed)
  end

  rcall("XADD", parentEventsKey, "*", "event", "waiting", "jobId", parentId, "prev",
    "waiting-children")
end
--[[
  Move the parent out of the waiting-children zset and into a wait status,
  but only when the parent job key exists and the parent is currently a
  member of the waiting-children zset of its queue.
]]
local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  if rcall("EXISTS", parentKey) ~= 1 then
    return
  end

  local waitingChildrenKey = parentQueueKey .. ":waiting-children"
  if not rcall("ZSCORE", waitingChildrenKey, parentId) then
    return
  end

  rcall("ZREM", waitingChildrenKey, parentId)
  moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
end
--[[
  Move the parent to a wait status (via moveParentToWaitIfNeeded) when its
  dependencies set is empty, i.e. no child is still pending.
]]
local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  parentId, timestamp)
  local pendingCount = rcall("SCARD", parentDependenciesKey)
  if pendingCount ~= 0 then
    return
  end
  moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
end
--[[
  Record the return value of an already finished child in the processed
  hash of its parent, then move the parent to a wait status if it has no
  pending dependencies left.
]]
local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  parentId, jobIdKey, returnvalue, timestamp)
  rcall("HSET", parentKey .. ":processed", jobIdKey, returnvalue)

  moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
    parentId, timestamp)
end
--[[
  Attach an already existing job to the given parent.

  Does nothing when no parent key is given. If the job is in the completed
  zset, its return value is recorded on the parent and the parent may be
  moved to a wait status; otherwise the job key is added to the parent
  dependencies set (when one is given). In both cases the parentKey and
  parent fields are then written on the job hash.
]]
local function updateExistingJobsParent(parentKey, parent, parentData,
  parentDependenciesKey, completedKey,
  jobIdKey, jobId, timestamp)
  if parentKey == nil then
    return
  end

  local alreadyCompleted = rcall("ZSCORE", completedKey, jobId)
  if alreadyCompleted then
    local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
    updateParentDepsIfNeeded(parentKey, parent['queueKey'],
      parentDependenciesKey, parent['id'],
      jobIdKey, returnvalue, timestamp)
  elseif parentDependenciesKey ~= nil then
    rcall("SADD", parentDependenciesKey, jobIdKey)
  end

  rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
end
--[[
  Handle an add request whose job id already exists.

  When the existing job has no parent, or has the same parent as the one
  requested, its parent linkage is refreshed. When it has a different
  parent, a parent was requested and the existing parent key still exists,
  -7 is returned and nothing else happens.

  In every other case a duplicated event is emitted and the job id is
  returned as a string.
]]
local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  local existedParentKey = rcall("HGET", jobKey, "parentKey")
  local sameOrNoParent = not existedParentKey or existedParentKey == currentParentKey

  if sameOrNoParent then
    updateExistingJobsParent(currentParentKey, currentParent, parentData,
      parentDependenciesKey, completedKey, jobKey,
      jobId, timestamp)
  elseif currentParentKey ~= nil and rcall("EXISTS", existedParentKey) == 1 then
    -- Reaching this branch already implies the two parent keys differ.
    return -7
  end

  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
    "duplicated", "jobId", jobId)

  return jobId .. "" -- convert to string
end
--[[
  Store a job hash and emit the added event.

  The hash always gets name, data, opts (JSON encoded), timestamp, delay
  and priority. The fields parentKey and parent, rjk, and deid are added
  only when a parent key, a repeat job key, or a deduplication id in
  opts.de are present, respectively.

  Returns the delay and the priority (each defaulting to 0).
]]
local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  parentKey, parentData, repeatJobKey)
  local jsonOpts = cjson.encode(opts)
  local delay = opts['delay'] or 0
  local priority = opts['priority'] or 0
  local debounceId = opts['de'] and opts['de']['id']

  -- Field/value pairs that are only written when present.
  local extras = {}
  if parentKey ~= nil then
    extras[#extras + 1] = "parentKey"
    extras[#extras + 1] = parentKey
    extras[#extras + 1] = "parent"
    extras[#extras + 1] = parentData
  end
  if repeatJobKey then
    extras[#extras + 1] = "rjk"
    extras[#extras + 1] = repeatJobKey
  end
  if debounceId then
    extras[#extras + 1] = "deid"
    extras[#extras + 1] = debounceId
  end

  rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
    "timestamp", timestamp, "delay", delay, "priority", priority,
    unpack(extras))

  rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)

  return delay, priority
end
-- A job that declares a parent can only be added while that parent exists.
if parentKey ~= nil then
  if rcall("EXISTS", parentKey) ~= 1 then
    return -5
  end
  parentData = cjson.encode(parent)
end

-- The job counter is incremented on every call, even when a custom id is used.
local jobCounter = rcall("INCR", KEYS[4])

local metaKey = KEYS[3]
local maxEvents = getOrSetMaxEvents(metaKey)

local prefix, customId, jobName = args[1], args[2], args[3]
local parentDependenciesKey = args[6]
local timestamp = args[4]

-- Resolve the job id: the fresh counter value, or the caller supplied id.
if customId == "" then
  jobId = jobCounter
else
  jobId = customId
end
jobIdKey = prefix .. jobId

-- Only a caller supplied id can collide with an existing job.
if customId ~= "" and rcall("EXISTS", jobIdKey) == 1 then
  return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
    parentData, parentDependenciesKey, KEYS[5], eventsKey,
    maxEvents, timestamp)
end

-- A returned id means the job was deduplicated and must not be added.
local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[6],
  deduplicationKey, eventsKey, maxEvents, prefix, jobName, data, opts,
  parentKey, parentData, parentDependenciesKey, repeatJobKey)
if deduplicationJobId then
  return deduplicationJobId
end

-- Store the job.
storeJob(eventsKey, jobIdKey, jobId, jobName, data, opts, timestamp,
  parentKey, parentData, repeatJobKey)

local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[1], KEYS[2])

-- LIFO or FIFO
local pushCmd = 'LPUSH'
if opts['lifo'] then
  pushCmd = 'RPUSH'
end
addJobInTargetList(target, KEYS[9], pushCmd, isPausedOrMaxed, jobId)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  "jobId", jobId)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
  rcall("SADD", parentDependenciesKey, jobIdKey)
end

return jobId .. "" -- convert to string
- `;
/**
 * Descriptor for the addStandardJob Lua script: its name, its source text
 * and a key count of 9.
 * NOTE(review): the key count presumably mirrors KEYS[1]..KEYS[9] listed in
 * the script header; confirm against the code that loads these descriptors.
 */
export const addStandardJob = {
  name: 'addStandardJob',
  content: content,
  keys: 9,
};
- //# sourceMappingURL=addStandardJob-9.js.map
|