moveJobFromActiveToWait-9.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. const content = `--[[
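Function to move a job from the active list back to a waiting state
(the wait/paused list, or the prioritized set when the job has a priority).
Input:
KEYS[1] active key
KEYS[2] wait key
KEYS[3] stalled key
KEYS[4] paused key
KEYS[5] meta key
KEYS[6] limiter key
KEYS[7] prioritized key
KEYS[8] marker key
KEYS[9] event key
ARGV[1] job id
ARGV[2] lock token
ARGV[3] job id key
Output:
-1 the job key does not exist
-2 the job lock is missing
-6 the job lock exists but its token does not match
otherwise the PTTL of the limiter key when it is positive, or 0
Events:
'waiting' (only when the job was actually removed from the active list)
]]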
  17. local rcall = redis.call
  18. -- Includes
  19. --[[
  20. Function to add job in target list and add marker if needed.
  21. ]]
  22. -- Includes
  23. --[[
  24. Add marker if needed when a job is available.
  25. ]]
  26. local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  27. if not isPausedOrMaxed then
  28. rcall("ZADD", markerKey, 0, "0")
  29. end
  30. end
  31. local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
  32. rcall(pushCmd, targetKey, jobId)
  33. addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  34. end
  35. --[[
  36. Function to push back job considering priority in front of same prioritized jobs.
  37. ]]
  38. local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
  39. -- in order to put it at front of same prioritized jobs
  40. -- we consider prioritized counter as 0
  41. local score = priority * 0x100000000
  42. rcall("ZADD", prioritizedKey, score, jobId)
  43. end
  44. --[[
  45. Function to get max events value or set by default 10000.
  46. ]]
  47. local function getOrSetMaxEvents(metaKey)
  48. local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
  49. if not maxEvents then
  50. maxEvents = 10000
  51. rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
  52. end
  53. return maxEvents
  54. end
  55. --[[
  56. Function to check for the meta.paused key to decide if we are paused or not
  57. (since an empty list and !EXISTS are not really the same).
  58. ]]
  59. local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
  60. local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
  61. if queueAttributes[1] then
  62. return pausedKey, true, queueAttributes[3], queueAttributes[4]
  63. else
  64. if queueAttributes[2] then
  65. local activeCount = rcall("LLEN", activeKey)
  66. if activeCount >= tonumber(queueAttributes[2]) then
  67. return waitKey, true, queueAttributes[3], queueAttributes[4]
  68. else
  69. return waitKey, false, queueAttributes[3], queueAttributes[4]
  70. end
  71. end
  72. end
  73. return waitKey, false, queueAttributes[3], queueAttributes[4]
  74. end
  75. local function removeLock(jobKey, stalledKey, token, jobId)
  76. if token ~= "0" then
  77. local lockKey = jobKey .. ':lock'
  78. local lockToken = rcall("GET", lockKey)
  79. if lockToken == token then
  80. rcall("DEL", lockKey)
  81. rcall("SREM", stalledKey, jobId)
  82. else
  83. if lockToken then
  84. -- Lock exists but token does not match
  85. return -6
  86. else
  87. -- Lock is missing completely
  88. return -2
  89. end
  90. end
  91. end
  92. return 0
  93. end
  94. local jobId = ARGV[1]
  95. local token = ARGV[2]
  96. local jobKey = ARGV[3]
  97. if rcall("EXISTS", jobKey) == 0 then
  98. return -1
  99. end
  100. local errorCode = removeLock(jobKey, KEYS[3], token, jobId)
  101. if errorCode < 0 then
  102. return errorCode
  103. end
  104. local metaKey = KEYS[5]
  105. local removed = rcall("LREM", KEYS[1], 1, jobId)
  106. if removed > 0 then
  107. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4])
  108. local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0
  109. if priority > 0 then
  110. pushBackJobWithPriority(KEYS[7], priority, jobId)
  111. else
  112. addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId)
  113. end
  114. local maxEvents = getOrSetMaxEvents(metaKey)
  115. -- Emit waiting event
  116. rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
  117. "jobId", jobId, "prev", "active")
  118. end
  119. local pttl = rcall("PTTL", KEYS[6])
  120. if pttl > 0 then
  121. return pttl
  122. else
  123. return 0
  124. end
  125. `;
  126. export const moveJobFromActiveToWait = {
  127. name: 'moveJobFromActiveToWait',
  128. content,
  129. keys: 9,
  130. };
  131. //# sourceMappingURL=moveJobFromActiveToWait-9.js.map