obliterate-2.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. const content = `--[[
  2. Completely obliterates a queue and all of its contents
  3. This command completely destroys a queue including all of its jobs, current or past
  4. leaving no trace of its existence. Since this script needs to iterate to find all the job
  5. keys, consider that this call may be slow for very large queues.
  6. The queue needs to be "paused" or it will return an error
  7. If the queue has currently active jobs then the script by default will return error,
  8. however this behaviour can be overrided using the 'force' option.
  9. Input:
  10. KEYS[1] meta
  11. KEYS[2] base
  12. ARGV[1] count
  13. ARGV[2] force
  14. ]]
  15. local maxCount = tonumber(ARGV[1])
  16. local baseKey = KEYS[2]
  17. local rcall = redis.call
  18. -- Includes
  19. --[[
  20. Functions to remove jobs.
  21. ]]
  22. -- Includes
  23. --[[
  24. Function to remove job.
  25. ]]
  26. -- Includes
  27. --[[
  28. Function to remove deduplication key if needed
  29. when a job is being removed.
  30. ]]
  31. local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
  32. jobId, deduplicationId)
  33. if deduplicationId then
  34. local deduplicationKey = prefixKey .. "de:" .. deduplicationId
  35. local currentJobId = rcall('GET', deduplicationKey)
  36. if currentJobId and currentJobId == jobId then
  37. rcall("DEL", deduplicationKey)
  38. -- Also clean up any pending dedup-next data for this dedup ID
  39. rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
  40. return 1
  41. end
  42. end
  43. end
  44. --[[
  45. Function to remove job keys.
  46. ]]
  47. local function removeJobKeys(jobKey)
  48. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  49. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  50. end
  51. --[[
  52. Check if this job has a parent. If so we will just remove it from
  53. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  54. which requires code from "moveToFinished"
  55. ]]
  56. -- Includes
  57. --[[
  58. Function to add job in target list and add marker if needed.
  59. ]]
  60. -- Includes
  61. --[[
  62. Add marker if needed when a job is available.
  63. ]]
  64. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  65. if not isPausedOrMaxed then
  66. rcall("ZADD", markerKey, 0, "0")
  67. end
  68. end
  69. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  70. rcall(pushCmd, targetKey, jobId)
  71. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  72. end
  73. --[[
  74. Functions to destructure job key.
  75. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  76. ]]
  77. local getJobIdFromKey = function (jobKey)
  78. return string.match(jobKey, ".*:(.*)")
  79. end
  80. local getJobKeyPrefix = function (jobKey, jobId)
  81. return string.sub(jobKey, 0, #jobKey - #jobId)
  82. end
  83. --[[
  84. Function to check for the meta.paused key to decide if we are paused or not
  85. (since an empty list and !EXISTS are not really the same).
  86. ]]
  87. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  88. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  89. if queueAttributes[1] then
  90. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  91. else
  92. if queueAttributes[2] then
  93. local activeCount = rcall("LLEN", activeKey)
  94. if activeCount >= tonumber(queueAttributes[2]) then
  95. return waitKey, true, queueAttributes[3], queueAttributes[4]
  96. else
  97. return waitKey, false, queueAttributes[3], queueAttributes[4]
  98. end
  99. end
  100. end
  101. return waitKey, false, queueAttributes[3], queueAttributes[4]
  102. end
  103. local function _moveParentToWait(parentPrefix, parentId, emitEvent)
  104. local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
  105. parentPrefix .. "wait", parentPrefix .. "paused")
  106. addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
  107. if emitEvent then
  108. local parentEventStream = parentPrefix .. "events"
  109. rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  110. end
  111. end
  112. local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
  113. if parentKey then
  114. local parentDependenciesKey = parentKey .. ":dependencies"
  115. local result = rcall("SREM", parentDependenciesKey, jobKey)
  116. if result > 0 then
  117. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  118. if pendingDependencies == 0 then
  119. local parentId = getJobIdFromKey(parentKey)
  120. local parentPrefix = getJobKeyPrefix(parentKey, parentId)
  121. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  122. if numRemovedElements == 1 then
  123. if hard then -- remove parent in same queue
  124. if parentPrefix == baseKey then
  125. removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
  126. removeJobKeys(parentKey)
  127. if debounceId then
  128. rcall("DEL", parentPrefix .. "de:" .. debounceId)
  129. end
  130. else
  131. _moveParentToWait(parentPrefix, parentId)
  132. end
  133. else
  134. _moveParentToWait(parentPrefix, parentId, true)
  135. end
  136. end
  137. end
  138. return true
  139. end
  140. else
  141. local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
  142. local missedParentKey = parentAttributes[1]
  143. if( (type(missedParentKey) == "string") and missedParentKey ~= ""
  144. and (rcall("EXISTS", missedParentKey) == 1)) then
  145. local parentDependenciesKey = missedParentKey .. ":dependencies"
  146. local result = rcall("SREM", parentDependenciesKey, jobKey)
  147. if result > 0 then
  148. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  149. if pendingDependencies == 0 then
  150. local parentId = getJobIdFromKey(missedParentKey)
  151. local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
  152. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  153. if numRemovedElements == 1 then
  154. if hard then
  155. if parentPrefix == baseKey then
  156. removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil)
  157. removeJobKeys(missedParentKey)
  158. if parentAttributes[2] then
  159. rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
  160. end
  161. else
  162. _moveParentToWait(parentPrefix, parentId)
  163. end
  164. else
  165. _moveParentToWait(parentPrefix, parentId, true)
  166. end
  167. end
  168. end
  169. return true
  170. end
  171. end
  172. end
  173. return false
  174. end
  175. local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
  176. local jobKey = baseKey .. jobId
  177. removeParentDependencyKey(jobKey, hard, nil, baseKey)
  178. if shouldRemoveDeduplicationKey then
  179. local deduplicationId = rcall("HGET", jobKey, "deid")
  180. removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, deduplicationId)
  181. end
  182. removeJobKeys(jobKey)
  183. end
  184. local function removeJobs(keys, hard, baseKey, max)
  185. for i, key in ipairs(keys) do
  186. removeJob(key, hard, baseKey, true --[[remove debounce key]])
  187. end
  188. return max - #keys
  189. end
  190. --[[
  191. Functions to remove jobs.
  192. ]]
  193. -- Includes
  194. --[[
  195. Function to filter out jobs to ignore from a table.
  196. ]]
  197. local function filterOutJobsToIgnore(jobs, jobsToIgnore)
  198. local filteredJobs = {}
  199. for i = 1, #jobs do
  200. if not jobsToIgnore[jobs[i]] then
  201. table.insert(filteredJobs, jobs[i])
  202. end
  203. end
  204. return filteredJobs
  205. end
  206. local function getListItems(keyName, max)
  207. return rcall('LRANGE', keyName, 0, max - 1)
  208. end
  209. local function removeListJobs(keyName, hard, baseKey, max, jobsToIgnore)
  210. local jobs = getListItems(keyName, max)
  211. if jobsToIgnore then
  212. jobs = filterOutJobsToIgnore(jobs, jobsToIgnore)
  213. end
  214. local count = removeJobs(jobs, hard, baseKey, max)
  215. rcall("LTRIM", keyName, #jobs, -1)
  216. return count
  217. end
  218. -- Includes
  219. --[[
  220. Function to loop in batches.
  221. Just a bit of warning, some commands as ZREM
  222. could receive a maximum of 7000 parameters per call.
  223. ]]
  224. local function batches(n, batchSize)
  225. local i = 0
  226. return function()
  227. local from = i * batchSize + 1
  228. i = i + 1
  229. if (from <= n) then
  230. local to = math.min(from + batchSize - 1, n)
  231. return from, to
  232. end
  233. end
  234. end
  235. --[[
  236. Function to get ZSet items.
  237. ]]
  238. local function getZSetItems(keyName, max)
  239. return rcall('ZRANGE', keyName, 0, max - 1)
  240. end
  241. local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore)
  242. local jobs = getZSetItems(keyName, max)
  243. if jobsToIgnore then
  244. jobs = filterOutJobsToIgnore(jobs, jobsToIgnore)
  245. end
  246. local count = removeJobs(jobs, hard, baseKey, max)
  247. if(#jobs > 0) then
  248. for from, to in batches(#jobs, 7000) do
  249. rcall("ZREM", keyName, unpack(jobs, from, to))
  250. end
  251. end
  252. return count
  253. end
  254. local function removeLockKeys(keys)
  255. for i, key in ipairs(keys) do
  256. rcall("DEL", baseKey .. key .. ':lock')
  257. end
  258. end
  259. -- 1) Check if paused, if not return with error.
  260. if rcall("HEXISTS", KEYS[1], "paused") ~= 1 then
  261. return -1 -- Error, NotPaused
  262. end
  263. -- 2) Check if there are active jobs, if there are and not "force" return error.
  264. local activeKey = baseKey .. 'active'
  265. local activeJobs = getListItems(activeKey, maxCount)
  266. if (#activeJobs > 0) then
  267. if(ARGV[2] == "") then
  268. return -2 -- Error, ExistActiveJobs
  269. end
  270. end
  271. removeLockKeys(activeJobs)
  272. maxCount = removeJobs(activeJobs, true, baseKey, maxCount)
  273. rcall("LTRIM", activeKey, #activeJobs, -1)
  274. if(maxCount <= 0) then
  275. return 1
  276. end
  277. local delayedKey = baseKey .. 'delayed'
  278. maxCount = removeZSetJobs(delayedKey, true, baseKey, maxCount)
  279. if(maxCount <= 0) then
  280. return 1
  281. end
  282. local repeatKey = baseKey .. 'repeat'
  283. local repeatJobsIds = getZSetItems(repeatKey, maxCount)
  284. for i, key in ipairs(repeatJobsIds) do
  285. local jobKey = repeatKey .. ":" .. key
  286. rcall("DEL", jobKey)
  287. end
  288. if(#repeatJobsIds > 0) then
  289. for from, to in batches(#repeatJobsIds, 7000) do
  290. rcall("ZREM", repeatKey, unpack(repeatJobsIds, from, to))
  291. end
  292. end
  293. maxCount = maxCount - #repeatJobsIds
  294. if(maxCount <= 0) then
  295. return 1
  296. end
  297. local completedKey = baseKey .. 'completed'
  298. maxCount = removeZSetJobs(completedKey, true, baseKey, maxCount)
  299. if(maxCount <= 0) then
  300. return 1
  301. end
  302. local waitKey = baseKey .. 'paused'
  303. maxCount = removeListJobs(waitKey, true, baseKey, maxCount)
  304. if(maxCount <= 0) then
  305. return 1
  306. end
  307. local prioritizedKey = baseKey .. 'prioritized'
  308. maxCount = removeZSetJobs(prioritizedKey, true, baseKey, maxCount)
  309. if(maxCount <= 0) then
  310. return 1
  311. end
  312. local failedKey = baseKey .. 'failed'
  313. maxCount = removeZSetJobs(failedKey, true, baseKey, maxCount)
  314. if(maxCount <= 0) then
  315. return 1
  316. end
  317. if(maxCount > 0) then
  318. rcall("DEL",
  319. baseKey .. 'events',
  320. baseKey .. 'delay',
  321. baseKey .. 'stalled-check',
  322. baseKey .. 'stalled',
  323. baseKey .. 'id',
  324. baseKey .. 'pc',
  325. baseKey .. 'marker',
  326. baseKey .. 'meta',
  327. baseKey .. 'metrics:completed',
  328. baseKey .. 'metrics:completed:data',
  329. baseKey .. 'metrics:failed',
  330. baseKey .. 'metrics:failed:data')
  331. return 0
  332. else
  333. return 1
  334. end
  335. `;
  336. export const obliterate = {
  337. name: 'obliterate',
  338. content,
  339. keys: 2,
  340. };
  341. //# sourceMappingURL=obliterate-2.js.map