addDelayedJob-6.js 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. const content = `--[[
  2. Adds a delayed job to the queue by doing the following:
  - Increases the job counter (always, even when a custom id is supplied).
  4. - Creates a new job key with the job data.
  - Computes the delayed timestamp and the score used in the delayed zset.
  6. - adds to delayed zset.
  7. - Emits a global event 'delayed' if the job is delayed.
  8. Input:
  9. KEYS[1] 'marker',
  10. KEYS[2] 'meta'
  11. KEYS[3] 'id'
  12. KEYS[4] 'delayed'
  13. KEYS[5] 'completed'
  14. KEYS[6] events stream key
  15. ARGV[1] msgpacked arguments array
  16. [1] key prefix,
  17. [2] custom id (use custom instead of one generated automatically)
  18. [3] name
  19. [4] timestamp
  20. [5] parentKey?
  21. [6] parent dependencies key.
  22. [7] parent? {id, queueKey}
  23. [8] repeat job key
  24. [9] deduplication key
  25. ARGV[2] Json stringified job data
  26. ARGV[3] msgpacked options
  27. Output:
  28. jobId - OK
  -5 - Missing parent key
  -7 - The existing job already has a different parent key that still exists
  30. ]]
  -- Keys handed to the script (KEYS[1], the marker key, is read directly
  -- where it is needed).
  local metaKey, idKey, delayedKey = KEYS[2], KEYS[3], KEYS[4]
  local completedKey, eventsKey = KEYS[5], KEYS[6]

  -- Filled in by the main body of the script further down.
  local jobId, jobIdKey

  -- Short alias used by every helper below.
  local rcall = redis.call

  -- ARGV[1] is a msgpacked array, ARGV[2] the job data.
  local args = cmsgpack.unpack(ARGV[1])
  local data = ARGV[2]

  local parentKey, parent = args[5], args[7]
  local repeatJobKey, deduplicationKey = args[8], args[9]

  -- Set to the JSON-encoded parent once the parent key has been validated.
  local parentData
  46. -- Includes
  47. --[[
  48. Adds a delayed job to the queue by doing the following:
  49. - Creates a new job key with the job data.
  50. - adds to delayed zset.
  51. - Emits a global event 'delayed' if the job is delayed.
  52. ]]
  53. -- Includes
  54. --[[
  55. Add delay marker if needed.
  56. ]]
  57. -- Includes
  --[[
    Function to return the next delayed job timestamp.

    Reads the lowest-scored entry of the delayed zset and turns its score
    back into a timestamp by dividing it by 0x1000.

    Input:
      delayedKey - key of the delayed zset
    Output:
      the timestamp as a number, or nil when the zset is empty or the
      score cannot be converted to a number
  ]]
  local function getNextDelayedTimestamp(delayedKey)
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- 0 is truthy in Lua, so a bare length test can never fail; compare the
    -- length explicitly so an empty reply is really skipped.
    if #result > 0 then
      local nextTimestamp = tonumber(result[2])
      if nextTimestamp ~= nil then
        return nextTimestamp / 0x1000
      end
    end
  end
  -- Refreshes the delay marker: when the delayed zset has a next timestamp,
  -- that timestamp becomes the score of member "1" in the marker zset.
  -- Nothing is written when there is no next timestamp.
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    local upcoming = getNextDelayedTimestamp(delayedKey)
    if upcoming == nil then
      return
    end
    -- Overwrite the marker score with the newest known next timestamp.
    rcall("ZADD", markerKey, upcoming, "1")
  end
  --[[
    Computes the score under which a job is stored in the delayed zset.

    The target timestamp (timestamp + delay when delay > 0, otherwise the
    timestamp itself) is multiplied by 0x1000, which leaves 4096 distinct
    scores per timestamp. The job receives the score right after the highest
    one already present for that timestamp, so jobs that share a timestamp
    keep their insertion order. Once the last score of the range is taken it
    is handed out again, so ordering is no longer guaranteed past that point.

    Input:
      delayedKey - key of the delayed zset
      timestamp  - base timestamp (number or numeric string)
      delay      - delay as a number
    Output:
      score, delayedTimestamp
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1

    -- Highest score already used inside this timestamp's range, if any.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)

    -- 0 is truthy in Lua, so a bare length test can never fail; compare the
    -- length explicitly so an empty reply falls through to minScore.
    if #result > 0 then
      local currentMaxScore = tonumber(result[2])
      if currentMaxScore ~= nil then
        if currentMaxScore >= maxScore then
          return maxScore, delayedTimestamp
        end
        return currentMaxScore + 1, delayedTimestamp
      end
    end

    return minScore, delayedTimestamp
  end
  -- Puts jobId into the delayed zset under its computed score, publishes a
  -- "delayed" event (carrying the delayed timestamp) on the events stream and
  -- refreshes the delay marker.
  local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
    maxEvents, markerKey, delay)
    local numericDelay = tonumber(delay)
    local jobScore, targetTimestamp = getDelayedScore(delayedKey, timestamp, numericDelay)

    rcall("ZADD", delayedKey, jobScore, jobId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "delayed", "jobId", jobId, "delay", targetTimestamp)

    -- mark that a delayed job is available
    addDelayMarkerIfNeeded(markerKey, delayedKey)
  end
  111. --[[
  112. Function to debounce a job.
  113. ]]
  114. -- Includes
  115. --[[
  116. Function to deduplicate a job.
  117. ]]
  --[[
    Points the deduplication key at jobId.

    When the deduplication options carry a positive ttl, the key is written
    with that value as its PX expiry; otherwise it is written without one.
  ]]
  local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
    local ttl = deduplicationOpts and deduplicationOpts['ttl']

    if not (ttl and ttl > 0) then
      rcall('SET', deduplicationKey, jobId)
      return
    end

    rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  end
  130. --[[
  131. Function to store a deduplicated next job if the existing job is active
  132. and keepLastIfActive is set. When the active job finishes, the stored
  133. proto-job is used to create a real job in the queue.
  134. Returns true if the proto-job was stored, false otherwise.
  135. ]]
  --[[
    Function to check if an item belongs to a list.
    Returns 1 when some value of the list equals the item, nil otherwise.
  ]]
  local function checkItemInList(list, item)
    local found = nil
    for _, candidate in pairs(list) do
      if candidate == item then
        found = 1
        break
      end
    end
    return found
  end
  -- Stores the incoming job as a "next" proto-job when keepLastIfActive is set
  -- and the job currently holding the deduplication id is in the active list.
  -- The proto-job is written as a hash under prefix .. "dn:" .. deduplicationId,
  -- the deduplication key is made persistent, and the "debounced" and
  -- "deduplicated" events are emitted.
  -- Returns true when the proto-job was stored, false otherwise.
  local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
    deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
    parentKey, parentData, parentDependenciesKey, repeatJobKey)
    if not (deduplicationOpts['keepLastIfActive'] and currentDebounceJobId) then
      return false
    end

    local activeJobIds = rcall('LRANGE', prefix .. "active", 0, -1)
    if not checkItemInList(activeJobIds, currentDebounceJobId) then
      return false
    end

    local nextJobKey = prefix .. "dn:" .. deduplicationId
    local protoJob = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}

    -- Appends a field/value pair only when the value is present.
    local function appendField(fieldName, fieldValue)
      if fieldValue then
        protoJob[#protoJob + 1] = fieldName
        protoJob[#protoJob + 1] = fieldValue
      end
    end

    appendField('pk', parentKey)
    appendField('pd', parentData)
    appendField('pdk', parentDependenciesKey)
    appendField('rjk', repeatJobKey)

    rcall('HSET', nextJobKey, unpack(protoJob))

    -- Ensure the dedup key does not expire while the job is active,
    -- so subsequent adds always hit the dedup path and never bypass
    -- the active-check because of a TTL expiry.
    rcall('PERSIST', prefix .. "de:" .. deduplicationId)

    -- TODO remove debounced event in next breaking change
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "debounced", "jobId", currentDebounceJobId,
      "debounceId", deduplicationId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "deduplicated", "jobId", currentDebounceJobId,
      "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)

    return true
  end
  -- Deduplicates a job when the "replace" option is not set.
  --
  -- Returns the id of the job that already holds the deduplication id when the
  -- incoming job has been deduplicated, and nothing when the incoming job took
  -- ownership of the deduplication key.
  --
  -- With a positive ttl and "extend" set, an existing owner keeps the key and
  -- the key is rewritten (which renews its expiry unless keepLastIfActive is
  -- set, in which case it is written without expiry). In every other case the
  -- key is claimed with SET ... NX; the PX expiry is only applied when a
  -- positive ttl is given and keepLastIfActive is not set.
  local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
    eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
    parentKey, parentData, parentDependenciesKey, repeatJobKey)
    local ttl = deduplicationOpts['ttl']
    local hasTtl = ttl and ttl > 0
    local keepLastIfActive = deduplicationOpts['keepLastIfActive']

    -- Tries to park the incoming job behind the active owner.
    local function parkAsNextJob(existingJobId)
      return storeDeduplicatedNextJob(deduplicationOpts, existingJobId, prefix,
        deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
        parentKey, parentData, parentDependenciesKey, repeatJobKey)
    end

    -- Emits the two events telling that jobId was deduplicated.
    local function announce(existingJobId)
      -- TODO remove debounced event in next breaking change
      rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
        "event", "debounced", "jobId", existingJobId,
        "debounceId", deduplicationId)
      rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
        "event", "deduplicated", "jobId", existingJobId,
        "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
    end

    if hasTtl and deduplicationOpts['extend'] then
      local existingJobId = rcall('GET', deduplicationKey)

      if existingJobId and parkAsNextJob(existingJobId) then
        return existingJobId
      end

      -- The key stays with the existing owner, or goes to the new job when
      -- nobody owns it yet.
      local ownerId = existingJobId or jobId
      if keepLastIfActive then
        rcall('SET', deduplicationKey, ownerId)
      else
        setDeduplicationKey(deduplicationKey, ownerId, deduplicationOpts)
      end

      if not existingJobId then
        return
      end

      announce(existingJobId)
      return existingJobId
    end

    local claimed
    if hasTtl and not keepLastIfActive then
      claimed = rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
    else
      claimed = rcall('SET', deduplicationKey, jobId, 'NX')
    end

    if claimed then
      return
    end

    local existingJobId = rcall('GET', deduplicationKey)
    if not parkAsNextJob(existingJobId) then
      announce(existingJobId)
    end
    return existingJobId
  end
  --[[
    Deletes the job key together with its ':logs', ':dependencies',
    ':processed', ':failed' and ':unsuccessful' companion keys in a single
    DEL call and returns the reply of that call.
  ]]
  local function removeJobKeys(jobKey)
    local logsKey = jobKey .. ':logs'
    local dependenciesKey = jobKey .. ':dependencies'
    local processedKey = jobKey .. ':processed'
    local failedKey = jobKey .. ':failed'
    local unsuccessfulKey = jobKey .. ':unsuccessful'

    return rcall("DEL", jobKey, logsKey, dependenciesKey,
      processedKey, failedKey, unsuccessfulKey)
  end
  -- Removes the job that currently holds the deduplication id, provided it is
  -- still in the delayed zset. On success its keys are deleted and the
  -- "removed", "debounced" and "deduplicated" events are emitted.
  -- Returns true when the job was removed, false when it was not in the
  -- delayed zset. The deduplicationKey parameter is accepted but not used.
  local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
    jobId, deduplicationId, prefix)
    local removedCount = rcall("ZREM", delayedKey, currentDeduplicatedJobId)
    if not (removedCount > 0) then
      return false
    end

    removeJobKeys(prefix .. currentDeduplicatedJobId)

    rcall("XADD", eventsKey, "*",
      "event", "removed", "jobId", currentDeduplicatedJobId, "prev", "delayed")

    -- TODO remove debounced event in next breaking change
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "debounced", "jobId", jobId, "debounceId", deduplicationId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "deduplicated", "jobId", jobId,
      "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)

    return true
  end
  -- Entry point of the deduplication logic.
  --
  -- Returns the id of the job that keeps the deduplication id when the
  -- incoming job must not be added, and nothing when the incoming job may be
  -- added (including when no deduplication id was supplied).
  --
  -- Without the "replace" option the work is delegated to
  -- deduplicateJobWithoutReplace. With it, the current owner is removed from
  -- the delayed zset when possible and the key is handed to the new job; when
  -- the owner cannot be removed, the new job is offered to
  -- storeDeduplicatedNextJob and the owner's id is returned.
  local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
    prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
    local deduplicationId = deduplicationOpts and deduplicationOpts['id']
    if not deduplicationId then
      return
    end

    if not deduplicationOpts['replace'] then
      return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
        jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
        parentKey, parentData, parentDependenciesKey, repeatJobKey)
    end

    local existingJobId = rcall('GET', deduplicationKey)
    if existingJobId then
      local removed = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
        existingJobId, jobId, deduplicationId, prefix)
      if not removed then
        storeDeduplicatedNextJob(deduplicationOpts, existingJobId, prefix,
          deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
          parentKey, parentData, parentDependenciesKey, repeatJobKey)
        return existingJobId
      end
    end

    -- From here on the new job takes over the deduplication key.
    local ttl = deduplicationOpts['ttl']
    if deduplicationOpts['keepLastIfActive'] then
      rcall('SET', deduplicationKey, jobId)
    elseif existingJobId and not deduplicationOpts['extend'] and ttl and ttl > 0 then
      -- A previous owner was replaced and the ttl must not be extended:
      -- keep whatever expiry the key already has.
      rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
    else
      setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
    end
    return
  end
  --[[
    Returns the "opts.maxLenEvents" field of the meta hash. When the field
    is missing, 10000 is stored in it and returned instead.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local configured = rcall("HGET", metaKey, "opts.maxLenEvents")
    if configured then
      return configured
    end

    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  320. --[[
  321. Function to handle the case when job is duplicated.
  322. ]]
  323. -- Includes
  324. --[[
  325. This function is used to update the parent's dependencies if the job
  326. is already completed and about to be ignored. The parent must get its
  327. dependencies updated to avoid the parent job being stuck forever in
  328. the waiting-children state.
  329. ]]
  330. -- Includes
  331. --[[
  332. Validate and move or add dependencies to parent.
  333. ]]
  334. -- Includes
  335. --[[
  336. Validate and move parent to a wait status (waiting, delayed or prioritized)
  337. if no pending dependencies.
  338. ]]
  339. -- Includes
  340. --[[
  341. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  342. ]]
  343. -- Includes
  344. --[[
  345. Move parent to a wait status (wait, prioritized or delayed)
  346. ]]
  347. -- Includes
  348. --[[
  349. Function to add job in target list and add marker if needed.
  350. ]]
  351. -- Includes
  --[[
    Adds member "0" with score 0 to the marker zset, unless the queue is
    paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto targetKey with the given push command (the callers in
  -- this script pass "RPUSH"), then adds the base marker unless the queue is
  -- paused or maxed.
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)

    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  364. --[[
  365. Function to add job considering priority.
  366. ]]
  367. -- Includes
  --[[
    Function to get priority score.

    Increments the priority counter and combines it with the priority:
    the priority is multiplied by 0x100000000 and the counter, taken modulo
    0x100000000, is added to it.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local counter = rcall("INCR", priorityCounterKey)

    local priorityPart = priority * 0x100000000
    local counterPart = counter % 0x100000000
    return priorityPart + counterPart
  end
  -- Adds jobId to the prioritized zset under its priority score, then adds the
  -- base marker unless the queue is paused or maxed.
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
    local priorityScore = getPriorityScore(priority, priorityCounterKey)

    rcall("ZADD", prioritizedKey, priorityScore, jobId)

    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Tells whether the queue is paused or has reached its concurrency.

    Returns true when the meta hash has a "paused" field. Otherwise, when a
    "concurrency" field is present, returns whether the length of the
    active list is at least that value. Returns false in every other case.
  ]]
  local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
    local attributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
    local paused, concurrency = attributes[1], attributes[2]

    if paused then
      return true
    end
    if not concurrency then
      return false
    end

    local activeCount = rcall("LLEN", activeKey)
    return activeCount >= tonumber(concurrency)
  end
  --[[
    Picks the list a job must be pushed to, based on the meta hash.

    Returns four values:
      1. pausedKey when the meta hash has a "paused" field, waitKey otherwise
      2. true when the queue is paused, or when a "concurrency" field is set
         and the active list holds at least that many items; false otherwise
      3. the "max" field of the meta hash, as read
      4. the "duration" field of the meta hash, as read
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = attributes[1], attributes[2]
    local maxAttribute, durationAttribute = attributes[3], attributes[4]

    if paused then
      return pausedKey, true, maxAttribute, durationAttribute
    end

    local isMaxed = false
    if concurrency then
      local activeCount = rcall("LLEN", activeKey)
      isMaxed = activeCount >= tonumber(concurrency)
    end

    return waitKey, isMaxed, maxAttribute, durationAttribute
  end
  -- Moves a parent job into the state given by its own "priority" and "delay"
  -- fields:
  --   * delay > 0     -> delayed zset of the parent queue, "delayed" event
  --   * priority == 0 -> wait or paused list of the parent queue
  --   * otherwise     -> prioritized zset of the parent queue
  -- The last two cases emit a "waiting" event with prev "waiting-children".
  local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
    local parentMarkerKey = parentQueueKey .. ":marker"
    local parentEventsKey = parentQueueKey .. ":events"

    local attributes = rcall("HMGET", parentKey, "priority", "delay")
    local priority = tonumber(attributes[1]) or 0
    local delay = tonumber(attributes[2]) or 0

    if delay > 0 then
      local delayedTimestamp = tonumber(timestamp) + delay
      local parentDelayedKey = parentQueueKey .. ":delayed"

      rcall("ZADD", parentDelayedKey, delayedTimestamp * 0x1000, parentId)
      rcall("XADD", parentEventsKey, "*",
        "event", "delayed", "jobId", parentId, "delay", delayedTimestamp)
      addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
      return
    end

    local parentMetaKey = parentQueueKey .. ":meta"
    local parentActiveKey = parentQueueKey .. ":active"

    if priority == 0 then
      local targetKey, pausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
        parentQueueKey .. ":wait", parentQueueKey .. ":paused")
      addJobInTargetList(targetKey, parentMarkerKey, "RPUSH", pausedOrMaxed, parentId)
    else
      local pausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
      addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
        parentQueueKey .. ":pc", pausedOrMaxed)
    end

    rcall("XADD", parentEventsKey, "*",
      "event", "waiting", "jobId", parentId, "prev", "waiting-children")
  end
  -- Moves the parent out of the waiting-children zset, but only when the
  -- parent key exists and the parent is actually listed in that zset.
  local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
    if rcall("EXISTS", parentKey) ~= 1 then
      return
    end

    local waitingChildrenKey = parentQueueKey .. ":waiting-children"
    if not rcall("ZSCORE", waitingChildrenKey, parentId) then
      return
    end

    rcall("ZREM", waitingChildrenKey, parentId)
    moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  end
  -- Moves the parent on (if needed) once its dependencies set is empty.
  local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
    parentId, timestamp)
    local pendingCount = rcall("SCARD", parentDependenciesKey)

    if pendingCount == 0 then
      moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
    end
  end
  -- Records the child's return value in the parent's ":processed" hash (field
  -- jobIdKey), then moves the parent on when it has no pending dependencies.
  local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
    parentId, jobIdKey, returnvalue, timestamp )
    rcall("HSET", parentKey .. ":processed", jobIdKey, returnvalue)

    moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey,
      parentKey, parentId, timestamp)
  end
  -- Links an already existing job to the given parent. Does nothing when no
  -- parent key is given.
  --   * job found in the completed zset: its return value is handed to the
  --     parent through updateParentDepsIfNeeded
  --   * otherwise: the job key is added to the parent's dependencies set,
  --     when a dependencies key was supplied
  -- In both cases the job hash gets its "parentKey" and "parent" fields set.
  local function updateExistingJobsParent(parentKey, parent, parentData,
    parentDependenciesKey, completedKey,
    jobIdKey, jobId, timestamp)
    if parentKey == nil then
      return
    end

    local alreadyCompleted = rcall("ZSCORE", completedKey, jobId)
    if alreadyCompleted then
      local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
      updateParentDepsIfNeeded(parentKey, parent['queueKey'],
        parentDependenciesKey, parent['id'],
        jobIdKey, returnvalue, timestamp)
    elseif parentDependenciesKey ~= nil then
      rcall("SADD", parentDependenciesKey, jobIdKey)
    end

    rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  end
  -- Handles an add request for a job id whose key already exists.
  --
  -- When the stored job has no parent key, or the same one as the request, the
  -- parent link is refreshed through updateExistingJobsParent. When it has a
  -- different parent key, the request carries a parent key and the stored
  -- parent key still exists, -7 is returned and nothing else happens.
  -- In all remaining cases a "duplicated" event is emitted and the job id is
  -- returned as a string.
  local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
    parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
    local storedParentKey = rcall("HGET", jobKey, "parentKey")
    local canRelink = (not storedParentKey) or storedParentKey == currentParentKey

    if canRelink then
      updateExistingJobsParent(currentParentKey, currentParent, parentData,
        parentDependenciesKey, completedKey, jobKey,
        jobId, timestamp)
    elseif currentParentKey ~= nil and rcall("EXISTS", storedParentKey) == 1 then
      -- Reaching this branch already implies the two parent keys differ.
      return -7
    end

    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "duplicated", "jobId", jobId)

    return jobId .. "" -- convert to string
  end
  --[[
    Function to store a job.

    Writes the job hash (name, data, JSON-encoded opts, timestamp, delay,
    priority, plus parentKey/parent, rjk and deid when available) and emits
    an "added" event on the events stream.

    Output:
      delay, priority - taken from opts, each defaulting to 0
  ]]
  local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
    parentKey, parentData, repeatJobKey)
    local jsonOpts = cjson.encode(opts)
    local delay = opts['delay'] or 0
    local priority = opts['priority'] or 0
    local debounceId = opts['de'] and opts['de']['id']

    -- Field/value pairs that are only stored when they apply.
    local extras = {}
    local function push(field, value)
      extras[#extras + 1] = field
      extras[#extras + 1] = value
    end

    if parentKey ~= nil then
      push("parentKey", parentKey)
      push("parent", parentData)
    end
    if repeatJobKey then
      push("rjk", repeatJobKey)
    end
    if debounceId then
      push("deid", debounceId)
    end

    rcall("HMSET", jobIdKey,
      "name", name,
      "data", data,
      "opts", jsonOpts,
      "timestamp", timestamp,
      "delay", delay,
      "priority", priority,
      unpack(extras))

    rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)

    return delay, priority
  end
  -- Main body of the script.

  -- A child job can only be added while its parent key exists.
  if parentKey ~= nil then
    if rcall("EXISTS", parentKey) ~= 1 then
      return -5
    end
    parentData = cjson.encode(parent)
  end

  local nextCounter = rcall("INCR", idKey)
  local maxEvents = getOrSetMaxEvents(metaKey)
  local opts = cmsgpack.unpack(ARGV[3])

  local keyPrefix, customId, jobName = args[1], args[2], args[3]
  local timestamp, parentDependenciesKey = args[4], args[6]
  local jobData = ARGV[2]

  -- An empty custom id means the freshly incremented counter is the job id.
  if customId == "" then
    jobId = nextCounter
  else
    jobId = customId
  end
  jobIdKey = keyPrefix .. jobId

  -- Only a custom id can refer to a job that already exists.
  if customId ~= "" and rcall("EXISTS", jobIdKey) == 1 then
    return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
      parentData, parentDependenciesKey, completedKey, eventsKey,
      maxEvents, timestamp)
  end

  local deduplicationJobId = deduplicateJob(opts['de'], jobId, delayedKey, deduplicationKey,
    eventsKey, maxEvents, keyPrefix, jobName, jobData, opts,
    parentKey, parentData, parentDependenciesKey, repeatJobKey)
  if deduplicationJobId then
    return deduplicationJobId
  end

  local delay, priority = storeJob(eventsKey, jobIdKey, jobId, jobName, jobData,
    opts, timestamp, parentKey, parentData, repeatJobKey)

  addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, KEYS[1], delay)

  -- Check if this job is a child of another job, if so add it to the parents dependencies
  if parentDependenciesKey ~= nil then
    rcall("SADD", parentDependenciesKey, jobIdKey)
  end

  return jobId .. "" -- convert to string
  569. `;
  570. export const addDelayedJob = {
  571. name: 'addDelayedJob',
  572. content,
  573. keys: 6,
  574. };
  575. //# sourceMappingURL=addDelayedJob-6.js.map