updateJobScheduler-12.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. const content = `--[[
  2. Updates a job scheduler and adds next delayed job
  3. Input:
  4. KEYS[1] 'repeat' key
  5. KEYS[2] 'delayed'
  6. KEYS[3] 'wait' key
  7. KEYS[4] 'paused' key
  8. KEYS[5] 'meta'
  9. KEYS[6] 'prioritized' key
  10. KEYS[7] 'marker',
  11. KEYS[8] 'id'
  12. KEYS[9] events stream key
  13. KEYS[10] 'pc' priority counter
  14. KEYS[11] producer key
  15. KEYS[12] 'active' key
  16. ARGV[1] next milliseconds
  17. ARGV[2] jobs scheduler id
  18. ARGV[3] Json stringified delayed data
  19. ARGV[4] msgpacked delayed opts
  20. ARGV[5] timestamp
  21. ARGV[6] prefix key
  22. ARGV[7] producer id
  23. Output:
  24. next delayed job id - OK
  25. ]] local rcall = redis.call
  26. local repeatKey = KEYS[1]
  27. local delayedKey = KEYS[2]
  28. local waitKey = KEYS[3]
  29. local pausedKey = KEYS[4]
  30. local metaKey = KEYS[5]
  31. local prioritizedKey = KEYS[6]
  32. local nextMillis = tonumber(ARGV[1])
  33. local jobSchedulerId = ARGV[2]
  34. local timestamp = tonumber(ARGV[5])
  35. local prefixKey = ARGV[6]
  36. local producerId = ARGV[7]
  37. local jobOpts = cmsgpack.unpack(ARGV[4])
  38. -- Includes
  39. --[[
  40. Add delay marker if needed.
  41. ]]
  42. -- Includes
  43. --[[
  44. Shared helper to store a job and enqueue it into the appropriate list/set.
  45. Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
  46. Emits the appropriate event after enqueuing ("delayed" or "waiting").
  47. Returns delay, priority from storeJob.
  48. ]]
  49. -- Includes
  50. --[[
  51. Adds a delayed job to the queue by doing the following:
  52. - Creates a new job key with the job data.
  53. - adds to delayed zset.
  54. - Emits a global event 'delayed' if the job is delayed.
  55. ]]
  56. -- Includes
  57. --[[
  58. Add delay marker if needed.
  59. ]]
  60. -- Includes
  61. --[[
  62. Function to return the next delayed job timestamp.
  63. ]]
  64. local function getNextDelayedTimestamp(delayedKey)
  65. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  66. if #result then
  67. local nextTimestamp = tonumber(result[2])
  68. if nextTimestamp ~= nil then
  69. return nextTimestamp / 0x1000
  70. end
  71. end
  72. end
  73. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  74. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  75. if nextTimestamp ~= nil then
  76. -- Replace the score of the marker with the newest known
  77. -- next timestamp.
  78. rcall("ZADD", markerKey, nextTimestamp, "1")
  79. end
  80. end
  81. --[[
  82. Bake in the job id first 12 bits into the timestamp
  83. to guarantee correct execution order of delayed jobs
  84. (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
  85. WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
  86. ]]
  87. local function getDelayedScore(delayedKey, timestamp, delay)
  88. local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
  89. local minScore = delayedTimestamp * 0x1000
  90. local maxScore = (delayedTimestamp + 1 ) * 0x1000 - 1
  91. local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
  92. minScore, "WITHSCORES","LIMIT", 0, 1)
  93. if #result then
  94. local currentMaxScore = tonumber(result[2])
  95. if currentMaxScore ~= nil then
  96. if currentMaxScore >= maxScore then
  97. return maxScore, delayedTimestamp
  98. else
  99. return currentMaxScore + 1, delayedTimestamp
  100. end
  101. end
  102. end
  103. return minScore, delayedTimestamp
  104. end
  105. local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
  106. maxEvents, markerKey, delay)
  107. local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))
  108. rcall("ZADD", delayedKey, score, jobId)
  109. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
  110. "jobId", jobId, "delay", delayedTimestamp)
  111. -- mark that a delayed job is available
  112. addDelayMarkerIfNeeded(markerKey, delayedKey)
  113. end
  114. --[[
  115. Function to add job in target list and add marker if needed.
  116. ]]
  117. -- Includes
  118. --[[
  119. Add marker if needed when a job is available.
  120. ]]
  121. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  122. if not isPausedOrMaxed then
  123. rcall("ZADD", markerKey, 0, "0")
  124. end
  125. end
  126. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  127. rcall(pushCmd, targetKey, jobId)
  128. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  129. end
  130. --[[
  131. Function to add job considering priority.
  132. ]]
  133. -- Includes
  134. --[[
  135. Function to get priority score.
  136. ]]
  137. local function getPriorityScore(priority, priorityCounterKey)
  138. local prioCounter = rcall("INCR", priorityCounterKey)
  139. return priority * 0x100000000 + prioCounter % 0x100000000
  140. end
  141. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  142. isPausedOrMaxed)
  143. local score = getPriorityScore(priority, priorityCounterKey)
  144. rcall("ZADD", prioritizedKey, score, jobId)
  145. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  146. end
  147. --[[
  148. Function to check for the meta.paused key to decide if we are paused or not
  149. (since an empty list and !EXISTS are not really the same).
  150. ]]
  151. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  152. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  153. if queueAttributes[1] then
  154. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  155. else
  156. if queueAttributes[2] then
  157. local activeCount = rcall("LLEN", activeKey)
  158. if activeCount >= tonumber(queueAttributes[2]) then
  159. return waitKey, true, queueAttributes[3], queueAttributes[4]
  160. else
  161. return waitKey, false, queueAttributes[3], queueAttributes[4]
  162. end
  163. end
  164. end
  165. return waitKey, false, queueAttributes[3], queueAttributes[4]
  166. end
  167. --[[
  168. Function to store a job
  169. ]]
  170. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  171. parentKey, parentData, repeatJobKey)
  172. local jsonOpts = cjson.encode(opts)
  173. local delay = opts['delay'] or 0
  174. local priority = opts['priority'] or 0
  175. local debounceId = opts['de'] and opts['de']['id']
  176. local optionalValues = {}
  177. if parentKey ~= nil then
  178. table.insert(optionalValues, "parentKey")
  179. table.insert(optionalValues, parentKey)
  180. table.insert(optionalValues, "parent")
  181. table.insert(optionalValues, parentData)
  182. end
  183. if repeatJobKey then
  184. table.insert(optionalValues, "rjk")
  185. table.insert(optionalValues, repeatJobKey)
  186. end
  187. if debounceId then
  188. table.insert(optionalValues, "deid")
  189. table.insert(optionalValues, debounceId)
  190. end
  191. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  192. "timestamp", timestamp, "delay", delay, "priority", priority,
  193. unpack(optionalValues))
  194. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  195. return delay, priority
  196. end
  197. local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
  198. timestamp, parentKey, parentData, repeatJobKey, maxEvents,
  199. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  200. priorityCounterKey, delayedKey, markerKey)
  201. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
  202. opts, timestamp, parentKey, parentData, repeatJobKey)
  203. if delay ~= 0 and delayedKey then
  204. addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
  205. else
  206. local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
  207. if priority > 0 then
  208. addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
  209. priorityCounterKey, isPausedOrMaxed)
  210. else
  211. local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
  212. addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
  213. end
  214. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  215. "jobId", jobId)
  216. end
  217. return delay, priority
  218. end
  219. local function addJobFromScheduler(jobKey, jobId, opts, waitKey, pausedKey, activeKey, metaKey,
  220. prioritizedKey, priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp,
  221. data, jobSchedulerId, repeatDelay)
  222. opts['delay'] = repeatDelay
  223. opts['jobId'] = jobId
  224. storeAndEnqueueJob(eventsKey, jobKey, jobId, name, data, opts,
  225. timestamp, nil, nil, jobSchedulerId, maxEvents,
  226. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  227. priorityCounter, delayedKey, markerKey)
  228. end
  229. --[[
  230. Function to get max events value or set by default 10000.
  231. ]]
  232. local function getOrSetMaxEvents(metaKey)
  233. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  234. if not maxEvents then
  235. maxEvents = 10000
  236. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  237. end
  238. return maxEvents
  239. end
  240. local function getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  241. local nextMillis
  242. if not prevMillis then
  243. if startDate then
  244. -- Assuming startDate is passed as milliseconds from JavaScript
  245. nextMillis = tonumber(startDate)
  246. nextMillis = nextMillis > now and nextMillis or now
  247. else
  248. nextMillis = now
  249. end
  250. else
  251. nextMillis = prevMillis + every
  252. -- check if we may have missed some iterations
  253. if nextMillis < now then
  254. nextMillis = math.floor(now / every) * every + every + (offset or 0)
  255. end
  256. end
  257. if not offset or offset == 0 then
  258. local timeSlot = math.floor(nextMillis / every) * every;
  259. offset = nextMillis - timeSlot;
  260. end
  261. -- Return a tuple nextMillis, offset
  262. return math.floor(nextMillis), math.floor(offset)
  263. end
  264. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
  265. -- Validate that scheduler exists.
  266. -- If it does not exist we should not iterate anymore.
  267. if prevMillis then
  268. prevMillis = tonumber(prevMillis)
  269. local schedulerKey = repeatKey .. ":" .. jobSchedulerId
  270. local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data", "every", "startDate", "offset")
  271. local every = tonumber(schedulerAttributes[3])
  272. local now = tonumber(timestamp)
  273. -- If every is not found in scheduler attributes, try to get it from job options
  274. if not every and jobOpts['repeat'] and jobOpts['repeat']['every'] then
  275. every = tonumber(jobOpts['repeat']['every'])
  276. end
  277. if every then
  278. local startDate = schedulerAttributes[4]
  279. local jobOptsOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
  280. local offset = schedulerAttributes[5] or jobOptsOffset or 0
  281. local newOffset
  282. nextMillis, newOffset = getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  283. if not offset then
  284. rcall("HSET", schedulerKey, "offset", newOffset)
  285. jobOpts['repeat']['offset'] = newOffset
  286. end
  287. end
  288. local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
  289. local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis
  290. local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
  291. if producerId == currentDelayedJobId then
  292. local eventsKey = KEYS[9]
  293. local maxEvents = getOrSetMaxEvents(metaKey)
  294. if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
  295. rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
  296. rcall("HINCRBY", schedulerKey, "ic", 1)
  297. rcall("INCR", KEYS[8])
  298. -- TODO: remove this workaround in next breaking change,
  299. -- all job-schedulers must save job data
  300. local templateData = schedulerAttributes[2] or ARGV[3]
  301. if templateData and templateData ~= '{}' then
  302. rcall("HSET", schedulerKey, "data", templateData)
  303. end
  304. local delay = nextMillis - now
  305. -- Fast Clamp delay to minimum of 0
  306. if delay < 0 then
  307. delay = 0
  308. end
  309. jobOpts["delay"] = delay
  310. addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, jobOpts, waitKey, pausedKey, KEYS[12], metaKey,
  311. prioritizedKey, KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerAttributes[1], maxEvents, ARGV[5],
  312. templateData or '{}', jobSchedulerId, delay)
  313. -- TODO: remove this workaround in next breaking change
  314. if KEYS[11] ~= "" then
  315. rcall("HSET", KEYS[11], "nrjid", nextDelayedJobId)
  316. end
  317. return nextDelayedJobId .. "" -- convert to string
  318. else
  319. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "duplicated", "jobId", nextDelayedJobId)
  320. end
  321. end
  322. end
  323. `;
  324. export const updateJobScheduler = {
  325. name: 'updateJobScheduler',
  326. content,
  327. keys: 12,
  328. };
  329. //# sourceMappingURL=updateJobScheduler-12.js.map