moveToActive-11.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.moveToActive = void 0;
  4. const content = `--[[
  5. Move next job to be processed to active, lock it and fetch its data. The job
  6. may be delayed, in that case we need to move it to the delayed set instead.
  7. This operation guarantees that the worker owns the job during the lock
  8. expiration time. The worker is responsible of keeping the lock fresh
  9. so that no other worker picks this job again.
  10. Input:
  11. KEYS[1] wait key
  12. KEYS[2] active key
  13. KEYS[3] prioritized key
  14. KEYS[4] stream events key
  15. KEYS[5] stalled key
  16. -- Rate limiting
  17. KEYS[6] rate limiter key
  18. KEYS[7] delayed key
  19. -- Delayed jobs
  20. KEYS[8] paused key
  21. KEYS[9] meta key
  22. KEYS[10] pc priority counter
  23. -- Marker
  24. KEYS[11] marker key
  25. -- Arguments
  26. ARGV[1] key prefix
  27. ARGV[2] timestamp
  28. ARGV[3] opts
  29. opts - token - lock token
  30. opts - lockDuration
  31. opts - limiter
  32. opts - name - worker name
  33. ]]
  34. local rcall = redis.call
  35. local waitKey = KEYS[1]
  36. local activeKey = KEYS[2]
  37. local eventStreamKey = KEYS[4]
  38. local rateLimiterKey = KEYS[6]
  39. local delayedKey = KEYS[7]
  40. local opts = cmsgpack.unpack(ARGV[3])
  41. -- Includes
  42. --[[
  43. Function to return the next delayed job timestamp.
  44. ]]
  45. local function getNextDelayedTimestamp(delayedKey)
  46. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  47. if #result then
  48. local nextTimestamp = tonumber(result[2])
  49. if nextTimestamp ~= nil then
  50. return nextTimestamp / 0x1000
  51. end
  52. end
  53. end
  54. --[[
  55. Function to get current rate limit ttl.
  56. ]]
  57. local function getRateLimitTTL(maxJobs, rateLimiterKey)
  58. if maxJobs and maxJobs <= tonumber(rcall("GET", rateLimiterKey) or 0) then
  59. local pttl = rcall("PTTL", rateLimiterKey)
  60. if pttl == 0 then
  61. rcall("DEL", rateLimiterKey)
  62. end
  63. if pttl > 0 then
  64. return pttl
  65. end
  66. end
  67. return 0
  68. end
  69. --[[
  70. Function to check for the meta.paused key to decide if we are paused or not
  71. (since an empty list and !EXISTS are not really the same).
  72. ]]
  73. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  74. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  75. if queueAttributes[1] then
  76. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  77. else
  78. if queueAttributes[2] then
  79. local activeCount = rcall("LLEN", activeKey)
  80. if activeCount >= tonumber(queueAttributes[2]) then
  81. return waitKey, true, queueAttributes[3], queueAttributes[4]
  82. else
  83. return waitKey, false, queueAttributes[3], queueAttributes[4]
  84. end
  85. end
  86. end
  87. return waitKey, false, queueAttributes[3], queueAttributes[4]
  88. end
  89. --[[
  90. Function to move job from prioritized state to active.
  91. ]]
  92. local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
  93. local prioritizedJob = rcall("ZPOPMIN", priorityKey)
  94. if #prioritizedJob > 0 then
  95. rcall("LPUSH", activeKey, prioritizedJob[1])
  96. return prioritizedJob[1]
  97. else
  98. rcall("DEL", priorityCounterKey)
  99. end
  100. end
  101. --[[
  102. Function to move job from wait state to active.
  103. Input:
  104. opts - token - lock token
  105. opts - lockDuration
  106. opts - limiter
  107. ]]
  108. -- Includes
  109. --[[
  110. Add marker if needed when a job is available.
  111. ]]
  112. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  113. if not isPausedOrMaxed then
  114. rcall("ZADD", markerKey, 0, "0")
  115. end
  116. end
  117. local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
  118. jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
  119. local jobKey = keyPrefix .. jobId
  120. -- Check if we need to perform rate limiting.
  121. if maxJobs then
  122. local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
  123. if jobCounter == 1 then
  124. local integerDuration = math.floor(math.abs(limiterDuration))
  125. rcall("PEXPIRE", rateLimiterKey, integerDuration)
  126. end
  127. end
  128. -- get a lock
  129. if opts['token'] ~= "0" then
  130. local lockKey = jobKey .. ':lock'
  131. rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
  132. end
  133. local optionalValues = {}
  134. if opts['name'] then
  135. -- Set "processedBy" field to the worker name
  136. table.insert(optionalValues, "pb")
  137. table.insert(optionalValues, opts['name'])
  138. end
  139. rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
  140. rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
  141. rcall("HINCRBY", jobKey, "ats", 1)
  142. addBaseMarkerIfNeeded(markerKey, false)
  143. -- rate limit delay must be 0 in this case to prevent adding more delay
  144. -- when job that is moved to active needs to be processed
  145. return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
  146. end
  147. --[[
  148. Updates the delay set, by moving delayed jobs that should
  149. be processed now to "wait".
  150. Events:
  151. 'waiting'
  152. ]]
  153. -- Includes
  154. --[[
  155. Function to add job in target list and add marker if needed.
  156. ]]
  157. -- Includes
  158. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  159. rcall(pushCmd, targetKey, jobId)
  160. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  161. end
  162. --[[
  163. Function to add job considering priority.
  164. ]]
  165. -- Includes
  166. --[[
  167. Function to get priority score.
  168. ]]
  169. local function getPriorityScore(priority, priorityCounterKey)
  170. local prioCounter = rcall("INCR", priorityCounterKey)
  171. return priority * 0x100000000 + prioCounter % 0x100000000
  172. end
  173. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  174. isPausedOrMaxed)
  175. local score = getPriorityScore(priority, priorityCounterKey)
  176. rcall("ZADD", prioritizedKey, score, jobId)
  177. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  178. end
  179. -- Try to get as much as 1000 jobs at once
  180. local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
  181. eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  182. local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)
  183. if (#jobs > 0) then
  184. rcall("ZREM", delayedKey, unpack(jobs))
  185. for _, jobId in ipairs(jobs) do
  186. local jobKey = prefix .. jobId
  187. local priority =
  188. tonumber(rcall("HGET", jobKey, "priority")) or 0
  189. if priority == 0 then
  190. -- LIFO or FIFO
  191. rcall("LPUSH", targetKey, jobId)
  192. else
  193. local score = getPriorityScore(priority, priorityCounterKey)
  194. rcall("ZADD", prioritizedKey, score, jobId)
  195. end
  196. -- Emit waiting event
  197. rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
  198. jobId, "prev", "delayed")
  199. rcall("HSET", jobKey, "delay", 0)
  200. end
  201. addBaseMarkerIfNeeded(markerKey, isPaused)
  202. end
  203. end
  204. local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration = getTargetQueueList(KEYS[9],
  205. activeKey, waitKey, KEYS[8])
  206. -- Check if there are delayed jobs that we can move to wait.
  207. local markerKey = KEYS[11]
  208. promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1],
  209. ARGV[2], KEYS[10], isPausedOrMaxed)
  210. local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
  211. local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
  212. -- Check if we are rate limited first.
  213. if expireTime > 0 then return {0, 0, expireTime, 0} end
  214. -- paused or maxed queue
  215. if isPausedOrMaxed then return {0, 0, 0, 0} end
  216. local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
  217. -- no job ID, try non-blocking move from wait to active
  218. local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
  219. -- Markers in waitlist DEPRECATED in v5: Will be completely removed in v6.
  220. if jobId and string.sub(jobId, 1, 2) == "0:" then
  221. rcall("LREM", activeKey, 1, jobId)
  222. jobId = rcall("RPOPLPUSH", waitKey, activeKey)
  223. end
  224. if jobId then
  225. return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
  226. maxJobs, limiterDuration, markerKey, opts)
  227. else
  228. jobId = moveJobFromPrioritizedToActive(KEYS[3], activeKey, KEYS[10])
  229. if jobId then
  230. return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
  231. maxJobs, limiterDuration, markerKey, opts)
  232. end
  233. end
  234. -- Return the timestamp for the next delayed job if any.
  235. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  236. if nextTimestamp ~= nil then return {0, 0, 0, nextTimestamp} end
  237. return {0, 0, 0, 0}
  238. `;
  239. exports.moveToActive = {
  240. name: 'moveToActive',
  241. content,
  242. keys: 11,
  243. };
  244. //# sourceMappingURL=moveToActive-11.js.map