obliterate-2.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.obliterate = void 0;
  4. const content = `--[[
  5. Completely obliterates a queue and all of its contents
  6. This command completely destroys a queue including all of its jobs, current or past
  7. leaving no trace of its existence. Since this script needs to iterate to find all the job
  8. keys, consider that this call may be slow for very large queues.
  9. The queue needs to be "paused" or it will return an error
  10. If the queue has currently active jobs then the script by default will return error,
however this behaviour can be overridden using the 'force' option.
  12. Input:
  13. KEYS[1] meta
  14. KEYS[2] base
  15. ARGV[1] count
  16. ARGV[2] force
  17. ]]
-- Upper bound on the number of entries this invocation may process (ARGV[1] as a number).
local maxCount = tonumber(ARGV[1])
-- Key prefix (KEYS[2]) that every queue key touched by the main script is built from.
local baseKey = KEYS[2]
-- Local alias for redis.call, used by every helper below.
local rcall = redis.call
  21. -- Includes
  22. --[[
  23. Functions to remove jobs.
  24. ]]
  25. -- Includes
  26. --[[
  27. Function to remove job.
  28. ]]
  29. -- Includes
  30. --[[
  31. Function to remove deduplication key if needed
  32. when a job is being removed.
  33. ]]
  34. local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
  35. jobId, deduplicationId)
  36. if deduplicationId then
  37. local deduplicationKey = prefixKey .. "de:" .. deduplicationId
  38. local currentJobId = rcall('GET', deduplicationKey)
  39. if currentJobId and currentJobId == jobId then
  40. rcall("DEL", deduplicationKey)
  41. -- Also clean up any pending dedup-next data for this dedup ID
  42. rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
  43. return 1
  44. end
  45. end
  46. end
  47. --[[
  48. Function to remove job keys.
  49. ]]
  50. local function removeJobKeys(jobKey)
  51. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  52. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  53. end
  54. --[[
  55. Check if this job has a parent. If so we will just remove it from
  56. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  57. which requires code from "moveToFinished"
  58. ]]
  59. -- Includes
  60. --[[
  61. Function to add job in target list and add marker if needed.
  62. ]]
  63. -- Includes
  64. --[[
  65. Add marker if needed when a job is available.
  66. ]]
  67. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  68. if not isPausedOrMaxed then
  69. rcall("ZADD", markerKey, 0, "0")
  70. end
  71. end
  72. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  73. rcall(pushCmd, targetKey, jobId)
  74. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  75. end
  76. --[[
  77. Functions to destructure job key.
  78. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  79. ]]
  80. local getJobIdFromKey = function (jobKey)
  81. return string.match(jobKey, ".*:(.*)")
  82. end
  83. local getJobKeyPrefix = function (jobKey, jobId)
  84. return string.sub(jobKey, 0, #jobKey - #jobId)
  85. end
  86. --[[
  87. Function to check for the meta.paused key to decide if we are paused or not
  88. (since an empty list and !EXISTS are not really the same).
  89. ]]
  90. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  91. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  92. if queueAttributes[1] then
  93. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  94. else
  95. if queueAttributes[2] then
  96. local activeCount = rcall("LLEN", activeKey)
  97. if activeCount >= tonumber(queueAttributes[2]) then
  98. return waitKey, true, queueAttributes[3], queueAttributes[4]
  99. else
  100. return waitKey, false, queueAttributes[3], queueAttributes[4]
  101. end
  102. end
  103. end
  104. return waitKey, false, queueAttributes[3], queueAttributes[4]
  105. end
  106. local function _moveParentToWait(parentPrefix, parentId, emitEvent)
  107. local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
  108. parentPrefix .. "wait", parentPrefix .. "paused")
  109. addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
  110. if emitEvent then
  111. local parentEventStream = parentPrefix .. "events"
  112. rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  113. end
  114. end
  115. local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
  116. if parentKey then
  117. local parentDependenciesKey = parentKey .. ":dependencies"
  118. local result = rcall("SREM", parentDependenciesKey, jobKey)
  119. if result > 0 then
  120. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  121. if pendingDependencies == 0 then
  122. local parentId = getJobIdFromKey(parentKey)
  123. local parentPrefix = getJobKeyPrefix(parentKey, parentId)
  124. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  125. if numRemovedElements == 1 then
  126. if hard then -- remove parent in same queue
  127. if parentPrefix == baseKey then
  128. removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
  129. removeJobKeys(parentKey)
  130. if debounceId then
  131. rcall("DEL", parentPrefix .. "de:" .. debounceId)
  132. end
  133. else
  134. _moveParentToWait(parentPrefix, parentId)
  135. end
  136. else
  137. _moveParentToWait(parentPrefix, parentId, true)
  138. end
  139. end
  140. end
  141. return true
  142. end
  143. else
  144. local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
  145. local missedParentKey = parentAttributes[1]
  146. if( (type(missedParentKey) == "string") and missedParentKey ~= ""
  147. and (rcall("EXISTS", missedParentKey) == 1)) then
  148. local parentDependenciesKey = missedParentKey .. ":dependencies"
  149. local result = rcall("SREM", parentDependenciesKey, jobKey)
  150. if result > 0 then
  151. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  152. if pendingDependencies == 0 then
  153. local parentId = getJobIdFromKey(missedParentKey)
  154. local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
  155. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  156. if numRemovedElements == 1 then
  157. if hard then
  158. if parentPrefix == baseKey then
  159. removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil)
  160. removeJobKeys(missedParentKey)
  161. if parentAttributes[2] then
  162. rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
  163. end
  164. else
  165. _moveParentToWait(parentPrefix, parentId)
  166. end
  167. else
  168. _moveParentToWait(parentPrefix, parentId, true)
  169. end
  170. end
  171. end
  172. return true
  173. end
  174. end
  175. end
  176. return false
  177. end
  178. local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
  179. local jobKey = baseKey .. jobId
  180. removeParentDependencyKey(jobKey, hard, nil, baseKey)
  181. if shouldRemoveDeduplicationKey then
  182. local deduplicationId = rcall("HGET", jobKey, "deid")
  183. removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, deduplicationId)
  184. end
  185. removeJobKeys(jobKey)
  186. end
  187. local function removeJobs(keys, hard, baseKey, max)
  188. for i, key in ipairs(keys) do
  189. removeJob(key, hard, baseKey, true --[[remove debounce key]])
  190. end
  191. return max - #keys
  192. end
  193. --[[
  194. Functions to remove jobs.
  195. ]]
  196. -- Includes
  197. --[[
  198. Function to filter out jobs to ignore from a table.
  199. ]]
  200. local function filterOutJobsToIgnore(jobs, jobsToIgnore)
  201. local filteredJobs = {}
  202. for i = 1, #jobs do
  203. if not jobsToIgnore[jobs[i]] then
  204. table.insert(filteredJobs, jobs[i])
  205. end
  206. end
  207. return filteredJobs
  208. end
  209. local function getListItems(keyName, max)
  210. return rcall('LRANGE', keyName, 0, max - 1)
  211. end
  212. local function removeListJobs(keyName, hard, baseKey, max, jobsToIgnore)
  213. local jobs = getListItems(keyName, max)
  214. if jobsToIgnore then
  215. jobs = filterOutJobsToIgnore(jobs, jobsToIgnore)
  216. end
  217. local count = removeJobs(jobs, hard, baseKey, max)
  218. rcall("LTRIM", keyName, #jobs, -1)
  219. return count
  220. end
  221. -- Includes
  222. --[[
  223. Function to loop in batches.
  224. Just a bit of warning, some commands as ZREM
  225. could receive a maximum of 7000 parameters per call.
  226. ]]
  227. local function batches(n, batchSize)
  228. local i = 0
  229. return function()
  230. local from = i * batchSize + 1
  231. i = i + 1
  232. if (from <= n) then
  233. local to = math.min(from + batchSize - 1, n)
  234. return from, to
  235. end
  236. end
  237. end
  238. --[[
  239. Function to get ZSet items.
  240. ]]
  241. local function getZSetItems(keyName, max)
  242. return rcall('ZRANGE', keyName, 0, max - 1)
  243. end
  244. local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore)
  245. local jobs = getZSetItems(keyName, max)
  246. if jobsToIgnore then
  247. jobs = filterOutJobsToIgnore(jobs, jobsToIgnore)
  248. end
  249. local count = removeJobs(jobs, hard, baseKey, max)
  250. if(#jobs > 0) then
  251. for from, to in batches(#jobs, 7000) do
  252. rcall("ZREM", keyName, unpack(jobs, from, to))
  253. end
  254. end
  255. return count
  256. end
  257. local function removeLockKeys(keys)
  258. for i, key in ipairs(keys) do
  259. rcall("DEL", baseKey .. key .. ':lock')
  260. end
  261. end
  262. -- 1) Check if paused, if not return with error.
  263. if rcall("HEXISTS", KEYS[1], "paused") ~= 1 then
  264. return -1 -- Error, NotPaused
  265. end
  266. -- 2) Check if there are active jobs, if there are and not "force" return error.
  267. local activeKey = baseKey .. 'active'
  268. local activeJobs = getListItems(activeKey, maxCount)
  269. if (#activeJobs > 0) then
  270. if(ARGV[2] == "") then
  271. return -2 -- Error, ExistActiveJobs
  272. end
  273. end
  274. removeLockKeys(activeJobs)
  275. maxCount = removeJobs(activeJobs, true, baseKey, maxCount)
  276. rcall("LTRIM", activeKey, #activeJobs, -1)
  277. if(maxCount <= 0) then
  278. return 1
  279. end
  280. local delayedKey = baseKey .. 'delayed'
  281. maxCount = removeZSetJobs(delayedKey, true, baseKey, maxCount)
  282. if(maxCount <= 0) then
  283. return 1
  284. end
  285. local repeatKey = baseKey .. 'repeat'
  286. local repeatJobsIds = getZSetItems(repeatKey, maxCount)
  287. for i, key in ipairs(repeatJobsIds) do
  288. local jobKey = repeatKey .. ":" .. key
  289. rcall("DEL", jobKey)
  290. end
  291. if(#repeatJobsIds > 0) then
  292. for from, to in batches(#repeatJobsIds, 7000) do
  293. rcall("ZREM", repeatKey, unpack(repeatJobsIds, from, to))
  294. end
  295. end
  296. maxCount = maxCount - #repeatJobsIds
  297. if(maxCount <= 0) then
  298. return 1
  299. end
  300. local completedKey = baseKey .. 'completed'
  301. maxCount = removeZSetJobs(completedKey, true, baseKey, maxCount)
  302. if(maxCount <= 0) then
  303. return 1
  304. end
  305. local waitKey = baseKey .. 'paused'
  306. maxCount = removeListJobs(waitKey, true, baseKey, maxCount)
  307. if(maxCount <= 0) then
  308. return 1
  309. end
  310. local prioritizedKey = baseKey .. 'prioritized'
  311. maxCount = removeZSetJobs(prioritizedKey, true, baseKey, maxCount)
  312. if(maxCount <= 0) then
  313. return 1
  314. end
  315. local failedKey = baseKey .. 'failed'
  316. maxCount = removeZSetJobs(failedKey, true, baseKey, maxCount)
  317. if(maxCount <= 0) then
  318. return 1
  319. end
  320. if(maxCount > 0) then
  321. rcall("DEL",
  322. baseKey .. 'events',
  323. baseKey .. 'delay',
  324. baseKey .. 'stalled-check',
  325. baseKey .. 'stalled',
  326. baseKey .. 'id',
  327. baseKey .. 'pc',
  328. baseKey .. 'marker',
  329. baseKey .. 'meta',
  330. baseKey .. 'metrics:completed',
  331. baseKey .. 'metrics:completed:data',
  332. baseKey .. 'metrics:failed',
  333. baseKey .. 'metrics:failed:data')
  334. return 0
  335. else
  336. return 1
  337. end
  338. `;
// Script descriptor exported by this module: the script name, the Lua source
// held in content, and the number of KEYS the script takes (2: KEYS[1] meta
// and KEYS[2] base, as listed in the script header).
exports.obliterate = {
name: 'obliterate',
content,
keys: 2,
};
  343. };
  344. //# sourceMappingURL=obliterate-2.js.map