addDelayedJob-6.js 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.addDelayedJob = void 0;
  4. const content = `--[[
  5. Adds a delayed job to the queue by doing the following:
  6. - Increases the job counter if needed.
  7. - Creates a new job key with the job data.
  8. - computes timestamp.
  9. - adds to delayed zset.
  10. - Emits a global event 'delayed' if the job is delayed.
  11. Input:
  12. KEYS[1] 'marker',
  13. KEYS[2] 'meta'
  14. KEYS[3] 'id'
  15. KEYS[4] 'delayed'
  16. KEYS[5] 'completed'
  17. KEYS[6] events stream key
  18. ARGV[1] msgpacked arguments array
  19. [1] key prefix,
  20. [2] custom id (use custom instead of one generated automatically)
  21. [3] name
  22. [4] timestamp
  23. [5] parentKey?
  24. [6] parent dependencies key.
  25. [7] parent? {id, queueKey}
  26. [8] repeat job key
  27. [9] deduplication key
  28. ARGV[2] Json stringified job data
  29. ARGV[3] msgpacked options
  30. Output:
  31. jobId - OK
  32. -5 - Missing parent key; -7 - a job with this custom id already exists under a different parent key that still exists
  33. ]]
  -- Shorthand for issuing Redis commands; every helper below goes through it.
  local rcall = redis.call

  -- Queue keys handed in by the caller. KEYS[1] (the marker key) is read
  -- directly at the point where the job is finally scheduled.
  local metaKey = KEYS[2]
  local idKey = KEYS[3]
  local delayedKey = KEYS[4]
  local completedKey = KEYS[5]
  local eventsKey = KEYS[6]

  -- Raw job payload and the msgpacked positional arguments.
  local data = ARGV[2]
  local args = cmsgpack.unpack(ARGV[1])
  local parentKey = args[5]
  local parent = args[7]
  local repeatJobKey = args[8]
  local deduplicationKey = args[9]

  -- Assigned further down, once the parent and the job id have been resolved.
  local jobId
  local jobIdKey
  local parentData
  49. -- Includes
  50. --[[
  51. Adds a delayed job to the queue by doing the following:
  52. - Creates a new job key with the job data.
  53. - adds to delayed zset.
  54. - Emits a global event 'delayed' if the job is delayed.
  55. ]]
  56. -- Includes
  57. --[[
  58. Add delay marker if needed.
  59. ]]
  60. -- Includes
  --[[
    Function to return the next delayed job timestamp.

    Reads the lowest-scored entry of the delayed zset and converts its score
    back into a timestamp by dividing by 0x1000 (getDelayedScore multiplies
    the timestamp by 0x1000 when building the score).

    Returns the timestamp, or nothing when the zset yields no numeric score.
  ]]
  local function getNextDelayedTimestamp(delayedKey)
    local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
    -- A bare "#result" test is always truthy in Lua because 0 is not false,
    -- so the emptiness check has to compare against 0 explicitly.
    if #result > 0 then
      local nextTimestamp = tonumber(result[2])
      if nextTimestamp ~= nil then
        return nextTimestamp / 0x1000
      end
    end
  end
  -- Points the delay marker (member "1" of the marker zset) at the timestamp
  -- of the earliest delayed job. Does nothing when no such timestamp exists.
  local function addDelayMarkerIfNeeded(markerKey, delayedKey)
    local upcoming = getNextDelayedTimestamp(delayedKey)
    if upcoming == nil then
      return
    end
    -- Overwrite the marker score with the newest known next timestamp.
    rcall("ZADD", markerKey, upcoming, "1")
  end
  --[[
    Bake the job ordering into the low 12 bits of the score to guarantee the
    correct execution order of delayed jobs (up to 4096 jobs per timestamp).
    WARNING: once more than 4096 jobs share a timestamp, the extra jobs all
    get the maximum score for that timestamp and FIFO order is lost.

    Params:
      delayedKey - delayed zset key
      timestamp  - base timestamp (number or numeric string)
      delay      - delay as a number; added to timestamp when > 0
    Returns: score, delayedTimestamp
  ]]
  local function getDelayedScore(delayedKey, timestamp, delay)
    local delayedTimestamp = (delay > 0 and (tonumber(timestamp) + delay)) or tonumber(timestamp)
    -- Score window reserved for this timestamp: [ts * 0x1000, (ts + 1) * 0x1000 - 1].
    local minScore = delayedTimestamp * 0x1000
    local maxScore = (delayedTimestamp + 1) * 0x1000 - 1
    -- Highest score already present inside the window, if any.
    local result = rcall("ZREVRANGEBYSCORE", delayedKey, maxScore,
      minScore, "WITHSCORES", "LIMIT", 0, 1)
    -- A bare "#result" test is always truthy in Lua because 0 is not false,
    -- so the emptiness check has to compare against 0 explicitly.
    if #result > 0 then
      local currentMaxScore = tonumber(result[2])
      if currentMaxScore ~= nil then
        if currentMaxScore >= maxScore then
          -- Window exhausted: clamp to the last slot.
          return maxScore, delayedTimestamp
        end
        return currentMaxScore + 1, delayedTimestamp
      end
    end
    return minScore, delayedTimestamp
  end
  -- Schedules jobId in the delayed zset, emits the 'delayed' event and
  -- refreshes the delay marker.
  local function addDelayedJob(jobId, delayedKey, eventsKey, timestamp,
      maxEvents, markerKey, delay)
    local delayAsNumber = tonumber(delay)
    local score, delayedTimestamp = getDelayedScore(delayedKey, timestamp, delayAsNumber)

    rcall("ZADD", delayedKey, score, jobId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
      "event", "delayed", "jobId", jobId, "delay", delayedTimestamp)

    -- Mark that a delayed job is available.
    addDelayMarkerIfNeeded(markerKey, delayedKey)
  end
  114. --[[
  115. Function to debounce a job.
  116. ]]
  117. -- Includes
  118. --[[
  119. Function to deduplicate a job.
  120. ]]
  --[[
    Function to set the deduplication key for a job.
    The key expires after the 'ttl' (milliseconds, via PX) found in the
    deduplication opts when that value is positive; otherwise it is stored
    without an expiry.
  ]]
  local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
    local ttl = deduplicationOpts and deduplicationOpts['ttl']
    if not (ttl and ttl > 0) then
      rcall('SET', deduplicationKey, jobId)
      return
    end
    rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  end
  133. --[[
  134. Function to store a deduplicated next job if the existing job is active
  135. and keepLastIfActive is set. When the active job finishes, the stored
  136. proto-job is used to create a real job in the queue.
  137. Returns true if the proto-job was stored, false otherwise.
  138. ]]
  --[[
    Function to check whether an item belongs to a list.
    Returns 1 when some value of the table equals item, nil otherwise.
  ]]
  local function checkItemInList(list, item)
    local found = nil
    for _, candidate in pairs(list) do
      if candidate == item then
        found = 1
        break
      end
    end
    return found
  end
  -- Stores the incoming job as a proto-job hash (prefix .. "dn:" .. id) when
  -- keepLastIfActive is set and the job currently holding the deduplication
  -- key is in the active list. Returns true when stored, false otherwise.
  local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
      deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
      parentKey, parentData, parentDependenciesKey, repeatJobKey)
    if not (deduplicationOpts['keepLastIfActive'] and currentDebounceJobId) then
      return false
    end

    local activeJobIds = rcall('LRANGE', prefix .. "active", 0, -1)
    if not checkItemInList(activeJobIds, currentDebounceJobId) then
      return false
    end

    local nextJobKey = prefix .. "dn:" .. deduplicationId
    local protoJob = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}

    -- Appends a field/value pair only when the value is present.
    local function addOptional(field, value)
      if value then
        protoJob[#protoJob + 1] = field
        protoJob[#protoJob + 1] = value
      end
    end
    addOptional('pk', parentKey)
    addOptional('pd', parentData)
    addOptional('pdk', parentDependenciesKey)
    addOptional('rjk', repeatJobKey)

    rcall('HSET', nextJobKey, unpack(protoJob))

    -- Ensure the dedup key does not expire while the job is active, so
    -- subsequent adds always hit the dedup path and never bypass the
    -- active-check because of a TTL expiry.
    rcall('PERSIST', prefix .. "de:" .. deduplicationId)

    -- TODO remove debounced event in next breaking change
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
      "jobId", currentDebounceJobId, "debounceId", deduplicationId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated",
      "jobId", currentDebounceJobId, "deduplicationId", deduplicationId,
      "deduplicatedJobId", jobId)
    return true
  end
  -- Deduplication path used when the 'replace' option is not set.
  -- Returns the id of the job that already owns the deduplication key (the
  -- new job must then not be added), or nothing when the new job claimed it.
  local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
      eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
      parentKey, parentData, parentDependenciesKey, repeatJobKey)
    local ttl = deduplicationOpts['ttl']
    local keepLastIfActive = deduplicationOpts['keepLastIfActive']
    local hasTtl = ttl and ttl > 0

    -- Tries to park the incoming job behind the one that owns the key.
    local function storeAsNext(ownerJobId)
      return storeDeduplicatedNextJob(deduplicationOpts, ownerJobId, prefix,
        deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
        parentKey, parentData, parentDependenciesKey, repeatJobKey)
    end

    -- Announces that the incoming job was deduplicated against ownerJobId.
    local function emitDeduplicated(ownerJobId)
      -- TODO remove debounced event in next breaking change
      rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
        "jobId", ownerJobId, "debounceId", deduplicationId)
      rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated",
        "jobId", ownerJobId, "deduplicationId", deduplicationId,
        "deduplicatedJobId", jobId)
    end

    -- (Re)writes the key for ownerJobId: without expiry when keepLastIfActive
    -- is set, otherwise with the configured ttl.
    local function writeKey(ownerJobId)
      if keepLastIfActive then
        rcall('SET', deduplicationKey, ownerJobId)
      else
        setDeduplicationKey(deduplicationKey, ownerJobId, deduplicationOpts)
      end
    end

    -- "extend" mode: every attempt rewrites the key for the current owner.
    if hasTtl and deduplicationOpts['extend'] then
      local ownerJobId = rcall('GET', deduplicationKey)
      if not ownerJobId then
        writeKey(jobId)
        return
      end
      if storeAsNext(ownerJobId) then
        return ownerJobId
      end
      writeKey(ownerJobId)
      emitDeduplicated(ownerJobId)
      return ownerJobId
    end

    -- Otherwise claim the key only if nobody holds it yet (NX). An expiry is
    -- attached only when a ttl is configured and keepLastIfActive is not set.
    local claimed
    if hasTtl and not keepLastIfActive then
      claimed = rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
    else
      claimed = rcall('SET', deduplicationKey, jobId, 'NX')
    end
    if claimed then
      return
    end

    local ownerJobId = rcall('GET', deduplicationKey)
    if storeAsNext(ownerJobId) then
      return ownerJobId
    end
    emitDeduplicated(ownerJobId)
    return ownerJobId
  end
  --[[
    Function to remove job keys.
    Deletes the job key and its suffixed companion keys in a single DEL and
    returns the DEL reply.
  ]]
  local function removeJobKeys(jobKey)
    local suffixes = {':logs', ':dependencies', ':processed', ':failed', ':unsuccessful'}
    local doomed = {jobKey}
    for _, suffix in ipairs(suffixes) do
      doomed[#doomed + 1] = jobKey .. suffix
    end
    return rcall("DEL", unpack(doomed))
  end
  -- Removes the job that currently owns the deduplication key from the
  -- delayed zset. When it was there, its keys are deleted and the 'removed',
  -- 'debounced' and 'deduplicated' events are emitted.
  -- Returns true when the job was removed, false when it was not delayed.
  local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
      jobId, deduplicationId, prefix)
    local removedCount = rcall("ZREM", delayedKey, currentDeduplicatedJobId)
    if not (removedCount > 0) then
      return false
    end

    removeJobKeys(prefix .. currentDeduplicatedJobId)
    rcall("XADD", eventsKey, "*", "event", "removed",
      "jobId", currentDeduplicatedJobId, "prev", "delayed")
    -- TODO remove debounced event in next breaking change
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
      "jobId", jobId, "debounceId", deduplicationId)
    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated",
      "jobId", jobId, "deduplicationId", deduplicationId,
      "deduplicatedJobId", currentDeduplicatedJobId)
    return true
  end
  -- Entry point for deduplication. Returns the id of an existing job when the
  -- incoming job must not be added, or nothing when the caller should go on
  -- and store the new job (also when no deduplication id was supplied).
  local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
      prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
    local deduplicationId = deduplicationOpts and deduplicationOpts['id']
    if not deduplicationId then
      return
    end

    if not deduplicationOpts['replace'] then
      return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
        jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
        parentKey, parentData, parentDependenciesKey, repeatJobKey)
    end

    local keepLastIfActive = deduplicationOpts['keepLastIfActive']
    local ownerJobId = rcall('GET', deduplicationKey)

    -- Nobody owns the key yet: claim it for the new job.
    if not ownerJobId then
      if keepLastIfActive then
        rcall('SET', deduplicationKey, jobId)
      else
        setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
      end
      return
    end

    local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
      ownerJobId, jobId, deduplicationId, prefix)

    -- The owner was not in the delayed set, so it cannot be replaced; park
    -- the new job behind it if that applies and report the owner.
    if not isRemoved then
      storeDeduplicatedNextJob(deduplicationOpts, ownerJobId, prefix,
        deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
        parentKey, parentData, parentDependenciesKey, repeatJobKey)
      return ownerJobId
    end

    -- The owner was replaced: hand the key over to the new job.
    if keepLastIfActive then
      rcall('SET', deduplicationKey, jobId)
    else
      local ttl = deduplicationOpts['ttl']
      if not deduplicationOpts['extend'] and ttl and ttl > 0 then
        -- Keep whatever expiry is left on the key instead of restarting it.
        rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
      else
        setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
      end
    end
  end
  --[[
    Function to get the max events value, or to store and return the
    default of 10000 when the meta hash has no "opts.maxLenEvents" field.
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  323. --[[
  324. Function to handle the case when job is duplicated.
  325. ]]
  326. -- Includes
  327. --[[
  328. This function is used to update the parent's dependencies if the job
  329. is already completed and about to be ignored. The parent must get its
  330. dependencies updated to avoid the parent job being stuck forever in
  331. the waiting-children state.
  332. ]]
  333. -- Includes
  334. --[[
  335. Validate and move or add dependencies to parent.
  336. ]]
  337. -- Includes
  338. --[[
  339. Validate and move parent to a wait status (waiting, delayed or prioritized)
  340. if no pending dependencies.
  341. ]]
  342. -- Includes
  343. --[[
  344. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  345. ]]
  346. -- Includes
  347. --[[
  348. Move parent to a wait status (wait, prioritized or delayed)
  349. ]]
  350. -- Includes
  351. --[[
  352. Function to add job in target list and add marker if needed.
  353. ]]
  354. -- Includes
  --[[
    Add marker if needed when a job is available.
    Nothing is written while the queue is paused or maxed.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto targetKey with the given push command, then adds the
  -- base marker (score 0, member "0") unless the queue is paused or maxed.
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  367. --[[
  368. Function to add job considering priority.
  369. ]]
  370. -- Includes
  --[[
    Function to get priority score.
    The priority is multiplied by 0x100000000 and the incremented priority
    counter, taken modulo 0x100000000, is added to it.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local counter = rcall("INCR", priorityCounterKey)
    local highPart = priority * 0x100000000
    local lowPart = counter % 0x100000000
    return highPart + lowPart
  end
  -- Adds jobId to the prioritized zset under a freshly computed priority
  -- score, then adds the base marker unless the queue is paused or maxed.
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
      isPausedOrMaxed)
    rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Function to check if queue is paused or maxed
    (since an empty list and !EXISTS are not really the same).
    Returns true when the meta hash has a "paused" field, or when it has a
    "concurrency" field and the active list is at least that long.
  ]]
  local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency")
    local paused, concurrency = attrs[1], attrs[2]
    if paused then
      return true
    end
    if not concurrency then
      return false
    end
    return rcall("LLEN", activeKey) >= tonumber(concurrency)
  end
  --[[
    Function to check for the meta.paused key to decide if we are paused or not
    (since an empty list and !EXISTS are not really the same).
    Returns: target list key, isPausedOrMaxed flag, and the "max" and
    "duration" fields of the meta hash.
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = attrs[1], attrs[2]
    local max, duration = attrs[3], attrs[4]

    if paused then
      return pausedKey, true, max, duration
    end

    local isMaxed = false
    if concurrency then
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, isMaxed, max, duration
  end
  -- Moves a parent job into the delayed set, the wait/paused list or the
  -- prioritized set of its own queue, depending on the "delay" and "priority"
  -- fields stored on the parent job, and emits the matching event.
  local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
    local function parentQueue(suffix)
      return parentQueueKey .. suffix
    end

    local parentMarkerKey = parentQueue(":marker")
    local parentEventsKey = parentQueue(":events")

    local attrs = rcall("HMGET", parentKey, "priority", "delay")
    local priority = tonumber(attrs[1]) or 0
    local delay = tonumber(attrs[2]) or 0

    if delay > 0 then
      local parentDelayedKey = parentQueue(":delayed")
      local delayedTimestamp = tonumber(timestamp) + delay
      rcall("ZADD", parentDelayedKey, delayedTimestamp * 0x1000, parentId)
      rcall("XADD", parentEventsKey, "*", "event", "delayed", "jobId", parentId,
        "delay", delayedTimestamp)
      addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
      return
    end

    local parentMetaKey = parentQueue(":meta")
    local parentActiveKey = parentQueue(":active")
    if priority == 0 then
      local target, pausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
        parentQueue(":wait"), parentQueue(":paused"))
      addJobInTargetList(target, parentMarkerKey, "RPUSH", pausedOrMaxed, parentId)
    else
      local pausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
      addJobWithPriority(parentMarkerKey, parentQueue(":prioritized"), priority, parentId,
        parentQueue(":pc"), pausedOrMaxed)
    end
    rcall("XADD", parentEventsKey, "*", "event", "waiting", "jobId", parentId,
      "prev", "waiting-children")
  end
  -- Moves the parent out of waiting-children, but only when the parent job
  -- key still exists and the parent is a member of that zset.
  local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
    if rcall("EXISTS", parentKey) ~= 1 then
      return
    end
    local waitingChildrenKey = parentQueueKey .. ":waiting-children"
    if not rcall("ZSCORE", waitingChildrenKey, parentId) then
      return
    end
    rcall("ZREM", waitingChildrenKey, parentId)
    moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  end
  -- Moves the parent on only when its dependencies set is empty.
  local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
      parentId, timestamp)
    if rcall("SCARD", parentDependenciesKey) ~= 0 then
      return
    end
    moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  end
  -- Records the child's return value in the parent's ":processed" hash, then
  -- releases the parent if it has no pending dependencies left.
  local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
      parentId, jobIdKey, returnvalue, timestamp)
    rcall("HSET", parentKey .. ":processed", jobIdKey, returnvalue)
    moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey,
      parentKey, parentId, timestamp)
  end
  -- Attaches an already existing job to the given parent. A job that is
  -- already completed is reported to the parent as processed; otherwise the
  -- job is added to the parent's dependencies set. No-op without a parent key.
  local function updateExistingJobsParent(parentKey, parent, parentData,
      parentDependenciesKey, completedKey,
      jobIdKey, jobId, timestamp)
    if parentKey == nil then
      return
    end

    local alreadyCompleted = rcall("ZSCORE", completedKey, jobId)
    if alreadyCompleted then
      local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
      updateParentDepsIfNeeded(parentKey, parent['queueKey'],
        parentDependenciesKey, parent['id'],
        jobIdKey, returnvalue, timestamp)
    elseif parentDependenciesKey ~= nil then
      rcall("SADD", parentDependenciesKey, jobIdKey)
    end

    rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  end
  -- Handles an add request for a job id that already exists.
  -- Returns -7 when the existing job belongs to a different parent whose key
  -- still exists; otherwise emits 'duplicated' and returns the id as a string.
  local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
      parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
    local existedParentKey = rcall("HGET", jobKey, "parentKey")
    local canAdoptParent = (not existedParentKey) or existedParentKey == currentParentKey

    if canAdoptParent then
      updateExistingJobsParent(currentParentKey, currentParent, parentData,
        parentDependenciesKey, completedKey, jobKey,
        jobId, timestamp)
    elseif currentParentKey ~= nil and currentParentKey ~= existedParentKey
        and (rcall("EXISTS", existedParentKey) == 1) then
      return -7
    end

    rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
      "duplicated", "jobId", jobId)
    return jobId .. "" -- convert to string
  end
  --[[
    Function to store a job.
    Writes the job hash (name, data, opts, timestamp, delay, priority plus the
    optional parent, repeat and deduplication fields) and emits the 'added'
    event. Returns: delay, priority (each defaulting to 0).
  ]]
  local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
      parentKey, parentData, repeatJobKey)
    local jsonOpts = cjson.encode(opts)
    local delay = opts['delay'] or 0
    local priority = opts['priority'] or 0
    local debounceId = opts['de'] and opts['de']['id']

    -- Extra field/value pairs appended after the fixed part of the hash.
    local extras = {}
    local function push(field, value)
      extras[#extras + 1] = field
      extras[#extras + 1] = value
    end

    if parentKey ~= nil then
      push("parentKey", parentKey)
      push("parent", parentData)
    end
    if repeatJobKey then
      push("rjk", repeatJobKey)
    end
    if debounceId then
      push("deid", debounceId)
    end

    rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
      "timestamp", timestamp, "delay", delay, "priority", priority,
      unpack(extras))
    rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
    return delay, priority
  end
  -- Script body --------------------------------------------------------------

  -- A child job can only be added while its parent still exists.
  if parentKey ~= nil then
    if rcall("EXISTS", parentKey) ~= 1 then
      return -5
    end
    parentData = cjson.encode(parent)
  end

  -- The id counter is incremented on every call, custom id or not.
  local jobCounter = rcall("INCR", idKey)
  local maxEvents = getOrSetMaxEvents(metaKey)
  local opts = cmsgpack.unpack(ARGV[3])

  -- Positional arguments, named for readability.
  local prefix = args[1]
  local customId = args[2]
  local jobName = args[3]
  local timestamp = args[4]
  local parentDependenciesKey = args[6]

  if customId == "" then
    jobId = jobCounter
    jobIdKey = prefix .. jobId
  else
    jobId = customId
    jobIdKey = prefix .. jobId
    -- A custom id may collide with an existing job.
    if rcall("EXISTS", jobIdKey) == 1 then
      return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
        parentData, parentDependenciesKey, completedKey, eventsKey,
        maxEvents, timestamp)
    end
  end

  local deduplicationJobId = deduplicateJob(opts['de'], jobId, delayedKey, deduplicationKey,
    eventsKey, maxEvents, prefix, jobName, ARGV[2], opts,
    parentKey, parentData, parentDependenciesKey, repeatJobKey)
  if deduplicationJobId then
    return deduplicationJobId
  end

  local delay, priority = storeJob(eventsKey, jobIdKey, jobId, jobName, ARGV[2],
    opts, timestamp, parentKey, parentData, repeatJobKey)

  addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, KEYS[1], delay)

  -- Check if this job is a child of another job, if so add it to the parents dependencies
  if parentDependenciesKey ~= nil then
    rcall("SADD", parentDependenciesKey, jobIdKey)
  end

  return jobId .. "" -- convert to string
  572. `;
  573. exports.addDelayedJob = {
  574. name: 'addDelayedJob',
  575. content,
  576. keys: 6,
  577. };
  578. //# sourceMappingURL=addDelayedJob-6.js.map