collectMetrics.lua 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. --[[
  2. Functions to collect metrics based on a current and previous count of jobs.
  3. Granularity is fixed at 1 minute.
  4. ]]
  5. --- @include "batches"
  -- Records one event against the metrics hash `metaKey` and, whenever the
  -- minute bucket of `timestamp` is later than the stored one, pushes data
  -- points onto the list `dataPointsList`.
  --
  -- metaKey        hash key holding the fields "count", "prevTS", "prevCount"
  -- dataPointsList list key that receives the per-bucket deltas
  -- maxDataPoints  cap on points added per call and on the list length
  -- timestamp      current time; bucketed by dividing by 60000
  --                (presumably milliseconds, i.e. 1-minute buckets -- confirm
  --                against callers)
  --
  -- Returns nothing.
  local function collectMetrics(metaKey, dataPointsList, maxDataPoints,
                                timestamp)
    -- HINCRBY yields the value after the increment; subtracting 1 gives the
    -- total as it stood before this call.
    local total = rcall("HINCRBY", metaKey, "count", 1) - 1

    local lastTS = rcall("HGET", metaKey, "prevTS")
    if not lastTS then
      -- Nothing recorded yet: store the baseline and emit no data point.
      rcall("HSET", metaKey, "prevTS", timestamp, "prevCount", 0)
      return
    end

    -- Number of buckets elapsed since the stored timestamp, capped at
    -- maxDataPoints.
    local elapsed = math.floor(timestamp / 60000) - math.floor(lastTS / 60000)
    local N = math.min(elapsed, tonumber(maxDataPoints))

    -- Still within the same bucket (or the clock went backwards): the
    -- counter increment above is the only effect.
    if not (N > 0) then
      return
    end

    -- Events counted since the last recorded point.
    local delta = total - rcall("HGET", metaKey, "prevCount")

    if N > 1 then
      -- More than one bucket elapsed: push the delta first, followed by
      -- N-1 zeros.
      local samples = { delta }
      for slot = 2, N do
        samples[slot] = 0
      end

      -- Push in slices of up to 7000 values per LPUSH call; `batches` comes
      -- from the include and presumably yields inclusive index ranges.
      for first, last in batches(#samples, 7000) do
        rcall("LPUSH", dataPointsList, unpack(samples, first, last))
      end
    else
      -- Exactly one bucket elapsed: push the single delta.
      rcall("LPUSH", dataPointsList, delta)
    end

    -- Keep only the first maxDataPoints entries of the list.
    rcall("LTRIM", dataPointsList, 0, maxDataPoints - 1)

    -- Store the new baseline for the next call.
    rcall("HSET", metaKey, "prevCount", total, "prevTS", timestamp)
  end