deduplicateJob.lua 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. --[[
  2. Function to debounce a job.
  3. ]]
  4. -- Includes
  5. --- @include "deduplicateJobWithoutReplace"
  6. --- @include "removeJobKeys"
  7. --- @include "setDeduplicationKey"
  8. --- @include "storeDeduplicatedNextJob"
  9. local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
  10. jobId, deduplicationId, prefix)
  11. if rcall("ZREM", delayedKey, currentDeduplicatedJobId) > 0 then
  12. removeJobKeys(prefix .. currentDeduplicatedJobId)
  13. rcall("XADD", eventsKey, "*", "event", "removed", "jobId", currentDeduplicatedJobId,
  14. "prev", "delayed")
  15. -- TODO remove debounced event in next breaking change
  16. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  17. jobId, "debounceId", deduplicationId)
  18. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  19. jobId, "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)
  20. return true
  21. end
  22. return false
  23. end
  24. local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
  25. prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
  26. local deduplicationId = deduplicationOpts and deduplicationOpts['id']
  27. if deduplicationId then
  28. if deduplicationOpts['replace'] then
  29. local currentDebounceJobId = rcall('GET', deduplicationKey)
  30. if currentDebounceJobId then
  31. local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
  32. currentDebounceJobId, jobId, deduplicationId, prefix)
  33. if isRemoved then
  34. if deduplicationOpts['keepLastIfActive'] then
  35. rcall('SET', deduplicationKey, jobId)
  36. else
  37. local ttl = deduplicationOpts['ttl']
  38. if not deduplicationOpts['extend'] and ttl and ttl > 0 then
  39. rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
  40. else
  41. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  42. end
  43. end
  44. return
  45. else
  46. storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  47. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  48. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  49. return currentDebounceJobId
  50. end
  51. else
  52. if deduplicationOpts['keepLastIfActive'] then
  53. rcall('SET', deduplicationKey, jobId)
  54. else
  55. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  56. end
  57. return
  58. end
  59. else
  60. return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
  61. jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  62. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  63. end
  64. end
  65. end