addJobScheduler-11.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.addJobScheduler = void 0;
  4. const content = `--[[
  5. Adds a job scheduler, i.e. a job factory that creates jobs based on a given schedule (repeat options).
  6. Input:
  7. KEYS[1] 'repeat' key
  8. KEYS[2] 'delayed' key
  9. KEYS[3] 'wait' key
  10. KEYS[4] 'paused' key
  11. KEYS[5] 'meta' key
  12. KEYS[6] 'prioritized' key
  13. KEYS[7] 'marker' key
  14. KEYS[8] 'id' key
  15. KEYS[9] 'events' key
  16. KEYS[10] 'pc' priority counter
  17. KEYS[11] 'active' key
  18. ARGV[1] next milliseconds
  19. ARGV[2] msgpacked options
  20. [1] name
  21. [2] tz?
  22. [3] pattern?
  23. [4] endDate?
  24. [5] every?
  25. ARGV[3] jobs scheduler id
  26. ARGV[4] Json stringified template data
  27. ARGV[5] msgpacked template opts
  28. ARGV[6] msgpacked delayed opts
  29. ARGV[7] timestamp
  30. ARGV[8] prefix key
  31. ARGV[9] producer key
  32. Output:
  33. {jobId, delay} on success; -10 (SchedulerJobIdCollision) or -11 (SchedulerJobSlotsBusy) on failure
  34. ]] local rcall = redis.call
  35. local repeatKey = KEYS[1]
  36. local delayedKey = KEYS[2]
  37. local waitKey = KEYS[3]
  38. local pausedKey = KEYS[4]
  39. local metaKey = KEYS[5]
  40. local prioritizedKey = KEYS[6]
  41. local eventsKey = KEYS[9]
  42. local nextMillis = ARGV[1]
  43. local jobSchedulerId = ARGV[3]
  44. local templateOpts = cmsgpack.unpack(ARGV[5])
  45. local now = tonumber(ARGV[7])
  46. local prefixKey = ARGV[8]
  47. local jobOpts = cmsgpack.unpack(ARGV[6])
  48. -- Includes
  49. --[[
  50. Add delay marker if needed.
  51. ]]
  52. -- Includes
  53. --[[
  54. Shared helper to store a job and enqueue it into the appropriate list/set.
  55. Handles delayed, prioritized, and standard (LIFO/FIFO) jobs.
  56. Emits the appropriate event after enqueuing ("delayed" or "waiting").
  57. Returns delay, priority from storeJob.
  58. ]]
  59. -- Includes
  60. --[[
  61. Adds a delayed job to the queue by doing the following:
  62. - Creates a new job key with the job data.
  63. - adds to delayed zset.
  64. - Emits a global event 'delayed' if the job is delayed.
  65. ]]
  66. -- Includes
  67. --[[
  68. Add delay marker if needed.
  69. ]]
  70. -- Includes
  71. --[[
  72. Function to return the next delayed job timestamp.
  73. ]]
  74. local function getNextDelayedTimestamp(delayedKey)
  75. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  76. if #result then
  77. local nextTimestamp = tonumber(result[2])
  78. if nextTimestamp ~= nil then
  79. return nextTimestamp / 0x1000
  80. end
  81. end
  82. end
  83. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  84. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  85. if nextTimestamp ~= nil then
  86. -- Replace the score of the marker with the newest known
  87. -- next timestamp.
  88. rcall("ZADD", markerKey, nextTimestamp, "1")
  89. end
  90. end
  91. --[[
  92. Bake in the job id first 12 bits into the timestamp
  93. to guarantee correct execution order of delayed jobs
  94. (up to 4096 jobs per given timestamp or 4096 jobs apart per timestamp)
  95. WARNING: Jobs that are so far apart that they wrap around will cause FIFO to fail
  96. ]]
  97. local function getDelayedScore(delayedKey, timestamp, delay)
  98. local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
  99. local minScore = delayedTimestamp * 0x1000
  100. local maxScore = (delayedTimestamp + 1 ) * 0x1000 - 1
  101. local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
  102. minScore, "WITHSCORES","LIMIT", 0, 1)
  103. if #result then
  104. local currentMaxScore = tonumber(result[2])
  105. if currentMaxScore ~= nil then
  106. if currentMaxScore >= maxScore then
  107. return maxScore, delayedTimestamp
  108. else
  109. return currentMaxScore + 1, delayedTimestamp
  110. end
  111. end
  112. end
  113. return minScore, delayedTimestamp
  114. end
  115. local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
  116. maxEvents, markerKey, delay)
  117. local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, tonumber(delay))
  118. rcall("ZADD", delayedKey, score, jobId)
  119. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "delayed",
  120. "jobId", jobId, "delay", delayedTimestamp)
  121. -- mark that a delayed job is available
  122. addDelayMarkerIfNeeded(markerKey, delayedKey)
  123. end
  124. --[[
  125. Function to add job in target list and add marker if needed.
  126. ]]
  127. -- Includes
  128. --[[
  129. Add marker if needed when a job is available.
  130. ]]
  131. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  132. if not isPausedOrMaxed then
  133. rcall("ZADD", markerKey, 0, "0")
  134. end
  135. end
  136. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  137. rcall(pushCmd, targetKey, jobId)
  138. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  139. end
  140. --[[
  141. Function to add job considering priority.
  142. ]]
  143. -- Includes
  144. --[[
  145. Function to get priority score.
  146. ]]
  147. local function getPriorityScore(priority, priorityCounterKey)
  148. local prioCounter = rcall("INCR", priorityCounterKey)
  149. return priority * 0x100000000 + prioCounter % 0x100000000
  150. end
  151. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  152. isPausedOrMaxed)
  153. local score = getPriorityScore(priority, priorityCounterKey)
  154. rcall("ZADD", prioritizedKey, score, jobId)
  155. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  156. end
  157. --[[
  158. Function to check for the meta.paused key to decide if we are paused or not
  159. (since an empty list and !EXISTS are not really the same).
  160. ]]
  161. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  162. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  163. if queueAttributes[1] then
  164. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  165. else
  166. if queueAttributes[2] then
  167. local activeCount = rcall("LLEN", activeKey)
  168. if activeCount >= tonumber(queueAttributes[2]) then
  169. return waitKey, true, queueAttributes[3], queueAttributes[4]
  170. else
  171. return waitKey, false, queueAttributes[3], queueAttributes[4]
  172. end
  173. end
  174. end
  175. return waitKey, false, queueAttributes[3], queueAttributes[4]
  176. end
  177. --[[
  178. Function to store a job
  179. ]]
  180. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  181. parentKey, parentData, repeatJobKey)
  182. local jsonOpts = cjson.encode(opts)
  183. local delay = opts['delay'] or 0
  184. local priority = opts['priority'] or 0
  185. local debounceId = opts['de'] and opts['de']['id']
  186. local optionalValues = {}
  187. if parentKey ~= nil then
  188. table.insert(optionalValues, "parentKey")
  189. table.insert(optionalValues, parentKey)
  190. table.insert(optionalValues, "parent")
  191. table.insert(optionalValues, parentData)
  192. end
  193. if repeatJobKey then
  194. table.insert(optionalValues, "rjk")
  195. table.insert(optionalValues, repeatJobKey)
  196. end
  197. if debounceId then
  198. table.insert(optionalValues, "deid")
  199. table.insert(optionalValues, debounceId)
  200. end
  201. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  202. "timestamp", timestamp, "delay", delay, "priority", priority,
  203. unpack(optionalValues))
  204. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  205. return delay, priority
  206. end
  207. local function storeAndEnqueueJob(eventsKey, jobIdKey, jobId, name, data, opts,
  208. timestamp, parentKey, parentData, repeatJobKey, maxEvents,
  209. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  210. priorityCounterKey, delayedKey, markerKey)
  211. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, name, data,
  212. opts, timestamp, parentKey, parentData, repeatJobKey)
  213. if delay ~= 0 and delayedKey then
  214. addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
  215. else
  216. local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)
  217. if priority > 0 then
  218. addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
  219. priorityCounterKey, isPausedOrMaxed)
  220. else
  221. local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
  222. addJobInTargetList(target, markerKey, pushCmd, isPausedOrMaxed, jobId)
  223. end
  224. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  225. "jobId", jobId)
  226. end
  227. return delay, priority
  228. end
  229. local function addJobFromScheduler(jobKey, jobId, opts, waitKey, pausedKey, activeKey, metaKey,
  230. prioritizedKey, priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp,
  231. data, jobSchedulerId, repeatDelay)
  232. opts['delay'] = repeatDelay
  233. opts['jobId'] = jobId
  234. storeAndEnqueueJob(eventsKey, jobKey, jobId, name, data, opts,
  235. timestamp, nil, nil, jobSchedulerId, maxEvents,
  236. waitKey, pausedKey, activeKey, metaKey, prioritizedKey,
  237. priorityCounter, delayedKey, markerKey)
  238. end
  239. --[[
  240. Function to get max events value or set by default 10000.
  241. ]]
  242. local function getOrSetMaxEvents(metaKey)
  243. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  244. if not maxEvents then
  245. maxEvents = 10000
  246. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  247. end
  248. return maxEvents
  249. end
  250. --[[
  251. Function to check for the meta.paused key to decide if we are paused or not
  252. (since an empty list and !EXISTS are not really the same).
  253. ]]
  254. local function isQueuePaused(queueMetaKey)
  255. return rcall("HEXISTS", queueMetaKey, "paused") == 1
  256. end
  257. --[[
  258. Function to remove job.
  259. ]]
  260. -- Includes
  261. --[[
  262. Function to remove deduplication key if needed
  263. when a job is being removed.
  264. ]]
  265. local function removeDeduplicationKeyIfNeededOnRemoval(prefixKey,
  266. jobId, deduplicationId)
  267. if deduplicationId then
  268. local deduplicationKey = prefixKey .. "de:" .. deduplicationId
  269. local currentJobId = rcall('GET', deduplicationKey)
  270. if currentJobId and currentJobId == jobId then
  271. rcall("DEL", deduplicationKey)
  272. -- Also clean up any pending dedup-next data for this dedup ID
  273. rcall("DEL", prefixKey .. "dn:" .. deduplicationId)
  274. return 1
  275. end
  276. end
  277. end
  278. --[[
  279. Function to remove job keys.
  280. ]]
  281. local function removeJobKeys(jobKey)
  282. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  283. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  284. end
  285. --[[
  286. Check if this job has a parent. If so we will just remove it from
  287. the parent child list, but if it is the last child we should move the parent to "wait/paused"
  288. which requires code from "moveToFinished"
  289. ]]
  290. -- Includes
  291. --[[
  292. Functions to destructure job key.
  293. Just a bit of warning, these functions may be a bit slow and affect performance significantly.
  294. ]]
  295. local getJobIdFromKey = function (jobKey)
  296. return string.match(jobKey, ".*:(.*)")
  297. end
  298. local getJobKeyPrefix = function (jobKey, jobId)
  299. return string.sub(jobKey, 0, #jobKey - #jobId)
  300. end
  301. local function _moveParentToWait(parentPrefix, parentId, emitEvent)
  302. local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
  303. parentPrefix .. "wait", parentPrefix .. "paused")
  304. addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)
  305. if emitEvent then
  306. local parentEventStream = parentPrefix .. "events"
  307. rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  308. end
  309. end
--[[
  Detaches jobKey from its parent's ":dependencies" set.

  When parentKey is given it is used directly; otherwise the parent key is
  looked up in the job hash ("parentKey" field) and used only if it is a
  non-empty string whose key still exists.

  If the removal empties the parent's dependency set and the parent was in
  its queue's "waiting-children" zset, the parent is either:
    - removed as well (hard == true and the parent lives under baseKey), or
    - moved to its wait/paused list (an event is emitted only when not hard).

  Returns true when jobKey was actually removed from a dependency set,
  false otherwise.
]]
local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId)
  if parentKey then
    local parentDependenciesKey = parentKey .. ":dependencies"
    local result = rcall("SREM", parentDependenciesKey, jobKey)
    if result > 0 then
      local pendingDependencies = rcall("SCARD", parentDependenciesKey)
      -- That was the last pending child of the parent.
      if pendingDependencies == 0 then
        local parentId = getJobIdFromKey(parentKey)
        local parentPrefix = getJobKeyPrefix(parentKey, parentId)
        local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
        -- Only act when the parent really was waiting for its children.
        if numRemovedElements == 1 then
          if hard then -- remove parent in same queue
            if parentPrefix == baseKey then
              -- Recurse so the parent is detached from its own parent first.
              removeParentDependencyKey(parentKey, hard, nil, baseKey, nil)
              removeJobKeys(parentKey)
              if debounceId then
                rcall("DEL", parentPrefix .. "de:" .. debounceId)
              end
            else
              -- Parent belongs to another queue: re-queue it, no event.
              _moveParentToWait(parentPrefix, parentId)
            end
          else
            _moveParentToWait(parentPrefix, parentId, true)
          end
        end
      end
      return true
    end
  else
    -- No parent key supplied: read it (and the "deid" field) from the job hash.
    local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid")
    local missedParentKey = parentAttributes[1]
    if( (type(missedParentKey) == "string") and missedParentKey ~= ""
      and (rcall("EXISTS", missedParentKey) == 1)) then
      local parentDependenciesKey = missedParentKey .. ":dependencies"
      local result = rcall("SREM", parentDependenciesKey, jobKey)
      if result > 0 then
        local pendingDependencies = rcall("SCARD", parentDependenciesKey)
        if pendingDependencies == 0 then
          local parentId = getJobIdFromKey(missedParentKey)
          local parentPrefix = getJobKeyPrefix(missedParentKey, parentId)
          local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId)
          if numRemovedElements == 1 then
            if hard then
              if parentPrefix == baseKey then
                removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil)
                removeJobKeys(missedParentKey)
                -- NOTE(review): parentAttributes[2] is the "deid" read from
                -- jobKey's hash, not from the parent's hash - confirm intended.
                if parentAttributes[2] then
                  rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2])
                end
              else
                _moveParentToWait(parentPrefix, parentId)
              end
            else
              _moveParentToWait(parentPrefix, parentId, true)
            end
          end
        end
        return true
      end
    end
  end
  return false
end
  373. local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey)
  374. local jobKey = baseKey .. jobId
  375. removeParentDependencyKey(jobKey, hard, nil, baseKey)
  376. if shouldRemoveDeduplicationKey then
  377. local deduplicationId = rcall("HGET", jobKey, "deid")
  378. removeDeduplicationKeyIfNeededOnRemoval(baseKey, jobId, deduplicationId)
  379. end
  380. removeJobKeys(jobKey)
  381. end
  382. --[[
  383. Function to store a job scheduler
  384. ]]
  385. local function storeJobScheduler(schedulerId, schedulerKey, repeatKey, nextMillis, opts,
  386. templateData, templateOpts)
  387. rcall("ZADD", repeatKey, nextMillis, schedulerId)
  388. local optionalValues = {}
  389. if opts['tz'] then
  390. table.insert(optionalValues, "tz")
  391. table.insert(optionalValues, opts['tz'])
  392. end
  393. if opts['limit'] then
  394. table.insert(optionalValues, "limit")
  395. table.insert(optionalValues, opts['limit'])
  396. end
  397. if opts['pattern'] then
  398. table.insert(optionalValues, "pattern")
  399. table.insert(optionalValues, opts['pattern'])
  400. end
  401. if opts['startDate'] then
  402. table.insert(optionalValues, "startDate")
  403. table.insert(optionalValues, opts['startDate'])
  404. end
  405. if opts['endDate'] then
  406. table.insert(optionalValues, "endDate")
  407. table.insert(optionalValues, opts['endDate'])
  408. end
  409. if opts['every'] then
  410. table.insert(optionalValues, "every")
  411. table.insert(optionalValues, opts['every'])
  412. end
  413. if opts['offset'] then
  414. table.insert(optionalValues, "offset")
  415. table.insert(optionalValues, opts['offset'])
  416. else
  417. local offset = rcall("HGET", schedulerKey, "offset")
  418. if offset then
  419. table.insert(optionalValues, "offset")
  420. table.insert(optionalValues, tonumber(offset))
  421. end
  422. end
  423. local jsonTemplateOpts = cjson.encode(templateOpts)
  424. if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
  425. table.insert(optionalValues, "opts")
  426. table.insert(optionalValues, jsonTemplateOpts)
  427. end
  428. if templateData and templateData ~= '{}' then
  429. table.insert(optionalValues, "data")
  430. table.insert(optionalValues, templateData)
  431. end
  432. table.insert(optionalValues, "ic")
  433. table.insert(optionalValues, rcall("HGET", schedulerKey, "ic") or 1)
  434. rcall("DEL", schedulerKey) -- remove all attributes and then re-insert new ones
  435. rcall("HMSET", schedulerKey, "name", opts['name'], unpack(optionalValues))
  436. end
  437. local function getJobSchedulerEveryNextMillis(prevMillis, every, now, offset, startDate)
  438. local nextMillis
  439. if not prevMillis then
  440. if startDate then
  441. -- Assuming startDate is passed as milliseconds from JavaScript
  442. nextMillis = tonumber(startDate)
  443. nextMillis = nextMillis > now and nextMillis or now
  444. else
  445. nextMillis = now
  446. end
  447. else
  448. nextMillis = prevMillis + every
  449. -- check if we may have missed some iterations
  450. if nextMillis < now then
  451. nextMillis = math.floor(now / every) * every + every + (offset or 0)
  452. end
  453. end
  454. if not offset or offset == 0 then
  455. local timeSlot = math.floor(nextMillis / every) * every;
  456. offset = nextMillis - timeSlot;
  457. end
  458. -- Return a tuple nextMillis, offset
  459. return math.floor(nextMillis), math.floor(offset)
  460. end
-- If we are overriding a repeatable job we must delete the delayed job for
-- the next iteration.
-- Hash key holding this scheduler's attributes.
local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local maxEvents = getOrSetMaxEvents(metaKey)
local templateData = ARGV[4]
-- Score currently stored for this scheduler in the repeat zset, converted
-- to a number when the scheduler is already registered.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis then
  prevMillis = tonumber(prevMillis)
end
local schedulerOpts = cmsgpack.unpack(ARGV[2])
local every = schedulerOpts['every']
-- For backwards compatibility we also check the offset from the job itself.
-- could be removed in future major versions.
local jobOffset = jobOpts['repeat'] and jobOpts['repeat']['offset'] or 0
local offset = schedulerOpts['offset'] or jobOffset or 0
local newOffset = offset
-- Set to true below when the stored "every" differs from the requested one.
local updatedEvery = false
if every then
  -- if we changed the 'every' value we need to reset millis to nil
  local millis = prevMillis
  if prevMillis then
    local prevEvery = tonumber(rcall("HGET", schedulerKey, "every"))
    if prevEvery ~= every then
      millis = nil
      updatedEvery = true
    end
  end
  local startDate = schedulerOpts['startDate']
  -- For "every" schedulers the next run time is computed here and replaces
  -- the value received in ARGV[1].
  nextMillis, newOffset = getJobSchedulerEveryNextMillis(millis, every, now, offset, startDate)
end
  491. local function removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, jobId, metaKey,
  492. eventsKey)
  493. if rcall("ZSCORE", delayedKey, jobId) then
  494. removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
  495. rcall("ZREM", delayedKey, jobId)
  496. return true
  497. elseif rcall("ZSCORE", prioritizedKey, jobId) then
  498. removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
  499. rcall("ZREM", prioritizedKey, jobId)
  500. return true
  501. else
  502. local pausedOrWaitKey = waitKey
  503. if isQueuePaused(metaKey) then
  504. pausedOrWaitKey = pausedKey
  505. end
  506. if rcall("LREM", pausedOrWaitKey, 1, jobId) > 0 then
  507. removeJob(jobId, true, prefixKey, true --[[remove debounce key]] )
  508. return true
  509. end
  510. end
  511. return false
  512. end
-- Main flow: drop the not-yet-started job of the previous iteration (if any),
-- pick the id for the next job, handle id collisions, then store the
-- scheduler and enqueue the job.
local removedPrevJob = false
if prevMillis then
  local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
  local currentJobKey = schedulerKey .. ":" .. prevMillis
  -- In theory it should always exist the currentJobKey if there is a prevMillis unless something has
  -- gone really wrong.
  if rcall("EXISTS", currentJobKey) == 1 then
    removedPrevJob = removeJobFromScheduler(prefixKey, delayedKey, prioritizedKey, waitKey, pausedKey, currentJobId,
      metaKey, eventsKey)
  end
end
if removedPrevJob then
  -- The jobs has been removed and we want to replace it, so lets use the same millis.
  if every and not updatedEvery then
    nextMillis = prevMillis
  end
else
  -- Special case where no job was removed, and we need to add the next iteration.
  schedulerOpts['offset'] = newOffset
end
-- Check for job ID collision with existing jobs (in any state)
local jobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local jobKey = prefixKey .. jobId
-- If there's already a job with this ID, in a state
-- that is not updatable (active, completed, failed) we must
-- handle the collision
local hasCollision = false
if rcall("EXISTS", jobKey) == 1 then
  if every then
    -- For 'every' case: try next time slot to avoid collision
    local nextSlotMillis = nextMillis + every
    local nextSlotJobId = "repeat:" .. jobSchedulerId .. ":" .. nextSlotMillis
    local nextSlotJobKey = prefixKey .. nextSlotJobId
    if rcall("EXISTS", nextSlotJobKey) == 0 then
      -- Next slot is free, use it
      nextMillis = nextSlotMillis
      jobId = nextSlotJobId
    else
      -- Next slot also has a job, return error code
      return -11 -- SchedulerJobSlotsBusy
    end
  else
    hasCollision = true
  end
end
-- Delay of the new job relative to the timestamp passed in ARGV[7].
local delay = nextMillis - now
-- Fast Clamp delay to minimum of 0
if delay < 0 then
  delay = 0
end
-- Key under which the job hash is written; built from schedulerKey rather
-- than from prefixKey .. jobId.
-- NOTE(review): presumably both spellings name the same key - confirm.
local nextJobKey = schedulerKey .. ":" .. nextMillis
if not hasCollision or removedPrevJob then
  -- jobId already calculated above during collision check
  storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, templateData, templateOpts)
  -- Bump the 'id' counter (KEYS[8]).
  rcall("INCR", KEYS[8])
  -- KEYS[11] = active, KEYS[10] = priority counter, KEYS[7] = marker.
  addJobFromScheduler(nextJobKey, jobId, jobOpts, waitKey, pausedKey, KEYS[11], metaKey, prioritizedKey, KEYS[10],
    delayedKey, KEYS[7], eventsKey, schedulerOpts['name'], maxEvents, now, templateData, jobSchedulerId, delay)
elseif hasCollision then
  -- For 'pattern' case: return error code
  return -10 -- SchedulerJobIdCollision
end
-- When a producer key is supplied, record the new job id in its "nrjid" field.
if ARGV[9] ~= "" then
  rcall("HSET", ARGV[9], "nrjid", jobId)
end
-- Reply: the job id (forced to a string) and the clamped delay.
return {jobId .. "", delay}
  578. `;
// Script descriptor: the script name, the Lua source held in `content`,
// and `keys: 11` (the Lua header documents KEYS[1] through KEYS[11]).
exports.addJobScheduler = {
    name: 'addJobScheduler',
    content,
    keys: 11,
};
  584. //# sourceMappingURL=addJobScheduler-11.js.map