moveJobFromActiveToWait-9.js 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.moveJobFromActiveToWait = void 0;
  4. const content = `--[[
  5. Function to move job from active state to wait.
  6. Input:
  7. KEYS[1] active key
  8. KEYS[2] wait key
  9. KEYS[3] stalled key
  10. KEYS[4] paused key
  11. KEYS[5] meta key
  12. KEYS[6] limiter key
  13. KEYS[7] prioritized key
  14. KEYS[8] marker key
  15. KEYS[9] event key
  16. ARGV[1] job id
  17. ARGV[2] lock token
  18. ARGV[3] job id key
  19. ]]
  20. local rcall = redis.call
  21. -- Includes
  22. --[[
  23. Function to add job in target list and add marker if needed.
  24. ]]
  25. -- Includes
  --[[
    Write the base marker so that an available job can be noticed.
    Nothing is written when isPausedOrMaxed is truthy.
      markerKey       - sorted set that receives member "0" with score 0
      isPausedOrMaxed - flag computed by the caller
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Insert a job id into the target list and then add the base marker if needed.
      targetKey       - list that receives the job id
      markerKey       - marker sorted set, forwarded to addBaseMarkerIfNeeded
      pushCmd         - Redis command used for the insertion (the script passes "RPUSH")
      isPausedOrMaxed - when truthy no marker is written
      jobId           - id of the job being inserted
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- The job is stored first; the marker is only written afterwards.
    rcall(pushCmd, targetKey, jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Put a job back into the prioritized sorted set, ahead of the other jobs
    that share the same priority.
      prioritizedKey - sorted set holding prioritized jobs
      priority       - numeric priority of the job
      jobId          - id of the job being pushed back
  ]]
  local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
    -- The score is the priority multiplied by 0x100000000 with the
    -- prioritized counter taken as 0, which places the job at the front
    -- of the jobs with the same priority.
    local priorityFactor = 0x100000000
    rcall("ZADD", prioritizedKey, priority * priorityFactor, jobId)
  end
  --[[
    Read "opts.maxLenEvents" from the meta hash.
    When the field is absent, 10000 is stored in the hash and returned instead.
      metaKey - key of the queue meta hash
  ]]
  local function getOrSetMaxEvents(metaKey)
    local stored = rcall("HGET", metaKey, "opts.maxLenEvents")
    if stored then
      return stored
    end
    -- Field not set yet: persist the default so later reads find it.
    local fallback = 10000
    rcall("HSET", metaKey, "opts.maxLenEvents", fallback)
    return fallback
  end
  --[[
    Decide which list a job should go to by inspecting the queue meta hash
    (an empty list and a missing key are not the same thing, so the "paused"
    field of the meta hash is what is checked).
    Returns four values:
      1. pausedKey when the "paused" field is set, waitKey otherwise
      2. true when paused, or when a "concurrency" field exists and the
         length of the active list has reached it; false otherwise
      3. the "max" field of the meta hash
      4. the "duration" field of the meta hash
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = attrs[1], attrs[2]
    local maxField, durationField = attrs[3], attrs[4]

    if paused then
      return pausedKey, true, maxField, durationField
    end

    local isMaxed = false
    if concurrency then
      -- The active list length is only queried when a concurrency limit is configured.
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end
    return waitKey, isMaxed, maxField, durationField
  end
  --[[
    Release the lock of a job when the caller owns it.
      jobKey     - key of the job; the lock lives at jobKey .. ':lock'
      stalledKey - set from which the job id is removed once the lock is released
      token      - lock token of the caller; "0" skips the lock check entirely
      jobId      - id of the job
    Returns 0 on success (or when token is "0"), -6 when a lock exists with a
    different token, and -2 when there is no lock at all.
  ]]
  local function removeLock(jobKey, stalledKey, token, jobId)
    if token == "0" then
      return 0
    end

    local lockKey = jobKey .. ':lock'
    local currentToken = rcall("GET", lockKey)

    if currentToken == token then
      rcall("DEL", lockKey)
      rcall("SREM", stalledKey, jobId)
      return 0
    end

    if currentToken then
      -- Lock exists but token does not match
      return -6
    end

    -- Lock is missing completely
    return -2
  end
  -- Script body.
  -- Returns:
  --   -1        when the job key (ARGV[3]) does not exist
  --   -2 / -6   when removeLock rejects the token (lock missing / token mismatch)
  --   otherwise the PTTL of the limiter key KEYS[6] when it is positive, else 0
  local jobId, token, jobKey = ARGV[1], ARGV[2], ARGV[3]

  if rcall("EXISTS", jobKey) == 0 then
    return -1
  end

  local lockResult = removeLock(jobKey, KEYS[3], token, jobId)
  if lockResult < 0 then
    return lockResult
  end

  local activeKey = KEYS[1]
  local metaKey = KEYS[5]

  -- The job is only re-queued when it was actually found in the active list.
  local removed = rcall("LREM", activeKey, 1, jobId)
  if removed > 0 then
    local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, KEYS[2], KEYS[4])

    -- A missing or non-numeric priority field counts as 0.
    local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0
    if priority > 0 then
      pushBackJobWithPriority(KEYS[7], priority, jobId)
    else
      addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId)
    end

    -- Emit waiting event
    local maxEvents = getOrSetMaxEvents(metaKey)
    rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*",
      "event", "waiting", "jobId", jobId, "prev", "active")
  end

  -- Report the remaining time of the limiter key, or 0 when PTTL is not positive.
  local limiterTtl = rcall("PTTL", KEYS[6])
  if limiterTtl > 0 then
    return limiterTtl
  end
  return 0
  128. `;
  129. exports.moveJobFromActiveToWait = {
  130. name: 'moveJobFromActiveToWait',
  131. content,
  132. keys: 9,
  133. };
  134. //# sourceMappingURL=moveJobFromActiveToWait-9.js.map