queue-getters.js 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. 'use strict';
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.QueueGetters = void 0;
  4. const tslib_1 = require("tslib");
  5. const queue_base_1 = require("./queue-base");
  6. const utils_1 = require("../utils");
  7. const enums_1 = require("../enums");
  8. /**
  9. * Provides different getters for different aspects of a queue.
  10. */
  11. class QueueGetters extends queue_base_1.QueueBase {
  12. getJob(jobId) {
  13. return this.Job.fromId(this, jobId);
  14. }
  15. commandByType(types, count, callback) {
  16. return types.map((type) => {
  17. type = type === 'waiting' ? 'wait' : type; // alias
  18. const key = this.toKey(type);
  19. switch (type) {
  20. case 'completed':
  21. case 'failed':
  22. case 'delayed':
  23. case 'prioritized':
  24. case 'repeat':
  25. case 'waiting-children':
  26. return callback(key, count ? 'zcard' : 'zrange');
  27. case 'active':
  28. case 'wait':
  29. case 'paused':
  30. return callback(key, count ? 'llen' : 'lrange');
  31. }
  32. });
  33. }
  34. sanitizeJobTypes(types) {
  35. const currentTypes = typeof types === 'string' ? [types] : types;
  36. if (Array.isArray(currentTypes) && currentTypes.length > 0) {
  37. const sanitizedTypes = [...currentTypes];
  38. if (sanitizedTypes.indexOf('waiting') !== -1) {
  39. sanitizedTypes.push('paused');
  40. }
  41. return [...new Set(sanitizedTypes)];
  42. }
  43. return [
  44. 'active',
  45. 'completed',
  46. 'delayed',
  47. 'failed',
  48. 'paused',
  49. 'prioritized',
  50. 'waiting',
  51. 'waiting-children',
  52. ];
  53. }
  54. /**
  55. Returns the number of jobs waiting to be processed. This includes jobs that are
  56. "waiting" or "delayed" or "prioritized" or "waiting-children".
  57. */
  58. async count() {
  59. const count = await this.getJobCountByTypes('waiting', 'paused', 'delayed', 'prioritized', 'waiting-children');
  60. return count;
  61. }
  62. /**
  63. * Returns the time to live for a rate limited key in milliseconds.
  64. * @param maxJobs - max jobs to be considered in rate limit state. If not passed
  65. * it will return the remaining ttl without considering if max jobs is excedeed.
  66. * @returns -2 if the key does not exist.
  67. * -1 if the key exists but has no associated expire.
  68. * @see {@link https://redis.io/commands/pttl/}
  69. */
  70. async getRateLimitTtl(maxJobs) {
  71. return this.scripts.getRateLimitTtl(maxJobs);
  72. }
  73. /**
  74. * Get jobId that starts debounced state.
  75. * @deprecated use getDeduplicationJobId method
  76. *
  77. * @param id - debounce identifier
  78. */
  79. async getDebounceJobId(id) {
  80. const client = await this.client;
  81. return client.get(`${this.keys.de}:${id}`);
  82. }
  83. /**
  84. * Get jobId from deduplicated state.
  85. *
  86. * @param id - deduplication identifier
  87. */
  88. async getDeduplicationJobId(id) {
  89. const client = await this.client;
  90. return client.get(`${this.keys.de}:${id}`);
  91. }
  92. /**
  93. * Get global concurrency value.
  94. * Returns null in case no value is set.
  95. */
  96. async getGlobalConcurrency() {
  97. const client = await this.client;
  98. const concurrency = await client.hget(this.keys.meta, 'concurrency');
  99. if (concurrency) {
  100. return Number(concurrency);
  101. }
  102. return null;
  103. }
  104. /**
  105. * Get global rate limit values.
  106. * Returns null in case no value is set.
  107. */
  108. async getGlobalRateLimit() {
  109. const client = await this.client;
  110. const [max, duration] = await client.hmget(this.keys.meta, 'max', 'duration');
  111. if (max && duration) {
  112. return {
  113. max: Number(max),
  114. duration: Number(duration),
  115. };
  116. }
  117. return null;
  118. }
  119. /**
  120. * Job counts by type
  121. *
  122. * Queue#getJobCountByTypes('completed') =\> completed count
  123. * Queue#getJobCountByTypes('completed', 'failed') =\> completed + failed count
  124. * Queue#getJobCountByTypes('completed', 'waiting', 'failed') =\> completed + waiting + failed count
  125. */
  126. async getJobCountByTypes(...types) {
  127. const result = await this.getJobCounts(...types);
  128. return Object.values(result).reduce((sum, count) => sum + count, 0);
  129. }
  130. /**
  131. * Returns the job counts for each type specified or every list/set in the queue by default.
  132. * @param types - the types of jobs to count. If not specified, it will return the counts for all types.
  133. * @returns An object, key (type) and value (count)
  134. */
  135. async getJobCounts(...types) {
  136. const currentTypes = this.sanitizeJobTypes(types);
  137. const responses = await this.scripts.getCounts(currentTypes);
  138. const counts = {};
  139. responses.forEach((res, index) => {
  140. counts[currentTypes[index]] = res || 0;
  141. });
  142. return counts;
  143. }
  144. /**
  145. * Records job counts as gauge metrics for telemetry purposes.
  146. * Each job state count is recorded with the queue name and state as attributes.
  147. * @param types - the types of jobs to count. If not specified, it will return the counts for all types.
  148. * @returns An object, key (type) and value (count)
  149. */
  150. async recordJobCountsMetric(...types) {
  151. var _a;
  152. const counts = await this.getJobCounts(...types);
  153. const meter = (_a = this.opts.telemetry) === null || _a === void 0 ? void 0 : _a.meter;
  154. if (meter && typeof meter.createGauge === 'function') {
  155. const gauge = meter.createGauge(enums_1.MetricNames.QueueJobsCount, {
  156. description: 'Number of jobs in the queue by state',
  157. unit: '{jobs}',
  158. });
  159. for (const [state, jobCount] of Object.entries(counts)) {
  160. gauge.record(jobCount, {
  161. [enums_1.TelemetryAttributes.QueueName]: this.name,
  162. [enums_1.TelemetryAttributes.QueueJobsState]: state,
  163. });
  164. }
  165. }
  166. return counts;
  167. }
  168. /**
  169. * Get current job state.
  170. *
  171. * @param jobId - job identifier.
  172. * @returns Returns one of these values:
  173. * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
  174. */
  175. getJobState(jobId) {
  176. return this.scripts.getState(jobId);
  177. }
  178. /**
  179. * Get global queue configuration.
  180. *
  181. * @returns Returns the global queue configuration.
  182. */
  183. async getMeta() {
  184. const client = await this.client;
  185. const config = await client.hgetall(this.keys.meta);
  186. const { concurrency, max, duration, paused, 'opts.maxLenEvents': maxLenEvents } = config, rest = tslib_1.__rest(config, ["concurrency", "max", "duration", "paused", 'opts.maxLenEvents']);
  187. const parsedConfig = rest;
  188. if (concurrency) {
  189. parsedConfig['concurrency'] = Number(concurrency);
  190. }
  191. if (maxLenEvents) {
  192. parsedConfig['maxLenEvents'] = Number(maxLenEvents);
  193. }
  194. if (max) {
  195. parsedConfig['max'] = Number(max);
  196. }
  197. if (duration) {
  198. parsedConfig['duration'] = Number(duration);
  199. }
  200. parsedConfig['paused'] = paused === '1';
  201. return parsedConfig;
  202. }
  203. /**
  204. * @returns Returns the number of jobs in completed status.
  205. */
  206. getCompletedCount() {
  207. return this.getJobCountByTypes('completed');
  208. }
  209. /**
  210. * Returns the number of jobs in failed status.
  211. */
  212. getFailedCount() {
  213. return this.getJobCountByTypes('failed');
  214. }
  215. /**
  216. * Returns the number of jobs in delayed status.
  217. */
  218. getDelayedCount() {
  219. return this.getJobCountByTypes('delayed');
  220. }
  221. /**
  222. * Returns the number of jobs in active status.
  223. */
  224. getActiveCount() {
  225. return this.getJobCountByTypes('active');
  226. }
  227. /**
  228. * Returns the number of jobs in prioritized status.
  229. */
  230. getPrioritizedCount() {
  231. return this.getJobCountByTypes('prioritized');
  232. }
  233. /**
  234. * Returns the number of jobs per priority.
  235. */
  236. async getCountsPerPriority(priorities) {
  237. const uniquePriorities = [...new Set(priorities)];
  238. const responses = await this.scripts.getCountsPerPriority(uniquePriorities);
  239. const counts = {};
  240. responses.forEach((res, index) => {
  241. counts[`${uniquePriorities[index]}`] = res || 0;
  242. });
  243. return counts;
  244. }
  245. /**
  246. * Returns the number of jobs in waiting or paused statuses.
  247. */
  248. getWaitingCount() {
  249. return this.getJobCountByTypes('waiting');
  250. }
  251. /**
  252. * Returns the number of jobs in waiting-children status.
  253. */
  254. getWaitingChildrenCount() {
  255. return this.getJobCountByTypes('waiting-children');
  256. }
  257. /**
  258. * Returns the jobs that are in the "waiting" status.
  259. * @param start - zero based index from where to start returning jobs.
  260. * @param end - zero based index where to stop returning jobs.
  261. */
  262. getWaiting(start = 0, end = -1) {
  263. return this.getJobs(['waiting'], start, end, true);
  264. }
  265. /**
  266. * Returns the jobs that are in the "waiting-children" status.
  267. * I.E. parent jobs that have at least one child that has not completed yet.
  268. * @param start - zero based index from where to start returning jobs.
  269. * @param end - zero based index where to stop returning jobs.
  270. */
  271. getWaitingChildren(start = 0, end = -1) {
  272. return this.getJobs(['waiting-children'], start, end, true);
  273. }
  274. /**
  275. * Returns the jobs that are in the "active" status.
  276. * @param start - zero based index from where to start returning jobs.
  277. * @param end - zero based index where to stop returning jobs.
  278. */
  279. getActive(start = 0, end = -1) {
  280. return this.getJobs(['active'], start, end, true);
  281. }
  282. /**
  283. * Returns the jobs that are in the "delayed" status.
  284. * @param start - zero based index from where to start returning jobs.
  285. * @param end - zero based index where to stop returning jobs.
  286. */
  287. getDelayed(start = 0, end = -1) {
  288. return this.getJobs(['delayed'], start, end, true);
  289. }
  290. /**
  291. * Returns the jobs that are in the "prioritized" status.
  292. * @param start - zero based index from where to start returning jobs.
  293. * @param end - zero based index where to stop returning jobs.
  294. */
  295. getPrioritized(start = 0, end = -1) {
  296. return this.getJobs(['prioritized'], start, end, true);
  297. }
  298. /**
  299. * Returns the jobs that are in the "completed" status.
  300. * @param start - zero based index from where to start returning jobs.
  301. * @param end - zero based index where to stop returning jobs.
  302. */
  303. getCompleted(start = 0, end = -1) {
  304. return this.getJobs(['completed'], start, end, false);
  305. }
  306. /**
  307. * Returns the jobs that are in the "failed" status.
  308. * @param start - zero based index from where to start returning jobs.
  309. * @param end - zero based index where to stop returning jobs.
  310. */
  311. getFailed(start = 0, end = -1) {
  312. return this.getJobs(['failed'], start, end, false);
  313. }
  314. /**
  315. * Returns the qualified job ids and the raw job data (if available) of the
  316. * children jobs of the given parent job.
  317. * It is possible to get either the already processed children, in this case
  318. * an array of qualified job ids and their result values will be returned,
  319. * or the pending children, in this case an array of qualified job ids will
  320. * be returned.
  321. * A qualified job id is a string representing the job id in a given queue,
  322. * for example: "bull:myqueue:jobid".
  323. *
  324. * @param parentId - The id of the parent job
  325. * @param type - "processed" | "pending"
  326. * @param opts - Options for the query.
  327. *
  328. * @returns an object with the following shape:
  329. * `{ items: { id: string, v?: any, err?: string } [], jobs: JobJsonRaw[], total: number}`
  330. */
  331. async getDependencies(parentId, type, start, end) {
  332. const key = this.toKey(type == 'processed'
  333. ? `${parentId}:processed`
  334. : `${parentId}:dependencies`);
  335. const { items, total, jobs } = await this.scripts.paginate(key, {
  336. start,
  337. end,
  338. fetchJobs: true,
  339. });
  340. return {
  341. items,
  342. jobs,
  343. total,
  344. };
  345. }
  346. async getRanges(types, start = 0, end = 1, asc = false) {
  347. const multiCommands = [];
  348. this.commandByType(types, false, (key, command) => {
  349. switch (command) {
  350. case 'lrange':
  351. multiCommands.push('lrange');
  352. break;
  353. case 'zrange':
  354. multiCommands.push('zrange');
  355. break;
  356. }
  357. });
  358. const responses = await this.scripts.getRanges(types, start, end, asc);
  359. let results = [];
  360. responses.forEach((response, index) => {
  361. const result = response || [];
  362. if (asc && multiCommands[index] === 'lrange') {
  363. results = results.concat(result.reverse());
  364. }
  365. else {
  366. results = results.concat(result);
  367. }
  368. });
  369. return [...new Set(results)];
  370. }
  371. /**
  372. * Returns the jobs that are on the given statuses (note that JobType is synonym for job status)
  373. * @param types - the statuses of the jobs to return.
  374. * @param start - zero based index from where to start returning jobs.
  375. * @param end - zero based index where to stop returning jobs.
  376. * @param asc - if true, the jobs will be returned in ascending order.
  377. */
  378. async getJobs(types, start = 0, end = -1, asc = false) {
  379. const currentTypes = this.sanitizeJobTypes(types);
  380. const jobIds = await this.getRanges(currentTypes, start, end, asc);
  381. return Promise.all(jobIds.map(jobId => this.Job.fromId(this, jobId)));
  382. }
  383. /**
  384. * Returns the logs for a given Job.
  385. * @param jobId - the id of the job to get the logs for.
  386. * @param start - zero based index from where to start returning jobs.
  387. * @param end - zero based index where to stop returning jobs.
  388. * @param asc - if true, the jobs will be returned in ascending order.
  389. */
  390. async getJobLogs(jobId, start = 0, end = -1, asc = true) {
  391. const client = await this.client;
  392. const multi = client.multi();
  393. const logsKey = this.toKey(jobId + ':logs');
  394. if (asc) {
  395. multi.lrange(logsKey, start, end);
  396. }
  397. else {
  398. multi.lrange(logsKey, -(end + 1), -(start + 1));
  399. }
  400. multi.llen(logsKey);
  401. const result = (await multi.exec());
  402. if (!asc) {
  403. result[0][1].reverse();
  404. }
  405. return {
  406. logs: result[0][1],
  407. count: result[1][1],
  408. };
  409. }
  410. async baseGetClients(matcher) {
  411. const client = await this.client;
  412. try {
  413. if (client.isCluster) {
  414. const clusterNodes = client.nodes();
  415. const clientsPerNode = [];
  416. for (let nodeIndex = 0; nodeIndex < clusterNodes.length; nodeIndex++) {
  417. const node = clusterNodes[nodeIndex];
  418. const clients = (await node.client('LIST'));
  419. const list = this.parseClientList(clients, matcher);
  420. clientsPerNode.push(list);
  421. }
  422. const clientsFromNodeWithMostConnections = clientsPerNode.reduce((prev, current) => {
  423. return prev.length > current.length ? prev : current;
  424. }, []);
  425. return clientsFromNodeWithMostConnections;
  426. }
  427. else {
  428. const clients = (await client.client('LIST'));
  429. const list = this.parseClientList(clients, matcher);
  430. return list;
  431. }
  432. }
  433. catch (err) {
  434. if (!utils_1.clientCommandMessageReg.test(err.message)) {
  435. throw err;
  436. }
  437. return [{ name: 'GCP does not support client list' }];
  438. }
  439. }
  440. /**
  441. * Get the worker list related to the queue. i.e. all the known
  442. * workers that are available to process jobs for this queue.
  443. * Note: GCP does not support SETNAME, so this call will not work
  444. *
  445. * @returns - Returns an array with workers info.
  446. */
  447. getWorkers() {
  448. const unnamedWorkerClientName = `${this.clientName()}`;
  449. const namedWorkerClientName = `${this.clientName()}:w:`;
  450. const matcher = (name) => name &&
  451. (name === unnamedWorkerClientName ||
  452. name.startsWith(namedWorkerClientName));
  453. return this.baseGetClients(matcher);
  454. }
  455. /**
  456. * Returns the current count of workers for the queue.
  457. *
  458. * getWorkersCount(): Promise<number>
  459. *
  460. */
  461. async getWorkersCount() {
  462. const workers = await this.getWorkers();
  463. return workers.length;
  464. }
  465. /**
  466. * Get queue events list related to the queue.
  467. * Note: GCP does not support SETNAME, so this call will not work
  468. *
  469. * @deprecated do not use this method, it will be removed in the future.
  470. *
  471. * @returns - Returns an array with queue events info.
  472. */
  473. async getQueueEvents() {
  474. const clientName = `${this.clientName()}${utils_1.QUEUE_EVENT_SUFFIX}`;
  475. return this.baseGetClients((name) => name === clientName);
  476. }
  477. /**
  478. * Get queue metrics related to the queue.
  479. *
  480. * This method returns the gathered metrics for the queue.
  481. * The metrics are represented as an array of job counts
  482. * per unit of time (1 minute).
  483. *
  484. * @param start - Start point of the metrics, where 0
  485. * is the newest point to be returned.
  486. * @param end - End point of the metrics, where -1 is the
  487. * oldest point to be returned.
  488. *
  489. * @returns - Returns an object with queue metrics.
  490. */
  491. async getMetrics(type, start = 0, end = -1) {
  492. const [meta, data, count] = await this.scripts.getMetrics(type, start, end);
  493. return {
  494. meta: {
  495. count: parseInt(meta[0] || '0', 10),
  496. prevTS: parseInt(meta[1] || '0', 10),
  497. prevCount: parseInt(meta[2] || '0', 10),
  498. },
  499. data: data.map(point => +point || 0),
  500. count,
  501. };
  502. }
  503. parseClientList(list, matcher) {
  504. const lines = list.split(/\r?\n/);
  505. const clients = [];
  506. lines.forEach((line) => {
  507. const client = {};
  508. const keyValues = line.split(' ');
  509. keyValues.forEach(function (keyValue) {
  510. const index = keyValue.indexOf('=');
  511. const key = keyValue.substring(0, index);
  512. const value = keyValue.substring(index + 1);
  513. client[key] = value;
  514. });
  515. const name = client['name'];
  516. if (matcher(name)) {
  517. client['name'] = this.name;
  518. client['rawname'] = name;
  519. clients.push(client);
  520. }
  521. });
  522. return clients;
  523. }
  524. /**
  525. * Export the metrics for the queue in the Prometheus format.
  526. * Automatically exports all the counts returned by getJobCounts().
  527. *
  528. * @returns - Returns a string with the metrics in the Prometheus format.
  529. *
  530. * @see {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
  531. *
  532. **/
  533. async exportPrometheusMetrics(globalVariables) {
  534. const counts = await this.getJobCounts();
  535. const metrics = [];
  536. // Match the test's expected HELP text
  537. metrics.push('# HELP bullmq_job_count Number of jobs in the queue by state');
  538. metrics.push('# TYPE bullmq_job_count gauge');
  539. const variables = !globalVariables
  540. ? ''
  541. : Object.keys(globalVariables).reduce((acc, curr) => `${acc}, ${curr}="${globalVariables[curr]}"`, '');
  542. for (const [state, count] of Object.entries(counts)) {
  543. metrics.push(`bullmq_job_count{queue="${this.name}", state="${state}"${variables}} ${count}`);
  544. }
  545. return metrics.join('\n');
  546. }
  547. }
  548. exports.QueueGetters = QueueGetters;
  549. //# sourceMappingURL=queue-getters.js.map