| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- const content = `--[[
- Updates a job scheduler and adds next delayed job
- Input:
- KEYS[1] 'repeat' key
- KEYS[2] 'delayed'
- KEYS[3] 'wait' key
- KEYS[4] 'paused' key
- KEYS[5] 'meta'
- KEYS[6] 'prioritized' key
- KEYS[7] 'marker'
- KEYS[8] 'id'
- KEYS[9] events stream key
- KEYS[10] 'pc' priority counter
- KEYS[11] producer key
- KEYS[12] 'active' key
- ARGV[1] next milliseconds
- ARGV[2] job scheduler id
- ARGV[3] JSON stringified delayed data
- ARGV[4] msgpacked delayed opts
- ARGV[5] timestamp
- ARGV[6] prefix key
- ARGV[7] producer id
- Output:
- next delayed job id - OK
- ]] local rcall = redis.call
-- Key names handed to the script (see the KEYS list in the header comment).
local repeatKey, delayedKey, waitKey = KEYS[1], KEYS[2], KEYS[3]
local pausedKey, metaKey, prioritizedKey = KEYS[4], KEYS[5], KEYS[6]

-- Scalar arguments. The two time values are converted to numbers up front;
-- nextMillis may be overwritten later when the scheduler defines "every".
local nextMillis = tonumber(ARGV[1])
local jobSchedulerId = ARGV[2]
local timestamp = tonumber(ARGV[5])
local prefixKey = ARGV[6]
local producerId = ARGV[7]

-- Job options arrive msgpack-encoded and are decoded into a Lua table.
local jobOpts = cmsgpack.unpack(ARGV[4])
- -- Includes
- --[[
- Add delay marker if needed.
- ]]
- -- Includes
- --[[
- Shared helper to store a job and enqueue it into the appropriate list/set.
- Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
- Emits the appropriate event after enqueuing ("delayed" or "waiting").
- Returns delay, priority from storeJob.
- ]]
- -- Includes
- --[[
- Adds a delayed job to the queue by doing the following:
- - Creates a new job key with the job data.
- - adds to delayed zset.
- - Emits a global event 'delayed' if the job is delayed.
- ]]
- -- Includes
- --[[
- Add delay marker if needed.
- ]]
- -- Includes
--[[
  Function to return the next delayed job timestamp.

  Reads the lowest-scored entry of the delayed sorted set and converts its
  score back to the timestamp scale. Scores are built by getDelayedScore as
  timestamp * 0x1000 plus a small ordering counter, so the value returned
  here can carry a fractional part.

  Returns nothing when the set is empty.
]]
local function getNextDelayedTimestamp(delayedKey)
  -- With WITHSCORES the reply is a flat array: { member, score }.
  local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  -- The length must be compared explicitly: in Lua every number, including
  -- 0, is truthy, so a bare `#result` test would never be false.
  if #result > 0 then
    local nextTimestamp = tonumber(result[2])
    if nextTimestamp ~= nil then
      return nextTimestamp / 0x1000
    end
  end
end
-- Points the marker entry "1" at the earliest delayed timestamp currently
-- known. Does nothing when the delayed set yields no timestamp.
local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  local upcoming = getNextDelayedTimestamp(delayedKey)
  if upcoming == nil then
    return
  end
  -- ZADD on an existing member overwrites its score, so the marker always
  -- carries the most recent "next" timestamp.
  rcall("ZADD", markerKey, upcoming, "1")
