changeDelay-4.js 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.changeDelay = void 0;
  4. const content = `--[[
  5. Change job delay when it is in delayed set.
  6. Input:
  7. KEYS[1] delayed key
  8. KEYS[2] meta key
  9. KEYS[3] marker key
  10. KEYS[4] events stream
  11. ARGV[1] delay
  12. ARGV[2] timestamp
  13. ARGV[3] the id of the job
  14. ARGV[4] job key
  15. Output:
  16. 0 - OK
  17. -1 - Missing job.
  18. -3 - Job not in delayed set.
  19. Events:
  20. - delayed key.
  21. ]]
  22. local rcall = redis.call
  23. -- Includes
  24. --[[
  25. Add delay marker if needed.
  26. ]]
  27. -- Includes
  --[[
    Function to return the next delayed job timestamp.

    Reads the first entry (rank 0) of the delayed sorted set together with
    its score and converts the score back to a timestamp by dividing it
    by 0x1000.

    Input:
      delayedKey  key of the delayed sorted set
    Output:
      score of the first entry divided by 0x1000, or nil when the reply is
      empty or the score is not numeric.
  ]]
  local function getNextDelayedTimestamp(delayedKey)
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- In Lua the number 0 is truthy, so a bare "if #result then" never
    -- filters out an empty reply; compare the length explicitly.
    if #result > 0 then
      local nextTimestamp = tonumber(result[2])
      if nextTimestamp ~= nil then
        return nextTimestamp / 0x1000
      end
    end
  end
  --[[
    Refresh the marker entry so that its score is the timestamp of the
    next delayed job. Does nothing when no next timestamp is available.

    Input:
      markerKey   key of the marker sorted set
      delayedKey  key of the delayed sorted set
  ]]
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    local upcoming = getNextDelayedTimestamp(delayedKey)
    if upcoming == nil then
      return
    end
    -- Member "1" is (re)written with the newest known next timestamp
    -- as its score.
    rcall("ZADD", markerKey, upcoming, "1")
  end
  --[[
    Compute the sorted-set score for a job placed in the delayed set.

    The target timestamp is multiplied by 0x1000 (shifted left 12 bits) and
    the low 12 bits act as a per-timestamp counter, so jobs sharing a
    timestamp keep their insertion order (up to 4096 slots per timestamp).
    WARNING: once the highest slot of a timestamp is taken, further jobs
    receive that same highest score and FIFO order is no longer guaranteed.

    Input:
      delayedKey  key of the delayed sorted set
      timestamp   base timestamp (number or numeric string)
      delay       delay added to the timestamp when greater than 0
    Output:
      score, delayedTimestamp
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1

    -- Highest score already stored inside this timestamp's slot range.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)

    -- A bare "#result" is always truthy in Lua (0 is not false); test the
    -- length explicitly so an empty reply falls through to minScore.
    if #result > 0 then
      local currentMaxScore = tonumber(result[2])
      if currentMaxScore ~= nil then
        if currentMaxScore >= maxScore then
          -- Slot range exhausted: reuse the last slot.
          return maxScore, delayedTimestamp
        end
        return currentMaxScore + 1, delayedTimestamp
      end
    end

    -- No job stored for this timestamp yet: take the first slot.
    return minScore, delayedTimestamp
  end
  --[[
    Return the "opts.maxLenEvents" value stored in the meta hash.
    When the field is missing, store the default of 10000 and return it.

    Input:
      metaKey  key of the meta hash
    Output:
      the stored value as returned by HGET, or the number 10000 when the
      default had to be written.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local field = "opts.maxLenEvents"
    local stored = rcall("HGET", metaKey, field)
    if stored then
      return stored
    end

    local fallback = 10000
    rcall("HSET", metaKey, field, fallback)
    return fallback
  end
  -- Script body: move the job to its new position in the delayed set.
  -- Returns -1 when the job key does not exist, -3 when the job id is not
  -- a member of the delayed set, and 0 on success.
  if rcall("EXISTS", ARGV[4]) ~= 1 then
    return -1
  end

  local delayedKey, metaKey, markerKey, eventsKey = KEYS[1], KEYS[2], KEYS[3], KEYS[4]
  local jobId, jobKey = ARGV[3], ARGV[4]
  local delay = tonumber(ARGV[1])

  -- The new score is computed before the job is removed from the set.
  local score, delayedTimestamp = getDelayedScore(delayedKey, ARGV[2], delay)

  if rcall("ZREM", delayedKey, jobId) < 1 then
    return -3
  end

  -- Persist the new delay on the job hash and re-insert with the new score.
  rcall("HSET", jobKey, "delay", delay)
  rcall("ZADD", delayedKey, score, jobId)

  -- Publish a "delayed" event on the events stream, trimmed to the
  -- configured maximum length.
  rcall("XADD", eventsKey, "MAXLEN", "~", getOrSetMaxEvents(metaKey), "*",
    "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)

  -- mark that a delayed job is available
  addDelayMarkerIfNeeded(markerKey, delayedKey)
  return 0`;
  102. exports.changeDelay = {
  103. name: 'changeDelay',
  104. content,
  105. keys: 4,
  106. };
  107. //# sourceMappingURL=changeDelay-4.js.map