removeJob-2.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. const content = `--[[
  2. Remove a job from all the statuses it may be in as well as all its data.
  3. In order to be able to remove a job, it cannot be active.
  4. Input:
  5. KEYS[1] jobKey
  6. KEYS[2] repeat key
  7. ARGV[1] jobId
  8. ARGV[2] remove children
  9. ARGV[3] queue prefix
  10. Events:
  11. 'removed'
  12. ]]
  13. local rcall = redis.call
  14. -- Includes
  --[[
    Tells whether jobId is the delayed job currently tracked by a job scheduler.
    The "rjk" field of the job hash is looked up in the jobSchedulersKey sorted
    set; the job matches when jobId equals "repeat:" .. rjk .. ":" .. score.
    returns:
      boolean (false when the job has no "rjk" field or the scheduler has no score)
  ]]
  local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
    local schedulerKey = rcall("HGET", jobKey, "rjk")
    if not schedulerKey then
      return false
    end
    local schedulerScore = rcall("ZSCORE", jobSchedulersKey, schedulerKey)
    if not schedulerScore then
      return false
    end
    local expectedJobId = "repeat:" .. schedulerKey .. ":" .. schedulerScore
    return jobId == expectedJobId
  end
  30. --[[
  31. Function to recursively check if there are no locks
  32. on the jobs to be removed.
  33. returns:
  34. boolean
  35. ]]
  36. --[[
  37. Functions to destructure job key.
  38. Warning: these functions may be slow and can noticeably affect performance.
  39. ]]
  local getJobIdFromKey = function (jobKey)
    -- The greedy ".*:" consumes everything up to the last colon, so the
    -- capture is the text after it (nil when jobKey contains no colon).
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  local getJobKeyPrefix = function (jobKey, jobId)
    -- Keep everything except the trailing #jobId characters of jobKey.
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  local function isLocked( prefix, jobId, removeChildren)
    -- Returns true when the job itself holds a ":lock" key or, only when
    -- removeChildren is the string "1", when any job reachable through the
    -- ":dependencies" sets holds one. Returns false otherwise.
    local jobKey = prefix .. jobId
    if rcall("GET", jobKey .. ':lock') then
      return true
    end
    if removeChildren ~= "1" then
      return false
    end
    local pendingChildren = rcall("SMEMBERS", jobKey .. ":dependencies")
    for idx = 1, #pendingChildren do
      local childKey = pendingChildren[idx]
      -- Dependencies are stored as full job keys; split them back into
      -- prefix and id before recursing.
      local childId = getJobIdFromKey(childKey)
      local childPrefix = getJobKeyPrefix(childKey, childId)
      if isLocked( childPrefix, childId, removeChildren ) then
        return true
      end
    end
    return false
  end
  70. --[[
  71. Remove a job from all the statuses it may be in as well as all its data,
  72. including its children. Active children can be ignored.
  73. Events:
  74. 'removed'
  75. ]]
  76. local rcall = redis.call
  77. -- Includes
  --[[
    Reads the "opts.maxLenEvents" field of the meta hash.
    When the field is absent it is written with the default 10000,
    and that default is returned instead.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local storedMaxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
    if storedMaxEvents then
      return storedMaxEvents
    end
    local defaultMaxEvents = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", defaultMaxEvents)
    return defaultMaxEvents
  end
  --[[
    Deletes the deduplication key of a job that is being removed, but only
    when that key currently stores this very jobId. The matching "dn:" key
    is deleted together with it.
    returns:
      1 when the keys were deleted, nothing otherwise
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end
    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- Leave the key alone when it is missing or held by another job.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end
    rcall("DEL", dedupKey)
    -- Also clean up any pending dedup-next data for this dedup ID
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Removes jobId from the first state structure that contains it.
    Sorted sets are probed first (ZSCORE is O(1)), then the lists.
    returns:
      name of the previous state, or "unknown" when the job was in none
  ]]
  local function removeJobFromAnyState( prefix, jobId)
    local sortedSetStates = { "completed", "waiting-children", "delayed", "failed", "prioritized" }
    for _, state in ipairs(sortedSetStates) do
      local stateKey = prefix .. state
      if rcall("ZSCORE", stateKey, jobId) then
        rcall("ZREM", stateKey, jobId)
        return state
      end
    end
    -- Only one list element is removed, on the assumption that a job id is
    -- never pushed to the same list more than once.
    local listStates = { "wait", "paused", "active" }
    for _, state in ipairs(listStates) do
      if rcall("LREM", prefix .. state, 1, jobId) == 1 then
        return state
      end
    end
    return "unknown"
  end
  --[[
    Deletes the job hash together with its logs, dependencies, processed,
    failed and unsuccessful companion keys in a single DEL call.
    returns:
      whatever DEL replies (the number of keys that were deleted)
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'
    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  145. --[[
  146. Check if this job has a parent. If so we will just remove it from
  147. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  148. which requires code from "moveToFinished"
  149. ]]
  150. -- Includes
  151. --[[
  152. Function to add job in target list and add marker if needed.
  153. ]]
  154. -- Includes
  --[[
    Adds the base marker (member "0" with score 0) to the marker sorted set,
    unless the queue is paused or maxed, in which case nothing is written.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- pushCmd is the Redis push command to run against targetKey; this file
    -- passes "RPUSH".
    rcall(pushCmd, targetKey, jobId)
    -- The marker is only written after the push, and only when the queue is
    -- neither paused nor maxed.
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Decides which list new jobs should be pushed to, based on the queue meta
    hash (an empty paused list and a missing one are not the same thing, so
    the "paused" meta field is what is checked).
    returns:
      target list key, isPausedOrMaxed flag, and the raw "max" and "duration"
      meta fields
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local pausedAttr, concurrencyAttr = attrs[1], attrs[2]
    local maxAttr, durationAttr = attrs[3], attrs[4]
    if pausedAttr then
      return pausedKey, true, maxAttr, durationAttr
    end
    -- Without a concurrency setting the queue can never be maxed.
    local isMaxed = false
    if concurrencyAttr then
      local activeCount = rcall("LLEN", activeKey)
      isMaxed = activeCount >= tonumber(concurrencyAttr)
    end
    return waitKey, isMaxed, maxAttr, durationAttr
  end
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    -- Pushes the parent job onto its queue's target list (wait or paused)
    -- and, when emitEvent is truthy, publishes a "waiting" event whose
    -- previous state is "waiting-children".
    local targetList, pausedOrMaxed = getTargetQueueList(
      parentPrefix .. "meta", parentPrefix .. "active",
      parentPrefix .. "wait", parentPrefix .. "paused")
    addJobInTargetList(targetList, parentPrefix .. "marker", "RPUSH", pausedOrMaxed, parentId)
    if not emitEvent then
      return
    end
    local eventStreamKey = parentPrefix .. "events"
    rcall("XADD", eventStreamKey, "*", "event", "waiting",
      "jobId", parentId, "prev", "waiting-children")
  end
  -- Forward declaration: the helper below recurses into removeParentDependencyKey.
  local removeParentDependencyKey

  --[[
    Unlinks jobKey from the ":dependencies" set of parentKey and, when that
    was the parent's last pending dependency and the parent was sitting in
    "waiting-children", releases the parent:
      - hard and parent lives under baseKey: the parent is unlinked from its
        own parent, its keys are deleted, and the "de:" key for dedupId is
        deleted when dedupId is set;
      - hard and parent lives under another prefix: the parent is moved to its
        target list without emitting an event;
      - not hard: the parent is moved to its target list and a "waiting"
        event is emitted.
    returns:
      true when jobKey was a member of the parent's dependencies, else false
  ]]
  local function _unlinkFromParent(jobKey, hard, parentKey, baseKey, dedupId)
    local parentDependenciesKey = parentKey .. ":dependencies"
    local removedCount = rcall("SREM", parentDependenciesKey, jobKey)
    if not (removedCount > 0) then
      return false
    end
    -- Other children are still pending: the parent must keep waiting.
    if rcall("SCARD", parentDependenciesKey) ~= 0 then
      return true
    end
    local parentId = getJobIdFromKey(parentKey)
    local parentPrefix = getJobKeyPrefix(parentKey, parentId)
    -- Only act when the parent really was in waiting-children.
    if rcall("ZREM", parentPrefix .. "waiting-children", parentId) ~= 1 then
      return true
    end
    if not hard then
      _moveParentToWait(parentPrefix, parentId, true)
    elseif parentPrefix ~= baseKey then
      _moveParentToWait(parentPrefix, parentId)
    else
      -- remove parent in same queue
      removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
      removeJobKeys(parentKey)
      if dedupId then
        rcall("DEL", parentPrefix .. "de:" .. dedupId)
      end
    end
    return true
  end

  --[[
    Removes jobKey from its parent's dependency set, releasing the parent
    when this was its last pending child.
    When parentKey is not given, the parent is looked up in the "parentKey"
    field of the job hash and is only used if that key still exists; in that
    case the "deid" field of the same job hash is used as deduplication id
    instead of debounceId.
    returns:
      true when the job was removed from a parent's dependencies, else false
  ]]
  removeParentDependencyKey = function(jobKey, hard, parentKey, baseKey, debounceId)
    if parentKey then
      return _unlinkFromParent(jobKey, hard, parentKey, baseKey, debounceId)
    end
    local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
    local missedParentKey = parentAttributes[1]
    if type(missedParentKey) == "string" and missedParentKey ~= ""
      and rcall("EXISTS", missedParentKey) == 1 then
      return _unlinkFromParent(jobKey, hard, missedParentKey, baseKey, parentAttributes[2])
    end
    return false
  end
  local removeJobChildren
  local removeJobWithChildren
  --[[
    Removes every child of jobKey, depth-first, through removeJobWithChildren.
    Unless options.ignoreProcessed is set, children recorded under
    ":processed", ":failed" and ":unsuccessful" are removed first; children
    still listed in ":dependencies" are always removed.
    The prefix argument is not used by this function.
  ]]
  removeJobChildren = function(prefix, jobKey, options)
    -- Children are stored as full job keys; split each one into prefix and
    -- id, then remove it with jobKey as its parent.
    local function removeChild(childJobKey)
      local childJobId = getJobIdFromKey(childJobKey)
      local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)
      removeJobWithChildren(childJobPrefix, childJobId, jobKey, options)
    end

    if not options.ignoreProcessed then
      -- HGETALL replies with a flat field/value array: child keys sit at
      -- the odd positions.
      local processed = rcall("HGETALL", jobKey .. ":processed")
      for idx = 1, #processed, 2 do
        removeChild(processed[idx])
      end
      local failed = rcall("HGETALL", jobKey .. ":failed")
      for idx = 1, #failed, 2 do
        removeChild(failed[idx])
      end
      local unsuccessful = rcall("ZRANGE", jobKey .. ":unsuccessful", 0, -1)
      for idx = 1, #unsuccessful do
        removeChild(unsuccessful[idx])
      end
    end

    local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies")
    for _, childJobKey in ipairs(dependencies) do
      removeChild(childJobKey)
    end
  end
  --[[
    Removes one job: unlinks it from its parent, optionally removes its
    children, takes it out of whatever state it is in, drops its
    deduplication key and deletes its keys. A "removed" event is emitted
    when at least one key was actually deleted.
    The job is left untouched when options.ignoreLocked is set and the job
    is locked, or when options.ignoreProcessed is set and the job is in the
    failed set.
  ]]
  removeJobWithChildren = function(prefix, jobId, parentKey, options)
    local jobKey = prefix .. jobId
    if options.ignoreLocked and isLocked(prefix, jobId) then
      return
    end
    -- Check if job is in the failed zset
    if options.ignoreProcessed and rcall("ZSCORE", prefix .. "failed", jobId) then
      return
    end
    removeParentDependencyKey(jobKey, false, parentKey, nil)
    if options.removeChildren then
      removeJobChildren(prefix, jobKey, options)
    end
    local previousState = removeJobFromAnyState(prefix, jobId)
    -- The dedup id must be read before the job hash is deleted below.
    local deduplicationId = rcall("HGET", jobKey, "deid")
    removeDeduplicationKeyIfNeededOnRemoval(prefix, jobId, deduplicationId)
    if removeJobKeys(jobKey) > 0 then
      local maxEvents = getOrSetMaxEvents(prefix .. "meta")
      rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*",
        "event", "removed", "jobId", jobId, "prev", previousState)
    end
  end
  -- Script entry point.
  -- Replies -8 when the job is the current delayed job of a job scheduler,
  -- 0 when the job (or, with remove children set to "1", one of its pending
  -- children) is locked, and 1 once the job has been removed.
  local jobId, shouldRemoveChildren, prefix = ARGV[1], ARGV[2], ARGV[3]
  local jobKey, repeatKey = KEYS[1], KEYS[2]
  if isJobSchedulerJob(jobId, jobKey, repeatKey) then
    return -8
  end
  if isLocked(prefix, jobId, shouldRemoveChildren) then
    return 0
  end
  local options = {
    removeChildren = shouldRemoveChildren == "1",
    ignoreProcessed = false,
    ignoreLocked = false
  }
  removeJobWithChildren(prefix, jobId, nil, options)
  return 1
  343. `;
  /**
   * Descriptor of the removeJob Lua script: its name, its Lua source text
   * and the number of entries the script reads from KEYS (jobKey and repeat key).
   */
  export const removeJob = {
    name: 'removeJob',
    content: content,
    keys: 2,
  };
  349. //# sourceMappingURL=removeJob-2.js.map