reprocessJob-8.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.reprocessJob = void 0;
  4. const content = `--[[
  5. Attempts to reprocess a job
  6. Input:
  7. KEYS[1] job key
  8. KEYS[2] events stream
  9. KEYS[3] job state
  10. KEYS[4] wait key
  11. KEYS[5] meta
  12. KEYS[6] paused key
  13. KEYS[7] active key
  14. KEYS[8] marker key
  15. ARGV[1] job.id
  16. ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH'
  17. ARGV[3] propVal - failedReason/returnvalue
  18. ARGV[4] prev state - failed/completed
  19. ARGV[5] reset attemptsMade - "1" or "0"
  20. ARGV[6] reset attemptsStarted - "1" or "0"
  21. Output:
  22. 1 means the operation was a success
  23. -1 means the job does not exist
  24. -3 means the job was not found in the expected set.
  25. ]]
  26. local rcall = redis.call;
  27. -- Includes
  28. --[[
  29. Function to add job in target list and add marker if needed.
  30. ]]
  31. -- Includes
  --[[
    Writes the base marker (member "0" with score 0) into the marker
    sorted set so that an available job can be noticed.
    Nothing is written when isPausedOrMaxed is truthy.
      markerKey        key of the marker sorted set
      isPausedOrMaxed  truthy when the marker must be skipped
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Pushes jobId onto targetKey and then adds the base marker when allowed.
      targetKey        list key that receives the job id
      markerKey        key of the marker sorted set
      pushCmd          Redis command name used verbatim for the push
      isPausedOrMaxed  truthy when no marker should be written
      jobId            id of the job being queued
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    -- Same effect as the base-marker helper: member "0", score 0.
    if not isPausedOrMaxed then
      rcall("ZADD", markerKey, 0, "0")
    end
  end
  --[[
    Reads "opts.maxLenEvents" from the meta hash and returns it.
    When the field is absent, it is initialised to 10000 and that
    default is returned instead.
      metaKey  key of the queue meta hash
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  --[[
    Decides which list a job should be pushed to, based on the queue meta
    hash fields "paused", "concurrency", "max" and "duration".
    Returns four values:
      1. pausedKey when the "paused" field is set, otherwise waitKey
      2. true when paused, or when "concurrency" is set and the length of
         the active list has reached it; false otherwise
      3. the raw "max" field
      4. the raw "duration" field
    The paused state is read from the meta hash rather than inferred from
    the lists (an empty list and a missing key are not the same thing).
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = attrs[1], attrs[2]
    local maxValue, durationValue = attrs[3], attrs[4]

    if paused then
      return pausedKey, true, maxValue, durationValue
    end

    if not concurrency then
      return waitKey, false, maxValue, durationValue
    end

    -- The active list is only measured when a concurrency limit exists.
    local isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    return waitKey, isMaxed, maxValue, durationValue
  end
  --[[
    Script entry point.
    Returns -1 when the job hash does not exist, -3 when the job id could
    not be removed from the state sorted set (KEYS[3]), and 1 once the job
    has been pushed back to its target list and the "waiting" event emitted.
  ]]
  local jobKey = KEYS[1]
  local eventsKey = KEYS[2]
  local stateKey = KEYS[3]
  local metaKey = KEYS[5]
  local prevState = ARGV[4]

  if rcall("EXISTS", jobKey) ~= 1 then
    return -1
  end

  local jobId = ARGV[1]

  if rcall("ZREM", stateKey, jobId) ~= 1 then
    return -3
  end

  -- Optional counters to clear, selected by ARGV[5] / ARGV[6].
  local resetFields = {}
  if ARGV[5] == "1" then
    resetFields[#resetFields + 1] = "atm"
  end
  if ARGV[6] == "1" then
    resetFields[#resetFields + 1] = "ats"
  end
  rcall("HDEL", jobKey, "finishedOn", "processedOn", ARGV[3], unpack(resetFields))

  -- Push the job back to the wait/paused list and add the marker if allowed.
  local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[4], KEYS[6])
  addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId)

  -- If the parent still exists, move this job back into its dependencies
  -- when it was removed from the parent's finished bookkeeping.
  local parentKey = rcall("HGET", jobKey, "parentKey")
  if parentKey and rcall("EXISTS", parentKey) == 1 then
    local wasRemoved
    if prevState == "failed" then
      wasRemoved = rcall("ZREM", parentKey .. ":unsuccessful", jobKey) == 1
        or rcall("ZREM", parentKey .. ":failed", jobKey) == 1
    else
      wasRemoved = rcall("HDEL", parentKey .. ":processed", jobKey) == 1
    end
    if wasRemoved then
      rcall("SADD", parentKey .. ":dependencies", jobKey)
    end
  end

  -- Emit waiting event
  local maxEvents = getOrSetMaxEvents(metaKey)
  rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*",
    "event", "waiting", "jobId", jobId, "prev", prevState)

  return 1
  113. `;
  // Script descriptor: the command name, the Lua source held in `content`,
  // and the number of KEYS the script takes.
  const reprocessJobScript = {
    name: 'reprocessJob',
    content: content,
    keys: 8,
  };
  exports.reprocessJob = reprocessJobScript;
  119. //# sourceMappingURL=reprocessJob-8.js.map