changePriority-7.js 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.changePriority = void 0;
  4. const content = `--[[
  Change job priority.

  Input:
    KEYS[1] 'wait'
    KEYS[2] 'paused'
    KEYS[3] 'meta'
    KEYS[4] 'prioritized'
    KEYS[5] 'active'
    KEYS[6] 'pc' priority counter
    KEYS[7] 'marker'

    ARGV[1] priority value
    ARGV[2] prefix key
    ARGV[3] job id
    ARGV[4] lifo ('1' enables it)

  Output:
     0 - OK
    -1 - Missing job
  21. ]]
  -- Alias redis.call once; every helper below issues its commands through it.
  local rcall = redis.call
  -- Requested priority, converted to a number from ARGV[1].
  local priority = tonumber(ARGV[1])
  -- The job's hash key is the prefix in ARGV[2] followed by the job id.
  local jobId = ARGV[3]
  local jobKey = ARGV[2] .. jobId
  26. -- Includes
  27. --[[
  28. Function to add job in target list and add marker if needed.
  29. ]]
  30. -- Includes
  --[[
    Add the base marker when a job is available.
    Writes member "0" with score 0 into the sorted set at markerKey.
    Nothing is written when isPausedOrMaxed is truthy.
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  --[[
    Push jobId onto the list at targetKey, then add the base marker.
    pushCmd is the push command chosen by the caller ('RPUSH' or 'LPUSH'
    in this script). The marker step is skipped by addBaseMarkerIfNeeded
    when isPausedOrMaxed is truthy.
  ]]
  local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    -- The job is pushed first, and only then is its availability signalled.
    rcall(pushCmd, targetKey, jobId)

    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  43. --[[
  44. Function to add job considering priority.
  45. ]]
  46. -- Includes
  --[[
    Compute the sorted-set score for a prioritized job.
    The priority counter at priorityCounterKey is incremented, wrapped
    modulo 2^32, and added to the priority multiplied by 2^32.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local PRIORITY_FACTOR = 0x100000000 -- 2^32
    local sequence = rcall("INCR", priorityCounterKey) % PRIORITY_FACTOR
    return priority * PRIORITY_FACTOR + sequence
  end
  --[[
    Add jobId to the prioritized sorted set with a score obtained from
    getPriorityScore, then add the base marker (skipped by
    addBaseMarkerIfNeeded when isPausedOrMaxed is truthy).
  ]]
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
                                    priorityCounterKey, isPausedOrMaxed)
    -- The score is computed (incrementing the counter) before ZADD is issued.
    rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)

    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  --[[
    Decide which list new work should go to by reading the queue meta hash
    (an explicit "paused" field is checked, since an empty list and a
    missing key are not the same thing).

    Returns four values:
      1. pausedKey when the "paused" field is set, otherwise waitKey
      2. true when paused, or when a "concurrency" field is set and the
         length of the active list has reached it; false otherwise
      3. the raw "max" field
      4. the raw "duration" field
  ]]
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local paused, concurrency = attrs[1], attrs[2]
    local maxValue, durationValue = attrs[3], attrs[4]

    if paused then
      return pausedKey, true, maxValue, durationValue
    end

    -- The active list is only measured when a concurrency limit is configured.
    local maxed = false
    if concurrency then
      maxed = rcall("LLEN", activeKey) >= tonumber(concurrency)
    end

    return waitKey, maxed, maxValue, durationValue
  end
  --[[
    Put a job back into the prioritized sorted set in front of the other
    jobs that share its priority.
  ]]
  local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
    -- The counter part of the score is taken as 0, so the score is just the
    -- priority multiplied by 2^32; the priority counter is not touched.
    rcall("ZADD", prioritizedKey, priority * 0x100000000, jobId)
  end
  --[[
    Re-insert a job after its priority changed.
      priority == 0      -> pushed onto the target list ('RPUSH' when lifo,
                            'LPUSH' otherwise) via addJobInTargetList
      priority ~= 0, lifo -> pushBackJobWithPriority (front of its priority)
      priority ~= 0       -> addJobWithPriority (score uses the counter)
  ]]
  local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey,
    priorityCounter, lifo, priority, jobId, isPausedOrMaxed)
    if priority == 0 then
      local pushCmd
      if lifo then
        pushCmd = 'RPUSH'
      else
        pushCmd = 'LPUSH'
      end
      addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    elseif lifo then
      pushBackJobWithPriority(prioritizedKey, priority, jobId)
    else
      addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
        priorityCounter, isPausedOrMaxed)
    end
  end
  -- Missing job: report -1 without touching any queue structure.
  if rcall("EXISTS", jobKey) ~= 1 then
    return -1
  end

  local target, isPausedOrMaxed = getTargetQueueList(KEYS[3], KEYS[5], KEYS[1], KEYS[2])
  local prioritizedKey, priorityCounterKey, markerKey = KEYS[4], KEYS[6], KEYS[7]
  local lifo = ARGV[4] == '1'

  -- Remove the job from whichever structure holds it: the prioritized set is
  -- tried first, and LREM on the target list only runs when ZREM removed
  -- nothing.
  local wasQueued = rcall("ZREM", prioritizedKey, jobId) > 0
    or rcall("LREM", target, -1, jobId) > 0

  -- Re-add with the new priority
  if wasQueued then
    reAddJobWithNewPriority(prioritizedKey, markerKey, target,
      priorityCounterKey, lifo, priority, jobId, isPausedOrMaxed)
  end

  -- The stored priority is updated even when the job was in neither structure.
  rcall("HSET", jobKey, "priority", priority)

  return 0
  122. `;
  123. exports.changePriority = {
  124. name: 'changePriority',
  125. content,
  126. keys: 7,
  127. };
  128. //# sourceMappingURL=changePriority-7.js.map