--[[
  Function to create a new job from stored dedup-next data
  when a deduplicated job with keepLastIfActive finishes.
  At most one next job is created per deduplication ID.
  Multiple triggers while active overwrite the dedup-next data,
  so only the latest data is used.
]]

-- Includes
--- @include "getOrSetMaxEvents"
--- @include "setDeduplicationKey"
--- @include "storeAndEnqueueJob"
- local function requeueDeduplicatedJob(prefix, deduplicationId, eventStreamKey,
- metaKey, activeKey, waitKey, pausedKey, markerKey, prioritizedKey,
- priorityCounterKey, delayedKey, timestamp)
- local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
- if rcall("EXISTS", deduplicationNextKey) == 1 then
- local nextData = rcall("HMGET", deduplicationNextKey,
- "name", "data", "opts", "pk", "pd", "pdk", "rjk")
- local newJobId = rcall("INCR", prefix .. "id") .. ""
- local newJobIdKey = prefix .. newJobId
- local newOpts = cjson.decode(nextData[3])
- local deduplicationKey = prefix .. "de:" .. deduplicationId
- local parentKey = nextData[4] or nil
- local parentData = nextData[5] or nil
- local parentDependenciesKey = nextData[6] or nil
- local repeatJobKey = nextData[7] or nil
- -- Set dedup key for the new job (without TTL when keepLastIfActive,
- -- so the key outlives the job's active duration)
- local deOpts = newOpts['de']
- if deOpts and deOpts['keepLastIfActive'] then
- rcall('SET', deduplicationKey, newJobId)
- else
- setDeduplicationKey(deduplicationKey, newJobId, deOpts)
- end
- -- Store and enqueue using the shared helper (handles priority/lifo/delayed)
- local maxEvents = getOrSetMaxEvents(metaKey)
- storeAndEnqueueJob(eventStreamKey, newJobIdKey, newJobId, nextData[1], nextData[2],
- newOpts, timestamp, parentKey, parentData, repeatJobKey, maxEvents,
- waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
- priorityCounterKey, delayedKey, markerKey)
- -- Register as child dependency if the job has a parent
- if parentDependenciesKey then
- rcall("SADD", parentDependenciesKey, newJobIdKey)
- end
- -- Only delete the dedup-next hash after the job is fully created,
- -- so that if any step above errors, the data is not permanently lost.
- rcall("DEL", deduplicationNextKey)
- end
- end
|