addJobScheduler-11.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. const content = `--[[
  2. Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options).
  3. Input:
  4. KEYS[1] 'repeat' key
  5. KEYS[2] 'delayed' key
  6. KEYS[3] 'wait' key
  7. KEYS[4] 'paused' key
  8. KEYS[5] 'meta' key
  9. KEYS[6] 'prioritized' key
  10. KEYS[7] 'marker' key
  11. KEYS[8] 'id' key
  12. KEYS[9] 'events' key
  13. KEYS[10] 'pc' priority counter
  14. KEYS[11] 'active' key
  15. ARGV[1] next milliseconds
  16. ARGV[2] msgpacked options
  17. [1] name
  18. [2] tz?
  19. [3] pattern?
  20. [4] endDate?
  21. [5] every?
  22. ARGV[3] jobs scheduler id
  23. ARGV[4] Json stringified template data
  ARGV[5] msgpacked template opts
  25. ARGV[6] msgpacked delayed opts
  26. ARGV[7] timestamp
  27. ARGV[8] prefix key
  28. ARGV[9] producer key
  Output:
  {jobId, delay} on success; -10 (SchedulerJobIdCollision) or -11 (SchedulerJobSlotsBusy) on failure
  ]] local rcall = redis.call
  -- Queue keys used directly by the main script body. KEYS[7], KEYS[8],
  -- KEYS[10] and KEYS[11] are not bound to locals here; the main body
  -- references them by index.
  local repeatKey = KEYS[1]
  local delayedKey = KEYS[2]
  local waitKey = KEYS[3]
  local pausedKey = KEYS[4]
  local metaKey = KEYS[5]
  local prioritizedKey = KEYS[6]
  local eventsKey = KEYS[9]
  -- nextMillis is kept exactly as passed in ARGV[1] (no tonumber here); it is
  -- later concatenated into the job id and used in arithmetic.
  local nextMillis = ARGV[1]
  local jobSchedulerId = ARGV[3]
  -- Template options are msgpack-decoded once; they are stored with the scheduler.
  local templateOpts = cmsgpack.unpack(ARGV[5])
  -- Current timestamp, converted to a number for the delay arithmetic below.
  local now = tonumber(ARGV[7])
  local prefixKey = ARGV[8]
  -- Options for the job that this call enqueues (msgpack-decoded).
  local jobOpts = cmsgpack.unpack(ARGV[6])
  45. -- Includes
  46. --[[
  47. Add delay marker if needed.
  48. ]]
  49. -- Includes
  50. --[[
  51. Shared helper to store a job and enqueue it into the appropriate list/set.
  52. Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
  53. Emits the appropriate event after enqueuing ("delayed" or "waiting").
  54. Returns delay, priority from storeJob.
  55. ]]
  56. -- Includes
  57. --[[
  58. Adds a delayed job to the queue by doing the following:
  59. - Creates a new job key with the job data.
  60. - adds to delayed zset.
  61. - Emits a global event 'delayed' if the job is delayed.
  62. ]]
  63. -- Includes
  64. --[[
  65. Add delay marker if needed.
  66. ]]
  67. -- Includes
  --[[
    Function to return the next delayed job timestamp.
    Reads the lowest-scored entry of the delayed zset and converts its score
    back to a timestamp by dividing by 0x1000 (the factor getDelayedScore
    multiplies by when building scores).
    Returns nothing (nil to the caller) when the zset is empty or the score
    cannot be converted to a number.
  ]]
  local function getNextDelayedTimestamp(delayedKey)
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- Note: a bare "#result" is always truthy in Lua because 0 is not falsy,
    -- so the emptiness check has to compare against 0 explicitly.
    if #result == 0 then
      return
    end
    -- With WITHSCORES the reply is {member, score}; the score is the 2nd item.
    local nextTimestamp = tonumber(result[2])
    if nextTimestamp ~= nil then
      return nextTimestamp / 0x1000
    end
  end
  --[[
    Updates the marker zset so that member "1" carries the timestamp of the
    earliest delayed job. Does nothing when no delayed timestamp is known.
  ]]
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    local earliest = getNextDelayedTimestamp(delayedKey)
    if earliest == nil then
      return
    end
    -- ZADD on an existing member replaces its score, so the marker always
    -- reflects the newest known next timestamp.
    rcall("ZADD", markerKey, earliest, "1")
  end
  --[[
    Bake in the job id first 12 bits into the timestamp
    to guarantee correct execution order of delayed jobs
    (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
    WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail

    Computes the zset score for a job that should run at timestamp + delay
    (or at timestamp when delay is not positive). Scores for one millisecond
    occupy the range [ts * 0x1000, (ts + 1) * 0x1000 - 1]; the next free score
    after the current maximum in that range is returned, capped at the top of
    the range.
    Returns: score, delayedTimestamp
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = tonumber(timestamp)
    if delay > 0 then
      delayedTimestamp = delayedTimestamp + delay
    end
    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1
    -- Highest score already stored for this millisecond, if any.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)
    -- Note: a bare "#result" is always truthy in Lua (0 is not falsy), so the
    -- emptiness check compares against 0 explicitly.
    if #result > 0 then
      local currentMaxScore = tonumber(result[2])
      if currentMaxScore ~= nil then
        if currentMaxScore >= maxScore then
          -- The range for this millisecond is exhausted: reuse the top score.
          return maxScore, delayedTimestamp
        end
        return currentMaxScore + 1, delayedTimestamp
      end
    end
    return minScore, delayedTimestamp
  end
  --[[
    Places jobId into the delayed zset with an order-preserving score, emits
    a "delayed" event on the events stream (trimmed to roughly maxEvents), and
    refreshes the delay marker.
  ]]
  local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
    maxEvents, markerKey, delay)
    local numericDelay = tonumber(delay)
    local zsetScore, runAt = getDelayedScore(delayedKey, timestamp, numericDelay)
    rcall("ZADD", delayedKey, zsetScore, jobId)
    -- The event reports the timestamp at which the job becomes due.
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "delayed", "jobId", jobId, "delay", runAt)
    -- Signal that a delayed job is available.
    addDelayMarkerIfNeeded(markerKey, delayedKey)
  end
  121. --[[
  122. Function to add job in target list and add marker if needed.
  123. ]]
  124. -- Includes
  --[[
    Adds the base marker (member "0" with score 0) to the marker zset to
    signal that a job is available, unless the queue is paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Pushes jobId onto targetKey with the given push command (the callers in
    this script pass LPUSH or RPUSH) and then adds the base marker when the
    queue is neither paused nor maxed.
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- Enqueue first, then signal availability.
    rcall(pushCmd, targetKey, jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  137. --[[
  138. Function to add job considering priority.
  139. ]]
  140. -- Includes
  --[[
    Function to get priority score.
    Increments the priority counter and combines it with the priority:
    the priority is multiplied by 0x100000000 and the counter, taken modulo
    0x100000000, is added on top.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local counterSpan = 0x100000000
    local sequence = rcall("INCR", priorityCounterKey)
    local highPart = priority * counterSpan
    local lowPart = sequence % counterSpan
    return highPart + lowPart
  end
  --[[
    Adds jobId to the prioritized zset using a score derived from its
    priority and the priority counter, then adds the base marker when the
    queue is neither paused nor maxed.
  ]]
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
    local priorityScore = getPriorityScore(priority, priorityCounterKey)
    rcall("ZADD", prioritizedKey, priorityScore, jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Decides which list a job should be pushed to, based on the queue meta hash
    (an empty paused list and a missing "paused" field are not the same thing,
    hence the explicit meta lookup).
    Returns four values:
      - target key: pausedKey when the "paused" field is set, otherwise waitKey
      - flag: true when paused, or when "concurrency" is set and the length of
        the active list has reached it; false otherwise
      - the "max" and "duration" meta fields, exactly as read
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local pausedAttr, concurrencyAttr = attrs[1], attrs[2]
    local maxAttr, durationAttr = attrs[3], attrs[4]
    if pausedAttr then
      return pausedKey, true, maxAttr, durationAttr
    end
    local isMaxed = false
    if concurrencyAttr then
      -- The active list is only measured when a concurrency limit exists.
      local activeCount = rcall("LLEN", activeKey)
      isMaxed = activeCount >= tonumber(concurrencyAttr)
    end
    return waitKey, isMaxed, maxAttr, durationAttr
  end
  --[[
    Function to store a job.
    Writes the job hash (name, data, JSON-encoded opts, timestamp, delay,
    priority) plus the optional fields parentKey/parent, rjk (repeat job key)
    and deid (debounce id) when they apply, then emits an "added" event.
    Returns: delay, priority (both default to 0 when absent from opts).
  ]]
  local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
    parentKey, parentData, repeatJobKey)
    local jsonOpts = cjson.encode(opts)
    local delay = opts['delay'] or 0
    local priority = opts['priority'] or 0
    local debounce = opts['de']
    local debounceId = debounce and debounce['id']

    -- Flat field/value list appended to the HMSET call.
    local extras = {}
    local function push(value)
      extras[#extras + 1] = value
    end
    if parentKey ~= nil then
      push("parentKey")
      push(parentKey)
      push("parent")
      push(parentData)
    end
    if repeatJobKey then
      push("rjk")
      push(repeatJobKey)
    end
    if debounceId then
      push("deid")
      push(debounceId)
    end

    rcall("HMSET", jobIdKey,
      "name", name,
      "data", data,
      "opts", jsonOpts,
      "timestamp", timestamp,
      "delay", delay,
      "priority", priority,
      unpack(extras))
    rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
    return delay, priority
  end
  --[[
    Stores a job and enqueues it in the structure matching its options:
      - delay other than 0 and a delayedKey given: the delayed zset
        (addDelayedJob emits the "delayed" event)
      - priority above 0: the prioritized zset
      - otherwise: the wait/paused list, RPUSH when opts.lifo is set, LPUSH
        otherwise
    The two non-delayed paths emit a "waiting" event.
    Returns: delay, priority (as returned by storeJob).
  ]]
  local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
    timestamp, parentKey, parentData, repeatJobKey, maxEvents,
    waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
    priorityCounterKey, delayedKey, markerKey)
    local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
      opts, timestamp, parentKey, parentData, repeatJobKey)

    -- Delayed jobs are handled entirely by addDelayedJob.
    if delay ~= 0 and delayedKey then
      addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
      return delay, priority
    end

    local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    if priority > 0 then
      addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
        priorityCounterKey, isPausedOrMaxed)
    else
      local pushCmd = 'LPUSH'
      if opts['lifo'] then
        pushCmd = 'RPUSH'
      end
      addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
    end
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
      "jobId", jobId)
    return delay, priority
  end
  --[[
    Creates the job for one scheduler iteration. Writes the iteration delay
    and the job id into opts (opts is modified in place), then stores and
    enqueues the job with the scheduler id as its repeat job key and no parent.
  ]]
  local function addJobFromScheduler(jobKey, jobId, opts, waitKey, pausedKey, activeKey, metaKey,
    prioritizedKey, priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp,
    data, jobSchedulerId, repeatDelay)
    opts['jobId'] = jobId
    opts['delay'] = repeatDelay
    local noParentKey, noParentData = nil, nil
    storeAndEnqueueJob(eventsKey, jobKey, jobId, name, data, opts,
      timestamp, noParentKey, noParentData, jobSchedulerId, maxEvents,
      waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
      priorityCounter, delayedKey, markerKey)
  end
  --[[
    Function to get max events value or set by default 10000.
    Returns the stored "opts.maxLenEvents" meta field when present; otherwise
    writes 10000 to that field and returns 10000.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  --[[
    Reports whether the queue is paused by checking that the "paused" field
    exists in the queue meta hash (an empty list and a missing key are not
    the same thing, so the meta field is the source of truth).
  ]]
  local function isQueuePaused(queueMetaKey)
    local pausedFieldExists = rcall("HEXISTS", queueMetaKey, "paused")
    return pausedFieldExists == 1
  end
  254. --[[
  255. Function to remove job.
  256. ]]
  257. -- Includes
  --[[
    Removes the deduplication key of a job that is being removed, but only
    when that key still points at this very job. The pending dedup-next key
    ("dn:" prefix) for the same deduplication id is deleted as well.
    Returns 1 when the keys were deleted, nothing otherwise.
  ]]
  local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
    jobId, deduplicationId)
    if not deduplicationId then
      return
    end
    local deduplicationKey = prefixKey .. "de:" .. deduplicationId
    local ownerJobId = rcall('GET', deduplicationKey)
    if not ownerJobId or ownerJobId ~= jobId then
      -- The key is missing or belongs to another job: leave it alone.
      return
    end
    rcall("DEL", deduplicationKey)
    -- Also clean up any pending dedup-next data for this dedup ID
    rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
    return 1
  end
  --[[
    Function to remove job keys.
    Deletes the job hash together with its :logs, :dependencies, :processed,
    :failed and :unsuccessful companion keys in a single DEL call and returns
    the reply of that call.
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'
    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  282. --[[
  283. Check if this job has a parent. If so we will just remove it from
  284. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  285. which requires code from "moveToFinished"
  286. ]]
  287. -- Includes
  --[[
    Functions to destructure job key.
    Just a bit of warning, these functions may be a bit slow and affect performance significantly.

    getJobIdFromKey returns everything after the last ":" of jobKey, or nil
    when jobKey contains no ":".
  ]]
  local function getJobIdFromKey(jobKey)
    -- The greedy ".*:" consumes up to the last colon; the capture is the rest.
    local jobId = string.match(jobKey, ".*:(.*)")
    return jobId
  end
  -- Returns jobKey with its trailing jobId removed, i.e. the leading part of
  -- the key whose length is #jobKey - #jobId.
  local function getJobKeyPrefix(jobKey, jobId)
    local prefixLength = #jobKey - #jobId
    -- string.sub treats a start index of 0 like 1, so 1 is used directly.
    return string.sub(jobKey, 1, prefixLength)
  end
  --[[
    Pushes a parent job onto the wait or paused list of its own queue (keys
    are built from parentPrefix) using RPUSH. When emitEvent is truthy a
    "waiting" event with prev "waiting-children" is added to the parent
    queue's events stream.
  ]]
  local function _moveParentToWait(parentPrefix, parentId, emitEvent)
    local metaKey = parentPrefix .. "meta"
    local activeKey = parentPrefix .. "active"
    local waitKey = parentPrefix .. "wait"
    local pausedKey = parentPrefix .. "paused"
    local parentTarget, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
    addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
    if not emitEvent then
      return
    end
    rcall("XADD", parentPrefix .. "events", "*", "event", "waiting",
      "jobId", parentId, "prev", "waiting-children")
  end
  -- Forward declaration: the helper below recurses into
  -- removeParentDependencyKey when a parent in the same queue is hard-removed.
  local removeParentDependencyKey

  --[[
    Removes jobKey from the dependencies set of parentKey. When that was the
    last pending dependency and the parent was in "waiting-children":
      - hard removal, parent in the same queue (parentPrefix == baseKey):
        the parent itself is removed (its own parent link, its job keys and,
        when debounceId is given, the "de:" key for that id)
      - hard removal, parent in another queue: the parent is moved to
        wait/paused without an event
      - soft removal: the parent is moved to wait/paused with a "waiting" event
    Returns true when jobKey was a member of the dependencies set, false
    otherwise.
  ]]
  local function releaseParentDependency(jobKey, hard, parentKey, baseKey, debounceId)
    local parentDependenciesKey = parentKey .. ":dependencies"
    local removedCount = rcall("SREM", parentDependenciesKey, jobKey)
    if not (removedCount > 0) then
      return false
    end
    local pendingDependencies = rcall("SCARD", parentDependenciesKey)
    if pendingDependencies ~= 0 then
      return true
    end
    local parentId = getJobIdFromKey(parentKey)
    local parentPrefix = getJobKeyPrefix(parentKey, parentId)
    local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
    if numRemovedElements ~= 1 then
      -- The parent was not waiting on its children; nothing more to do.
      return true
    end
    if not hard then
      _moveParentToWait(parentPrefix, parentId, true)
    elseif parentPrefix == baseKey then
      -- remove parent in same queue
      removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
      removeJobKeys(parentKey)
      if debounceId then
        rcall("DEL", parentPrefix .. "de:" .. debounceId)
      end
    else
      _moveParentToWait(parentPrefix, parentId)
    end
    return true
  end

  --[[
    Check if this job has a parent. If so we will just remove it from
    the parent child list, but if it is the last child we should move the
    parent to "wait/paused".

    When parentKey is not supplied, the parent key and debounce id are read
    from the job hash ("parentKey" and "deid" fields) and are only used if
    the parent key is a non-empty string naming an existing key.
    Returns true when the job was detached from a parent, false otherwise.
  ]]
  removeParentDependencyKey = function(jobKey, hard, parentKey, baseKey, debounceId)
    if parentKey then
      return releaseParentDependency(jobKey, hard, parentKey, baseKey, debounceId)
    end
    local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
    local missedParentKey = parentAttributes[1]
    if type(missedParentKey) == "string" and missedParentKey ~= ""
      and rcall("EXISTS", missedParentKey) == 1 then
      return releaseParentDependency(jobKey, hard, missedParentKey, baseKey, parentAttributes[2])
    end
    return false
  end
  --[[
    Function to remove job.
    Detaches the job from its parent (if any), optionally removes its
    deduplication key (looked up through the job's "deid" field), and finally
    deletes the job keys.
  ]]
  local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
    local jobKey = baseKey .. jobId
    -- No parent key is passed, so it is looked up from the job hash.
    removeParentDependencyKey(jobKey, hard, nil, baseKey)
    if shouldRemoveDeduplicationKey then
      -- Must read "deid" before the job hash is deleted below.
      local storedDedupId = rcall("HGET", jobKey, "deid")
      removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, storedDedupId)
    end
    removeJobKeys(jobKey)
  end
  --[[
    Function to store a job scheduler.
    Records the next run time in the repeat zset and rewrites the scheduler
    hash from scratch: "name" plus whichever of tz, limit, pattern, startDate,
    endDate, every and offset are set, the JSON template opts and template
    data when they are not empty objects, and the "ic" field.
    Values that must survive the rewrite (the stored offset when opts has
    none, and "ic", defaulting to 1) are read before the hash is deleted.
  ]]
  local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMillis, opts,
    templateData, templateOpts)
    rcall("ZADD", repeatKey, nextMillis, schedulerId)

    -- Flat field/value list for the final HMSET.
    local fields = {}
    local function put(field, value)
      fields[#fields + 1] = field
      fields[#fields + 1] = value
    end

    -- Options copied verbatim when present, in this order.
    local copiedOptions = {"tz", "limit", "pattern", "startDate", "endDate", "every"}
    for i = 1, #copiedOptions do
      local optionName = copiedOptions[i]
      local optionValue = opts[optionName]
      if optionValue then
        put(optionName, optionValue)
      end
    end

    local explicitOffset = opts['offset']
    if explicitOffset then
      put("offset", explicitOffset)
    else
      -- Keep the previously stored offset, if there is one.
      local storedOffset = rcall("HGET", schedulerKey, "offset")
      if storedOffset then
        put("offset", tonumber(storedOffset))
      end
    end

    local jsonTemplateOpts = cjson.encode(templateOpts)
    if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
      put("opts", jsonTemplateOpts)
    end
    if templateData and templateData ~= '{}' then
      put("data", templateData)
    end
    put("ic", rcall("HGET", schedulerKey, "ic") or 1)

    rcall("DEL", schedulerKey) -- remove all attributes and then re-insert new ones
    rcall("HMSET", schedulerKey, "name", opts['name'], unpack(fields))
  end
  --[[
    Computes the next run time for an "every" scheduler.
      - With a previous run time: prevMillis + every; if that is already in
        the past, the slot after the one containing now, plus the offset.
      - Without one: the later of startDate and now, or now when no startDate
        is given (startDate is assumed to be milliseconds passed from
        JavaScript).
    When no non-zero offset is supplied, the offset is derived as the
    distance of nextMillis from the start of its "every" time slot.
    Returns a tuple: nextMillis, offset (both floored).
  ]]
  local function getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
    local nextMillis
    if prevMillis then
      nextMillis = prevMillis + every
      -- check if we may have missed some iterations
      if nextMillis < now then
        local currentSlot = math.floor(now / every) * every
        nextMillis = currentSlot + every + (offset or 0)
      end
    elseif startDate then
      nextMillis = tonumber(startDate)
      if not (nextMillis > now) then
        nextMillis = now
      end
    else
      nextMillis = now
    end

    if not offset or offset == 0 then
      offset = nextMillis - math.floor(nextMillis / every) * every
    end

    return math.floor(nextMillis), math.floor(offset)
  end
  -- If we are overriding a repeatable job we must delete the delayed job for
  -- the next iteration.
  local schedulerKey = repeatKey .. ":" .. jobSchedulerId
  local maxEvents = getOrSetMaxEvents(metaKey)
  local templateData = ARGV[4]
  -- Score currently stored for this scheduler in the repeat zset, i.e. the
  -- run time recorded by a previous call (absent for a brand new scheduler).
  local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
  if prevMillis then
    prevMillis = tonumber(prevMillis)
  end
  local schedulerOpts = cmsgpack.unpack(ARGV[2])
  local every = schedulerOpts['every']
  -- For backwards compatibility we also check the offset from the job itself.
  -- could be removed in future major versions.
  local jobOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
  local offset = schedulerOpts['offset'] or jobOffset or 0
  local newOffset = offset
  local updatedEvery = false
  -- Only "every" schedulers recompute nextMillis here; otherwise the value
  -- passed in ARGV[1] is used as is.
  if every then
    -- if we changed the 'every' value we need to reset millis to nil
    local millis = prevMillis
    if prevMillis then
      local prevEvery = tonumber(rcall("HGET", schedulerKey, "every"))
      if prevEvery ~= every then
        millis = nil
        updatedEvery = true
      end
    end
    local startDate = schedulerOpts['startDate']
    nextMillis, newOffset = getJobSchedulerEveryNextMillis(millis, every, now, offset, startDate)
  end
  --[[
    Removes a scheduler-created job that has not started yet. The job is
    looked up, in order, in the delayed zset, the prioritized zset and the
    wait list (or the paused list when the queue is paused). When found it is
    hard-removed together with its deduplication key.
    Returns true when a job was removed, false when it was in none of them.
  ]]
  local function removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, jobId, metaKey,
    eventsKey)
    -- Sorted-set states: find the zset that currently holds the job, if any.
    local holdingZset
    if rcall("ZSCORE", delayedKey, jobId) then
      holdingZset = delayedKey
    elseif rcall("ZSCORE", prioritizedKey, jobId) then
      holdingZset = prioritizedKey
    end
    if holdingZset then
      removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
      rcall("ZREM", holdingZset, jobId)
      return true
    end

    -- List states: the job sits in paused when the queue is paused, else wait.
    local pausedOrWaitKey
    if isQueuePaused(metaKey) then
      pausedOrWaitKey = pausedKey
    else
      pausedOrWaitKey = waitKey
    end
    if rcall("LREM", pausedOrWaitKey, 1, jobId) > 0 then
      removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
      return true
    end
    return false
  end
  -- Step 1: when the scheduler already existed, try to remove the job that
  -- was created for its previously recorded run time.
  local removedPrevJob = false
  if prevMillis then
    local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
    local currentJobKey = schedulerKey .. ":" .. prevMillis
    -- In theory it should always exist the currentJobKey if there is a prevMillis unless something has
    -- gone really wrong.
    if rcall("EXISTS", currentJobKey) == 1 then
      removedPrevJob = removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, currentJobId,
        metaKey, eventsKey)
    end
  end
  if removedPrevJob then
    -- The jobs has been removed and we want to replace it, so lets use the same millis.
    if every and not updatedEvery then
      nextMillis = prevMillis
    end
  else
    -- Special case where no job was removed, and we need to add the next iteration.
    schedulerOpts['offset'] = newOffset
  end
  -- Step 2: derive the id and key of the job for the chosen run time.
  -- Check for job ID collision with existing jobs (in any state)
  local jobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
  local jobKey = prefixKey .. jobId
  -- If there's already a job with this ID, in a state
  -- that is not updatable (active, completed, failed) we must
  -- handle the collision
  local hasCollision = false
  if rcall("EXISTS", jobKey) == 1 then
    if every then
      -- For 'every' case: try next time slot to avoid collision
      local nextSlotMillis = nextMillis + every
      local nextSlotJobId = "repeat:" .. jobSchedulerId .. ":" .. nextSlotMillis
      local nextSlotJobKey = prefixKey .. nextSlotJobId
      if rcall("EXISTS", nextSlotJobKey) == 0 then
        -- Next slot is free, use it
        nextMillis = nextSlotMillis
        jobId = nextSlotJobId
      else
        -- Next slot also has a job, return error code
        return -11 -- SchedulerJobSlotsBusy
      end
    else
      hasCollision = true
    end
  end
  -- Step 3: the job is delayed until nextMillis, measured from now.
  local delay = nextMillis - now
  -- Fast Clamp delay to minimum of 0
  if delay < 0 then
    delay = 0
  end
  -- Computed after the collision handling so it reflects the final nextMillis.
  local nextJobKey = schedulerKey .. ":" .. nextMillis
  if not hasCollision or removedPrevJob then
    -- jobId already calculated above during collision check
    storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, templateData, templateOpts)
    -- KEYS[8] is the 'id' key; it is incremented once per job created here.
    rcall("INCR", KEYS[8])
    -- KEYS[11] is the 'active' key, KEYS[10] the priority counter, KEYS[7] the marker key.
    addJobFromScheduler(nextJobKey, jobId, jobOpts, waitKey, pausedKey, KEYS[11], metaKey, prioritizedKey, KEYS[10],
      delayedKey, KEYS[7], eventsKey, schedulerOpts['name'], maxEvents, now, templateData, jobSchedulerId, delay)
  elseif hasCollision then
    -- For 'pattern' case: return error code
    return -10 -- SchedulerJobIdCollision
  end
  -- When a producer key is supplied (ARGV[9] not empty), record the id of the
  -- job just created in its "nrjid" field.
  if ARGV[9] ~= "" then
    rcall("HSET", ARGV[9], "nrjid", jobId)
  end
  -- The job id is forced to a string by the concatenation; delay is the
  -- clamped delay computed above.
  return {jobId .. "", delay}
  575. `;
  /**
   * Descriptor for the addJobScheduler Lua script: its name, the Lua source
   * held in `content`, and the number of KEYS it takes (11, matching the
   * KEYS[1]..KEYS[11] layout documented in the script header).
   */
  export const addJobScheduler = {
      name: 'addJobScheduler',
      content,
      keys: 11,
  };
  581. //# sourceMappingURL=addJobScheduler-11.js.map