addRepeatableJob-2.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.addRepeatableJob = void 0;
  4. const content = `--[[
  5. Adds a repeatable job
  6. Input:
  7. KEYS[1] 'repeat' key
  8. KEYS[2] 'delayed' key
  9. ARGV[1] next milliseconds
  10. ARGV[2] msgpacked options
  11. [1] name
  12. [2] tz?
  13. [3] pattern?
  14. [4] endDate?
  15. [5] every?
  16. ARGV[3] legacy custom key TODO: remove this logic in next breaking change
  17. ARGV[4] custom key
  18. ARGV[5] prefix key
  19. Output:
  20. repeatableKey - OK
  21. ]]
  -- Shorthand for issuing Redis commands.
  local rcall = redis.call
  -- Key arguments: the 'repeat' key and the 'delayed' key.
  local repeatKey, delayedKey = KEYS[1], KEYS[2]
  -- ARGV[2] (msgpacked options) is passed on as-is when the job is stored.
  local nextMillis = ARGV[1]
  local legacyCustomKey, customKey, prefixKey = ARGV[3], ARGV[4], ARGV[5]
  29. -- Includes
  30. --[[
  31. Function to remove job.
  32. ]]
  33. -- Includes
  34. --[[
  35. Function to remove deduplication key if needed
  36. when a job is being removed.
  37. ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    -- Deletes the "de:" key (and the matching "dn:" key) for deduplicationId,
    -- but only while the "de:" key still holds jobId.
    -- Returns 1 when the keys were deleted, nothing otherwise.
    if not deduplicationId then
      return
    end
    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- Leave the key alone when it is missing or held by a different job.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end
    rcall("DEL", dedupKey)
    -- Also clean up any pending dedup-next data for this dedup ID
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  51. --[[
  52. Function to remove job keys.
  53. ]]
  local function removeJobKeys(jobKey)
    -- Deletes the job hash together with its auxiliary keys in one DEL call
    -- and returns the reply of that DEL.
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'
    return rcall("DEL", jobKey, logsKey, dependenciesKey, processedKey,
      failedKey, unsuccessfulKey)
  end
  58. --[[
  59. Check if this job has a parent. If so we will just remove it from
  60. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  61. which requires code from "moveToFinished"
  62. ]]
  63. -- Includes
  64. --[[
  65. Function to add job in target list and add marker if needed.
  66. ]]
  67. -- Includes
  68. --[[
  69. Add marker if needed when a job is available.
  70. ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    -- No marker is written while the queue is paused or maxed.
    if isPausedOrMaxed then
      return
    end
    -- Adds member "0" with score 0 to the marker sorted set.
    rcall("ZADD", markerKey, 0, "0")
  end
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- Pushes jobId onto targetKey using the given push command
    -- (callers in this script pass "RPUSH"), then writes the base marker
    -- unless the queue is paused or maxed.
    rcall(pushCmd, targetKey, jobId)
    -- The marker is added only after the job is already in the list.
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  80. --[[
  81. Functions to destructure job key.
  82. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  83. ]]
  local function getJobIdFromKey(jobKey)
    -- The greedy ".*:" consumes everything up to the LAST colon, so the
    -- capture is the text after it; nil when the key has no colon.
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  local function getJobKeyPrefix(jobKey, jobId)
    -- Returns jobKey with its last #jobId characters cut off, i.e. the
    -- prefix that precedes the job id (trailing separator included).
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 1, prefixLength)
  end
  90. --[[
  91. Function to check for the meta.paused key to decide if we are paused or not
  92. (since an empty list and !EXISTS are not really the same).
  93. ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    -- Returns four values: the list key a job should go to, a flag telling
    -- whether the queue is paused or at its concurrency limit, and the raw
    -- "max" and "duration" fields of the queue meta hash.
    local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, maxValue, duration = meta[1], meta[2], meta[3], meta[4]
    if paused then
      return pausedKey, true, maxValue, duration
    end
    local isMaxed = false
    if concurrency then
      -- Maxed once the active list is at least as long as the limit.
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, isMaxed, maxValue, duration
  end
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    -- Pushes the parent job onto its queue's wait or paused list and, when
    -- emitEvent is truthy, appends a "waiting" event to the events stream.
    local metaKey = parentPrefix .. "meta"
    local activeKey = parentPrefix .. "active"
    local waitKey = parentPrefix .. "wait"
    local pausedKey = parentPrefix .. "paused"
    local targetList, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    addJobInTargetList(targetList, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId,
      "prev", "waiting-children")
  end
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    -- Removes jobKey from its parent's ":dependencies" set. When that was the
    -- parent's last pending dependency and the parent is still in
    -- "waiting-children", the parent is either deleted (hard removal within
    -- the same queue prefix) or moved to its wait/paused list.
    -- Returns true when jobKey was removed from a dependency set, false otherwise.
    local resolvedParentKey = parentKey
    local dedupId = debounceId
    if not resolvedParentKey then
      -- No parent supplied: fall back to the "parentKey" field stored on the
      -- job hash, and use the hash's "deid" field instead of debounceId.
      local jobAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
      local storedParentKey = jobAttributes[1]
      if type(storedParentKey) ~= "string" or storedParentKey == ""
        or rcall("EXISTS", storedParentKey) ~= 1 then
        return false
      end
      resolvedParentKey = storedParentKey
      dedupId = jobAttributes[2]
    end

    local dependenciesKey = resolvedParentKey .. ":dependencies"
    if not (rcall("SREM", dependenciesKey, jobKey) > 0) then
      return false
    end

    if rcall("SCARD", dependenciesKey) == 0 then
      local parentId = getJobIdFromKey(resolvedParentKey)
      local parentPrefix = getJobKeyPrefix(resolvedParentKey, parentId)
      if rcall("ZREM", parentPrefix .. "waiting-children", parentId) == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix == baseKey then
          -- remove parent in same queue, walking up to its own parent first
          removeParentDependencyKey(resolvedParentKey, hard, nil, baseKey, nil)
          removeJobKeys(resolvedParentKey)
          if dedupId then
            rcall("DEL", parentPrefix .. "de:" .. dedupId)
          end
        else
          -- Parent lives under another prefix: requeue it without an event.
          _moveParentToWait(parentPrefix, parentId)
        end
      end
    end
    return true
  end
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    -- Detaches the job from its parent (if any), optionally releases its
    -- deduplication key, then deletes the job's own keys.
    local jobKey = baseKey .. jobId
    removeParentDependencyKey(jobKey, hard, nil, baseKey)
    if shouldRemoveDeduplicationKey then
      -- "deid" must be read before removeJobKeys deletes the job hash.
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId,
        rcall("HGET", jobKey, "deid"))
    end
    removeJobKeys(jobKey)
  end
  local function storeRepeatableJob(repeatKey, customKey, nextMillis, rawOpts)
    -- Schedules customKey in the repeat sorted set at nextMillis and writes
    -- the job's options into the hash at repeatKey:customKey.
    -- rawOpts is the msgpacked options table; returns customKey.
    rcall("ZADD", repeatKey, nextMillis, customKey)
    local opts = cmsgpack.unpack(rawOpts)
    -- Optional fields are written only when present, always in this order.
    local optionalFields = { "tz", "pattern", "endDate", "every" }
    local fieldPairs = {}
    for _, field in ipairs(optionalFields) do
      local value = opts[field]
      if value then
        fieldPairs[#fieldPairs + 1] = field
        fieldPairs[#fieldPairs + 1] = value
      end
    end
    local hashKey = repeatKey .. ":" .. customKey
    rcall("HMSET", hashKey, "name", opts['name'], unpack(fieldPairs))
    return customKey
  end
  -- When an existing repeatable job is being overridden, the delayed job that
  -- was scheduled for its previous iteration must be deleted.
  local prevMillis = rcall("ZSCORE", repeatKey, customKey)
  if prevMillis then
    local staleJobId = "repeat:" .. customKey .. ":" .. prevMillis
    -- Key that the delayed job for the new iteration would be stored under.
    local upcomingJobKey = repeatKey .. ":" .. customKey .. ":" .. nextMillis
    local staleIsDelayed = rcall("ZSCORE", delayedKey, staleJobId)
    -- Skip the cleanup when a job for the new iteration already exists.
    if staleIsDelayed and rcall("EXISTS", upcomingJobKey) ~= 1 then
      removeJob(staleJobId, true, prefixKey, true --[[remove debounce key]])
      rcall("ZREM", delayedKey, staleJobId)
    end
  end
  -- Keep backwards compatibility with old repeatable jobs (<= 3.0.0): if the
  -- legacy key is already registered, keep storing under it.
  local storageKey = customKey
  if rcall("ZSCORE", repeatKey, legacyCustomKey) ~= false then
    storageKey = legacyCustomKey
  end
  return storeRepeatableJob(repeatKey, storageKey, nextMillis, ARGV[2])
  232. `;
  233. exports.addRepeatableJob = {
  234. name: 'addRepeatableJob',
  235. content,
  236. keys: 2,
  237. };
  238. //# sourceMappingURL=addRepeatableJob-2.js.map