changeDelay-4.js 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. const content = `--[[
  2. Change job delay when it is in delayed set.
  3. Input:
  4. KEYS[1] delayed key
  5. KEYS[2] meta key
  6. KEYS[3] marker key
  7. KEYS[4] events stream
  8. ARGV[1] delay
  9. ARGV[2] timestamp
  10. ARGV[3] the id of the job
  11. ARGV[4] job key
  12. Output:
  13. 0 - OK
  14. -1 - Missing job.
  15. -3 - Job not in delayed set.
  16. Events:
  - 'delayed', added to the events stream (KEYS[4]) with the job id and its new delayed timestamp.
  18. ]]
  -- Local alias for redis.call; every Redis command in this script goes through it.
  local rcall = redis.call
  20. -- Includes
  21. --[[
  22. Add delay marker if needed.
  23. ]]
  24. -- Includes
  --[[
    Function to return the next delayed job timestamp.

    Reads the lowest-scored member of the delayed sorted set and divides its
    score by 0x1000 (delayed scores are stored as timestamp * 0x1000 plus a
    per-timestamp counter, so the result may carry a fractional part).

    Input:
      delayedKey  key of the delayed sorted set
    Output:
      score of the first member divided by 0x1000; nothing (nil) when the
      set is empty or the score cannot be converted to a number.
  ]]
  local function getNextDelayedTimestamp(delayedKey)
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- WITHSCORES replies with {member, score}. Compare the length explicitly:
    -- a bare "#result" condition is always truthy in Lua, even when it is 0.
    if #result >= 2 then
      local nextTimestamp = tonumber(result[2])
      if nextTimestamp ~= nil then
        return nextTimestamp / 0x1000
      end
    end
  end
  --[[
    Add delay marker if needed.

    Asks getNextDelayedTimestamp for the next delayed timestamp and, when one
    is returned, writes it as the score of member "1" in the marker sorted set.

    Input:
      markerKey   key of the marker sorted set
      delayedKey  key of the delayed sorted set
  ]]
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    local upcoming = getNextDelayedTimestamp(delayedKey)
    if upcoming == nil then
      -- No next timestamp is available, so the marker is left untouched.
      return
    end
    -- ZADD on the existing member "1" replaces its score with the newest
    -- known next timestamp.
    rcall("ZADD", markerKey, upcoming, "1")
  end
  --[[
    Bake in the job id first 12 bits into the timestamp
    to guarantee correct execution order of delayed jobs
    (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
    WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail

    Input:
      delayedKey  key of the delayed sorted set
      timestamp   base timestamp (number or numeric string)
      delay       delay as a number; added to the timestamp only when > 0
    Output:
      score, delayedTimestamp
      score is the first free slot in
      [delayedTimestamp * 0x1000, (delayedTimestamp + 1) * 0x1000 - 1]:
      the lower bound when the range is empty, otherwise the highest existing
      score plus one, capped at the upper bound.
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = tonumber(timestamp)
    if delay > 0 then
      delayedTimestamp = delayedTimestamp + delay
    end

    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1

    -- Highest score already stored for this timestamp, if any.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)

    -- An empty reply has no result[2], so tonumber yields nil. No length
    -- test is used here because a bare "#result" condition is always truthy
    -- in Lua.
    local currentMaxScore = tonumber(result[2])
    if currentMaxScore == nil then
      return minScore, delayedTimestamp
    end

    if currentMaxScore >= maxScore then
      -- Every slot for this timestamp is taken; reuse the last one.
      return maxScore, delayedTimestamp
    end
    return currentMaxScore + 1, delayedTimestamp
  end
  --[[
    Function to get max events value or set by default 10000.

    Input:
      metaKey  key of the meta hash
    Output:
      the value stored under "opts.maxLenEvents" exactly as HGET returns it,
      or the number 10000 after writing that default into the hash.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end

    -- Field is missing: persist the default so later reads find it.
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  -- Script entry point: move a job that is in the delayed set to the score
  -- derived from the new delay, record the delay on the job hash, emit a
  -- "delayed" event and refresh the delay marker.
  local jobKey = ARGV[4]
  if rcall("EXISTS", jobKey) ~= 1 then
    -- Missing job.
    return -1
  end

  local jobId = ARGV[3]
  local delay = tonumber(ARGV[1])
  local delayedKey = KEYS[1]

  -- The new score is computed before the job is removed from the set.
  local newScore, delayedTimestamp = getDelayedScore(delayedKey, ARGV[2], delay)

  local removed = rcall("ZREM", delayedKey, jobId)
  if removed < 1 then
    -- Job not in delayed set; nothing has been written at this point.
    return -3
  end

  rcall("HSET", jobKey, "delay", delay)
  rcall("ZADD", delayedKey, newScore, jobId)

  local maxEvents = getOrSetMaxEvents(KEYS[2])
  rcall("XADD", KEYS[4], "MAXLEN", "~", maxEvents, "*", "event", "delayed",
    "jobId", jobId, "delay", delayedTimestamp)

  -- mark that a delayed job is available
  addDelayMarkerIfNeeded(KEYS[3], delayedKey)
  return 0`;
  /**
   * Descriptor for the changeDelay script.
   *
   * name:    identifier of the script.
   * content: the Lua source held in the `content` constant.
   * keys:    4, matching the four KEYS entries listed in the script header.
   */
  export const changeDelay = { name: 'changeDelay', content: content, keys: 4 };
  103. };
  104. //# sourceMappingURL=changeDelay-4.js.map