promote-9.js 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  // CommonJS module that exports the `promote` script descriptor: a name, the
  // Lua source held in `content`, and the number of Redis keys the script takes.
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  // Placeholder; the real descriptor is assigned at the bottom of this file.
  3. exports.promote = void 0;
  // Lua source of the script. The text inside the template literal is runtime
  // data and must not be edited for style. What the Lua below does:
  //   * ZREMs ARGV[2] (the job id) from KEYS[1]; if nothing was removed it
  //     returns -3 and does nothing else.
  //   * Reads the `priority` field of the hash at ARGV[1] .. jobId, using 0 when
  //     the field is missing or not numeric.
  //   * getTargetQueueList() does one HMGET on the meta key. It returns the
  //     paused key when the `paused` field is set, otherwise the wait key. Its
  //     second result is true when paused, or when `concurrency` is set and
  //     LLEN of the active key is >= that value. It also returns the `max` and
  //     `duration` fields, which the caller here does not capture.
  //   * If the head of the target list starts with "0:", that element is
  //     LPOPped (the script's own comment calls this a deprecated marker).
  //   * priority == 0: the job id is LPUSHed onto the target list.
  //     Otherwise it is ZADDed to KEYS[5] with score
  //     priority * 0x100000000 + (INCR of KEYS[7]) % 0x100000000.
  //   * In both cases ZADD KEYS[9] 0 "0" is issued unless the paused/maxed flag
  //     is true.
  //   * Finally it XADDs a "waiting" event (prev = "delayed") to KEYS[8], sets
  //     the job's `delay` field to 0, and returns 0.
  4. const content = `--[[
  5. Promotes a job that is currently "delayed" to the "waiting" state
  6. Input:
  7. KEYS[1] 'delayed'
  8. KEYS[2] 'wait'
  9. KEYS[3] 'paused'
  10. KEYS[4] 'meta'
  11. KEYS[5] 'prioritized'
  12. KEYS[6] 'active'
  13. KEYS[7] 'pc' priority counter
  14. KEYS[8] 'event stream'
  15. KEYS[9] 'marker'
  16. ARGV[1] queue.toKey('')
  17. ARGV[2] jobId
  18. Output:
  19. 0 - OK
  20. -3 - Job not in delayed zset.
  21. Events:
  22. 'waiting'
  23. ]]
  24. local rcall = redis.call
  25. local jobId = ARGV[2]
  26. -- Includes
  27. --[[
  28. Function to add job in target list and add marker if needed.
  29. ]]
  30. -- Includes
  31. --[[
  32. Add marker if needed when a job is available.
  33. ]]
  34. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  35. if not isPausedOrMaxed then
  36. rcall("ZADD", markerKey, 0, "0")
  37. end
  38. end
  39. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  40. rcall(pushCmd, targetKey, jobId)
  41. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  42. end
  43. --[[
  44. Function to add job considering priority.
  45. ]]
  46. -- Includes
  47. --[[
  48. Function to get priority score.
  49. ]]
  50. local function getPriorityScore(priority, priorityCounterKey)
  51. local prioCounter = rcall("INCR", priorityCounterKey)
  52. return priority * 0x100000000 + prioCounter % 0x100000000
  53. end
  54. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  55. isPausedOrMaxed)
  56. local score = getPriorityScore(priority, priorityCounterKey)
  57. rcall("ZADD", prioritizedKey, score, jobId)
  58. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  59. end
  60. --[[
  61. Function to check for the meta.paused key to decide if we are paused or not
  62. (since an empty list and !EXISTS are not really the same).
  63. ]]
  64. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  65. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  66. if queueAttributes[1] then
  67. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  68. else
  69. if queueAttributes[2] then
  70. local activeCount = rcall("LLEN", activeKey)
  71. if activeCount >= tonumber(queueAttributes[2]) then
  72. return waitKey, true, queueAttributes[3], queueAttributes[4]
  73. else
  74. return waitKey, false, queueAttributes[3], queueAttributes[4]
  75. end
  76. end
  77. end
  78. return waitKey, false, queueAttributes[3], queueAttributes[4]
  79. end
  80. if rcall("ZREM", KEYS[1], jobId) == 1 then
  81. local jobKey = ARGV[1] .. jobId
  82. local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
  83. local metaKey = KEYS[4]
  84. local markerKey = KEYS[9]
  85. -- Remove delayed "marker" from the wait list if there is any.
  86. -- Since we are adding a job we do not need the marker anymore.
  87. -- Markers in waitlist DEPRECATED in v5: Remove in v6.
  88. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[2], KEYS[3])
  89. local marker = rcall("LINDEX", target, 0)
  90. if marker and string.sub(marker, 1, 2) == "0:" then rcall("LPOP", target) end
  91. if priority == 0 then
  92. -- LIFO or FIFO
  93. addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId)
  94. else
  95. addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed)
  96. end
  97. rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId, "prev",
  98. "delayed");
  99. rcall("HSET", jobKey, "delay", 0)
  100. return 0
  101. else
  102. return -3
  103. end
  104. `;
  // Script descriptor exported to callers. `keys: 9` corresponds to the nine
  // KEYS[1..9] entries listed in the Lua header; the remaining inputs are
  // passed as ARGV (ARGV[1] key prefix, ARGV[2] job id).
  105. exports.promote = {
  106. name: 'promote',
  107. content,
  108. keys: 9,
  109. };
  110. //# sourceMappingURL=promote-9.js.map