cleanJobsInSet-3.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. const content = `--[[
  2. Remove jobs from the specific set.
  3. Input:
  4. KEYS[1] set key,
  5. KEYS[2] events stream key
  6. KEYS[3] repeat key
  7. ARGV[1] jobKey prefix
  8. ARGV[2] timestamp
  9. ARGV[3] limit the number of jobs to be removed. 0 is unlimited
  10. ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'prioritized', 'completed', or 'failed'
  11. ]]
  -- Shorthand for issuing Redis commands.
  local rcall = redis.call
  -- Third key of the script; handed to the cleaners as the job schedulers key.
  local repeatKey = KEYS[3]
  -- Maximum number of jobs to remove; 0 means unlimited.
  local limit = tonumber(ARGV[3])
  -- Default window for list based sets: the whole list.
  local rangeStart, rangeEnd = 0, -1
  -- When only a limited number of jobs must go, fetch just that many
  -- entries instead of the whole list. The window is anchored at the tail,
  -- which is where the oldest elements of a FIFO list generally live.
  if limit > 0 then
    rangeStart = -limit
  end
  26. -- Includes
  27. --[[
  28. Function to clean job list.
  29. Returns jobIds and deleted count number.
  30. ]]
  31. -- Includes
  32. --[[
  33. Function to get the latest saved timestamp.
  34. ]]
  --[[
    Looks up a timestamp on the job hash.
      jobKey     - key of the job hash.
      attributes - ordered list of hash fields to try.
    With a single attribute the raw HGET reply is returned. With several,
    the fields are fetched in one HMGET and the first truthy value wins;
    nil is returned when none of them is set.
  ]]
  local function getTimestamp(jobKey, attributes)
    if #attributes == 1 then
      return rcall("HGET", jobKey, attributes[1])
    end

    for _, candidate in ipairs(rcall("HMGET", jobKey, unpack(attributes))) do
      if candidate then
        return candidate
      end
    end

    return nil
  end
  48. --[[
  49. Function to check if the job belongs to a job scheduler and
  50. current delayed job matches with jobId
  51. ]]
  --[[
    Tells whether jobId is the job currently tracked by a job scheduler.
      jobId            - id of the job being inspected.
      jobKey           - key of the job hash; its "rjk" field names the scheduler.
      jobSchedulersKey - sorted set holding one score per scheduler.
    Returns true only when the job has an "rjk" field, that scheduler has a
    score, and jobId equals "repeat:<rjk>:<score>". Returns false otherwise.
  ]]
  local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
    local schedulerId = rcall("HGET", jobKey, "rjk")
    if not schedulerId then
      return false
    end

    local schedulerMillis = rcall("ZSCORE", jobSchedulersKey, schedulerId)
    if not schedulerMillis then
      return false
    end

    return jobId == "repeat:" .. schedulerId .. ":" .. schedulerMillis
  end
  63. --[[
  64. Function to remove job.
  65. ]]
  66. -- Includes
  67. --[[
  68. Function to remove deduplication key if needed
  69. when a job is being removed.
  70. ]]
  --[[
    Drops the deduplication key of a job that is being removed, but only
    when that key still points at this very job.
      prefixKey       - queue key prefix.
      jobId           - id of the job being removed.
      deduplicationId - deduplication id of the job; falsy means nothing to do.
    Returns 1 when the "de:" key (and the companion "dn:" key) were deleted,
    and nothing otherwise.
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end

    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    if not ownerJobId or ownerJobId ~= jobId then
      -- The key is missing or owned by another job: leave it alone.
      return
    end

    rcall("DEL", dedupKey)
    -- Also clean up any pending dedup-next data for this dedup ID
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  84. --[[
  85. Function to remove job keys.
  86. ]]
  --[[
    Deletes the job hash together with its auxiliary keys (logs,
    dependencies, processed, failed, unsuccessful) in a single DEL call.
      jobKey - key of the job hash.
    Returns the reply of DEL.
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'

    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  91. --[[
  92. Check if this job has a parent. If so we will just remove it from
  93. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  94. which requires code from "moveToFinished"
  95. ]]
  96. -- Includes
  97. --[[
  98. Function to add job in target list and add marker if needed.
  99. ]]
  100. -- Includes
  101. --[[
  102. Add marker if needed when a job is available.
  103. ]]
  --[[
    Adds the base marker (member "0" with score 0) to the marker sorted set,
    unless the queue is paused or maxed.
      markerKey       - key of the marker sorted set.
      isPausedOrMaxed - truthy when no marker must be added.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Pushes a job id onto the target list and then adds the base marker
    when the queue is neither paused nor maxed.
      targetKey       - list receiving the job id.
      markerKey       - key of the marker sorted set.
      pushCmd         - Redis push command to use (callers in this script pass "RPUSH").
      isPausedOrMaxed - truthy when no marker must be added.
      jobId           - id of the job to push.
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    -- Same effect as addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed).
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  113. --[[
  114. Functions to destructure job key.
  115. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  116. ]]
  --[[
    Extracts the job id from a full job key: everything after the last ":".
      jobKey - full job key, e.g. "<prefix>:<jobId>".
    Returns nil when the key contains no ":".
  ]]
  local function getJobIdFromKey(jobKey)
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  --[[
    Returns the job key with the trailing job id cut off, i.e. the queue
    prefix including its final ":".
      jobKey - full job key.
      jobId  - job id that terminates jobKey.
  ]]
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  123. --[[
  124. Function to check for the meta.paused key to decide if we are paused or not
  125. (since an empty list and !EXISTS are not really the same).
  126. ]]
  --[[
    Decides which list a job should be pushed to, based on the queue meta
    hash fields "paused", "concurrency", "max" and "duration".
      queueMetaKey - key of the queue meta hash.
      activeKey    - key of the active list (only its length is read).
      waitKey      - key of the wait list.
      pausedKey    - key of the paused list.
    Returns four values: the target list key, a boolean that is true when
    the queue is paused or the active list has reached the concurrency
    value, and the raw "max" and "duration" fields.
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = meta[1], meta[2]
    local maxValue, durationValue = meta[3], meta[4]

    if paused then
      return pausedKey, true, maxValue, durationValue
    end

    local isMaxed = false
    if concurrency then
      -- LLEN is only issued when a concurrency value is configured.
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end

    return waitKey, isMaxed, maxValue, durationValue
  end
  --[[
    Moves a parent job into its queue's wait (or paused) list.
      parentPrefix - key prefix of the parent's queue.
      parentId     - id of the parent job.
      emitEvent    - when truthy, a "waiting" event (prev "waiting-children")
                     is appended to the parent queue's events stream.
  ]]
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local targetList, pausedOrMaxed = getTargetQueueList(
      parentPrefix .. "meta",
      parentPrefix .. "active",
      parentPrefix .. "wait",
      parentPrefix .. "paused")

    addJobInTargetList(targetList, parentPrefix .. "marker", "RPUSH", pausedOrMaxed, parentId)

    if not emitEvent then
      return
    end

    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting",
      "jobId", parentId, "prev", "waiting-children")
  end
  --[[
    Detaches a job from its parent's ":dependencies" set and, when it was
    the last pending dependency, takes care of the parent.
      jobKey     - key of the job being removed.
      hard       - truthy when a parent living in the same queue must be
                   deleted instead of being moved to wait.
      parentKey  - key of the parent job; when falsy it is read from the
                   "parentKey" field of the job hash and must still exist.
      baseKey    - key prefix of the queue being cleaned.
      debounceId - deduplication id used when parentKey is supplied.
    Returns true when the job was removed from a parent's dependencies set,
    false otherwise.
  ]]
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    local resolvedParentKey = parentKey
    local dedupId = debounceId

    if not resolvedParentKey then
      -- No parent supplied: fall back to what is stored on the job hash.
      local stored = rcall("HMGET", jobKey, "parentKey", "deid")
      local storedParentKey = stored[1]
      local usable = (type(storedParentKey) == "string") and storedParentKey ~= ""
        and (rcall("EXISTS", storedParentKey) == 1)
      if not usable then
        return false
      end
      resolvedParentKey = storedParentKey
      -- NOTE(review): this is the "deid" field of the job itself, yet it is
      -- used below to delete a "de:" key after removing the parent; confirm
      -- that this is intended.
      dedupId = stored[2]
    end

    local dependenciesKey = resolvedParentKey .. ":dependencies"
    local removed = rcall("SREM", dependenciesKey, jobKey)
    if not (removed > 0) then
      return false
    end

    if rcall("SCARD", dependenciesKey) == 0 then
      local parentId = getJobIdFromKey(resolvedParentKey)
      local parentPrefix = getJobKeyPrefix(resolvedParentKey, parentId)
      local takenOut = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

      if takenOut == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix ~= baseKey then
          -- Parent lives in another queue: release it without an event.
          _moveParentToWait(parentPrefix, parentId)
        else
          -- Parent lives in the same queue: remove it as well, walking up
          -- to its own parent first.
          removeParentDependencyKey(resolvedParentKey, hard, nil, baseKey, nil)
          removeJobKeys(resolvedParentKey)
          if dedupId then
            rcall("DEL", parentPrefix .. "de:" .. dedupId)
          end
        end
      end
    end

    return true
  end
  --[[
    Removes a job: detaches it from its parent, optionally drops its
    deduplication key, then deletes the job keys.
      jobId                        - id of the job to remove.
      hard                         - forwarded to removeParentDependencyKey.
      baseKey                      - key prefix of the queue.
      shouldRemoveDeduplicationKey - when truthy, the "deid" field of the job
                                     is read and its deduplication key is
                                     removed if it still points at this job.
  ]]
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    local jobKey = baseKey .. jobId

    removeParentDependencyKey(jobKey, hard, nil, baseKey)

    if shouldRemoveDeduplicationKey then
      -- Must be read before the job hash is deleted below.
      local dedupId = rcall("HGET", jobKey, "deid")
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, dedupId)
    end

    removeJobKeys(jobKey)
  end
  --[[
    Cleans a list based job set ("wait", "paused" or "active").
      listKey          - key of the list to clean.
      jobKeyPrefix     - prefix that turns a job id into a job key.
      rangeStart       - first LRANGE index of the inspected window.
      rangeEnd         - last LRANGE index of the inspected window.
      timestamp        - jobs whose timestamp is at or before this are removed.
      isWaiting        - when truthy the ":lock" key check is skipped.
      jobSchedulersKey - sorted set used to spare the current job scheduler job.
    The removal limit is read from the enclosing script scope ("limit").
    Returns { deletedJobIds, deletedCount }.

    Timestamps are compared as numbers. Both values arrive as strings, and
    comparing them as strings is lexicographic ("9" <= "10" is false), which
    only gives the right answer while both have the same number of digits.
    If either side is not numeric the original string comparison is used.
  ]]
  local function cleanList(listKey, jobKeyPrefix, rangeStart, rangeEnd,
    timestamp, isWaiting, jobSchedulersKey)
    local jobs = rcall("LRANGE", listKey, rangeStart, rangeEnd)
    local deleted = {}
    local deletedCount = 0
    local deletionMarker = ''
    local jobIdsLen = #jobs
    local maxMillis = tonumber(timestamp)

    for i, job in ipairs(jobs) do
      if limit > 0 and deletedCount >= limit then
        break
      end

      local jobKey = jobKeyPrefix .. job
      if (isWaiting or rcall("EXISTS", jobKey .. ":lock") == 0) and
        not isJobSchedulerJob(job, jobKey, jobSchedulersKey) then
        -- Find the right timestamp of the job to compare to the maximum:
        -- * finishedOn says when the job was completed, but it isn't set
        --   unless the job has actually completed
        -- * processedOn represents when the job was last attempted, but it
        --   doesn't get populated until the job is first tried
        -- * timestamp is the original job submission time
        -- The first one that is set is used, so that jobs that have been
        -- active within the grace period are left alone.
        local jobTS = getTimestamp(jobKey, {"finishedOn", "processedOn", "timestamp"})

        local expired
        if not jobTS then
          -- No timestamp at all: the job is always eligible for removal.
          expired = true
        else
          local jobMillis = tonumber(jobTS)
          if jobMillis and maxMillis then
            expired = jobMillis <= maxMillis
          else
            expired = jobTS <= timestamp
          end
        end

        if expired then
          -- Replace the entry with a deletion marker; the actual deletion
          -- happens in the single LREM after the loop. The negative index
          -- is relative to the tail because the window ends at rangeEnd.
          rcall("LSET", listKey, rangeEnd - jobIdsLen + i, deletionMarker)
          removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
          deletedCount = deletedCount + 1
          table.insert(deleted, job)
        end
      end
    end

    rcall("LREM", listKey, 0, deletionMarker)
    return {deleted, deletedCount}
  end
  260. --[[
  261. Function to clean job set.
  262. Returns jobIds and deleted count number.
  263. ]]
  264. -- Includes
  265. --[[
  266. Function to loop in batches.
  267. Be aware that some commands, such as ZREM,
  268. can receive at most 7000 parameters per call.
  269. ]]
  --[[
    Iterator factory that walks the range 1..n in consecutive slices.
      n         - total number of items.
      batchSize - maximum size of each slice.
    Each call of the returned function yields the inclusive bounds
    (from, to) of the next slice, and nothing once the range is exhausted.
  ]]
  local function batches(n, batchSize)
    local nextFrom = 1
    return function()
      local from = nextFrom
      nextFrom = nextFrom + batchSize
      if from > n then
        return
      end
      return from, math.min(from + batchSize - 1, n)
    end
  end
  281. --[[
  282. We use ZRANGEBYSCORE to make the case where we're deleting a limited number
  283. of items in a sorted set only run a single iteration. If we simply used
  284. ZRANGE, we may take a long time traversing through jobs that are within the
  285. grace period.
  286. ]]
  --[[
    Fetches the members of a sorted set whose score lies between 0 and
    rangeEnd.
      zsetKey  - key of the sorted set.
      rangeEnd - upper score bound.
      limit    - when positive, at most that many members are returned
                 (LIMIT 0 limit); otherwise every match is returned.
  ]]
  local function getJobsInZset(zsetKey, rangeEnd, limit)
    if not (limit > 0) then
      return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd)
    end
    return rcall("ZRANGEBYSCORE", zsetKey, 0, rangeEnd, "LIMIT", 0, limit)
  end
  --[[
    Cleans a sorted set based job set.
      setKey           - key of the sorted set to clean.
      jobKeyPrefix     - prefix that turns a job id into a job key.
      rangeEnd         - upper score bound of the inspected members.
      timestamp        - jobs whose timestamp is at or before this are removed.
      limit            - maximum number of jobs to remove; 0 is unlimited.
      attributes       - job hash fields tried, in order, as the job timestamp.
      isFinished       - when truthy every inspected job is removed without
                         looking at its timestamp.
      jobSchedulersKey - optional sorted set used to spare the current job
                         scheduler job; when falsy no such check is made.
    Returns { deletedJobIds, deletedCount }.

    Timestamps are compared as numbers. Both values arrive as strings, and
    comparing them as strings is lexicographic ("9" <= "10" is false), which
    only gives the right answer while both have the same number of digits.
    If either side is not numeric the original string comparison is used.
  ]]
  local function cleanSet(
    setKey,
    jobKeyPrefix,
    rangeEnd,
    timestamp,
    limit,
    attributes,
    isFinished,
    jobSchedulersKey)
    local jobs = getJobsInZset(setKey, rangeEnd, limit)
    local deleted = {}
    local deletedCount = 0
    local maxMillis = tonumber(timestamp)

    for _, job in ipairs(jobs) do
      if limit > 0 and deletedCount >= limit then
        break
      end

      local jobKey = jobKeyPrefix .. job
      -- The job tracked by a job scheduler ("repeat:job-scheduler-id:millis")
      -- must survive the clean up.
      if not (jobSchedulersKey and isJobSchedulerJob(job, jobKey, jobSchedulersKey)) then
        local shouldRemove = isFinished
        if not shouldRemove then
          local jobTS = getTimestamp(jobKey, attributes)
          if not jobTS then
            -- No timestamp at all: the job is always eligible for removal.
            shouldRemove = true
          else
            local jobMillis = tonumber(jobTS)
            if jobMillis and maxMillis then
              shouldRemove = jobMillis <= maxMillis
            else
              shouldRemove = jobTS <= timestamp
            end
          end
        end

        if shouldRemove then
          removeJob(job, true, jobKeyPrefix, true --[[remove debounce key]])
          deletedCount = deletedCount + 1
          table.insert(deleted, job)
        end
      end
    end

    -- Remove the ids from the set in slices of at most 7000 members per ZREM.
    if #deleted > 0 then
      for from, to in batches(#deleted, 7000) do
        rcall("ZREM", setKey, unpack(deleted, from, to))
      end
    end

    return {deleted, deletedCount}
  end
  --[[
    Entry point: pick the cleaner that matches the requested set name, emit
    a "cleaned" event carrying the number of removed jobs, and return the
    removed job ids.
  ]]
  local setName = ARGV[4]
  local result
  if setName == "active" then
    -- Active jobs are only removed when they hold no lock (isWaiting = false).
    result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2],
      false --[[ isWaiting ]], repeatKey)
  elseif setName == "wait" or setName == "paused" then
    result = cleanList(KEYS[1], ARGV[1], rangeStart, rangeEnd, ARGV[2],
      true --[[ isWaiting ]], repeatKey)
  elseif setName == "delayed" then
    result = cleanSet(KEYS[1], ARGV[1], "+inf", ARGV[2], limit,
      {"processedOn", "timestamp"}, false --[[ isFinished ]], repeatKey)
  elseif setName == "prioritized" then
    result = cleanSet(KEYS[1], ARGV[1], "+inf", ARGV[2], limit,
      {"timestamp"}, false --[[ isFinished ]], repeatKey)
  else
    -- Any other set name (documented as 'completed' or 'failed'): the set is
    -- scored up to the timestamp, so every fetched job is removed.
    -- No need to pass repeat key as in that moment job won't be related to a job scheduler
    result = cleanSet(KEYS[1], ARGV[1], ARGV[2], ARGV[2], limit,
      {"finishedOn"}, true --[[ isFinished ]])
  end

  rcall("XADD", KEYS[2], "*", "event", "cleaned", "count", result[2])
  return result[1]
  360. `;
  /**
   * Descriptor of the cleanJobsInSet Lua script.
   *
   * - name: identifier of the script.
   * - content: the Lua source defined above.
   * - keys: presumably the number of KEYS the script expects; the script
   *   header documents KEYS[1] to KEYS[3]. TODO confirm against the loader.
   */
  export const cleanJobsInSet = {
    name: 'cleanJobsInSet',
    content,
    keys: 3,
  };
  366. //# sourceMappingURL=cleanJobsInSet-3.js.map