removeUnprocessedChildren-2.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. const content = `--[[
  2. Remove the unprocessed children of a job (recursively), together with all their data.
  3. Children that are locked, or that are already in the failed set, are skipped.
  4. Input:
  5. KEYS[1] jobKey
  6. KEYS[2] meta key
  7. ARGV[1] prefix
  8. ARGV[2] jobId
  9. Events:
  10. 'removed' for every child removed
  11. ]]
  12. -- Includes
  13. --[[
  14. Remove a job from all the statuses it may be in as well as all its data,
  15. including its children. Active children can be ignored.
  16. Events:
  17. 'removed'
  18. ]]
  19. local rcall = redis.call
  20. -- Includes
  21. --[[
  22. Functions to destructure job key.
  23. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  24. ]]
  -- Returns whatever follows the last ":" in jobKey (nil when jobKey has no ":").
  local function getJobIdFromKey(jobKey)
    -- The leading ".*" is greedy, so the capture starts after the final colon.
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  -- Returns jobKey with its trailing #jobId characters cut off,
  -- e.g. ("bull:queue:123", "123") -> "bull:queue:".
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  --[[
    Reads "opts.maxLenEvents" from the queue meta hash.
    When the field is missing, stores the default (10000) in the hash
    and returns that default.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local storedLimit = rcall("HGET", metaKey, "opts.maxLenEvents")
    if storedLimit then
      return storedLimit
    end
    local defaultLimit = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", defaultLimit)
    return defaultLimit
  end
  --[[
    Tells whether jobId is the current delayed job of a job scheduler.
    The job hash field "rjk" names the scheduler; its score in
    jobSchedulersKey is used to rebuild the expected id
    "repeat:<rjk>:<score>", which is then compared with jobId.
    returns:
      boolean
  ]]
  local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
    local schedulerId = rcall("HGET", jobKey, "rjk")
    if not schedulerId then
      return false
    end
    local prevMillis = rcall("ZSCORE", jobSchedulersKey, schedulerId)
    if not prevMillis then
      return false
    end
    local expectedJobId = "repeat:" .. schedulerId .. ":" .. prevMillis
    return jobId == expectedJobId
  end
  --[[
    Deletes the deduplication key of a job that is being removed, but only
    when that key still points at this very job. The matching "dn:" key is
    deleted along with it.
    returns:
      1 when the keys were deleted, nothing otherwise
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end
    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- Leave the key alone when it is missing or owned by another job.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end
    rcall("DEL", dedupKey)
    -- Also clean up any pending dedup-next data for this dedup ID
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Removes jobId from the first state container that holds it.
    Sorted sets are probed first (ZSCORE then ZREM), then the lists (LREM),
    always in the order listed below.
    returns:
      name of the state the job was removed from, or "unknown"
  ]]
  local function removeJobFromAnyState(prefix, jobId)
    -- We start with the ZSCORE checks, since they have O(1) complexity
    local sortedSetStates = { "completed", "waiting-children", "delayed", "failed", "prioritized" }
    for i = 1, #sortedSetStates do
      local state = sortedSetStates[i]
      local stateKey = prefix .. state
      if rcall("ZSCORE", stateKey, jobId) then
        rcall("ZREM", stateKey, jobId)
        return state
      end
    end
    -- We remove only 1 element from each list, since we assume ids are not added multiple times
    local listStates = { "wait", "paused", "active" }
    for i = 1, #listStates do
      local state = listStates[i]
      if rcall("LREM", prefix .. state, 1, jobId) == 1 then
        return state
      end
    end
    return "unknown"
  end
  --[[
    Deletes the job hash and its auxiliary keys in a single DEL call.
    returns:
      the value returned by DEL
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'
    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  113. --[[
  114. Check if this job has a parent. If so we will just remove it from
  115. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  116. which requires code from "moveToFinished"
  117. ]]
  118. -- Includes
  119. --[[
  120. Function to add job in target list and add marker if needed.
  121. ]]
  122. -- Includes
  --[[
    Adds the base marker (member "0" with score 0) to markerKey,
    unless the queue is paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto targetKey with pushCmd, then adds the base marker
  -- (member "0", score 0) unless the queue is paused or maxed.
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  --[[
    Decides which list a job should be pushed to, based on the queue meta
    hash (an empty paused list and a missing one are not the same thing,
    hence the explicit "paused" field).
    returns:
      target list key (pausedKey when paused, waitKey otherwise),
      true when the queue is paused or the active list has reached
        the configured concurrency,
      the "max" and "duration" meta fields, passed through untouched
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, max, duration = meta[1], meta[2], meta[3], meta[4]
    if paused then
      return pausedKey, true, max, duration
    end
    local isMaxed = false
    if concurrency then
      -- Only look at the active list when a concurrency limit is configured.
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, isMaxed, max, duration
  end
  -- Pushes the parent job onto its queue's wait (or paused) list and, when
  -- emitEvent is truthy, publishes a "waiting" event on the parent's queue
  -- with prev = "waiting-children".
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local metaKey = parentPrefix .. "meta"
    local activeKey = parentPrefix .. "active"
    local waitKey = parentPrefix .. "wait"
    local pausedKey = parentPrefix .. "paused"
    local targetKey, pausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    addJobInTargetList(targetKey, parentPrefix .. "marker", "RPUSH", pausedOrMaxed, parentId)
    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  -- Forward declaration: the helper below recurses into removeParentDependencyKey.
  local removeParentDependencyKey

  --[[
    Removes jobKey from the ":dependencies" set of parentKey.
    When that was the last pending dependency and the parent was sitting in
    "waiting-children":
      - hard removal, parent in the same queue (parentPrefix == baseKey):
        the parent is detached from its own parent and its keys are deleted,
        plus the "de:" key built from deduplicationId when one is given;
      - hard removal, parent in another queue: the parent is moved to
        wait/paused without emitting an event;
      - soft removal: the parent is moved to wait/paused and a "waiting"
        event is emitted.
    returns:
      true when jobKey was a member of the parent's dependencies set,
      false otherwise
  ]]
  local function detachChildFromParent(jobKey, hard, parentKey, baseKey, deduplicationId)
    local parentDependenciesKey = parentKey .. ":dependencies"
    if rcall("SREM", parentDependenciesKey, jobKey) <= 0 then
      return false
    end
    if rcall("SCARD", parentDependenciesKey) == 0 then
      local parentId = getJobIdFromKey(parentKey)
      local parentPrefix = getJobKeyPrefix(parentKey, parentId)
      -- Only act if the parent really was waiting for its children.
      if rcall("ZREM", parentPrefix .. "waiting-children", parentId) == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix == baseKey then
          -- remove parent in same queue
          removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
          removeJobKeys(parentKey)
          if deduplicationId then
            rcall("DEL", parentPrefix .. "de:" .. deduplicationId)
          end
        else
          _moveParentToWait(parentPrefix, parentId)
        end
      end
    end
    return true
  end

  --[[
    Detaches a job from its parent's dependency list.
    When parentKey is given it is used directly together with debounceId.
    Otherwise the parent key and dedup id are read from the job hash
    ("parentKey" and "deid" fields) and used only if that parent key
    still exists.
    returns:
      true when the job was removed from a parent's dependencies set,
      false otherwise
  ]]
  removeParentDependencyKey = function(jobKey, hard, parentKey, baseKey, debounceId)
    if parentKey then
      return detachChildFromParent(jobKey, hard, parentKey, baseKey, debounceId)
    end
    local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
    local missedParentKey = parentAttributes[1]
    if type(missedParentKey) == "string" and missedParentKey ~= ""
      and rcall("EXISTS", missedParentKey) == 1 then
      return detachChildFromParent(jobKey, hard, missedParentKey, baseKey, parentAttributes[2])
    end
    return false
  end
  --[[
    Checks whether a job holds a lock (its ":lock" key exists). When
    removeChildren is "1", the jobs listed in its ":dependencies" set are
    checked too, recursively.
    returns:
      boolean
  ]]
  local function isLocked(prefix, jobId, removeChildren)
    local jobKey = prefix .. jobId
    if rcall("GET", jobKey .. ':lock') then
      return true
    end
    if removeChildren ~= "1" then
      return false
    end
    local childKeys = rcall("SMEMBERS", jobKey .. ":dependencies")
    for _, childKey in ipairs(childKeys) do
      -- Each child may live under a different prefix, so derive it from the key.
      local childId = getJobIdFromKey(childKey)
      local childPrefix = getJobKeyPrefix(childKey, childId)
      if isLocked(childPrefix, childId, removeChildren) then
        return true
      end
    end
    return false
  end
  257. local removeJobChildren
  258. local removeJobWithChildren
  --[[
    Removes the children of jobKey, depth-first, through removeJobWithChildren.
    Unless options.ignoreProcessed is set, the children recorded in
    ":processed", ":failed" and ":unsuccessful" are removed first; the
    children still listed in ":dependencies" are always visited.
  ]]
  removeJobChildren = function(prefix, jobKey, options)
    local function removeChild(childKey)
      local childId = getJobIdFromKey(childKey)
      local childPrefix = getJobKeyPrefix(childKey, childId)
      removeJobWithChildren(childPrefix, childId, jobKey, options)
    end

    -- step = 2 walks only the field names of a flat HGETALL reply.
    local function removeEach(childKeys, step)
      for i = 1, #childKeys, step do
        removeChild(childKeys[i])
      end
    end

    if not options.ignoreProcessed then
      removeEach(rcall("HGETALL", jobKey .. ":processed"), 2)
      removeEach(rcall("HGETALL", jobKey .. ":failed"), 2)
      removeEach(rcall("ZRANGE", jobKey .. ":unsuccessful", 0, -1), 1)
    end
    removeEach(rcall("SMEMBERS", jobKey .. ":dependencies"), 1)
  end
  --[[
    Removes one job: detaches it from its parent, optionally removes its
    children, takes it out of whatever state it is in, drops its
    deduplication key and its data, and emits a "removed" event when any
    key was actually deleted.
    Nothing happens when options.ignoreLocked is set and the job is locked,
    or when options.ignoreProcessed is set and the job is in the failed set.
  ]]
  removeJobWithChildren = function(prefix, jobId, parentKey, options)
    local jobKey = prefix .. jobId
    if options.ignoreLocked and isLocked(prefix, jobId) then
      return
    end
    -- Check if job is in the failed zset
    if options.ignoreProcessed and rcall("ZSCORE", prefix .. "failed", jobId) then
      return
    end

    removeParentDependencyKey(jobKey, false, parentKey, nil)
    if options.removeChildren then
      removeJobChildren(prefix, jobKey, options)
    end

    local prevState = removeJobFromAnyState(prefix, jobId)
    -- The dedup id must be read before the job hash is deleted below.
    local dedupId = rcall("HGET", jobKey, "deid")
    removeDeduplicationKeyIfNeededOnRemoval(prefix, jobId, dedupId)

    local deletedKeys = removeJobKeys(jobKey)
    if deletedKeys > 0 then
      local maxEvents = getOrSetMaxEvents(prefix .. "meta")
      rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
        "jobId", jobId, "prev", prevState)
    end
  end
  --[[
    Script entry point: removes the children of the job at KEYS[1].
    KEYS[2] (meta key) and ARGV[2] (jobId) are part of the documented
    input but are not needed here, so they are not read.
    Options:
      removeChildren   recurse into the children of every removed child
      ignoreProcessed  skip children recorded as processed/failed/unsuccessful
                       and children found in the failed set
      ignoreLocked     silently skip children that hold a lock
  ]]
  local prefix = ARGV[1]
  local jobKey = KEYS[1]
  removeJobChildren(prefix, jobKey, {
    removeChildren = "1",
    ignoreProcessed = true,
    ignoreLocked = true
  })
  333. `;
  /**
   * Descriptor for the removeUnprocessedChildren Lua script: its name, its
   * source text and a key count of 2 (presumably the number of Redis KEYS
   * the script expects; confirm against the script loader).
   */
  const removeUnprocessedChildren = {
    name: 'removeUnprocessedChildren',
    content: content,
    keys: 2,
  };
  export { removeUnprocessedChildren };
  339. //# sourceMappingURL=removeUnprocessedChildren-2.js.map