promote-9.js 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. const content = `--[[
  2. Promotes a job that is currently "delayed" to the "waiting" state
  3. Input:
  4. KEYS[1] 'delayed'
  5. KEYS[2] 'wait'
  6. KEYS[3] 'paused'
  7. KEYS[4] 'meta'
  8. KEYS[5] 'prioritized'
  9. KEYS[6] 'active'
  10. KEYS[7] 'pc' priority counter
  11. KEYS[8] 'event stream'
  12. KEYS[9] 'marker'
  13. ARGV[1] queue.toKey('')
  14. ARGV[2] jobId
  15. Output:
  16. 0 - OK
  17. -3 - Job not in delayed zset.
  18. Events:
  19. 'waiting'
  20. ]]
  21. local rcall = redis.call
  22. local jobId = ARGV[2]
  23. -- Includes
  24. --[[
  25. Function to add job in target list and add marker if needed.
  26. ]]
  27. -- Includes
  28. --[[
  29. Add marker if needed when a job is available.
  30. ]]
  31. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  32. if not isPausedOrMaxed then
  33. rcall("ZADD", markerKey, 0, "0")
  34. end
  35. end
  36. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  37. rcall(pushCmd, targetKey, jobId)
  38. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  39. end
  40. --[[
  41. Function to add job considering priority.
  42. ]]
  43. -- Includes
  44. --[[
  45. Function to get priority score.
  46. ]]
  47. local function getPriorityScore(priority, priorityCounterKey)
  48. local prioCounter = rcall("INCR", priorityCounterKey)
  49. return priority * 0x100000000 + prioCounter % 0x100000000
  50. end
  51. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  52. isPausedOrMaxed)
  53. local score = getPriorityScore(priority, priorityCounterKey)
  54. rcall("ZADD", prioritizedKey, score, jobId)
  55. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  56. end
  57. --[[
  58. Function to check for the meta.paused key to decide if we are paused or not
  59. (since an empty list and !EXISTS are not really the same).
  60. ]]
  61. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  62. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  63. if queueAttributes[1] then
  64. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  65. else
  66. if queueAttributes[2] then
  67. local activeCount = rcall("LLEN", activeKey)
  68. if activeCount >= tonumber(queueAttributes[2]) then
  69. return waitKey, true, queueAttributes[3], queueAttributes[4]
  70. else
  71. return waitKey, false, queueAttributes[3], queueAttributes[4]
  72. end
  73. end
  74. end
  75. return waitKey, false, queueAttributes[3], queueAttributes[4]
  76. end
  77. if rcall("ZREM", KEYS[1], jobId) == 1 then
  78. local jobKey = ARGV[1] .. jobId
  79. local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
  80. local metaKey = KEYS[4]
  81. local markerKey = KEYS[9]
  82. -- Remove delayed "marker" from the wait list if there is any.
  83. -- Since we are adding a job we do not need the marker anymore.
  84. -- Markers in waitlist DEPRECATED in v5: Remove in v6.
  85. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[2], KEYS[3])
  86. local marker = rcall("LINDEX", target, 0)
  87. if marker and string.sub(marker, 1, 2) == "0:" then rcall("LPOP", target) end
  88. if priority == 0 then
  89. -- LIFO or FIFO
  90. addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId)
  91. else
  92. addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed)
  93. end
  94. rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId, "prev",
  95. "delayed");
  96. rcall("HSET", jobKey, "delay", 0)
  97. return 0
  98. else
  99. return -3
  100. end
  101. `;
  102. export const promote = {
  103. name: 'promote',
  104. content,
  105. keys: 9,
  106. };
  107. //# sourceMappingURL=promote-9.js.map