cleanJobsInSet-3.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
"use strict";
// CommonJS module setup: flags the module for ES-module interop and
// pre-declares the cleanJobsInSet export that is assigned at the end of the file.
Object.defineProperty(exports, "__esModule", { value: true });
exports.cleanJobsInSet = void 0;
  4. const content = `--[[
  5. Remove jobs from the specific set.
  6. Input:
  7. KEYS[1] set key,
  8. KEYS[2] events stream key
  9. KEYS[3] repeat key
  10. ARGV[1] jobKey prefix
  11. ARGV[2] timestamp
  12. ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'prioritized', 'completed', or 'failed'
  14. ]]
  15. local rcall = redis.call
  16. local repeatKey = KEYS[3]
  17. local rangeStart = 0
  18. local rangeEnd = -1
  19. local limit = tonumber(ARGV[3])
  20. -- If we're only deleting _n_ items, avoid retrieving all items
  21. -- for faster performance
  22. --
  23. -- Start from the tail of the list, since that's where oldest elements
  24. -- are generally added for FIFO lists
  25. if limit > 0 then
  26. rangeStart = -1 - limit + 1
  27. rangeEnd = -1
  28. end
  29. -- Includes
  30. --[[
  31. Function to clean job list.
Returns the removed job ids and the number of removed jobs.
  33. ]]
  34. -- Includes
  35. --[[
Function to get the first timestamp that is set among the given job hash fields, checked in the order given.
  37. ]]
  38. local function getTimestamp(jobKey, attributes)
  39. if #attributes == 1 then
  40. return rcall("HGET", jobKey, attributes[1])
  41. end
  42. local jobTs
  43. for _, ts in ipairs(rcall("HMGET", jobKey, unpack(attributes))) do
  44. if (ts) then
  45. jobTs = ts
  46. break
  47. end
  48. end
  49. return jobTs
  50. end
  51. --[[
Function to check if the job belongs to a job scheduler and is that
scheduler's current delayed job, i.e. its id is
"repeat:<job scheduler id>:<job scheduler's score in the schedulers set>".
  54. ]]
  55. local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
  56. local repeatJobKey = rcall("HGET", jobKey, "rjk")
  57. if repeatJobKey then
  58. local prevMillis = rcall("ZSCORE", jobSchedulersKey, repeatJobKey)
  59. if prevMillis then
  60. local currentDelayedJobId = "repeat:" .. repeatJobKey .. ":" .. prevMillis
  61. return jobId == currentDelayedJobId
  62. end
  63. end
  64. return false
  65. end
  66. --[[
  67. Function to remove job.
  68. ]]
  69. -- Includes
  70. --[[
  71. Function to remove deduplication key if needed
  72. when a job is being removed.
  73. ]]
  74. local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
  75. jobId, deduplicationId)
  76. if deduplicationId then
  77. local deduplicationKey = prefixKey .. "de:" .. deduplicationId
  78. local currentJobId = rcall('GET', deduplicationKey)
  79. if currentJobId and currentJobId == jobId then
  80. rcall("DEL", deduplicationKey)
  81. -- Also clean up any pending dedup-next data for this dedup ID
  82. rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
  83. return 1
  84. end
  85. end
  86. end
  87. --[[
  88. Function to remove job keys.
  89. ]]
  90. local function removeJobKeys(jobKey)
  91. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  92. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  93. end
  94. --[[
Check if this job has a parent. If so, remove the job from the parent's
dependencies. If it was the last pending child, the parent is moved to
"wait"/"paused" (or, for a hard removal within the same queue, removed too),
which requires code from "moveToFinished".
  98. ]]
  99. -- Includes
  100. --[[
  101. Function to add job in target list and add marker if needed.
  102. ]]
  103. -- Includes
  104. --[[
  105. Add marker if needed when a job is available.
  106. ]]
  107. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  108. if not isPausedOrMaxed then
  109. rcall("ZADD", markerKey, 0, "0")
  110. end
  111. end
  112. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  113. rcall(pushCmd, targetKey, jobId)
  114. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  115. end
  116. --[[
  117. Functions to destructure job key.
  118. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  119. ]]
  120. local getJobIdFromKey = function (jobKey)
  121. return string.match(jobKey, ".*:(.*)")
  122. end
  123. local getJobKeyPrefix = function (jobKey, jobId)
  124. return string.sub(jobKey, 0, #jobKey - #jobId)
  125. end
  126. --[[
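Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same). It also reports
whether the active list has reached the queue's concurrency limit, and
returns the queue's "max" and "duration" settings.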
  129. ]]
  130. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  131. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  132. if queueAttributes[1] then
  133. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  134. else
  135. if queueAttributes[2] then
  136. local activeCount = rcall("LLEN", activeKey)
  137. if activeCount >= tonumber(queueAttributes[2]) then
  138. return waitKey, true, queueAttributes[3], queueAttributes[4]
  139. else
  140. return waitKey, false, queueAttributes[3], queueAttributes[4]
  141. end
  142. end
  143. end
  144. return waitKey, false, queueAttributes[3], queueAttributes[4]
  145. end
  146. local function _moveParentToWait(parentPrefix, parentId, emitEvent)
  147. local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
  148. parentPrefix .. "wait", parentPrefix .. "paused")
  149. addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
  150. if emitEvent then
  151. local parentEventStream = parentPrefix .. "events"
  152. rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  153. end
  154. end
  155. local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
  156. if parentKey then
  157. local parentDependenciesKey = parentKey .. ":dependencies"
  158. local result = rcall("SREM", parentDependenciesKey, jobKey)
  159. if result > 0 then
  160. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  161. if pendingDependencies == 0 then
  162. local parentId = getJobIdFromKey(parentKey)
  163. local parentPrefix = getJobKeyPrefix(parentKey, parentId)
  164. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  165. if numRemovedElements == 1 then
  166. if hard then -- remove parent in same queue
  167. if parentPrefix == baseKey then
  168. removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
  169. removeJobKeys(parentKey)
  170. if debounceId then
  171. rcall("DEL", parentPrefix .. "de:" .. debounceId)
  172. end
  173. else
  174. _moveParentToWait(parentPrefix, parentId)
  175. end
  176. else
  177. _moveParentToWait(parentPrefix, parentId, true)
  178. end
  179. end
  180. end
  181. return true
  182. end
  183. else
  184. local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
  185. local missedParentKey = parentAttributes[1]
  186. if( (type(missedParentKey) == "string") and missedParentKey ~= ""
  187. and (rcall("EXISTS", missedParentKey) == 1)) then
  188. local parentDependenciesKey = missedParentKey .. ":dependencies"
  189. local result = rcall("SREM", parentDependenciesKey, jobKey)
  190. if result > 0 then
  191. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  192. if pendingDependencies == 0 then
  193. local parentId = getJobIdFromKey(missedParentKey)
  194. local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
  195. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  196. if numRemovedElements == 1 then
  197. if hard then
  198. if parentPrefix == baseKey then
  199. removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil)
  200. removeJobKeys(missedParentKey)
  201. if parentAttributes[2] then
  202. rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
  203. end
  204. else
  205. _moveParentToWait(parentPrefix, parentId)
  206. end
  207. else
  208. _moveParentToWait(parentPrefix, parentId, true)
  209. end
  210. end
  211. end
  212. return true
  213. end
  214. end
  215. end
  216. return false
  217. end
  218. local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
  219. local jobKey = baseKey .. jobId
  220. removeParentDependencyKey(jobKey, hard, nil, baseKey)
  221. if shouldRemoveDeduplicationKey then
  222. local deduplicationId = rcall("HGET", jobKey, "deid")
  223. removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, deduplicationId)
  224. end
  225. removeJobKeys(jobKey)
  226. end
  227. local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
  228. timestamp, isWaiting, jobSchedulersKey)
  229. local jobs = rcall("LRANGE", listKey, rangeStart, rangeEnd)
  230. local deleted = {}
  231. local deletedCount = 0
  232. local jobTS
  233. local deletionMarker = ''
  234. local jobIdsLen = #jobs
  235. for i, job in ipairs(jobs) do
  236. if limit > 0 and deletedCount >= limit then
  237. break
  238. end
  239. local jobKey = jobKeyPrefix .. job
  240. if (isWaiting or rcall("EXISTS", jobKey .. ":lock") == 0) and
  241. not isJobSchedulerJob(job, jobKey, jobSchedulersKey) then
  242. -- Find the right timestamp of the job to compare to maxTimestamp:
  243. -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
  244. -- * processedOn represents when the job was last attempted, but it doesn't get populated until
  245. -- the job is first tried
  246. -- * timestamp is the original job submission time
  247. -- Fetch all three of these (in that order) and use the first one that is set so that we'll leave jobs
  248. -- that have been active within the grace period:
  249. jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"})
  250. if (not jobTS or jobTS <= timestamp) then
  251. -- replace the entry with a deletion marker; the actual deletion will
  252. -- occur at the end of the script
  253. rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
  254. removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
  255. deletedCount = deletedCount + 1
  256. table.insert(deleted, job)
  257. end
  258. end
  259. end
  260. rcall("LREM", listKey, 0, deletionMarker)
  261. return {deleted, deletedCount}
  262. end
  263. --[[
  264. Function to clean job set.
Returns the removed job ids and the number of removed jobs.
  266. ]]
  267. -- Includes
  268. --[[
  269. Function to loop in batches.
Note: some commands, such as ZREM, should receive at most 7000
parameters per call.
  272. ]]
  273. local function batches(n, batchSize)
  274. local i = 0
  275. return function()
  276. local from = i * batchSize + 1
  277. i = i + 1
  278. if (from <= n) then
  279. local to = math.min(from + batchSize - 1, n)
  280. return from, to
  281. end
  282. end
  283. end
  284. --[[
  285. We use ZRANGEBYSCORE to make the case where we're deleting a limited number
  286. of items in a sorted set only run a single iteration. If we simply used
  287. ZRANGE, we may take a long time traversing through jobs that are within the
  288. grace period.
  289. ]]
  290. local function getJobsInZset(zsetKey, rangeEnd, limit)
  291. if limit > 0 then
  292. return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit)
  293. else
  294. return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd)
  295. end
  296. end
  297. local function cleanSet(
  298. setKey,
  299. jobKeyPrefix,
  300. rangeEnd,
  301. timestamp,
  302. limit,
  303. attributes,
  304. isFinished,
  305. jobSchedulersKey)
  306. local jobs = getJobsInZset(setKey, rangeEnd, limit)
  307. local deleted = {}
  308. local deletedCount = 0
  309. local jobTS
  310. for i, job in ipairs(jobs) do
  311. if limit > 0 and deletedCount >= limit then
  312. break
  313. end
  314. local jobKey = jobKeyPrefix .. job
  315. -- Extract a Job Scheduler Id from jobId ("repeat:job-scheduler-id:millis")
  316. -- and check if it is in the scheduled jobs
  317. if not (jobSchedulersKey and isJobSchedulerJob(job, jobKey, jobSchedulersKey)) then
  318. if isFinished then
  319. removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] )
  320. deletedCount = deletedCount + 1
  321. table.insert(deleted, job)
  322. else
  323. -- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
  324. jobTS = getTimestamp(jobKey, attributes)
  325. if (not jobTS or jobTS <= timestamp) then
  326. removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]] )
  327. deletedCount = deletedCount + 1
  328. table.insert(deleted, job)
  329. end
  330. end
  331. end
  332. end
  333. if (#deleted > 0) then
  334. for from, to in batches(#deleted, 7000) do
  335. rcall("ZREM", setKey, unpack(deleted, from, to))
  336. end
  337. end
  338. return {deleted, deletedCount}
  339. end
  340. local result
  341. if ARGV[4] == "active" then
  342. result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], false --[[ hasFinished ]],
  343. repeatKey)
  344. elseif ARGV[4] == "delayed" then
  345. rangeEnd = "+inf"
  346. result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
  347. {"processedOn", "timestamp"}, false --[[ hasFinished ]], repeatKey)
  348. elseif ARGV[4] == "prioritized" then
  349. rangeEnd = "+inf"
  350. result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
  351. {"timestamp"}, false --[[ hasFinished ]], repeatKey)
  352. elseif ARGV[4] == "wait" or ARGV[4] == "paused" then
  353. result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2], true --[[ hasFinished ]],
  354. repeatKey)
  355. else
  356. rangeEnd = ARGV[2]
  357. -- No need to pass repeat key as in that moment job won't be related to a job scheduler
  358. result = cleanSet(KEYS[1], ARGV[1], rangeEnd, ARGV[2], limit,
  359. {"finishedOn"}, true --[[ hasFinished ]])
  360. end
  361. rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
  362. return result[1]
  363. `;
// Script descriptor for cleanJobsInSet: its name, the Lua source held in
// `content`, and the number of KEYS entries the script reads (KEYS[1]..KEYS[3]).
exports.cleanJobsInSet = {
    name: 'cleanJobsInSet',
    content,
    keys: 3,
};
  369. //# sourceMappingURL=cleanJobsInSet-3.js.map