| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- --[[
- Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options).
- Input:
- KEYS[1] 'repeat' key
- KEYS[2] 'delayed' key
- KEYS[3] 'wait' key
- KEYS[4] 'paused' key
- KEYS[5] 'meta' key
- KEYS[6] 'prioritized' key
- KEYS[7] 'marker' key
- KEYS[8] 'id' key
- KEYS[9] 'events' key
- KEYS[10] 'pc' priority counter
- KEYS[11] 'active' key
-
- ARGV[1] next milliseconds
- ARGV[2] msgpacked options
- [1] name
- [2] tz?
- [3] pattern?
- [4] endDate?
- [5] every?
- ARGV[3] jobs scheduler id
- ARGV[4] Json stringified template data
- ARGV[5] mspacked template opts
- ARGV[6] msgpacked delayed opts
- ARGV[7] timestamp
- ARGV[8] prefix key
- ARGV[9] producer key
- Output:
- repeatableKey - OK
- ]] local rcall = redis.call
- local repeatKey = KEYS[1]
- local delayedKey = KEYS[2]
- local waitKey = KEYS[3]
- local pausedKey = KEYS[4]
- local metaKey = KEYS[5]
- local prioritizedKey = KEYS[6]
- local eventsKey = KEYS[9]
- local nextMillis = ARGV[1]
- local jobSchedulerId = ARGV[3]
- local templateOpts = cmsgpack.unpack(ARGV[5])
- local now = tonumber(ARGV[7])
- local prefixKey = ARGV[8]
- local jobOpts = cmsgpack.unpack(ARGV[6])
- -- Includes
- --- @include "includes/addJobFromScheduler"
- --- @include "includes/getOrSetMaxEvents"
- --- @include "includes/isQueuePaused"
- --- @include "includes/removeJob"
- --- @include "includes/storeJobScheduler"
- --- @include "includes/getJobSchedulerEveryNextMillis"
- -- If we are overriding a repeatable job we must delete the delayed job for
- -- the next iteration.
- local schedulerKey = repeatKey .. ":" .. jobSchedulerId
- local maxEvents = getOrSetMaxEvents(metaKey)
- local templateData = ARGV[4]
- local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
- if prevMillis then
- prevMillis = tonumber(prevMillis)
- end
- local schedulerOpts = cmsgpack.unpack(ARGV[2])
- local every = schedulerOpts['every']
- -- For backwards compatibility we also check the offset from the job itself.
- -- could be removed in future major versions.
- local jobOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
- local offset = schedulerOpts['offset'] or jobOffset or 0
- local newOffset = offset
- local updatedEvery = false
- if every then
- -- if we changed the 'every' value we need to reset millis to nil
- local millis = prevMillis
- if prevMillis then
- local prevEvery = tonumber(rcall("HGET", schedulerKey, "every"))
- if prevEvery ~= every then
- millis = nil
- updatedEvery = true
- end
- end
- local startDate = schedulerOpts['startDate']
- nextMillis, newOffset = getJobSchedulerEveryNextMillis(millis, every, now, offset, startDate)
- end
- local function removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, jobId, metaKey,
- eventsKey)
- if rcall("ZSCORE", delayedKey, jobId) then
- removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
- rcall("ZREM", delayedKey, jobId)
- return true
- elseif rcall("ZSCORE", prioritizedKey, jobId) then
- removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
- rcall("ZREM", prioritizedKey, jobId)
- return true
- else
- local pausedOrWaitKey = waitKey
- if isQueuePaused(metaKey) then
- pausedOrWaitKey = pausedKey
- end
- if rcall("LREM", pausedOrWaitKey, 1, jobId) > 0 then
- removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
- return true
- end
- end
- return false
- end
- local removedPrevJob = false
- if prevMillis then
- local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
- local currentJobKey = schedulerKey .. ":" .. prevMillis
- -- In theory it should always exist the currentJobKey if there is a prevMillis unless something has
- -- gone really wrong.
- if rcall("EXISTS", currentJobKey) == 1 then
- removedPrevJob = removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, currentJobId,
- metaKey, eventsKey)
- end
- end
- if removedPrevJob then
- -- The jobs has been removed and we want to replace it, so lets use the same millis.
- if every and not updatedEvery then
- nextMillis = prevMillis
- end
- else
- -- Special case where no job was removed, and we need to add the next iteration.
- schedulerOpts['offset'] = newOffset
- end
- -- Check for job ID collision with existing jobs (in any state)
- local jobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
- local jobKey = prefixKey .. jobId
- -- If there's already a job with this ID, in a state
- -- that is not updatable (active, completed, failed) we must
- -- handle the collision
- local hasCollision = false
- if rcall("EXISTS", jobKey) == 1 then
- if every then
- -- For 'every' case: try next time slot to avoid collision
- local nextSlotMillis = nextMillis + every
- local nextSlotJobId = "repeat:" .. jobSchedulerId .. ":" .. nextSlotMillis
- local nextSlotJobKey = prefixKey .. nextSlotJobId
- if rcall("EXISTS", nextSlotJobKey) == 0 then
- -- Next slot is free, use it
- nextMillis = nextSlotMillis
- jobId = nextSlotJobId
- else
- -- Next slot also has a job, return error code
- return -11 -- SchedulerJobSlotsBusy
- end
- else
- hasCollision = true
- end
- end
- local delay = nextMillis - now
- -- Fast Clamp delay to minimum of 0
- if delay < 0 then
- delay = 0
- end
- local nextJobKey = schedulerKey .. ":" .. nextMillis
- if not hasCollision or removedPrevJob then
- -- jobId already calculated above during collision check
- storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, templateData, templateOpts)
- rcall("INCR", KEYS[8])
- addJobFromScheduler(nextJobKey, jobId, jobOpts, waitKey, pausedKey, KEYS[11], metaKey, prioritizedKey, KEYS[10],
- delayedKey, KEYS[7], eventsKey, schedulerOpts['name'], maxEvents, now, templateData, jobSchedulerId, delay)
- elseif hasCollision then
- -- For 'pattern' case: return error code
- return -10 -- SchedulerJobIdCollision
- end
- if ARGV[9] ~= "" then
- rcall("HSET", ARGV[9], "nrjid", jobId)
- end
- return {jobId .. "", delay}
|