moveChildFromDependenciesIfNeeded.lua 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. --[[
  2. Removes a failed child from its parent's dependencies and, depending on the parent's failure-handling flags, records the failure and moves the parent to wait.
  3. ]]
  4. -- Includes
  5. --- @include "moveParentToWaitIfNoPendingDependencies"
  6. --- @include "moveParentToWaitIfNeeded"
  7. --- @include "moveParentToWait"
  8. local handleChildFailureAndMoveParentToWait = function (parentQueueKey, parentKey, parentId, jobIdKey, timestamp)
  9. if rcall("EXISTS", parentKey) == 1 then
  10. local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
  11. local parentDelayedKey = parentQueueKey .. ":delayed"
  12. local parentWaitingChildrenOrDelayedKey
  13. if rcall("ZSCORE", parentWaitingChildrenKey, parentId) then
  14. parentWaitingChildrenOrDelayedKey = parentWaitingChildrenKey
  15. elseif rcall("ZSCORE", parentDelayedKey, parentId) then
  16. parentWaitingChildrenOrDelayedKey = parentDelayedKey
  17. rcall("HSET", parentKey, "delay", 0)
  18. end
  19. if parentWaitingChildrenOrDelayedKey then
  20. rcall("ZREM", parentWaitingChildrenOrDelayedKey, parentId)
  21. local deferredFailure = "child " .. jobIdKey .. " failed"
  22. rcall("HSET", parentKey, "defa", deferredFailure)
  23. moveParentToWait(parentQueueKey, parentKey, parentId, timestamp)
  24. else
  25. if not rcall("ZSCORE", parentQueueKey .. ":failed", parentId) then
  26. local deferredFailure = "child " .. jobIdKey .. " failed"
  27. rcall("HSET", parentKey, "defa", deferredFailure)
  28. end
  29. end
  30. end
  31. end
  32. local moveChildFromDependenciesIfNeeded = function (rawParentData, childKey, failedReason, timestamp)
  33. if rawParentData then
  34. local parentData = cjson.decode(rawParentData)
  35. local parentKey = parentData['queueKey'] .. ':' .. parentData['id']
  36. local parentDependenciesChildrenKey = parentKey .. ":dependencies"
  37. if parentData['fpof'] then
  38. if rcall("SREM", parentDependenciesChildrenKey, childKey) == 1 then
  39. local parentUnsuccessfulChildrenKey = parentKey .. ":unsuccessful"
  40. rcall("ZADD", parentUnsuccessfulChildrenKey, timestamp, childKey)
  41. handleChildFailureAndMoveParentToWait(
  42. parentData['queueKey'],
  43. parentKey,
  44. parentData['id'],
  45. childKey,
  46. timestamp
  47. )
  48. end
  49. elseif parentData['cpof'] then
  50. if rcall("SREM", parentDependenciesChildrenKey, childKey) == 1 then
  51. local parentFailedChildrenKey = parentKey .. ":failed"
  52. rcall("HSET", parentFailedChildrenKey, childKey, failedReason)
  53. moveParentToWaitIfNeeded(parentData['queueKey'], parentKey, parentData['id'], timestamp)
  54. end
  55. elseif parentData['idof'] or parentData['rdof'] then
  56. if rcall("SREM", parentDependenciesChildrenKey, childKey) == 1 then
  57. moveParentToWaitIfNoPendingDependencies(parentData['queueKey'], parentDependenciesChildrenKey,
  58. parentKey, parentData['id'], timestamp)
  59. if parentData['idof'] then
  60. local parentFailedChildrenKey = parentKey .. ":failed"
  61. rcall("HSET", parentFailedChildrenKey, childKey, failedReason)
  62. end
  63. end
  64. end
  65. end
  66. end