| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
- 'use strict';
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.QueueGetters = void 0;
- const tslib_1 = require("tslib");
- const queue_base_1 = require("./queue-base");
- const utils_1 = require("../utils");
- const enums_1 = require("../enums");
/**
 * Provides different getters for different aspects of a queue: single job
 * lookup, per-state counts and ranges, queue metadata, job logs, connected
 * clients and metrics.
 *
 * The methods rely on members that are not defined in this class and are
 * expected to come from the base class: `Job`, `scripts`, `client`, `keys`,
 * `opts`, `name`, `toKey()` and `clientName()`.
 */
class QueueGetters extends queue_base_1.QueueBase {
  /**
   * Looks up a single job of this queue.
   *
   * @param jobId - job identifier.
   * @returns the result of `this.Job.fromId(this, jobId)`.
   */
  getJob(jobId) {
    return this.Job.fromId(this, jobId);
  }

  /**
   * Invokes `callback(key, command)` for every recognised job type and returns
   * the callback results positionally, i.e. entry `i` belongs to `types[i]`.
   * For an unrecognised type the callback is not invoked and the entry is
   * `undefined`.
   *
   * @param types - job types; 'waiting' is treated as an alias of 'wait'.
   * @param count - when truthy the counting command ('zcard' / 'llen') is
   * selected, otherwise the range command ('zrange' / 'lrange').
   * @param callback - receives the key built with `toKey` and the command name.
   * @returns an array with one entry per element of `types`.
   */
  commandByType(types, count, callback) {
    return types.map((type) => {
      const resolvedType = type === 'waiting' ? 'wait' : type; // alias
      const key = this.toKey(resolvedType);
      switch (resolvedType) {
        case 'completed':
        case 'failed':
        case 'delayed':
        case 'prioritized':
        case 'repeat':
        case 'waiting-children':
          return callback(key, count ? 'zcard' : 'zrange');
        case 'active':
        case 'wait':
        case 'paused':
          return callback(key, count ? 'llen' : 'lrange');
        default:
          return undefined;
      }
    });
  }

  /**
   * Normalises a job type selection into a de-duplicated array.
   *
   * @param types - a single type, an array of types, or nothing.
   * @returns the given types without duplicates, with 'paused' added whenever
   * 'waiting' is requested; when no types are given, the default list of
   * states below.
   */
  sanitizeJobTypes(types) {
    const currentTypes = typeof types === 'string' ? [types] : types;
    if (Array.isArray(currentTypes) && currentTypes.length > 0) {
      const sanitizedTypes = [...currentTypes];
      if (sanitizedTypes.includes('waiting')) {
        // Asking for 'waiting' always pulls in 'paused' as well.
        sanitizedTypes.push('paused');
      }
      return [...new Set(sanitizedTypes)];
    }
    return [
      'active',
      'completed',
      'delayed',
      'failed',
      'paused',
      'prioritized',
      'waiting',
      'waiting-children',
    ];
  }

  /**
   * Returns the number of jobs waiting to be processed. This includes jobs that
   * are "waiting", "paused", "delayed", "prioritized" or "waiting-children".
   */
  async count() {
    return this.getJobCountByTypes(
      'waiting',
      'paused',
      'delayed',
      'prioritized',
      'waiting-children',
    );
  }

  /**
   * Returns the time to live for a rate limited key in milliseconds.
   * @param maxJobs - max jobs to be considered in rate limit state. If not passed
   * it will return the remaining ttl without considering if max jobs is exceeded.
   * @returns -2 if the key does not exist.
   * -1 if the key exists but has no associated expire.
   * @see {@link https://redis.io/commands/pttl/}
   */
  async getRateLimitTtl(maxJobs) {
    return this.scripts.getRateLimitTtl(maxJobs);
  }

  /**
   * Get jobId that starts debounced state.
   * @deprecated use getDeduplicationJobId method
   *
   * @param id - debounce identifier
   */
  async getDebounceJobId(id) {
    // Deprecated alias: both names read the very same key.
    return this.getDeduplicationJobId(id);
  }

  /**
   * Get jobId from deduplicated state.
   *
   * @param id - deduplication identifier
   * @returns the value stored under `${this.keys.de}:${id}`.
   */
  async getDeduplicationJobId(id) {
    const client = await this.client;
    return client.get(`${this.keys.de}:${id}`);
  }

