moveToDelayed-12.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  "use strict";
  // Flag the exports object with the __esModule marker property.
  Object.defineProperty(exports, "__esModule", { value: true });
  // Placeholder; the real script descriptor is assigned at the bottom of the file.
  exports.moveToDelayed = undefined;
  4. const content = `--[[
  5. Moves job from active to delayed set.
  6. Input:
  7. KEYS[1] marker key
  8. KEYS[2] active key
  9. KEYS[3] prioritized key
  10. KEYS[4] delayed key
  11. KEYS[5] job key
  12. KEYS[6] events stream
  13. KEYS[7] meta key
  14. KEYS[8] stalled key
  15. KEYS[9] wait key
  16. KEYS[10] rate limiter key
  17. KEYS[11] paused key
  18. KEYS[12] pc priority counter
  19. ARGV[1] key prefix
  20. ARGV[2] timestamp
  21. ARGV[3] the id of the job
  22. ARGV[4] queue token
  23. ARGV[5] delay value
  24. ARGV[6] skip attempt
  25. ARGV[7] optional job fields to update
  26. ARGV[8] fetch next?
  27. ARGV[9] opts
  28. Output:
  29. 0 - OK
  30. -1 - Missing job.
  31. -3 - Job not in active set.
  32. Events:
  33. - delayed key.
  34. ]]
  35. local rcall = redis.call
  36. -- Includes
  37. --[[
  38. Add delay marker if needed.
  39. ]]
  40. -- Includes
  --[[
    Function to return the next delayed job timestamp.
    Input:
      delayedKey - delayed sorted set key
    Output:
      The score of the first (lowest scored) member of the delayed set
      divided by 0x1000, or nil when the set is empty.
  ]]
  local function getNextDelayedTimestamp(delayedKey)
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- In Lua the number 0 is truthy, so the length must be compared explicitly;
    -- a bare "if #result" would always pass.
    if #result > 0 then
      -- result is {member, score}
      local nextTimestamp = tonumber(result[2])
      if nextTimestamp ~= nil then
        -- Scores carry 12 extra low bits (see the 0x1000 factor used when scoring).
        return nextTimestamp / 0x1000
      end
    end
  end
  --[[
    Add delay marker if needed.
    When the delayed set has at least one job, the member "1" of the marker
    sorted set gets its score set to the next delayed timestamp.
    Does nothing when there is no delayed job.
  ]]
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    local upcoming = getNextDelayedTimestamp(delayedKey)
    if upcoming == nil then
      return
    end
    -- ZADD overwrites the previous score, so the marker always holds
    -- the newest known next timestamp.
    rcall("ZADD", markerKey, upcoming, "1")
  end
  61. --[[
  62. Function to fetch the next job to process.
  63. Tries to get the next job to avoid an extra roundtrip if the queue is
  64. not closing and not rate limited.
  65. Input:
  66. waitKey - wait list key
  67. activeKey - active list key
  68. prioritizedKey - prioritized sorted set key
  69. eventStreamKey - event stream key
  70. rateLimiterKey - rate limiter key
  71. delayedKey - delayed sorted set key
  72. pausedKey - paused list key
  73. metaKey - meta hash key
  74. pcKey - priority counter key
  75. markerKey - marker key
  76. prefix - keys prefix
  77. timestamp - current timestamp
  78. opts - options table:
  79. token (required) - lock token used when locking jobs
  80. lockDuration (required) - lock duration for acquired jobs
  81. limiter (optional) - rate limiter options table (e.g. { max = number })
  82. ]]
  83. -- Includes
  --[[
    Function to get current rate limit ttl.
    Input:
      maxJobs - maximum number of jobs allowed (falsy disables the check)
      rateLimiterKey - rate limiter counter key
    Output:
      Remaining ttl in milliseconds while the limit is reached, otherwise 0.
  ]]
  local function getRateLimitTTL(maxJobs, rateLimiterKey)
    if not maxJobs then
      return 0
    end
    local used = tonumber(rcall("GET", rateLimiterKey) or 0)
    local limitReached = maxJobs <= used
    if not limitReached then
      return 0
    end
    local remaining = rcall("PTTL", rateLimiterKey)
    if remaining > 0 then
      return remaining
    end
    if remaining == 0 then
      -- The counter is about to expire: drop it right away.
      rcall("DEL", rateLimiterKey)
    end
    return 0
  end
  --[[
    Function to check for the meta.paused key to decide if we are paused or not
    (since an empty list and !EXISTS are not really the same).
    Output (4 values):
      target list key - paused key when paused, wait key otherwise
      isPausedOrMaxed - true when paused, or when the active list length
                        has reached the "concurrency" value stored in meta
      max             - "max" field of the meta hash (rate limit)
      duration        - "duration" field of the meta hash (rate limit)
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, rateMax, rateDuration = attrs[1], attrs[2], attrs[3], attrs[4]
    if paused then
      return pausedKey, true, rateMax, rateDuration
    end
    if concurrency then
      local maxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
      return waitKey, maxed, rateMax, rateDuration
    end
    return waitKey, false, rateMax, rateDuration
  end
  --[[
    Function to move job from prioritized state to active.
    Pops the lowest scored member of the prioritized set and pushes it to
    the active list. When the set is empty the priority counter is deleted.
    Output:
      The id of the moved job, or nothing when there was no prioritized job.
  ]]
  local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
    local popped = rcall("ZPOPMIN", priorityKey)
    if #popped == 0 then
      rcall("DEL", priorityCounterKey)
      return
    end
    -- popped is {member, score}
    local jobId = popped[1]
    rcall("LPUSH", activeKey, jobId)
    return jobId
  end
  131. --[[
  132. Function to move job from wait state to active.
  133. Input:
  134. opts - token - lock token
  135. opts - lockDuration
  136. opts - limiter
  137. ]]
  138. -- Includes
  --[[
    Add marker if needed when a job is available.
    Adds member "0" with score 0 to the marker sorted set unless the queue
    is paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Prepares a job that was just moved to the active list for processing:
    counts it against the rate limiter, takes the job lock, emits the
    'active' event and stamps the job hash.
    Input:
      keyPrefix, jobId   - concatenated to build the job key
      rateLimiterKey     - rate limiter counter key
      eventStreamKey     - event stream key
      processedOn        - value stored in the "processedOn" field
      maxJobs            - rate limit max (falsy disables rate limiting)
      limiterDuration    - rate limit window, used as PEXPIRE value
      markerKey          - marker key
      opts               - token, lockDuration, optional name
    Output:
      {job hash as returned by HGETALL, jobId, 0, 0}
  ]]
  local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
      jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
    local jobKey = keyPrefix .. jobId

    -- Rate limiting: the first job of a window starts the expiration clock.
    if maxJobs then
      local windowCount = tonumber(rcall("INCR", rateLimiterKey))
      if windowCount == 1 then
        rcall("PEXPIRE", rateLimiterKey, math.floor(math.abs(limiterDuration)))
      end
    end

    -- Take the lock, unless the token is the special value "0".
    local lockToken = opts['token']
    if lockToken ~= "0" then
      rcall("SET", jobKey .. ':lock', lockToken, "PX", opts['lockDuration'])
    end

    -- "pb" (processedBy) is only stored when the worker name is known.
    local workerName = opts['name']
    local extraFields = workerName and {"pb", workerName} or {}

    rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
    rcall("HMSET", jobKey, "processedOn", processedOn, unpack(extraFields))
    rcall("HINCRBY", jobKey, "ats", 1)

    addBaseMarkerIfNeeded(markerKey, false)

    -- The rate limit delay (third value) must be 0 here to prevent adding
    -- more delay when the job moved to active needs to be processed.
    local jobData = rcall("HGETALL", jobKey)
    return {jobData, jobId, 0, 0}
  end
  177. --[[
  178. Updates the delay set, by moving delayed jobs that should
  179. be processed now to "wait".
  180. Events:
  181. 'waiting'
  182. ]]
  183. -- Includes
  184. --[[
  185. Function to add job in target list and add marker if needed.
  186. ]]
  187. -- Includes
  --[[
    Pushes a job id to the target list and adds the base marker if needed.
    Input:
      targetKey        - list key that receives the job id
      markerKey        - marker key
      pushCmd          - Redis push command to run against targetKey
      isPausedOrMaxed  - when truthy no marker is added
      jobId            - id of the job to push
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- Push first, then signal availability through the marker.
    rcall(pushCmd, targetKey, jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  192. --[[
  193. Function to add job considering priority.
  194. ]]
  195. -- Includes
  --[[
    Function to get priority score.
    The score is the priority multiplied by 0x100000000 plus the freshly
    incremented priority counter modulo 0x100000000.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local PRIORITY_FACTOR = 0x100000000
    local sequence = rcall("INCR", priorityCounterKey)
    return priority * PRIORITY_FACTOR + sequence % PRIORITY_FACTOR
  end
  --[[
    Adds a job to the prioritized sorted set, scored by its priority, and
    adds the base marker unless the queue is paused or maxed.
  ]]
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
      isPausedOrMaxed)
    rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Moves delayed jobs whose time has come out of the delayed set, at most
    1000 per call. Jobs without priority are pushed to the target list,
    prioritized ones go to the prioritized set. A 'waiting' event is emitted
    for every promoted job and its "delay" field is reset to 0.
  ]]
  local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
      eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
    -- Highest score that still belongs to the given timestamp.
    local maxScore = (timestamp + 1) * 0x1000 - 1
    local dueJobs = rcall("ZRANGEBYSCORE", delayedKey, 0, maxScore, "LIMIT", 0, 1000)
    if #dueJobs == 0 then
      return
    end

    rcall("ZREM", delayedKey, unpack(dueJobs))

    for index = 1, #dueJobs do
      local jobId = dueJobs[index]
      local jobKey = prefix .. jobId
      local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

      if priority ~= 0 then
        rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
      else
        -- LIFO or FIFO
        rcall("LPUSH", targetKey, jobId)
      end

      -- Emit waiting event
      rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, "prev", "delayed")
      rcall("HSET", jobKey, "delay", 0)
    end

    addBaseMarkerIfNeeded(markerKey, isPaused)
  end
  --[[
    Fetches the next job to process, promoting due delayed jobs first.
    Output:
      {jobData, jobId, 0, 0}       - a job was moved to active and prepared
      {0, 0, ttl, 0}               - the queue is rate limited for ttl ms
      {0, 0, 0, 0}                 - the queue is paused or maxed
      {0, 0, 0, nextTimestamp}     - no job available, but a delayed job exists
      nothing                      - no job available at all
  ]]
  local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey,
      rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix,
      timestamp, opts)
    local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration =
      getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)

    -- Check if there are delayed jobs that can be promoted
    promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey,
      eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed)

    local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))

    -- Check if we are rate limited first.
    local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
    if expireTime > 0 then
      return {0, 0, expireTime, 0}
    end

    -- paused or maxed queue
    if isPausedOrMaxed then
      return {0, 0, 0, 0}
    end

    local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration

    local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
    if jobId then
      -- Markers in waitlist DEPRECATED in v5: Remove in v6.
      if string.sub(jobId, 1, 2) == "0:" then
        rcall("LREM", activeKey, 1, jobId)
        if jobId == "0:0" then
          -- "0:0" announces at least one prioritized job, but the set may be
          -- empty by now, in which case this yields nil.
          jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
        else
          -- "0:<delay>" marker: there is no job to process.
          jobId = nil
        end
      end
    else
      jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
    end

    -- Only prepare a job when we really got one; preparing with a nil id
    -- would raise on the key concatenation and abort the whole script.
    if jobId then
      return prepareJobForProcessing(prefix, rateLimiterKey,
        eventStreamKey, jobId, timestamp, maxJobs,
        limiterDuration, markerKey, opts)
    end

    -- Return the timestamp for the next delayed job if any.
    local nextTimestamp = getNextDelayedTimestamp(delayedKey)
    if nextTimestamp ~= nil then
      -- The result is guaranteed to be positive, since the
      -- ZRANGEBYSCORE command would have returned a job otherwise.
      return {0, 0, 0, nextTimestamp}
    end
  end
  --[[
    Bake in the job id first 12 bits into the timestamp
    to guarantee correct execution order of delayed jobs
    (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
    WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
    Input:
      delayedKey - delayed sorted set key
      timestamp  - current timestamp (number or numeric string)
      delay      - delay to add to the timestamp; ignored unless greater than 0
    Output (2 values):
      score            - score to use for the job in the delayed set
      delayedTimestamp - timestamp at which the job is due
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
    -- Score range reserved for this timestamp.
    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1

    -- Highest score already taken inside that range, if any.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)
    -- In Lua the number 0 is truthy, so the length must be compared explicitly.
    if #result > 0 then
      local currentMaxScore = tonumber(result[2])
      if currentMaxScore ~= nil then
        if currentMaxScore >= maxScore then
          -- Range exhausted: reuse the last slot.
          return maxScore, delayedTimestamp
        end
        return currentMaxScore + 1, delayedTimestamp
      end
    end
    return minScore, delayedTimestamp
  end
  --[[
    Function to get max events value or set by default 10000.
    Reads "opts.maxLenEvents" from the meta hash; when the field is missing
    it is initialized to 10000 and that default is returned.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local defaultMaxEvents = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", defaultMaxEvents)
    return defaultMaxEvents
  end
  --[[
    Releases the lock of a job when it is owned by the given token and
    removes the job id from the stalled set.
    Output:
      0  - lock released, or token is "0" (no lock handling requested)
      -2 - lock is missing completely
      -6 - lock exists but token does not match
  ]]
  local function removeLock(jobKey, stalledKey, token, jobId)
    if token == "0" then
      return 0
    end

    local lockKey = jobKey .. ':lock'
    local currentOwner = rcall("GET", lockKey)

    if currentOwner == token then
      rcall("DEL", lockKey)
      rcall("SREM", stalledKey, jobId)
      return 0
    end

    if currentOwner then
      -- Lock exists but token does not match
      return -6
    end

    -- Lock is missing completely
    return -2
  end
  --[[
    Function to update a bunch of fields in a job.
    Input:
      jobKey          - job hash key
      msgpackedFields - msgpack encoded list of field/value arguments for
                        HMSET; nothing is done when missing or empty
  ]]
  local function updateJobFields(jobKey, msgpackedFields)
    if not msgpackedFields or #msgpackedFields == 0 then
      return
    end
    local decoded = cmsgpack.unpack(msgpackedFields)
    if decoded then
      rcall("HMSET", jobKey, unpack(decoded))
    end
  end
  -- Script body: release the lock, move the job from the active list to the
  -- delayed set, emit the 'delayed' event and optionally fetch the next job.
  local jobKey = KEYS[5]
  local markerKey = KEYS[1]
  local metaKey = KEYS[7]
  local token = ARGV[4]

  if rcall("EXISTS", jobKey) ~= 1 then
    -- Missing job.
    return -1
  end

  local activeKey = KEYS[2]
  local delayedKey = KEYS[4]
  local eventStreamKey = KEYS[6]
  local timestamp = ARGV[2]
  local jobId = ARGV[3]

  local lockStatus = removeLock(jobKey, KEYS[8], token, jobId)
  if lockStatus < 0 then
    return lockStatus
  end

  updateJobFields(jobKey, ARGV[7])

  local delay = tonumber(ARGV[5])

  -- The job must currently be in the active list.
  if rcall("LREM", activeKey, -1, jobId) < 1 then
    return -3
  end

  local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, delay)

  -- ARGV[6] is the "skip attempt" flag: "atm" is only incremented when it is "0".
  if ARGV[6] == "0" then
    rcall("HINCRBY", jobKey, "atm", 1)
  end
  rcall("HSET", jobKey, "delay", ARGV[5])

  local maxEvents = getOrSetMaxEvents(metaKey)

  rcall("ZADD", delayedKey, score, jobId)
  rcall("XADD", eventStreamKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
    "jobId", jobId, "delay", delayedTimestamp)

  -- Try to get next job to avoid an extra roundtrip if the queue is not closing,
  -- and not rate limited.
  if ARGV[8] == "1" then
    local opts = cmsgpack.unpack(ARGV[9])
    local fetched = fetchNextJob(KEYS[9], activeKey, KEYS[3], eventStreamKey,
      KEYS[10], delayedKey, KEYS[11], metaKey, KEYS[12], markerKey,
      ARGV[1], timestamp, opts)
    -- Only a result whose first element is the job data table is a real job.
    if fetched and type(fetched[1]) == "table" then
      return fetched
    end
  end

  -- Check if we need to push a marker job to wake up sleeping workers.
  addDelayMarkerIfNeeded(markerKey, delayedKey)
  return 0
  393. `;
  394. exports.moveToDelayed = {
  395. name: 'moveToDelayed',
  396. content,
  397. keys: 12,
  398. };
  399. //# sourceMappingURL=moveToDelayed-12.js.map