moveToDelayed-12.lua 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. --[[
  2. Moves job from active to delayed set.
  3. Input:
  4. KEYS[1] marker key
  5. KEYS[2] active key
  6. KEYS[3] prioritized key
  7. KEYS[4] delayed key
  8. KEYS[5] job key
  9. KEYS[6] events stream
  10. KEYS[7] meta key
  11. KEYS[8] stalled key
  12. KEYS[9] wait key
  13. KEYS[10] rate limiter key
  14. KEYS[11] paused key
  15. KEYS[12] pc priority counter
  16. ARGV[1] key prefix
  17. ARGV[2] timestamp
  18. ARGV[3] the id of the job
  19. ARGV[4] queue token
  20. ARGV[5] delay value
  21. ARGV[6] skip attempt
  22. ARGV[7] optional job fields to update
  23. ARGV[8] fetch next?
  24. ARGV[9] opts
  25. Output:
  26. 0 - OK
  27. -1 - Missing job.
  28. -3 - Job not in active set.
  29. Events:
  30. - delayed key.
  31. ]]
  32. local rcall = redis.call
  33. -- Includes
  34. --- @include "includes/addDelayMarkerIfNeeded"
  35. --- @include "includes/fetchNextJob"
  36. --- @include "includes/getDelayedScore"
  37. --- @include "includes/getOrSetMaxEvents"
  38. --- @include "includes/removeLock"
  39. --- @include "includes/updateJobFields"
  40. local jobKey = KEYS[5]
  41. local markerKey = KEYS[1]
  42. local metaKey = KEYS[7]
  43. local token = ARGV[4]
  44. if rcall("EXISTS", jobKey) == 1 then
  45. local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[3])
  46. if errorCode < 0 then
  47. return errorCode
  48. end
  49. updateJobFields(jobKey, ARGV[7])
  50. local delayedKey = KEYS[4]
  51. local jobId = ARGV[3]
  52. local delay = tonumber(ARGV[5])
  53. local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId)
  54. if numRemovedElements < 1 then return -3 end
  55. local score, delayedTimestamp = getDelayedScore(delayedKey, ARGV[2], delay)
  56. if ARGV[6] == "0" then
  57. rcall("HINCRBY", jobKey, "atm", 1)
  58. end
  59. rcall("HSET", jobKey, "delay", ARGV[5])
  60. local maxEvents = getOrSetMaxEvents(metaKey)
  61. rcall("ZADD", delayedKey, score, jobId)
  62. rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed",
  63. "jobId", jobId, "delay", delayedTimestamp)
  64. -- Try to get next job to avoid an extra roundtrip if the queue is not closing,
  65. -- and not rate limited.
  66. if (ARGV[8] == "1") then
  67. local opts = cmsgpack.unpack(ARGV[9])
  68. local result = fetchNextJob(KEYS[9], KEYS[2], KEYS[3], KEYS[6],
  69. KEYS[10], KEYS[4], KEYS[11], metaKey, KEYS[12], markerKey,
  70. ARGV[1], ARGV[2], opts)
  71. if result and type(result[1]) == "table" then
  72. return result
  73. end
  74. end
  75. -- Check if we need to push a marker job to wake up sleeping workers.
  76. addDelayMarkerIfNeeded(markerKey, delayedKey)
  77. return 0
  78. else
  79. return -1
  80. end