removeJobWithChildren.lua 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. --[[
  2. Remove a job from all the statuses it may be in as well as all its data,
  3. including its children. Active children can be ignored.
  4. Events:
  5. 'removed'
  6. ]]
  7. local rcall = redis.call
  8. -- Includes
  9. --- @include "destructureJobKey"
  10. --- @include "getOrSetMaxEvents"
  11. --- @include "isJobSchedulerJob"
  12. --- @include "removeDeduplicationKeyIfNeededOnRemoval"
  13. --- @include "removeJobFromAnyState"
  14. --- @include "removeJobKeys"
  15. --- @include "removeParentDependencyKey"
  16. --- @include "isLocked"
  17. local removeJobChildren
  18. local removeJobWithChildren
  --- Remove every child job recorded under a parent job key.
  -- Children are removed depth-first: each child is handed to
  -- removeJobWithChildren, which may in turn call back into this function.
  -- @param prefix  queue key prefix of the parent (not read by this function)
  -- @param jobKey  full key of the parent job whose children are removed
  -- @param options table of flags; this function reads options.ignoreProcessed
  --                and forwards the whole table to removeJobWithChildren
  removeJobChildren = function(prefix, jobKey, options)
    -- Walk a reply list, taking one child key every `stride` entries, and
    -- remove the job each key names (passing jobKey as its parent key).
    local function removeListedChildren(childKeys, stride)
      for idx = 1, #childKeys, stride do
        local childKey = childKeys[idx]
        local childId = getJobIdFromKey(childKey)
        local childPrefix = getJobKeyPrefix(childKey, childId)
        removeJobWithChildren(childPrefix, childId, jobKey, options)
      end
    end

    -- The processed / failed / unsuccessful collections are only visited when
    -- the caller did not ask to ignore processed children.
    if not options.ignoreProcessed then
      -- HGETALL replies are walked two entries at a time: the child key is
      -- read from every odd index.
      removeListedChildren(rcall("HGETALL", jobKey .. ":processed"), 2)
      removeListedChildren(rcall("HGETALL", jobKey .. ":failed"), 2)
      -- ZRANGE without scores yields one member per entry.
      removeListedChildren(rcall("ZRANGE", jobKey .. ":unsuccessful", 0, -1), 1)
    end

    -- Members of the ":dependencies" set are always visited.
    removeListedChildren(rcall("SMEMBERS", jobKey .. ":dependencies"), 1)
  end
  --- Remove a single job and, when requested, its children.
  -- Publishes a "removed" entry on the queue's events stream when
  -- removeJobKeys reports that keys were deleted.
  -- @param prefix    queue key prefix; prefix .. jobId is the job key
  -- @param jobId     id of the job to remove
  -- @param parentKey key of the parent job, forwarded to removeParentDependencyKey
  -- @param options   table of flags read here: ignoreLocked, ignoreProcessed,
  --                  removeChildren
  removeJobWithChildren = function(prefix, jobId, parentKey, options)
    local jobKey = prefix .. jobId

    -- Leave the job alone when locked jobs are to be ignored and it is locked.
    if options.ignoreLocked and isLocked(prefix, jobId) then
      return
    end

    -- With ignoreProcessed set, a job that has a score in the failed zset is
    -- skipped entirely.
    if options.ignoreProcessed and rcall("ZSCORE", prefix .. "failed", jobId) then
      return
    end

    removeParentDependencyKey(jobKey, false, parentKey, nil)

    if options.removeChildren then
      removeJobChildren(prefix, jobKey, options)
    end

    -- The value returned here is reported as "prev" in the removed event.
    local previousState = removeJobFromAnyState(prefix, jobId)

    -- The "deid" field must be read before the job keys are deleted below.
    local deduplicationId = rcall("HGET", jobKey, "deid")
    removeDeduplicationKeyIfNeededOnRemoval(prefix, jobId, deduplicationId)

    -- Only announce the removal if removeJobKeys returned a positive count.
    if removeJobKeys(jobKey) > 0 then
      local maxEvents = getOrSetMaxEvents(prefix .. "meta")
      rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
        "jobId", jobId, "prev", previousState)
    end
  end