moveToFinished-14.js 39 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.moveToFinished = void 0;
  4. const content = `--[[
  5. Move job from active to a finished status (completed or failed)
  6. A job can only be moved to completed if it was active.
  7. The job must be locked before it can be moved to a finished status,
  8. and the lock must be released in this script.
  9. Input:
  10. KEYS[1] wait key
  11. KEYS[2] active key
  12. KEYS[3] prioritized key
  13. KEYS[4] event stream key
  14. KEYS[5] stalled key
  15. -- Rate limiting
  16. KEYS[6] rate limiter key
  17. KEYS[7] delayed key
  18. KEYS[8] paused key
  19. KEYS[9] meta key
  20. KEYS[10] pc priority counter
  21. KEYS[11] completed/failed key
  22. KEYS[12] jobId key
  23. KEYS[13] metrics key
  24. KEYS[14] marker key
  25. ARGV[1] jobId
  26. ARGV[2] timestamp
  27. ARGV[3] msg property returnvalue / failedReason
  28. ARGV[4] return value / failed reason
  29. ARGV[5] target (completed/failed)
  30. ARGV[6] fetch next?
  31. ARGV[7] keys prefix
  32. ARGV[8] opts
  33. ARGV[9] job fields to update
  34. opts - token - lock token
  35. opts - keepJobs
  36. opts - lockDuration - lock duration in milliseconds
  37. opts - attempts max attempts
  38. opts - maxMetricsSize
  39. opts - fpof - fail parent on fail
  40. opts - cpof - continue parent on fail
  41. opts - idof - ignore dependency on fail
  42. opts - rdof - remove dependency on fail
  43. opts - name - worker name
  44. Output:
  45. 0 OK
  46. -1 Missing key.
  47. -2 Missing lock.
  48. -3 Job not in active set
  49. -4 Job has pending children
  50. -6 Lock is not owned by this client
  51. -9 Job has failed children
  52. Events:
  53. 'completed/failed'
  54. ]]
  55. local rcall = redis.call
  56. --- Includes
  57. --[[
  58. Functions to collect metrics based on a current and previous count of jobs.
  59. Granularity is fixed at 1 minute.
  60. ]]
  61. --[[
  62. Function to loop in batches.
  63. A word of warning: some commands, such as ZREM,
  64. can receive a maximum of 7000 parameters per call.
  65. ]]
  local function batches(n, batchSize)
    -- Returns an iterator yielding consecutive inclusive ranges (first, last)
    -- that cover 1..n in steps of batchSize. The iterator yields nothing
    -- once the range is exhausted, which ends a generic "for" loop.
    local nextStart = 1
    return function()
      local first = nextStart
      -- Always advance, even past the end, mirroring a per-call counter.
      nextStart = nextStart + batchSize
      if first > n then
        return
      end
      local last = first + batchSize - 1
      if last > n then
        last = n
      end
      return first, last
    end
  end
  local function collectMetrics(metaKey, dataPointsList, maxDataPoints,
    timestamp)
    -- HINCRBY returns the value after the increment; subtracting one gives
    -- the count as it was before this call.
    local count = rcall("HINCRBY", metaKey, "count", 1) - 1

    local prevTS = rcall("HGET", metaKey, "prevTS")
    if not prevTS then
      -- First sample ever: just remember when we started counting.
      rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0)
      return
    end

    -- Number of whole minutes between the previous sample and now,
    -- capped at the maximum number of data points that are kept.
    local elapsedMinutes = math.floor(timestamp / 60000) - math.floor(prevTS / 60000)
    local N = math.min(elapsedMinutes, tonumber(maxDataPoints))
    if N > 0 then
      local delta = count - rcall("HGET", metaKey, "prevCount")

      if N <= 1 then
        rcall("LPUSH", dataPointsList, delta)
      else
        -- One real data point followed by N-1 zeros for the idle minutes.
        local points = { delta }
        for idx = 2, N do
          points[idx] = 0
        end
        for from, to in batches(#points, 7000) do
          rcall("LPUSH", dataPointsList, unpack(points, from, to))
        end
      end

      -- Keep the list bounded to maxDataPoints entries.
      rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1)
      -- Remember the current count and time for the next delta.
      rcall("HSET", metaKey, "prevCount", count, "prevTS", timestamp)
    end
  end
  111. --[[
  112. Function to fetch the next job to process.
  113. Tries to get the next job to avoid an extra roundtrip if the queue is
  114. not closing and not rate limited.
  115. Input:
  116. waitKey - wait list key
  117. activeKey - active list key
  118. prioritizedKey - prioritized sorted set key
  119. eventStreamKey - event stream key
  120. rateLimiterKey - rate limiter key
  121. delayedKey - delayed sorted set key
  122. pausedKey - paused list key
  123. metaKey - meta hash key
  124. pcKey - priority counter key
  125. markerKey - marker key
  126. prefix - keys prefix
  127. timestamp - current timestamp
  128. opts - options table:
  129. token (required) - lock token used when locking jobs
  130. lockDuration (required) - lock duration for acquired jobs
  131. limiter (optional) - rate limiter options table (e.g. { max = number })
  132. ]]
  133. -- Includes
  134. --[[
  135. Function to return the next delayed job timestamp.
  136. ]]
  local function getNextDelayedTimestamp(delayedKey)
    -- Returns the timestamp encoded in the lowest score of the delayed
    -- sorted set, or nothing when the set is empty.
    -- With WITHSCORES the reply is { member, score } or an empty table.
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- 0 is truthy in Lua, so the length must be compared explicitly;
    -- a bare "if #result then" is always true.
    if #result == 0 then
      return
    end
    local nextTimestamp = tonumber(result[2])
    if nextTimestamp ~= nil then
      -- Delayed scores are stored multiplied by 0x1000, so divide it back out.
      return nextTimestamp / 0x1000
    end
  end
  146. --[[
  147. Function to get current rate limit ttl.
  148. ]]
  local function getRateLimitTTL(maxJobs, rateLimiterKey)
    -- Returns the remaining TTL (ms) of the rate limiter key when the
    -- limit has been reached, otherwise 0.
    if not maxJobs then
      return 0
    end
    local consumed = tonumber(rcall("GET", rateLimiterKey) or 0)
    if maxJobs <= consumed then
      local pttl = rcall("PTTL", rateLimiterKey)
      if pttl == 0 then
        -- The counter is about to expire: drop it right away.
        rcall("DEL", rateLimiterKey)
      elseif pttl > 0 then
        return pttl
      end
    end
    return 0
  end
  161. --[[
  162. Function to check for the meta.paused key to decide if we are paused or not
  163. (since an empty list and !EXISTS are not really the same).
  164. ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    -- Returns: target list key, paused-or-maxed flag, and the "max" and
    -- "duration" fields read from the queue meta hash.
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = attrs[1], attrs[2]
    local rateMax, rateDuration = attrs[3], attrs[4]

    if paused then
      return pausedKey, true, rateMax, rateDuration
    end

    -- Not paused: the queue is "maxed" when a concurrency limit is set
    -- and the active list has already reached it.
    local maxed = false
    if concurrency then
      maxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, maxed, rateMax, rateDuration
  end
  181. --[[
  182. Function to move job from prioritized state to active.
  183. ]]
  local function moveJobFromPrioritizedToActive(priorityKey, activeKey, priorityCounterKey)
    -- Pops the lowest-scored member of the prioritized set and pushes it
    -- onto the active list. Returns the job id, or nothing if the set was empty.
    local popped = rcall("ZPOPMIN", priorityKey)
    if #popped == 0 then
      -- No prioritized jobs left: delete the priority counter key.
      rcall("DEL", priorityCounterKey)
      return
    end
    local jobId = popped[1]
    rcall("LPUSH", activeKey, jobId)
    return jobId
  end
  193. --[[
  194. Function to move job from wait state to active.
  195. Input:
  196. opts - token - lock token
  197. opts - lockDuration
  198. opts - limiter
  199. ]]
  200. -- Includes
  201. --[[
  202. Add marker if needed when a job is available.
  203. ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    -- No marker is written while the queue is paused or maxed.
    if isPausedOrMaxed then
      return
    end
    -- Base marker: member "0" with score 0.
    rcall("ZADD", markerKey, 0, "0")
  end
  local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
    jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
    -- Locks the job, marks it as being processed and returns
    -- { job hash fields, jobId, 0, 0 }.
    local jobKey = keyPrefix .. jobId

    -- Rate limiting: count this job and start the limiter window on the
    -- first job counted.
    if maxJobs then
      local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
      if jobCounter == 1 then
        rcall("PEXPIRE", rateLimiterKey, math.floor(math.abs(limiterDuration)))
      end
    end

    -- Acquire the job lock unless the token is the "0" placeholder.
    local token = opts['token']
    if token ~= "0" then
      rcall("SET", jobKey .. ':lock', token, "PX", opts['lockDuration'])
    end

    -- Optional "pb" field carrying the worker name.
    local workerName = opts['name']
    local extraFields = workerName and { "pb", workerName } or {}

    rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
    rcall("HMSET", jobKey, "processedOn", processedOn, unpack(extraFields))
    rcall("HINCRBY", jobKey, "ats", 1)
    addBaseMarkerIfNeeded(markerKey, false)

    -- The third slot (rate limit delay) stays 0 so no extra delay is
    -- added for a job that has just been moved to active.
    local jobData = rcall("HGETALL", jobKey)
    return { jobData, jobId, 0, 0 }
  end
  239. --[[
  240. Updates the delay set, by moving delayed jobs that should
  241. be processed now to "wait".
  242. Events:
  243. 'waiting'
  244. ]]
  245. -- Includes
  246. --[[
  247. Function to add job in target list and add marker if needed.
  248. ]]
  249. -- Includes
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- pushCmd is the Redis list command name used to insert the job
    -- (callers in this script pass "RPUSH").
    rcall(pushCmd, targetKey, jobId)
    -- Add the base marker afterwards, unless paused or maxed.
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  254. --[[
  255. Function to add job considering priority.
  256. ]]
  257. -- Includes
  258. --[[
  259. Function to get priority score.
  260. ]]
  local function getPriorityScore(priority, priorityCounterKey)
    -- The score packs the priority above 32 bits and a wrapping
    -- insertion counter in the low 32 bits.
    local sequence = rcall("INCR", priorityCounterKey) % 0x100000000
    return priority * 0x100000000 + sequence
  end
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
    -- Insert the job into the prioritized sorted set under a freshly
    -- computed score, then add the base marker unless paused or maxed.
    rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  271. -- Try to get as much as 1000 jobs at once
  local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
    eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
    -- Moves up to 1000 due jobs out of the delayed set into the target
    -- list or the prioritized set, emitting a "waiting" event for each.
    -- Highest score that still belongs to the given timestamp.
    local maxScore = (timestamp + 1) * 0x1000 - 1
    local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, maxScore, "LIMIT", 0, 1000)
    if #jobs == 0 then
      return
    end

    rcall("ZREM", delayedKey, unpack(jobs))

    for idx = 1, #jobs do
      local jobId = jobs[idx]
      local jobKey = prefix .. jobId
      local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0

      if priority ~= 0 then
        local score = getPriorityScore(priority, priorityCounterKey)
        rcall("ZADD", prioritizedKey, score, jobId)
      else
        -- Jobs without priority go straight into the target list.
        rcall("LPUSH", targetKey, jobId)
      end

      rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId",
        jobId, "prev", "delayed")
      -- The job is no longer delayed.
      rcall("HSET", jobKey, "delay", 0)
    end

    addBaseMarkerIfNeeded(markerKey, isPaused)
  end
  local function fetchNextJob(waitKey, activeKey, prioritizedKey, eventStreamKey,
    rateLimiterKey, delayedKey, pausedKey, metaKey, pcKey, markerKey, prefix,
    timestamp, opts)
    -- Tries to move the next job to active and returns a 4-slot table:
    --   { jobData, jobId, 0, 0 }        a job was fetched
    --   { 0, 0, rateLimitTTL, 0 }       the queue is rate limited
    --   { 0, 0, 0, 0 }                  the queue is paused or maxed
    --   { 0, 0, 0, nextDelayedTs }      nothing to process, a delayed job exists
    -- Returns nothing when there is no job and no delayed job.
    local target, isPausedOrMaxed, rateLimitMax, rateLimitDuration =
      getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)

    -- Check if there are delayed jobs that can be promoted.
    promoteDelayedJobs(delayedKey, markerKey, target, prioritizedKey,
      eventStreamKey, prefix, timestamp, pcKey, isPausedOrMaxed)

    -- The limit stored in the meta hash wins over the one in opts.
    local maxJobs = tonumber(rateLimitMax or (opts['limiter'] and opts['limiter']['max']))

    -- Check if we are rate limited first.
    local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
    if expireTime > 0 then
      return {0, 0, expireTime, 0}
    end

    -- Paused or maxed queue.
    if isPausedOrMaxed then
      return {0, 0, 0, 0}
    end

    local limiterDuration = (opts['limiter'] and opts['limiter']['duration']) or rateLimitDuration

    local jobId = rcall("RPOPLPUSH", waitKey, activeKey)
    if jobId then
      -- Markers in waitlist DEPRECATED in v5: Remove in v6.
      if string.sub(jobId, 1, 2) == "0:" then
        rcall("LREM", activeKey, 1, jobId)
        if jobId == "0:0" then
          -- "0:0" signals that a prioritized job may be available.
          jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
        else
          -- "0:<delay>" marker: there is no job to process.
          jobId = nil
        end
      end
    else
      jobId = moveJobFromPrioritizedToActive(prioritizedKey, activeKey, pcKey)
    end

    -- Only prepare a real job. moveJobFromPrioritizedToActive returns
    -- nothing when the prioritized set is empty, and passing nil on would
    -- fail when the job key is built.
    if jobId then
      return prepareJobForProcessing(prefix, rateLimiterKey,
        eventStreamKey, jobId, timestamp, maxJobs,
        limiterDuration, markerKey, opts)
    end

    -- Return the timestamp for the next delayed job if any.
    local nextTimestamp = getNextDelayedTimestamp(delayedKey)
    if nextTimestamp ~= nil then
      -- The result is guaranteed to be positive, since the
      -- ZRANGEBYSCORE command would have returned a job otherwise.
      return {0, 0, 0, nextTimestamp}
    end
  end
  349. --[[
  350. Function to recursively move from waitingChildren to failed.
  351. ]]
  352. -- Includes
  353. --[[
  354. Validate and move parent to a wait status (waiting, delayed or prioritized)
  355. if no pending dependencies.
  356. ]]
  357. -- Includes
  358. --[[
  359. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  360. ]]
  361. -- Includes
  362. --[[
  363. Move parent to a wait status (wait, prioritized or delayed)
  364. ]]
  365. -- Includes
  366. --[[
  367. Add delay marker if needed.
  368. ]]
  369. -- Includes
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    -- Nothing to do when there is no delayed job.
    local nextTimestamp = getNextDelayedTimestamp(delayedKey)
    if nextTimestamp == nil then
      return
    end
    -- Marker member "1" carries the newest known next timestamp as score.
    rcall("ZADD", markerKey, nextTimestamp, "1")
  end
  378. --[[
  379. Function to check if queue is paused or maxed
  380. (since an empty list and !EXISTS are not really the same).
  381. ]]
  local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
    -- True when the meta hash has a "paused" field, or when a
    -- "concurrency" limit is set and the active list has reached it.
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency")
    if attrs[1] then
      return true
    end
    local concurrency = attrs[2]
    if not concurrency then
      return false
    end
    return rcall("LLEN", activeKey) >= tonumber(concurrency)
  end
  local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
    -- Puts the parent job into delayed, wait/paused or prioritized,
    -- depending on its stored "delay" and "priority" fields.
    local eventsKey = parentQueueKey .. ":events"
    local markerKey = parentQueueKey .. ":marker"
    local metaKey = parentQueueKey .. ":meta"
    local activeKey = parentQueueKey .. ":active"

    local attrs = rcall("HMGET", parentKey, "priority", "delay")
    local priority = tonumber(attrs[1]) or 0
    local delay = tonumber(attrs[2]) or 0

    if delay > 0 then
      -- Delayed parent: score is the due timestamp multiplied by 0x1000.
      local delayedTimestamp = tonumber(timestamp) + delay
      local delayedKey = parentQueueKey .. ":delayed"
      rcall("ZADD", delayedKey, delayedTimestamp * 0x1000, parentId)
      rcall("XADD", eventsKey, "*", "event", "delayed", "jobId", parentId, "delay",
        delayedTimestamp)
      addDelayMarkerIfNeeded(markerKey, delayedKey)
      return
    end

    if priority ~= 0 then
      local pausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
      addJobWithPriority(markerKey, parentQueueKey .. ":prioritized", priority, parentId,
        parentQueueKey .. ":pc", pausedOrMaxed)
    else
      local targetKey, pausedOrMaxed = getTargetQueueList(metaKey, activeKey,
        parentQueueKey .. ":wait", parentQueueKey .. ":paused")
      addJobInTargetList(targetKey, markerKey, "RPUSH", pausedOrMaxed, parentId)
    end

    rcall("XADD", eventsKey, "*", "event", "waiting", "jobId", parentId, "prev",
      "waiting-children")
  end
  local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
    -- Only acts when the parent job still exists and is currently
    -- a member of the waiting-children set.
    if rcall("EXISTS", parentKey) ~= 1 then
      return
    end
    local waitingChildrenKey = parentQueueKey .. ":waiting-children"
    if not rcall("ZSCORE", waitingChildrenKey, parentId) then
      return
    end
    rcall("ZREM", waitingChildrenKey, parentId)
    moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  end
  local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
    parentId, timestamp)
    -- The parent may only move once its dependencies set is empty.
    if rcall("SCARD", parentDependenciesKey) ~= 0 then
      return
    end
    moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  end
  local handleChildFailureAndMoveParentToWait = function (parentQueueKey, parentKey, parentId, jobIdKey, timestamp)
    -- Records a deferred failure ("defa") on the parent and, when the
    -- parent sits in waiting-children or delayed, moves it to a wait status.
    if rcall("EXISTS", parentKey) ~= 1 then
      return
    end

    local waitingChildrenKey = parentQueueKey .. ":waiting-children"
    local delayedKey = parentQueueKey .. ":delayed"

    -- Find which set (if any) currently holds the parent.
    local holderKey
    if rcall("ZSCORE", waitingChildrenKey, parentId) then
      holderKey = waitingChildrenKey
    elseif rcall("ZSCORE", delayedKey, parentId) then
      holderKey = delayedKey
      -- Clear the delay so the parent is not delayed again when moved.
      rcall("HSET", parentKey, "delay", 0)
    end

    if holderKey then
      rcall("ZREM", holderKey, parentId)
      rcall("HSET", parentKey, "defa", "child " .. jobIdKey .. " failed")
      moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
    elseif not rcall("ZSCORE", parentQueueKey .. ":failed", parentId) then
      -- Parent is elsewhere and not already failed: only record the failure.
      rcall("HSET", parentKey, "defa", "child " .. jobIdKey .. " failed")
    end
  end
  local moveChildFromDependenciesIfNeeded = function (rawParentData, childKey, failedReason, timestamp)
    -- Applies the parent's on-child-failure policy (fpof, cpof, idof, rdof)
    -- for a failed child. rawParentData is a JSON string holding at least
    -- "queueKey" and "id".
    if not rawParentData then
      return
    end

    local parentData = cjson.decode(rawParentData)
    local parentQueueKey = parentData['queueKey']
    local parentId = parentData['id']
    local parentKey = parentQueueKey .. ':' .. parentId
    local dependenciesKey = parentKey .. ":dependencies"

    local failParent = parentData['fpof']
    local continueParent = parentData['cpof']
    local ignoreDependency = parentData['idof']
    local removeDependency = parentData['rdof']

    -- Without any policy flag the dependency is left untouched.
    if not (failParent or continueParent or ignoreDependency or removeDependency) then
      return
    end

    -- Every policy starts by removing the child from the parent's
    -- dependencies; nothing else happens if it was not a member.
    if rcall("SREM", dependenciesKey, childKey) ~= 1 then
      return
    end

    if failParent then
      rcall("ZADD", parentKey .. ":unsuccessful", timestamp, childKey)
      handleChildFailureAndMoveParentToWait(parentQueueKey, parentKey, parentId,
        childKey, timestamp)
    elseif continueParent then
      rcall("HSET", parentKey .. ":failed", childKey, failedReason)
      moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
    else
      moveParentToWaitIfNoPendingDependencies(parentQueueKey, dependenciesKey,
        parentKey, parentId, timestamp)
      if ignoreDependency then
        rcall("HSET", parentKey .. ":failed", childKey, failedReason)
      end
    end
  end
  500. --[[
  501. Function to remove deduplication key if needed
  502. when a job is moved to completed or failed states.
  503. ]]
  local function removeDeduplicationKeyIfNeededOnFinalization(prefixKey,
    deduplicationId, jobId)
    -- Deletes the deduplication key when its PTTL is 0, or when it has no
    -- expiry (PTTL -1) and still points at this job. Returns the DEL
    -- result when a deletion happened, otherwise nothing.
    if not deduplicationId then
      return
    end

    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local pttl = rcall("PTTL", dedupKey)

    if pttl == 0 then
      return rcall("DEL", dedupKey)
    end

    if pttl == -1 then
      local ownerJobId = rcall('GET', dedupKey)
      if ownerJobId and ownerJobId == jobId then
        return rcall("DEL", dedupKey)
      end
    end
  end
  520. --[[
  521. Function to remove job keys.
  522. ]]
  local function removeJobKeys(jobKey)
    -- Deletes the job hash and its auxiliary keys in a single DEL call
    -- and returns the DEL result.
    local keys = { jobKey }
    local suffixes = { ':logs', ':dependencies', ':processed', ':failed', ':unsuccessful' }
    for idx = 1, #suffixes do
      keys[#keys + 1] = jobKey .. suffixes[idx]
    end
    return rcall("DEL", unpack(keys))
  end
  527. --[[
  528. Functions to remove jobs by max age.
  529. ]]
  530. -- Includes
  531. --[[
  532. Function to remove job.
  533. ]]
  534. -- Includes
  535. --[[
  536. Function to remove deduplication key if needed
  537. when a job is being removed.
  538. ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    -- Returns 1 when the deduplication key pointed at this job and was
    -- removed; otherwise returns nothing.
    if not deduplicationId then
      return
    end

    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    if not (ownerJobId and ownerJobId == jobId) then
      return
    end

    rcall("DEL", dedupKey)
    -- Also clean up any pending dedup-next data for this dedup ID.
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  552. --[[
  553. Check if this job has a parent. If so we will just remove it from
  554. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  555. which requires code from "moveToFinished"
  556. ]]
  557. -- Includes
  558. --[[
  559. Functions to destructure job key.
  560. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  561. ]]
  local getJobIdFromKey = function (jobKey)
    -- The greedy ".*:" consumes everything up to the last colon, so the
    -- capture is whatever follows it (nil when the key has no colon).
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  local getJobKeyPrefix = function (jobKey, jobId)
    -- Drop the trailing job id; what is left (including the final
    -- separator) is the prefix.
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    -- Pushes the parent onto its queue's wait (or paused) list and,
    -- when emitEvent is truthy, publishes a "waiting" event.
    local targetKey, pausedOrMaxed = getTargetQueueList(
      parentPrefix .. "meta",
      parentPrefix .. "active",
      parentPrefix .. "wait",
      parentPrefix .. "paused")
    addJobInTargetList(targetKey, parentPrefix .. "marker", "RPUSH", pausedOrMaxed, parentId)

    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    -- Removes jobKey from its parent's dependencies set. Returns true when
    -- the job was actually a dependency of a parent, false otherwise.
    -- If it was the last pending dependency and the parent was in
    -- waiting-children, the parent is either removed (hard removal, same
    -- queue) or moved to wait.

    -- Resolve which parent to work on and which dedup id belongs with it.
    local resolvedParentKey, dedupId
    if parentKey then
      resolvedParentKey, dedupId = parentKey, debounceId
    else
      -- No parent supplied: look it up on the job hash and make sure the
      -- parent still exists.
      local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
      local missedParentKey = parentAttributes[1]
      if type(missedParentKey) == "string" and missedParentKey ~= ""
        and rcall("EXISTS", missedParentKey) == 1 then
        resolvedParentKey, dedupId = missedParentKey, parentAttributes[2]
      end
    end

    if not resolvedParentKey then
      return false
    end

    local parentDependenciesKey = resolvedParentKey .. ":dependencies"
    if rcall("SREM", parentDependenciesKey, jobKey) <= 0 then
      return false
    end

    -- Only the last pending dependency triggers a parent transition.
    if rcall("SCARD", parentDependenciesKey) == 0 then
      local parentId = getJobIdFromKey(resolvedParentKey)
      local parentPrefix = getJobKeyPrefix(resolvedParentKey, parentId)
      local removed = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

      if removed == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix == baseKey then
          -- Hard removal with the parent in the same queue: remove the
          -- parent as well, walking further up the chain first.
          removeParentDependencyKey(resolvedParentKey, hard, nil, baseKey, nil)
          removeJobKeys(resolvedParentKey)
          if dedupId then
            rcall("DEL", parentPrefix .. "de:" .. dedupId)
          end
        else
          -- Hard removal with the parent in another queue: move it to
          -- wait without emitting an event.
          _moveParentToWait(parentPrefix, parentId)
        end
      end
    end

    return true
  end
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    -- Detaches the job from its parent (if any), optionally removes its
    -- deduplication key, then deletes the job's own keys.
    local jobKey = baseKey .. jobId

    removeParentDependencyKey(jobKey, hard, nil, baseKey)

    if shouldRemoveDeduplicationKey then
      -- "deid" must be read before the job hash is deleted below.
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, rcall("HGET", jobKey, "deid"))
    end

    removeJobKeys(jobKey)
  end
  local function removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix, maxLimit)
    -- Removes jobs in targetSet whose score is at or below
    -- timestamp - maxAge * 1000, handling at most maxLimit jobs per call.
    local cutoff = timestamp - maxAge * 1000
    local jobIds = rcall("ZREVRANGEBYSCORE", targetSet, cutoff, "-inf", "LIMIT", 0, maxLimit)
    local total = #jobIds

    for idx = 1, total do
      removeJob(jobIds[idx], false, prefix, false --[[remove debounce key]])
    end

    if total == 0 then
      return
    end

    if total < maxLimit then
      -- Everything old enough was fetched: drop the whole score range.
      rcall("ZREMRANGEBYSCORE", targetSet, "-inf", cutoff)
    else
      -- The limit was hit: remove exactly the fetched ids, in batches.
      for from, to in batches(total, 7000) do
        rcall("ZREM", targetSet, unpack(jobIds, from, to))
      end
    end
  end
  665. --[[
  666. Functions to remove jobs by max count.
  667. ]]
  668. -- Includes
  local function removeJobsByMaxCount(maxCount, targetSet, prefix)
    -- Keeps the maxCount highest-scored members of targetSet and removes
    -- every job beyond that.
    -- In descending order, ranks maxCount..-1 are the jobs to drop.
    local surplus = rcall("ZREVRANGE", targetSet, maxCount, -1)
    for idx = 1, #surplus do
      removeJob(surplus[idx], false, prefix, false --[[remove debounce key]])
    end
    -- Same members, addressed by ascending rank.
    rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
  end
  local function removeLock(jobKey, stalledKey, token, jobId)
    -- Releases the job lock if it is held with the given token.
    -- Returns 0 on success (or when token is "0"), -6 when the lock is
    -- held with a different token, -2 when there is no lock at all.
    if token == "0" then
      return 0
    end

    local lockKey = jobKey .. ':lock'
    local lockToken = rcall("GET", lockKey)

    if lockToken == token then
      rcall("DEL", lockKey)
      rcall("SREM", stalledKey, jobId)
      return 0
    end

    if lockToken then
      -- Lock exists but token does not match
      return -6
    end

    -- Lock is missing completely
    return -2
  end
  696. --[[
  697. Function to create a new job from stored dedup-next data
  698. when a deduplicated job with keepLastIfActive finishes.
  699. At most one next job is created per deduplication ID.
  700. Multiple triggers while active overwrite the dedup-next data,
  701. so only the latest data is used.
  702. ]]
  703. -- Includes
  704. --[[
  705. Function to get max events value or set by default 10000.
  706. ]]
  local function getOrSetMaxEvents(metaKey)
    -- Returns the stored "opts.maxLenEvents" value, storing and
    -- returning the default of 10000 when the field is missing.
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local defaultMaxEvents = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", defaultMaxEvents)
    return defaultMaxEvents
  end
  715. --[[
  716. Function to set the deduplication key for a job.
  717. Uses TTL from deduplication opts if provided.
  718. ]]
  local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
    -- Stores jobId under the deduplication key, with a millisecond
    -- expiry only when a positive ttl is given in deduplicationOpts.
    local ttl = deduplicationOpts and deduplicationOpts['ttl']
    if not ttl or ttl <= 0 then
      rcall('SET', deduplicationKey, jobId)
      return
    end
    rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  end
  727. --[[
  728. Shared helper to store a job and enqueue it into the appropriate list/set.
  729. Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
  730. Emits the appropriate event after enqueuing ("delayed" or "waiting").
  731. Returns delay, priority from storeJob.
  732. ]]
  733. -- Includes
  734. --[[
  735. Adds a delayed job to the queue by doing the following:
  736. - Creates a new job key with the job data.
  737. - adds to delayed zset.
  738. - Emits a global event 'delayed' if the job is delayed.
  739. ]]
  740. -- Includes
  --[[
    Computes the score under which a job is stored in the delayed zset.

    The target timestamp (timestamp + delay when delay > 0, otherwise the
    timestamp itself) is multiplied by 0x1000, which leaves the low 12 bits
    free as a per-timestamp sequence (4096 slots). Jobs that share the same
    target timestamp therefore keep their insertion order.
    WARNING: once all 4096 slots of a timestamp are used, further jobs share
    the highest slot and ordering between them is no longer guaranteed.

    delayedKey - key of the delayed zset.
    timestamp  - base timestamp (number or numeric string).
    delay      - delay as a number; must not be nil.

    Returns score, delayedTimestamp.
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1

    -- Highest score already present inside this timestamp's slot range.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)

    -- Compare the length explicitly: a bare number is always truthy in Lua
    -- (0 included), so testing "#result" alone never filters anything.
    if #result > 0 then
      local currentMaxScore = tonumber(result[2])
      if currentMaxScore ~= nil then
        if currentMaxScore >= maxScore then
          -- Slot range exhausted: stay on the last slot.
          return maxScore, delayedTimestamp
        end
        return currentMaxScore + 1, delayedTimestamp
      end
    end

    return minScore, delayedTimestamp
  end
  --[[
    Puts a job into the delayed zset and announces it on the events stream.

    The zset score comes from getDelayedScore; the "delayed" event carries
    the job id and the resulting target timestamp in its "delay" field.
    Finally addDelayMarkerIfNeeded is invoked with the marker and delayed keys.
  ]]
  local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
    maxEvents, markerKey, delay)
    local numericDelay = tonumber(delay)
    local zsetScore, runAt = getDelayedScore(delayedKey, timestamp, numericDelay)

    rcall("ZADD", delayedKey, zsetScore, jobId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "delayed",
      "jobId", jobId,
      "delay", runAt)

    -- Signal that a delayed job is available.
    addDelayMarkerIfNeeded(markerKey, delayedKey)
  end
  --[[
    Writes the job hash and emits the "added" event.

    Always stored: name, data, opts (JSON encoded), timestamp, delay, priority.
    Stored only when present:
      - parentKey / parent  when parentKey is not nil
      - rjk                 when repeatJobKey is truthy
      - deid                when opts.de.id is truthy

    Returns delay, priority (each defaulting to 0 when missing from opts).
  ]]
  local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
    parentKey, parentData, repeatJobKey)
    local encodedOpts = cjson.encode(opts)
    local delay = opts['delay'] or 0
    local priority = opts['priority'] or 0
    local debounceId = opts['de'] and opts['de']['id']

    local extraFields = {}
    local function addField(field, value)
      table.insert(extraFields, field)
      table.insert(extraFields, value)
    end

    if parentKey ~= nil then
      addField("parentKey", parentKey)
      addField("parent", parentData)
    end
    if repeatJobKey then
      addField("rjk", repeatJobKey)
    end
    if debounceId then
      addField("deid", debounceId)
    end

    rcall("HMSET", jobIdKey,
      "name", name,
      "data", data,
      "opts", encodedOpts,
      "timestamp", timestamp,
      "delay", delay,
      "priority", priority,
      unpack(extraFields))

    rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)

    return delay, priority
  end
  --[[
    Stores a job through storeJob and then places it where it belongs:
      - delayed zset (via addDelayedJob) when delay is non-zero and a
        delayedKey was supplied;
      - prioritized set (via addJobWithPriority) when priority > 0;
      - otherwise the list chosen by getTargetQueueList, pushed with RPUSH
        when opts.lifo is set and LPUSH when it is not.
    A "waiting" event is emitted here for the non-delayed paths.

    Returns delay, priority as reported by storeJob.
  ]]
  local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
    timestamp, parentKey, parentData, repeatJobKey, maxEvents,
    waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
    priorityCounterKey, delayedKey, markerKey)
    local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
      opts, timestamp, parentKey, parentData, repeatJobKey)

    if delay ~= 0 and delayedKey then
      addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
      return delay, priority
    end

    local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    if priority > 0 then
      addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
        priorityCounterKey, isPausedOrMaxed)
    else
      local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
      addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
    end

    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "waiting", "jobId", jobId)

    return delay, priority
  end
  --[[
    Creates a follow-up job from the data saved under prefix .. "dn:" .. deduplicationId.

    Does nothing when that hash does not exist. Otherwise:
      1. reads name, data, opts, pk, pd, pdk and rjk from the hash;
      2. allocates a new job id by incrementing prefix .. "id";
      3. points the deduplication key (prefix .. "de:" .. deduplicationId) at
         the new job, without TTL when opts.de.keepLastIfActive is set and
         through setDeduplicationKey otherwise;
      4. stores and enqueues the job with storeAndEnqueueJob;
      5. adds the new job key to the parent dependencies set when "pdk" is set;
      6. deletes the saved hash last, so the data survives if an earlier
         step raises an error.
  ]]
  local function requeueDeduplicatedJob(prefix, deduplicationId, eventStreamKey,
    metaKey, activeKey, waitKey, pausedKey, markerKey, prioritizedKey,
    priorityCounterKey, delayedKey, timestamp)
    local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
    if rcall("EXISTS", deduplicationNextKey) ~= 1 then
      return
    end

    local saved = rcall("HMGET", deduplicationNextKey,
      "name", "data", "opts", "pk", "pd", "pdk", "rjk")
    local jobName, jobData = saved[1], saved[2]

    local newJobId = rcall("INCR", prefix .. "id") .. ""
    local newJobIdKey = prefix .. newJobId
    local newOpts = cjson.decode(saved[3])
    local deduplicationKey = prefix .. "de:" .. deduplicationId

    -- Missing hash fields come back falsy; normalise them to nil.
    local parentKey = saved[4] or nil
    local parentData = saved[5] or nil
    local parentDependenciesKey = saved[6] or nil
    local repeatJobKey = saved[7] or nil

    -- With keepLastIfActive the key is written without TTL so that it
    -- outlives the job's active duration.
    local deOpts = newOpts['de']
    local keepLast = deOpts and deOpts['keepLastIfActive']
    if keepLast then
      rcall('SET', deduplicationKey, newJobId)
    else
      setDeduplicationKey(deduplicationKey, newJobId, deOpts)
    end

    -- The shared helper takes care of delayed / prioritized / lifo placement.
    local maxEvents = getOrSetMaxEvents(metaKey)
    storeAndEnqueueJob(eventStreamKey, newJobIdKey, newJobId, jobName, jobData,
      newOpts, timestamp, parentKey, parentData, repeatJobKey, maxEvents,
      waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
      priorityCounterKey, delayedKey, markerKey)

    -- Register the new job as a child dependency when it has a parent.
    if parentDependenciesKey then
      rcall("SADD", parentDependenciesKey, newJobIdKey)
    end

    -- Deleted only once the job is fully created.
    rcall("DEL", deduplicationNextKey)
  end
  864. --[[
  865. Function to trim events, default 10000.
  866. ]]
  867. -- Includes
  --[[
    Trims the events stream to approximately the configured maximum length.

    metaKey        - key of the queue meta hash holding "opts.maxLenEvents".
    eventStreamKey - key of the events stream to trim.

    getOrSetMaxEvents always yields a usable value (the stored setting, or
    10000 which it also persists), so no separate fallback is needed here.
  ]]
  local function trimEvents(metaKey, eventStreamKey)
    local maxEvents = getOrSetMaxEvents(metaKey)
    rcall("XTRIM", eventStreamKey, "MAXLEN", "~", maxEvents)
  end
  876. --[[
  877. Validate and move or add dependencies to parent.
  878. ]]
  879. -- Includes
  --[[
    Records a finished child's return value on its parent and then lets
    moveParentToWaitIfNoPendingDependencies decide whether the parent can move on.

    The value is written to the hash at parentKey .. ":processed",
    under the child's job key.
  ]]
  local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
    parentId, jobIdKey, returnvalue, timestamp)
    rcall("HSET", parentKey .. ":processed", jobIdKey, returnvalue)
    moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey,
      parentKey, parentId, timestamp)
  end
  --[[
    Applies a batch of field updates to a job hash.

    jobKey          - key of the job hash.
    msgpackedFields - msgpack payload that decodes to a flat list of
                      field, value, field, value, ... passed on to HMSET.
                      A missing or empty payload is ignored.
  ]]
  local function updateJobFields(jobKey, msgpackedFields)
    if not msgpackedFields or #msgpackedFields == 0 then
      return
    end

    local decoded = cmsgpack.unpack(msgpackedFields)
    if decoded then
      rcall("HMSET", jobKey, unpack(decoded))
    end
  end
  --[[
    Script body: moves an active job to the completed/failed set.

    KEYS as used below:
      [1] wait list          [2] active list        [3] prioritized zset
      [4] events stream      [5] stalled set        [6] passed to fetchNextJob
      [7] delayed zset       [8] paused list        [9] meta hash
      [10] priority counter  [11] target (completed/failed) zset
      [12] job hash          [13] metrics key       [14] marker key
    ARGV as used below:
      [1] job id             [2] timestamp
      [3] name of the result field ("returnvalue" / "failedReason")
      [4] value of that field
      [5] target state ("completed" or "failed")
      [6] "1" to fetch the next job before returning
      [7] key prefix         [8] msgpacked options  [9] msgpacked job fields

    Return values:
       0  finished, no next job returned
      -1  job hash does not exist
      -2  lock is missing            -6  lock held with another token
      -3  job was not in the active list
      -4  job still has pending dependencies
      -9  job has entries in its ":unsuccessful" set
      any truthy result of fetchNextJob when ARGV[6] is "1"
  ]]
  local jobIdKey = KEYS[12]
  local finishedStatus = ARGV[5]

  -- Make sure job exists
  if rcall("EXISTS", jobIdKey) ~= 1 then
    return -1
  end

  -- Make sure it does not have pending dependencies.
  -- It must happen before removing lock.
  if finishedStatus == "completed" then
    if rcall("SCARD", jobIdKey .. ":dependencies") ~= 0 then
      return -4
    end
    if rcall("ZCARD", jobIdKey .. ":unsuccessful") ~= 0 then
      return -9
    end
  end

  local opts = cmsgpack.unpack(ARGV[8])
  local lockStatus = removeLock(jobIdKey, KEYS[5], opts['token'], ARGV[1])
  if lockStatus < 0 then
    return lockStatus
  end

  updateJobFields(jobIdKey, ARGV[9])

  local attempts = opts['attempts']
  local maxMetricsSize = opts['maxMetricsSize']
  local maxCount = opts['keepJobs']['count']
  local maxAge = opts['keepJobs']['age']
  local maxLimit = opts['keepJobs']['limit'] or 1000

  local jobAttributes = rcall("HMGET", jobIdKey, "parentKey", "parent", "deid")
  local parentKey = jobAttributes[1] or ""
  local parentData = jobAttributes[2]
  local deduplicationId = jobAttributes[3]
  local parentId = ""
  local parentQueueKey = ""
  if parentData then -- TODO: need to revisit this logic if it's still needed
    local decodedParent = cjson.decode(parentData)
    parentId = decodedParent['id']
    parentQueueKey = decodedParent['queueKey']
  end

  local jobId = ARGV[1]
  local timestamp = ARGV[2]

  -- Remove from active list (if not active we shall return error)
  if rcall("LREM", KEYS[2], -1, jobId) < 1 then
    return -3
  end

  local eventStreamKey = KEYS[4]
  local metaKey = KEYS[9]

  -- Trim events before emitting them to avoid trimming events emitted in this script
  trimEvents(metaKey, eventStreamKey)

  local prefix = ARGV[7]
  removeDeduplicationKeyIfNeededOnFinalization(prefix, deduplicationId, jobId)

  -- Check if there is requeue data for this dedup ID (keepLastIfActive mode)
  if deduplicationId then
    requeueDeduplicatedJob(prefix, deduplicationId, eventStreamKey,
      metaKey, KEYS[2], KEYS[1], KEYS[8], KEYS[14], KEYS[3], KEYS[10],
      KEYS[7], timestamp)
  end

  -- If job has a parent we need to
  -- 1) remove this job id from parents dependencies
  -- 2) move the job Id to parent "processed" set
  -- 3) push the results into parent "results" list
  -- 4) if parent's dependencies is empty, then move parent to "wait/paused". Note it may be a different queue!.
  if parentId == "" and parentKey ~= "" then
    parentId = getJobIdFromKey(parentKey)
    parentQueueKey = getJobKeyPrefix(parentKey, ":" .. parentId)
  end

  if parentId ~= "" then
    if finishedStatus == "completed" then
      local dependenciesSet = parentKey .. ":dependencies"
      if rcall("SREM", dependenciesSet, jobIdKey) == 1 then
        updateParentDepsIfNeeded(parentKey, parentQueueKey, dependenciesSet,
          parentId, jobIdKey, ARGV[4], timestamp)
      end
    else
      moveChildFromDependenciesIfNeeded(parentData, jobIdKey, ARGV[4], timestamp)
    end
  end

  local attemptsMade = rcall("HINCRBY", jobIdKey, "atm", 1)

  -- Remove job?
  if maxCount == 0 then
    removeJobKeys(jobIdKey)
    if parentKey ~= "" then
      -- TODO: when a child is removed when finished, result or failure in parent
      -- must not be deleted, those value references should be deleted when the parent
      -- is deleted
      removeParentDependencyKey(jobIdKey, false, parentKey, deduplicationId)
    end
  else
    local targetSet = KEYS[11]

    -- Add to complete/failed set
    rcall("ZADD", targetSet, timestamp, jobId)
    -- "returnvalue" / "failedReason" and "finishedOn"
    rcall("HSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", timestamp)

    if finishedStatus == "failed" then
      rcall("HDEL", jobIdKey, "defa")
    end

    -- Remove old jobs?
    if maxAge ~= nil then
      removeJobsByMaxAge(timestamp, maxAge, targetSet, prefix, maxLimit)
    end
    if maxCount ~= nil and maxCount > 0 then
      removeJobsByMaxCount(maxCount, targetSet, prefix)
    end
  end

  rcall("XADD", eventStreamKey, "*", "event", finishedStatus, "jobId", jobId,
    ARGV[3], ARGV[4], "prev", "active")

  if finishedStatus == "failed" and tonumber(attemptsMade) >= tonumber(attempts) then
    rcall("XADD", eventStreamKey, "*", "event", "retries-exhausted",
      "jobId", jobId, "attemptsMade", attemptsMade)
  end

  -- Collect metrics
  if maxMetricsSize ~= "" then
    collectMetrics(KEYS[13], KEYS[13] .. ':data', maxMetricsSize, timestamp)
  end

  -- Try to get next job to avoid an extra roundtrip if the queue is not closing,
  -- and not rate limited.
  if ARGV[6] == "1" then
    local nextJob = fetchNextJob(KEYS[1], KEYS[2], KEYS[3], eventStreamKey,
      KEYS[6], KEYS[7], KEYS[8], metaKey, KEYS[10], KEYS[14], prefix,
      timestamp, opts)
    if nextJob then
      return nextJob
    end
  end

  -- Emit "drained" once wait, active and prioritized are all empty.
  -- Short-circuit evaluation keeps the lookups in wait, active, prioritized order.
  if rcall("LLEN", KEYS[1]) == 0
    and rcall("LLEN", KEYS[2]) == 0
    and rcall("ZCARD", KEYS[3]) == 0 then
    rcall("XADD", eventStreamKey, "*", "event", "drained")
  end

  return 0
  1031. `;
  1032. exports.moveToFinished = {
  1033. name: 'moveToFinished',
  1034. content,
  1035. keys: 14,
  1036. };
  1037. //# sourceMappingURL=moveToFinished-14.js.map