moveToWaitingChildren-7.lua 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. --[[
  2. Moves job from active to waiting children set.
  3. Input:
  4. KEYS[1] active key
  5. KEYS[2] wait-children key
  6. KEYS[3] job key
  7. KEYS[4] job dependencies key
  8. KEYS[5] job unsuccessful key
  9. KEYS[6] stalled key
  10. KEYS[7] events key
  11. ARGV[1] token
  12. ARGV[2] child key
  13. ARGV[3] timestamp
  14. ARGV[4] jobId
  15. ARGV[5] prefix
  16. Output:
  17. 0 - OK
  18. 1 - There are not pending dependencies.
  19. -1 - Missing job.
  20. -2 - Missing lock
  21. -3 - Job not in active set
  22. -9 - Job has failed children
  23. ]]
  24. local rcall = redis.call
  25. local activeKey = KEYS[1]
  26. local waitingChildrenKey = KEYS[2]
  27. local jobKey = KEYS[3]
  28. local jobDependenciesKey = KEYS[4]
  29. local jobUnsuccessfulKey = KEYS[5]
  30. local stalledKey = KEYS[6]
  31. local eventStreamKey = KEYS[7]
  32. local token = ARGV[1]
  33. local timestamp = ARGV[3]
  34. local jobId = ARGV[4]
  35. --- Includes
  36. --- @include "includes/removeLock"
  37. local function removeJobFromActive(activeKey, stalledKey, jobKey, jobId,
  38. token)
  39. local errorCode = removeLock(jobKey, stalledKey, token, jobId)
  40. if errorCode < 0 then
  41. return errorCode
  42. end
  43. local numRemovedElements = rcall("LREM", activeKey, -1, jobId)
  44. if numRemovedElements < 1 then
  45. return -3
  46. end
  47. return 0
  48. end
  49. local function moveToWaitingChildren(activeKey, waitingChildrenKey, stalledKey, eventStreamKey,
  50. jobKey, jobId, timestamp, token)
  51. local errorCode = removeJobFromActive(activeKey, stalledKey, jobKey, jobId, token)
  52. if errorCode < 0 then
  53. return errorCode
  54. end
  55. local score = tonumber(timestamp)
  56. rcall("ZADD", waitingChildrenKey, score, jobId)
  57. rcall("XADD", eventStreamKey, "*", "event", "waiting-children", "jobId", jobId, 'prev', 'active')
  58. return 0
  59. end
  60. if rcall("EXISTS", jobKey) == 1 then
  61. if rcall("ZCARD", jobUnsuccessfulKey) ~= 0 then
  62. return -9
  63. else
  64. if ARGV[2] ~= "" then
  65. if rcall("SISMEMBER", jobDependenciesKey, ARGV[2]) ~= 0 then
  66. return moveToWaitingChildren(activeKey, waitingChildrenKey, stalledKey, eventStreamKey,
  67. jobKey, jobId, timestamp, token)
  68. end
  69. return 1
  70. else
  71. if rcall("SCARD", jobDependenciesKey) ~= 0 then
  72. return moveToWaitingChildren(activeKey, waitingChildrenKey, stalledKey, eventStreamKey,
  73. jobKey, jobId, timestamp, token)
  74. end
  75. return 1
  76. end
  77. end
  78. end
  79. return -1