| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
--[[
  Updates a job scheduler and adds next delayed job

  Input:
    KEYS[1]  'repeat' key
    KEYS[2]  'delayed'
    KEYS[3]  'wait' key
    KEYS[4]  'paused' key
    KEYS[5]  'meta'
    KEYS[6]  'prioritized' key
    KEYS[7]  'marker',
    KEYS[8]  'id'
    KEYS[9]  events stream key
    KEYS[10] 'pc' priority counter
    KEYS[11] producer key
    KEYS[12] 'active' key

    ARGV[1] next milliseconds
    ARGV[2] jobs scheduler id
    ARGV[3] Json stringified delayed data
    ARGV[4] msgpacked delayed opts
    ARGV[5] timestamp
    ARGV[6] prefix key
    ARGV[7] producer id

  Output:
    next delayed job id - OK
    nothing             - when the scheduler is not in the 'repeat' set, when
                          the producer id is not the scheduler's current
                          iteration, or when the next job key already exists
                          (a "duplicated" event is emitted in that last case)
]]
local rcall = redis.call

-- NOTE(review): the included files are presumably expanded in place below, so
-- they may reference any of these locals (e.g. rcall, prefixKey). Keep them all
-- declared before the includes even when this script does not use them itself.
local repeatKey = KEYS[1]
local delayedKey = KEYS[2]
local waitKey = KEYS[3]
local pausedKey = KEYS[4]
local metaKey = KEYS[5]
local prioritizedKey = KEYS[6]

local nextMillis = tonumber(ARGV[1])
local jobSchedulerId = ARGV[2]
local timestamp = tonumber(ARGV[5])
local prefixKey = ARGV[6]
local producerId = ARGV[7]
local jobOpts = cmsgpack.unpack(ARGV[4])

-- Includes
--- @include "includes/addJobFromScheduler"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getJobSchedulerEveryNextMillis"

local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)

-- Validate that scheduler exists.
-- If it does not exist we should not iterate anymore.
if prevMillis then
    prevMillis = tonumber(prevMillis)

    local schedulerKey = repeatKey .. ":" .. jobSchedulerId
    -- Indexes: [1] name, [2] data, [3] every, [4] startDate, [5] offset.
    -- A field that is missing from the hash comes back as false.
    local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data", "every", "startDate", "offset")

    local every = tonumber(schedulerAttributes[3])
    -- timestamp has already been converted with tonumber above.
    local now = timestamp

    -- If every is not found in scheduler attributes, try to get it from job options
    if not every and jobOpts['repeat'] and jobOpts['repeat']['every'] then
        every = tonumber(jobOpts['repeat']['every'])
    end

    if every then
        local startDate = schedulerAttributes[4]
        local repeatOpts = jobOpts['repeat']

        -- Prefer the offset stored in the scheduler hash, then the one carried
        -- in the job options. Do NOT default to 0 here: 0 is truthy in Lua, so
        -- defaulting would make the "offset is missing" check below unreachable.
        local offset = schedulerAttributes[5] or (repeatOpts and repeatOpts['offset'])
        local offsetMissing = not offset
        local newOffset

        -- The helper still receives 0 when no offset is known.
        nextMillis, newOffset = getJobSchedulerEveryNextMillis(prevMillis, every, now, offset or 0, startDate)

        -- No offset was known anywhere: persist the computed one so that the
        -- following iterations reuse it.
        if offsetMissing and newOffset then
            rcall("HSET", schedulerKey, "offset", newOffset)
            if type(repeatOpts) == "table" then
                repeatOpts['offset'] = newOffset
            end
        end
    end

    local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
    local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

    local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis

    -- Only the job produced by the scheduler's current iteration may schedule
    -- the next one.
    if producerId == currentDelayedJobId then
        local eventsKey = KEYS[9]
        local maxEvents = getOrSetMaxEvents(metaKey)

        if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
            -- Move the scheduler to its next slot and bump the 'ic' counter.
            rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
            rcall("HINCRBY", schedulerKey, "ic", 1)

            rcall("INCR", KEYS[8])

            -- TODO: remove this workaround in next breaking change,
            -- all job-schedulers must save job data
            local templateData = schedulerAttributes[2] or ARGV[3]

            if templateData and templateData ~= '{}' then
                rcall("HSET", schedulerKey, "data", templateData)
            end

            local delay = nextMillis - now

            -- Fast Clamp delay to minimum of 0
            if delay < 0 then
                delay = 0
            end

            jobOpts["delay"] = delay

            addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, jobOpts, waitKey, pausedKey, KEYS[12], metaKey,
                prioritizedKey, KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerAttributes[1], maxEvents, ARGV[5],
                templateData or '{}', jobSchedulerId, delay)

            -- TODO: remove this workaround in next breaking change
            if KEYS[11] ~= "" then
                rcall("HSET", KEYS[11], "nrjid", nextDelayedJobId)
            end

            return nextDelayedJobId .. "" -- convert to string
        else
            -- The next iteration's job key already exists: report it instead
            -- of adding the job a second time.
            rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "duplicated", "jobId", nextDelayedJobId)
        end
    end
end
|