| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- --[[
- Adds a prioritized job to the queue by doing the following:
- - Increases the job counter if needed.
- - Creates a new job key with the job data.
- - Adds the job to the prioritized set so that workers get notified.
- Input:
- KEYS[1] 'marker',
- KEYS[2] 'meta'
- KEYS[3] 'id'
- KEYS[4] 'prioritized'
- KEYS[5] 'delayed'
- KEYS[6] 'completed'
- KEYS[7] 'active'
- KEYS[8] events stream key
- KEYS[9] 'pc' priority counter
- ARGV[1] msgpacked arguments array
- [1] key prefix,
- [2] custom id (will not generate one automatically)
- [3] name
- [4] timestamp
- [5] parentKey?
- [6] parent dependencies key.
- [7] parent? {id, queueKey}
- [8] repeat job key
- [9] deduplication key
- ARGV[2] Json stringified job data
- ARGV[3] msgpacked options
- Output:
- jobId - OK
- -5 - Missing parent key
- ]]
- -- Script body: validates the optional parent, resolves the job id, handles
- -- duplicate/deduplicated jobs, stores the job, adds it to the prioritized
- -- set, emits a "waiting" event and registers the job with its parent.
- --
- -- Key aliases (see the KEYS list in the header comment). KEYS[1] (marker) and
- -- KEYS[5] (delayed) are not aliased; they are passed by index further down.
- local metaKey = KEYS[2]
- local idKey = KEYS[3]
- local priorityKey = KEYS[4]
- local completedKey = KEYS[6]
- local activeKey = KEYS[7]
- local eventsKey = KEYS[8]
- local priorityCounterKey = KEYS[9]
- -- Resolved below: either the auto-incremented counter or the custom id.
- local jobId
- local jobIdKey
- local rcall = redis.call
- -- ARGV[1] and ARGV[3] arrive msgpack-encoded; ARGV[2] is the JSON job data.
- local args = cmsgpack.unpack(ARGV[1])
- -- NOTE(review): `data` is not read anywhere in the visible code; the calls
- -- below pass ARGV[2] directly. Confirm no included helper relies on it.
- local data = ARGV[2]
- local opts = cmsgpack.unpack(ARGV[3])
- local parentKey = args[5]
- local parent = args[7]
- local repeatJobKey = args[8]
- local deduplicationKey = args[9]
- -- JSON-encoded `parent`; only set when a parent key was supplied.
- local parentData
- -- Includes
- --- @include "includes/addJobWithPriority"
- --- @include "includes/deduplicateJob"
- --- @include "includes/storeJob"
- --- @include "includes/getOrSetMaxEvents"
- --- @include "includes/handleDuplicatedJob"
- --- @include "includes/isQueuePausedOrMaxed"
- -- When a parent key is given it must exist; otherwise abort with -5
- -- ("Missing parent key" in the header's output list).
- if parentKey ~= nil then
- if rcall("EXISTS", parentKey) ~= 1 then return -5 end
- parentData = cjson.encode(parent)
- end
- -- NOTE(review): the counter is incremented unconditionally here, even when a
- -- custom id is supplied and the counter value ends up unused.
- local jobCounter = rcall("INCR", idKey)
- local maxEvents = getOrSetMaxEvents(metaKey)
- local parentDependenciesKey = args[6]
- local timestamp = args[4]
- -- An empty custom id (args[2]) means "use the counter value as the job id".
- if args[2] == "" then
- jobId = jobCounter
- jobIdKey = args[1] .. jobId
- else
- jobId = args[2]
- jobIdKey = args[1] .. jobId
- -- A job key already exists for this custom id: delegate to
- -- handleDuplicatedJob and return whatever it returns.
- if rcall("EXISTS", jobIdKey) == 1 then
- return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
- parentData, parentDependenciesKey, completedKey, eventsKey,
- maxEvents, timestamp)
- end
- end
- -- Deduplication check driven by opts['de']; a truthy result is returned
- -- as-is and the script stops without storing a new job.
- local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[5],
- deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
- parentKey, parentData, parentDependenciesKey, repeatJobKey)
- if deduplicationJobId then
- return deduplicationJobId
- end
- -- Store the job.
- -- storeJob returns (delay, priority); only `priority` is used afterwards.
- local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
- opts, timestamp, parentKey, parentData,
- repeatJobKey)
- -- Add the job to the prioritized set
- -- KEYS[1] is the marker key; the paused/maxed flag is forwarded to the helper.
- local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
- addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)
- -- Emit waiting event
- -- "MAXLEN ~ maxEvents" asks Redis to trim the stream approximately to that length.
- rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
- "jobId", jobId)
- -- Check if this job is a child of another job, if so add it to the parents dependencies
- if parentDependenciesKey ~= nil then
- rcall("SADD", parentDependenciesKey, jobIdKey)
- end
- -- jobId may be a number (from INCR); concatenating "" yields a string reply.
- return jobId .. "" -- convert to string
|