-- addDelayedJob-6.lua

  --[[
    Adds a delayed job to the queue by doing the following:
      - Increments the job id counter. This happens on every call that passes
        the parent check, even when a custom id is supplied.
      - Creates a new job key with the job data.
      - Computes timestamp.
      - Adds the job to the delayed zset.
      - Emits a global event 'delayed' if the job is delayed.

    Input:
      KEYS[1] 'marker',
      KEYS[2] 'meta'
      KEYS[3] 'id'
      KEYS[4] 'delayed'
      KEYS[5] 'completed'
      KEYS[6] events stream key

      ARGV[1] msgpacked arguments array
            [1] key prefix,
            [2] custom id (use custom instead of one generated automatically)
            [3] name
            [4] timestamp
            [5] parentKey?
            [6] parent dependencies key.
            [7] parent? {id, queueKey}
            [8] repeat job key
            [9] deduplication key

      ARGV[2] Json stringified job data
      ARGV[3] msgpacked options

    Output:
      jobId  - OK (always returned as a string)
      -5     - Missing parent key
      The result of handleDuplicatedJob when a key already exists for the
      supplied custom id.
      The result of deduplicateJob when that helper returns a truthy value.
  ]]
local metaKey = KEYS[2]
local idKey = KEYS[3]
local delayedKey = KEYS[4]
local completedKey = KEYS[5]
local eventsKey = KEYS[6]

local jobId
local jobIdKey
local rcall = redis.call

local args = cmsgpack.unpack(ARGV[1])

-- Raw JSON payload of the job; forwarded untouched to the helpers below.
local data = ARGV[2]

local parentKey = args[5]
local parent = args[7]
local repeatJobKey = args[8]
local deduplicationKey = args[9]
local parentData

-- Includes
-- NOTE(review): keep the directives below at the start of their lines and
-- after the locals above. Presumably the script loader matches them textually
-- and the included helpers rely on locals such as `rcall` - confirm.
--- @include "includes/addDelayedJob"
--- @include "includes/deduplicateJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/storeJob"

-- A child job requires its parent key to exist. Bail out with -5 before the
-- id counter is touched; otherwise keep a JSON copy of the parent descriptor.
if parentKey ~= nil then
    if rcall("EXISTS", parentKey) ~= 1 then return -5 end

    parentData = cjson.encode(parent)
end

-- The counter is incremented unconditionally, so it also advances when the
-- caller provides a custom id and the generated value ends up unused.
local jobCounter = rcall("INCR", idKey)

local maxEvents = getOrSetMaxEvents(metaKey)
local opts = cmsgpack.unpack(ARGV[3])

local keyPrefix = args[1]
local customId = args[2]
local jobName = args[3]
local timestamp = args[4]
local parentDependenciesKey = args[6]

if customId == "" then
    -- No custom id: use the freshly generated counter value.
    jobId = jobCounter
    jobIdKey = keyPrefix .. jobId
else
    jobId = customId
    jobIdKey = keyPrefix .. jobId

    -- A key already exists for this custom id: delegate to the duplicate
    -- handler and return whatever it yields.
    if rcall("EXISTS", jobIdKey) == 1 then
        return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
            parentData, parentDependenciesKey, completedKey, eventsKey,
            maxEvents, timestamp)
    end
end

-- A truthy result from the deduplication helper is returned as-is and the
-- job is not stored by this script.
local deduplicationJobId = deduplicateJob(opts['de'], jobId, delayedKey, deduplicationKey,
    eventsKey, maxEvents, keyPrefix, jobName, data, opts,
    parentKey, parentData, parentDependenciesKey, repeatJobKey)
if deduplicationJobId then
    return deduplicationJobId
end

-- Only the first value returned by storeJob (the delay) is needed here.
local delay = storeJob(eventsKey, jobIdKey, jobId, jobName, data,
    opts, timestamp, parentKey, parentData, repeatJobKey)

-- KEYS[1] is the 'marker' key.
addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, KEYS[1], delay)

-- Check if this job is a child of another job, if so add it to the parents dependencies
if parentDependenciesKey ~= nil then
    rcall("SADD", parentDependenciesKey, jobIdKey)
end

return jobId .. "" -- convert to string