  /**
   * Get global concurrency value.
   * Returns null in case no value is set.
   */
  async getGlobalConcurrency() {
    const client = await this.client;
    const concurrency = await client.hget(this.keys.meta, 'concurrency');
    return concurrency ? Number(concurrency) : null;
  }

  /**
   * Get global rate limit values.
   * Returns null in case no value is set (both `max` and `duration` are
   * required).
   */
  async getGlobalRateLimit() {
    const client = await this.client;
    const [max, duration] = await client.hmget(
      this.keys.meta,
      'max',
      'duration',
    );
    if (!max || !duration) {
      return null;
    }
    return {
      max: Number(max),
      duration: Number(duration),
    };
  }

  /**
   * Job counts by type
   *
   * Queue#getJobCountByTypes('completed') =\> completed count
   * Queue#getJobCountByTypes('completed', 'failed') =\> completed + failed count
   * Queue#getJobCountByTypes('completed', 'waiting', 'failed') =\> completed + waiting + failed count
   */
  async getJobCountByTypes(...types) {
    const countsByType = await this.getJobCounts(...types);
    let total = 0;
    for (const count of Object.values(countsByType)) {
      total += count;
    }
    return total;
  }

  /**
   * Returns the job counts for each type specified or every list/set in the queue by default.
   * @param types - the types of jobs to count. If not specified, it will return the counts for all types.
   * @returns An object, key (type) and value (count)
   */
  async getJobCounts(...types) {
    const currentTypes = this.sanitizeJobTypes(types);
    const responses = await this.scripts.getCounts(currentTypes);
    const counts = {};
    responses.forEach((res, index) => {
      // Falsy responses are reported as 0.
      counts[currentTypes[index]] = res || 0;
    });
    return counts;
  }

  /**
   * Records job counts as gauge metrics for telemetry purposes.
   * Each job state count is recorded with the queue name and state as attributes.
   * Nothing is recorded when no telemetry meter with a `createGauge` function
   * is configured in `this.opts.telemetry`.
   * @param types - the types of jobs to count. If not specified, it will return the counts for all types.
   * @returns An object, key (type) and value (count)
   */
  async recordJobCountsMetric(...types) {
    const counts = await this.getJobCounts(...types);
    const telemetry = this.opts.telemetry;
    const meter = telemetry == null ? undefined : telemetry.meter;
    if (meter && typeof meter.createGauge === 'function') {
      const gauge = meter.createGauge(enums_1.MetricNames.QueueJobsCount, {
        description: 'Number of jobs in the queue by state',
        unit: '{jobs}',
      });
      for (const [state, jobCount] of Object.entries(counts)) {
        gauge.record(jobCount, {
          [enums_1.TelemetryAttributes.QueueName]: this.name,
          [enums_1.TelemetryAttributes.QueueJobsState]: state,
        });
      }
    }
    return counts;
  }

  /**
   * Get current job state.
   *
   * @param jobId - job identifier.
   * @returns Returns one of these values:
   * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
   */
  getJobState(jobId) {
    return this.scripts.getState(jobId);
  }

  /**
   * Get global queue configuration.
   *
   * `concurrency`, `max`, `duration` and `opts.maxLenEvents` (exposed as
   * `maxLenEvents`) are converted to numbers when present, `paused` is
   * converted to a boolean, and every other stored field is passed through
   * unchanged.
   *
   * @returns Returns the global queue configuration.
   */
  async getMeta() {
    const client = await this.client;
    const config = await client.hgetall(this.keys.meta);
    const { concurrency, max, duration, paused } = config;
    const maxLenEvents = config['opts.maxLenEvents'];
    // Everything that is not explicitly parsed below is kept as stored.
    const parsedConfig = tslib_1.__rest(config, [
      'concurrency',
      'max',
      'duration',
      'paused',
      'opts.maxLenEvents',
    ]);
    if (concurrency) {
      parsedConfig['concurrency'] = Number(concurrency);
    }
    if (maxLenEvents) {
      parsedConfig['maxLenEvents'] = Number(maxLenEvents);
    }
    if (max) {
      parsedConfig['max'] = Number(max);
    }
    if (duration) {
      parsedConfig['duration'] = Number(duration);
    }
    parsedConfig['paused'] = paused === '1';
    return parsedConfig;
  }

  /**
   * @returns Returns the number of jobs in completed status.
   */
  getCompletedCount() {
    return this.getJobCountByTypes('completed');
  }

