addJobScheduler-11.lua 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  --[[
    Adds a job scheduler, i.e. a job factory that creates jobs based on a given
    schedule (repeat options). If a scheduler with the same id is already
    registered, the job queued for its previous iteration is removed (when it
    can still be found in delayed, prioritized, wait or paused) before the
    next one is added.

    Input:
      KEYS[1]  'repeat' key
      KEYS[2]  'delayed' key
      KEYS[3]  'wait' key
      KEYS[4]  'paused' key
      KEYS[5]  'meta' key
      KEYS[6]  'prioritized' key
      KEYS[7]  'marker' key
      KEYS[8]  'id' key
      KEYS[9]  'events' key
      KEYS[10] 'pc' priority counter
      KEYS[11] 'active' key

      ARGV[1]  next milliseconds
      ARGV[2]  msgpacked options
            [1] name
            [2] tz?
            [3] pattern?
            [4] endDate?
            [5] every?
            (this script itself reads the 'name', 'every', 'offset' and
             'startDate' entries by key)
      ARGV[3]  job scheduler id
      ARGV[4]  JSON stringified template data
      ARGV[5]  msgpacked template opts
      ARGV[6]  msgpacked delayed opts
      ARGV[7]  timestamp
      ARGV[8]  prefix key
      ARGV[9]  producer key (when it is an empty string nothing is written to it)

    Output:
      {jobId, delay}  on success
      -10             SchedulerJobIdCollision: no 'every' option, a job with the
                      computed id already exists and no previous job was removed
      -11             SchedulerJobSlotsBusy: 'every' option set and both the
                      computed slot and the following slot already hold a job
  ]]
  local rcall = redis.call
  -- Queue keys received through KEYS (see the header for the full list).
  local repeatKey, delayedKey, waitKey = KEYS[1], KEYS[2], KEYS[3]
  local pausedKey, metaKey, prioritizedKey = KEYS[4], KEYS[5], KEYS[6]
  local eventsKey = KEYS[9]

  -- Script arguments; the msgpacked ones are decoded right away.
  local nextMillis = ARGV[1]
  local jobSchedulerId = ARGV[3]
  local templateOpts = cmsgpack.unpack(ARGV[5])
  local now = tonumber(ARGV[7])
  local prefixKey = ARGV[8]
  local jobOpts = cmsgpack.unpack(ARGV[6])

  -- The include directives below are kept byte-identical and at the start of
  -- the line. NOTE(review): presumably the script loader matches them at
  -- column 0 - confirm before re-indenting.
