updateJobScheduler-12.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.updateJobScheduler = void 0;
  4. const content = `--[[
  5. Updates a job scheduler and adds next delayed job
  6. Input:
  7. KEYS[1] 'repeat' key
  8. KEYS[2] 'delayed'
  9. KEYS[3] 'wait' key
  10. KEYS[4] 'paused' key
  11. KEYS[5] 'meta'
  12. KEYS[6] 'prioritized' key
  13. KEYS[7] 'marker',
  14. KEYS[8] 'id'
  15. KEYS[9] events stream key
  16. KEYS[10] 'pc' priority counter
  17. KEYS[11] producer key
  18. KEYS[12] 'active' key
  19. ARGV[1] next milliseconds
  20. ARGV[2] jobs scheduler id
  21. ARGV[3] Json stringified delayed data
  22. ARGV[4] msgpacked delayed opts
  23. ARGV[5] timestamp
  24. ARGV[6] prefix key
  25. ARGV[7] producer id
  26. Output:
  27. next delayed job id - OK
  28. ]] local rcall = redis.call
  29. local repeatKey = KEYS[1]
  30. local delayedKey = KEYS[2]
  31. local waitKey = KEYS[3]
  32. local pausedKey = KEYS[4]
  33. local metaKey = KEYS[5]
  34. local prioritizedKey = KEYS[6]
  35. local nextMillis = tonumber(ARGV[1])
  36. local jobSchedulerId = ARGV[2]
  37. local timestamp = tonumber(ARGV[5])
  38. local prefixKey = ARGV[6]
  39. local producerId = ARGV[7]
  40. local jobOpts = cmsgpack.unpack(ARGV[4])
  41. -- Includes
  42. --[[
  43. Add delay marker if needed.
  44. ]]
  45. -- Includes
  46. --[[
  47. Shared helper to store a job and enqueue it into the appropriate list/set.
  48. Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
  49. Emits the appropriate event after enqueuing ("delayed" or "waiting").
  50. Returns delay, priority from storeJob.
  51. ]]
  52. -- Includes
  53. --[[
  54. Adds a delayed job to the queue by doing the following:
  55. - Creates a new job key with the job data.
  56. - adds to delayed zset.
  57. - Emits a global event 'delayed' if the job is delayed.
  58. ]]
  59. -- Includes
  60. --[[
  61. Add delay marker if needed.
  62. ]]
  63. -- Includes
  64. --[[
  65. Function to return the next delayed job timestamp.
  66. ]]
  67. local function getNextDelayedTimestamp(delayedKey)
  68. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  69. if #result then
  70. local nextTimestamp = tonumber(result[2])
  71. if nextTimestamp ~= nil then
  72. return nextTimestamp / 0x1000
  73. end
  74. end
  75. end
  76. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  77. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  78. if nextTimestamp ~= nil then
  79. -- Replace the score of the marker with the newest known
  80. -- next timestamp.
  81. rcall("ZADD", markerKey, nextTimestamp, "1")
  82. end
  83. end
  84. --[[
  85. Bake in the job id first 12 bits into the timestamp
  86. to guarantee correct execution order of delayed jobs
  87. (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
  88. WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
  89. ]]
  90. local function getDelayedScore(delayedKey, timestamp, delay)
  91. local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
  92. local minScore = delayedTimestamp * 0x1000
  93. local maxScore = (delayedTimestamp + 1 ) * 0x1000 - 1
  94. local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
  95. minScore, "WITHSCORES","LIMIT", 0, 1)
  96. if #result then
  97. local currentMaxScore = tonumber(result[2])
  98. if currentMaxScore ~= nil then
  99. if currentMaxScore >= maxScore then
  100. return maxScore, delayedTimestamp
  101. else
  102. return currentMaxScore + 1, delayedTimestamp
  103. end
  104. end
  105. end
  106. return minScore, delayedTimestamp
  107. end
  108. local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
  109. maxEvents, markerKey, delay)
  110. local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))
  111. rcall("ZADD", delayedKey, score, jobId)
  112. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
  113. "jobId", jobId, "delay", delayedTimestamp)
  114. -- mark that a delayed job is available
  115. addDelayMarkerIfNeeded(markerKey, delayedKey)
  116. end
  117. --[[
  118. Function to add job in target list and add marker if needed.
  119. ]]
  120. -- Includes
  121. --[[
  122. Add marker if needed when a job is available.
  123. ]]
  124. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  125. if not isPausedOrMaxed then
  126. rcall("ZADD", markerKey, 0, "0")
  127. end
  128. end
  129. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  130. rcall(pushCmd, targetKey, jobId)
  131. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  132. end
  133. --[[
  134. Function to add job considering priority.
  135. ]]
  136. -- Includes
  137. --[[
  138. Function to get priority score.
  139. ]]
  140. local function getPriorityScore(priority, priorityCounterKey)
  141. local prioCounter = rcall("INCR", priorityCounterKey)
  142. return priority * 0x100000000 + prioCounter % 0x100000000
  143. end
  144. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  145. isPausedOrMaxed)
  146. local score = getPriorityScore(priority, priorityCounterKey)
  147. rcall("ZADD", prioritizedKey, score, jobId)
  148. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  149. end
  150. --[[
  151. Function to check for the meta.paused key to decide if we are paused or not
  152. (since an empty list and !EXISTS are not really the same).
  153. ]]
  154. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  155. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  156. if queueAttributes[1] then
  157. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  158. else
  159. if queueAttributes[2] then
  160. local activeCount = rcall("LLEN", activeKey)
  161. if activeCount >= tonumber(queueAttributes[2]) then
  162. return waitKey, true, queueAttributes[3], queueAttributes[4]
  163. else
  164. return waitKey, false, queueAttributes[3], queueAttributes[4]
  165. end
  166. end
  167. end
  168. return waitKey, false, queueAttributes[3], queueAttributes[4]
  169. end
  170. --[[
  171. Function to store a job
  172. ]]
  173. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  174. parentKey, parentData, repeatJobKey)
  175. local jsonOpts = cjson.encode(opts)
  176. local delay = opts['delay'] or 0
  177. local priority = opts['priority'] or 0
  178. local debounceId = opts['de'] and opts['de']['id']
  179. local optionalValues = {}
  180. if parentKey ~= nil then
  181. table.insert(optionalValues, "parentKey")
  182. table.insert(optionalValues, parentKey)
  183. table.insert(optionalValues, "parent")
  184. table.insert(optionalValues, parentData)
  185. end
  186. if repeatJobKey then
  187. table.insert(optionalValues, "rjk")
  188. table.insert(optionalValues, repeatJobKey)
  189. end
  190. if debounceId then
  191. table.insert(optionalValues, "deid")
  192. table.insert(optionalValues, debounceId)
  193. end
  194. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  195. "timestamp", timestamp, "delay", delay, "priority", priority,
  196. unpack(optionalValues))
  197. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  198. return delay, priority
  199. end
  200. local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
  201. timestamp, parentKey, parentData, repeatJobKey, maxEvents,
  202. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  203. priorityCounterKey, delayedKey, markerKey)
  204. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
  205. opts, timestamp, parentKey, parentData, repeatJobKey)
  206. if delay ~= 0 and delayedKey then
  207. addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
  208. else
  209. local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
  210. if priority > 0 then
  211. addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
  212. priorityCounterKey, isPausedOrMaxed)
  213. else
  214. local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
  215. addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
  216. end
  217. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  218. "jobId", jobId)
  219. end
  220. return delay, priority
  221. end
  222. local function addJobFromScheduler(jobKey, jobId, opts, waitKey, pausedKey, activeKey, metaKey,
  223. prioritizedKey, priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp,
  224. data, jobSchedulerId, repeatDelay)
  225. opts['delay'] = repeatDelay
  226. opts['jobId'] = jobId
  227. storeAndEnqueueJob(eventsKey, jobKey, jobId, name, data, opts,
  228. timestamp, nil, nil, jobSchedulerId, maxEvents,
  229. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  230. priorityCounter, delayedKey, markerKey)
  231. end
  232. --[[
  233. Function to get max events value or set by default 10000.
  234. ]]
  235. local function getOrSetMaxEvents(metaKey)
  236. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  237. if not maxEvents then
  238. maxEvents = 10000
  239. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  240. end
  241. return maxEvents
  242. end
  243. local function getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  244. local nextMillis
  245. if not prevMillis then
  246. if startDate then
  247. -- Assuming startDate is passed as milliseconds from JavaScript
  248. nextMillis = tonumber(startDate)
  249. nextMillis = nextMillis > now and nextMillis or now
  250. else
  251. nextMillis = now
  252. end
  253. else
  254. nextMillis = prevMillis + every
  255. -- check if we may have missed some iterations
  256. if nextMillis < now then
  257. nextMillis = math.floor(now / every) * every + every + (offset or 0)
  258. end
  259. end
  260. if not offset or offset == 0 then
  261. local timeSlot = math.floor(nextMillis / every) * every;
  262. offset = nextMillis - timeSlot;
  263. end
  264. -- Return a tuple nextMillis, offset
  265. return math.floor(nextMillis), math.floor(offset)
  266. end
  267. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
  268. -- Validate that scheduler exists.
  269. -- If it does not exist we should not iterate anymore.
  270. if prevMillis then
  271. prevMillis = tonumber(prevMillis)
  272. local schedulerKey = repeatKey .. ":" .. jobSchedulerId
  273. local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data", "every", "startDate", "offset")
  274. local every = tonumber(schedulerAttributes[3])
  275. local now = tonumber(timestamp)
  276. -- If every is not found in scheduler attributes, try to get it from job options
  277. if not every and jobOpts['repeat'] and jobOpts['repeat']['every'] then
  278. every = tonumber(jobOpts['repeat']['every'])
  279. end
  280. if every then
  281. local startDate = schedulerAttributes[4]
  282. local jobOptsOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
  283. local offset = schedulerAttributes[5] or jobOptsOffset or 0
  284. local newOffset
  285. nextMillis, newOffset = getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  286. if not offset then
  287. rcall("HSET", schedulerKey, "offset", newOffset)
  288. jobOpts['repeat']['offset'] = newOffset
  289. end
  290. end
  291. local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
  292. local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis
  293. local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
  294. if producerId == currentDelayedJobId then
  295. local eventsKey = KEYS[9]
  296. local maxEvents = getOrSetMaxEvents(metaKey)
  297. if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
  298. rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
  299. rcall("HINCRBY", schedulerKey, "ic", 1)
  300. rcall("INCR", KEYS[8])
  301. -- TODO: remove this workaround in next breaking change,
  302. -- all job-schedulers must save job data
  303. local templateData = schedulerAttributes[2] or ARGV[3]
  304. if templateData and templateData ~= '{}' then
  305. rcall("HSET", schedulerKey, "data", templateData)
  306. end
  307. local delay = nextMillis - now
  308. -- Fast Clamp delay to minimum of 0
  309. if delay < 0 then
  310. delay = 0
  311. end
  312. jobOpts["delay"] = delay
  313. addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, jobOpts, waitKey, pausedKey, KEYS[12], metaKey,
  314. prioritizedKey, KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerAttributes[1], maxEvents, ARGV[5],
  315. templateData or '{}', jobSchedulerId, delay)
  316. -- TODO: remove this workaround in next breaking change
  317. if KEYS[11] ~= "" then
  318. rcall("HSET", KEYS[11], "nrjid", nextDelayedJobId)
  319. end
  320. return nextDelayedJobId .. "" -- convert to string
  321. else
  322. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "duplicated", "jobId", nextDelayedJobId)
  323. end
  324. end
  325. end
  326. `;
  327. exports.updateJobScheduler = {
  328. name: 'updateJobScheduler',
  329. content,
  330. keys: 12,
  331. };
  332. //# sourceMappingURL=updateJobScheduler-12.js.map