-- retryJob-11.lua
--[[
  Retries a failed job by moving it back to the wait queue.

    Input:
      KEYS[1]  'active',
      KEYS[2]  'wait'
      KEYS[3]  'paused'
      KEYS[4]  job key
      KEYS[5]  'meta'
      KEYS[6]  events stream
      KEYS[7]  delayed key
      KEYS[8]  prioritized key
      KEYS[9]  'pc' priority counter
      KEYS[10] 'marker'
      KEYS[11] 'stalled'

      ARGV[1]  key prefix
      ARGV[2]  timestamp
      ARGV[3]  pushCmd
      ARGV[4]  jobId
      ARGV[5]  token
      ARGV[6]  optional job fields to update

    Events:
      'waiting'

    Output:
      0  - OK
      -1 - Missing key
      -2 - Missing lock
      -3 - Job not in active set

    Notes:
      * Delayed-job promotion runs unconditionally, i.e. even when the script
        later returns an error code.
      * The optional job fields (ARGV[6]) are only written once the job has
        been confirmed to be in the active list, so a -3 result does not
        modify the job's fields.
]]
local rcall = redis.call

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/addJobWithPriority"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/isQueuePausedOrMaxed"
--- @include "includes/promoteDelayedJobs"
--- @include "includes/removeLock"
--- @include "includes/updateJobFields"

local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3])
local markerKey = KEYS[10]

-- Check if there are delayed jobs that we can move to wait.
-- test example: when there are delayed jobs between retries
promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed)

local jobKey = KEYS[4]

if rcall("EXISTS", jobKey) == 1 then
  -- A negative result from removeLock is propagated to the caller as-is.
  local errorCode = removeLock(jobKey, KEYS[11], ARGV[5], ARGV[4])
  if errorCode < 0 then
    return errorCode
  end

  -- Count -1 removes at most one occurrence of the job id, scanning the
  -- active list from its tail.
  local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[4])
  if (numRemovedElements < 1) then return -3 end

  -- Redis does not roll back writes when a script returns early, so the job
  -- fields are only updated after the active-list check above has passed.
  -- This must stay ahead of the priority read below so that the read sees
  -- whatever the update wrote.
  updateJobFields(jobKey, ARGV[6])

  -- A missing or non-numeric priority is treated as 0 (standard job).
  local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

  -- need to re-evaluate after removing job from active
  isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1])

  -- Standard or priority add
  if priority == 0 then
    addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4])
  else
    addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed)
  end

  -- Increment the job's "atm" counter.
  -- NOTE(review): presumably "attempts made" - confirm against the job schema.
  rcall("HINCRBY", jobKey, "atm", 1)

  local maxEvents = getOrSetMaxEvents(KEYS[5])

  -- Emit waiting event
  rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
        "jobId", ARGV[4], "prev", "active")

  return 0
else
  return -1
end