requeueDeduplicatedJob.lua 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. --[[
  2. Function to create a new job from stored dedup-next data
  3. when a deduplicated job with keepLastIfActive finishes.
  4. At most one next job is created per deduplication ID.
  5. Multiple triggers while active overwrite the dedup-next data,
  6. so only the latest data is used.
  7. ]]
  8. -- Includes
  9. --- @include "getOrSetMaxEvents"
  10. --- @include "setDeduplicationKey"
  11. --- @include "storeAndEnqueueJob"
  12. local function requeueDeduplicatedJob(prefix, deduplicationId, eventStreamKey,
  13. metaKey, activeKey, waitKey, pausedKey, markerKey, prioritizedKey,
  14. priorityCounterKey, delayedKey, timestamp)
  15. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  16. if rcall("EXISTS", deduplicationNextKey) == 1 then
  17. local nextData = rcall("HMGET", deduplicationNextKey,
  18. "name", "data", "opts", "pk", "pd", "pdk", "rjk")
  19. local newJobId = rcall("INCR", prefix .. "id") .. ""
  20. local newJobIdKey = prefix .. newJobId
  21. local newOpts = cjson.decode(nextData[3])
  22. local deduplicationKey = prefix .. "de:" .. deduplicationId
  23. local parentKey = nextData[4] or nil
  24. local parentData = nextData[5] or nil
  25. local parentDependenciesKey = nextData[6] or nil
  26. local repeatJobKey = nextData[7] or nil
  27. -- Set dedup key for the new job (without TTL when keepLastIfActive,
  28. -- so the key outlives the job's active duration)
  29. local deOpts = newOpts['de']
  30. if deOpts and deOpts['keepLastIfActive'] then
  31. rcall('SET', deduplicationKey, newJobId)
  32. else
  33. setDeduplicationKey(deduplicationKey, newJobId, deOpts)
  34. end
  35. -- Store and enqueue using the shared helper (handles priority/lifo/delayed)
  36. local maxEvents = getOrSetMaxEvents(metaKey)
  37. storeAndEnqueueJob(eventStreamKey, newJobIdKey, newJobId, nextData[1], nextData[2],
  38. newOpts, timestamp, parentKey, parentData, repeatJobKey, maxEvents,
  39. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  40. priorityCounterKey, delayedKey, markerKey)
  41. -- Register as child dependency if the job has a parent
  42. if parentDependenciesKey then
  43. rcall("SADD", parentDependenciesKey, newJobIdKey)
  44. end
  45. -- Only delete the dedup-next hash after the job is fully created,
  46. -- so that if any step above errors, the data is not permanently lost.
  47. rcall("DEL", deduplicationNextKey)
  48. end
  49. end