addStandardJob-9.lua 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. --[[
  2. Adds a job to the queue by doing the following:
  3. - Increases the job counter if needed.
  4. - Creates a new job key with the job data.
  5. - if delayed:
  6. - computes timestamp.
  7. - adds to delayed zset.
  8. - Emits a global event 'delayed' if the job is delayed.
  9. - if not delayed
  10. - Adds the jobId to the wait/paused list in one of three ways:
  11. - LIFO
  12. - FIFO
  13. - prioritized.
  14. - Adds the job to the "added" list so that workers gets notified.
  15. Input:
  16. KEYS[1] 'wait',
  17. KEYS[2] 'paused'
  18. KEYS[3] 'meta'
  19. KEYS[4] 'id'
  20. KEYS[5] 'completed'
  21. KEYS[6] 'delayed'
  22. KEYS[7] 'active'
  23. KEYS[8] events stream key
  24. KEYS[9] marker key
  25. ARGV[1] msgpacked arguments array
  26. [1] key prefix,
  27. [2] custom id (will not generate one automatically)
  28. [3] name
  29. [4] timestamp
  30. [5] parentKey?
  31. [6] parent dependencies key.
  32. [7] parent? {id, queueKey}
  33. [8] repeat job key
  34. [9] deduplication key
  35. ARGV[2] Json stringified job data
  36. ARGV[3] msgpacked options
  37. Output:
  38. jobId - OK
  39. -5 - Missing parent key
  40. ]]
  41. local eventsKey = KEYS[8]
  42. local jobId
  43. local jobIdKey
  44. local rcall = redis.call
  45. local args = cmsgpack.unpack(ARGV[1])
  46. local data = ARGV[2]
  47. local opts = cmsgpack.unpack(ARGV[3])
  48. local parentKey = args[5]
  49. local parent = args[7]
  50. local repeatJobKey = args[8]
  51. local deduplicationKey = args[9]
  52. local parentData
  53. -- Includes
  54. --- @include "includes/addJobInTargetList"
  55. --- @include "includes/deduplicateJob"
  56. --- @include "includes/getOrSetMaxEvents"
  57. --- @include "includes/getTargetQueueList"
  58. --- @include "includes/handleDuplicatedJob"
  59. --- @include "includes/storeJob"
  60. if parentKey ~= nil then
  61. if rcall("EXISTS", parentKey) ~= 1 then return -5 end
  62. parentData = cjson.encode(parent)
  63. end
  64. local jobCounter = rcall("INCR", KEYS[4])
  65. local metaKey = KEYS[3]
  66. local maxEvents = getOrSetMaxEvents(metaKey)
  67. local parentDependenciesKey = args[6]
  68. local timestamp = args[4]
  69. if args[2] == "" then
  70. jobId = jobCounter
  71. jobIdKey = args[1] .. jobId
  72. else
  73. jobId = args[2]
  74. jobIdKey = args[1] .. jobId
  75. if rcall("EXISTS", jobIdKey) == 1 then
  76. return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
  77. parentData, parentDependenciesKey, KEYS[5], eventsKey,
  78. maxEvents, timestamp)
  79. end
  80. end
  81. local deduplicationJobId = deduplicateJob(opts['de'], jobId, KEYS[6],
  82. deduplicationKey, eventsKey, maxEvents, args[1], args[3], ARGV[2], opts,
  83. parentKey, parentData, parentDependenciesKey, repeatJobKey)
  84. if deduplicationJobId then
  85. return deduplicationJobId
  86. end
  87. -- Store the job.
  88. storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
  89. parentKey, parentData, repeatJobKey)
  90. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[1], KEYS[2])
  91. -- LIFO or FIFO
  92. local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
  93. addJobInTargetList(target, KEYS[9], pushCmd, isPausedOrMaxed, jobId)
  94. -- Emit waiting event
  95. rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  96. "jobId", jobId)
  97. -- Check if this job is a child of another job, if so add it to the parents dependencies
  98. if parentDependenciesKey ~= nil then
  99. rcall("SADD", parentDependenciesKey, jobIdKey)
  100. end
  101. return jobId .. "" -- convert to string