drain-5.js 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.drain = void 0;
  4. const content = `--[[
  5. Drains the queue, removes all jobs that are waiting
  6. or delayed, but not active, completed or failed
  7. Input:
  8. KEYS[1] 'wait',
  9. KEYS[2] 'paused'
  10. KEYS[3] 'delayed'
  11. KEYS[4] 'prioritized'
  12. KEYS[5] 'jobschedulers' (repeat)
  13. ARGV[1] queue key prefix
  14. ARGV[2] should clean delayed jobs
  15. ]]
  -- rcall: shorthand used by every helper to issue Redis commands.
  -- queueBaseKey: the queue key prefix passed in as ARGV[1].
  local rcall, queueBaseKey = redis.call, ARGV[1]
  18. --[[
  19. Functions to remove jobs.
  20. ]]
  21. -- Includes
  --[[
    Builds a new array with the entries of jobs, in their original order,
    whose value is not mapped to a truthy value in jobsToIgnore.
    The input array is left untouched.
  ]]
  local function filterOutJobsToIgnore(jobs, jobsToIgnore)
    local kept = {}
    for idx = 1, #jobs do
      local jobId = jobs[idx]
      if not jobsToIgnore[jobId] then
        kept[#kept + 1] = jobId
      end
    end
    return kept
  end
  34. --[[
  35. Functions to remove jobs.
  36. ]]
  37. -- Includes
  38. --[[
  39. Function to remove job.
  40. ]]
  41. -- Includes
  --[[
    Deletes the deduplication key of a job that is being removed, but only
    when that key currently points at the job itself.
    The pending dedup-next entry for the same deduplication id is deleted
    alongside it.
    Returns 1 when the keys were deleted, nothing otherwise.
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end
    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- Leave the key alone when it is missing or owned by another job.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end
    rcall("DEL", dedupKey)
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Deletes the job hash and its companion keys (logs, dependencies,
    processed, failed, unsuccessful) in a single DEL call.
    Returns the reply of DEL.
  ]]
  local function removeJobKeys(jobKey)
    return rcall(
      "DEL",
      jobKey,
      jobKey .. ':logs',
      jobKey .. ':dependencies',
      jobKey .. ':processed',
      jobKey .. ':failed',
      jobKey .. ':unsuccessful'
    )
  end
  66. --[[
  67. Check if this job has a parent. If so we will just remove it from
  68. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  69. which requires code from "moveToFinished"
  70. ]]
  71. -- Includes
  72. --[[
  73. Function to add job in target list and add marker if needed.
  74. ]]
  75. -- Includes
  --[[
    Adds the base marker (member "0" with score 0) to the marker sorted set,
    unless the queue is paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Pushes jobId onto targetKey with the given push command and then, when
    the queue is neither paused nor maxed, adds the base marker.
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    -- Same marker write that addBaseMarkerIfNeeded performs.
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  88. --[[
  89. Functions to destructure job key.
  90. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  91. ]]
  -- Returns the part of jobKey that follows its last ':' separator,
  -- or nil when jobKey contains no ':'.
  local function getJobIdFromKey(jobKey)
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  -- Returns jobKey with its trailing #jobId characters cut off, i.e. the
  -- key prefix including the final ':' separator.
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  --[[
    Decides which list a job should be pushed to, based on the queue meta
    hash (an empty paused list and a missing paused flag are not the same,
    so the meta.paused field is what gets checked).
    Returns four values:
      1. pausedKey when meta.paused is set, waitKey otherwise
      2. true when the queue is paused, or when meta.concurrency is set and
         the active list already holds at least that many jobs
      3. the raw meta.max value
      4. the raw meta.duration value
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, maxJobs, duration = meta[1], meta[2], meta[3], meta[4]
    if paused then
      return pausedKey, true, maxJobs, duration
    end
    local isMaxed = false
    if concurrency then
      -- LLEN is only issued when a concurrency limit is configured.
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, isMaxed, maxJobs, duration
  end
  --[[
    Pushes a parent job id onto its queue's wait or paused list (whichever
    getTargetQueueList selects) with RPUSH, adding the marker when allowed.
    When emitEvent is truthy, a "waiting" event with prev
    "waiting-children" is appended to the parent queue's events stream.
  ]]
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local targetKey, isPausedOrMaxed = getTargetQueueList(
      parentPrefix .. "meta",
      parentPrefix .. "active",
      parentPrefix .. "wait",
      parentPrefix .. "paused")
    addJobInTargetList(targetKey, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  --[[
    Detaches a job from its parent's dependencies set and, when it was the
    last pending child, releases the parent.

    Parameters:
      jobKey      full key of the child job being removed
      hard        when truthy, a parent living in the same queue
                  (parent prefix == baseKey) is itself removed; a parent in
                  another queue is moved to wait without emitting an event.
                  When falsy the parent is moved to wait and a "waiting"
                  event is emitted.
      parentKey   full key of the parent job; when nil the parent key is
                  read from the child's "parentKey" hash field and is only
                  used if that parent key still exists
      baseKey     key prefix of the queue being processed
      debounceId  deduplication id whose "de:" key is deleted after a
                  same-queue parent was removed (explicit parentKey path)

    Returns true when the child was removed from a parent's dependencies
    set, false otherwise.
  ]]
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    -- Release logic shared by the explicit-parent and looked-up-parent
    -- paths; previously duplicated verbatim in both branches.
    local function detachFromParent(ownerKey, dedupId)
      local dependenciesKey = ownerKey .. ":dependencies"
      if rcall("SREM", dependenciesKey, jobKey) > 0 then
        if rcall("SCARD", dependenciesKey) == 0 then
          local parentId = getJobIdFromKey(ownerKey)
          local parentPrefix = getJobKeyPrefix(ownerKey, parentId)
          -- Only act when the parent really was in waiting-children.
          if rcall("ZREM", parentPrefix .. "waiting-children", parentId) == 1 then
            if not hard then
              _moveParentToWait(parentPrefix, parentId, true)
            elseif parentPrefix == baseKey then
              -- Walk up first: the recursive call reads the parent's hash,
              -- so it must run before the parent's keys are deleted.
              removeParentDependencyKey(ownerKey, hard, nil, baseKey, nil)
              removeJobKeys(ownerKey)
              if dedupId then
                rcall("DEL", parentPrefix .. "de:" .. dedupId)
              end
            else
              _moveParentToWait(parentPrefix, parentId)
            end
          end
        end
        return true
      end
      return false
    end

    if parentKey then
      return detachFromParent(parentKey, debounceId)
    end

    local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
    local missedParentKey = parentAttributes[1]
    if (type(missedParentKey) == "string") and missedParentKey ~= ""
      and (rcall("EXISTS", missedParentKey) == 1) then
      -- NOTE(review): the "deid" used here is read from the child's hash,
      -- not the parent's; behavior kept as-is, confirm this is intended.
      return detachFromParent(missedParentKey, parentAttributes[2])
    end
    return false
  end
  --[[
    Removes a single job: detaches it from its parent (looked up from the
    job hash), optionally drops its deduplication key, then deletes the
    job's own keys.
    jobId is appended to baseKey to form the job key.
  ]]
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    local jobKey = baseKey .. jobId
    removeParentDependencyKey(jobKey, hard, nil, baseKey)
    if shouldRemoveDeduplicationKey then
      -- Must be read before removeJobKeys deletes the job hash.
      local dedupId = rcall("HGET", jobKey, "deid")
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, dedupId)
    end
    removeJobKeys(jobKey)
  end
  --[[
    Removes every job id in keys (always dropping the deduplication key)
    and returns max minus the number of ids handled.
  ]]
  local function removeJobs(keys, hard, baseKey, max)
    local shouldRemoveDeduplicationKey = true
    for _, jobId in ipairs(keys) do
      removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    end
    local remaining = max - #keys
    return remaining
  end
  -- Returns the first max entries of the list at keyName.
  -- With max = 0 the range end becomes -1, so the whole list is returned.
  local function getListItems(keyName, max)
    local lastIndex = max - 1
    return rcall('LRANGE', keyName, 0, lastIndex)
  end
  --[[
    Removes up to max jobs (all of them when max is 0) from the head of the
    list at keyName, skipping any id present in jobsToIgnore.

    Parameters:
      keyName       list key holding job ids
      hard          forwarded to removeJobs
      baseKey       queue key prefix, forwarded to removeJobs
      max           number of list entries to inspect; 0 means the whole list
      jobsToIgnore  optional table whose keys are job ids to keep

    Returns max minus the number of jobs removed.

    Fix: the list used to be trimmed by the number of jobs left after
    filtering, which is smaller than the inspected prefix whenever an id
    was ignored. That dropped ignored ids sitting at the head of the list
    and left already-deleted job ids behind. The whole inspected prefix is
    now trimmed and the ignored ids are pushed back in their original order.
  ]]
  local function removeListJobs(keyName, hard, baseKey, max, jobsToIgnore)
    local fetched = getListItems(keyName, max)
    local jobs = fetched
    if jobsToIgnore then
      jobs = filterOutJobsToIgnore(fetched, jobsToIgnore)
    end
    local count = removeJobs(jobs, hard, baseKey, max)
    -- Drop exactly the prefix that was inspected.
    rcall("LTRIM", keyName, #fetched, -1)
    if #jobs < #fetched then
      -- Something was ignored (so jobsToIgnore is set): restore those ids
      -- at the head. Walking backwards with LPUSH keeps their order.
      for idx = #fetched, 1, -1 do
        local jobId = fetched[idx]
        if jobsToIgnore[jobId] then
          rcall("LPUSH", keyName, jobId)
        end
      end
    end
    return count
  end
  217. -- Includes
  --[[
    Iterator factory for walking the index range 1..n in consecutive slices
    of at most batchSize entries.
    Each call of the returned function yields the inclusive bounds
    (from, to) of the next slice, and nothing once n has been passed.
    Useful because some commands, such as ZREM, can only take a limited
    number of parameters per call (about 7000).
  ]]
  local function batches(n, batchSize)
    local nextFrom = 1
    return function()
      local from = nextFrom
      nextFrom = nextFrom + batchSize
      if from > n then
        return
      end
      return from, math.min(from + batchSize - 1, n)
    end
  end
  --[[
    Returns the first max members of the sorted set at keyName.
    With max = 0 the range end becomes -1, so every member is returned.
  ]]
  local function getZSetItems(keyName, max)
    local lastIndex = max - 1
    return rcall('ZRANGE', keyName, 0, lastIndex)
  end
  --[[
    Removes up to max jobs (all of them when max is 0) from the sorted set
    at keyName, skipping any id present in jobsToIgnore.
    Returns max minus the number of jobs removed.
  ]]
  local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore)
    local candidates = getZSetItems(keyName, max)
    if jobsToIgnore then
      candidates = filterOutJobsToIgnore(candidates, jobsToIgnore)
    end
    local count = removeJobs(candidates, hard, baseKey, max)
    -- ZREM is issued in slices of 7000 members; for an empty candidate
    -- list the iterator yields nothing, so no command is sent.
    local total = #candidates
    for from, to in batches(total, 7000) do
      rcall("ZREM", keyName, unpack(candidates, from, to))
    end
    return count
  end
  --[[
    Main body of the drain script.
    Jobs tied to a job scheduler must not be removed, so their ids are
    collected first and handed to every removal call as the ignore set.
  ]]
  local scheduledJobs = {}
  local schedulerEntries = rcall("ZRANGE", KEYS[5], 0, -1, "WITHSCORES")
  -- The reply is read pairwise: scheduler id followed by its score.
  for pos = 1, #schedulerEntries, 2 do
    local schedulerId, schedulerScore = schedulerEntries[pos], schedulerEntries[pos + 1]
    scheduledJobs["repeat:" .. schedulerId .. ":" .. schedulerScore] = true
  end

  -- max = 0 means "no limit" for the removal helpers.
  local shouldCleanDelayed = ARGV[2] == "1"
  removeListJobs(KEYS[1], true, queueBaseKey, 0, scheduledJobs) -- wait
  removeListJobs(KEYS[2], true, queueBaseKey, 0, scheduledJobs) -- paused
  if shouldCleanDelayed then
    removeZSetJobs(KEYS[3], true, queueBaseKey, 0, scheduledJobs) -- delayed
  end
  removeZSetJobs(KEYS[4], true, queueBaseKey, 0, scheduledJobs) -- prioritized
  269. `;
  270. exports.drain = {
  271. name: 'drain',
  272. content,
  273. keys: 5,
  274. };
  275. //# sourceMappingURL=drain-5.js.map