-- removeOrphanedJobs-1.lua (2.1 KB)

  --[[
    Removes orphaned job keys: keys that exist in Redis under the base prefix
    but whose job ID is not referenced by any of the given queue state keys.
    The whole check-and-delete pass runs inside one script invocation.

    Input:
      KEYS[1]              base prefix key including trailing colon (e.g. bull:queueName:)
      ARGV[1]              number of state key suffixes (N)
      ARGV[2 .. 1+N]       state key suffixes (e.g. active, wait, completed, ...)
      ARGV[2+N]            number of job sub-key suffixes (M)
      ARGV[3+N .. 2+N+M]   job sub-key suffixes (e.g. logs, dependencies, ...)
      ARGV[3+N+M .. end]   candidate job IDs to check

    Output:
      number of removed jobs, i.e. candidates for which DEL actually removed
      at least one key (the job key itself or one of its sub-keys)

    Notes:
      - A candidate whose computed job key equals one of the state keys is
        skipped and never deleted; otherwise a candidate ID such as "wait"
        would wipe the state structure itself.
      - State keys whose type is not list, zset or set (including missing
        keys, reported as "none") are not searched.
  ]]
  local rcall = redis.call
  local basePrefix = KEYS[1]

  -- Resolve every state suffix to its full key name and cache the key's type
  -- once, so the per-candidate loop does not have to call TYPE repeatedly.
  local stateKeyCount = tonumber(ARGV[1])
  local stateKeys = {}
  local stateKeyTypes = {}
  local isStateKey = {}
  for i = 1, stateKeyCount do
    local fullKey = basePrefix .. ARGV[1 + i]
    stateKeys[i] = fullKey
    -- TYPE answers with a status reply, which Lua receives as a table
    -- carrying the type name in its `ok` field.
    stateKeyTypes[i] = rcall('TYPE', fullKey)['ok']
    isStateKey[fullKey] = true
  end

  -- Collect the job sub-key suffixes that must be deleted along with a job.
  local subKeyCountIdx = 2 + stateKeyCount
  local subKeyCount = tonumber(ARGV[subKeyCountIdx])
  local subKeySuffixes = {}
  for i = 1, subKeyCount do
    subKeySuffixes[i] = ARGV[subKeyCountIdx + i]
  end

  -- Returns true when jobId is a member of at least one state key.
  local function isReferenced(jobId)
    for i = 1, stateKeyCount do
      local keyType = stateKeyTypes[i]
      local key = stateKeys[i]
      if keyType == 'list' then
        -- A nil reply becomes `false` in Lua; a hit at index 0 is still
        -- truthy because only nil/false are falsy in Lua.
        if rcall('LPOS', key, jobId) then
          return true
        end
      elseif keyType == 'zset' then
        -- ZSCORE yields a nil reply (false) when the member is absent.
        if rcall('ZSCORE', key, jobId) then
          return true
        end
      elseif keyType == 'set' then
        if rcall('SISMEMBER', key, jobId) == 1 then
          return true
        end
      end
    end
    return false
  end

  -- Walk the candidate job IDs that follow the two suffix lists.
  local candidateStart = subKeyCountIdx + subKeyCount + 1
  local removedCount = 0
  for c = candidateStart, #ARGV do
    local jobId = ARGV[c]
    local jobKey = basePrefix .. jobId

    -- Guard first: never treat a state key as a job key.
    if not isStateKey[jobKey] and not isReferenced(jobId) then
      local keysToDelete = { jobKey }
      for _, suffix in ipairs(subKeySuffixes) do
        keysToDelete[#keysToDelete + 1] = jobKey .. ':' .. suffix
      end

      -- DEL returns how many keys it removed; only count the candidate when
      -- something was really deleted.
      if rcall('DEL', unpack(keysToDelete)) > 0 then
        removedCount = removedCount + 1
      end
    end
  end

  return removedCount