| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225 |
- /**
- * Includes all the scripts needed by the queue and jobs.
- */
- 'use strict';
- import { Packr } from 'msgpackr';
- const packer = new Packr({
- useRecords: false,
- encodeUndefinedAsNil: true,
- });
- const pack = packer.pack;
- import { ErrorCode } from '../enums';
- import { array2obj, getParentKey, isRedisVersionLowerThan, objectToFlatArray, } from '../utils';
- import { version as packageVersion } from '../version';
- import { UnrecoverableError } from './errors';
- export class Scripts {
- constructor(queue) {
- this.queue = queue;
- this.version = packageVersion;
- const queueKeys = this.queue.keys;
- this.moveToFinishedKeys = [
- queueKeys.wait,
- queueKeys.active,
- queueKeys.prioritized,
- queueKeys.events,
- queueKeys.stalled,
- queueKeys.limiter,
- queueKeys.delayed,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.pc,
- undefined,
- undefined,
- undefined,
- undefined,
- ];
- }
- execCommand(client, commandName, args) {
- const commandNameWithVersion = `${commandName}:${this.version}`;
- return client[commandNameWithVersion](args);
- }
- async isJobInList(listKey, jobId) {
- const client = await this.queue.client;
- let result;
- if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6', this.queue.databaseType)) {
- result = await this.execCommand(client, 'isJobInList', [listKey, jobId]);
- }
- else {
- result = await client.lpos(listKey, jobId);
- }
- return Number.isInteger(result);
- }
- addDelayedJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.marker,
- queueKeys.meta,
- queueKeys.id,
- queueKeys.delayed,
- queueKeys.completed,
- queueKeys.events,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addDelayedJob(client, job, encodedOpts, args) {
- const argsList = this.addDelayedJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addDelayedJob', argsList);
- }
- addPrioritizedJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.marker,
- queueKeys.meta,
- queueKeys.id,
- queueKeys.prioritized,
- queueKeys.delayed,
- queueKeys.completed,
- queueKeys.active,
- queueKeys.events,
- queueKeys.pc,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addPrioritizedJob(client, job, encodedOpts, args) {
- const argsList = this.addPrioritizedJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addPrioritizedJob', argsList);
- }
- addParentJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.meta,
- queueKeys.id,
- queueKeys.delayed,
- queueKeys['waiting-children'],
- queueKeys.completed,
- queueKeys.events,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addParentJob(client, job, encodedOpts, args) {
- const argsList = this.addParentJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addParentJob', argsList);
- }
- addStandardJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.id,
- queueKeys.completed,
- queueKeys.delayed,
- queueKeys.active,
- queueKeys.events,
- queueKeys.marker,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addStandardJob(client, job, encodedOpts, args) {
- const argsList = this.addStandardJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addStandardJob', argsList);
- }
- async addJob(client, job, opts, jobId, parentKeyOpts = {}) {
- const queueKeys = this.queue.keys;
- const parent = job.parent;
- const args = [
- queueKeys[''],
- typeof jobId !== 'undefined' ? jobId : '',
- job.name,
- job.timestamp,
- job.parentKey || null,
- parentKeyOpts.parentDependenciesKey || null,
- parent,
- job.repeatJobKey,
- job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
- ];
- let encodedOpts;
- if (opts.repeat) {
- const repeat = Object.assign({}, opts.repeat);
- if (repeat.startDate) {
- repeat.startDate = +new Date(repeat.startDate);
- }
- if (repeat.endDate) {
- repeat.endDate = +new Date(repeat.endDate);
- }
- encodedOpts = pack(Object.assign(Object.assign({}, opts), { repeat }));
- }
- else {
- encodedOpts = pack(opts);
- }
- let result;
- if (parentKeyOpts.addToWaitingChildren) {
- result = await this.addParentJob(client, job, encodedOpts, args);
- }
- else if (typeof opts.delay == 'number' && opts.delay > 0) {
- result = await this.addDelayedJob(client, job, encodedOpts, args);
- }
- else if (opts.priority) {
- result = await this.addPrioritizedJob(client, job, encodedOpts, args);
- }
- else {
- result = await this.addStandardJob(client, job, encodedOpts, args);
- }
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- parentKey: parentKeyOpts.parentKey,
- command: 'addJob',
- });
- }
- return result;
- }
- pauseArgs(pause) {
- let src = 'wait', dst = 'paused';
- if (!pause) {
- src = 'paused';
- dst = 'wait';
- }
- const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name));
- keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker);
- const args = [pause ? 'paused' : 'resumed'];
- return keys.concat(args);
- }
- async pause(pause) {
- const client = await this.queue.client;
- const args = this.pauseArgs(pause);
- return this.execCommand(client, 'pause', args);
- }
- addRepeatableJobArgs(customKey, nextMillis, opts, legacyCustomKey) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.repeat,
- queueKeys.delayed,
- ];
- const args = [
- nextMillis,
- pack(opts),
- legacyCustomKey,
- customKey,
- queueKeys[''],
- ];
- return keys.concat(args);
- }
- async addRepeatableJob(customKey, nextMillis, opts, legacyCustomKey) {
- const client = await this.queue.client;
- const args = this.addRepeatableJobArgs(customKey, nextMillis, opts, legacyCustomKey);
- return this.execCommand(client, 'addRepeatableJob', args);
- }
- async removeDeduplicationKey(deduplicationId, jobId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [`${queueKeys.de}:${deduplicationId}`];
- const args = [jobId];
- return this.execCommand(client, 'removeDeduplicationKey', keys.concat(args));
- }
- async addJobScheduler(jobSchedulerId, nextMillis, templateData, templateOpts, opts, delayedJobOpts,
- // The job id of the job that produced this next iteration
- producerId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.repeat,
- queueKeys.delayed,
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.prioritized,
- queueKeys.marker,
- queueKeys.id,
- queueKeys.events,
- queueKeys.pc,
- queueKeys.active,
- ];
- const args = [
- nextMillis,
- pack(opts),
- jobSchedulerId,
- templateData,
- pack(templateOpts),
- pack(delayedJobOpts),
- Date.now(),
- queueKeys[''],
- producerId ? this.queue.toKey(producerId) : '',
- ];
- const result = await this.execCommand(client, 'addJobScheduler', keys.concat(args));
- if (typeof result === 'number' && result < 0) {
- throw this.finishedErrors({
- code: result,
- command: 'addJobScheduler',
- });
- }
- return result;
- }
- async updateRepeatableJobMillis(client, customKey, nextMillis, legacyCustomKey) {
- const args = [
- this.queue.keys.repeat,
- nextMillis,
- customKey,
- legacyCustomKey,
- ];
- return this.execCommand(client, 'updateRepeatableJobMillis', args);
- }
- async updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, templateData, delayedJobOpts,
- // The job id of the job that produced this next iteration - TODO: remove in next breaking change
- producerId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.repeat,
- queueKeys.delayed,
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.prioritized,
- queueKeys.marker,
- queueKeys.id,
- queueKeys.events,
- queueKeys.pc,
- producerId ? this.queue.toKey(producerId) : '',
- queueKeys.active,
- ];
- const args = [
- nextMillis,
- jobSchedulerId,
- templateData,
- pack(delayedJobOpts),
- Date.now(),
- queueKeys[''],
- producerId,
- ];
- return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
- }
- removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {
- const queueKeys = this.queue.keys;
- const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events];
- const args = [
- legacyRepeatJobId,
- this.getRepeatConcatOptions(repeatConcatOptions, repeatJobKey),
- repeatJobKey,
- queueKeys[''],
- ];
- return keys.concat(args);
- }
- // TODO: remove this check in next breaking change
- getRepeatConcatOptions(repeatConcatOptions, repeatJobKey) {
- if (repeatJobKey && repeatJobKey.split(':').length > 2) {
- return repeatJobKey;
- }
- return repeatConcatOptions;
- }
- async removeRepeatable(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {
- const client = await this.queue.client;
- const args = this.removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey);
- return this.execCommand(client, 'removeRepeatable', args);
- }
- async removeJobScheduler(jobSchedulerId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events];
- const args = [jobSchedulerId, queueKeys['']];
- return this.execCommand(client, 'removeJobScheduler', keys.concat(args));
- }
- removeArgs(jobId, removeChildren) {
- const keys = [jobId, 'repeat'].map(name => this.queue.toKey(name));
- const args = [jobId, removeChildren ? 1 : 0, this.queue.toKey('')];
- return keys.concat(args);
- }
- async remove(jobId, removeChildren) {
- const client = await this.queue.client;
- const args = this.removeArgs(jobId, removeChildren);
- const result = await this.execCommand(client, 'removeJob', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'removeJob',
- });
- }
- return result;
- }
- async removeUnprocessedChildren(jobId) {
- const client = await this.queue.client;
- const args = [
- this.queue.toKey(jobId),
- this.queue.keys.meta,
- this.queue.toKey(''),
- jobId,
- ];
- await this.execCommand(client, 'removeUnprocessedChildren', args);
- }
- async extendLock(jobId, token, duration, client) {
- client = client || (await this.queue.client);
- const args = [
- this.queue.toKey(jobId) + ':lock',
- this.queue.keys.stalled,
- token,
- duration,
- jobId,
- ];
- return this.execCommand(client, 'extendLock', args);
- }
- async extendLocks(jobIds, tokens, duration) {
- const client = await this.queue.client;
- const args = [
- this.queue.keys.stalled,
- this.queue.toKey(''),
- pack(tokens),
- pack(jobIds),
- duration,
- ];
- return this.execCommand(client, 'extendLocks', args);
- }
- async updateData(job, data) {
- const client = await this.queue.client;
- const keys = [this.queue.toKey(job.id)];
- const dataJson = JSON.stringify(data);
- const result = await this.execCommand(client, 'updateData', keys.concat([dataJson]));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId: job.id,
- command: 'updateData',
- });
- }
- }
- async updateProgress(jobId, progress) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(jobId),
- this.queue.keys.events,
- this.queue.keys.meta,
- ];
- const progressJson = JSON.stringify(progress);
- const result = await this.execCommand(client, 'updateProgress', keys.concat([jobId, progressJson]));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'updateProgress',
- });
- }
- }
- async addLog(jobId, logRow, keepLogs) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(jobId),
- this.queue.toKey(jobId) + ':logs',
- ];
- const result = await this.execCommand(client, 'addLog', keys.concat([jobId, logRow, keepLogs ? keepLogs : '']));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'addLog',
- });
- }
- return result;
- }
- moveToFinishedArgs(job, val, propVal, shouldRemove, target, token, timestamp, fetchNext = true, fieldsToUpdate) {
- var _a, _b, _c, _d, _e, _f, _g;
- const queueKeys = this.queue.keys;
- const opts = this.queue.opts;
- const workerKeepJobs = target === 'completed' ? opts.removeOnComplete : opts.removeOnFail;
- const metricsKey = this.queue.toKey(`metrics:${target}`);
- const keys = this.moveToFinishedKeys;
- keys[10] = queueKeys[target];
- keys[11] = this.queue.toKey((_a = job.id) !== null && _a !== void 0 ? _a : '');
- keys[12] = metricsKey;
- keys[13] = this.queue.keys.marker;
- const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);
- const args = [
- job.id,
- timestamp,
- propVal,
- typeof val === 'undefined' ? 'null' : val,
- target,
- !fetchNext || this.queue.closing ? 0 : 1,
- queueKeys[''],
- pack({
- token,
- name: opts.name,
- keepJobs,
- limiter: opts.limiter,
- lockDuration: opts.lockDuration,
- attempts: job.opts.attempts,
- maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints)
- ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints
- : '',
- fpof: !!((_d = job.opts) === null || _d === void 0 ? void 0 : _d.failParentOnFailure),
- cpof: !!((_e = job.opts) === null || _e === void 0 ? void 0 : _e.continueParentOnFailure),
- idof: !!((_f = job.opts) === null || _f === void 0 ? void 0 : _f.ignoreDependencyOnFailure),
- rdof: !!((_g = job.opts) === null || _g === void 0 ? void 0 : _g.removeDependencyOnFailure),
- }),
- fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
- ];
- return keys.concat(args);
- }
- getKeepJobs(shouldRemove, workerKeepJobs) {
- if (typeof shouldRemove === 'undefined') {
- return workerKeepJobs || { count: shouldRemove ? 0 : -1 };
- }
- return typeof shouldRemove === 'object'
- ? shouldRemove
- : typeof shouldRemove === 'number'
- ? { count: shouldRemove }
- : { count: shouldRemove ? 0 : -1 };
- }
- async moveToFinished(jobId, args) {
- const client = await this.queue.client;
- const result = await this.execCommand(client, 'moveToFinished', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveToFinished',
- state: 'active',
- });
- }
- else {
- if (typeof result !== 'undefined') {
- return raw2NextJobData(result);
- }
- }
- }
- drainArgs(delayed) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.delayed,
- queueKeys.prioritized,
- queueKeys.repeat,
- ];
- const args = [queueKeys[''], delayed ? '1' : '0'];
- return keys.concat(args);
- }
- async drain(delayed) {
- const client = await this.queue.client;
- const args = this.drainArgs(delayed);
- return this.execCommand(client, 'drain', args);
- }
- removeChildDependencyArgs(jobId, parentKey) {
- const queueKeys = this.queue.keys;
- const keys = [queueKeys['']];
- const args = [this.queue.toKey(jobId), parentKey];
- return keys.concat(args);
- }
- async removeChildDependency(jobId, parentKey) {
- const client = await this.queue.client;
- const args = this.removeChildDependencyArgs(jobId, parentKey);
- const result = await this.execCommand(client, 'removeChildDependency', args);
- switch (result) {
- case 0:
- return true;
- case 1:
- return false;
- default:
- throw this.finishedErrors({
- code: result,
- jobId,
- parentKey,
- command: 'removeChildDependency',
- });
- }
- }
- getRangesArgs(types, start, end, asc) {
- const queueKeys = this.queue.keys;
- const transformedTypes = types.map(type => {
- return type === 'waiting' ? 'wait' : type;
- });
- const keys = [queueKeys['']];
- const args = [start, end, asc ? '1' : '0', ...transformedTypes];
- return keys.concat(args);
- }
- async getRanges(types, start = 0, end = 1, asc = false) {
- const client = await this.queue.client;
- const args = this.getRangesArgs(types, start, end, asc);
- return await this.execCommand(client, 'getRanges', args);
- }
- getCountsArgs(types) {
- const queueKeys = this.queue.keys;
- const transformedTypes = types.map(type => {
- return type === 'waiting' ? 'wait' : type;
- });
- const keys = [queueKeys['']];
- const args = [...transformedTypes];
- return keys.concat(args);
- }
- async getCounts(types) {
- const client = await this.queue.client;
- const args = this.getCountsArgs(types);
- return await this.execCommand(client, 'getCounts', args);
- }
- getCountsPerPriorityArgs(priorities) {
- const keys = [
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.prioritized,
- ];
- const args = priorities;
- return keys.concat(args);
- }
- async getCountsPerPriority(priorities) {
- const client = await this.queue.client;
- const args = this.getCountsPerPriorityArgs(priorities);
- return await this.execCommand(client, 'getCountsPerPriority', args);
- }
- getDependencyCountsArgs(jobId, types) {
- const keys = [
- `${jobId}:processed`,
- `${jobId}:dependencies`,
- `${jobId}:failed`,
- `${jobId}:unsuccessful`,
- ].map(name => {
- return this.queue.toKey(name);
- });
- const args = types;
- return keys.concat(args);
- }
- async getDependencyCounts(jobId, types) {
- const client = await this.queue.client;
- const args = this.getDependencyCountsArgs(jobId, types);
- return await this.execCommand(client, 'getDependencyCounts', args);
- }
- moveToCompletedArgs(job, returnvalue, removeOnComplete, token, fetchNext = false) {
- const timestamp = Date.now();
- return this.moveToFinishedArgs(job, returnvalue, 'returnvalue', removeOnComplete, 'completed', token, timestamp, fetchNext);
- }
- moveToFailedArgs(job, failedReason, removeOnFailed, token, fetchNext = false, fieldsToUpdate) {
- const timestamp = Date.now();
- return this.moveToFinishedArgs(job, failedReason, 'failedReason', removeOnFailed, 'failed', token, timestamp, fetchNext, fieldsToUpdate);
- }
- async isFinished(jobId, returnValue = false) {
- const client = await this.queue.client;
- const keys = ['completed', 'failed', jobId].map((key) => {
- return this.queue.toKey(key);
- });
- return this.execCommand(client, 'isFinished', keys.concat([jobId, returnValue ? '1' : '']));
- }
- async getState(jobId) {
- const client = await this.queue.client;
- const keys = [
- 'completed',
- 'failed',
- 'delayed',
- 'active',
- 'wait',
- 'paused',
- 'waiting-children',
- 'prioritized',
- ].map((key) => {
- return this.queue.toKey(key);
- });
- if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6', this.queue.databaseType)) {
- return this.execCommand(client, 'getState', keys.concat([jobId]));
- }
- return this.execCommand(client, 'getStateV2', keys.concat([jobId]));
- }
- /**
- * Change delay of a delayed job.
- *
- * Reschedules a delayed job by setting a new delay from the current time.
- * For example, calling changeDelay(5000) will reschedule the job to execute
- * 5000 milliseconds (5 seconds) from now, regardless of the original delay.
- *
- * @param jobId - the ID of the job to change the delay for.
- * @param delay - milliseconds from now when the job should be processed.
- * @returns delay in milliseconds.
- * @throws JobNotExist
- * This exception is thrown if jobId is missing.
- * @throws JobNotInState
- * This exception is thrown if job is not in delayed state.
- */
- async changeDelay(jobId, delay) {
- const client = await this.queue.client;
- const args = this.changeDelayArgs(jobId, delay);
- const result = await this.execCommand(client, 'changeDelay', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'changeDelay',
- state: 'delayed',
- });
- }
- }
- changeDelayArgs(jobId, delay) {
- const timestamp = Date.now();
- const keys = [
- this.queue.keys.delayed,
- this.queue.keys.meta,
- this.queue.keys.marker,
- this.queue.keys.events,
- ];
- return keys.concat([
- delay,
- JSON.stringify(timestamp),
- jobId,
- this.queue.toKey(jobId),
- ]);
- }
- async changePriority(jobId, priority = 0, lifo = false) {
- const client = await this.queue.client;
- const args = this.changePriorityArgs(jobId, priority, lifo);
- const result = await this.execCommand(client, 'changePriority', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'changePriority',
- });
- }
- }
- changePriorityArgs(jobId, priority = 0, lifo = false) {
- const keys = [
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.prioritized,
- this.queue.keys.active,
- this.queue.keys.pc,
- this.queue.keys.marker,
- ];
- return keys.concat([priority, this.queue.toKey(''), jobId, lifo ? 1 : 0]);
- }
- moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) {
- const queueKeys = this.queue.keys;
- const workerOpts = this.queue.opts;
- const keys = [
- queueKeys.marker,
- queueKeys.active,
- queueKeys.prioritized,
- queueKeys.delayed,
- this.queue.toKey(jobId),
- queueKeys.events,
- queueKeys.meta,
- queueKeys.stalled,
- queueKeys.wait,
- queueKeys.limiter,
- queueKeys.paused,
- queueKeys.pc,
- ];
- const fetchNext = opts.fetchNext && !this.queue.closing ? 1 : 0;
- return keys.concat([
- this.queue.keys[''],
- timestamp,
- jobId,
- token,
- delay,
- opts.skipAttempt ? '1' : '0',
- opts.fieldsToUpdate
- ? pack(objectToFlatArray(opts.fieldsToUpdate))
- : void 0,
- fetchNext,
- fetchNext
- ? pack({
- token,
- lockDuration: workerOpts.lockDuration,
- limiter: workerOpts.limiter,
- name: workerOpts.name,
- })
- : void 0,
- ]);
- }
- moveToWaitingChildrenArgs(jobId, token, opts) {
- const timestamp = Date.now();
- const childKey = getParentKey(opts.child);
- const keys = [
- 'active',
- 'waiting-children',
- jobId,
- `${jobId}:dependencies`,
- `${jobId}:unsuccessful`,
- 'stalled',
- 'events',
- ].map(name => {
- return this.queue.toKey(name);
- });
- return keys.concat([
- token,
- childKey !== null && childKey !== void 0 ? childKey : '',
- JSON.stringify(timestamp),
- jobId,
- this.queue.toKey(''),
- ]);
- }
- isMaxedArgs() {
- const queueKeys = this.queue.keys;
- const keys = [queueKeys.meta, queueKeys.active];
- return keys;
- }
- async isMaxed() {
- const client = await this.queue.client;
- const args = this.isMaxedArgs();
- return !!(await this.execCommand(client, 'isMaxed', args));
- }
- async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) {
- const client = await this.queue.client;
- const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
- const result = await this.execCommand(client, 'moveToDelayed', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveToDelayed',
- state: 'active',
- });
- }
- else if (typeof result !== 'undefined') {
- return raw2NextJobData(result);
- }
- }
- /**
- * Move parent job to waiting-children state.
- *
- * @returns true if job is successfully moved, false if there are pending dependencies.
- * @throws JobNotExist
- * This exception is thrown if jobId is missing.
- * @throws JobLockNotExist
- * This exception is thrown if job lock is missing.
- * @throws JobNotInState
- * This exception is thrown if job is not in active state.
- */
- async moveToWaitingChildren(jobId, token, opts = {}) {
- const client = await this.queue.client;
- const args = this.moveToWaitingChildrenArgs(jobId, token, opts);
- const result = await this.execCommand(client, 'moveToWaitingChildren', args);
- switch (result) {
- case 0:
- return true;
- case 1:
- return false;
- default:
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveToWaitingChildren',
- state: 'active',
- });
- }
- }
- getRateLimitTtlArgs(maxJobs) {
- const keys = [
- this.queue.keys.limiter,
- this.queue.keys.meta,
- ];
- return keys.concat([maxJobs !== null && maxJobs !== void 0 ? maxJobs : '0']);
- }
- async getRateLimitTtl(maxJobs) {
- const client = await this.queue.client;
- const args = this.getRateLimitTtlArgs(maxJobs);
- return this.execCommand(client, 'getRateLimitTtl', args);
- }
- /**
- * Remove jobs in a specific state.
- *
- * @returns Id jobs from the deleted records.
- */
- async cleanJobsInSet(set, timestamp, limit = 0) {
- const client = await this.queue.client;
- return this.execCommand(client, 'cleanJobsInSet', [
- this.queue.toKey(set),
- this.queue.toKey('events'),
- this.queue.toKey('repeat'),
- this.queue.toKey(''),
- timestamp,
- limit,
- set,
- ]);
- }
- getJobSchedulerArgs(id) {
- const keys = [this.queue.keys.repeat];
- return keys.concat([id]);
- }
- async getJobScheduler(id) {
- const client = await this.queue.client;
- const args = this.getJobSchedulerArgs(id);
- return this.execCommand(client, 'getJobScheduler', args);
- }
- retryJobArgs(jobId, lifo, token, opts = {}) {
- const keys = [
- this.queue.keys.active,
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.toKey(jobId),
- this.queue.keys.meta,
- this.queue.keys.events,
- this.queue.keys.delayed,
- this.queue.keys.prioritized,
- this.queue.keys.pc,
- this.queue.keys.marker,
- this.queue.keys.stalled,
- ];
- const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
- return keys.concat([
- this.queue.toKey(''),
- Date.now(),
- pushCmd,
- jobId,
- token,
- opts.fieldsToUpdate
- ? pack(objectToFlatArray(opts.fieldsToUpdate))
- : void 0,
- ]);
- }
- async retryJob(jobId, lifo, token = '0', opts = {}) {
- const client = await this.queue.client;
- const args = this.retryJobArgs(jobId, lifo, token, opts);
- const result = await this.execCommand(client, 'retryJob', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'retryJob',
- state: 'active',
- });
- }
- }
- moveJobsToWaitArgs(state, count, timestamp) {
- const keys = [
- this.queue.toKey(''),
- this.queue.keys.events,
- this.queue.toKey(state),
- this.queue.toKey('wait'),
- this.queue.toKey('paused'),
- this.queue.keys.meta,
- this.queue.keys.active,
- this.queue.keys.marker,
- ];
- const args = [count, timestamp, state];
- return keys.concat(args);
- }
- async retryJobs(state = 'failed', count = 1000, timestamp = new Date().getTime()) {
- const client = await this.queue.client;
- const args = this.moveJobsToWaitArgs(state, count, timestamp);
- return this.execCommand(client, 'moveJobsToWait', args);
- }
- async promoteJobs(count = 1000) {
- const client = await this.queue.client;
- const args = this.moveJobsToWaitArgs('delayed', count, Number.MAX_VALUE);
- return this.execCommand(client, 'moveJobsToWait', args);
- }
- /**
- * Attempts to reprocess a job
- *
- * @param job - The job to reprocess
- * @param state - The expected job state. If the job is not found
- * on the provided state, then it's not reprocessed. Supported states: 'failed', 'completed'
- *
- * @returns A promise that resolves when the job has been successfully moved to the wait queue.
- * @throws Will throw an error with a code property indicating the failure reason:
- * - code 0: Job does not exist
- * - code -1: Job is currently locked and can't be retried
- * - code -2: Job was not found in the expected set
- */
- async reprocessJob(job, state, opts = {}) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(job.id),
- this.queue.keys.events,
- this.queue.toKey(state),
- this.queue.keys.wait,
- this.queue.keys.meta,
- this.queue.keys.paused,
- this.queue.keys.active,
- this.queue.keys.marker,
- ];
- const args = [
- job.id,
- (job.opts.lifo ? 'R' : 'L') + 'PUSH',
- state === 'failed' ? 'failedReason' : 'returnvalue',
- state,
- opts.resetAttemptsMade ? '1' : '0',
- opts.resetAttemptsStarted ? '1' : '0',
- ];
- const result = await this.execCommand(client, 'reprocessJob', keys.concat(args));
- switch (result) {
- case 1:
- return;
- default:
- throw this.finishedErrors({
- code: result,
- jobId: job.id,
- command: 'reprocessJob',
- state,
- });
- }
- }
- async getMetrics(type, start = 0, end = -1) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(`metrics:${type}`),
- this.queue.toKey(`metrics:${type}:data`),
- ];
- const args = [start, end];
- const result = await this.execCommand(client, 'getMetrics', keys.concat(args));
- return result;
- }
- async moveToActive(client, token, name) {
- const opts = this.queue.opts;
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.wait,
- queueKeys.active,
- queueKeys.prioritized,
- queueKeys.events,
- queueKeys.stalled,
- queueKeys.limiter,
- queueKeys.delayed,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.pc,
- queueKeys.marker,
- ];
- const args = [
- queueKeys[''],
- Date.now(),
- pack({
- token,
- lockDuration: opts.lockDuration,
- limiter: opts.limiter,
- name,
- }),
- ];
- const result = await this.execCommand(client, 'moveToActive', keys.concat(args));
- return raw2NextJobData(result);
- }
- async promote(jobId) {
- const client = await this.queue.client;
- const keys = [
- this.queue.keys.delayed,
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.prioritized,
- this.queue.keys.active,
- this.queue.keys.pc,
- this.queue.keys.events,
- this.queue.keys.marker,
- ];
- const args = [this.queue.toKey(''), jobId];
- const code = await this.execCommand(client, 'promote', keys.concat(args));
- if (code < 0) {
- throw this.finishedErrors({
- code,
- jobId,
- command: 'promote',
- state: 'delayed',
- });
- }
- }
- moveStalledJobsToWaitArgs() {
- const opts = this.queue.opts;
- const keys = [
- this.queue.keys.stalled,
- this.queue.keys.wait,
- this.queue.keys.active,
- this.queue.keys['stalled-check'],
- this.queue.keys.meta,
- this.queue.keys.paused,
- this.queue.keys.marker,
- this.queue.keys.events,
- ];
- const args = [
- opts.maxStalledCount,
- this.queue.toKey(''),
- Date.now(),
- opts.stalledInterval,
- ];
- return keys.concat(args);
- }
- /**
- * Looks for unlocked jobs in the active queue.
- *
- * The job was being worked on, but the worker process died and it failed to renew the lock.
- * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them
- * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait,
- * (e.g. if the job handler keeps crashing),
- * we limit the number of stalled job recoveries to the queue option maxStalledCount.
- */
- async moveStalledJobsToWait() {
- const client = await this.queue.client;
- const args = this.moveStalledJobsToWaitArgs();
- return this.execCommand(client, 'moveStalledJobsToWait', args);
- }
- /**
- * Moves a job back from Active to Wait.
- * This script is used when a job has been manually rate limited and needs
- * to be moved back to wait from active status.
- *
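- * @param jobId - Job id
- * @param token - Token passed to the script together with the job id (defaults to '0')
- * @returns The result of the script; a negative result is thrown as an error instead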
- */
- async moveJobFromActiveToWait(jobId, token = '0') {
- const client = await this.queue.client;
- const keys = [
- this.queue.keys.active,
- this.queue.keys.wait,
- this.queue.keys.stalled,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.limiter,
- this.queue.keys.prioritized,
- this.queue.keys.marker,
- this.queue.keys.events,
- ];
- const args = [jobId, token, this.queue.toKey(jobId)];
- const result = await this.execCommand(client, 'moveJobFromActiveToWait', keys.concat(args));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveJobFromActiveToWait',
- state: 'active',
- });
- }
- return result;
- }
- async obliterate(opts) {
- const client = await this.queue.client;
- const keys = [
- this.queue.keys.meta,
- this.queue.toKey(''),
- ];
- const args = [opts.count, opts.force ? 'force' : null];
- const result = await this.execCommand(client, 'obliterate', keys.concat(args));
- if (result < 0) {
- switch (result) {
- case -1:
- throw new Error('Cannot obliterate non-paused queue');
- case -2:
- throw new Error('Cannot obliterate queue with active jobs');
- }
- }
- return result;
- }
- /**
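- * Paginates the members of a set or the entries of a hash.
- * @param key - key of the set or hash to paginate
- * @param opts - options to define the pagination behaviour (`start`, `end`, `fetchJobs`)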
- *
- */
- async paginate(key, opts) {
- const client = await this.queue.client;
- const keys = [key];
- const maxIterations = 5;
- const pageSize = opts.end >= 0 ? opts.end - opts.start + 1 : Infinity;
- let cursor = '0', offset = 0, items, total, rawJobs, page = [], jobs = [];
- do {
- const args = [
- opts.start + page.length,
- opts.end,
- cursor,
- offset,
- maxIterations,
- ];
- if (opts.fetchJobs) {
- args.push(1);
- }
- [cursor, offset, items, total, rawJobs] = await this.execCommand(client, 'paginate', keys.concat(args));
- page = page.concat(items);
- if (rawJobs && rawJobs.length) {
- jobs = jobs.concat(rawJobs.map(array2obj));
- }
- // Important to keep this coercive inequality (!=) instead of strict inequality (!==)
- } while (cursor != '0' && page.length < pageSize);
- // If we get an array of arrays, it means we are paginating a hash
- if (page.length && Array.isArray(page[0])) {
- const result = [];
- for (let index = 0; index < page.length; index++) {
- const [id, value] = page[index];
- try {
- result.push({ id, v: JSON.parse(value) });
- }
- catch (err) {
- result.push({ id, err: err.message });
- }
- }
- return {
- cursor,
- items: result,
- total,
- jobs,
- };
- }
- else {
- return {
- cursor,
- items: page.map(item => ({ id: item })),
- total,
- jobs,
- };
- }
- }
- finishedErrors({ code, jobId, parentKey, command, state, }) {
- let error;
- switch (code) {
- case ErrorCode.JobNotExist:
- error = new Error(`Missing key for job ${jobId}. ${command}`);
- break;
- case ErrorCode.JobLockNotExist:
- error = new Error(`Missing lock for job ${jobId}. ${command}`);
- break;
- case ErrorCode.JobNotInState:
- error = new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
- break;
- case ErrorCode.JobPendingChildren:
- error = new Error(`Job ${jobId} has pending dependencies. ${command}`);
- break;
- case ErrorCode.ParentJobNotExist:
- error = new Error(`Missing key for parent job ${parentKey}. ${command}`);
- break;
- case ErrorCode.JobLockMismatch:
- error = new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
- break;
- case ErrorCode.ParentJobCannotBeReplaced:
- error = new Error(`The parent job ${parentKey} cannot be replaced. ${command}`);
- break;
- case ErrorCode.JobBelongsToJobScheduler:
- error = new Error(`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`);
- break;
- case ErrorCode.JobHasFailedChildren:
- error = new UnrecoverableError(`Cannot complete job ${jobId} because it has at least one failed child. ${command}`);
- break;
- case ErrorCode.SchedulerJobIdCollision:
- error = new Error(`Cannot create job scheduler iteration - job ID already exists. ${command}`);
- break;
- case ErrorCode.SchedulerJobSlotsBusy:
- error = new Error(`Cannot create job scheduler iteration - current and next time slots already have jobs. ${command}`);
- break;
- default:
- error = new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
- }
- // Add the code property to the error object
- error.code = code;
- return error;
- }
- async removeOrphanedJobs(candidateJobIds, stateKeySuffixes, jobSubKeySuffixes) {
- const client = await this.queue.client;
- const args = [
- this.queue.toKey(''),
- stateKeySuffixes.length,
- ...stateKeySuffixes,
- jobSubKeySuffixes.length,
- ...jobSubKeySuffixes,
- ...candidateJobIds,
- ];
- return this.execCommand(client, 'removeOrphanedJobs', args);
- }
- }
/**
 * Converts a raw script reply into a four-element tuple.
 *
 * @param raw - Raw reply; when falsy an empty array is returned.
 * @returns `[job, raw[1], raw[2], raw[3]]`, where `job` is `array2obj(raw[0])`
 * if `raw[0]` is truthy and `null` otherwise.
 */
export function raw2NextJobData(raw) {
    if (!raw) {
        return [];
    }
    const job = raw[0] ? array2obj(raw[0]) : null;
    return [job, raw[1], raw[2], raw[3]];
}
- //# sourceMappingURL=scripts.js.map
|