| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.addJobScheduler = void 0;
- const content = `--[[
- Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options).
- Input:
- KEYS[1] 'repeat' key
- KEYS[2] 'delayed' key
- KEYS[3] 'wait' key
- KEYS[4] 'paused' key
- KEYS[5] 'meta' key
- KEYS[6] 'prioritized' key
- KEYS[7] 'marker' key
- KEYS[8] 'id' key
- KEYS[9] 'events' key
- KEYS[10] 'pc' priority counter
- KEYS[11] 'active' key
- ARGV[1] next milliseconds
- ARGV[2] msgpacked options
- [1] name
- [2] tz?
- [3] pattern?
- [4] endDate?
- [5] every?
- ARGV[3] jobs scheduler id
- ARGV[4] Json stringified template data
- ARGV[5] msgpacked template opts
- ARGV[6] msgpacked delayed opts
- ARGV[7] timestamp
- ARGV[8] prefix key
- ARGV[9] producer key
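- Output:
- {jobId, delay} - OK
- -10 - SchedulerJobIdCollision (job id collision, 'pattern' case)
- -11 - SchedulerJobSlotsBusy (current and next slot taken, 'every' case)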
- ]] local rcall = redis.call
- local repeatKey = KEYS[1] -- zset of scheduler ids scored by next run time (see ZADD/ZSCORE below)
- local delayedKey = KEYS[2]
- local waitKey = KEYS[3]
- local pausedKey = KEYS[4]
- local metaKey = KEYS[5]
- local prioritizedKey = KEYS[6]
- local eventsKey = KEYS[9] -- KEYS[7], KEYS[8], KEYS[10] and KEYS[11] are read in place further down
- local nextMillis = ARGV[1] -- overwritten by a computed number when the 'every' option is set
- local jobSchedulerId = ARGV[3]
- local templateOpts = cmsgpack.unpack(ARGV[5]) -- stored on the scheduler hash as JSON under "opts"
- local now = tonumber(ARGV[7])
- local prefixKey = ARGV[8] -- prepended to a job id to build its job key
- local jobOpts = cmsgpack.unpack(ARGV[6]) -- options of the job created for the next iteration
- -- Includes
- --[[
- Add delay marker if needed.
- ]]
- -- Includes
- --[[
- Shared helper to store a job and enqueue it into the appropriate list/set.
- Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
- Emits the appropriate event after enqueuing ("delayed" or "waiting").
- Returns delay, priority from storeJob.
- ]]
- -- Includes
- --[[
- Adds a delayed job to the queue by doing the following:
- - Creates a new job key with the job data.
- - adds to delayed zset.
- - Emits a global event 'delayed' if the job is delayed.
- ]]
- -- Includes
- --[[
- Add delay marker if needed.
- ]]
- -- Includes
- --[[
-   Function to return the next delayed job timestamp.
-   Reads the lowest-scored entry of the delayed zset and returns its
-   score divided by 0x1000.
-   Returns nothing (nil) when the zset has no entries.
- ]]
- local function getNextDelayedTimestamp(delayedKey)
-   local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
-   -- An empty reply has no result[2], so tonumber yields nil. A length
-   -- test such as "if #result then" would always pass, because every
-   -- number (0 included) is truthy in Lua.
-   local nextTimestamp = tonumber(result[2])
-   if nextTimestamp ~= nil then
-     return nextTimestamp / 0x1000
-   end
- end
- --[[
-   Add delay marker if needed: when the delayed zset has a next
-   timestamp, member "1" of the marker zset is scored with it.
- ]]
- local function addDelayMarkerIfNeeded(markerKey, delayedKey)
-   local upcoming = getNextDelayedTimestamp(delayedKey)
-   if upcoming == nil then
-     return
-   end
-   -- Replace the score of the marker with the newest known
-   -- next timestamp.
-   rcall("ZADD", markerKey, upcoming, "1")
- end
- --[[
-   Bake in the job id first 12 bits into the timestamp
-   to guarantee correct execution order of delayed jobs
-   (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
-   WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
-   Returns two values: the score to use in the delayed zset and the
-   target timestamp (timestamp + delay when delay > 0, else timestamp).
- ]]
- local function getDelayedScore(delayedKey, timestamp, delay)
-   local delayedTimestamp = tonumber(timestamp)
-   if delay > 0 then
-     delayedTimestamp = delayedTimestamp + delay
-   end
-   -- Scores reserved for this timestamp: [ts * 0x1000, (ts + 1) * 0x1000 - 1]
-   local minScore = delayedTimestamp * 0x1000
-   local maxScore = (delayedTimestamp + 1) * 0x1000 - 1
-   local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
-     minScore, "WITHSCORES", "LIMIT", 0, 1)
-   -- An empty reply leaves result[2] nil. No length test is needed;
-   -- "if #result then" would always pass since 0 is truthy in Lua.
-   local currentMaxScore = tonumber(result[2])
-   if currentMaxScore == nil then
-     -- first job for this timestamp
-     return minScore, delayedTimestamp
-   end
-   if currentMaxScore >= maxScore then
-     -- range exhausted: the last score of the range is reused
-     return maxScore, delayedTimestamp
-   end
-   return currentMaxScore + 1, delayedTimestamp
- end
- --[[
-   Puts an already stored job into the delayed zset, emits the
-   "delayed" event and refreshes the delay marker.
- ]]
- local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
-   maxEvents, markerKey, delay)
-   local zsetScore, runAt = getDelayedScore(delayedKey, timestamp, tonumber(delay))
-   rcall("ZADD", delayedKey, zsetScore, jobId)
-   -- the "delay" field of the event carries the target timestamp
-   rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
-     "event", "delayed", "jobId", jobId, "delay", runAt)
-   -- mark that a delayed job is available
-   addDelayMarkerIfNeeded(markerKey, delayedKey)
- end
- --[[
- Function to add job in target list and add marker if needed.
- ]]
- -- Includes
- --[[
-   Add marker if needed when a job is available.
-   Nothing is written when isPausedOrMaxed is set.
- ]]
- local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
-   if isPausedOrMaxed then
-     return
-   end
-   rcall("ZADD", markerKey, 0, "0")
- end
- --[[
-   Pushes jobId onto targetKey using pushCmd, then adds the base
-   marker ("0" with score 0) unless isPausedOrMaxed is set.
- ]]
- local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
-   rcall(pushCmd, targetKey, jobId)
-   if not isPausedOrMaxed then
-     rcall("ZADD", markerKey, 0, "0")
-   end
- end
- --[[
- Function to add job considering priority.
- ]]
- -- Includes
- --[[
-   Function to get priority score.
-   The priority is scaled by 2^32 and an incrementing counter
-   (taken modulo 2^32) is added on top of it.
- ]]
- local function getPriorityScore(priority, priorityCounterKey)
-   local counter = rcall("INCR", priorityCounterKey)
-   local lowPart = counter % 0x100000000
-   return priority * 0x100000000 + lowPart
- end
- --[[
-   Inserts jobId into the prioritized zset with a freshly computed
-   priority score, then adds the base marker if allowed.
- ]]
- local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
-   isPausedOrMaxed)
-   rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
-   addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
- end
- --[[
-   Function to check for the meta.paused key to decide if we are paused or not
-   (since an empty list and !EXISTS are not really the same).
-   Returns four values: the target list key, the isPausedOrMaxed flag,
-   and the "max" and "duration" meta fields as read from the hash.
- ]]
- local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
-   local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
-   local paused, concurrency, max, duration = attrs[1], attrs[2], attrs[3], attrs[4]
-   if paused then
-     return pausedKey, true, max, duration
-   end
-   -- not paused: the wait list is the target; it counts as maxed when a
-   -- concurrency is configured and the active list has reached it
-   local isMaxed = false
-   if concurrency then
-     isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
-   end
-   return waitKey, isMaxed, max, duration
- end
- --[[
-   Function to store a job.
-   Writes the job hash with HMSET and emits the "added" event.
-   Returns delay and priority taken from opts (0 when absent).
- ]]
- local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
-   parentKey, parentData, repeatJobKey)
-   local encodedOpts = cjson.encode(opts)
-   local delay = opts['delay'] or 0
-   local priority = opts['priority'] or 0
-   local dedupOpts = opts['de']
-   local debounceId = dedupOpts and dedupOpts['id']
-   -- optional hash fields, kept as alternating field/value entries
-   local extras = {}
-   local function push(field, value)
-     extras[#extras + 1] = field
-     extras[#extras + 1] = value
-   end
-   if parentKey ~= nil then
-     push("parentKey", parentKey)
-     push("parent", parentData)
-   end
-   if repeatJobKey then
-     push("rjk", repeatJobKey)
-   end
-   if debounceId then
-     push("deid", debounceId)
-   end
-   rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", encodedOpts,
-     "timestamp", timestamp, "delay", delay, "priority", priority,
-     unpack(extras))
-   rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
-   return delay, priority
- end
- --[[
-   Stores the job and enqueues it: into the delayed zset when it has a
-   delay (and a delayedKey was given), otherwise into the prioritized
-   zset or the wait/paused list, followed by a "waiting" event.
-   Returns delay, priority from storeJob.
- ]]
- local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
-   timestamp, parentKey, parentData, repeatJobKey, maxEvents,
-   waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
-   priorityCounterKey, delayedKey, markerKey)
-   local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
-     opts, timestamp, parentKey, parentData, repeatJobKey)
-   if delay ~= 0 and delayedKey then
-     -- addDelayedJob emits its own "delayed" event
-     addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
-     return delay, priority
-   end
-   local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
-   if priority > 0 then
-     addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
-       priorityCounterKey, isPausedOrMaxed)
-   else
-     local pushCmd = 'LPUSH'
-     if opts['lifo'] then
-       pushCmd = 'RPUSH'
-     end
-     addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
-   end
-   rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
-     "jobId", jobId)
-   return delay, priority
- end
- --[[
-   Creates the job for a scheduler iteration. The given opts table is
-   updated in place with the delay and job id before the job is stored
-   and enqueued; the scheduler id is passed on as the repeat job key.
- ]]
- local function addJobFromScheduler(jobKey, jobId, opts, waitKey, pausedKey, activeKey, metaKey,
-   prioritizedKey, priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp,
-   data, jobSchedulerId, repeatDelay)
-   opts.delay = repeatDelay
-   opts.jobId = jobId
-   -- no parent key / parent data for scheduler jobs
-   local parentKey, parentData = nil, nil
-   storeAndEnqueueJob(eventsKey, jobKey, jobId, name, data, opts,
-     timestamp, parentKey, parentData, jobSchedulerId, maxEvents,
-     waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
-     priorityCounter, delayedKey, markerKey)
- end
- --[[
-   Function to get max events value or set by default 10000.
-   The default is also written back to the meta hash.
- ]]
- local function getOrSetMaxEvents(metaKey)
-   local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
-   if stored then
-     return stored
-   end
-   local fallback = 10000
-   rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
-   return fallback
- end
- --[[
-   Function to check for the meta.paused key to decide if we are paused or not
-   (since an empty list and !EXISTS are not really the same).
-   Returns true when the meta hash has a "paused" field.
- ]]
- local function isQueuePaused(queueMetaKey)
-   local hasPausedField = rcall("HEXISTS", queueMetaKey, "paused")
-   return hasPausedField == 1
- end
- --[[
- Function to remove job.
- ]]
- -- Includes
- --[[
-   Function to remove deduplication key if needed
-   when a job is being removed.
-   The key is deleted only while it still holds jobId; returns 1 in
-   that case and nothing otherwise.
- ]]
- local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
-   jobId, deduplicationId)
-   if not deduplicationId then
-     return
-   end
-   local deduplicationKey = prefixKey .. "de:" .. deduplicationId
-   local owner = rcall('GET', deduplicationKey)
-   if not owner or owner ~= jobId then
-     return
-   end
-   rcall("DEL", deduplicationKey)
-   -- Also clean up any pending dedup-next data for this dedup ID
-   rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
-   return 1
- end
- --[[
-   Function to remove job keys.
-   Deletes the job hash and its suffixed companion keys in a single DEL
-   and returns the reply of that command.
- ]]
- local function removeJobKeys(jobKey)
-   local keys = { jobKey }
-   local suffixes = { ':logs', ':dependencies', ':processed', ':failed', ':unsuccessful' }
-   for i = 1, #suffixes do
-     keys[#keys + 1] = jobKey .. suffixes[i]
-   end
-   return rcall("DEL", unpack(keys))
- end
- --[[
- Check if this job has a parent. If so we will just remove it from
- the parent child list, but if it is the last child we should move the parent to "wait/paused"
- which requires code from "moveToFinished"
- ]]
- -- Includes
- --[[
- Functions to destructure job key.
- Just a bit of warning, these functions may be a bit slow and affect performance significantly.
- ]]
- -- Returns the part of jobKey after its last ":" (nil when there is no ":").
- local function getJobIdFromKey(jobKey)
-   return jobKey:match(".*:(.*)")
- end
- -- Returns jobKey with its trailing #jobId characters cut off.
- local function getJobKeyPrefix(jobKey, jobId)
-   local prefixLength = #jobKey - #jobId
-   return jobKey:sub(1, prefixLength)
- end
- --[[
-   Pushes parentId (RPUSH) onto the wait or paused list of the queue
-   identified by parentPrefix and, when emitEvent is set, emits a
-   "waiting" event with prev = "waiting-children" on that queue.
- ]]
- local function _moveParentToWait(parentPrefix, parentId, emitEvent)
-   local parentTarget, isPausedOrMaxed = getTargetQueueList(
-     parentPrefix .. "meta", parentPrefix .. "active",
-     parentPrefix .. "wait", parentPrefix .. "paused")
-   addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
-   if not emitEvent then
-     return
-   end
-   rcall("XADD", parentPrefix .. "events", "*", "event", "waiting",
-     "jobId", parentId, "prev", "waiting-children")
- end
- local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId) -- returns true when jobKey was removed from a parent's dependencies set, false otherwise
- if parentKey then -- parent key supplied by the caller
- local parentDependenciesKey = parentKey .. ":dependencies"
- local result = rcall("SREM", parentDependenciesKey, jobKey)
- if result > 0 then
- local pendingDependencies = rcall("SCARD", parentDependenciesKey)
- if pendingDependencies == 0 then -- this job was the last pending dependency
- local parentId = getJobIdFromKey(parentKey)
- local parentPrefix = getJobKeyPrefix(parentKey, parentId)
- local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
- if numRemovedElements == 1 then -- the parent was actually in waiting-children
- if hard then -- remove parent in same queue
- if parentPrefix == baseKey then
- removeParentDependencyKey(parentKey, hard, nil, baseKey, nil) -- recurse: detach the parent from its own parent, if any
- removeJobKeys(parentKey)
- if debounceId then
- rcall("DEL", parentPrefix .. "de:" .. debounceId)
- end
- else
- _moveParentToWait(parentPrefix, parentId) -- parent has a different prefix: re-queue it, no event
- end
- else
- _moveParentToWait(parentPrefix, parentId, true) -- re-queue the parent and emit the "waiting" event
- end
- end
- end
- return true
- end
- else -- no parent key given: look it up on the job hash
- local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
- local missedParentKey = parentAttributes[1]
- if( (type(missedParentKey) == "string") and missedParentKey ~= ""
- and (rcall("EXISTS", missedParentKey) == 1)) then -- only act when the stored parent key still exists
- local parentDependenciesKey = missedParentKey .. ":dependencies"
- local result = rcall("SREM", parentDependenciesKey, jobKey)
- if result > 0 then
- local pendingDependencies = rcall("SCARD", parentDependenciesKey)
- if pendingDependencies == 0 then -- this job was the last pending dependency
- local parentId = getJobIdFromKey(missedParentKey)
- local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
- local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
- if numRemovedElements == 1 then
- if hard then
- if parentPrefix == baseKey then -- same prefix as baseKey: remove the parent as well
- removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil) -- recurse: detach the parent from its own parent, if any
- removeJobKeys(missedParentKey)
- if parentAttributes[2] then -- "deid" field read from the job hash above
- rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
- end
- else
- _moveParentToWait(parentPrefix, parentId) -- parent has a different prefix: re-queue it, no event
- end
- else
- _moveParentToWait(parentPrefix, parentId, true) -- re-queue the parent and emit the "waiting" event
- end
- end
- end
- return true
- end
- end
- end
- return false
- end
- --[[
-   Function to remove job: detaches it from its parent (if any),
-   optionally removes its deduplication key, then deletes its keys.
- ]]
- local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
-   local jobKey = baseKey .. jobId
-   removeParentDependencyKey(jobKey, hard, nil, baseKey)
-   if shouldRemoveDeduplicationKey then
-     -- "deid" must be read before the job hash is deleted below
-     removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, rcall("HGET", jobKey, "deid"))
-   end
-   removeJobKeys(jobKey)
- end
- --[[
-   Function to store a job scheduler.
-   Scores the scheduler id in the repeat zset with nextMillis and
-   rewrites the scheduler hash from scratch with "name", the options
-   that are present, the template opts/data and the "ic" field.
- ]]
- local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMillis, opts,
-   templateData, templateOpts)
-   rcall("ZADD", repeatKey, nextMillis, schedulerId)
-   -- alternating field/value entries for HMSET
-   local fields = {}
-   local function put(field, value)
-     fields[#fields + 1] = field
-     fields[#fields + 1] = value
-   end
-   local copiedOptions = { "tz", "limit", "pattern", "startDate", "endDate", "every" }
-   for i = 1, #copiedOptions do
-     local optionName = copiedOptions[i]
-     local optionValue = opts[optionName]
-     if optionValue then
-       put(optionName, optionValue)
-     end
-   end
-   local givenOffset = opts['offset']
-   if givenOffset then
-     put("offset", givenOffset)
-   else
-     -- keep the offset already stored on the hash, if there is one
-     local storedOffset = rcall("HGET", schedulerKey, "offset")
-     if storedOffset then
-       put("offset", tonumber(storedOffset))
-     end
-   end
-   local encodedTemplateOpts = cjson.encode(templateOpts)
-   if encodedTemplateOpts and encodedTemplateOpts ~= '{}' then
-     put("opts", encodedTemplateOpts)
-   end
-   if templateData and templateData ~= '{}' then
-     put("data", templateData)
-   end
-   -- "ic" is carried over from the existing hash, defaulting to 1
-   put("ic", rcall("HGET", schedulerKey, "ic") or 1)
-   rcall("DEL", schedulerKey) -- remove all attributes and then re-insert new ones
-   rcall("HMSET", schedulerKey, "name", opts['name'], unpack(fields))
- end
- --[[
-   Computes the next run time for an 'every' scheduler.
-   Without prevMillis the result is startDate (when given and later
-   than now) or now; otherwise prevMillis + every, skipping ahead to
-   the slot after now when that time has already passed.
-   When no non-zero offset is given, one is derived from the distance
-   between the result and the start of its 'every' slot.
-   Returns nextMillis, offset (both floored).
- ]]
- local function getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
-   local nextMillis
-   if prevMillis then
-     nextMillis = prevMillis + every
-     -- check if we may have missed some iterations
-     if nextMillis < now then
-       nextMillis = math.floor(now / every) * every + every + (offset or 0)
-     end
-   elseif startDate then
-     -- Assuming startDate is passed as milliseconds from JavaScript
-     nextMillis = tonumber(startDate)
-     if not (nextMillis > now) then
-       nextMillis = now
-     end
-   else
-     nextMillis = now
-   end
-   if not offset or offset == 0 then
-     local slotStart = math.floor(nextMillis / every) * every
-     offset = nextMillis - slotStart
-   end
-   -- Return a tuple nextMillis, offset
-   return math.floor(nextMillis), math.floor(offset)
- end
- -- If we are overriding a repeatable job we must delete the delayed job for
- -- the next iteration.
- local schedulerKey = repeatKey .. ":" .. jobSchedulerId -- hash holding this scheduler's attributes
- local maxEvents = getOrSetMaxEvents(metaKey)
- local templateData = ARGV[4]
- local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) -- falsy when the scheduler is not in the repeat zset yet
- if prevMillis then
- prevMillis = tonumber(prevMillis)
- end
- local schedulerOpts = cmsgpack.unpack(ARGV[2])
- local every = schedulerOpts['every']
- -- For backwards compatibility we also check the offset from the job itself.
- -- This could be removed in future major versions.
- local jobOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
- local offset = schedulerOpts['offset'] or jobOffset or 0 -- the scheduler's own offset wins over the job's
- local newOffset = offset
- local updatedEvery = false
- if every then -- 'every' schedulers compute nextMillis here, replacing ARGV[1]
- -- if we changed the 'every' value we need to reset millis to nil
- local millis = prevMillis
- if prevMillis then
- local prevEvery = tonumber(rcall("HGET", schedulerKey, "every"))
- if prevEvery ~= every then
- millis = nil
- updatedEvery = true
- end
- end
- local startDate = schedulerOpts['startDate']
- nextMillis, newOffset = getJobSchedulerEveryNextMillis(millis, every, now, offset, startDate)
- end
- --[[
-   Removes the job from the delayed zset, the prioritized zset or the
-   wait/paused list (checked in that order) together with its keys.
-   Returns true when the job was found in one of them, false otherwise.
- ]]
- local function removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, jobId, metaKey,
-   eventsKey)
-   -- removes the job when it is a member of the given zset
-   local function dropFromZset(zsetKey)
-     if not rcall("ZSCORE", zsetKey, jobId) then
-       return false
-     end
-     removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
-     rcall("ZREM", zsetKey, jobId)
-     return true
-   end
-   if dropFromZset(delayedKey) or dropFromZset(prioritizedKey) then
-     return true
-   end
-   local pausedOrWaitKey
-   if isQueuePaused(metaKey) then
-     pausedOrWaitKey = pausedKey
-   else
-     pausedOrWaitKey = waitKey
-   end
-   if rcall("LREM", pausedOrWaitKey, 1, jobId) > 0 then
-     removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
-     return true
-   end
-   return false
- end
- local removedPrevJob = false
- if prevMillis then
- local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
- local currentJobKey = schedulerKey .. ":" .. prevMillis
- -- In theory currentJobKey should always exist when there is a prevMillis, unless something has
- -- gone really wrong.
- if rcall("EXISTS", currentJobKey) == 1 then
- removedPrevJob = removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, currentJobId,
- metaKey, eventsKey)
- end
- end
- if removedPrevJob then
- -- The job has been removed and we want to replace it, so let's use the same millis.
- if every and not updatedEvery then
- nextMillis = prevMillis
- end
- else
- -- Special case where no job was removed, and we need to add the next iteration.
- schedulerOpts['offset'] = newOffset
- end
- -- Check for job ID collision with existing jobs (in any state)
- local jobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
- local jobKey = prefixKey .. jobId
- -- If there's already a job with this ID, in a state
- -- that is not updatable (active, completed, failed) we must
- -- handle the collision
- local hasCollision = false
- if rcall("EXISTS", jobKey) == 1 then
- if every then
- -- For 'every' case: try next time slot to avoid collision
- local nextSlotMillis = nextMillis + every
- local nextSlotJobId = "repeat:" .. jobSchedulerId .. ":" .. nextSlotMillis
- local nextSlotJobKey = prefixKey .. nextSlotJobId
- if rcall("EXISTS", nextSlotJobKey) == 0 then
- -- Next slot is free, use it
- nextMillis = nextSlotMillis
- jobId = nextSlotJobId
- else
- -- Next slot also has a job, return error code
- return -11 -- SchedulerJobSlotsBusy
- end
- else
- hasCollision = true
- end
- end
- local delay = nextMillis - now
- -- Fast Clamp delay to minimum of 0
- if delay < 0 then
- delay = 0
- end
- local nextJobKey = schedulerKey .. ":" .. nextMillis -- key of the job hash for the next iteration
- if not hasCollision or removedPrevJob then
- -- jobId already calculated above during collision check
- storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, templateData, templateOpts)
- rcall("INCR", KEYS[8]) -- KEYS[8] is the 'id' key
- addJobFromScheduler(nextJobKey, jobId, jobOpts, waitKey, pausedKey, KEYS[11], metaKey, prioritizedKey, KEYS[10],
- delayedKey, KEYS[7], eventsKey, schedulerOpts['name'], maxEvents, now, templateData, jobSchedulerId, delay) -- KEYS[11] 'active', KEYS[10] 'pc', KEYS[7] 'marker'
- elseif hasCollision then
- -- For 'pattern' case: return error code
- return -10 -- SchedulerJobIdCollision
- end
- if ARGV[9] ~= "" then -- a producer key was given: record the new job id on it
- rcall("HSET", ARGV[9], "nrjid", jobId)
- end
- return {jobId .. "", delay} -- the concatenation forces jobId to be a string
- `;
- exports.addJobScheduler = {
- name: 'addJobScheduler',
- content,
- keys: 11,
- };
- //# sourceMappingURL=addJobScheduler-11.js.map
|