addRepeatableJob-2.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. const content = `--[[
  2. Adds a repeatable job
  3. Input:
  4. KEYS[1] 'repeat' key
  5. KEYS[2] 'delayed' key
  6. ARGV[1] next milliseconds
  7. ARGV[2] msgpacked options
  8. [1] name
  9. [2] tz?
  10. [3] pattern?
  11. [4] endDate?
  12. [5] every?
  13. ARGV[3] legacy custom key TODO: remove this logic in next breaking change
  14. ARGV[4] custom key
  15. ARGV[5] prefix key
  16. Output:
  17. repeatableKey - OK
  18. ]]
  -- Local alias for redis.call; every command in this script goes through it.
  local rcall = redis.call
  -- The two keys supplied by the caller: the 'repeat' key and the 'delayed' key.
  local repeatKey, delayedKey = KEYS[1], KEYS[2]
  -- Positional arguments. ARGV[2] (the msgpacked options) gets no alias here;
  -- it is handed over unchanged where the repeatable job is stored.
  local nextMillis, legacyCustomKey, customKey, prefixKey =
    ARGV[1], ARGV[3], ARGV[4], ARGV[5]
  26. -- Includes
  27. --[[
  28. Function to remove job.
  29. ]]
  30. -- Includes
  --[[
    Drops the deduplication key of a job that is being removed.
    The key prefixKey .. "de:" .. deduplicationId is deleted only when it
    currently stores jobId; in that case the matching "dn:" key is deleted too.
    Returns 1 when the keys were deleted, nothing otherwise.
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    -- No deduplication id means there is nothing to clean up.
    if not deduplicationId then
      return
    end

    local dedupKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', dedupKey)
    -- Leave the key alone when it is missing or belongs to another job.
    if not ownerJobId or ownerJobId ~= jobId then
      return
    end

    rcall("DEL", dedupKey)
    -- Also clean up any pending dedup-next data for this dedup ID.
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Deletes the job hash and its companion keys in a single DEL call.
    Returns whatever DEL returns.
  ]]
  local function removeJobKeys(jobKey)
    local keysToDelete = { jobKey }
    -- Suffixes are appended in the order they are passed to DEL.
    for _, suffix in ipairs({ ':logs', ':dependencies', ':processed', ':failed', ':unsuccessful' }) do
      keysToDelete[#keysToDelete + 1] = jobKey .. suffix
    end
    return rcall("DEL", unpack(keysToDelete))
  end
  55. --[[
  56. Check if this job has a parent. If so we will just remove it from
  57. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  58. which requires code from "moveToFinished"
  59. ]]
  60. -- Includes
  61. --[[
  62. Function to add job in target list and add marker if needed.
  63. ]]
  64. -- Includes
  --[[
    Adds the base marker (member "0" with score 0) to markerKey,
    unless the queue is flagged as paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto targetKey with the given push command, then adds the
  -- base marker unless the queue is paused or maxed.
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    -- Same effect as addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed).
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  77. --[[
  78. Functions to destructure job key.
  79. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  80. ]]
  -- Returns the part of jobKey after its last ":" (nil when there is no ":").
  local function getJobIdFromKey(jobKey)
    -- The greedy ".*:" consumes everything up to the final colon.
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  -- Returns jobKey with its last #jobId characters cut off, i.e. the prefix
  -- that precedes the job id (trailing ":" included).
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    return string.sub(jobKey, 0, prefixLength)
  end
  --[[
    Reads "paused", "concurrency", "max" and "duration" from the queue meta
    hash and decides which list a job should be pushed to.
    The meta.paused field is checked explicitly because an empty list and a
    missing key are not the same thing.
    Returns: target list key, paused-or-maxed flag, "max" value, "duration" value.
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local meta = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency, maxValue, durationValue = meta[1], meta[2], meta[3], meta[4]

    -- A paused queue always targets the paused list.
    if paused then
      return pausedKey, true, maxValue, durationValue
    end

    -- Without a concurrency setting the wait list is never considered maxed.
    if not concurrency then
      return waitKey, false, maxValue, durationValue
    end

    -- Maxed when the active list already holds at least "concurrency" entries.
    local isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    return waitKey, isMaxed, maxValue, durationValue
  end
  -- Pushes parentId (RPUSH) onto the list chosen by getTargetQueueList for the
  -- parent queue, adds the marker when allowed, and, if emitEvent is set,
  -- appends a "waiting" event (prev "waiting-children") to the events stream.
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local targetList, pausedOrMaxed = getTargetQueueList(
      parentPrefix .. "meta",
      parentPrefix .. "active",
      parentPrefix .. "wait",
      parentPrefix .. "paused")

    addJobInTargetList(targetList, parentPrefix .. "marker", "RPUSH", pausedOrMaxed, parentId)

    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  --[[
    Detaches jobKey from its parent's ":dependencies" set.
    When parentKey is not given, the parent key and the "deid" field are read
    from the job hash; a missing, empty or non-existing parent ends the call.
    If the removed child was the last pending dependency and the parent was in
    "waiting-children", the parent is either
      - removed as well (hard removal, parent in the same queue as baseKey), or
      - moved to its wait/paused list (with a "waiting" event when not hard).
    Returns true when the job was removed from a dependencies set, else false.
  ]]
  local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
    local resolvedParentKey, resolvedDedupId = parentKey, debounceId

    if not parentKey then
      -- Fall back to what is stored on the job itself.
      local jobAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
      local storedParentKey = jobAttributes[1]
      -- EXISTS is only issued for a non-empty string, as before.
      if type(storedParentKey) ~= "string"
        or storedParentKey == ""
        or rcall("EXISTS", storedParentKey) ~= 1 then
        return false
      end
      resolvedParentKey, resolvedDedupId = storedParentKey, jobAttributes[2]
    end

    local dependenciesKey = resolvedParentKey .. ":dependencies"
    local removedCount = rcall("SREM", dependenciesKey, jobKey)
    if not (removedCount > 0) then
      return false
    end

    if rcall("SCARD", dependenciesKey) == 0 then
      local parentId = getJobIdFromKey(resolvedParentKey)
      local parentPrefix = getJobKeyPrefix(resolvedParentKey, parentId)
      local leftWaitingChildren = rcall("ZREM", parentPrefix .. "waiting-children", parentId)

      if leftWaitingChildren == 1 then
        if not hard then
          _moveParentToWait(parentPrefix, parentId, true)
        elseif parentPrefix ~= baseKey then
          -- Parent lives in another queue: requeue it, without an event.
          _moveParentToWait(parentPrefix, parentId)
        else
          -- Parent in the same queue: remove it too, walking up the tree first.
          removeParentDependencyKey(resolvedParentKey, hard, nil, baseKey, nil)
          removeJobKeys(resolvedParentKey)
          if resolvedDedupId then
            rcall("DEL", parentPrefix .. "de:" .. resolvedDedupId)
          end
        end
      end
    end

    return true
  end
  -- Removes a job: detaches it from its parent, optionally drops its
  -- deduplication key (read from the "deid" field), then deletes the job keys.
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    local jobKey = baseKey .. jobId

    removeParentDependencyKey(jobKey, hard, nil, baseKey)

    if shouldRemoveDeduplicationKey then
      -- Must happen before the job hash is deleted, since "deid" lives there.
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, rcall("HGET", jobKey, "deid"))
    end

    removeJobKeys(jobKey)
  end
  -- Registers customKey in the repeat sorted set with score nextMillis and
  -- stores the unpacked options in the hash repeatKey .. ":" .. customKey.
  -- "name" is always written; tz, pattern, endDate and every only when set.
  -- Returns customKey.
  local function storeRepeatableJob(repeatKey, customKey, nextMillis, rawOpts)
    rcall("ZADD", repeatKey, nextMillis, customKey)

    local opts = cmsgpack.unpack(rawOpts)

    -- Flat field/value list for HMSET, built in a fixed field order.
    local fieldArgs = {}
    for _, field in ipairs({ "tz", "pattern", "endDate", "every" }) do
      local value = opts[field]
      if value then
        fieldArgs[#fieldArgs + 1] = field
        fieldArgs[#fieldArgs + 1] = value
      end
    end

    rcall("HMSET", repeatKey .. ":" .. customKey, "name", opts['name'],
      unpack(fieldArgs))
    return customKey
  end
  -- When an existing repeatable job is being overridden, the delayed job that
  -- was scheduled for its next iteration has to be removed first.
  local previousMillis = rcall("ZSCORE", repeatKey, customKey)
  if previousMillis then
    local staleJobId = "repeat:" .. customKey .. ":" .. previousMillis
    local upcomingKey = repeatKey .. ":" .. customKey .. ":" .. nextMillis
    local isStillDelayed = rcall("ZSCORE", delayedKey, staleJobId)
    -- EXISTS is only queried when the stale job is actually in the delayed set.
    if isStillDelayed and rcall("EXISTS", upcomingKey) ~= 1 then
      removeJob(staleJobId, true, prefixKey, true --[[remove debounce key]])
      rcall("ZREM", delayedKey, staleJobId)
    end
  end

  -- Keep backwards compatibility with old repeatable jobs (<= 3.0.0): if the
  -- legacy key is already registered, keep storing under it.
  local keyToStore = customKey
  if rcall("ZSCORE", repeatKey, legacyCustomKey) ~= false then
    keyToStore = legacyCustomKey
  end
  return storeRepeatableJob(repeatKey, keyToStore, nextMillis, ARGV[2])
  229. `;
  /**
   * Descriptor for the addRepeatableJob script: its name, the Lua source held
   * in content, and keys (presumably the number of KEYS entries the script
   * takes; the script reads KEYS[1] and KEYS[2]).
   */
  export const addRepeatableJob = {
    name: 'addRepeatableJob',
    content: content,
    keys: 2
  };
  235. //# sourceMappingURL=addRepeatableJob-2.js.map