promoteDelayedJobs.lua 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. --[[
  2. Updates the delay set, by moving delayed jobs that should
  3. be processed now to "wait".
  4. Events:
  5. 'waiting'
  6. ]]
  7. -- Includes
  8. --- @include "addBaseMarkerIfNeeded"
  9. --- @include "addJobInTargetList"
  10. --- @include "addJobWithPriority"
  11. --- @include "getPriorityScore"
  12. -- Try to get as much as 1000 jobs at once
  13. local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
  14. eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  15. local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)
  16. if (#jobs > 0) then
  17. rcall("ZREM", delayedKey, unpack(jobs))
  18. for _, jobId in ipairs(jobs) do
  19. local jobKey = prefix .. jobId
  20. local priority =
  21. tonumber(rcall("HGET", jobKey, "priority")) or 0
  22. if priority == 0 then
  23. -- LIFO or FIFO
  24. rcall("LPUSH", targetKey, jobId)
  25. else
  26. local score = getPriorityScore(priority, priorityCounterKey)
  27. rcall("ZADD", prioritizedKey, score, jobId)
  28. end
  29. -- Emit waiting event
  30. rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
  31. jobId, "prev", "delayed")
  32. rcall("HSET", jobKey, "delay", 0)
  33. end
  34. addBaseMarkerIfNeeded(markerKey, isPaused)
  35. end
  36. end