moveJobsToWait-8.js 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.moveJobsToWait = void 0;
  4. const content = `--[[
  5. Move completed, failed or delayed jobs to wait.
  6. Note: Does not support jobs with priorities.
  7. Input:
  8. KEYS[1] base key
  9. KEYS[2] events stream
  10. KEYS[3] state key (failed, completed, delayed)
  11. KEYS[4] 'wait'
  12. KEYS[5] 'paused'
  13. KEYS[6] 'meta'
  14. KEYS[7] 'active'
  15. KEYS[8] 'marker'
  16. ARGV[1] count
  17. ARGV[2] timestamp
  18. ARGV[3] prev state
  19. Output:
  20. 1 means the operation is not completed
  21. 0 means the operation is completed
  22. ]]
  23. local maxCount = tonumber(ARGV[1])
  24. local timestamp = tonumber(ARGV[2])
  25. local rcall = redis.call;
  26. -- Includes
  27. --[[
  28. Add marker if needed when a job is available.
  29. ]]
  30. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  31. if not isPausedOrMaxed then
  32. rcall("ZADD", markerKey, 0, "0")
  33. end
  34. end
  35. --[[
  36. Function to loop in batches.
  37. Just a bit of warning, some commands as ZREM
  38. could receive a maximum of 7000 parameters per call.
  39. ]]
  40. local function batches(n, batchSize)
  41. local i = 0
  42. return function()
  43. local from = i * batchSize + 1
  44. i = i + 1
  45. if (from <= n) then
  46. local to = math.min(from + batchSize - 1, n)
  47. return from, to
  48. end
  49. end
  50. end
  51. --[[
  52. Function to get max events value or set by default 10000.
  53. ]]
  54. local function getOrSetMaxEvents(metaKey)
  55. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  56. if not maxEvents then
  57. maxEvents = 10000
  58. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  59. end
  60. return maxEvents
  61. end
  62. --[[
  63. Function to check for the meta.paused key to decide if we are paused or not
  64. (since an empty list and !EXISTS are not really the same).
  65. ]]
  66. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  67. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  68. if queueAttributes[1] then
  69. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  70. else
  71. if queueAttributes[2] then
  72. local activeCount = rcall("LLEN", activeKey)
  73. if activeCount >= tonumber(queueAttributes[2]) then
  74. return waitKey, true, queueAttributes[3], queueAttributes[4]
  75. else
  76. return waitKey, false, queueAttributes[3], queueAttributes[4]
  77. end
  78. end
  79. end
  80. return waitKey, false, queueAttributes[3], queueAttributes[4]
  81. end
  82. local metaKey = KEYS[6]
  83. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[4], KEYS[5])
  84. local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount)
  85. if (#jobs > 0) then
  86. if ARGV[3] == "failed" then
  87. for i, key in ipairs(jobs) do
  88. local jobKey = KEYS[1] .. key
  89. rcall("HDEL", jobKey, "finishedOn", "processedOn", "failedReason")
  90. end
  91. elseif ARGV[3] == "completed" then
  92. for i, key in ipairs(jobs) do
  93. local jobKey = KEYS[1] .. key
  94. rcall("HDEL", jobKey, "finishedOn", "processedOn", "returnvalue")
  95. end
  96. end
  97. local maxEvents = getOrSetMaxEvents(metaKey)
  98. for i, key in ipairs(jobs) do
  99. -- Emit waiting event
  100. rcall("XADD", KEYS[2], "MAXLEN", "~", maxEvents, "*", "event",
  101. "waiting", "jobId", key, "prev", ARGV[3]);
  102. end
  103. for from, to in batches(#jobs, 7000) do
  104. rcall("ZREM", KEYS[3], unpack(jobs, from, to))
  105. rcall("LPUSH", target, unpack(jobs, from, to))
  106. end
  107. addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed)
  108. end
  109. maxCount = maxCount - #jobs
  110. if (maxCount <= 0) then return 1 end
  111. return 0
  112. `;
  113. exports.moveJobsToWait = {
  114. name: 'moveJobsToWait',
  115. content,
  116. keys: 8,
  117. };
  118. //# sourceMappingURL=moveJobsToWait-8.js.map