addPrioritizedJob-9.js 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545
  1. const content = `--[[
  2. Adds a priotitized job to the queue by doing the following:
  3. - Increases the job counter if needed.
  4. - Creates a new job key with the job data.
  5. - Adds the job to the "added" list so that workers gets notified.
  6. Input:
  7. KEYS[1] 'marker',
  8. KEYS[2] 'meta'
  9. KEYS[3] 'id'
  10. KEYS[4] 'prioritized'
  11. KEYS[5] 'delayed'
  12. KEYS[6] 'completed'
  13. KEYS[7] 'active'
  14. KEYS[8] events stream key
  15. KEYS[9] 'pc' priority counter
  16. ARGV[1] msgpacked arguments array
  17. [1] key prefix,
  18. [2] custom id (will not generate one automatically)
  19. [3] name
  20. [4] timestamp
  21. [5] parentKey?
  22. [6] parent dependencies key.
  23. [7] parent? {id, queueKey}
  24. [8] repeat job key
  25. [9] deduplication key
  26. ARGV[2] Json stringified job data
  27. ARGV[3] msgpacked options
  28. Output:
  29. jobId - OK
  30. -5 - Missing parent key
  31. ]]
  32. local metaKey = KEYS[2]
  33. local idKey = KEYS[3]
  34. local priorityKey = KEYS[4]
  35. local completedKey = KEYS[6]
  36. local activeKey = KEYS[7]
  37. local eventsKey = KEYS[8]
  38. local priorityCounterKey = KEYS[9]
  39. local jobId
  40. local jobIdKey
  41. local rcall = redis.call
  42. local args = cmsgpack.unpack(ARGV[1])
  43. local data = ARGV[2]
  44. local opts = cmsgpack.unpack(ARGV[3])
  45. local parentKey = args[5]
  46. local parent = args[7]
  47. local repeatJobKey = args[8]
  48. local deduplicationKey = args[9]
  49. local parentData
  50. -- Includes
  51. --[[
  52. Function to add job considering priority.
  53. ]]
  54. -- Includes
  55. --[[
  56. Add marker if needed when a job is available.
  57. ]]
  58. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  59. if not isPausedOrMaxed then
  60. rcall("ZADD", markerKey, 0, "0")
  61. end
  62. end
  63. --[[
  64. Function to get priority score.
  65. ]]
  66. local function getPriorityScore(priority, priorityCounterKey)
  67. local prioCounter = rcall("INCR", priorityCounterKey)
  68. return priority * 0x100000000 + prioCounter % 0x100000000
  69. end
  70. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  71. isPausedOrMaxed)
  72. local score = getPriorityScore(priority, priorityCounterKey)
  73. rcall("ZADD", prioritizedKey, score, jobId)
  74. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  75. end
  76. --[[
  77. Function to debounce a job.
  78. ]]
  79. -- Includes
  80. --[[
  81. Function to deduplicate a job.
  82. ]]
  83. --[[
  84. Function to set the deduplication key for a job.
  85. Uses TTL from deduplication opts if provided.
  86. ]]
  87. local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  88. local ttl = deduplicationOpts and deduplicationOpts['ttl']
  89. if ttl and ttl > 0 then
  90. rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  91. else
  92. rcall('SET', deduplicationKey, jobId)
  93. end
  94. end
  95. --[[
  96. Function to store a deduplicated next job if the existing job is active
  97. and keepLastIfActive is set. When the active job finishes, the stored
  98. proto-job is used to create a real job in the queue.
  99. Returns true if the proto-job was stored, false otherwise.
  100. ]]
  101. --[[
  102. Functions to check if a item belongs to a list.
  103. ]]
  104. local function checkItemInList(list, item)
  105. for _, v in pairs(list) do
  106. if v == item then
  107. return 1
  108. end
  109. end
  110. return nil
  111. end
  112. local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  113. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  114. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  115. if deduplicationOpts['keepLastIfActive'] and currentDebounceJobId then
  116. local activeKey = prefix .. "active"
  117. local activeItems = rcall('LRANGE', activeKey, 0, -1)
  118. if checkItemInList(activeItems, currentDebounceJobId) then
  119. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  120. local fields = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}
  121. if parentKey then
  122. fields[#fields+1] = 'pk'
  123. fields[#fields+1] = parentKey
  124. end
  125. if parentData then
  126. fields[#fields+1] = 'pd'
  127. fields[#fields+1] = parentData
  128. end
  129. if parentDependenciesKey then
  130. fields[#fields+1] = 'pdk'
  131. fields[#fields+1] = parentDependenciesKey
  132. end
  133. if repeatJobKey then
  134. fields[#fields+1] = 'rjk'
  135. fields[#fields+1] = repeatJobKey
  136. end
  137. rcall('HSET', deduplicationNextKey, unpack(fields))
  138. -- Ensure the dedup key does not expire while the job is active,
  139. -- so subsequent adds always hit the dedup path and never bypass
  140. -- the active-check because of a TTL expiry.
  141. local deduplicationKey = prefix .. "de:" .. deduplicationId
  142. rcall('PERSIST', deduplicationKey)
  143. -- TODO remove debounced event in next breaking change
  144. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  145. currentDebounceJobId, "debounceId", deduplicationId)
  146. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  147. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  148. return true
  149. end
  150. end
  151. return false
  152. end
  153. local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  154. eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  155. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  156. local ttl = deduplicationOpts['ttl']
  157. local deduplicationKeyExists
  158. if ttl and ttl > 0 then
  159. if deduplicationOpts['extend'] then
  160. local currentDebounceJobId = rcall('GET', deduplicationKey)
  161. if currentDebounceJobId then
  162. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  163. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  164. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  165. return currentDebounceJobId
  166. end
  167. if deduplicationOpts['keepLastIfActive'] then
  168. rcall('SET', deduplicationKey, currentDebounceJobId)
  169. else
  170. setDeduplicationKey(deduplicationKey, currentDebounceJobId, deduplicationOpts)
  171. end
  172. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
  173. "jobId", currentDebounceJobId, "debounceId", deduplicationId)
  174. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  175. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  176. return currentDebounceJobId
  177. else
  178. if deduplicationOpts['keepLastIfActive'] then
  179. rcall('SET', deduplicationKey, jobId)
  180. else
  181. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  182. end
  183. return
  184. end
  185. else
  186. if deduplicationOpts['keepLastIfActive'] then
  187. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  188. else
  189. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  190. end
  191. end
  192. else
  193. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  194. end
  195. if deduplicationKeyExists then
  196. local currentDebounceJobId = rcall('GET', deduplicationKey)
  197. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  198. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  199. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  200. return currentDebounceJobId
  201. end
  202. -- TODO remove debounced event in next breaking change
  203. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  204. currentDebounceJobId, "debounceId", deduplicationId)
  205. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  206. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  207. return currentDebounceJobId
  208. end
  209. end
  210. --[[
  211. Function to remove job keys.
  212. ]]
  213. local function removeJobKeys(jobKey)
  214. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  215. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  216. end
  217. local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
  218. jobId, deduplicationId, prefix)
  219. if rcall("ZREM", delayedKey, currentDeduplicatedJobId) > 0 then
  220. removeJobKeys(prefix .. currentDeduplicatedJobId)
  221. rcall("XADD", eventsKey, "*", "event", "removed", "jobId", currentDeduplicatedJobId,
  222. "prev", "delayed")
  223. -- TODO remove debounced event in next breaking change
  224. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  225. jobId, "debounceId", deduplicationId)
  226. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  227. jobId, "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)
  228. return true
  229. end
  230. return false
  231. end
  232. local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
  233. prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
  234. local deduplicationId = deduplicationOpts and deduplicationOpts['id']
  235. if deduplicationId then
  236. if deduplicationOpts['replace'] then
  237. local currentDebounceJobId = rcall('GET', deduplicationKey)
  238. if currentDebounceJobId then
  239. local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
  240. currentDebounceJobId, jobId, deduplicationId, prefix)
  241. if isRemoved then
  242. if deduplicationOpts['keepLastIfActive'] then
  243. rcall('SET', deduplicationKey, jobId)
  244. else
  245. local ttl = deduplicationOpts['ttl']
  246. if not deduplicationOpts['extend'] and ttl and ttl > 0 then
  247. rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
  248. else
  249. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  250. end
  251. end
  252. return
  253. else
  254. storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  255. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  256. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  257. return currentDebounceJobId
  258. end
  259. else
  260. if deduplicationOpts['keepLastIfActive'] then
  261. rcall('SET', deduplicationKey, jobId)
  262. else
  263. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  264. end
  265. return
  266. end
  267. else
  268. return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
  269. jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  270. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  271. end
  272. end
  273. end
  274. --[[
  275. Function to store a job
  276. ]]
  277. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  278. parentKey, parentData, repeatJobKey)
  279. local jsonOpts = cjson.encode(opts)
  280. local delay = opts['delay'] or 0
  281. local priority = opts['priority'] or 0
  282. local debounceId = opts['de'] and opts['de']['id']
  283. local optionalValues = {}
  284. if parentKey ~= nil then
  285. table.insert(optionalValues, "parentKey")
  286. table.insert(optionalValues, parentKey)
  287. table.insert(optionalValues, "parent")
  288. table.insert(optionalValues, parentData)
  289. end
  290. if repeatJobKey then
  291. table.insert(optionalValues, "rjk")
  292. table.insert(optionalValues, repeatJobKey)
  293. end
  294. if debounceId then
  295. table.insert(optionalValues, "deid")
  296. table.insert(optionalValues, debounceId)
  297. end
  298. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  299. "timestamp", timestamp, "delay", delay, "priority", priority,
  300. unpack(optionalValues))
  301. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  302. return delay, priority
  303. end
  304. --[[
  305. Function to get max events value or set by default 10000.
  306. ]]
  307. local function getOrSetMaxEvents(metaKey)
  308. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  309. if not maxEvents then
  310. maxEvents = 10000
  311. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  312. end
  313. return maxEvents
  314. end
  315. --[[
  316. Function to handle the case when job is duplicated.
  317. ]]
  318. -- Includes
  319. --[[
  320. This function is used to update the parent's dependencies if the job
  321. is already completed and about to be ignored. The parent must get its
  322. dependencies updated to avoid the parent job being stuck forever in
  323. the waiting-children state.
  324. ]]
  325. -- Includes
  326. --[[
  327. Validate and move or add dependencies to parent.
  328. ]]
  329. -- Includes
  330. --[[
  331. Validate and move parent to a wait status (waiting, delayed or prioritized)
  332. if no pending dependencies.
  333. ]]
  334. -- Includes
  335. --[[
  336. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  337. ]]
  338. -- Includes
  339. --[[
  340. Move parent to a wait status (wait, prioritized or delayed)
  341. ]]
  342. -- Includes
  343. --[[
  344. Add delay marker if needed.
  345. ]]
  346. -- Includes
  347. --[[
  348. Function to return the next delayed job timestamp.
  349. ]]
  350. local function getNextDelayedTimestamp(delayedKey)
  351. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  352. if #result then
  353. local nextTimestamp = tonumber(result[2])
  354. if nextTimestamp ~= nil then
  355. return nextTimestamp / 0x1000
  356. end
  357. end
  358. end
  359. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  360. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  361. if nextTimestamp ~= nil then
  362. -- Replace the score of the marker with the newest known
  363. -- next timestamp.
  364. rcall("ZADD", markerKey, nextTimestamp, "1")
  365. end
  366. end
  367. --[[
  368. Function to add job in target list and add marker if needed.
  369. ]]
  370. -- Includes
  371. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  372. rcall(pushCmd, targetKey, jobId)
  373. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  374. end
  375. --[[
  376. Function to check if queue is paused or maxed
  377. (since an empty list and !EXISTS are not really the same).
  378. ]]
  379. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  380. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  381. if queueAttributes[1] then
  382. return true
  383. else
  384. if queueAttributes[2] then
  385. local activeCount = rcall("LLEN", activeKey)
  386. return activeCount >= tonumber(queueAttributes[2])
  387. end
  388. end
  389. return false
  390. end
  391. --[[
  392. Function to check for the meta.paused key to decide if we are paused or not
  393. (since an empty list and !EXISTS are not really the same).
  394. ]]
  395. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  396. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  397. if queueAttributes[1] then
  398. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  399. else
  400. if queueAttributes[2] then
  401. local activeCount = rcall("LLEN", activeKey)
  402. if activeCount >= tonumber(queueAttributes[2]) then
  403. return waitKey, true, queueAttributes[3], queueAttributes[4]
  404. else
  405. return waitKey, false, queueAttributes[3], queueAttributes[4]
  406. end
  407. end
  408. end
  409. return waitKey, false, queueAttributes[3], queueAttributes[4]
  410. end
  411. local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  412. local parentWaitKey = parentQueueKey .. ":wait"
  413. local parentPausedKey = parentQueueKey .. ":paused"
  414. local parentActiveKey = parentQueueKey .. ":active"
  415. local parentMetaKey = parentQueueKey .. ":meta"
  416. local parentMarkerKey = parentQueueKey .. ":marker"
  417. local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  418. local priority = tonumber(jobAttributes[1]) or 0
  419. local delay = tonumber(jobAttributes[2]) or 0
  420. if delay > 0 then
  421. local delayedTimestamp = tonumber(timestamp) + delay
  422. local score = delayedTimestamp * 0x1000
  423. local parentDelayedKey = parentQueueKey .. ":delayed"
  424. rcall("ZADD", parentDelayedKey, score, parentId)
  425. rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, "delay",
  426. delayedTimestamp)
  427. addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
  428. else
  429. if priority == 0 then
  430. local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
  431. parentWaitKey, parentPausedKey)
  432. addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  433. else
  434. local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
  435. addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
  436. parentQueueKey .. ":pc", isPausedOrMaxed)
  437. end
  438. rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev",
  439. "waiting-children")
  440. end
  441. end
  442. local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  443. if rcall("EXISTS", parentKey) == 1 then
  444. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  445. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  446. rcall("ZREM", parentWaitingChildrenKey, parentId)
  447. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  448. end
  449. end
  450. end
  451. local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  452. parentId, timestamp)
  453. local doNotHavePendingDependencies = rcall("SCARD", parentDependenciesKey) == 0
  454. if doNotHavePendingDependencies then
  455. moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  456. end
  457. end
  458. local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  459. parentId, jobIdKey, returnvalue, timestamp )
  460. local processedSet = parentKey .. ":processed"
  461. rcall("HSET", processedSet, jobIdKey, returnvalue)
  462. moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
  463. end
  464. local function updateExistingJobsParent(parentKey, parent, parentData,
  465. parentDependenciesKey, completedKey,
  466. jobIdKey, jobId, timestamp)
  467. if parentKey ~= nil then
  468. if rcall("ZSCORE", completedKey, jobId) then
  469. local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
  470. updateParentDepsIfNeeded(parentKey, parent['queueKey'],
  471. parentDependenciesKey, parent['id'],
  472. jobIdKey, returnvalue, timestamp)
  473. else
  474. if parentDependenciesKey ~= nil then
  475. rcall("SADD", parentDependenciesKey, jobIdKey)
  476. end
  477. end
  478. rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  479. end
  480. end
  481. local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  482. parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  483. local existedParentKey = rcall("HGET", jobKey, "parentKey")
  484. if not existedParentKey or existedParentKey == currentParentKey then
  485. updateExistingJobsParent(currentParentKey, currentParent, parentData,
  486. parentDependenciesKey, completedKey, jobKey,
  487. jobId, timestamp)
  488. else
  489. if currentParentKey ~= nil and currentParentKey ~= existedParentKey
  490. and (rcall("EXISTS", existedParentKey) == 1) then
  491. return -7
  492. end
  493. end
  494. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  495. "duplicated", "jobId", jobId)
  496. return jobId .. "" -- convert to string
  497. end
  498. if parentKey ~= nil then
  499. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  500. parentData = cjson.encode(parent)
  501. end
  502. local jobCounter = rcall("INCR", idKey)
  503. local maxEvents = getOrSetMaxEvents(metaKey)
  504. local parentDependenciesKey = args[6]
  505. local timestamp = args[4]
  506. if args[2] == "" then
  507. jobId = jobCounter
  508. jobIdKey = args[1] .. jobId
  509. else
  510. jobId = args[2]
  511. jobIdKey = args[1] .. jobId
  512. if rcall("EXISTS", jobIdKey) == 1 then
  513. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  514. parentData, parentDependenciesKey, completedKey, eventsKey,
  515. maxEvents, timestamp)
  516. end
  517. end
  518. local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[5],
  519. deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  520. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  521. if deduplicationJobId then
  522. return deduplicationJobId
  523. end
  524. -- Store the job.
  525. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
  526. opts, timestamp, parentKey, parentData,
  527. repeatJobKey)
  528. -- Add the job to the prioritized set
  529. local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
  530. addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)
  531. -- Emit waiting event
  532. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  533. "jobId", jobId)
  534. -- Check if this job is a child of another job, if so add it to the parents dependencies
  535. if parentDependenciesKey ~= nil then
  536. rcall("SADD", parentDependenciesKey, jobIdKey)
  537. end
  538. return jobId .. "" -- convert to string
  539. `;
  540. export const addPrioritizedJob = {
  541. name: 'addPrioritizedJob',
  542. content,
  543. keys: 9,
  544. };
  545. //# sourceMappingURL=addPrioritizedJob-9.js.map