addStandardJob-9.js 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. const content = `--[[
  2. Adds a job to the queue by doing the following:
  3. - Increases the job counter if needed.
  4. - Creates a new job key with the job data.
  5. - if delayed:
  6. - computes timestamp.
  7. - adds to delayed zset.
  8. - Emits a global event 'delayed' if the job is delayed.
  9. - if not delayed
  10. - Adds the jobId to the wait/paused list in one of three ways:
  11. - LIFO
  12. - FIFO
  13. - prioritized.
  14. - Adds the job to the "added" list so that workers gets notified.
  15. Input:
  16. KEYS[1] 'wait',
  17. KEYS[2] 'paused'
  18. KEYS[3] 'meta'
  19. KEYS[4] 'id'
  20. KEYS[5] 'completed'
  21. KEYS[6] 'delayed'
  22. KEYS[7] 'active'
  23. KEYS[8] events stream key
  24. KEYS[9] marker key
  25. ARGV[1] msgpacked arguments array
  26. [1] key prefix,
  27. [2] custom id (will not generate one automatically)
  28. [3] name
  29. [4] timestamp
  30. [5] parentKey?
  31. [6] parent dependencies key.
  32. [7] parent? {id, queueKey}
  33. [8] repeat job key
  34. [9] deduplication key
  35. ARGV[2] Json stringified job data
  36. ARGV[3] msgpacked options
  37. Output:
  38. jobId - OK
  39. -5 - Missing parent key
  40. ]]
  41. local eventsKey = KEYS[8]
  42. local jobId
  43. local jobIdKey
  44. local rcall = redis.call
  45. local args = cmsgpack.unpack(ARGV[1])
  46. local data = ARGV[2]
  47. local opts = cmsgpack.unpack(ARGV[3])
  48. local parentKey = args[5]
  49. local parent = args[7]
  50. local repeatJobKey = args[8]
  51. local deduplicationKey = args[9]
  52. local parentData
  53. -- Includes
  54. --[[
  55. Function to add job in target list and add marker if needed.
  56. ]]
  57. -- Includes
  58. --[[
  59. Add marker if needed when a job is available.
  60. ]]
  61. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  62. if not isPausedOrMaxed then
  63. rcall("ZADD", markerKey, 0, "0")
  64. end
  65. end
  66. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  67. rcall(pushCmd, targetKey, jobId)
  68. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  69. end
  70. --[[
  71. Function to debounce a job.
  72. ]]
  73. -- Includes
  74. --[[
  75. Function to deduplicate a job.
  76. ]]
  77. --[[
  78. Function to set the deduplication key for a job.
  79. Uses TTL from deduplication opts if provided.
  80. ]]
  81. local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  82. local ttl = deduplicationOpts and deduplicationOpts['ttl']
  83. if ttl and ttl > 0 then
  84. rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  85. else
  86. rcall('SET', deduplicationKey, jobId)
  87. end
  88. end
  89. --[[
  90. Function to store a deduplicated next job if the existing job is active
  91. and keepLastIfActive is set. When the active job finishes, the stored
  92. proto-job is used to create a real job in the queue.
  93. Returns true if the proto-job was stored, false otherwise.
  94. ]]
  95. --[[
  96. Functions to check if a item belongs to a list.
  97. ]]
  98. local function checkItemInList(list, item)
  99. for _, v in pairs(list) do
  100. if v == item then
  101. return 1
  102. end
  103. end
  104. return nil
  105. end
  106. local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  107. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  108. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  109. if deduplicationOpts['keepLastIfActive'] and currentDebounceJobId then
  110. local activeKey = prefix .. "active"
  111. local activeItems = rcall('LRANGE', activeKey, 0, -1)
  112. if checkItemInList(activeItems, currentDebounceJobId) then
  113. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  114. local fields = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}
  115. if parentKey then
  116. fields[#fields+1] = 'pk'
  117. fields[#fields+1] = parentKey
  118. end
  119. if parentData then
  120. fields[#fields+1] = 'pd'
  121. fields[#fields+1] = parentData
  122. end
  123. if parentDependenciesKey then
  124. fields[#fields+1] = 'pdk'
  125. fields[#fields+1] = parentDependenciesKey
  126. end
  127. if repeatJobKey then
  128. fields[#fields+1] = 'rjk'
  129. fields[#fields+1] = repeatJobKey
  130. end
  131. rcall('HSET', deduplicationNextKey, unpack(fields))
  132. -- Ensure the dedup key does not expire while the job is active,
  133. -- so subsequent adds always hit the dedup path and never bypass
  134. -- the active-check because of a TTL expiry.
  135. local deduplicationKey = prefix .. "de:" .. deduplicationId
  136. rcall('PERSIST', deduplicationKey)
  137. -- TODO remove debounced event in next breaking change
  138. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  139. currentDebounceJobId, "debounceId", deduplicationId)
  140. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  141. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  142. return true
  143. end
  144. end
  145. return false
  146. end
  147. local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  148. eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  149. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  150. local ttl = deduplicationOpts['ttl']
  151. local deduplicationKeyExists
  152. if ttl and ttl > 0 then
  153. if deduplicationOpts['extend'] then
  154. local currentDebounceJobId = rcall('GET', deduplicationKey)
  155. if currentDebounceJobId then
  156. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  157. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  158. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  159. return currentDebounceJobId
  160. end
  161. if deduplicationOpts['keepLastIfActive'] then
  162. rcall('SET', deduplicationKey, currentDebounceJobId)
  163. else
  164. setDeduplicationKey(deduplicationKey, currentDebounceJobId, deduplicationOpts)
  165. end
  166. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
  167. "jobId", currentDebounceJobId, "debounceId", deduplicationId)
  168. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  169. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  170. return currentDebounceJobId
  171. else
  172. if deduplicationOpts['keepLastIfActive'] then
  173. rcall('SET', deduplicationKey, jobId)
  174. else
  175. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  176. end
  177. return
  178. end
  179. else
  180. if deduplicationOpts['keepLastIfActive'] then
  181. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  182. else
  183. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  184. end
  185. end
  186. else
  187. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  188. end
  189. if deduplicationKeyExists then
  190. local currentDebounceJobId = rcall('GET', deduplicationKey)
  191. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  192. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  193. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  194. return currentDebounceJobId
  195. end
  196. -- TODO remove debounced event in next breaking change
  197. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  198. currentDebounceJobId, "debounceId", deduplicationId)
  199. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  200. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  201. return currentDebounceJobId
  202. end
  203. end
  204. --[[
  205. Function to remove job keys.
  206. ]]
  207. local function removeJobKeys(jobKey)
  208. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  209. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  210. end
  211. local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
  212. jobId, deduplicationId, prefix)
  213. if rcall("ZREM", delayedKey, currentDeduplicatedJobId) > 0 then
  214. removeJobKeys(prefix .. currentDeduplicatedJobId)
  215. rcall("XADD", eventsKey, "*", "event", "removed", "jobId", currentDeduplicatedJobId,
  216. "prev", "delayed")
  217. -- TODO remove debounced event in next breaking change
  218. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  219. jobId, "debounceId", deduplicationId)
  220. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  221. jobId, "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)
  222. return true
  223. end
  224. return false
  225. end
  226. local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
  227. prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
  228. local deduplicationId = deduplicationOpts and deduplicationOpts['id']
  229. if deduplicationId then
  230. if deduplicationOpts['replace'] then
  231. local currentDebounceJobId = rcall('GET', deduplicationKey)
  232. if currentDebounceJobId then
  233. local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
  234. currentDebounceJobId, jobId, deduplicationId, prefix)
  235. if isRemoved then
  236. if deduplicationOpts['keepLastIfActive'] then
  237. rcall('SET', deduplicationKey, jobId)
  238. else
  239. local ttl = deduplicationOpts['ttl']
  240. if not deduplicationOpts['extend'] and ttl and ttl > 0 then
  241. rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
  242. else
  243. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  244. end
  245. end
  246. return
  247. else
  248. storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  249. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  250. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  251. return currentDebounceJobId
  252. end
  253. else
  254. if deduplicationOpts['keepLastIfActive'] then
  255. rcall('SET', deduplicationKey, jobId)
  256. else
  257. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  258. end
  259. return
  260. end
  261. else
  262. return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
  263. jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  264. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  265. end
  266. end
  267. end
  268. --[[
  269. Function to get max events value or set by default 10000.
  270. ]]
  271. local function getOrSetMaxEvents(metaKey)
  272. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  273. if not maxEvents then
  274. maxEvents = 10000
  275. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  276. end
  277. return maxEvents
  278. end
  279. --[[
  280. Function to check for the meta.paused key to decide if we are paused or not
  281. (since an empty list and !EXISTS are not really the same).
  282. ]]
  283. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  284. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  285. if queueAttributes[1] then
  286. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  287. else
  288. if queueAttributes[2] then
  289. local activeCount = rcall("LLEN", activeKey)
  290. if activeCount >= tonumber(queueAttributes[2]) then
  291. return waitKey, true, queueAttributes[3], queueAttributes[4]
  292. else
  293. return waitKey, false, queueAttributes[3], queueAttributes[4]
  294. end
  295. end
  296. end
  297. return waitKey, false, queueAttributes[3], queueAttributes[4]
  298. end
  299. --[[
  300. Function to handle the case when job is duplicated.
  301. ]]
  302. -- Includes
  303. --[[
  304. This function is used to update the parent's dependencies if the job
  305. is already completed and about to be ignored. The parent must get its
  306. dependencies updated to avoid the parent job being stuck forever in
  307. the waiting-children state.
  308. ]]
  309. -- Includes
  310. --[[
  311. Validate and move or add dependencies to parent.
  312. ]]
  313. -- Includes
  314. --[[
  315. Validate and move parent to a wait status (waiting, delayed or prioritized)
  316. if no pending dependencies.
  317. ]]
  318. -- Includes
  319. --[[
  320. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  321. ]]
  322. -- Includes
  323. --[[
  324. Move parent to a wait status (wait, prioritized or delayed)
  325. ]]
  326. -- Includes
  327. --[[
  328. Add delay marker if needed.
  329. ]]
  330. -- Includes
  331. --[[
  332. Function to return the next delayed job timestamp.
  333. ]]
  334. local function getNextDelayedTimestamp(delayedKey)
  335. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  336. if #result then
  337. local nextTimestamp = tonumber(result[2])
  338. if nextTimestamp ~= nil then
  339. return nextTimestamp / 0x1000
  340. end
  341. end
  342. end
  343. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  344. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  345. if nextTimestamp ~= nil then
  346. -- Replace the score of the marker with the newest known
  347. -- next timestamp.
  348. rcall("ZADD", markerKey, nextTimestamp, "1")
  349. end
  350. end
  351. --[[
  352. Function to add job considering priority.
  353. ]]
  354. -- Includes
  355. --[[
  356. Function to get priority score.
  357. ]]
  358. local function getPriorityScore(priority, priorityCounterKey)
  359. local prioCounter = rcall("INCR", priorityCounterKey)
  360. return priority * 0x100000000 + prioCounter % 0x100000000
  361. end
  362. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  363. isPausedOrMaxed)
  364. local score = getPriorityScore(priority, priorityCounterKey)
  365. rcall("ZADD", prioritizedKey, score, jobId)
  366. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  367. end
  368. --[[
  369. Function to check if queue is paused or maxed
  370. (since an empty list and !EXISTS are not really the same).
  371. ]]
  372. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  373. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  374. if queueAttributes[1] then
  375. return true
  376. else
  377. if queueAttributes[2] then
  378. local activeCount = rcall("LLEN", activeKey)
  379. return activeCount >= tonumber(queueAttributes[2])
  380. end
  381. end
  382. return false
  383. end
  384. local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  385. local parentWaitKey = parentQueueKey .. ":wait"
  386. local parentPausedKey = parentQueueKey .. ":paused"
  387. local parentActiveKey = parentQueueKey .. ":active"
  388. local parentMetaKey = parentQueueKey .. ":meta"
  389. local parentMarkerKey = parentQueueKey .. ":marker"
  390. local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  391. local priority = tonumber(jobAttributes[1]) or 0
  392. local delay = tonumber(jobAttributes[2]) or 0
  393. if delay > 0 then
  394. local delayedTimestamp = tonumber(timestamp) + delay
  395. local score = delayedTimestamp * 0x1000
  396. local parentDelayedKey = parentQueueKey .. ":delayed"
  397. rcall("ZADD", parentDelayedKey, score, parentId)
  398. rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, "delay",
  399. delayedTimestamp)
  400. addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
  401. else
  402. if priority == 0 then
  403. local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
  404. parentWaitKey, parentPausedKey)
  405. addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  406. else
  407. local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
  408. addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
  409. parentQueueKey .. ":pc", isPausedOrMaxed)
  410. end
  411. rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev",
  412. "waiting-children")
  413. end
  414. end
  415. local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  416. if rcall("EXISTS", parentKey) == 1 then
  417. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  418. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  419. rcall("ZREM", parentWaitingChildrenKey, parentId)
  420. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  421. end
  422. end
  423. end
  424. local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  425. parentId, timestamp)
  426. local doNotHavePendingDependencies = rcall("SCARD", parentDependenciesKey) == 0
  427. if doNotHavePendingDependencies then
  428. moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  429. end
  430. end
  431. local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  432. parentId, jobIdKey, returnvalue, timestamp )
  433. local processedSet = parentKey .. ":processed"
  434. rcall("HSET", processedSet, jobIdKey, returnvalue)
  435. moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
  436. end
  437. local function updateExistingJobsParent(parentKey, parent, parentData,
  438. parentDependenciesKey, completedKey,
  439. jobIdKey, jobId, timestamp)
  440. if parentKey ~= nil then
  441. if rcall("ZSCORE", completedKey, jobId) then
  442. local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
  443. updateParentDepsIfNeeded(parentKey, parent['queueKey'],
  444. parentDependenciesKey, parent['id'],
  445. jobIdKey, returnvalue, timestamp)
  446. else
  447. if parentDependenciesKey ~= nil then
  448. rcall("SADD", parentDependenciesKey, jobIdKey)
  449. end
  450. end
  451. rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  452. end
  453. end
  454. local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  455. parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  456. local existedParentKey = rcall("HGET", jobKey, "parentKey")
  457. if not existedParentKey or existedParentKey == currentParentKey then
  458. updateExistingJobsParent(currentParentKey, currentParent, parentData,
  459. parentDependenciesKey, completedKey, jobKey,
  460. jobId, timestamp)
  461. else
  462. if currentParentKey ~= nil and currentParentKey ~= existedParentKey
  463. and (rcall("EXISTS", existedParentKey) == 1) then
  464. return -7
  465. end
  466. end
  467. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  468. "duplicated", "jobId", jobId)
  469. return jobId .. "" -- convert to string
  470. end
  471. --[[
  472. Function to store a job
  473. ]]
  474. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  475. parentKey, parentData, repeatJobKey)
  476. local jsonOpts = cjson.encode(opts)
  477. local delay = opts['delay'] or 0
  478. local priority = opts['priority'] or 0
  479. local debounceId = opts['de'] and opts['de']['id']
  480. local optionalValues = {}
  481. if parentKey ~= nil then
  482. table.insert(optionalValues, "parentKey")
  483. table.insert(optionalValues, parentKey)
  484. table.insert(optionalValues, "parent")
  485. table.insert(optionalValues, parentData)
  486. end
  487. if repeatJobKey then
  488. table.insert(optionalValues, "rjk")
  489. table.insert(optionalValues, repeatJobKey)
  490. end
  491. if debounceId then
  492. table.insert(optionalValues, "deid")
  493. table.insert(optionalValues, debounceId)
  494. end
  495. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  496. "timestamp", timestamp, "delay", delay, "priority", priority,
  497. unpack(optionalValues))
  498. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  499. return delay, priority
  500. end
  501. if parentKey ~= nil then
  502. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  503. parentData = cjson.encode(parent)
  504. end
  505. local jobCounter = rcall("INCR", KEYS[4])
  506. local metaKey = KEYS[3]
  507. local maxEvents = getOrSetMaxEvents(metaKey)
  508. local parentDependenciesKey = args[6]
  509. local timestamp = args[4]
  510. if args[2] == "" then
  511. jobId = jobCounter
  512. jobIdKey = args[1] .. jobId
  513. else
  514. jobId = args[2]
  515. jobIdKey = args[1] .. jobId
  516. if rcall("EXISTS", jobIdKey) == 1 then
  517. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  518. parentData, parentDependenciesKey, KEYS[5], eventsKey,
  519. maxEvents, timestamp)
  520. end
  521. end
  522. local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[6],
  523. deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  524. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  525. if deduplicationJobId then
  526. return deduplicationJobId
  527. end
  528. -- Store the job.
  529. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
  530. parentKey, parentData, repeatJobKey)
  531. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[1], KEYS[2])
  532. -- LIFO or FIFO
  533. local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
  534. addJobInTargetList(target, KEYS[9], pushCmd, isPausedOrMaxed, jobId)
  535. -- Emit waiting event
  536. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  537. "jobId", jobId)
  538. -- Check if this job is a child of another job, if so add it to the parents dependencies
  539. if parentDependenciesKey ~= nil then
  540. rcall("SADD", parentDependenciesKey, jobIdKey)
  541. end
  542. return jobId .. "" -- convert to string
  543. `;
  544. export const addStandardJob = {
  545. name: 'addStandardJob',
  546. content,
  547. keys: 9,
  548. };
  549. //# sourceMappingURL=addStandardJob-9.js.map