removeUnprocessedChildren-2.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.removeUnprocessedChildren = void 0;
  4. const content = `--[[
  5. Remove the not-yet-processed children of a job (the members of its dependencies set), together with their data.
  6. Children that are locked, or that already sit in the failed set, are left untouched.
  7. Input:
  8. KEYS[1] jobKey
  9. KEYS[2] meta key
  10. ARGV[1] prefix
  11. ARGV[2] jobId
  12. Events:
  13. 'removed' for every child removed
  14. ]]
  15. -- Includes
  16. --[[
  17. Remove a job from all the statuses it may be in as well as all its data,
  18. including its children. Active children can be ignored.
  19. Events:
  20. 'removed'
  21. ]]
  -- Short local alias for redis.call, used throughout this script.
  local rcall = redis.call
  23. -- Includes
  24. --[[
  25. Functions to destructure job key.
  26. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  27. ]]
  -- Returns the text that follows the last ":" of jobKey,
  -- or nil when jobKey contains no ":" at all.
  local function getJobIdFromKey(jobKey)
    local idAfterLastColon = string.match(jobKey, ".*:(.*)")
    return idAfterLastColon
  end
  -- Returns jobKey with its trailing #jobId characters dropped,
  -- i.e. everything that precedes the job id (separator included).
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  --[[
    Reads the "opts.maxLenEvents" field of the meta hash.
    When the field is absent, 10000 is written to it and returned instead.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local storedMax = rcall("HGET", metaKey, "opts.maxLenEvents")
    if storedMax then
      return storedMax
    end
    -- No value stored yet: persist the default so later reads find it.
    local defaultMax = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", defaultMax)
    return defaultMax
  end
  --[[
    Tells whether jobId is the delayed job currently owned by a job scheduler.
    The job's "rjk" field names the scheduler entry; its score in
    jobSchedulersKey is combined into "repeat:<rjk>:<score>" and compared
    with jobId.
    returns:
      boolean
  ]]
  local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
    local schedulerEntry = rcall("HGET", jobKey, "rjk")
    if not schedulerEntry then
      return false
    end
    local scheduledMillis = rcall("ZSCORE", jobSchedulersKey, schedulerEntry)
    if not scheduledMillis then
      return false
    end
    return jobId == "repeat:" .. schedulerEntry .. ":" .. scheduledMillis
  end
  --[[
    Deletes the deduplication key of a job that is being removed, but only
    when that key still points at this very job. The matching "dn:" key is
    deleted along with it.
    returns:
      1 when the keys were deleted, nothing otherwise
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end
    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- Leave the key alone when it is gone or owned by another job.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end
    rcall("DEL", dedupKey)
    -- Also drop any pending dedup-next data stored for this dedup ID
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Removes jobId from the first state structure that contains it.
    Sorted sets are probed first (ZSCORE, then ZREM on a hit), followed by
    the lists (a single LREM each).
    returns:
      name of the state the job was removed from, or "unknown"
  ]]
  local function removeJobFromAnyState( prefix, jobId)
    -- Probe order matters: it is the order in which states are reported.
    local sortedSetStates = { "completed", "waiting-children", "delayed", "failed", "prioritized" }
    for idx = 1, #sortedSetStates do
      local state = sortedSetStates[idx]
      if rcall("ZSCORE", prefix .. state, jobId) then
        rcall("ZREM", prefix .. state, jobId)
        return state
      end
    end
    -- Only one list element is removed: ids are assumed not to be queued twice.
    local listStates = { "wait", "paused", "active" }
    for idx = 1, #listStates do
      local state = listStates[idx]
      if rcall("LREM", prefix .. state, 1, jobId) == 1 then
        return state
      end
    end
    return "unknown"
  end
  --[[
    Deletes the job hash and its companion keys in a single DEL call.
    returns:
      the value returned by DEL
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'
    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  116. --[[
  117. Check if this job has a parent. If so we will just remove it from
  118. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  119. which requires code from "moveToFinished"
  120. ]]
  121. -- Includes
  122. --[[
  123. Function to add job in target list and add marker if needed.
  124. ]]
  125. -- Includes
  --[[
    Adds member "0" with score 0 to the marker sorted set, unless the queue
    is flagged as paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto targetKey using the given push command (pushCmd),
  -- then adds the base marker unless the queue is paused or maxed.
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- pushCmd is issued verbatim as the Redis command name.
    rcall(pushCmd, targetKey, jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Decides which list a job should be pushed to, based on the queue meta
    hash (an empty list and a missing key are not the same thing, so the
    "paused" field is what is checked).
    returns:
      target list key, isPausedOrMaxed flag, and the raw "max" and
      "duration" meta fields
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local pausedAttr, concurrencyAttr = attrs[1], attrs[2]
    local maxAttr, durationAttr = attrs[3], attrs[4]
    if pausedAttr then
      return pausedKey, true, maxAttr, durationAttr
    end
    if concurrencyAttr then
      -- The queue counts as maxed once the active list reaches the limit.
      local isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrencyAttr)
      return waitKey, isMaxed, maxAttr, durationAttr
    end
    return waitKey, false, maxAttr, durationAttr
  end
  -- Pushes parentId (RPUSH) onto the wait or paused list of its own queue and,
  -- when emitEvent is truthy, appends a "waiting" event to that queue's stream.
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local metaKey = parentPrefix .. "meta"
    local activeKey = parentPrefix .. "active"
    local waitKey = parentPrefix .. "wait"
    local pausedKey = parentPrefix .. "paused"
    local targetList, pausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    addJobInTargetList(targetList, parentPrefix .. "marker", "RPUSH", pausedOrMaxed, parentId)
    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  --[[
    Removes jobKey from its parent's ":dependencies" set.
    When parentKey is not given, the parent is looked up from the job's own
    "parentKey" field and must still exist.
    If that was the parent's last pending dependency and the parent was in
    "waiting-children", the parent is either moved to wait/paused or, when
    hard is set and the parent lives under baseKey, removed as well.
    returns:
      true when jobKey was actually removed from a dependencies set,
      false otherwise
  ]]
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    local ownerKey = parentKey
    local dedupId = debounceId
    if not ownerKey then
      -- No parent supplied: fall back to what the job hash itself records.
      local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
      local missedParentKey = parentAttributes[1]
      if type(missedParentKey) ~= "string" or missedParentKey == ""
        or rcall("EXISTS", missedParentKey) ~= 1 then
        return false
      end
      ownerKey = missedParentKey
      dedupId = parentAttributes[2]
    end
    local dependenciesKey = ownerKey .. ":dependencies"
    if rcall("SREM", dependenciesKey, jobKey) <= 0 then
      return false
    end
    if rcall("SCARD", dependenciesKey) == 0 then
      local parentId = getJobIdFromKey(ownerKey)
      local parentPrefix = getJobKeyPrefix(ownerKey, parentId)
      -- Act only if the parent really was waiting for its children.
      if rcall("ZREM", parentPrefix .. "waiting-children", parentId) == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix == baseKey then
          -- Hard removal inside the same queue: remove the parent too.
          removeParentDependencyKey(ownerKey, hard, nil, baseKey, nil)
          removeJobKeys(ownerKey)
          if dedupId then
            rcall("DEL", parentPrefix .. "de:" .. dedupId)
          end
        else
          _moveParentToWait(parentPrefix, parentId)
        end
      end
    end
    return true
  end
  --[[
    Checks whether a job holds a lock and, when removeChildren is "1",
    whether any job in its ":dependencies" set does (recursively).
    returns:
      boolean
  ]]
  local function isLocked( prefix, jobId, removeChildren)
    local jobKey = prefix .. jobId;
    if rcall("GET", jobKey .. ':lock') then
      return true
    end
    if removeChildren ~= "1" then
      return false
    end
    local childKeys = rcall("SMEMBERS", jobKey .. ":dependencies")
    for idx = 1, #childKeys do
      local childKey = childKeys[idx]
      -- Each child key is split back into its own prefix and id.
      local childId = getJobIdFromKey(childKey)
      local childPrefix = getJobKeyPrefix(childKey, childId)
      if isLocked( childPrefix, childId, removeChildren ) then
        return true
      end
    end
    return false
  end
  -- Forward declarations: the two functions below call each other.
  local removeJobChildren
  local removeJobWithChildren
  --[[
    Removes the children of jobKey, descending depth-first through
    removeJobWithChildren.
    Unless options.ignoreProcessed is set, children recorded under
    ":processed", ":failed" and ":unsuccessful" are handled first; the
    members of ":dependencies" are always handled.
  ]]
  removeJobChildren = function(prefix, jobKey, options)
    -- Walks entries with the given stride and removes the child behind each key.
    -- HGETALL replies alternate field/value, hence a stride of 2 for hashes.
    local function removeEach(entries, stride)
      for idx = 1, #entries, stride do
        local childKey = entries[idx]
        local childId = getJobIdFromKey(childKey)
        local childPrefix = getJobKeyPrefix(childKey, childId)
        removeJobWithChildren(childPrefix, childId, jobKey, options)
      end
    end
    if not options.ignoreProcessed then
      removeEach(rcall("HGETALL", jobKey .. ":processed"), 2)
      removeEach(rcall("HGETALL", jobKey .. ":failed"), 2)
      removeEach(rcall("ZRANGE", jobKey .. ":unsuccessful", 0, -1), 1)
    end
    removeEach(rcall("SMEMBERS", jobKey .. ":dependencies"), 1)
  end
  --[[
    Removes one job: detaches it from parentKey, optionally removes its
    children, takes it out of whatever state it is in, clears its
    deduplication key and deletes its keys.
    A "removed" event is emitted when at least one key was deleted.
    Skipped entirely when options.ignoreLocked is set and the job is locked,
    or when options.ignoreProcessed is set and the job is in the failed set.
  ]]
  removeJobWithChildren = function(prefix, jobId, parentKey, options)
    local jobKey = prefix .. jobId
    if options.ignoreLocked and isLocked(prefix, jobId) then
      return
    end
    -- Jobs already in the failed zset are left alone in ignoreProcessed mode.
    if options.ignoreProcessed and rcall("ZSCORE", prefix .. "failed", jobId) then
      return
    end
    removeParentDependencyKey(jobKey, false, parentKey, nil)
    if options.removeChildren then
      removeJobChildren(prefix, jobKey, options)
    end
    local previousState = removeJobFromAnyState(prefix, jobId)
    -- The dedup id must be read before the job hash is deleted below.
    local dedupId = rcall("HGET", jobKey, "deid")
    removeDeduplicationKeyIfNeededOnRemoval(prefix, jobId, dedupId)
    if removeJobKeys(jobKey) > 0 then
      local eventsLimit = getOrSetMaxEvents(prefix .. "meta")
      rcall("XADD", prefix .. "events", "MAXLEN", "~", eventsLimit, "*", "event", "removed",
        "jobId", jobId, "prev", previousState)
    end
  end
  -- Script entry point: remove the children of KEYS[1], skipping processed
  -- and locked ones.
  -- NOTE(review): metaKey and jobId are bound but not read in the visible script.
  local jobKey, metaKey = KEYS[1], KEYS[2]
  local prefix, jobId = ARGV[1], ARGV[2]
  local options = {
    ignoreLocked = true,
    ignoreProcessed = true,
    removeChildren = "1"
  }
  removeJobChildren(prefix, jobKey, options)
  336. `;
  337. exports.removeUnprocessedChildren = {
  338. name: 'removeUnprocessedChildren',
  339. content,
  340. keys: 2,
  341. };
  342. //# sourceMappingURL=removeUnprocessedChildren-2.js.map