updateJobScheduler-12.lua 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. --[[
  2. Updates a job scheduler and adds next delayed job
  3. Input:
  4. KEYS[1] 'repeat' key
  5. KEYS[2] 'delayed'
  6. KEYS[3] 'wait' key
  7. KEYS[4] 'paused' key
  8. KEYS[5] 'meta'
  9. KEYS[6] 'prioritized' key
  10. KEYS[7] 'marker',
  11. KEYS[8] 'id'
  12. KEYS[9] events stream key
  13. KEYS[10] 'pc' priority counter
  14. KEYS[11] producer key
  15. KEYS[12] 'active' key
  16. ARGV[1] next milliseconds
  17. ARGV[2] jobs scheduler id
  18. ARGV[3] Json stringified delayed data
  19. ARGV[4] msgpacked delayed opts
  20. ARGV[5] timestamp
  21. ARGV[6] prefix key
  22. ARGV[7] producer id
  23. Output:
  24. next delayed job id - OK
  25. ]] local rcall = redis.call
  26. local repeatKey = KEYS[1]
  27. local delayedKey = KEYS[2]
  28. local waitKey = KEYS[3]
  29. local pausedKey = KEYS[4]
  30. local metaKey = KEYS[5]
  31. local prioritizedKey = KEYS[6]
  32. local nextMillis = tonumber(ARGV[1])
  33. local jobSchedulerId = ARGV[2]
  34. local timestamp = tonumber(ARGV[5])
  35. local prefixKey = ARGV[6]
  36. local producerId = ARGV[7]
  37. local jobOpts = cmsgpack.unpack(ARGV[4])
  38. -- Includes
  39. --- @include "includes/addJobFromScheduler"
  40. --- @include "includes/getOrSetMaxEvents"
  41. --- @include "includes/getJobSchedulerEveryNextMillis"
  42. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
  43. -- Validate that scheduler exists.
  44. -- If it does not exist we should not iterate anymore.
  45. if prevMillis then
  46. prevMillis = tonumber(prevMillis)
  47. local schedulerKey = repeatKey .. ":" .. jobSchedulerId
  48. local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data", "every", "startDate", "offset")
  49. local every = tonumber(schedulerAttributes[3])
  50. local now = tonumber(timestamp)
  51. -- If every is not found in scheduler attributes, try to get it from job options
  52. if not every and jobOpts['repeat'] and jobOpts['repeat']['every'] then
  53. every = tonumber(jobOpts['repeat']['every'])
  54. end
  55. if every then
  56. local startDate = schedulerAttributes[4]
  57. local jobOptsOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
  58. local offset = schedulerAttributes[5] or jobOptsOffset or 0
  59. local newOffset
  60. nextMillis, newOffset = getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  61. if not offset then
  62. rcall("HSET", schedulerKey, "offset", newOffset)
  63. jobOpts['repeat']['offset'] = newOffset
  64. end
  65. end
  66. local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
  67. local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis
  68. local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
  69. if producerId == currentDelayedJobId then
  70. local eventsKey = KEYS[9]
  71. local maxEvents = getOrSetMaxEvents(metaKey)
  72. if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
  73. rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
  74. rcall("HINCRBY", schedulerKey, "ic", 1)
  75. rcall("INCR", KEYS[8])
  76. -- TODO: remove this workaround in next breaking change,
  77. -- all job-schedulers must save job data
  78. local templateData = schedulerAttributes[2] or ARGV[3]
  79. if templateData and templateData ~= '{}' then
  80. rcall("HSET", schedulerKey, "data", templateData)
  81. end
  82. local delay = nextMillis - now
  83. -- Fast Clamp delay to minimum of 0
  84. if delay < 0 then
  85. delay = 0
  86. end
  87. jobOpts["delay"] = delay
  88. addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, jobOpts, waitKey, pausedKey, KEYS[12], metaKey,
  89. prioritizedKey, KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerAttributes[1], maxEvents, ARGV[5],
  90. templateData or '{}', jobSchedulerId, delay)
  91. -- TODO: remove this workaround in next breaking change
  92. if KEYS[11] ~= "" then
  93. rcall("HSET", KEYS[11], "nrjid", nextDelayedJobId)
  94. end
  95. return nextDelayedJobId .. "" -- convert to string
  96. else
  97. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "duplicated", "jobId", nextDelayedJobId)
  98. end
  99. end
  100. end