drain-5.lua 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. --[[
  2. Drains the queue, removes all jobs that are waiting
  3. or delayed, but not active, completed or failed
  4. Input:
  5. KEYS[1] 'wait',
  6. KEYS[2] 'paused'
  7. KEYS[3] 'delayed'
  8. KEYS[4] 'prioritized'
  9. KEYS[5] 'jobschedulers' (repeat)
  10. ARGV[1] queue key prefix
  11. ARGV[2] should clean delayed jobs
  12. ]]
  13. local rcall = redis.call
  14. local queueBaseKey = ARGV[1]
  15. --- @include "includes/removeListJobs"
  16. --- @include "includes/removeZSetJobs"
  17. -- We must not remove delayed jobs if they are associated to a job scheduler.
  18. local scheduledJobs = {}
  19. local jobSchedulers = rcall("ZRANGE", KEYS[5], 0, -1, "WITHSCORES")
  20. -- For every job scheduler, get the current delayed job id.
  21. for i = 1, #jobSchedulers, 2 do
  22. local jobSchedulerId = jobSchedulers[i]
  23. local jobSchedulerMillis = jobSchedulers[i + 1]
  24. local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. jobSchedulerMillis
  25. scheduledJobs[delayedJobId] = true
  26. end
  27. removeListJobs(KEYS[1], true, queueBaseKey, 0, scheduledJobs) -- wait
  28. removeListJobs(KEYS[2], true, queueBaseKey, 0, scheduledJobs) -- paused
  29. if ARGV[2] == "1" then
  30. removeZSetJobs(KEYS[3], true, queueBaseKey, 0, scheduledJobs) -- delayed
  31. end
  32. removeZSetJobs(KEYS[4], true, queueBaseKey, 0, scheduledJobs) -- prioritized