addParentJob-6.js 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.addParentJob = void 0;
  4. const content = `--[[
  5. Adds a parent job to the queue by doing the following:
  6. - Increases the job counter if needed.
  7. - Creates a new job key with the job data.
  8. - adds the job to the waiting-children zset
  9. Input:
  10. KEYS[1] 'meta'
  11. KEYS[2] 'id'
  12. KEYS[3] 'delayed'
  13. KEYS[4] 'waiting-children'
  14. KEYS[5] 'completed'
  15. KEYS[6] events stream key
  16. ARGV[1] msgpacked arguments array
  17. [1] key prefix,
  18. [2] custom id (will not generate one automatically)
  19. [3] name
  20. [4] timestamp
  21. [5] parentKey?
  22. [6] parent dependencies key.
  23. [7] parent? {id, queueKey}
  24. [8] repeat job key
  25. [9] deduplication key
  26. ARGV[2] Json stringified job data
  27. ARGV[3] msgpacked options
  28. Output:
  29. jobId - OK
  30. -5 - Missing parent key
  31. ]]
  32. local metaKey = KEYS[1]
  33. local idKey = KEYS[2]
  34. local delayedKey = KEYS[3]
  35. local completedKey = KEYS[5]
  36. local eventsKey = KEYS[6]
  37. local jobId
  38. local jobIdKey
  39. local rcall = redis.call
  40. local args = cmsgpack.unpack(ARGV[1])
  41. local data = ARGV[2]
  42. local opts = cmsgpack.unpack(ARGV[3])
  43. local parentKey = args[5]
  44. local parent = args[7]
  45. local repeatJobKey = args[8]
  46. local deduplicationKey = args[9]
  47. local parentData
  48. -- Includes
  49. --[[
  50. Function to deduplicate a job.
  51. ]]
  52. --[[
  53. Function to set the deduplication key for a job.
  54. Uses TTL from deduplication opts if provided.
  55. ]]
  56. local function setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  57. local ttl = deduplicationOpts and deduplicationOpts['ttl']
  58. if ttl and ttl > 0 then
  59. rcall('SET', deduplicationKey, jobId, 'PX', ttl)
  60. else
  61. rcall('SET', deduplicationKey, jobId)
  62. end
  63. end
  64. --[[
  65. Function to store a deduplicated next job if the existing job is active
  66. and keepLastIfActive is set. When the active job finishes, the stored
  67. proto-job is used to create a real job in the queue.
  68. Returns true if the proto-job was stored, false otherwise.
  69. ]]
  70. --[[
  71. Functions to check if a item belongs to a list.
  72. ]]
  73. local function checkItemInList(list, item)
  74. for _, v in pairs(list) do
  75. if v == item then
  76. return 1
  77. end
  78. end
  79. return nil
  80. end
  81. local function storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  82. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  83. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  84. if deduplicationOpts['keepLastIfActive'] and currentDebounceJobId then
  85. local activeKey = prefix .. "active"
  86. local activeItems = rcall('LRANGE', activeKey, 0, -1)
  87. if checkItemInList(activeItems, currentDebounceJobId) then
  88. local deduplicationNextKey = prefix .. "dn:" .. deduplicationId
  89. local fields = {'name', jobName, 'data', jobData, 'opts', cjson.encode(fullOpts)}
  90. if parentKey then
  91. fields[#fields+1] = 'pk'
  92. fields[#fields+1] = parentKey
  93. end
  94. if parentData then
  95. fields[#fields+1] = 'pd'
  96. fields[#fields+1] = parentData
  97. end
  98. if parentDependenciesKey then
  99. fields[#fields+1] = 'pdk'
  100. fields[#fields+1] = parentDependenciesKey
  101. end
  102. if repeatJobKey then
  103. fields[#fields+1] = 'rjk'
  104. fields[#fields+1] = repeatJobKey
  105. end
  106. rcall('HSET', deduplicationNextKey, unpack(fields))
  107. -- Ensure the dedup key does not expire while the job is active,
  108. -- so subsequent adds always hit the dedup path and never bypass
  109. -- the active-check because of a TTL expiry.
  110. local deduplicationKey = prefix .. "de:" .. deduplicationId
  111. rcall('PERSIST', deduplicationKey)
  112. -- TODO remove debounced event in next breaking change
  113. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  114. currentDebounceJobId, "debounceId", deduplicationId)
  115. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  116. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  117. return true
  118. end
  119. end
  120. return false
  121. end
  122. local function deduplicateJobWithoutReplace(deduplicationId, deduplicationOpts, jobId, deduplicationKey,
  123. eventsKey, maxEvents, prefix, jobName, jobData, fullOpts,
  124. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  125. local ttl = deduplicationOpts['ttl']
  126. local deduplicationKeyExists
  127. if ttl and ttl > 0 then
  128. if deduplicationOpts['extend'] then
  129. local currentDebounceJobId = rcall('GET', deduplicationKey)
  130. if currentDebounceJobId then
  131. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  132. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  133. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  134. return currentDebounceJobId
  135. end
  136. if deduplicationOpts['keepLastIfActive'] then
  137. rcall('SET', deduplicationKey, currentDebounceJobId)
  138. else
  139. setDeduplicationKey(deduplicationKey, currentDebounceJobId, deduplicationOpts)
  140. end
  141. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced",
  142. "jobId", currentDebounceJobId, "debounceId", deduplicationId)
  143. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  144. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  145. return currentDebounceJobId
  146. else
  147. if deduplicationOpts['keepLastIfActive'] then
  148. rcall('SET', deduplicationKey, jobId)
  149. else
  150. setDeduplicationKey(deduplicationKey, jobId, deduplicationOpts)
  151. end
  152. return
  153. end
  154. else
  155. if deduplicationOpts['keepLastIfActive'] then
  156. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  157. else
  158. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'PX', ttl, 'NX')
  159. end
  160. end
  161. else
  162. deduplicationKeyExists = not rcall('SET', deduplicationKey, jobId, 'NX')
  163. end
  164. if deduplicationKeyExists then
  165. local currentDebounceJobId = rcall('GET', deduplicationKey)
  166. if storeDeduplicatedNextJob(deduplicationOpts, currentDebounceJobId, prefix,
  167. deduplicationId, jobName, jobData, fullOpts, eventsKey, maxEvents, jobId,
  168. parentKey, parentData, parentDependenciesKey, repeatJobKey) then
  169. return currentDebounceJobId
  170. end
  171. -- TODO remove debounced event in next breaking change
  172. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "debounced", "jobId",
  173. currentDebounceJobId, "debounceId", deduplicationId)
  174. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "deduplicated", "jobId",
  175. currentDebounceJobId, "deduplicationId", deduplicationId, "deduplicatedJobId", jobId)
  176. return currentDebounceJobId
  177. end
  178. end
  179. --[[
  180. Function to get max events value or set by default 10000.
  181. ]]
  182. local function getOrSetMaxEvents(metaKey)
  183. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  184. if not maxEvents then
  185. maxEvents = 10000
  186. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  187. end
  188. return maxEvents
  189. end
  190. --[[
  191. Function to handle the case when job is duplicated.
  192. ]]
  193. -- Includes
  194. --[[
  195. This function is used to update the parent's dependencies if the job
  196. is already completed and about to be ignored. The parent must get its
  197. dependencies updated to avoid the parent job being stuck forever in
  198. the waiting-children state.
  199. ]]
  200. -- Includes
  201. --[[
  202. Validate and move or add dependencies to parent.
  203. ]]
  204. -- Includes
  205. --[[
  206. Validate and move parent to a wait status (waiting, delayed or prioritized)
  207. if no pending dependencies.
  208. ]]
  209. -- Includes
  210. --[[
  211. Validate and move parent to a wait status (waiting, delayed or prioritized) if needed.
  212. ]]
  213. -- Includes
  214. --[[
  215. Move parent to a wait status (wait, prioritized or delayed)
  216. ]]
  217. -- Includes
  218. --[[
  219. Add delay marker if needed.
  220. ]]
  221. -- Includes
  222. --[[
  223. Function to return the next delayed job timestamp.
  224. ]]
  225. local function getNextDelayedTimestamp(delayedKey)
  226. local result = rcall("ZRANGE", delayedKey, 0, 0, "WITHSCORES")
  227. if #result then
  228. local nextTimestamp = tonumber(result[2])
  229. if nextTimestamp ~= nil then
  230. return nextTimestamp / 0x1000
  231. end
  232. end
  233. end
  234. local function addDelayMarkerIfNeeded(markerKey, delayedKey)
  235. local nextTimestamp = getNextDelayedTimestamp(delayedKey)
  236. if nextTimestamp ~= nil then
  237. -- Replace the score of the marker with the newest known
  238. -- next timestamp.
  239. rcall("ZADD", markerKey, nextTimestamp, "1")
  240. end
  241. end
  242. --[[
  243. Function to add job in target list and add marker if needed.
  244. ]]
  245. -- Includes
  246. --[[
  247. Add marker if needed when a job is available.
  248. ]]
  249. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  250. if not isPausedOrMaxed then
  251. rcall("ZADD", markerKey, 0, "0")
  252. end
  253. end
  254. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  255. rcall(pushCmd, targetKey, jobId)
  256. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  257. end
  258. --[[
  259. Function to add job considering priority.
  260. ]]
  261. -- Includes
  262. --[[
  263. Function to get priority score.
  264. ]]
  265. local function getPriorityScore(priority, priorityCounterKey)
  266. local prioCounter = rcall("INCR", priorityCounterKey)
  267. return priority * 0x100000000 + prioCounter % 0x100000000
  268. end
  269. local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
  270. isPausedOrMaxed)
  271. local score = getPriorityScore(priority, priorityCounterKey)
  272. rcall("ZADD", prioritizedKey, score, jobId)
  273. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  274. end
  275. --[[
  276. Function to check if queue is paused or maxed
  277. (since an empty list and !EXISTS are not really the same).
  278. ]]
  279. local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
  280. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")
  281. if queueAttributes[1] then
  282. return true
  283. else
  284. if queueAttributes[2] then
  285. local activeCount = rcall("LLEN", activeKey)
  286. return activeCount >= tonumber(queueAttributes[2])
  287. end
  288. end
  289. return false
  290. end
  291. --[[
  292. Function to check for the meta.paused key to decide if we are paused or not
  293. (since an empty list and !EXISTS are not really the same).
  294. ]]
  295. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  296. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  297. if queueAttributes[1] then
  298. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  299. else
  300. if queueAttributes[2] then
  301. local activeCount = rcall("LLEN", activeKey)
  302. if activeCount >= tonumber(queueAttributes[2]) then
  303. return waitKey, true, queueAttributes[3], queueAttributes[4]
  304. else
  305. return waitKey, false, queueAttributes[3], queueAttributes[4]
  306. end
  307. end
  308. end
  309. return waitKey, false, queueAttributes[3], queueAttributes[4]
  310. end
  311. local function moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  312. local parentWaitKey = parentQueueKey .. ":wait"
  313. local parentPausedKey = parentQueueKey .. ":paused"
  314. local parentActiveKey = parentQueueKey .. ":active"
  315. local parentMetaKey = parentQueueKey .. ":meta"
  316. local parentMarkerKey = parentQueueKey .. ":marker"
  317. local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
  318. local priority = tonumber(jobAttributes[1]) or 0
  319. local delay = tonumber(jobAttributes[2]) or 0
  320. if delay > 0 then
  321. local delayedTimestamp = tonumber(timestamp) + delay
  322. local score = delayedTimestamp * 0x1000
  323. local parentDelayedKey = parentQueueKey .. ":delayed"
  324. rcall("ZADD", parentDelayedKey, score, parentId)
  325. rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, "delay",
  326. delayedTimestamp)
  327. addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
  328. else
  329. if priority == 0 then
  330. local parentTarget, isParentPausedOrMaxed = getTargetQueueList(parentMetaKey, parentActiveKey,
  331. parentWaitKey, parentPausedKey)
  332. addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
  333. else
  334. local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
  335. addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, parentId,
  336. parentQueueKey .. ":pc", isPausedOrMaxed)
  337. end
  338. rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, "prev",
  339. "waiting-children")
  340. end
  341. end
  342. local function moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  343. if rcall("EXISTS", parentKey) == 1 then
  344. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  345. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  346. rcall("ZREM", parentWaitingChildrenKey, parentId)
  347. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  348. end
  349. end
  350. end
  351. local function moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey,
  352. parentId, timestamp)
  353. local doNotHavePendingDependencies = rcall("SCARD", parentDependenciesKey) == 0
  354. if doNotHavePendingDependencies then
  355. moveParentToWaitIfNeeded(parentQueueKey, parentKey, parentId, timestamp)
  356. end
  357. end
  358. local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,
  359. parentId, jobIdKey, returnvalue, timestamp )
  360. local processedSet = parentKey .. ":processed"
  361. rcall("HSET", processedSet, jobIdKey, returnvalue)
  362. moveParentToWaitIfNoPendingDependencies(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
  363. end
  364. local function updateExistingJobsParent(parentKey, parent, parentData,
  365. parentDependenciesKey, completedKey,
  366. jobIdKey, jobId, timestamp)
  367. if parentKey ~= nil then
  368. if rcall("ZSCORE", completedKey, jobId) then
  369. local returnvalue = rcall("HGET", jobIdKey, "returnvalue")
  370. updateParentDepsIfNeeded(parentKey, parent['queueKey'],
  371. parentDependenciesKey, parent['id'],
  372. jobIdKey, returnvalue, timestamp)
  373. else
  374. if parentDependenciesKey ~= nil then
  375. rcall("SADD", parentDependenciesKey, jobIdKey)
  376. end
  377. end
  378. rcall("HMSET", jobIdKey, "parentKey", parentKey, "parent", parentData)
  379. end
  380. end
  381. local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
  382. parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
  383. local existedParentKey = rcall("HGET", jobKey, "parentKey")
  384. if not existedParentKey or existedParentKey == currentParentKey then
  385. updateExistingJobsParent(currentParentKey, currentParent, parentData,
  386. parentDependenciesKey, completedKey, jobKey,
  387. jobId, timestamp)
  388. else
  389. if currentParentKey ~= nil and currentParentKey ~= existedParentKey
  390. and (rcall("EXISTS", existedParentKey) == 1) then
  391. return -7
  392. end
  393. end
  394. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  395. "duplicated", "jobId", jobId)
  396. return jobId .. "" -- convert to string
  397. end
  398. --[[
  399. Function to store a job
  400. ]]
  401. local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
  402. parentKey, parentData, repeatJobKey)
  403. local jsonOpts = cjson.encode(opts)
  404. local delay = opts['delay'] or 0
  405. local priority = opts['priority'] or 0
  406. local debounceId = opts['de'] and opts['de']['id']
  407. local optionalValues = {}
  408. if parentKey ~= nil then
  409. table.insert(optionalValues, "parentKey")
  410. table.insert(optionalValues, parentKey)
  411. table.insert(optionalValues, "parent")
  412. table.insert(optionalValues, parentData)
  413. end
  414. if repeatJobKey then
  415. table.insert(optionalValues, "rjk")
  416. table.insert(optionalValues, repeatJobKey)
  417. end
  418. if debounceId then
  419. table.insert(optionalValues, "deid")
  420. table.insert(optionalValues, debounceId)
  421. end
  422. rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
  423. "timestamp", timestamp, "delay", delay, "priority", priority,
  424. unpack(optionalValues))
  425. rcall("XADD", eventsKey, "*", "event", "added", "jobId", jobId, "name", name)
  426. return delay, priority
  427. end
  428. if parentKey ~= nil then
  429. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  430. parentData = cjson.encode(parent)
  431. end
  432. local jobCounter = rcall("INCR", idKey)
  433. local maxEvents = getOrSetMaxEvents(metaKey)
  434. local parentDependenciesKey = args[6]
  435. local timestamp = args[4]
  436. if args[2] == "" then
  437. jobId = jobCounter
  438. jobIdKey = args[1] .. jobId
  439. else
  440. jobId = args[2]
  441. jobIdKey = args[1] .. jobId
  442. if rcall("EXISTS", jobIdKey) == 1 then
  443. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  444. parentData, parentDependenciesKey, completedKey, eventsKey,
  445. maxEvents, timestamp)
  446. end
  447. end
  448. local deduplicationId = opts['de'] and opts['de']['id']
  449. if deduplicationId then
  450. local deduplicationJobId = deduplicateJobWithoutReplace(deduplicationId, opts['de'],
  451. jobId, deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  452. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  453. if deduplicationJobId then
  454. return deduplicationJobId
  455. end
  456. end
  457. -- Store the job.
  458. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
  459. parentKey, parentData, repeatJobKey)
  460. local waitChildrenKey = KEYS[4]
  461. rcall("ZADD", waitChildrenKey, timestamp, jobId)
  462. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  463. "waiting-children", "jobId", jobId)
  464. -- Check if this job is a child of another job, if so add it to the parents dependencies
  465. if parentDependenciesKey ~= nil then
  466. rcall("SADD", parentDependenciesKey, jobIdKey)
  467. end
  468. return jobId .. "" -- convert to string
  469. `;
  470. exports.addParentJob = {
  471. name: 'addParentJob',
  472. content,
  473. keys: 6,
  474. };
  475. //# sourceMappingURL=addParentJob-6.js.map