reprocessJob-8.lua 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. --[[
  2. Attempts to reprocess a job
  3. Input:
  4. KEYS[1] job key
  5. KEYS[2] events stream
  6. KEYS[3] job state
  7. KEYS[4] wait key
  8. KEYS[5] meta
  9. KEYS[6] paused key
  10. KEYS[7] active key
  11. KEYS[8] marker key
  12. ARGV[1] job.id
  13. ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
  14. ARGV[3] propVal - failedReason/returnvalue
  15. ARGV[4] prev state - failed/completed
  16. ARGV[5] reset attemptsMade - "1" or "0"
  17. ARGV[6] reset attemptsStarted - "1" or "0"
  18. Output:
  19. 1 means the operation was a success
  20. -1 means the job does not exist
  21. -3 means the job was not found in the expected set.
  22. ]]
  23. local rcall = redis.call;
  24. -- Includes
  25. --- @include "includes/addJobInTargetList"
  26. --- @include "includes/getOrSetMaxEvents"
  27. --- @include "includes/getTargetQueueList"
  28. local jobKey = KEYS[1]
  29. if rcall("EXISTS", jobKey) == 1 then
  30. local jobId = ARGV[1]
  31. if (rcall("ZREM", KEYS[3], jobId) == 1) then
  32. local attributesToRemove = {}
  33. if ARGV[5] == "1" then
  34. table.insert(attributesToRemove, "atm")
  35. end
  36. if ARGV[6] == "1" then
  37. table.insert(attributesToRemove, "ats")
  38. end
  39. rcall("HDEL", jobKey, "finishedOn", "processedOn", ARGV[3], unpack(attributesToRemove))
  40. local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6])
  41. addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId)
  42. local parentKey = rcall("HGET", jobKey, "parentKey")
  43. if parentKey and rcall("EXISTS", parentKey) == 1 then
  44. if ARGV[4] == "failed" then
  45. if rcall("ZREM", parentKey .. ":unsuccessful", jobKey) == 1 or
  46. rcall("ZREM", parentKey .. ":failed", jobKey) == 1 then
  47. rcall("SADD", parentKey .. ":dependencies", jobKey)
  48. end
  49. else
  50. if rcall("HDEL", parentKey .. ":processed", jobKey) == 1 then
  51. rcall("SADD", parentKey .. ":dependencies", jobKey)
  52. end
  53. end
  54. end
  55. local maxEvents = getOrSetMaxEvents(KEYS[5])
  56. -- Emit waiting event
  57. rcall("XADD", KEYS[2], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  58. "jobId", jobId, "prev", ARGV[4]);
  59. return 1
  60. else
  61. return -3
  62. end
  63. else
  64. return -1
  65. end