addParentJob-6.lua 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. --[[
  2. Adds a parent job to the queue by doing the following:
  3. - Increases the job counter if needed.
  4. - Creates a new job key with the job data.
  5. - adds the job to the waiting-children zset
  6. Input:
  7. KEYS[1] 'meta'
  8. KEYS[2] 'id'
  9. KEYS[3] 'delayed'
  10. KEYS[4] 'waiting-children'
  11. KEYS[5] 'completed'
  12. KEYS[6] events stream key
  13. ARGV[1] msgpacked arguments array
  14. [1] key prefix,
  15. [2] custom id (will not generate one automatically)
  16. [3] name
  17. [4] timestamp
  18. [5] parentKey?
  19. [6] parent dependencies key.
  20. [7] parent? {id, queueKey}
  21. [8] repeat job key
  22. [9] deduplication key
  23. ARGV[2] Json stringified job data
  24. ARGV[3] msgpacked options
  25. Output:
  26. jobId - OK
  27. -5 - Missing parent key
  28. ]]
  29. local metaKey = KEYS[1]
  30. local idKey = KEYS[2]
  31. local delayedKey = KEYS[3]
  32. local completedKey = KEYS[5]
  33. local eventsKey = KEYS[6]
  34. local jobId
  35. local jobIdKey
  36. local rcall = redis.call
  37. local args = cmsgpack.unpack(ARGV[1])
  38. local data = ARGV[2]
  39. local opts = cmsgpack.unpack(ARGV[3])
  40. local parentKey = args[5]
  41. local parent = args[7]
  42. local repeatJobKey = args[8]
  43. local deduplicationKey = args[9]
  44. local parentData
  45. -- Includes
  46. --- @include "includes/deduplicateJobWithoutReplace"
  47. --- @include "includes/getOrSetMaxEvents"
  48. --- @include "includes/handleDuplicatedJob"
  49. --- @include "includes/storeJob"
  50. if parentKey ~= nil then
  51. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  52. parentData = cjson.encode(parent)
  53. end
  54. local jobCounter = rcall("INCR", idKey)
  55. local maxEvents = getOrSetMaxEvents(metaKey)
  56. local parentDependenciesKey = args[6]
  57. local timestamp = args[4]
  58. if args[2] == "" then
  59. jobId = jobCounter
  60. jobIdKey = args[1] .. jobId
  61. else
  62. jobId = args[2]
  63. jobIdKey = args[1] .. jobId
  64. if rcall("EXISTS", jobIdKey) == 1 then
  65. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  66. parentData, parentDependenciesKey, completedKey, eventsKey,
  67. maxEvents, timestamp)
  68. end
  69. end
  70. local deduplicationId = opts['de'] and opts['de']['id']
  71. if deduplicationId then
  72. local deduplicationJobId = deduplicateJobWithoutReplace(deduplicationId, opts['de'],
  73. jobId, deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  74. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  75. if deduplicationJobId then
  76. return deduplicationJobId
  77. end
  78. end
  79. -- Store the job.
  80. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
  81. parentKey, parentData, repeatJobKey)
  82. local waitChildrenKey = KEYS[4]
  83. rcall("ZADD", waitChildrenKey, timestamp, jobId)
  84. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
  85. "waiting-children", "jobId", jobId)
  86. -- Check if this job is a child of another job, if so add it to the parents dependencies
  87. if parentDependenciesKey ~= nil then
  88. rcall("SADD", parentDependenciesKey, jobIdKey)
  89. end
  90. return jobId .. "" -- convert to string