  /**
   * Returns the number of jobs in failed status.
   */
  getFailedCount() {
    return this.getJobCountByTypes('failed');
  }

  /**
   * Returns the number of jobs in delayed status.
   */
  getDelayedCount() {
    return this.getJobCountByTypes('delayed');
  }

  /**
   * Returns the number of jobs in active status.
   */
  getActiveCount() {
    return this.getJobCountByTypes('active');
  }

  /**
   * Returns the number of jobs in prioritized status.
   */
  getPrioritizedCount() {
    return this.getJobCountByTypes('prioritized');
  }

  /**
   * Returns the number of jobs per priority.
   *
   * @param priorities - priorities to count; duplicates are ignored.
   * @returns An object, key (priority) and value (count)
   */
  async getCountsPerPriority(priorities) {
    const uniquePriorities = [...new Set(priorities)];
    const responses = await this.scripts.getCountsPerPriority(uniquePriorities);
    const counts = {};
    responses.forEach((res, index) => {
      counts[`${uniquePriorities[index]}`] = res || 0;
    });
    return counts;
  }

  /**
   * Returns the number of jobs in waiting or paused statuses.
   */
  getWaitingCount() {
    return this.getJobCountByTypes('waiting');
  }

  /**
   * Returns the number of jobs in waiting-children status.
   */
  getWaitingChildrenCount() {
    return this.getJobCountByTypes('waiting-children');
  }

  /**
   * Returns the jobs that are in the "waiting" status.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getWaiting(start = 0, end = -1) {
    return this.getJobs(['waiting'], start, end, true);
  }

  /**
   * Returns the jobs that are in the "waiting-children" status.
   * I.E. parent jobs that have at least one child that has not completed yet.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getWaitingChildren(start = 0, end = -1) {
    return this.getJobs(['waiting-children'], start, end, true);
  }

  /**
   * Returns the jobs that are in the "active" status.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getActive(start = 0, end = -1) {
    return this.getJobs(['active'], start, end, true);
  }

  /**
   * Returns the jobs that are in the "delayed" status.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getDelayed(start = 0, end = -1) {
    return this.getJobs(['delayed'], start, end, true);
  }

  /**
   * Returns the jobs that are in the "prioritized" status.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getPrioritized(start = 0, end = -1) {
    return this.getJobs(['prioritized'], start, end, true);
  }

  /**
   * Returns the jobs that are in the "completed" status.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getCompleted(start = 0, end = -1) {
    return this.getJobs(['completed'], start, end, false);
  }

  /**
   * Returns the jobs that are in the "failed" status.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   */
  getFailed(start = 0, end = -1) {
    return this.getJobs(['failed'], start, end, false);
  }

  /**
   * Returns the qualified job ids and the raw job data (if available) of the
   * children jobs of the given parent job.
   * It is possible to get either the already processed children, in this case
   * an array of qualified job ids and their result values will be returned,
   * or the pending children, in this case an array of qualified job ids will
   * be returned.
   * A qualified job id is a string representing the job id in a given queue,
   * for example: "bull:myqueue:jobid".
   *
   * @param parentId - The id of the parent job
   * @param type - "processed" | "pending"; any value other than "processed"
   * reads the pending dependencies.
   * @param start - start of the page, forwarded to `scripts.paginate`.
   * @param end - end of the page, forwarded to `scripts.paginate`.
   *
   * @returns an object with the following shape:
   * `{ items: { id: string, v?: any, err?: string } [], jobs: JobJsonRaw[], total: number}`
   */
  async getDependencies(parentId, type, start, end) {
    const suffix = type === 'processed' ? 'processed' : 'dependencies';
    const key = this.toKey(`${parentId}:${suffix}`);
    const { items, total, jobs } = await this.scripts.paginate(key, {
      start,
      end,
      fetchJobs: true,
    });
    return {
      items,
      jobs,
      total,
    };
  }