end
--[[
  Computes the sorted-set score for a delayed job.

  The target timestamp is shifted left by 12 bits (multiplied by 0x1000) and
  the low 12 bits are used as a counter, which guarantees correct execution
  order of delayed jobs that share a timestamp
  (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp).
  WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail

  Input:
    delayedKey  key of the delayed sorted set
    timestamp   base time in milliseconds (number or numeric string)
    delay       delay in milliseconds; nil or non-numeric is treated as 0

  Output:
    score, delayedTimestamp
]]
local function getDelayedScore(delayedKey, timestamp, delay)
  local baseTimestamp = tonumber(timestamp)
  -- Callers pass tonumber(delay), which is nil for a missing/invalid delay;
  -- fall back to 0 instead of failing on the comparison below.
  local delayMillis = tonumber(delay) or 0
  local delayedTimestamp = (delayMillis > 0 and (baseTimestamp + delayMillis)) or baseTimestamp

  -- Score window reserved for this timestamp.
  local minScore = delayedTimestamp * 0x1000
  local maxScore = (delayedTimestamp + 1) * 0x1000 - 1

  -- Highest score already used inside the window, if any.
  local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
    minScore, "WITHSCORES", "LIMIT", 0, 1)

  -- Explicit length comparison: a bare `#result` is always truthy in Lua.
  if #result > 0 then
    local currentMaxScore = tonumber(result[2])
    if currentMaxScore ~= nil then
      if currentMaxScore >= maxScore then
        -- Window exhausted: stay on its last slot.
        return maxScore, delayedTimestamp
      end
      return currentMaxScore + 1, delayedTimestamp
    end
  end

  return minScore, delayedTimestamp
end
-- Places a job in the delayed sorted set, publishes the "delayed" event and
-- refreshes the delay marker.
local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
  maxEvents, markerKey, delay)
  local delayMillis = tonumber(delay)
  local zsetScore, dueAt = getDelayedScore(delayedKey, timestamp, delayMillis)

  rcall("ZADD", delayedKey, zsetScore, jobId)

  -- The event stream is trimmed approximately to maxEvents entries; the
  -- "delay" field carries the computed due timestamp.
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
    "event", "delayed", "jobId", jobId, "delay", dueAt)

  -- mark that a delayed job is available
  addDelayMarkerIfNeeded(markerKey, delayedKey)
end
- --[[
- Function to add job in target list and add marker if needed.
- ]]
- -- Includes
--[[
  Add marker if needed when a job is available.
  The marker (member "0" with score 0) is written only while the queue is
  neither paused nor at its concurrency limit.
]]
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  if isPausedOrMaxed then
    return
  end
  rcall("ZADD", markerKey, 0, "0")
end
-- Pushes the job id onto the target list with the given push command
-- (e.g. LPUSH or RPUSH) and then signals availability through the marker,
-- unless the queue is paused or maxed.
local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  -- The job must be in the list before the marker is written.
  rcall(pushCmd, targetKey, jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
- --[[
- Function to add job considering priority.
- ]]
- -- Includes
--[[
  Function to get priority score.
  The priority occupies the bits above 2^32 and an incrementing counter
  (wrapped at 2^32) fills the lower part, so jobs of equal priority keep
  their insertion order.
]]
local function getPriorityScore(priority, priorityCounterKey)
  local counter = rcall("INCR", priorityCounterKey)
  local priorityBand = priority * 0x100000000
  local sequence = counter % 0x100000000
  return priorityBand + sequence
end
-- Inserts the job into the prioritized sorted set using a score derived from
-- its priority and the shared priority counter, then adds the base marker
-- when the queue is neither paused nor maxed.
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  isPausedOrMaxed)
  local rank = getPriorityScore(priority, priorityCounterKey)

  rcall("ZADD", prioritizedKey, rank, jobId)

  addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
end
--[[
  Function to check for the meta.paused key to decide if we are paused or not
  (since an empty list and !EXISTS are not really the same).

  Output:
    target list key, isPausedOrMaxed flag, and the raw "max" and "duration"
    fields of the queue meta hash (passed through untouched).
]]
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  local paused, concurrency = attrs[1], attrs[2]
  local maxAttr, durationAttr = attrs[3], attrs[4]

  -- A paused queue always targets the paused list.
  if paused then
    return pausedKey, true, maxAttr, durationAttr
  end

  -- Without a concurrency setting the queue can never be "maxed".
  local maxed = false
  if concurrency then
    maxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
  end

  return waitKey, maxed, maxAttr, durationAttr
