addPrioritizedJob-9.js 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.addPrioritizedJob = void 0;
  4. const content = `--[[
  5. Adds a priotitized job to the queue by doing the following:
  6. - Increases the job counter if needed.
  7. - Creates a new job key with the job data.
  8. - Adds the job to the "added" list so that workers gets notified.
  9. Input:
  10. KEYS[1] 'marker',
  11. KEYS[2] 'meta'
  12. KEYS[3] 'id'
  13. KEYS[4] 'prioritized'
  14. KEYS[5] 'delayed'
  15. KEYS[6] 'completed'
  16. KEYS[7] 'active'
  17. KEYS[8] events stream key
  18. KEYS[9] 'pc' priority counter
  19. ARGV[1] msgpacked arguments array
  20. [1] key prefix,
  21. [2] custom id (will not generate one automatically)
  22. [3] name
  23. [4] timestamp
  24. [5] parentKey?
  25. [6] parent dependencies key.
  26. [7] parent? {id, queueKey}
  27. [8] repeat job key
  28. [9] deduplication key
  29. ARGV[2] Json stringified job data
  30. ARGV[3] msgpacked options
  31. Output:
  32. jobId - OK
  33. -5 - Missing parent key
  34. ]]
  35. local metaKey = KEYS[2]
  36. local idKey = KEYS[3]
  37. local priorityKey = KEYS[4]
  38. local completedKey = KEYS[6]
  39. local activeKey = KEYS[7]
  40. local eventsKey = KEYS[8]
  41. local priorityCounterKey = KEYS[9]
  42. local jobId
  43. local jobIdKey
  44. local rcall = redis.call
  45. local args = cmsgpack.unpack(ARGV[1])
  46. local data = ARGV[2]
  47. local opts = cmsgpack.unpack(ARGV[3])
  48. local parentKey = args[5]
  49. local parent = args[7]
  50. local repeatJobKey = args[8]
  51. local deduplicationKey = args[9]
  52. local parentData
  53. -- Includes
  54. --[[
  55. Function to add job considering priority.
  56. ]]
  57. -- Includes
  58. --[[
  59. Add marker if needed when a job is available.
  60. ]]
  61. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  62. if not isPausedOrMaxed then
  63. rcall("ZADD", markerKey, 0, "0")
  64. end
  65. end
  66. --[[
  67. Function to get priority score.
  68. ]]
  69. local function getPriorityScore(priority, priorityCounterKey)
  70. local prioCounter = rcall("INCR", priorityCounterKey)
  71. return priority * 0x100000000 + prioCounter % 0x100000000
  72. end
  73. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  74. isPausedOrMaxed)
  75. local score = getPriorityScore(priority, priorityCounterKey)
  76. rcall("ZADD", prioritizedKey, score, jobId)
  77. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  78. end
  79. --[[
  80. Function to debounce a job.
  81. ]]
  82. -- Includes
  83. --[[
  84. Function to deduplicate a job.
  85. ]]
  86. --[[
  87. Function to set the deduplication key for a job.
  88. Uses TTL from deduplication opts if provided.
  89. ]]
  90. local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  91. local ttl = deduplicationOpts and deduplicationOpts['ttl']
  92. if ttl and ttl > 0 then
  93. rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  94. else
  95. rcall('SET', deduplicationKey, jobId)
  96. end
  97. end
  98. --[[
  99. Function to store a deduplicated next job if the existing job is active
  100. and keepLastIfActive is set. When the active job finishes, the stored
  101. proto-job is used to create a real job in the queue.
  102. Returns true if the proto-job was stored, false otherwise.
  103. ]]
  104. --[[
  105. Functions to check if a item belongs to a list.
  106. ]]
  107. local function checkItemInList(list, item)
  108. for _, v in pairs(list) do
  109. if v == item then
  110. return 1
  111. end
  112. end
  113. return nil
  114. end
  115. local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  116. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  117. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  118. if deduplicationOpts['keepLastIfActive'] and currentDebounceJobId then
  119. local activeKey = prefix .. "active"
  120. local activeItems = rcall('LRANGE', activeKey, 0, -1)
  121. if checkItemInList(activeItems, currentDebounceJobId) then
  122. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  123. local fields = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}
  124. if parentKey then
  125. fields[#fields+1] = 'pk'
  126. fields[#fields+1] = parentKey
  127. end
  128. if parentData then
  129. fields[#fields+1] = 'pd'
  130. fields[#fields+1] = parentData
  131. end
  132. if parentDependenciesKey then
  133. fields[#fields+1] = 'pdk'
  134. fields[#fields+1] = parentDependenciesKey
  135. end
  136. if repeatJobKey then
  137. fields[#fields+1] = 'rjk'
  138. fields[#fields+1] = repeatJobKey
  139. end
  140. rcall('HSET', deduplicationNextKey, unpack(fields))
  141. -- Ensure the dedup key does not expire while the job is active,
  142. -- so subsequent adds always hit the dedup path and never bypass
  143. -- the active-check because of a TTL expiry.
  144. local deduplicationKey = prefix .. "de:" .. deduplicationId
  145. rcall('PERSIST', deduplicationKey)
  146. -- TODO remove debounced event in next breaking change
  147. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  148. currentDebounceJobId, "debounceId", deduplicationId)
  149. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  150. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  151. return true
  152. end
  153. end
  154. return false
  155. end
  156. local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  157. eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  158. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  159. local ttl = deduplicationOpts['ttl']
  160. local deduplicationKeyExists
  161. if ttl and ttl > 0 then
  162. if deduplicationOpts['extend'] then
  163. local currentDebounceJobId = rcall('GET', deduplicationKey)
  164. if currentDebounceJobId then
  165. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  166. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  167. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  168. return currentDebounceJobId
  169. end
  170. if deduplicationOpts['keepLastIfActive'] then
  171. rcall('SET', deduplicationKey, currentDebounceJobId)
  172. else
  173. setDeduplicationKey(deduplicationKey, currentDebounceJobId, deduplicationOpts)
  174. end
  175. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
  176. "jobId", currentDebounceJobId, "debounceId", deduplicationId)
  177. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  178. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  179. return currentDebounceJobId
  180. else
  181. if deduplicationOpts['keepLastIfActive'] then
  182. rcall('SET', deduplicationKey, jobId)
  183. else
  184. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  185. end
  186. return
  187. end
  188. else
  189. if deduplicationOpts['keepLastIfActive'] then
  190. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  191. else
  192. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  193. end
  194. end
  195. else
  196. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  197. end
  198. if deduplicationKeyExists then
  199. local currentDebounceJobId = rcall('GET', deduplicationKey)
  200. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  201. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  202. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  203. return currentDebounceJobId
  204. end
  205. -- TODO remove debounced event in next breaking change
  206. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  207. currentDebounceJobId, "debounceId", deduplicationId)
  208. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  209. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  210. return currentDebounceJobId
  211. end
  212. end
  213. --[[
  214. Function to remove job keys.
  215. ]]
  216. local function removeJobKeys(jobKey)
  217. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  218. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  219. end
  220. local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
  221. jobId, deduplicationId, prefix)
  222. if rcall("ZREM", delayedKey, currentDeduplicatedJobId) > 0 then
  223. removeJobKeys(prefix .. currentDeduplicatedJobId)
  224. rcall("XADD", eventsKey, "*", "event", "removed", "jobId", currentDeduplicatedJobId,
  225. "prev", "delayed")
  226. -- TODO remove debounced event in next breaking change
  227. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  228. jobId, "debounceId", deduplicationId)
  229. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  230. jobId, "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)
  231. return true
  232. end
  233. return false
  234. end
  235. local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
  236. prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
  237. local deduplicationId = deduplicationOpts and deduplicationOpts['id']
  238. if deduplicationId then
  239. if deduplicationOpts['replace'] then
  240. local currentDebounceJobId = rcall('GET', deduplicationKey)
  241. if currentDebounceJobId then
  242. local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
  243. currentDebounceJobId, jobId, deduplicationId, prefix)
  244. if isRemoved then
  245. if deduplicationOpts['keepLastIfActive'] then
  246. rcall('SET', deduplicationKey, jobId)
  247. else
  248. local ttl = deduplicationOpts['ttl']
  249. if not deduplicationOpts['extend'] and ttl and ttl > 0 then
  250. rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
  251. else
  252. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  253. end
  254. end
  255. return
  256. else
  257. storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  258. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  259. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  260. return currentDebounceJobId
  261. end
  262. else
  263. if deduplicationOpts['keepLastIfActive'] then
  264. rcall('SET', deduplicationKey, jobId)
  265. else
  266. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  267. end
  268. return
  269. end
  270. else
  271. return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
  272. jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  273. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  274. end
  275. end
  276. end
  277. --[[
  278. Function to store a job
  279. ]]
  280. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  281. parentKey, parentData, repeatJobKey)
  282. local jsonOpts = cjson.encode(opts)
  283. local delay = opts['delay'] or 0
  284. local priority = opts['priority'] or 0
  285. local debounceId = opts['de'] and opts['de']['id']
  286. local optionalValues = {}
  287. if parentKey ~= nil then
  288. table.insert(optionalValues, "parentKey")
  289. table.insert(optionalValues, parentKey)
  290. table.insert(optionalValues, "parent")
  291. table.insert(optionalValues, parentData)
  292. end
  293. if repeatJobKey then
  294. table.insert(optionalValues, "rjk")
  295. table.insert(optionalValues, repeatJobKey)
  296. end
  297. if debounceId then
  298. table.insert(optionalValues, "deid")
  299. table.insert(optionalValues, debounceId)
  300. end
  301. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  302. "timestamp", timestamp, "delay", delay, "priority", priority,
  303. unpack(optionalValues))
  304. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  305. return delay, priority
  306. end
  307. --[[
  308. Function to get max events value or set by default 10000.
  309. ]]
  310. local function getOrSetMaxEvents(metaKey)
  311. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  312. if not maxEvents then
  313. maxEvents = 10000
  314. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  315. end
  316. return maxEvents
  317. end
  318. --[[
  319. Function to handle the case when job is duplicated.
  320. ]]
  321. -- Includes
  322. --[[
  323. This function is used to update the parent's dependencies if the job
  324. is already completed and about to be ignored. The parent must get its
  325. dependencies updated to avoid the parent job being stuck forever in
  326. the waiting-children state.
  327. ]]
  328. -- Includes
  329. --[[
  330. Validate and move or add dependencies to parent.
  331. ]]
  332. -- Includes
  333. --[[
  334. Validate and move parent to a wait status (waiting, delayed or prioritized)
  335. if no pending dependencies.
  336. ]]
  337. -- Includes
  338. --[[
  339. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  340. ]]
  341. -- Includes
  342. --[[
  343. Move parent to a wait status (wait, prioritized or delayed)
  344. ]]
  345. -- Includes
  346. --[[
  347. Add delay marker if needed.
  348. ]]
  349. -- Includes
  350. --[[
  351. Function to return the next delayed job timestamp.
  352. ]]
  353. local function getNextDelayedTimestamp(delayedKey)
  354. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  355. if #result then
  356. local nextTimestamp = tonumber(result[2])
  357. if nextTimestamp ~= nil then
  358. return nextTimestamp / 0x1000
  359. end
  360. end
  361. end
  362. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  363. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  364. if nextTimestamp ~= nil then
  365. -- Replace the score of the marker with the newest known
  366. -- next timestamp.
  367. rcall("ZADD", markerKey, nextTimestamp, "1")
  368. end
  369. end
  370. --[[
  371. Function to add job in target list and add marker if needed.
  372. ]]
  373. -- Includes
  374. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  375. rcall(pushCmd, targetKey, jobId)
  376. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  377. end
  378. --[[
  379. Function to check if queue is paused or maxed
  380. (since an empty list and !EXISTS are not really the same).
  381. ]]
  382. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  383. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  384. if queueAttributes[1] then
  385. return true
  386. else
  387. if queueAttributes[2] then
  388. local activeCount = rcall("LLEN", activeKey)
  389. return activeCount >= tonumber(queueAttributes[2])
  390. end
  391. end
  392. return false
  393. end
  394. --[[
  395. Function to check for the meta.paused key to decide if we are paused or not
  396. (since an empty list and !EXISTS are not really the same).
  397. ]]
  398. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  399. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  400. if queueAttributes[1] then
  401. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  402. else
  403. if queueAttributes[2] then
  404. local activeCount = rcall("LLEN", activeKey)
  405. if activeCount >= tonumber(queueAttributes[2]) then
  406. return waitKey, true, queueAttributes[3], queueAttributes[4]
  407. else
  408. return waitKey, false, queueAttributes[3], queueAttributes[4]
  409. end
  410. end
  411. end
  412. return waitKey, false, queueAttributes[3], queueAttributes[4]
  413. end
  414. local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  415. local parentWaitKey = parentQueueKey .. ":wait"
  416. local parentPausedKey = parentQueueKey .. ":paused"
  417. local parentActiveKey = parentQueueKey .. ":active"
  418. local parentMetaKey = parentQueueKey .. ":meta"
  419. local parentMarkerKey = parentQueueKey .. ":marker"
  420. local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  421. local priority = tonumber(jobAttributes[1]) or 0
  422. local delay = tonumber(jobAttributes[2]) or 0
  423. if delay > 0 then
  424. local delayedTimestamp = tonumber(timestamp) + delay
  425. local score = delayedTimestamp * 0x1000
  426. local parentDelayedKey = parentQueueKey .. ":delayed"
  427. rcall("ZADD", parentDelayedKey, score, parentId)
  428. rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, "delay",
  429. delayedTimestamp)
  430. addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
  431. else
  432. if priority == 0 then
  433. local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
  434. parentWaitKey, parentPausedKey)
  435. addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  436. else
  437. local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
  438. addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
  439. parentQueueKey .. ":pc", isPausedOrMaxed)
  440. end
  441. rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev",
  442. "waiting-children")
  443. end
  444. end
  445. local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  446. if rcall("EXISTS", parentKey) == 1 then
  447. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  448. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  449. rcall("ZREM", parentWaitingChildrenKey, parentId)
  450. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  451. end
  452. end
  453. end
  454. local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  455. parentId, timestamp)
  456. local doNotHavePendingDependencies = rcall("SCARD", parentDependenciesKey) == 0
  457. if doNotHavePendingDependencies then
  458. moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  459. end
  460. end
  461. local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  462. parentId, jobIdKey, returnvalue, timestamp )
  463. local processedSet = parentKey .. ":processed"
  464. rcall("HSET", processedSet, jobIdKey, returnvalue)
  465. moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
  466. end
  467. local function updateExistingJobsParent(parentKey, parent, parentData,
  468. parentDependenciesKey, completedKey,
  469. jobIdKey, jobId, timestamp)
  470. if parentKey ~= nil then
  471. if rcall("ZSCORE", completedKey, jobId) then
  472. local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
  473. updateParentDepsIfNeeded(parentKey, parent['queueKey'],
  474. parentDependenciesKey, parent['id'],
  475. jobIdKey, returnvalue, timestamp)
  476. else
  477. if parentDependenciesKey ~= nil then
  478. rcall("SADD", parentDependenciesKey, jobIdKey)
  479. end
  480. end
  481. rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  482. end
  483. end
  484. local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  485. parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  486. local existedParentKey = rcall("HGET", jobKey, "parentKey")
  487. if not existedParentKey or existedParentKey == currentParentKey then
  488. updateExistingJobsParent(currentParentKey, currentParent, parentData,
  489. parentDependenciesKey, completedKey, jobKey,
  490. jobId, timestamp)
  491. else
  492. if currentParentKey ~= nil and currentParentKey ~= existedParentKey
  493. and (rcall("EXISTS", existedParentKey) == 1) then
  494. return -7
  495. end
  496. end
  497. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  498. "duplicated", "jobId", jobId)
  499. return jobId .. "" -- convert to string
  500. end
  501. if parentKey ~= nil then
  502. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  503. parentData = cjson.encode(parent)
  504. end
  505. local jobCounter = rcall("INCR", idKey)
  506. local maxEvents = getOrSetMaxEvents(metaKey)
  507. local parentDependenciesKey = args[6]
  508. local timestamp = args[4]
  509. if args[2] == "" then
  510. jobId = jobCounter
  511. jobIdKey = args[1] .. jobId
  512. else
  513. jobId = args[2]
  514. jobIdKey = args[1] .. jobId
  515. if rcall("EXISTS", jobIdKey) == 1 then
  516. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  517. parentData, parentDependenciesKey, completedKey, eventsKey,
  518. maxEvents, timestamp)
  519. end
  520. end
  521. local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[5],
  522. deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  523. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  524. if deduplicationJobId then
  525. return deduplicationJobId
  526. end
  527. -- Store the job.
  528. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
  529. opts, timestamp, parentKey, parentData,
  530. repeatJobKey)
  531. -- Add the job to the prioritized set
  532. local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
  533. addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)
  534. -- Emit waiting event
  535. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  536. "jobId", jobId)
  537. -- Check if this job is a child of another job, if so add it to the parents dependencies
  538. if parentDependenciesKey ~= nil then
  539. rcall("SADD", parentDependenciesKey, jobIdKey)
  540. end
  541. return jobId .. "" -- convert to string
  542. `;
  543. exports.addPrioritizedJob = {
  544. name: 'addPrioritizedJob',
  545. content,
  546. keys: 9,
  547. };
  548. //# sourceMappingURL=addPrioritizedJob-9.js.map