moveToDelayed-12.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. const content = `--[[
  2. Moves job from active to delayed set.
  3. Input:
  4. KEYS[1] marker key
  5. KEYS[2] active key
  6. KEYS[3] prioritized key
  7. KEYS[4] delayed key
  8. KEYS[5] job key
  9. KEYS[6] events stream
  10. KEYS[7] meta key
  11. KEYS[8] stalled key
  12. KEYS[9] wait key
  13. KEYS[10] rate limiter key
  14. KEYS[11] paused key
  15. KEYS[12] pc priority counter
  16. ARGV[1] key prefix
  17. ARGV[2] timestamp
  18. ARGV[3] the id of the job
  19. ARGV[4] queue token
  20. ARGV[5] delay value
  21. ARGV[6] skip attempt
  22. ARGV[7] optional job fields to update
  23. ARGV[8] fetch next?
  24. ARGV[9] opts
  25. Output:
  26. 0 - OK
  27. -1 - Missing job.
  28. -3 - Job not in active set.
  29. Events:
  30. - delayed key.
  31. ]]
  32. local rcall = redis.call
  33. -- Includes
  34. --[[
  35. Add delay marker if needed.
  36. ]]
  37. -- Includes
  38. --[[
  39. Function to return the next delayed job timestamp.
  40. ]]
  41. local function getNextDelayedTimestamp(delayedKey)
  42. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  43. if #result then
  44. local nextTimestamp = tonumber(result[2])
  45. if nextTimestamp ~= nil then
  46. return nextTimestamp / 0x1000
  47. end
  48. end
  49. end
  50. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  51. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  52. if nextTimestamp ~= nil then
  53. -- Replace the score of the marker with the newest known
  54. -- next timestamp.
  55. rcall("ZADD", markerKey, nextTimestamp, "1")
  56. end
  57. end
  58. --[[
  59. Function to fetch the next job to process.
  60. Tries to get the next job to avoid an extra roundtrip if the queue is
  61. not closing and not rate limited.
  62. Input:
  63. waitKey - wait list key
  64. activeKey - active list key
  65. prioritizedKey - prioritized sorted set key
  66. eventStreamKey - event stream key
  67. rateLimiterKey - rate limiter key
  68. delayedKey - delayed sorted set key
  69. pausedKey - paused list key
  70. metaKey - meta hash key
  71. pcKey - priority counter key
  72. markerKey - marker key
  73. prefix - keys prefix
  74. timestamp - current timestamp
  75. opts - options table:
  76. token (required) - lock token used when locking jobs
  77. lockDuration (required) - lock duration for acquired jobs
  78. limiter (optional) - rate limiter options table (e.g. { max = number })
  79. ]]
  80. -- Includes
  81. --[[
  82. Function to get current rate limit ttl.
  83. ]]
  84. local function getRateLimitTTL(maxJobs, rateLimiterKey)
  85. if maxJobs and maxJobs <= tonumber(rcall("GET", rateLimiterKey) or 0) then
  86. local pttl = rcall("PTTL", rateLimiterKey)
  87. if pttl == 0 then
  88. rcall("DEL", rateLimiterKey)
  89. end
  90. if pttl > 0 then
  91. return pttl
  92. end
  93. end
  94. return 0
  95. end
  96. --[[
  97. Function to check for the meta.paused key to decide if we are paused or not
  98. (since an empty list and !EXISTS are not really the same).
  99. ]]
  100. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  101. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  102. if queueAttributes[1] then
  103. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  104. else
  105. if queueAttributes[2] then
  106. local activeCount = rcall("LLEN", activeKey)
  107. if activeCount >= tonumber(queueAttributes[2]) then
  108. return waitKey, true, queueAttributes[3], queueAttributes[4]
  109. else
  110. return waitKey, false, queueAttributes[3], queueAttributes[4]
  111. end
  112. end
  113. end
  114. return waitKey, false, queueAttributes[3], queueAttributes[4]
  115. end
  116. --[[
  117. Function to move job from prioritized state to active.
  118. ]]
  119. local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
  120. local prioritizedJob = rcall("ZPOPMIN", priorityKey)
  121. if #prioritizedJob > 0 then
  122. rcall("LPUSH", activeKey, prioritizedJob[1])
  123. return prioritizedJob[1]
  124. else
  125. rcall("DEL", priorityCounterKey)
  126. end
  127. end
  128. --[[
  129. Function to move job from wait state to active.
  130. Input:
  131. opts - token - lock token
  132. opts - lockDuration
  133. opts - limiter
  134. ]]
  135. -- Includes
  136. --[[
  137. Add marker if needed when a job is available.
  138. ]]
  139. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  140. if not isPausedOrMaxed then
  141. rcall("ZADD", markerKey, 0, "0")
  142. end
  143. end
  144. local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
  145. jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
  146. local jobKey = keyPrefix .. jobId
  147. -- Check if we need to perform rate limiting.
  148. if maxJobs then
  149. local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
  150. if jobCounter == 1 then
  151. local integerDuration = math.floor(math.abs(limiterDuration))
  152. rcall("PEXPIRE", rateLimiterKey, integerDuration)
  153. end
  154. end
  155. -- get a lock
  156. if opts['token'] ~= "0" then
  157. local lockKey = jobKey .. ':lock'
  158. rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
  159. end
  160. local optionalValues = {}
  161. if opts['name'] then
  162. -- Set "processedBy" field to the worker name
  163. table.insert(optionalValues, "pb")
  164. table.insert(optionalValues, opts['name'])
  165. end
  166. rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
  167. rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
  168. rcall("HINCRBY", jobKey, "ats", 1)
  169. addBaseMarkerIfNeeded(markerKey, false)
  170. -- rate limit delay must be 0 in this case to prevent adding more delay
  171. -- when job that is moved to active needs to be processed
  172. return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
  173. end
  174. --[[
  175. Updates the delay set, by moving delayed jobs that should
  176. be processed now to "wait".
  177. Events:
  178. 'waiting'
  179. ]]
  180. -- Includes
  181. --[[
  182. Function to add job in target list and add marker if needed.
  183. ]]
  184. -- Includes
  185. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  186. rcall(pushCmd, targetKey, jobId)
  187. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  188. end
  189. --[[
  190. Function to add job considering priority.
  191. ]]
  192. -- Includes
  193. --[[
  194. Function to get priority score.
  195. ]]
  196. local function getPriorityScore(priority, priorityCounterKey)
  197. local prioCounter = rcall("INCR", priorityCounterKey)
  198. return priority * 0x100000000 + prioCounter % 0x100000000
  199. end
  200. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  201. isPausedOrMaxed)
  202. local score = getPriorityScore(priority, priorityCounterKey)
  203. rcall("ZADD", prioritizedKey, score, jobId)
  204. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  205. end
  206. -- Try to get as much as 1000 jobs at once
  207. local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
  208. eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
  209. local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)
  210. if (#jobs > 0) then
  211. rcall("ZREM", delayedKey, unpack(jobs))
  212. for _, jobId in ipairs(jobs) do
  213. local jobKey = prefix .. jobId
  214. local priority =
  215. tonumber(rcall("HGET", jobKey, "priority")) or 0
  216. if priority == 0 then
  217. -- LIFO or FIFO
  218. rcall("LPUSH", targetKey, jobId)
  219. else
  220. local score = getPriorityScore(priority, priorityCounterKey)
  221. rcall("ZADD", prioritizedKey, score, jobId)
  222. end
  223. -- Emit waiting event
  224. rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
  225. jobId, "prev", "delayed")
  226. rcall("HSET", jobKey, "delay", 0)
  227. end
  228. addBaseMarkerIfNeeded(markerKey, isPaused)
  229. end
  230. end
  231. local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey,
  232. rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix,
  233. timestamp, opts)
  234. local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration =
  235. getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
  236. -- Check if there are delayed jobs that can be promoted
  237. promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey,
  238. eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed)
  239. local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))
  240. -- Check if we are rate limited first.
  241. local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
  242. if expireTime > 0 then
  243. return {0, 0, expireTime, 0}
  244. end
  245. -- paused or maxed queue
  246. if isPausedOrMaxed then
  247. return {0, 0, 0, 0}
  248. end
  249. local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration
  250. local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
  251. if jobId then
  252. -- Markers in waitlist DEPRECATED in v5: Remove in v6.
  253. if string.sub(jobId, 1, 2) == "0:" then
  254. rcall("LREM", activeKey, 1, jobId)
  255. -- If jobId is special ID 0:delay (delay greater than 0), then there is no job to process
  256. -- but if ID is 0:0, then there is at least 1 prioritized job to process
  257. if jobId == "0:0" then
  258. jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
  259. return prepareJobForProcessing(prefix, rateLimiterKey,
  260. eventStreamKey, jobId, timestamp, maxJobs,
  261. limiterDuration, markerKey, opts)
  262. end
  263. else
  264. return prepareJobForProcessing(prefix, rateLimiterKey,
  265. eventStreamKey, jobId, timestamp, maxJobs,
  266. limiterDuration, markerKey, opts)
  267. end
  268. else
  269. jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
  270. if jobId then
  271. return prepareJobForProcessing(prefix, rateLimiterKey,
  272. eventStreamKey, jobId, timestamp, maxJobs,
  273. limiterDuration, markerKey, opts)
  274. end
  275. end
  276. -- Return the timestamp for the next delayed job if any.
  277. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  278. if nextTimestamp ~= nil then
  279. -- The result is guaranteed to be positive, since the
  280. -- ZRANGEBYSCORE command would have return a job otherwise.
  281. return {0, 0, 0, nextTimestamp}
  282. end
  283. end
  284. --[[
  285. Bake in the job id first 12 bits into the timestamp
  286. to guarantee correct execution order of delayed jobs
  287. (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
  288. WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
  289. ]]
  290. local function getDelayedScore(delayedKey, timestamp, delay)
  291. local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
  292. local minScore = delayedTimestamp * 0x1000
  293. local maxScore = (delayedTimestamp + 1 ) * 0x1000 - 1
  294. local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
  295. minScore, "WITHSCORES","LIMIT", 0, 1)
  296. if #result then
  297. local currentMaxScore = tonumber(result[2])
  298. if currentMaxScore ~= nil then
  299. if currentMaxScore >= maxScore then
  300. return maxScore, delayedTimestamp
  301. else
  302. return currentMaxScore + 1, delayedTimestamp
  303. end
  304. end
  305. end
  306. return minScore, delayedTimestamp
  307. end
  308. --[[
  309. Function to get max events value or set by default 10000.
  310. ]]
  311. local function getOrSetMaxEvents(metaKey)
  312. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  313. if not maxEvents then
  314. maxEvents = 10000
  315. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  316. end
  317. return maxEvents
  318. end
  319. local function removeLock(jobKey, stalledKey, token, jobId)
  320. if token ~= "0" then
  321. local lockKey = jobKey .. ':lock'
  322. local lockToken = rcall("GET", lockKey)
  323. if lockToken == token then
  324. rcall("DEL", lockKey)
  325. rcall("SREM", stalledKey, jobId)
  326. else
  327. if lockToken then
  328. -- Lock exists but token does not match
  329. return -6
  330. else
  331. -- Lock is missing completely
  332. return -2
  333. end
  334. end
  335. end
  336. return 0
  337. end
  338. --[[
  339. Function to update a bunch of fields in a job.
  340. ]]
  341. local function updateJobFields(jobKey, msgpackedFields)
  342. if msgpackedFields and #msgpackedFields > 0 then
  343. local fieldsToUpdate = cmsgpack.unpack(msgpackedFields)
  344. if fieldsToUpdate then
  345. rcall("HMSET", jobKey, unpack(fieldsToUpdate))
  346. end
  347. end
  348. end
  349. local jobKey = KEYS[5]
  350. local markerKey = KEYS[1]
  351. local metaKey = KEYS[7]
  352. local token = ARGV[4]
  353. if rcall("EXISTS", jobKey) == 1 then
  354. local errorCode = removeLock(jobKey, KEYS[8], token, ARGV[3])
  355. if errorCode < 0 then
  356. return errorCode
  357. end
  358. updateJobFields(jobKey, ARGV[7])
  359. local delayedKey = KEYS[4]
  360. local jobId = ARGV[3]
  361. local delay = tonumber(ARGV[5])
  362. local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId)
  363. if numRemovedElements < 1 then return -3 end
  364. local score, delayedTimestamp = getDelayedScore(delayedKey, ARGV[2], delay)
  365. if ARGV[6] == "0" then
  366. rcall("HINCRBY", jobKey, "atm", 1)
  367. end
  368. rcall("HSET", jobKey, "delay", ARGV[5])
  369. local maxEvents = getOrSetMaxEvents(metaKey)
  370. rcall("ZADD", delayedKey, score, jobId)
  371. rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "delayed",
  372. "jobId", jobId, "delay", delayedTimestamp)
  373. -- Try to get next job to avoid an extra roundtrip if the queue is not closing,
  374. -- and not rate limited.
  375. if (ARGV[8] == "1") then
  376. local opts = cmsgpack.unpack(ARGV[9])
  377. local result = fetchNextJob(KEYS[9], KEYS[2], KEYS[3], KEYS[6],
  378. KEYS[10], KEYS[4], KEYS[11], metaKey, KEYS[12], markerKey,
  379. ARGV[1], ARGV[2], opts)
  380. if result and type(result[1]) == "table" then
  381. return result
  382. end
  383. end
  384. -- Check if we need to push a marker job to wake up sleeping workers.
  385. addDelayMarkerIfNeeded(markerKey, delayedKey)
  386. return 0
  387. else
  388. return -1
  389. end
  390. `;
  391. export const moveToDelayed = {
  392. name: 'moveToDelayed',
  393. content,
  394. keys: 12,
  395. };
  396. //# sourceMappingURL=moveToDelayed-12.js.map