prepareJobForProcessing.lua 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. --[[
  2. Function to move job from wait state to active.
  3. Input:
  4. opts - token - lock token
  5. opts - lockDuration
  6. opts - limiter
  7. ]]
  8. -- Includes
  9. --- @include "addBaseMarkerIfNeeded"
  10. local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
  11. jobId, processedOn, maxJobs, limiterDuration, markerKey, opts)
  12. local jobKey = keyPrefix .. jobId
  13. -- Check if we need to perform rate limiting.
  14. if maxJobs then
  15. local jobCounter = tonumber(rcall("INCR", rateLimiterKey))
  16. if jobCounter == 1 then
  17. local integerDuration = math.floor(math.abs(limiterDuration))
  18. rcall("PEXPIRE", rateLimiterKey, integerDuration)
  19. end
  20. end
  21. -- get a lock
  22. if opts['token'] ~= "0" then
  23. local lockKey = jobKey .. ':lock'
  24. rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
  25. end
  26. local optionalValues = {}
  27. if opts['name'] then
  28. -- Set "processedBy" field to the worker name
  29. table.insert(optionalValues, "pb")
  30. table.insert(optionalValues, opts['name'])
  31. end
  32. rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
  33. rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
  34. rcall("HINCRBY", jobKey, "ats", 1)
  35. addBaseMarkerIfNeeded(markerKey, false)
  36. -- rate limit delay must be 0 in this case to prevent adding more delay
  37. -- when job that is moved to active needs to be processed
  38. return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
  39. end