end
--[[
  Function to store a job

  Writes the job hash (name, data, JSON-encoded opts, timestamp, delay,
  priority plus any optional fields) and publishes an "added" event.

  Output:
    delay, priority (both default to 0 when absent from opts)
]]
local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  parentKey, parentData, repeatJobKey)
  local encodedOpts = cjson.encode(opts)
  local delay = opts['delay'] or 0
  local priority = opts['priority'] or 0
  local debounceId = opts['de'] and opts['de']['id']

  -- Optional hash fields, collected as a flat field/value list.
  local extraFields = {}
  local function append(field, value)
    table.insert(extraFields, field)
    table.insert(extraFields, value)
  end

  if parentKey ~= nil then
    append("parentKey", parentKey)
    append("parent", parentData)
  end

  if repeatJobKey then
    append("rjk", repeatJobKey)
  end

  if debounceId then
    append("deid", debounceId)
  end

  rcall("HMSET", jobIdKey,
    "name", name,
    "data", data,
    "opts", encodedOpts,
    "timestamp", timestamp,
    "delay", delay,
    "priority", priority,
    unpack(extraFields))

  rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)

  return delay, priority
end
-- Stores the job hash and then routes the job id:
--   * non-zero delay (and a delayed key supplied) -> delayed set, "delayed" event
--   * priority > 0                                -> prioritized set, "waiting" event
--   * otherwise                                   -> target list, "waiting" event
-- Returns the delay and priority reported by storeJob.
local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
  timestamp, parentKey, parentData, repeatJobKey, maxEvents,
  waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  priorityCounterKey, delayedKey, markerKey)
  local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
    opts, timestamp, parentKey, parentData, repeatJobKey)

  -- Delayed jobs emit their own event inside addDelayedJob.
  if delay ~= 0 and delayedKey then
    addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
    return delay, priority
  end

  local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)

  if priority > 0 then
    addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
      priorityCounterKey, isPausedOrMaxed)
  else
    -- LIFO jobs are pushed on the opposite end of the list.
    local pushCmd = 'LPUSH'
    if opts['lifo'] then
      pushCmd = 'RPUSH'
    end
    addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
  end

  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
    "event", "waiting", "jobId", jobId)

  return delay, priority
end
-- Creates the next job of a job scheduler. The options table is updated in
-- place with the job id and the delay, and the scheduler id is passed on as
-- the job's repeat job key. No parent information is attached.
local function addJobFromScheduler(jobKey, jobId, opts, waitKey, pausedKey, activeKey, metaKey,
  prioritizedKey, priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp,
  data, jobSchedulerId, repeatDelay)
  opts['jobId'] = jobId
  opts['delay'] = repeatDelay

  local parentKey, parentData = nil, nil
  storeAndEnqueueJob(eventsKey, jobKey, jobId, name, data, opts,
    timestamp, parentKey, parentData, jobSchedulerId, maxEvents,
    waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
    priorityCounter, delayedKey, markerKey)
end
--[[
  Function to get max events value or set by default 10000.
  When the meta hash has no "opts.maxLenEvents" field, the default is
  written back so later calls find it.
]]
local function getOrSetMaxEvents(metaKey)
  local configured = rcall("HGET", metaKey, "opts.maxLenEvents")
  if configured then
    return configured
  end

  local fallback = 10000
  rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
  return fallback
end
--[[
  Computes the next run time of an "every"-based job scheduler.

  Input:
    prevMillis  previous run time in ms, or nil/false for the first run
    every       interval in ms
    now         current time in ms
    offset      phase offset inside an interval (number or numeric string);
                nil, 0 or "0" means "not known yet"
    startDate   optional first run time in ms (only used without prevMillis)

  Output:
    nextMillis, offset (both floored). When no offset was known, the
    returned offset is the position of nextMillis inside its interval.
]]
local function getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  -- Offsets read from a Redis hash arrive as strings. Normalise so that a
  -- stored "0" is recognised by the numeric comparison further down.
  offset = tonumber(offset)

  local nextMillis
  if not prevMillis then
    if startDate then
      -- Assuming startDate is passed as milliseconds from JavaScript
      nextMillis = tonumber(startDate)
      nextMillis = nextMillis > now and nextMillis or now
    else
      nextMillis = now
    end
  else
    nextMillis = prevMillis + every
    -- check if we may have missed some iterations
    if nextMillis < now then
      -- Jump to the interval after the current one, keeping the known phase.
      nextMillis = math.floor(now / every) * every + every + (offset or 0)
    end
  end

  if not offset or offset == 0 then
    local timeSlot = math.floor(nextMillis / every) * every
    offset = nextMillis - timeSlot
  end

  -- Return a tuple nextMillis, offset
  return math.floor(nextMillis), math.floor(offset)
