drain-5.js 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. const content = `--[[
  2. Drains the queue, removes all jobs that are waiting
  3. or delayed, but not active, completed or failed
  4. Input:
  5. KEYS[1] 'wait',
  6. KEYS[2] 'paused'
  7. KEYS[3] 'delayed'
  8. KEYS[4] 'prioritized'
  9. KEYS[5] 'jobschedulers' (repeat)
  10. ARGV[1] queue key prefix
  11. ARGV[2] "1" when delayed jobs should be removed as well
  12. ]]
  -- Local alias for redis.call, plus the key prefix (ARGV[1]) used to build job keys.
  local rcall, queueBaseKey = redis.call, ARGV[1]
  15. --[[
  16. Functions to remove jobs.
  17. ]]
  18. -- Includes
  --[[
    Builds a new array with every entry of jobs, in the original order,
    that is not a truthy key of the jobsToIgnore lookup table.
    The input array is left untouched.
  ]]
  local function filterOutJobsToIgnore(jobs, jobsToIgnore)
    local kept = {}
    for index = 1, #jobs do
      local candidate = jobs[index]
      if not jobsToIgnore[candidate] then
        kept[#kept + 1] = candidate
      end
    end
    return kept
  end
  31. --[[
  32. Functions to remove jobs.
  33. ]]
  34. -- Includes
  35. --[[
  36. Function to remove job.
  37. ]]
  38. -- Includes
  --[[
    Deletes the deduplication key of a job that is being removed, but only
    when that key still points at this very job.
    Returns 1 when the key was deleted, nothing otherwise.
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end
    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- The key is missing or owned by another job: leave it alone.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end
    rcall("DEL", dedupKey)
    -- Also drop any pending dedup-next data stored for the same dedup id.
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Deletes the job hash together with its logs, dependencies, processed,
    failed and unsuccessful companion keys in a single DEL call.
    Returns the reply of that DEL call.
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'
    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  63. --[[
  64. Check if this job has a parent. If so we will just remove it from
  65. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  66. which requires code from "moveToFinished"
  67. ]]
  68. -- Includes
  69. --[[
  70. Function to add job in target list and add marker if needed.
  71. ]]
  72. -- Includes
  --[[
    Adds the base marker (member "0" with score 0) to the marker sorted set,
    unless the queue is paused or has reached its concurrency limit.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto targetKey with the given push command (e.g. "RPUSH"),
  -- then adds the base marker unless the queue is paused or maxed.
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- The push must happen before the marker is added.
    rcall(pushCmd, targetKey, jobId)

    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  85. --[[
  86. Functions to destructure job key.
  87. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  88. ]]
  -- Returns the text that follows the last ":" of jobKey (nil when there is no ":").
  local function getJobIdFromKey(jobKey)
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  -- Returns jobKey without its trailing jobId, i.e. its first (#jobKey - #jobId) characters.
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  --[[
    Reads "paused", "concurrency", "max" and "duration" from the queue meta hash
    and decides which list a job should be pushed to.
    The explicit paused field is used because an empty list and a missing
    key are not the same thing.

    Returns four values:
      1. pausedKey when the queue is paused, waitKey otherwise
      2. true when the queue is paused or the active list has reached the
         configured concurrency, false otherwise
      3. the "max" meta field
      4. the "duration" meta field
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, maxJobs, duration = meta[1], meta[2], meta[3], meta[4]

    if paused then
      return pausedKey, true, maxJobs, duration
    end

    local isMaxed = false
    if concurrency then
      -- Only look at the active list when a concurrency limit is configured.
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, isMaxed, maxJobs, duration
  end
  -- Pushes the parent job onto its queue's wait (or paused) list and, when
  -- emitEvent is truthy, publishes a "waiting" event on that queue's event stream.
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local metaKey = parentPrefix .. "meta"
    local activeKey = parentPrefix .. "active"
    local waitKey = parentPrefix .. "wait"
    local pausedKey = parentPrefix .. "paused"

    local targetList, blocked = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    addJobInTargetList(targetList, parentPrefix .. "marker", "RPUSH", blocked, parentId)

    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  --[[
    Detaches jobKey from its parent's ":dependencies" set. When that leaves the
    parent without pending dependencies and the parent is still listed in
    "waiting-children", the parent is either removed (hard removal with the
    parent living under baseKey) or moved to its wait/paused list.

    jobKey     - full key of the job being removed.
    hard       - truthy to delete a same-queue parent instead of re-queueing it.
    parentKey  - parent job key; when falsy it is read from the "parentKey"
                 field of the job hash and must still exist.
    baseKey    - key prefix of the queue being cleaned.
    debounceId - id whose "de:" key is deleted together with a removed parent;
                 when parentKey is falsy the "deid" field of the job hash is
                 used instead.

    Returns true when jobKey was removed from a parent's dependency set,
    false otherwise.
  ]]
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    local resolvedParentKey, dedupId = parentKey, debounceId

    if not resolvedParentKey then
      -- No parent supplied by the caller: fall back to what the job hash says.
      local attrs = rcall("HMGET", jobKey, "parentKey", "deid")
      local storedParentKey = attrs[1]
      local usable = type(storedParentKey) == "string" and storedParentKey ~= ""
        and rcall("EXISTS", storedParentKey) == 1
      if not usable then
        return false
      end
      resolvedParentKey, dedupId = storedParentKey, attrs[2]
    end

    local dependenciesKey = resolvedParentKey .. ":dependencies"
    local removedCount = rcall("SREM", dependenciesKey, jobKey)
    if not (removedCount > 0) then
      return false
    end

    if rcall("SCARD", dependenciesKey) == 0 then
      local parentId = getJobIdFromKey(resolvedParentKey)
      local parentPrefix = getJobKeyPrefix(resolvedParentKey, parentId)
      local wasWaitingChildren = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

      if wasWaitingChildren == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix ~= baseKey then
          -- Parent lives in another queue: re-queue it there without emitting an event.
          _moveParentToWait(parentPrefix, parentId)
        else
          -- Parent lives in the queue being cleaned: remove it as well,
          -- detaching it from its own parent first.
          removeParentDependencyKey(resolvedParentKey, hard, nil, baseKey, nil)
          removeJobKeys(resolvedParentKey)
          if dedupId then
            rcall("DEL", parentPrefix .. "de:" .. dedupId)
          end
        end
      end
    end

    return true
  end
  -- Removes one job: detaches it from its parent, optionally deletes the
  -- deduplication key that points at it, then deletes the job keys.
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    local jobKey = baseKey .. jobId

    removeParentDependencyKey(jobKey, hard, nil, baseKey)

    if shouldRemoveDeduplicationKey then
      -- "deid" has to be read before the job hash is deleted below.
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, rcall("HGET", jobKey, "deid"))
    end

    removeJobKeys(jobKey)
  end
  -- Removes every job id listed in keys (deduplication keys included) and
  -- returns max minus the number of ids in keys.
  local function removeJobs(keys, hard, baseKey, max)
    for _, jobId in ipairs(keys) do
      -- The last argument asks removeJob to also drop the deduplication key.
      removeJob(jobId, hard, baseKey, true)
    end
    local remaining = max - #keys
    return remaining
  end
  -- Returns the first max entries of the list; max = 0 gives an end index of -1,
  -- which LRANGE treats as "up to the last element".
  local function getListItems(keyName, max)
    local lastIndex = max - 1
    return rcall('LRANGE', keyName, 0, lastIndex)
  end
  --[[
    Removes up to max jobs (all of them when max is 0) from the head of a list,
    skipping every id that is a truthy key of jobsToIgnore.

    keyName      - list key to clean.
    hard         - forwarded to removeJobs.
    baseKey      - key prefix of the queue.
    max          - size of the window read from the head of the list.
    jobsToIgnore - optional lookup table of job ids that must stay in the list.

    Returns the value reported by removeJobs (max minus the removed count).

    The fetched window is trimmed by its full size and the ignored ids are
    then pushed back to the head in their original order. Trimming by the
    number of removed jobs is only right when nothing was filtered out:
    LTRIM works by position, so otherwise it would drop ignored ids sitting
    near the head and leave ids of already deleted jobs in the list.
  ]]
  local function removeListJobs(keyName, hard, baseKey, max, jobsToIgnore)
    local fetched = getListItems(keyName, max)
    local jobs = fetched
    if jobsToIgnore then
      jobs = filterOutJobsToIgnore(fetched, jobsToIgnore)
    end

    local count = removeJobs(jobs, hard, baseKey, max)

    -- Drop the whole window that was inspected...
    rcall("LTRIM", keyName, #fetched, -1)

    -- ...and restore the entries that had to be kept. Walking backwards with
    -- LPUSH puts them back at the head in the order they had before.
    if #jobs < #fetched then
      for i = #fetched, 1, -1 do
        local jobId = fetched[i]
        if jobsToIgnore[jobId] then
          rcall("LPUSH", keyName, jobId)
        end
      end
    end

    return count
  end
  214. -- Includes
  --[[
    Iterator factory that walks the range 1..n in consecutive slices of at most
    batchSize items. Each step yields the first and last index of a slice.
    Useful because some commands, such as ZREM, only accept a limited number
    of parameters per call (around 7000).
  ]]
  local function batches(n, batchSize)
    local nextStart = 1
    return function()
      local from = nextStart
      nextStart = nextStart + batchSize
      if from > n then
        return
      end
      local to = from + batchSize - 1
      if to > n then
        to = n
      end
      return from, to
    end
  end
  --[[
    Returns the first max members of a sorted set; max = 0 gives an end index
    of -1, which ZRANGE treats as "up to the last member".
  ]]
  local function getZSetItems(keyName, max)
    local lastIndex = max - 1
    return rcall('ZRANGE', keyName, 0, lastIndex)
  end
  -- Removes up to max jobs (all of them when max is 0) from a sorted set,
  -- skipping the ids found in jobsToIgnore, and returns what removeJobs reports.
  local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore)
    local members = getZSetItems(keyName, max)
    if jobsToIgnore then
      members = filterOutJobsToIgnore(members, jobsToIgnore)
    end

    local count = removeJobs(members, hard, baseKey, max)

    -- ZREM takes a limited number of arguments per call, so the members are
    -- removed in slices; nothing is sent when there are no members.
    for from, to in batches(#members, 7000) do
      rcall("ZREM", keyName, unpack(members, from, to))
    end

    return count
  end
  -- Jobs that belong to a job scheduler must survive the drain, so first
  -- collect the id of the job each scheduler currently points at.
  local scheduledJobs = {}
  do
    -- The reply alternates scheduler id (member) and its millis value (score).
    local schedulerEntries = rcall("ZRANGE", KEYS[5], 0, -1, "WITHSCORES")
    local pos = 1
    while pos <= #schedulerEntries do
      local schedulerId = schedulerEntries[pos]
      local schedulerMillis = schedulerEntries[pos + 1]
      scheduledJobs["repeat:" .. schedulerId .. ":" .. schedulerMillis] = true
      pos = pos + 2
    end
  end

  -- A max of 0 makes the helpers read each collection up to its last entry.
  local shouldCleanDelayed = ARGV[2] == "1"
  removeListJobs(KEYS[1], true, queueBaseKey, 0, scheduledJobs) -- wait
  removeListJobs(KEYS[2], true, queueBaseKey, 0, scheduledJobs) -- paused
  if shouldCleanDelayed then
    removeZSetJobs(KEYS[3], true, queueBaseKey, 0, scheduledJobs) -- delayed
  end
  removeZSetJobs(KEYS[4], true, queueBaseKey, 0, scheduledJobs) -- prioritized
  266. `;
  // Script descriptor: its name, the Lua source above and the number of KEYS it takes.
  export const drain = {
    name: 'drain',
    content: content,
    keys: 5,
  };
  272. //# sourceMappingURL=drain-5.js.map