addParentJob-6.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. const content = `--[[
  2. Adds a parent job to the queue by doing the following:
  3. - Increases the job counter if needed.
  4. - Creates a new job key with the job data.
  5. - adds the job to the waiting-children zset
  6. Input:
  7. KEYS[1] 'meta'
  8. KEYS[2] 'id'
  9. KEYS[3] 'delayed'
  10. KEYS[4] 'waiting-children'
  11. KEYS[5] 'completed'
  12. KEYS[6] events stream key
  13. ARGV[1] msgpacked arguments array
  14. [1] key prefix,
  15. [2] custom id (will not generate one automatically)
  16. [3] name
  17. [4] timestamp
  18. [5] parentKey?
  19. [6] parent dependencies key.
  20. [7] parent? {id, queueKey}
  21. [8] repeat job key
  22. [9] deduplication key
  23. ARGV[2] Json stringified job data
  24. ARGV[3] msgpacked options
  25. Output:
  26. jobId - OK
  27. -5 - Missing parent key
  28. ]]
  29. local metaKey = KEYS[1]
  30. local idKey = KEYS[2]
  31. local delayedKey = KEYS[3]
  32. local completedKey = KEYS[5]
  33. local eventsKey = KEYS[6]
  34. local jobId
  35. local jobIdKey
  36. local rcall = redis.call
  37. local args = cmsgpack.unpack(ARGV[1])
  38. local data = ARGV[2]
  39. local opts = cmsgpack.unpack(ARGV[3])
  40. local parentKey = args[5]
  41. local parent = args[7]
  42. local repeatJobKey = args[8]
  43. local deduplicationKey = args[9]
  44. local parentData
  45. -- Includes
  46. --[[
  47. Function to deduplicate a job.
  48. ]]
  49. --[[
  50. Function to set the deduplication key for a job.
  51. Uses TTL from deduplication opts if provided.
  52. ]]
  53. local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  54. local ttl = deduplicationOpts and deduplicationOpts['ttl']
  55. if ttl and ttl > 0 then
  56. rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  57. else
  58. rcall('SET', deduplicationKey, jobId)
  59. end
  60. end
  61. --[[
  62. Function to store a deduplicated next job if the existing job is active
  63. and keepLastIfActive is set. When the active job finishes, the stored
  64. proto-job is used to create a real job in the queue.
  65. Returns true if the proto-job was stored, false otherwise.
  66. ]]
  67. --[[
  68. Functions to check if a item belongs to a list.
  69. ]]
  70. local function checkItemInList(list, item)
  71. for _, v in pairs(list) do
  72. if v == item then
  73. return 1
  74. end
  75. end
  76. return nil
  77. end
  78. local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  79. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  80. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  81. if deduplicationOpts['keepLastIfActive'] and currentDebounceJobId then
  82. local activeKey = prefix .. "active"
  83. local activeItems = rcall('LRANGE', activeKey, 0, -1)
  84. if checkItemInList(activeItems, currentDebounceJobId) then
  85. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  86. local fields = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}
  87. if parentKey then
  88. fields[#fields+1] = 'pk'
  89. fields[#fields+1] = parentKey
  90. end
  91. if parentData then
  92. fields[#fields+1] = 'pd'
  93. fields[#fields+1] = parentData
  94. end
  95. if parentDependenciesKey then
  96. fields[#fields+1] = 'pdk'
  97. fields[#fields+1] = parentDependenciesKey
  98. end
  99. if repeatJobKey then
  100. fields[#fields+1] = 'rjk'
  101. fields[#fields+1] = repeatJobKey
  102. end
  103. rcall('HSET', deduplicationNextKey, unpack(fields))
  104. -- Ensure the dedup key does not expire while the job is active,
  105. -- so subsequent adds always hit the dedup path and never bypass
  106. -- the active-check because of a TTL expiry.
  107. local deduplicationKey = prefix .. "de:" .. deduplicationId
  108. rcall('PERSIST', deduplicationKey)
  109. -- TODO remove debounced event in next breaking change
  110. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  111. currentDebounceJobId, "debounceId", deduplicationId)
  112. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  113. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  114. return true
  115. end
  116. end
  117. return false
  118. end
  119. local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  120. eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  121. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  122. local ttl = deduplicationOpts['ttl']
  123. local deduplicationKeyExists
  124. if ttl and ttl > 0 then
  125. if deduplicationOpts['extend'] then
  126. local currentDebounceJobId = rcall('GET', deduplicationKey)
  127. if currentDebounceJobId then
  128. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  129. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  130. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  131. return currentDebounceJobId
  132. end
  133. if deduplicationOpts['keepLastIfActive'] then
  134. rcall('SET', deduplicationKey, currentDebounceJobId)
  135. else
  136. setDeduplicationKey(deduplicationKey, currentDebounceJobId, deduplicationOpts)
  137. end
  138. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
  139. "jobId", currentDebounceJobId, "debounceId", deduplicationId)
  140. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  141. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  142. return currentDebounceJobId
  143. else
  144. if deduplicationOpts['keepLastIfActive'] then
  145. rcall('SET', deduplicationKey, jobId)
  146. else
  147. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  148. end
  149. return
  150. end
  151. else
  152. if deduplicationOpts['keepLastIfActive'] then
  153. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  154. else
  155. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  156. end
  157. end
  158. else
  159. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  160. end
  161. if deduplicationKeyExists then
  162. local currentDebounceJobId = rcall('GET', deduplicationKey)
  163. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  164. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  165. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  166. return currentDebounceJobId
  167. end
  168. -- TODO remove debounced event in next breaking change
  169. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  170. currentDebounceJobId, "debounceId", deduplicationId)
  171. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  172. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  173. return currentDebounceJobId
  174. end
  175. end
  176. --[[
  177. Function to get max events value or set by default 10000.
  178. ]]
  179. local function getOrSetMaxEvents(metaKey)
  180. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  181. if not maxEvents then
  182. maxEvents = 10000
  183. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  184. end
  185. return maxEvents
  186. end
  187. --[[
  188. Function to handle the case when job is duplicated.
  189. ]]
  190. -- Includes
  191. --[[
  192. This function is used to update the parent's dependencies if the job
  193. is already completed and about to be ignored. The parent must get its
  194. dependencies updated to avoid the parent job being stuck forever in
  195. the waiting-children state.
  196. ]]
  197. -- Includes
  198. --[[
  199. Validate and move or add dependencies to parent.
  200. ]]
  201. -- Includes
  202. --[[
  203. Validate and move parent to a wait status (waiting, delayed or prioritized)
  204. if no pending dependencies.
  205. ]]
  206. -- Includes
  207. --[[
  208. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  209. ]]
  210. -- Includes
  211. --[[
  212. Move parent to a wait status (wait, prioritized or delayed)
  213. ]]
  214. -- Includes
  215. --[[
  216. Add delay marker if needed.
  217. ]]
  218. -- Includes
  219. --[[
  220. Function to return the next delayed job timestamp.
  221. ]]
  222. local function getNextDelayedTimestamp(delayedKey)
  223. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  224. if #result then
  225. local nextTimestamp = tonumber(result[2])
  226. if nextTimestamp ~= nil then
  227. return nextTimestamp / 0x1000
  228. end
  229. end
  230. end
  231. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  232. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  233. if nextTimestamp ~= nil then
  234. -- Replace the score of the marker with the newest known
  235. -- next timestamp.
  236. rcall("ZADD", markerKey, nextTimestamp, "1")
  237. end
  238. end
  239. --[[
  240. Function to add job in target list and add marker if needed.
  241. ]]
  242. -- Includes
  243. --[[
  244. Add marker if needed when a job is available.
  245. ]]
  246. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  247. if not isPausedOrMaxed then
  248. rcall("ZADD", markerKey, 0, "0")
  249. end
  250. end
  251. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  252. rcall(pushCmd, targetKey, jobId)
  253. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  254. end
  255. --[[
  256. Function to add job considering priority.
  257. ]]
  258. -- Includes
  259. --[[
  260. Function to get priority score.
  261. ]]
  262. local function getPriorityScore(priority, priorityCounterKey)
  263. local prioCounter = rcall("INCR", priorityCounterKey)
  264. return priority * 0x100000000 + prioCounter % 0x100000000
  265. end
  266. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  267. isPausedOrMaxed)
  268. local score = getPriorityScore(priority, priorityCounterKey)
  269. rcall("ZADD", prioritizedKey, score, jobId)
  270. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  271. end
  272. --[[
  273. Function to check if queue is paused or maxed
  274. (since an empty list and !EXISTS are not really the same).
  275. ]]
  276. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  277. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  278. if queueAttributes[1] then
  279. return true
  280. else
  281. if queueAttributes[2] then
  282. local activeCount = rcall("LLEN", activeKey)
  283. return activeCount >= tonumber(queueAttributes[2])
  284. end
  285. end
  286. return false
  287. end
  288. --[[
  289. Function to check for the meta.paused key to decide if we are paused or not
  290. (since an empty list and !EXISTS are not really the same).
  291. ]]
  292. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  293. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  294. if queueAttributes[1] then
  295. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  296. else
  297. if queueAttributes[2] then
  298. local activeCount = rcall("LLEN", activeKey)
  299. if activeCount >= tonumber(queueAttributes[2]) then
  300. return waitKey, true, queueAttributes[3], queueAttributes[4]
  301. else
  302. return waitKey, false, queueAttributes[3], queueAttributes[4]
  303. end
  304. end
  305. end
  306. return waitKey, false, queueAttributes[3], queueAttributes[4]
  307. end
  308. local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  309. local parentWaitKey = parentQueueKey .. ":wait"
  310. local parentPausedKey = parentQueueKey .. ":paused"
  311. local parentActiveKey = parentQueueKey .. ":active"
  312. local parentMetaKey = parentQueueKey .. ":meta"
  313. local parentMarkerKey = parentQueueKey .. ":marker"
  314. local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  315. local priority = tonumber(jobAttributes[1]) or 0
  316. local delay = tonumber(jobAttributes[2]) or 0
  317. if delay > 0 then
  318. local delayedTimestamp = tonumber(timestamp) + delay
  319. local score = delayedTimestamp * 0x1000
  320. local parentDelayedKey = parentQueueKey .. ":delayed"
  321. rcall("ZADD", parentDelayedKey, score, parentId)
  322. rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, "delay",
  323. delayedTimestamp)
  324. addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
  325. else
  326. if priority == 0 then
  327. local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
  328. parentWaitKey, parentPausedKey)
  329. addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  330. else
  331. local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
  332. addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
  333. parentQueueKey .. ":pc", isPausedOrMaxed)
  334. end
  335. rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev",
  336. "waiting-children")
  337. end
  338. end
  339. local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  340. if rcall("EXISTS", parentKey) == 1 then
  341. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  342. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  343. rcall("ZREM", parentWaitingChildrenKey, parentId)
  344. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  345. end
  346. end
  347. end
  348. local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  349. parentId, timestamp)
  350. local doNotHavePendingDependencies = rcall("SCARD", parentDependenciesKey) == 0
  351. if doNotHavePendingDependencies then
  352. moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  353. end
  354. end
  355. local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  356. parentId, jobIdKey, returnvalue, timestamp )
  357. local processedSet = parentKey .. ":processed"
  358. rcall("HSET", processedSet, jobIdKey, returnvalue)
  359. moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
  360. end
  361. local function updateExistingJobsParent(parentKey, parent, parentData,
  362. parentDependenciesKey, completedKey,
  363. jobIdKey, jobId, timestamp)
  364. if parentKey ~= nil then
  365. if rcall("ZSCORE", completedKey, jobId) then
  366. local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
  367. updateParentDepsIfNeeded(parentKey, parent['queueKey'],
  368. parentDependenciesKey, parent['id'],
  369. jobIdKey, returnvalue, timestamp)
  370. else
  371. if parentDependenciesKey ~= nil then
  372. rcall("SADD", parentDependenciesKey, jobIdKey)
  373. end
  374. end
  375. rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  376. end
  377. end
  378. local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  379. parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  380. local existedParentKey = rcall("HGET", jobKey, "parentKey")
  381. if not existedParentKey or existedParentKey == currentParentKey then
  382. updateExistingJobsParent(currentParentKey, currentParent, parentData,
  383. parentDependenciesKey, completedKey, jobKey,
  384. jobId, timestamp)
  385. else
  386. if currentParentKey ~= nil and currentParentKey ~= existedParentKey
  387. and (rcall("EXISTS", existedParentKey) == 1) then
  388. return -7
  389. end
  390. end
  391. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  392. "duplicated", "jobId", jobId)
  393. return jobId .. "" -- convert to string
  394. end
  395. --[[
  396. Function to store a job
  397. ]]
  398. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  399. parentKey, parentData, repeatJobKey)
  400. local jsonOpts = cjson.encode(opts)
  401. local delay = opts['delay'] or 0
  402. local priority = opts['priority'] or 0
  403. local debounceId = opts['de'] and opts['de']['id']
  404. local optionalValues = {}
  405. if parentKey ~= nil then
  406. table.insert(optionalValues, "parentKey")
  407. table.insert(optionalValues, parentKey)
  408. table.insert(optionalValues, "parent")
  409. table.insert(optionalValues, parentData)
  410. end
  411. if repeatJobKey then
  412. table.insert(optionalValues, "rjk")
  413. table.insert(optionalValues, repeatJobKey)
  414. end
  415. if debounceId then
  416. table.insert(optionalValues, "deid")
  417. table.insert(optionalValues, debounceId)
  418. end
  419. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  420. "timestamp", timestamp, "delay", delay, "priority", priority,
  421. unpack(optionalValues))
  422. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  423. return delay, priority
  424. end
  425. if parentKey ~= nil then
  426. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  427. parentData = cjson.encode(parent)
  428. end
  429. local jobCounter = rcall("INCR", idKey)
  430. local maxEvents = getOrSetMaxEvents(metaKey)
  431. local parentDependenciesKey = args[6]
  432. local timestamp = args[4]
  433. if args[2] == "" then
  434. jobId = jobCounter
  435. jobIdKey = args[1] .. jobId
  436. else
  437. jobId = args[2]
  438. jobIdKey = args[1] .. jobId
  439. if rcall("EXISTS", jobIdKey) == 1 then
  440. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  441. parentData, parentDependenciesKey, completedKey, eventsKey,
  442. maxEvents, timestamp)
  443. end
  444. end
  445. local deduplicationId = opts['de'] and opts['de']['id']
  446. if deduplicationId then
  447. local deduplicationJobId = deduplicateJobWithoutReplace(deduplicationId, opts['de'],
  448. jobId, deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  449. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  450. if deduplicationJobId then
  451. return deduplicationJobId
  452. end
  453. end
  454. -- Store the job.
  455. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
  456. parentKey, parentData, repeatJobKey)
  457. local waitChildrenKey = KEYS[4]
  458. rcall("ZADD", waitChildrenKey, timestamp, jobId)
  459. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  460. "waiting-children", "jobId", jobId)
  461. -- Check if this job is a child of another job, if so add it to the parents dependencies
  462. if parentDependenciesKey ~= nil then
  463. rcall("SADD", parentDependenciesKey, jobIdKey)
  464. end
  465. return jobId .. "" -- convert to string
  466. `;
  467. export const addParentJob = {
  468. name: 'addParentJob',
  469. content,
  470. keys: 6,
  471. };
  472. //# sourceMappingURL=addParentJob-6.js.map