end
-- Main script body.
-- Looks up the scheduler, works out the next run time and, when the caller
-- is the scheduler's current job, registers and enqueues the next job.
-- Returns the next job id on success and nothing in every other case.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)

-- Validate that scheduler exists.
-- If it does not exist we should not iterate anymore.
if prevMillis then
  prevMillis = tonumber(prevMillis)

  local schedulerKey = repeatKey .. ":" .. jobSchedulerId
  local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data", "every", "startDate", "offset")
  local every = tonumber(schedulerAttributes[3])
  local now = tonumber(timestamp)

  -- If every is not found in scheduler attributes, try to get it from job options
  if not every and jobOpts['repeat'] and jobOpts['repeat']['every'] then
    every = tonumber(jobOpts['repeat']['every'])
  end

  if every then
    local startDate = schedulerAttributes[4]
    -- HMGET yields false for a missing field, so storedOffset doubles as
    -- the "has an offset been persisted yet?" flag.
    local storedOffset = schedulerAttributes[5]
    local jobOptsOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
    local offset = storedOffset or jobOptsOffset
    local newOffset

    -- For "every" schedulers the computed time replaces ARGV[1].
    nextMillis, newOffset = getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)

    -- Persist the effective offset the first time it is computed. The check
    -- must use the stored field: `offset` itself always has a value (it
    -- defaults to 0, which is truthy in Lua), so testing it never fires.
    if not storedOffset then
      rcall("HSET", schedulerKey, "offset", newOffset)
      if jobOpts['repeat'] then
        jobOpts['repeat']['offset'] = newOffset
      end
    end
  end

  local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
  local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

  local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis

  -- Only the job that belongs to the scheduler's current iteration may
  -- schedule the following one.
  if producerId == currentDelayedJobId then
    local eventsKey = KEYS[9]
    local maxEvents = getOrSetMaxEvents(metaKey)

    if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
      rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
      rcall("HINCRBY", schedulerKey, "ic", 1)

      rcall("INCR", KEYS[8])

      -- TODO: remove this workaround in next breaking change,
      -- all job-schedulers must save job data
      local templateData = schedulerAttributes[2] or ARGV[3]

      if templateData and templateData ~= '{}' then
        rcall("HSET", schedulerKey, "data", templateData)
      end

      local delay = nextMillis - now

      -- Fast Clamp delay to minimum of 0
      if delay < 0 then
        delay = 0
      end

      jobOpts["delay"] = delay

      addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, jobOpts, waitKey, pausedKey, KEYS[12], metaKey,
        prioritizedKey, KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerAttributes[1], maxEvents, ARGV[5],
        templateData or '{}', jobSchedulerId, delay)

      -- TODO: remove this workaround in next breaking change
      if KEYS[11] ~= "" then
        rcall("HSET", KEYS[11], "nrjid", nextDelayedJobId)
      end

      return nextDelayedJobId .. "" -- convert to string
    else
      -- A job for the next slot already exists: report it instead of
      -- creating a second one.
      rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "duplicated", "jobId", nextDelayedJobId)
    end
  end
end
- `;
/**
 * Descriptor for the `updateJobScheduler` Lua script.
 *
 * - `name`: command name of the script.
 * - `content`: the Lua source held in the `content` constant of this module.
 * - `keys`: number of KEYS the script expects (the script header lists 12).
 */
export const updateJobScheduler = {
    name: 'updateJobScheduler',
    content: content,
    keys: 12,
};
- //# sourceMappingURL=updateJobScheduler-12.js.map
|