  /**
   * Fetches the job ids stored under the given types and merges them into one
   * de-duplicated array, keeping the order of first appearance.
   *
   * NOTE(review): the default `end` here is 1 while the public getters default
   * to -1; `getJobs` always passes `end` explicitly - confirm before relying
   * on the default.
   *
   * @param types - job types to read.
   * @param start - zero based index from where to start returning ids.
   * @param end - zero based index where to stop returning ids.
   * @param asc - if true, ids are requested in ascending order.
   * @returns the unique job ids.
   */
  async getRanges(types, start = 0, end = 1, asc = false) {
    // One entry per requested type ('lrange', 'zrange' or undefined), so the
    // entry at `index` always describes `responses[index]` even when `types`
    // contains a type that commandByType does not recognise.
    const rangeCommands = this.commandByType(
      types,
      false,
      (_key, command) => command,
    );
    const responses = await this.scripts.getRanges(types, start, end, asc);
    const uniqueIds = new Set();
    responses.forEach((response, index) => {
      const ids = response || [];
      if (asc && rangeCommands[index] === 'lrange') {
        // Responses read with 'lrange' are flipped when ascending order is
        // requested.
        ids.reverse();
      }
      for (const id of ids) {
        uniqueIds.add(id);
      }
    });
    return [...uniqueIds];
  }

  /**
   * Returns the jobs that are on the given statuses (note that JobType is synonym for job status)
   * @param types - the statuses of the jobs to return.
   * @param start - zero based index from where to start returning jobs.
   * @param end - zero based index where to stop returning jobs.
   * @param asc - if true, the jobs will be returned in ascending order.
   */
  async getJobs(types, start = 0, end = -1, asc = false) {
    const currentTypes = this.sanitizeJobTypes(types);
    const jobIds = await this.getRanges(currentTypes, start, end, asc);
    return Promise.all(jobIds.map((jobId) => this.Job.fromId(this, jobId)));
  }

  /**
   * Returns the logs for a given Job.
   * @param jobId - the id of the job to get the logs for.
   * @param start - zero based index from where to start returning logs.
   * @param end - zero based index where to stop returning logs.
   * @param asc - if true, the logs will be returned in ascending order.
   * @returns `{ logs, count }` where `count` is the length of the whole log list.
   */
  async getJobLogs(jobId, start = 0, end = -1, asc = true) {
    const client = await this.client;
    const multi = client.multi();
    const logsKey = this.toKey(`${jobId}:logs`);
    if (asc) {
      multi.lrange(logsKey, start, end);
    } else {
      // Descending order: read the mirrored window counted from the tail of
      // the list; it is flipped once the results are in.
      multi.lrange(logsKey, -(end + 1), -(start + 1));
    }
    multi.llen(logsKey);
    // Each entry of the exec() result is read as [error, value].
    const result = await multi.exec();
    const logs = result[0][1];
    const count = result[1][1];
    if (!asc) {
      logs.reverse();
    }
    return {
      logs,
      count,
    };
  }

  /**
   * Lists the connected clients whose name satisfies `matcher`.
   *
   * For a cluster connection every node is queried and the list of the node
   * with the most matching clients is returned. When the command is rejected
   * with an error matching `clientCommandMessageReg`, a single placeholder
   * entry is returned instead; any other error is rethrown.
   *
   * @param matcher - predicate receiving the client name.
   * @returns an array of client info objects.
   */
  async baseGetClients(matcher) {
    const client = await this.client;
    try {
      if (!client.isCluster) {
        const clients = await client.client('LIST');
        return this.parseClientList(clients, matcher);
      }
      const clientsPerNode = [];
      for (const node of client.nodes()) {
        const clients = await node.client('LIST');
        clientsPerNode.push(this.parseClientList(clients, matcher));
      }
      // Keep the longest list; on a tie the later node wins.
      return clientsPerNode.reduce(
        (prev, current) => (prev.length > current.length ? prev : current),
        [],
      );
    } catch (err) {
      if (!utils_1.clientCommandMessageReg.test(err.message)) {
        throw err;
      }
      return [{ name: 'GCP does not support client list' }];
    }
  }

  /**
   * Get the worker list related to the queue. i.e. all the known
   * workers that are available to process jobs for this queue.
   * Note: GCP does not support SETNAME, so this call will not work
   *
   * @returns - Returns an array with workers info.
   */
  getWorkers() {
    const unnamedWorkerClientName = `${this.clientName()}`;
    const namedWorkerClientName = `${unnamedWorkerClientName}:w:`;
    const matcher = (name) =>
      name &&
      (name === unnamedWorkerClientName ||
        name.startsWith(namedWorkerClientName));
    return this.baseGetClients(matcher);
  }