-- Includes
--- @include "includes/addJobFromScheduler"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePaused"
--- @include "includes/removeJob"
--- @include "includes/storeJobScheduler"
--- @include "includes/getJobSchedulerEveryNextMillis"

  local schedulerKey = repeatKey .. ":" .. jobSchedulerId
  local maxEvents = getOrSetMaxEvents(metaKey)
  local templateData = ARGV[4]

  -- Score under which this scheduler id is currently registered in the repeat
  -- set. It stays falsy when the id is not registered yet; otherwise it is
  -- converted to a number.
  local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
  prevMillis = prevMillis and tonumber(prevMillis)

  local schedulerOpts = cmsgpack.unpack(ARGV[2])
  local every = schedulerOpts.every

  -- For backwards compatibility we also check the offset from the job itself.
  -- Could be removed in future major versions.
  local legacyRepeatOpts = jobOpts['repeat']
  local jobOffset = (legacyRepeatOpts and legacyRepeatOpts.offset) or 0
  -- jobOffset is always a number here (0 at least), so it is the final fallback.
  local offset = schedulerOpts.offset or jobOffset
  local newOffset = offset
  local updatedEvery = false

  if every then
      -- Base the computation on the previous score, unless the stored 'every'
      -- differs from the requested one: then the base is reset to nil.
      local baseMillis = prevMillis
      if prevMillis then
          local storedEvery = tonumber(rcall("HGET", schedulerKey, "every"))
          if storedEvery ~= every then
              baseMillis = nil
              updatedEvery = true
          end
      end

      nextMillis, newOffset = getJobSchedulerEveryNextMillis(baseMillis, every, now, offset,
          schedulerOpts.startDate)
  end
  -- Removes the job `jobId` if it is still sitting in one of the queue
  -- structures checked here: the delayed sorted set, the prioritized sorted set,
  -- or the wait list (the paused list instead when isQueuePaused(metaKey) is
  -- true). The structures are probed in that order and the first hit wins.
  --
  -- Returns true when the job was found and removed, false otherwise.
  -- `eventsKey` is accepted but not used by this function.
  local function removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, jobId, metaKey,
      eventsKey)
      local function dropJob()
          removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
      end

      -- Delayed sorted set.
      if rcall("ZSCORE", delayedKey, jobId) then
          dropJob()
          rcall("ZREM", delayedKey, jobId)
          return true
      end

      -- Prioritized sorted set.
      if rcall("ZSCORE", prioritizedKey, jobId) then
          dropJob()
          rcall("ZREM", prioritizedKey, jobId)
          return true
      end

      -- Wait or paused list, depending on the queue state.
      local listKey
      if isQueuePaused(metaKey) then
          listKey = pausedKey
      else
          listKey = waitKey
      end

      -- LREM both detects and removes the list entry in a single call.
      if rcall("LREM", listKey, 1, jobId) > 0 then
          dropJob()
          return true
      end

      return false
  end
  -- Step 1: if the scheduler was already registered, try to remove the job that
  -- was queued for its previous iteration.
  -- In theory the per-iteration key always exists when there is a prevMillis,
  -- unless something has gone really wrong.
  local removedPrevJob = false
  if prevMillis and rcall("EXISTS", schedulerKey .. ":" .. prevMillis) == 1 then
      local prevJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
      removedPrevJob = removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, prevJobId,
          metaKey, eventsKey)
  end

  if not removedPrevJob then
      -- Special case where no job was removed and we need to add the next
      -- iteration: remember the freshly computed offset.
      schedulerOpts.offset = newOffset
  elseif every and not updatedEvery then
      -- The job has been removed and we want to replace it, so reuse its millis.
      nextMillis = prevMillis
  end

  -- Step 2: check for a job id collision with an existing job (in any state).
  local jobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis

  -- If there is already a job with this id, in a state that is not updatable
  -- (active, completed, failed), the collision must be handled.
  local hasCollision = false
  if rcall("EXISTS", prefixKey .. jobId) == 1 then
      if not every then
          hasCollision = true
      else
          -- 'every' case: try the next time slot to avoid the collision.
          local shiftedMillis = nextMillis + every
          local shiftedJobId = "repeat:" .. jobSchedulerId .. ":" .. shiftedMillis
          if rcall("EXISTS", prefixKey .. shiftedJobId) ~= 0 then
              -- The next slot is taken as well: give up with an error code.
              return -11 -- SchedulerJobSlotsBusy
          end
          nextMillis = shiftedMillis
          jobId = shiftedJobId
      end
  end

  -- Step 3: delay relative to `now`, clamped to a minimum of 0.
  local delay = nextMillis - now
  if delay < 0 then
      delay = 0
  end

  -- 'pattern' case: a collision is only tolerated when the previous job was
  -- removed above; otherwise report it.
  if hasCollision and not removedPrevJob then
      return -10 -- SchedulerJobIdCollision
  end

  -- Step 4: persist the scheduler and enqueue the job for the next iteration.
  local markerKey, idKey, priorityCounterKey, activeKey = KEYS[7], KEYS[8], KEYS[10], KEYS[11]
  local nextJobKey = schedulerKey .. ":" .. nextMillis

  storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, templateData, templateOpts)
  rcall("INCR", idKey)
  addJobFromScheduler(nextJobKey, jobId, jobOpts, waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
      priorityCounterKey, delayedKey, markerKey, eventsKey, schedulerOpts['name'], maxEvents, now, templateData,
      jobSchedulerId, delay)

  -- Record the new job id on the producer key when one was supplied.
  local producerKey = ARGV[9]
  if producerKey ~= "" then
      rcall("HSET", producerKey, "nrjid", jobId)
  end

  return {jobId .. "", delay}