addStandardJob-9.js 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.addStandardJob = void 0;
  4. const content = `--[[
  5. Adds a job to the queue by doing the following:
  6. - Increases the job counter if needed.
  7. - Creates a new job key with the job data.
  8. - if delayed:
  9. - computes timestamp.
  10. - adds to delayed zset.
  11. - Emits a global event 'delayed' if the job is delayed.
  12. - if not delayed
  13. - Adds the jobId to the wait/paused list in one of three ways:
  14. - LIFO
  15. - FIFO
  16. - prioritized.
  17. - Adds the job to the "added" list so that workers gets notified.
  18. Input:
  19. KEYS[1] 'wait',
  20. KEYS[2] 'paused'
  21. KEYS[3] 'meta'
  22. KEYS[4] 'id'
  23. KEYS[5] 'completed'
  24. KEYS[6] 'delayed'
  25. KEYS[7] 'active'
  26. KEYS[8] events stream key
  27. KEYS[9] marker key
  28. ARGV[1] msgpacked arguments array
  29. [1] key prefix,
  30. [2] custom id (will not generate one automatically)
  31. [3] name
  32. [4] timestamp
  33. [5] parentKey?
  34. [6] parent dependencies key.
  35. [7] parent? {id, queueKey}
  36. [8] repeat job key
  37. [9] deduplication key
  38. ARGV[2] Json stringified job data
  39. ARGV[3] msgpacked options
  40. Output:
  41. jobId - OK
  42. -5 - Missing parent key
  43. ]]
  44. local eventsKey = KEYS[8]
  45. local jobId
  46. local jobIdKey
  47. local rcall = redis.call
  48. local args = cmsgpack.unpack(ARGV[1])
  49. local data = ARGV[2]
  50. local opts = cmsgpack.unpack(ARGV[3])
  51. local parentKey = args[5]
  52. local parent = args[7]
  53. local repeatJobKey = args[8]
  54. local deduplicationKey = args[9]
  55. local parentData
  56. -- Includes
  57. --[[
  58. Function to add job in target list and add marker if needed.
  59. ]]
  60. -- Includes
  61. --[[
  62. Add marker if needed when a job is available.
  63. ]]
  64. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  65. if not isPausedOrMaxed then
  66. rcall("ZADD", markerKey, 0, "0")
  67. end
  68. end
  69. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  70. rcall(pushCmd, targetKey, jobId)
  71. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  72. end
  73. --[[
  74. Function to debounce a job.
  75. ]]
  76. -- Includes
  77. --[[
  78. Function to deduplicate a job.
  79. ]]
  80. --[[
  81. Function to set the deduplication key for a job.
  82. Uses TTL from deduplication opts if provided.
  83. ]]
  84. local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  85. local ttl = deduplicationOpts and deduplicationOpts['ttl']
  86. if ttl and ttl > 0 then
  87. rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  88. else
  89. rcall('SET', deduplicationKey, jobId)
  90. end
  91. end
  92. --[[
  93. Function to store a deduplicated next job if the existing job is active
  94. and keepLastIfActive is set. When the active job finishes, the stored
  95. proto-job is used to create a real job in the queue.
  96. Returns true if the proto-job was stored, false otherwise.
  97. ]]
  98. --[[
  99. Functions to check if a item belongs to a list.
  100. ]]
  101. local function checkItemInList(list, item)
  102. for _, v in pairs(list) do
  103. if v == item then
  104. return 1
  105. end
  106. end
  107. return nil
  108. end
  109. local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  110. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  111. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  112. if deduplicationOpts['keepLastIfActive'] and currentDebounceJobId then
  113. local activeKey = prefix .. "active"
  114. local activeItems = rcall('LRANGE', activeKey, 0, -1)
  115. if checkItemInList(activeItems, currentDebounceJobId) then
  116. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  117. local fields = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}
  118. if parentKey then
  119. fields[#fields+1] = 'pk'
  120. fields[#fields+1] = parentKey
  121. end
  122. if parentData then
  123. fields[#fields+1] = 'pd'
  124. fields[#fields+1] = parentData
  125. end
  126. if parentDependenciesKey then
  127. fields[#fields+1] = 'pdk'
  128. fields[#fields+1] = parentDependenciesKey
  129. end
  130. if repeatJobKey then
  131. fields[#fields+1] = 'rjk'
  132. fields[#fields+1] = repeatJobKey
  133. end
  134. rcall('HSET', deduplicationNextKey, unpack(fields))
  135. -- Ensure the dedup key does not expire while the job is active,
  136. -- so subsequent adds always hit the dedup path and never bypass
  137. -- the active-check because of a TTL expiry.
  138. local deduplicationKey = prefix .. "de:" .. deduplicationId
  139. rcall('PERSIST', deduplicationKey)
  140. -- TODO remove debounced event in next breaking change
  141. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  142. currentDebounceJobId, "debounceId", deduplicationId)
  143. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  144. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  145. return true
  146. end
  147. end
  148. return false
  149. end
  150. local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  151. eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  152. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  153. local ttl = deduplicationOpts['ttl']
  154. local deduplicationKeyExists
  155. if ttl and ttl > 0 then
  156. if deduplicationOpts['extend'] then
  157. local currentDebounceJobId = rcall('GET', deduplicationKey)
  158. if currentDebounceJobId then
  159. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  160. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  161. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  162. return currentDebounceJobId
  163. end
  164. if deduplicationOpts['keepLastIfActive'] then
  165. rcall('SET', deduplicationKey, currentDebounceJobId)
  166. else
  167. setDeduplicationKey(deduplicationKey, currentDebounceJobId, deduplicationOpts)
  168. end
  169. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
  170. "jobId", currentDebounceJobId, "debounceId", deduplicationId)
  171. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  172. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  173. return currentDebounceJobId
  174. else
  175. if deduplicationOpts['keepLastIfActive'] then
  176. rcall('SET', deduplicationKey, jobId)
  177. else
  178. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  179. end
  180. return
  181. end
  182. else
  183. if deduplicationOpts['keepLastIfActive'] then
  184. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  185. else
  186. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  187. end
  188. end
  189. else
  190. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  191. end
  192. if deduplicationKeyExists then
  193. local currentDebounceJobId = rcall('GET', deduplicationKey)
  194. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  195. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  196. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  197. return currentDebounceJobId
  198. end
  199. -- TODO remove debounced event in next breaking change
  200. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  201. currentDebounceJobId, "debounceId", deduplicationId)
  202. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  203. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  204. return currentDebounceJobId
  205. end
  206. end
  207. --[[
  208. Function to remove job keys.
  209. ]]
  210. local function removeJobKeys(jobKey)
  211. return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies',
  212. jobKey .. ':processed', jobKey .. ':failed', jobKey .. ':unsuccessful')
  213. end
  214. local function removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents, currentDeduplicatedJobId,
  215. jobId, deduplicationId, prefix)
  216. if rcall("ZREM", delayedKey, currentDeduplicatedJobId) > 0 then
  217. removeJobKeys(prefix .. currentDeduplicatedJobId)
  218. rcall("XADD", eventsKey, "*", "event", "removed", "jobId", currentDeduplicatedJobId,
  219. "prev", "delayed")
  220. -- TODO remove debounced event in next breaking change
  221. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  222. jobId, "debounceId", deduplicationId)
  223. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  224. jobId, "deduplicationId", deduplicationId, "deduplicatedJobId", currentDeduplicatedJobId)
  225. return true
  226. end
  227. return false
  228. end
  229. local function deduplicateJob(deduplicationOpts, jobId, delayedKey, deduplicationKey, eventsKey, maxEvents,
  230. prefix, jobName, jobData, fullOpts, parentKey, parentData, parentDependenciesKey, repeatJobKey)
  231. local deduplicationId = deduplicationOpts and deduplicationOpts['id']
  232. if deduplicationId then
  233. if deduplicationOpts['replace'] then
  234. local currentDebounceJobId = rcall('GET', deduplicationKey)
  235. if currentDebounceJobId then
  236. local isRemoved = removeDelayedJob(delayedKey, deduplicationKey, eventsKey, maxEvents,
  237. currentDebounceJobId, jobId, deduplicationId, prefix)
  238. if isRemoved then
  239. if deduplicationOpts['keepLastIfActive'] then
  240. rcall('SET', deduplicationKey, jobId)
  241. else
  242. local ttl = deduplicationOpts['ttl']
  243. if not deduplicationOpts['extend'] and ttl and ttl > 0 then
  244. rcall('SET', deduplicationKey, jobId, 'KEEPTTL')
  245. else
  246. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  247. end
  248. end
  249. return
  250. else
  251. storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  252. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  253. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  254. return currentDebounceJobId
  255. end
  256. else
  257. if deduplicationOpts['keepLastIfActive'] then
  258. rcall('SET', deduplicationKey, jobId)
  259. else
  260. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  261. end
  262. return
  263. end
  264. else
  265. return deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts,
  266. jobId, deduplicationKey, eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  267. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  268. end
  269. end
  270. end
  271. --[[
  272. Function to get max events value or set by default 10000.
  273. ]]
  274. local function getOrSetMaxEvents(metaKey)
  275. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  276. if not maxEvents then
  277. maxEvents = 10000
  278. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  279. end
  280. return maxEvents
  281. end
  282. --[[
  283. Function to check for the meta.paused key to decide if we are paused or not
  284. (since an empty list and !EXISTS are not really the same).
  285. ]]
  286. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  287. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  288. if queueAttributes[1] then
  289. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  290. else
  291. if queueAttributes[2] then
  292. local activeCount = rcall("LLEN", activeKey)
  293. if activeCount >= tonumber(queueAttributes[2]) then
  294. return waitKey, true, queueAttributes[3], queueAttributes[4]
  295. else
  296. return waitKey, false, queueAttributes[3], queueAttributes[4]
  297. end
  298. end
  299. end
  300. return waitKey, false, queueAttributes[3], queueAttributes[4]
  301. end
  302. --[[
  303. Function to handle the case when job is duplicated.
  304. ]]
  305. -- Includes
  306. --[[
  307. This function is used to update the parent's dependencies if the job
  308. is already completed and about to be ignored. The parent must get its
  309. dependencies updated to avoid the parent job being stuck forever in
  310. the waiting-children state.
  311. ]]
  312. -- Includes
  313. --[[
  314. Validate and move or add dependencies to parent.
  315. ]]
  316. -- Includes
  317. --[[
  318. Validate and move parent to a wait status (waiting, delayed or prioritized)
  319. if no pending dependencies.
  320. ]]
  321. -- Includes
  322. --[[
  323. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  324. ]]
  325. -- Includes
  326. --[[
  327. Move parent to a wait status (wait, prioritized or delayed)
  328. ]]
  329. -- Includes
  330. --[[
  331. Add delay marker if needed.
  332. ]]
  333. -- Includes
  334. --[[
  335. Function to return the next delayed job timestamp.
  336. ]]
  337. local function getNextDelayedTimestamp(delayedKey)
  338. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  339. if #result then
  340. local nextTimestamp = tonumber(result[2])
  341. if nextTimestamp ~= nil then
  342. return nextTimestamp / 0x1000
  343. end
  344. end
  345. end
  346. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  347. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  348. if nextTimestamp ~= nil then
  349. -- Replace the score of the marker with the newest known
  350. -- next timestamp.
  351. rcall("ZADD", markerKey, nextTimestamp, "1")
  352. end
  353. end
  354. --[[
  355. Function to add job considering priority.
  356. ]]
  357. -- Includes
  358. --[[
  359. Function to get priority score.
  360. ]]
  361. local function getPriorityScore(priority, priorityCounterKey)
  362. local prioCounter = rcall("INCR", priorityCounterKey)
  363. return priority * 0x100000000 + prioCounter % 0x100000000
  364. end
  365. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  366. isPausedOrMaxed)
  367. local score = getPriorityScore(priority, priorityCounterKey)
  368. rcall("ZADD", prioritizedKey, score, jobId)
  369. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  370. end
  371. --[[
  372. Function to check if queue is paused or maxed
  373. (since an empty list and !EXISTS are not really the same).
  374. ]]
  375. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  376. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  377. if queueAttributes[1] then
  378. return true
  379. else
  380. if queueAttributes[2] then
  381. local activeCount = rcall("LLEN", activeKey)
  382. return activeCount >= tonumber(queueAttributes[2])
  383. end
  384. end
  385. return false
  386. end
  387. local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  388. local parentWaitKey = parentQueueKey .. ":wait"
  389. local parentPausedKey = parentQueueKey .. ":paused"
  390. local parentActiveKey = parentQueueKey .. ":active"
  391. local parentMetaKey = parentQueueKey .. ":meta"
  392. local parentMarkerKey = parentQueueKey .. ":marker"
  393. local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  394. local priority = tonumber(jobAttributes[1]) or 0
  395. local delay = tonumber(jobAttributes[2]) or 0
  396. if delay > 0 then
  397. local delayedTimestamp = tonumber(timestamp) + delay
  398. local score = delayedTimestamp * 0x1000
  399. local parentDelayedKey = parentQueueKey .. ":delayed"
  400. rcall("ZADD", parentDelayedKey, score, parentId)
  401. rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, "delay",
  402. delayedTimestamp)
  403. addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
  404. else
  405. if priority == 0 then
  406. local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
  407. parentWaitKey, parentPausedKey)
  408. addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  409. else
  410. local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
  411. addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
  412. parentQueueKey .. ":pc", isPausedOrMaxed)
  413. end
  414. rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev",
  415. "waiting-children")
  416. end
  417. end
  418. local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  419. if rcall("EXISTS", parentKey) == 1 then
  420. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  421. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  422. rcall("ZREM", parentWaitingChildrenKey, parentId)
  423. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  424. end
  425. end
  426. end
  427. local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  428. parentId, timestamp)
  429. local doNotHavePendingDependencies = rcall("SCARD", parentDependenciesKey) == 0
  430. if doNotHavePendingDependencies then
  431. moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  432. end
  433. end
  434. local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  435. parentId, jobIdKey, returnvalue, timestamp )
  436. local processedSet = parentKey .. ":processed"
  437. rcall("HSET", processedSet, jobIdKey, returnvalue)
  438. moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
  439. end
  440. local function updateExistingJobsParent(parentKey, parent, parentData,
  441. parentDependenciesKey, completedKey,
  442. jobIdKey, jobId, timestamp)
  443. if parentKey ~= nil then
  444. if rcall("ZSCORE", completedKey, jobId) then
  445. local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
  446. updateParentDepsIfNeeded(parentKey, parent['queueKey'],
  447. parentDependenciesKey, parent['id'],
  448. jobIdKey, returnvalue, timestamp)
  449. else
  450. if parentDependenciesKey ~= nil then
  451. rcall("SADD", parentDependenciesKey, jobIdKey)
  452. end
  453. end
  454. rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  455. end
  456. end
  457. local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  458. parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  459. local existedParentKey = rcall("HGET", jobKey, "parentKey")
  460. if not existedParentKey or existedParentKey == currentParentKey then
  461. updateExistingJobsParent(currentParentKey, currentParent, parentData,
  462. parentDependenciesKey, completedKey, jobKey,
  463. jobId, timestamp)
  464. else
  465. if currentParentKey ~= nil and currentParentKey ~= existedParentKey
  466. and (rcall("EXISTS", existedParentKey) == 1) then
  467. return -7
  468. end
  469. end
  470. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  471. "duplicated", "jobId", jobId)
  472. return jobId .. "" -- convert to string
  473. end
  474. --[[
  475. Function to store a job
  476. ]]
  477. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  478. parentKey, parentData, repeatJobKey)
  479. local jsonOpts = cjson.encode(opts)
  480. local delay = opts['delay'] or 0
  481. local priority = opts['priority'] or 0
  482. local debounceId = opts['de'] and opts['de']['id']
  483. local optionalValues = {}
  484. if parentKey ~= nil then
  485. table.insert(optionalValues, "parentKey")
  486. table.insert(optionalValues, parentKey)
  487. table.insert(optionalValues, "parent")
  488. table.insert(optionalValues, parentData)
  489. end
  490. if repeatJobKey then
  491. table.insert(optionalValues, "rjk")
  492. table.insert(optionalValues, repeatJobKey)
  493. end
  494. if debounceId then
  495. table.insert(optionalValues, "deid")
  496. table.insert(optionalValues, debounceId)
  497. end
  498. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  499. "timestamp", timestamp, "delay", delay, "priority", priority,
  500. unpack(optionalValues))
  501. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  502. return delay, priority
  503. end
  504. if parentKey ~= nil then
  505. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  506. parentData = cjson.encode(parent)
  507. end
  508. local jobCounter = rcall("INCR", KEYS[4])
  509. local metaKey = KEYS[3]
  510. local maxEvents = getOrSetMaxEvents(metaKey)
  511. local parentDependenciesKey = args[6]
  512. local timestamp = args[4]
  513. if args[2] == "" then
  514. jobId = jobCounter
  515. jobIdKey = args[1] .. jobId
  516. else
  517. jobId = args[2]
  518. jobIdKey = args[1] .. jobId
  519. if rcall("EXISTS", jobIdKey) == 1 then
  520. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  521. parentData, parentDependenciesKey, KEYS[5], eventsKey,
  522. maxEvents, timestamp)
  523. end
  524. end
  525. local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[6],
  526. deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  527. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  528. if deduplicationJobId then
  529. return deduplicationJobId
  530. end
  531. -- Store the job.
  532. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
  533. parentKey, parentData, repeatJobKey)
  534. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[1], KEYS[2])
  535. -- LIFO or FIFO
  536. local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
  537. addJobInTargetList(target, KEYS[9], pushCmd, isPausedOrMaxed, jobId)
  538. -- Emit waiting event
  539. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  540. "jobId", jobId)
  541. -- Check if this job is a child of another job, if so add it to the parents dependencies
  542. if parentDependenciesKey ~= nil then
  543. rcall("SADD", parentDependenciesKey, jobIdKey)
  544. end
  545. return jobId .. "" -- convert to string
  546. `;
  547. exports.addStandardJob = {
  548. name: 'addStandardJob',
  549. content,
  550. keys: 9,
  551. };
  552. //# sourceMappingURL=addStandardJob-9.js.map