  /**
   * Returns the current count of workers for the queue.
   *
   * getWorkersCount(): Promise<number>
   *
   */
  async getWorkersCount() {
    const workers = await this.getWorkers();
    return workers.length;
  }

  /**
   * Get queue events list related to the queue.
   * Note: GCP does not support SETNAME, so this call will not work
   *
   * @deprecated do not use this method, it will be removed in the future.
   *
   * @returns - Returns an array with queue events info.
   */
  async getQueueEvents() {
    const clientName = `${this.clientName()}${utils_1.QUEUE_EVENT_SUFFIX}`;
    return this.baseGetClients((name) => name === clientName);
  }

  /**
   * Get queue metrics related to the queue.
   *
   * This method returns the gathered metrics for the queue.
   * The metrics are represented as an array of job counts
   * per unit of time (1 minute).
   *
   * @param type - which metrics to read; forwarded to `scripts.getMetrics`.
   * @param start - Start point of the metrics, where 0
   * is the newest point to be returned.
   * @param end - End point of the metrics, where -1 is the
   * oldest point to be returned.
   *
   * @returns - Returns an object with queue metrics.
   */
  async getMetrics(type, start = 0, end = -1) {
    const [meta, data, count] = await this.scripts.getMetrics(type, start, end);
    // Missing meta fields are read as 0.
    const toInt = (value) => Number.parseInt(value || '0', 10);
    return {
      meta: {
        count: toInt(meta[0]),
        prevTS: toInt(meta[1]),
        prevCount: toInt(meta[2]),
      },
      data: data.map((point) => Number(point) || 0),
      count,
    };
  }

  /**
   * Parses the text of a client listing (one client per line, fields written
   * as space separated `key=value` pairs) and keeps the clients whose `name`
   * field satisfies `matcher`.
   *
   * For every kept client `name` is replaced by the queue name and the
   * original value is preserved as `rawname`.
   *
   * @param list - raw listing text.
   * @param matcher - predicate receiving the client's `name` field.
   * @returns the matching clients as plain objects.
   */
  parseClientList(list, matcher) {
    const clients = [];
    for (const line of list.split(/\r?\n/)) {
      if (line === '') {
        // Blank lines (for example after a trailing line break) do not
        // describe a client.
        continue;
      }
      const client = {};
      for (const field of line.split(' ')) {
        const separatorIndex = field.indexOf('=');
        if (separatorIndex === -1) {
          // Not a key=value pair; storing it would only create an '' key.
          continue;
        }
        client[field.substring(0, separatorIndex)] = field.substring(
          separatorIndex + 1,
        );
      }
      const rawName = client['name'];
      if (matcher(rawName)) {
        client['name'] = this.name;
        client['rawname'] = rawName;
        clients.push(client);
      }
    }
    return clients;
  }

  /**
   * Export the metrics for the queue in the Prometheus format.
   * Automatically exports all the counts returned by getJobCounts().
   *
   * @param globalVariables - optional object whose entries are appended as
   * extra labels to every sample.
   *
   * @returns - Returns a string with the metrics in the Prometheus format.
   *
   * @see {@link https://prometheus.io/docs/instrumenting/exposition_formats/}
   *
   **/
  async exportPrometheusMetrics(globalVariables) {
    // The exposition format requires backslash, double quote and line feed to
    // be escaped inside label values; unescaped they corrupt the sample line.
    const escapeLabelValue = (value) =>
      String(value)
        .replace(/\\/g, '\\\\')
        .replace(/"/g, '\\"')
        .replace(/\n/g, '\\n');
    const counts = await this.getJobCounts();
    const metrics = [
      // Match the test's expected HELP text
      '# HELP bullmq_job_count Number of jobs in the queue by state',
      '# TYPE bullmq_job_count gauge',
    ];
    const variables = !globalVariables
      ? ''
      : Object.keys(globalVariables)
          .map(
            (label) =>
              `, ${label}="${escapeLabelValue(globalVariables[label])}"`,
          )
          .join('');
    const queueLabel = escapeLabelValue(this.name);
    for (const [state, count] of Object.entries(counts)) {
      metrics.push(
        `bullmq_job_count{queue="${queueLabel}", state="${escapeLabelValue(state)}"${variables}} ${count}`,
      );
    }
    return metrics.join('\n');
  }
}
- exports.QueueGetters = QueueGetters;
- //# sourceMappingURL=queue-getters.js.map
|