addPrioritizedJob-9.lua 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. --[[
  2. Adds a priotitized job to the queue by doing the following:
  3. - Increases the job counter if needed.
  4. - Creates a new job key with the job data.
  5. - Adds the job to the "added" list so that workers gets notified.
  6. Input:
  7. KEYS[1] 'marker',
  8. KEYS[2] 'meta'
  9. KEYS[3] 'id'
  10. KEYS[4] 'prioritized'
  11. KEYS[5] 'delayed'
  12. KEYS[6] 'completed'
  13. KEYS[7] 'active'
  14. KEYS[8] events stream key
  15. KEYS[9] 'pc' priority counter
  16. ARGV[1] msgpacked arguments array
  17. [1] key prefix,
  18. [2] custom id (will not generate one automatically)
  19. [3] name
  20. [4] timestamp
  21. [5] parentKey?
  22. [6] parent dependencies key.
  23. [7] parent? {id, queueKey}
  24. [8] repeat job key
  25. [9] deduplication key
  26. ARGV[2] Json stringified job data
  27. ARGV[3] msgpacked options
  28. Output:
  29. jobId - OK
  30. -5 - Missing parent key
  31. ]]
  32. local metaKey = KEYS[2]
  33. local idKey = KEYS[3]
  34. local priorityKey = KEYS[4]
  35. local completedKey = KEYS[6]
  36. local activeKey = KEYS[7]
  37. local eventsKey = KEYS[8]
  38. local priorityCounterKey = KEYS[9]
  39. local jobId
  40. local jobIdKey
  41. local rcall = redis.call
  42. local args = cmsgpack.unpack(ARGV[1])
  43. local data = ARGV[2]
  44. local opts = cmsgpack.unpack(ARGV[3])
  45. local parentKey = args[5]
  46. local parent = args[7]
  47. local repeatJobKey = args[8]
  48. local deduplicationKey = args[9]
  49. local parentData
  50. -- Includes
  51. --- @include "includes/addJobWithPriority"
  52. --- @include "includes/deduplicateJob"
  53. --- @include "includes/storeJob"
  54. --- @include "includes/getOrSetMaxEvents"
  55. --- @include "includes/handleDuplicatedJob"
  56. --- @include "includes/isQueuePausedOrMaxed"
  57. if parentKey ~= nil then
  58. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  59. parentData = cjson.encode(parent)
  60. end
  61. local jobCounter = rcall("INCR", idKey)
  62. local maxEvents = getOrSetMaxEvents(metaKey)
  63. local parentDependenciesKey = args[6]
  64. local timestamp = args[4]
  65. if args[2] == "" then
  66. jobId = jobCounter
  67. jobIdKey = args[1] .. jobId
  68. else
  69. jobId = args[2]
  70. jobIdKey = args[1] .. jobId
  71. if rcall("EXISTS", jobIdKey) == 1 then
  72. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  73. parentData, parentDependenciesKey, completedKey, eventsKey,
  74. maxEvents, timestamp)
  75. end
  76. end
  77. local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[5],
  78. deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  79. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  80. if deduplicationJobId then
  81. return deduplicationJobId
  82. end
  83. -- Store the job.
  84. local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
  85. opts, timestamp, parentKey, parentData,
  86. repeatJobKey)
  87. -- Add the job to the prioritized set
  88. local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
  89. addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)
  90. -- Emit waiting event
  91. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  92. "jobId", jobId)
  93. -- Check if this job is a child of another job, if so add it to the parents dependencies
  94. if parentDependenciesKey ~= nil then
  95. rcall("SADD", parentDependenciesKey, jobIdKey)
  96. end
  97. return jobId .. "" -- convert to string