reprocessJob-8.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. const content = `--[[
  2. Attempts to reprocess a job
  3. Input:
  4. KEYS[1] job key
  5. KEYS[2] events stream
  6. KEYS[3] job state
  7. KEYS[4] wait key
  8. KEYS[5] meta
  9. KEYS[6] paused key
  10. KEYS[7] active key
  11. KEYS[8] marker key
  12. ARGV[1] job.id
  13. ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
  14. ARGV[3] propVal - failedReason/returnvalue
  15. ARGV[4] prev state - failed/completed
  16. ARGV[5] reset attemptsMade - "1" or "0"
  17. ARGV[6] reset attemptsStarted - "1" or "0"
  18. Output:
  19. 1 means the operation was a success
  20. -1 means the job does not exist
  21. -3 means the job was not found in the expected set.
  22. ]]
  23. local rcall = redis.call;
  24. -- Includes
  25. --[[
  26. Function to add job in target list and add marker if needed.
  27. ]]
  28. -- Includes
  29. --[[
  30. Add marker if needed when a job is available.
  31. ]]
  32. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  33. if not isPausedOrMaxed then
  34. rcall("ZADD", markerKey, 0, "0")
  35. end
  36. end
  37. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  38. rcall(pushCmd, targetKey, jobId)
  39. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  40. end
  41. --[[
  42. Function to get max events value or set by default 10000.
  43. ]]
  44. local function getOrSetMaxEvents(metaKey)
  45. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  46. if not maxEvents then
  47. maxEvents = 10000
  48. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  49. end
  50. return maxEvents
  51. end
  52. --[[
  53. Function to check for the meta.paused key to decide if we are paused or not
  54. (since an empty list and !EXISTS are not really the same).
  55. ]]
  56. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  57. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  58. if queueAttributes[1] then
  59. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  60. else
  61. if queueAttributes[2] then
  62. local activeCount = rcall("LLEN", activeKey)
  63. if activeCount >= tonumber(queueAttributes[2]) then
  64. return waitKey, true, queueAttributes[3], queueAttributes[4]
  65. else
  66. return waitKey, false, queueAttributes[3], queueAttributes[4]
  67. end
  68. end
  69. end
  70. return waitKey, false, queueAttributes[3], queueAttributes[4]
  71. end
  72. local jobKey = KEYS[1]
  73. if rcall("EXISTS", jobKey) == 1 then
  74. local jobId = ARGV[1]
  75. if (rcall("ZREM", KEYS[3], jobId) == 1) then
  76. local attributesToRemove = {}
  77. if ARGV[5] == "1" then
  78. table.insert(attributesToRemove, "atm")
  79. end
  80. if ARGV[6] == "1" then
  81. table.insert(attributesToRemove, "ats")
  82. end
  83. rcall("HDEL", jobKey, "finishedOn", "processedOn", ARGV[3], unpack(attributesToRemove))
  84. local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6])
  85. addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId)
  86. local parentKey = rcall("HGET", jobKey, "parentKey")
  87. if parentKey and rcall("EXISTS", parentKey) == 1 then
  88. if ARGV[4] == "failed" then
  89. if rcall("ZREM", parentKey .. ":unsuccessful", jobKey) == 1 or
  90. rcall("ZREM", parentKey .. ":failed", jobKey) == 1 then
  91. rcall("SADD", parentKey .. ":dependencies", jobKey)
  92. end
  93. else
  94. if rcall("HDEL", parentKey .. ":processed", jobKey) == 1 then
  95. rcall("SADD", parentKey .. ":dependencies", jobKey)
  96. end
  97. end
  98. end
  99. local maxEvents = getOrSetMaxEvents(KEYS[5])
  100. -- Emit waiting event
  101. rcall("XADD", KEYS[2], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  102. "jobId", jobId, "prev", ARGV[4]);
  103. return 1
  104. else
  105. return -3
  106. end
  107. else
  108. return -1
  109. end
  110. `;
  111. export const reprocessJob = {
  112. name: 'reprocessJob',
  113. content,
  114. keys: 8,
  115. };
  116. //# sourceMappingURL=reprocessJob-8.js.map