removeJob-2.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.removeJob = void 0;
  4. const content = `--[[
  5. Remove a job from all the statuses it may be in as well as all its data.
  6. In order to be able to remove a job, it cannot be active.
  7. Input:
  8. KEYS[1] jobKey
  9. KEYS[2] repeat key
  10. ARGV[1] jobId
  11. ARGV[2] remove children
  12. ARGV[3] queue prefix
  13. Events:
  14. 'removed'
  15. ]]
  16. local rcall = redis.call
  17. -- Includes
  18. --[[
  19. Function to check if the job belongs to a job scheduler and
  20. current delayed job matches with jobId
  21. ]]
  22. local function isJobSchedulerJob(jobId, jobKey, jobSchedulersKey)
  23. local repeatJobKey = rcall("HGET", jobKey, "rjk")
  24. if repeatJobKey then
  25. local prevMillis = rcall("ZSCORE", jobSchedulersKey, repeatJobKey)
  26. if prevMillis then
  27. local currentDelayedJobId = "repeat:" .. repeatJobKey .. ":" .. prevMillis
  28. return jobId == currentDelayedJobId
  29. end
  30. end
  31. return false
  32. end
  33. --[[
  34. Function to recursively check if there are no locks
  35. on the jobs to be removed.
  36. returns:
  37. boolean
  38. ]]
  39. --[[
  40. Functions to destructure job key.
  41. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  42. ]]
  43. local getJobIdFromKey = function (jobKey)
  44. return string.match(jobKey, ".*:(.*)")
  45. end
  46. local getJobKeyPrefix = function (jobKey, jobId)
  47. return string.sub(jobKey, 0, #jobKey - #jobId)
  48. end
  49. local function isLocked( prefix, jobId, removeChildren)
  50. local jobKey = prefix .. jobId;
  51. -- Check if this job is locked
  52. local lockKey = jobKey .. ':lock'
  53. local lock = rcall("GET", lockKey)
  54. if not lock then
  55. if removeChildren == "1" then
  56. local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies")
  57. if (#dependencies > 0) then
  58. for i, childJobKey in ipairs(dependencies) do
  59. -- We need to get the jobId for this job.
  60. local childJobId = getJobIdFromKey(childJobKey)
  61. local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)
  62. local result = isLocked( childJobPrefix, childJobId, removeChildren )
  63. if result then
  64. return true
  65. end
  66. end
  67. end
  68. end
  69. return false
  70. end
  71. return true
  72. end
  73. --[[
  74. Remove a job from all the statuses it may be in as well as all its data,
  75. including its children. Active children can be ignored.
  76. Events:
  77. 'removed'
  78. ]]
  79. local rcall = redis.call
  80. -- Includes
  81. --[[
  82. Function to get max events value or set by default 10000.
  83. ]]
  84. local function getOrSetMaxEvents(metaKey)
  85. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  86. if not maxEvents then
  87. maxEvents = 10000
  88. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  89. end
  90. return maxEvents
  91. end
  92. --[[
  93. Function to remove deduplication key if needed
  94. when a job is being removed.
  95. ]]
  96. local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
  97. jobId, deduplicationId)
  98. if deduplicationId then
  99. local deduplicationKey = prefixKey .. "de:" .. deduplicationId
  100. local currentJobId = rcall('GET', deduplicationKey)
  101. if currentJobId and currentJobId == jobId then
  102. rcall("DEL", deduplicationKey)
  103. -- Also clean up any pending dedup-next data for this dedup ID
  104. rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
  105. return 1
  106. end
  107. end
  108. end
  109. --[[
  110. Function to remove from any state.
  111. returns:
  112. prev state
  113. ]]
  114. local function removeJobFromAnyState( prefix, jobId)
  115. -- We start with the ZSCORE checks, since they have O(1) complexity
  116. if rcall("ZSCORE", prefix .. "completed", jobId) then
  117. rcall("ZREM", prefix .. "completed", jobId)
  118. return "completed"
  119. elseif rcall("ZSCORE", prefix .. "waiting-children", jobId) then
  120. rcall("ZREM", prefix .. "waiting-children", jobId)
  121. return "waiting-children"
  122. elseif rcall("ZSCORE", prefix .. "delayed", jobId) then
  123. rcall("ZREM", prefix .. "delayed", jobId)
  124. return "delayed"
  125. elseif rcall("ZSCORE", prefix .. "failed", jobId) then
  126. rcall("ZREM", prefix .. "failed", jobId)
  127. return "failed"
  128. elseif rcall("ZSCORE", prefix .. "prioritized", jobId) then
  129. rcall("ZREM", prefix .. "prioritized", jobId)
  130. return "prioritized"
  131. -- We remove only 1 element from the list, since we assume they are not added multiple times
  132. elseif rcall("LREM", prefix .. "wait", 1, jobId) == 1 then
  133. return "wait"
  134. elseif rcall("LREM", prefix .. "paused", 1, jobId) == 1 then
  135. return "paused"
  136. elseif rcall("LREM", prefix .. "active", 1, jobId) == 1 then
  137. return "active"
  138. end
  139. return "unknown"
  140. end
  141. --[[
  142. Function to remove job keys.
  143. ]]
  144. local function removeJobKeys(jobKey)
  145. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  146. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  147. end
  148. --[[
  149. Check if this job has a parent. If so we will just remove it from
  150. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  151. which requires code from "moveToFinished"
  152. ]]
  153. -- Includes
  154. --[[
  155. Function to add job in target list and add marker if needed.
  156. ]]
  157. -- Includes
  158. --[[
  159. Add marker if needed when a job is available.
  160. ]]
  161. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  162. if not isPausedOrMaxed then
  163. rcall("ZADD", markerKey, 0, "0")
  164. end
  165. end
  166. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  167. rcall(pushCmd, targetKey, jobId)
  168. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  169. end
  170. --[[
  171. Function to check for the meta.paused key to decide if we are paused or not
  172. (since an empty list and !EXISTS are not really the same).
  173. ]]
  174. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  175. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  176. if queueAttributes[1] then
  177. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  178. else
  179. if queueAttributes[2] then
  180. local activeCount = rcall("LLEN", activeKey)
  181. if activeCount >= tonumber(queueAttributes[2]) then
  182. return waitKey, true, queueAttributes[3], queueAttributes[4]
  183. else
  184. return waitKey, false, queueAttributes[3], queueAttributes[4]
  185. end
  186. end
  187. end
  188. return waitKey, false, queueAttributes[3], queueAttributes[4]
  189. end
  190. local function _moveParentToWait(parentPrefix, parentId, emitEvent)
  191. local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
  192. parentPrefix .. "wait", parentPrefix .. "paused")
  193. addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
  194. if emitEvent then
  195. local parentEventStream = parentPrefix .. "events"
  196. rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  197. end
  198. end
  199. local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
  200. if parentKey then
  201. local parentDependenciesKey = parentKey .. ":dependencies"
  202. local result = rcall("SREM", parentDependenciesKey, jobKey)
  203. if result > 0 then
  204. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  205. if pendingDependencies == 0 then
  206. local parentId = getJobIdFromKey(parentKey)
  207. local parentPrefix = getJobKeyPrefix(parentKey, parentId)
  208. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  209. if numRemovedElements == 1 then
  210. if hard then -- remove parent in same queue
  211. if parentPrefix == baseKey then
  212. removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
  213. removeJobKeys(parentKey)
  214. if debounceId then
  215. rcall("DEL", parentPrefix .. "de:" .. debounceId)
  216. end
  217. else
  218. _moveParentToWait(parentPrefix, parentId)
  219. end
  220. else
  221. _moveParentToWait(parentPrefix, parentId, true)
  222. end
  223. end
  224. end
  225. return true
  226. end
  227. else
  228. local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
  229. local missedParentKey = parentAttributes[1]
  230. if( (type(missedParentKey) == "string") and missedParentKey ~= ""
  231. and (rcall("EXISTS", missedParentKey) == 1)) then
  232. local parentDependenciesKey = missedParentKey .. ":dependencies"
  233. local result = rcall("SREM", parentDependenciesKey, jobKey)
  234. if result > 0 then
  235. local pendingDependencies = rcall("SCARD", parentDependenciesKey)
  236. if pendingDependencies == 0 then
  237. local parentId = getJobIdFromKey(missedParentKey)
  238. local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
  239. local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
  240. if numRemovedElements == 1 then
  241. if hard then
  242. if parentPrefix == baseKey then
  243. removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil)
  244. removeJobKeys(missedParentKey)
  245. if parentAttributes[2] then
  246. rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
  247. end
  248. else
  249. _moveParentToWait(parentPrefix, parentId)
  250. end
  251. else
  252. _moveParentToWait(parentPrefix, parentId, true)
  253. end
  254. end
  255. end
  256. return true
  257. end
  258. end
  259. end
  260. return false
  261. end
  262. local removeJobChildren
  263. local removeJobWithChildren
  264. removeJobChildren = function(prefix, jobKey, options)
  265. -- Check if this job has children
  266. -- If so, we are going to try to remove the children recursively in a depth-first way
  267. -- because if some job is locked, we must exit with an error.
  268. if not options.ignoreProcessed then
  269. local processed = rcall("HGETALL", jobKey .. ":processed")
  270. if #processed > 0 then
  271. for i = 1, #processed, 2 do
  272. local childJobId = getJobIdFromKey(processed[i])
  273. local childJobPrefix = getJobKeyPrefix(processed[i], childJobId)
  274. removeJobWithChildren(childJobPrefix, childJobId, jobKey, options)
  275. end
  276. end
  277. local failed = rcall("HGETALL", jobKey .. ":failed")
  278. if #failed > 0 then
  279. for i = 1, #failed, 2 do
  280. local childJobId = getJobIdFromKey(failed[i])
  281. local childJobPrefix = getJobKeyPrefix(failed[i], childJobId)
  282. removeJobWithChildren(childJobPrefix, childJobId, jobKey, options)
  283. end
  284. end
  285. local unsuccessful = rcall("ZRANGE", jobKey .. ":unsuccessful", 0, -1)
  286. if #unsuccessful > 0 then
  287. for i = 1, #unsuccessful, 1 do
  288. local childJobId = getJobIdFromKey(unsuccessful[i])
  289. local childJobPrefix = getJobKeyPrefix(unsuccessful[i], childJobId)
  290. removeJobWithChildren(childJobPrefix, childJobId, jobKey, options)
  291. end
  292. end
  293. end
  294. local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies")
  295. if #dependencies > 0 then
  296. for i, childJobKey in ipairs(dependencies) do
  297. local childJobId = getJobIdFromKey(childJobKey)
  298. local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)
  299. removeJobWithChildren(childJobPrefix, childJobId, jobKey, options)
  300. end
  301. end
  302. end
  303. removeJobWithChildren = function(prefix, jobId, parentKey, options)
  304. local jobKey = prefix .. jobId
  305. if options.ignoreLocked then
  306. if isLocked(prefix, jobId) then
  307. return
  308. end
  309. end
  310. -- Check if job is in the failed zset
  311. local failedSet = prefix .. "failed"
  312. if not (options.ignoreProcessed and rcall("ZSCORE", failedSet, jobId)) then
  313. removeParentDependencyKey(jobKey, false, parentKey, nil)
  314. if options.removeChildren then
  315. removeJobChildren(prefix, jobKey, options)
  316. end
  317. local prev = removeJobFromAnyState(prefix, jobId)
  318. local deduplicationId = rcall("HGET", jobKey, "deid")
  319. removeDeduplicationKeyIfNeededOnRemoval(prefix, jobId, deduplicationId)
  320. if removeJobKeys(jobKey) > 0 then
  321. local metaKey = prefix .. "meta"
  322. local maxEvents = getOrSetMaxEvents(metaKey)
  323. rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed",
  324. "jobId", jobId, "prev", prev)
  325. end
  326. end
  327. end
  328. local jobId = ARGV[1]
  329. local shouldRemoveChildren = ARGV[2]
  330. local prefix = ARGV[3]
  331. local jobKey = KEYS[1]
  332. local repeatKey = KEYS[2]
  333. if isJobSchedulerJob(jobId, jobKey, repeatKey) then
  334. return -8
  335. end
  336. if not isLocked(prefix, jobId, shouldRemoveChildren) then
  337. local options = {
  338. removeChildren = shouldRemoveChildren == "1",
  339. ignoreProcessed = false,
  340. ignoreLocked = false
  341. }
  342. removeJobWithChildren(prefix, jobId, nil, options)
  343. return 1
  344. end
  345. return 0
  346. `;
  347. exports.removeJob = {
  348. name: 'removeJob',
  349. content,
  350. keys: 2,
  351. };
  352. //# sourceMappingURL=removeJob-2.js.map