queue-getters.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546
  1. 'use strict';
  2. import { __rest } from "tslib";
  3. import { QueueBase } from './queue-base';
  4. import { clientCommandMessageReg, QUEUE_EVENT_SUFFIX } from '../utils';
  5. import { MetricNames, TelemetryAttributes } from '../enums';
  6. /**
  7. * Provides different getters for different aspects of a queue.
  8. */
  9. export class QueueGetters extends QueueBase {
  10. getJob(jobId) {
  11. return this.Job.fromId(this, jobId);
  12. }
  13. commandByType(types, count, callback) {
  14. return types.map((type) => {
  15. type = type === 'waiting' ? 'wait' : type; // alias
  16. const key = this.toKey(type);
  17. switch (type) {
  18. case 'completed':
  19. case 'failed':
  20. case 'delayed':
  21. case 'prioritized':
  22. case 'repeat':
  23. case 'waiting-children':
  24. return callback(key, count ? 'zcard' : 'zrange');
  25. case 'active':
  26. case 'wait':
  27. case 'paused':
  28. return callback(key, count ? 'llen' : 'lrange');
  29. }
  30. });
  31. }
  32. sanitizeJobTypes(types) {
  33. const currentTypes = typeof types === 'string' ? [types] : types;
  34. if (Array.isArray(currentTypes) && currentTypes.length > 0) {
  35. const sanitizedTypes = [...currentTypes];
  36. if (sanitizedTypes.indexOf('waiting') !== -1) {
  37. sanitizedTypes.push('paused');
  38. }
  39. return [...new Set(sanitizedTypes)];
  40. }
  41. return [
  42. 'active',
  43. 'completed',
  44. 'delayed',
  45. 'failed',
  46. 'paused',
  47. 'prioritized',
  48. 'waiting',
  49. 'waiting-children',
  50. ];
  51. }
  52. /**
  53. Returns the number of jobs waiting to be processed. This includes jobs that are
  54. "waiting" or "delayed" or "prioritized" or "waiting-children".
  55. */
  56. async count() {
  57. const count = await this.getJobCountByTypes('waiting', 'paused', 'delayed', 'prioritized', 'waiting-children');
  58. return count;
  59. }
  60. /**
  61. * Returns the time to live for a rate limited key in milliseconds.
  62. * @param maxJobs - max jobs to be considered in rate limit state. If not passed
  63. * it will return the remaining ttl without considering if max jobs is excedeed.
  64. * @returns -2 if the key does not exist.
  65. * -1 if the key exists but has no associated expire.
  66. * @see {@link https://redis.io/commands/pttl/}
  67. */
  68. async getRateLimitTtl(maxJobs) {
  69. return this.scripts.getRateLimitTtl(maxJobs);
  70. }
  71. /**
  72. * Get jobId that starts debounced state.
  73. * @deprecated use getDeduplicationJobId method
  74. *
  75. * @param id - debounce identifier
  76. */
  77. async getDebounceJobId(id) {
  78. const client = await this.client;
  79. return client.get(`${this.keys.de}:${id}`);
  80. }
  81. /**
  82. * Get jobId from deduplicated state.
  83. *
  84. * @param id - deduplication identifier
  85. */
  86. async getDeduplicationJobId(id) {
  87. const client = await this.client;
  88. return client.get(`${this.keys.de}:${id}`);
  89. }
  90. /**
  91. * Get global concurrency value.
  92. * Returns null in case no value is set.
  93. */
  94. async getGlobalConcurrency() {
  95. const client = await this.client;
  96. const concurrency = await client.hget(this.keys.meta, 'concurrency');
  97. if (concurrency) {
  98. return Number(concurrency);
  99. }
  100. return null;
  101. }
  102. /**
  103. * Get global rate limit values.
  104. * Returns null in case no value is set.
  105. */
  106. async getGlobalRateLimit() {
  107. const client = await this.client;
  108. const [max, duration] = await client.hmget(this.keys.meta, 'max', 'duration');
  109. if (max && duration) {
  110. return {
  111. max: Number(max),
  112. duration: Number(duration),
  113. };
  114. }
  115. return null;
  116. }
  117. /**
  118. * Job counts by type
  119. *
  120. * Queue#getJobCountByTypes('completed') =\> completed count
  121. * Queue#getJobCountByTypes('completed', 'failed') =\> completed + failed count
  122. * Queue#getJobCountByTypes('completed', 'waiting', 'failed') =\> completed + waiting + failed count
  123. */
  124. async getJobCountByTypes(...types) {
  125. const result = await this.getJobCounts(...types);
  126. return Object.values(result).reduce((sum, count) => sum + count, 0);
  127. }
  128. /**
  129. * Returns the job counts for each type specified or every list/set in the queue by default.
  130. * @param types - the types of jobs to count. If not specified, it will return the counts for all types.
  131. * @returns An object, key (type) and value (count)
  132. */
  133. async getJobCounts(...types) {
  134. const currentTypes = this.sanitizeJobTypes(types);
  135. const responses = await this.scripts.getCounts(currentTypes);
  136. const counts = {};
  137. responses.forEach((res, index) => {
  138. counts[currentTypes[index]] = res || 0;
  139. });
  140. return counts;
  141. }
  142. /**
  143. * Records job counts as gauge metrics for telemetry purposes.
  144. * Each job state count is recorded with the queue name and state as attributes.
  145. * @param types - the types of jobs to count. If not specified, it will return the counts for all types.
  146. * @returns An object, key (type) and value (count)
  147. */
  148. async recordJobCountsMetric(...types) {
  149. var _a;
  150. const counts = await this.getJobCounts(...types);
  151. const meter = (_a = this.opts.telemetry) === null || _a === void 0 ? void 0 : _a.meter;
  152. if (meter && typeof meter.createGauge === 'function') {
  153. const gauge = meter.createGauge(MetricNames.QueueJobsCount, {
  154. description: 'Number of jobs in the queue by state',
  155. unit: '{jobs}',
  156. });
  157. for (const [state, jobCount] of Object.entries(counts)) {
  158. gauge.record(jobCount, {
  159. [TelemetryAttributes.QueueName]: this.name,
  160. [TelemetryAttributes.QueueJobsState]: state,
  161. });
  162. }
  163. }
  164. return counts;
  165. }
  166. /**
  167. * Get current job state.
  168. *
  169. * @param jobId - job identifier.
  170. * @returns Returns one of these values:
  171. * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
  172. */
  173. getJobState(jobId) {
  174. return this.scripts.getState(jobId);
  175. }
  176. /**
  177. * Get global queue configuration.
  178. *
  179. * @returns Returns the global queue configuration.
  180. */
  181. async getMeta() {
  182. const client = await this.client;
  183. const config = await client.hgetall(this.keys.meta);
  184. const { concurrency, max, duration, paused, 'opts.maxLenEvents': maxLenEvents } = config, rest = __rest(config, ["concurrency", "max", "duration", "paused", 'opts.maxLenEvents']);
  185. const parsedConfig = rest;
  186. if (concurrency) {
  187. parsedConfig['concurrency'] = Number(concurrency);
  188. }
  189. if (maxLenEvents) {
  190. parsedConfig['maxLenEvents'] = Number(maxLenEvents);
  191. }
  192. if (max) {
  193. parsedConfig['max'] = Number(max);
  194. }
  195. if (duration) {
  196. parsedConfig['duration'] = Number(duration);
  197. }
  198. parsedConfig['paused'] = paused === '1';
  199. return parsedConfig;
  200. }
  201. /**
  202. * @returns Returns the number of jobs in completed status.
  203. */
  204. getCompletedCount() {
  205. return this.getJobCountByTypes('completed');
  206. }
  207. /**
  208. * Returns the number of jobs in failed status.
  209. */
  210. getFailedCount() {
  211. return this.getJobCountByTypes('failed');
  212. }
  213. /**
  214. * Returns the number of jobs in delayed status.
  215. */
  216. getDelayedCount() {
  217. return this.getJobCountByTypes('delayed');
  218. }
  219. /**
  220. * Returns the number of jobs in active status.
  221. */
  222. getActiveCount() {
  223. return this.getJobCountByTypes('active');
  224. }
  225. /**
  226. * Returns the number of jobs in prioritized status.
  227. */
  228. getPrioritizedCount() {
  229. return this.getJobCountByTypes('prioritized');
  230. }
  231. /**
  232. * Returns the number of jobs per priority.
  233. */
  234. async getCountsPerPriority(priorities) {
  235. const uniquePriorities = [...new Set(priorities)];
  236. const responses = await this.scripts.getCountsPerPriority(uniquePriorities);
  237. const counts = {};
  238. responses.forEach((res, index) => {
  239. counts[`${uniquePriorities[index]}`] = res || 0;
  240. });
  241. return counts;
  242. }
  243. /**
  244. * Returns the number of jobs in waiting or paused statuses.
  245. */
  246. getWaitingCount() {
  247. return this.getJobCountByTypes('waiting');
  248. }
  249. /**
  250. * Returns the number of jobs in waiting-children status.
  251. */
  252. getWaitingChildrenCount() {
  253. return this.getJobCountByTypes('waiting-children');
  254. }
  255. /**
  256. * Returns the jobs that are in the "waiting" status.
  257. * @param start - zero based index from where to start returning jobs.
  258. * @param end - zero based index where to stop returning jobs.
  259. */
  260. getWaiting(start = 0, end = -1) {
  261. return this.getJobs(['waiting'], start, end, true);
  262. }
  263. /**
  264. * Returns the jobs that are in the "waiting-children" status.
  265. * I.E. parent jobs that have at least one child that has not completed yet.
  266. * @param start - zero based index from where to start returning jobs.
  267. * @param end - zero based index where to stop returning jobs.
  268. */
  269. getWaitingChildren(start = 0, end = -1) {
  270. return this.getJobs(['waiting-children'], start, end, true);
  271. }
  272. /**
  273. * Returns the jobs that are in the "active" status.
  274. * @param start - zero based index from where to start returning jobs.
  275. * @param end - zero based index where to stop returning jobs.
  276. */
  277. getActive(start = 0, end = -1) {
  278. return this.getJobs(['active'], start, end, true);
  279. }
  280. /**
  281. * Returns the jobs that are in the "delayed" status.
  282. * @param start - zero based index from where to start returning jobs.
  283. * @param end - zero based index where to stop returning jobs.
  284. */
  285. getDelayed(start = 0, end = -1) {
  286. return this.getJobs(['delayed'], start, end, true);
  287. }
  288. /**
  289. * Returns the jobs that are in the "prioritized" status.
  290. * @param start - zero based index from where to start returning jobs.
  291. * @param end - zero based index where to stop returning jobs.
  292. */
  293. getPrioritized(start = 0, end = -1) {
  294. return this.getJobs(['prioritized'], start, end, true);
  295. }
  296. /**
  297. * Returns the jobs that are in the "completed" status.
  298. * @param start - zero based index from where to start returning jobs.
  299. * @param end - zero based index where to stop returning jobs.
  300. */
  301. getCompleted(start = 0, end = -1) {
  302. return this.getJobs(['completed'], start, end, false);
  303. }
  304. /**
  305. * Returns the jobs that are in the "failed" status.
  306. * @param start - zero based index from where to start returning jobs.
  307. * @param end - zero based index where to stop returning jobs.
  308. */
  309. getFailed(start = 0, end = -1) {
  310. return this.getJobs(['failed'], start, end, false);
  311. }
  312. /**
  313. * Returns the qualified job ids and the raw job data (if available) of the
  314. * children jobs of the given parent job.
  315. * It is possible to get either the already processed children, in this case
  316. * an array of qualified job ids and their result values will be returned,
  317. * or the pending children, in this case an array of qualified job ids will
  318. * be returned.
  319. * A qualified job id is a string representing the job id in a given queue,
  320. * for example: "bull:myqueue:jobid".
  321. *
  322. * @param parentId - The id of the parent job
  323. * @param type - "processed" | "pending"
  324. * @param opts - Options for the query.
  325. *
  326. * @returns an object with the following shape:
  327. * `{ items: { id: string, v?: any, err?: string } [], jobs: JobJsonRaw[], total: number}`
  328. */
  329. async getDependencies(parentId, type, start, end) {
  330. const key = this.toKey(type == 'processed'
  331. ? `${parentId}:processed`
  332. : `${parentId}:dependencies`);
  333. const { items, total, jobs } = await this.scripts.paginate(key, {
  334. start,
  335. end,
  336. fetchJobs: true,
  337. });
  338. return {
  339. items,
  340. jobs,
  341. total,
  342. };
  343. }
  344. async getRanges(types, start = 0, end = 1, asc = false) {
  345. const multiCommands = [];
  346. this.commandByType(types, false, (key, command) => {
  347. switch (command) {
  348. case 'lrange':
  349. multiCommands.push('lrange');
  350. break;
  351. case 'zrange':
  352. multiCommands.push('zrange');
  353. break;
  354. }
  355. });
  356. const responses = await this.scripts.getRanges(types, start, end, asc);
  357. let results = [];
  358. responses.forEach((response, index) => {
  359. const result = response || [];
  360. if (asc && multiCommands[index] === 'lrange') {
  361. results = results.concat(result.reverse());
  362. }
  363. else {
  364. results = results.concat(result);
  365. }
  366. });
  367. return [...new Set(results)];
  368. }
  369. /**
  370. * Returns the jobs that are on the given statuses (note that JobType is synonym for job status)
  371. * @param types - the statuses of the jobs to return.
  372. * @param start - zero based index from where to start returning jobs.
  373. * @param end - zero based index where to stop returning jobs.
  374. * @param asc - if true, the jobs will be returned in ascending order.
  375. */
  376. async getJobs(types, start = 0, end = -1, asc = false) {
  377. const currentTypes = this.sanitizeJobTypes(types);
  378. const jobIds = await this.getRanges(currentTypes, start, end, asc);
  379. return Promise.all(jobIds.map(jobId => this.Job.fromId(this, jobId)));
  380. }
  381. /**
  382. * Returns the logs for a given Job.
  383. * @param jobId - the id of the job to get the logs for.
  384. * @param start - zero based index from where to start returning jobs.
  385. * @param end - zero based index where to stop returning jobs.
  386. * @param asc - if true, the jobs will be returned in ascending order.
  387. */
  388. async getJobLogs(jobId, start = 0, end = -1, asc = true) {
  389. const client = await this.client;
  390. const multi = client.multi();
  391. const logsKey = this.toKey(jobId + ':logs');
  392. if (asc) {
  393. multi.lrange(logsKey, start, end);
  394. }
  395. else {
  396. multi.lrange(logsKey, -(end + 1), -(start + 1));
  397. }
  398. multi.llen(logsKey);
  399. const result = (await multi.exec());
  400. if (!asc) {
  401. result[0][1].reverse();
  402. }
  403. return {
  404. logs: result[0][1],
  405. count: result[1][1],
  406. };
  407. }
  408. async baseGetClients(matcher) {
  409. const client = await this.client;
  410. try {
  411. if (client.isCluster) {
  412. const clusterNodes = client.nodes();
  413. const clientsPerNode = [];
  414. for (let nodeIndex = 0; nodeIndex < clusterNodes.length; nodeIndex++) {
  415. const node = clusterNodes[nodeIndex];
  416. const clients = (await node.client('LIST'));
  417. const list = this.parseClientList(clients, matcher);
  418. clientsPerNode.push(list);
  419. }
  420. const clientsFromNodeWithMostConnections = clientsPerNode.reduce((prev, current) => {
  421. return prev.length > current.length ? prev : current;
  422. }, []);
  423. return clientsFromNodeWithMostConnections;
  424. }
  425. else {
  426. const clients = (await client.client('LIST'));
  427. const list = this.parseClientList(clients, matcher);
  428. return list;
  429. }
  430. }
  431. catch (err) {
  432. if (!clientCommandMessageReg.test(err.message)) {
  433. throw err;
  434. }
  435. return [{ name: 'GCP does not support client list' }];
  436. }
  437. }
  438. /**
  439. * Get the worker list related to the queue. i.e. all the known
  440. * workers that are available to process jobs for this queue.
  441. * Note: GCP does not support SETNAME, so this call will not work
  442. *
  443. * @returns - Returns an array with workers info.
  444. */
  445. getWorkers() {
  446. const unnamedWorkerClientName = `${this.clientName()}`;
  447. const namedWorkerClientName = `${this.clientName()}:w:`;
  448. const matcher = (name) => name &&
  449. (name === unnamedWorkerClientName ||
  450. name.startsWith(namedWorkerClientName));
  451. return this.baseGetClients(matcher);
  452. }
  453. /**
  454. * Returns the current count of workers for the queue.
  455. *
  456. * getWorkersCount(): Promise<number>
  457. *
  458. */
  459. async getWorkersCount() {
  460. const workers = await this.getWorkers();
  461. return workers.length;
  462. }
  463. /**
  464. * Get queue events list related to the queue.
  465. * Note: GCP does not support SETNAME, so this call will not work
  466. *
  467. * @deprecated do not use this method, it will be removed in the future.
  468. *
  469. * @returns - Returns an array with queue events info.
  470. */
  471. async getQueueEvents() {
  472. const clientName = `${this.clientName()}${QUEUE_EVENT_SUFFIX}`;
  473. return this.baseGetClients((name) => name === clientName);
  474. }
  475. /**
  476. * Get queue metrics related to the queue.
  477. *
  478. * This method returns the gathered metrics for the queue.
  479. * The metrics are represented as an array of job counts
  480. * per unit of time (1 minute).
  481. *
  482. * @param start - Start point of the metrics, where 0
  483. * is the newest point to be returned.
  484. * @param end - End point of the metrics, where -1 is the
  485. * oldest point to be returned.
  486. *
  487. * @returns - Returns an object with queue metrics.
  488. */
  489. async getMetrics(type, start = 0, end = -1) {
  490. const [meta, data, count] = await this.scripts.getMetrics(type, start, end);
  491. return {
  492. meta: {
  493. count: parseInt(meta[0] || '0', 10),
  494. prevTS: parseInt(meta[1] || '0', 10),
  495. prevCount: parseInt(meta[2] || '0', 10),
  496. },
  497. data: data.map(point => +point || 0),
  498. count,
  499. };
  500. }
  501. parseClientList(list, matcher) {
  502. const lines = list.split(/\r?\n/);
  503. const clients = [];
  504. lines.forEach((line) => {
  505. const client = {};
  506. const keyValues = line.split(' ');
  507. keyValues.forEach(function (keyValue) {
  508. const index = keyValue.indexOf('=');
  509. const key = keyValue.substring(0, index);
  510. const value = keyValue.substring(index + 1);
  511. client[key] = value;
  512. });
  513. const name = client['name'];
  514. if (matcher(name)) {
  515. client['name'] = this.name;
  516. client['rawname'] = name;
  517. clients.push(client);
  518. }
  519. });
  520. return clients;
  521. }
  522. /**
  523. * Export the metrics for the queue in the Prometheus format.
  524. * Automatically exports all the counts returned by getJobCounts().
  525. *
  526. * @returns - Returns a string with the metrics in the Prometheus format.
  527. *
  528. * @see {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
  529. *
  530. **/
  531. async exportPrometheusMetrics(globalVariables) {
  532. const counts = await this.getJobCounts();
  533. const metrics = [];
  534. // Match the test's expected HELP text
  535. metrics.push('# HELP bullmq_job_count Number of jobs in the queue by state');
  536. metrics.push('# TYPE bullmq_job_count gauge');
  537. const variables = !globalVariables
  538. ? ''
  539. : Object.keys(globalVariables).reduce((acc, curr) => `${acc}, ${curr}="${globalVariables[curr]}"`, '');
  540. for (const [state, count] of Object.entries(counts)) {
  541. metrics.push(`bullmq_job_count{queue="${this.name}", state="${state}"${variables}} ${count}`);
  542. }
  543. return metrics.join('\n');
  544. }
  545. }
  546. //# sourceMappingURL=queue-getters.js.map