changePriority-7.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. const content = `--[[
  2. Change job priority
  3. Input:
  4. KEYS[1] 'wait',
  5. KEYS[2] 'paused'
  6. KEYS[3] 'meta'
  7. KEYS[4] 'prioritized'
  8. KEYS[5] 'active'
  9. KEYS[6] 'pc' priority counter
  10. KEYS[7] 'marker'
  11. ARGV[1] priority value
  12. ARGV[2] prefix key
  13. ARGV[3] job id
  14. ARGV[4] lifo
  15. Output:
  16. 0 - OK
  17. -1 - Missing job
  18. ]]
  19. local jobId = ARGV[3]
  20. local jobKey = ARGV[2] .. jobId
  21. local priority = tonumber(ARGV[1])
  22. local rcall = redis.call
  23. -- Includes
  24. --[[
  25. Function to add job in target list and add marker if needed.
  26. ]]
  27. -- Includes
  --[[
    Writes the base marker (member "0" with score 0) into the marker sorted
    set to signal that a job is available.

      markerKey       - key of the marker sorted set
      isPausedOrMaxed - when truthy, nothing is written
  ]]
  local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if isPausedOrMaxed then
      return
    end
    rcall("ZADD", markerKey, 0, "0")
  end
  -- Pushes jobId onto the list at targetKey with the given push command
  -- (callers in this script pass 'LPUSH' or 'RPUSH'), then adds the base
  -- marker unless isPausedOrMaxed is truthy.
  local function addJobInTargetList(
    targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    rcall(pushCmd, targetKey, jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  40. --[[
  41. Function to add job considering priority.
  42. ]]
  43. -- Includes
  --[[
    Computes the sorted-set score for a prioritized job.

    The priority occupies the high part of the score (priority * 2^32) and
    the freshly incremented priority counter, wrapped modulo 2^32, occupies
    the low part, so successive calls with the same priority produce
    increasing scores until the counter wraps.
  ]]
  local function getPriorityScore(priority, priorityCounterKey)
    local counterRange = 0x100000000
    local nextCount = rcall("INCR", priorityCounterKey)
    return (priority * counterRange) + (nextCount % counterRange)
  end
  -- Adds jobId to the prioritized sorted set with a score derived from its
  -- priority and the priority counter, then adds the base marker unless
  -- isPausedOrMaxed is truthy.
  local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey,
    isPausedOrMaxed)
    -- The score argument is evaluated first, so the counter INCR still
    -- happens before the ZADD is issued.
    rcall("ZADD", prioritizedKey, getPriorityScore(priority, priorityCounterKey), jobId)
    addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
  end
  -- Decides which list waiting jobs belong to by reading the queue meta hash.
  -- The paused state is taken from the meta "paused" field rather than from
  -- the lists themselves (an empty list and a missing key are not the same).
  --
  -- Returns four values:
  --   1. pausedKey when the queue is paused, otherwise waitKey
  --   2. true when the queue is paused, or when a "concurrency" value is set
  --      and the active list already holds at least that many jobs
  --   3. the meta "max" field as returned by HMGET
  --   4. the meta "duration" field as returned by HMGET
  local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
    local attrs = rcall("HMGET", queueMetaKey, "paused", "concurrency", "max", "duration")
    local pausedAttr, concurrencyAttr = attrs[1], attrs[2]
    local maxAttr, durationAttr = attrs[3], attrs[4]

    if pausedAttr then
      return pausedKey, true, maxAttr, durationAttr
    end

    -- The active list is only measured when a concurrency value is set.
    local isMaxed = false
    if concurrencyAttr then
      isMaxed = rcall("LLEN", activeKey) >= tonumber(concurrencyAttr)
    end

    return waitKey, isMaxed, maxAttr, durationAttr
  end
  --[[
    Puts a job back into the prioritized set in front of the other jobs
    that share its priority.
  ]]
  local function pushBackJobWithPriority(prioritizedKey, priority, jobId)
    -- The counter part of the score is treated as 0, so only the priority
    -- part (priority * 2^32) is used.
    rcall("ZADD", prioritizedKey, priority * 0x100000000, jobId)
  end
  -- Re-queues a job after its priority changed.
  --   priority == 0        : pushed onto the targetKey list
  --                          (RPUSH when lifo, LPUSH otherwise)
  --   priority ~= 0, lifo  : placed at the front of its priority group
  --   priority ~= 0        : scored through the priority counter
  -- NOTE(review): the lifo branch with a non-zero priority does not add a
  -- marker, unlike the other two branches - confirm this is intentional.
  local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey,
    priorityCounter, lifo, priority, jobId, isPausedOrMaxed)
    if priority == 0 then
      local pushCmd
      if lifo then
        pushCmd = 'RPUSH'
      else
        pushCmd = 'LPUSH'
      end
      addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId)
    elseif lifo then
      pushBackJobWithPriority(prioritizedKey, priority, jobId)
    else
      addJobWithPriority(markerKey, prioritizedKey, priority, jobId,
        priorityCounter, isPausedOrMaxed)
    end
  end
  -- Main flow: bail out when the job hash does not exist.
  if rcall("EXISTS", jobKey) ~= 1 then
    return -1
  end

  local waitKey, pausedKey, metaKey = KEYS[1], KEYS[2], KEYS[3]
  local prioritizedKey, activeKey = KEYS[4], KEYS[5]
  local priorityCounterKey, markerKey = KEYS[6], KEYS[7]

  local target, isPausedOrMaxed = getTargetQueueList(metaKey, activeKey, waitKey, pausedKey)

  -- Remove the job from wherever it is queued: first try the prioritized
  -- set and only if it was not there try the wait/paused list (the "or"
  -- short-circuits, so LREM runs only when ZREM removed nothing).
  local wasQueued = rcall("ZREM", prioritizedKey, jobId) > 0
    or rcall("LREM", target, -1, jobId) > 0

  -- Re-add with the new priority
  if wasQueued then
    reAddJobWithNewPriority( prioritizedKey, markerKey, target,
      priorityCounterKey, ARGV[4] == '1', priority, jobId, isPausedOrMaxed)
  end

  -- The stored priority is updated even when the job was in neither place.
  rcall("HSET", jobKey, "priority", priority)
  return 0
  `;
  // Descriptor for the changePriority script: its command name, the Lua
  // source held in `content`, and the number of KEYS entries it reads (7).
  export const changePriority = {
    name: 'changePriority',
    content: content,
    keys: 7,
  };
  125. //# sourceMappingURL=changePriority-7.js.map