| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229 |
- /**
- * Includes all the scripts needed by the queue and jobs.
- */
- 'use strict';
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.Scripts = void 0;
- exports.raw2NextJobData = raw2NextJobData;
- const msgpackr_1 = require("msgpackr");
- const packer = new msgpackr_1.Packr({
- useRecords: false,
- encodeUndefinedAsNil: true,
- });
- const pack = packer.pack;
- const enums_1 = require("../enums");
- const utils_1 = require("../utils");
- const version_1 = require("../version");
- const errors_1 = require("./errors");
- class Scripts {
- constructor(queue) {
- this.queue = queue;
- this.version = version_1.version;
- const queueKeys = this.queue.keys;
- this.moveToFinishedKeys = [
- queueKeys.wait,
- queueKeys.active,
- queueKeys.prioritized,
- queueKeys.events,
- queueKeys.stalled,
- queueKeys.limiter,
- queueKeys.delayed,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.pc,
- undefined,
- undefined,
- undefined,
- undefined,
- ];
- }
- execCommand(client, commandName, args) {
- const commandNameWithVersion = `${commandName}:${this.version}`;
- return client[commandNameWithVersion](args);
- }
- async isJobInList(listKey, jobId) {
- const client = await this.queue.client;
- let result;
- if ((0, utils_1.isRedisVersionLowerThan)(this.queue.redisVersion, '6.0.6', this.queue.databaseType)) {
- result = await this.execCommand(client, 'isJobInList', [listKey, jobId]);
- }
- else {
- result = await client.lpos(listKey, jobId);
- }
- return Number.isInteger(result);
- }
- addDelayedJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.marker,
- queueKeys.meta,
- queueKeys.id,
- queueKeys.delayed,
- queueKeys.completed,
- queueKeys.events,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addDelayedJob(client, job, encodedOpts, args) {
- const argsList = this.addDelayedJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addDelayedJob', argsList);
- }
- addPrioritizedJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.marker,
- queueKeys.meta,
- queueKeys.id,
- queueKeys.prioritized,
- queueKeys.delayed,
- queueKeys.completed,
- queueKeys.active,
- queueKeys.events,
- queueKeys.pc,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addPrioritizedJob(client, job, encodedOpts, args) {
- const argsList = this.addPrioritizedJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addPrioritizedJob', argsList);
- }
- addParentJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.meta,
- queueKeys.id,
- queueKeys.delayed,
- queueKeys['waiting-children'],
- queueKeys.completed,
- queueKeys.events,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addParentJob(client, job, encodedOpts, args) {
- const argsList = this.addParentJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addParentJob', argsList);
- }
- addStandardJobArgs(job, encodedOpts, args) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.id,
- queueKeys.completed,
- queueKeys.delayed,
- queueKeys.active,
- queueKeys.events,
- queueKeys.marker,
- ];
- keys.push(pack(args), job.data, encodedOpts);
- return keys;
- }
- addStandardJob(client, job, encodedOpts, args) {
- const argsList = this.addStandardJobArgs(job, encodedOpts, args);
- return this.execCommand(client, 'addStandardJob', argsList);
- }
- async addJob(client, job, opts, jobId, parentKeyOpts = {}) {
- const queueKeys = this.queue.keys;
- const parent = job.parent;
- const args = [
- queueKeys[''],
- typeof jobId !== 'undefined' ? jobId : '',
- job.name,
- job.timestamp,
- job.parentKey || null,
- parentKeyOpts.parentDependenciesKey || null,
- parent,
- job.repeatJobKey,
- job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
- ];
- let encodedOpts;
- if (opts.repeat) {
- const repeat = Object.assign({}, opts.repeat);
- if (repeat.startDate) {
- repeat.startDate = +new Date(repeat.startDate);
- }
- if (repeat.endDate) {
- repeat.endDate = +new Date(repeat.endDate);
- }
- encodedOpts = pack(Object.assign(Object.assign({}, opts), { repeat }));
- }
- else {
- encodedOpts = pack(opts);
- }
- let result;
- if (parentKeyOpts.addToWaitingChildren) {
- result = await this.addParentJob(client, job, encodedOpts, args);
- }
- else if (typeof opts.delay == 'number' && opts.delay > 0) {
- result = await this.addDelayedJob(client, job, encodedOpts, args);
- }
- else if (opts.priority) {
- result = await this.addPrioritizedJob(client, job, encodedOpts, args);
- }
- else {
- result = await this.addStandardJob(client, job, encodedOpts, args);
- }
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- parentKey: parentKeyOpts.parentKey,
- command: 'addJob',
- });
- }
- return result;
- }
- pauseArgs(pause) {
- let src = 'wait', dst = 'paused';
- if (!pause) {
- src = 'paused';
- dst = 'wait';
- }
- const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name));
- keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker);
- const args = [pause ? 'paused' : 'resumed'];
- return keys.concat(args);
- }
- async pause(pause) {
- const client = await this.queue.client;
- const args = this.pauseArgs(pause);
- return this.execCommand(client, 'pause', args);
- }
- addRepeatableJobArgs(customKey, nextMillis, opts, legacyCustomKey) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.repeat,
- queueKeys.delayed,
- ];
- const args = [
- nextMillis,
- pack(opts),
- legacyCustomKey,
- customKey,
- queueKeys[''],
- ];
- return keys.concat(args);
- }
- async addRepeatableJob(customKey, nextMillis, opts, legacyCustomKey) {
- const client = await this.queue.client;
- const args = this.addRepeatableJobArgs(customKey, nextMillis, opts, legacyCustomKey);
- return this.execCommand(client, 'addRepeatableJob', args);
- }
- async removeDeduplicationKey(deduplicationId, jobId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [`${queueKeys.de}:${deduplicationId}`];
- const args = [jobId];
- return this.execCommand(client, 'removeDeduplicationKey', keys.concat(args));
- }
- async addJobScheduler(jobSchedulerId, nextMillis, templateData, templateOpts, opts, delayedJobOpts,
- // The job id of the job that produced this next iteration
- producerId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.repeat,
- queueKeys.delayed,
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.prioritized,
- queueKeys.marker,
- queueKeys.id,
- queueKeys.events,
- queueKeys.pc,
- queueKeys.active,
- ];
- const args = [
- nextMillis,
- pack(opts),
- jobSchedulerId,
- templateData,
- pack(templateOpts),
- pack(delayedJobOpts),
- Date.now(),
- queueKeys[''],
- producerId ? this.queue.toKey(producerId) : '',
- ];
- const result = await this.execCommand(client, 'addJobScheduler', keys.concat(args));
- if (typeof result === 'number' && result < 0) {
- throw this.finishedErrors({
- code: result,
- command: 'addJobScheduler',
- });
- }
- return result;
- }
- async updateRepeatableJobMillis(client, customKey, nextMillis, legacyCustomKey) {
- const args = [
- this.queue.keys.repeat,
- nextMillis,
- customKey,
- legacyCustomKey,
- ];
- return this.execCommand(client, 'updateRepeatableJobMillis', args);
- }
- async updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, templateData, delayedJobOpts,
- // The job id of the job that produced this next iteration - TODO: remove in next breaking change
- producerId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.repeat,
- queueKeys.delayed,
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.prioritized,
- queueKeys.marker,
- queueKeys.id,
- queueKeys.events,
- queueKeys.pc,
- producerId ? this.queue.toKey(producerId) : '',
- queueKeys.active,
- ];
- const args = [
- nextMillis,
- jobSchedulerId,
- templateData,
- pack(delayedJobOpts),
- Date.now(),
- queueKeys[''],
- producerId,
- ];
- return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
- }
- removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {
- const queueKeys = this.queue.keys;
- const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events];
- const args = [
- legacyRepeatJobId,
- this.getRepeatConcatOptions(repeatConcatOptions, repeatJobKey),
- repeatJobKey,
- queueKeys[''],
- ];
- return keys.concat(args);
- }
- // TODO: remove this check in next breaking change
- getRepeatConcatOptions(repeatConcatOptions, repeatJobKey) {
- if (repeatJobKey && repeatJobKey.split(':').length > 2) {
- return repeatJobKey;
- }
- return repeatConcatOptions;
- }
- async removeRepeatable(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {
- const client = await this.queue.client;
- const args = this.removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey);
- return this.execCommand(client, 'removeRepeatable', args);
- }
- async removeJobScheduler(jobSchedulerId) {
- const client = await this.queue.client;
- const queueKeys = this.queue.keys;
- const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events];
- const args = [jobSchedulerId, queueKeys['']];
- return this.execCommand(client, 'removeJobScheduler', keys.concat(args));
- }
- removeArgs(jobId, removeChildren) {
- const keys = [jobId, 'repeat'].map(name => this.queue.toKey(name));
- const args = [jobId, removeChildren ? 1 : 0, this.queue.toKey('')];
- return keys.concat(args);
- }
- async remove(jobId, removeChildren) {
- const client = await this.queue.client;
- const args = this.removeArgs(jobId, removeChildren);
- const result = await this.execCommand(client, 'removeJob', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'removeJob',
- });
- }
- return result;
- }
- async removeUnprocessedChildren(jobId) {
- const client = await this.queue.client;
- const args = [
- this.queue.toKey(jobId),
- this.queue.keys.meta,
- this.queue.toKey(''),
- jobId,
- ];
- await this.execCommand(client, 'removeUnprocessedChildren', args);
- }
- async extendLock(jobId, token, duration, client) {
- client = client || (await this.queue.client);
- const args = [
- this.queue.toKey(jobId) + ':lock',
- this.queue.keys.stalled,
- token,
- duration,
- jobId,
- ];
- return this.execCommand(client, 'extendLock', args);
- }
- async extendLocks(jobIds, tokens, duration) {
- const client = await this.queue.client;
- const args = [
- this.queue.keys.stalled,
- this.queue.toKey(''),
- pack(tokens),
- pack(jobIds),
- duration,
- ];
- return this.execCommand(client, 'extendLocks', args);
- }
- async updateData(job, data) {
- const client = await this.queue.client;
- const keys = [this.queue.toKey(job.id)];
- const dataJson = JSON.stringify(data);
- const result = await this.execCommand(client, 'updateData', keys.concat([dataJson]));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId: job.id,
- command: 'updateData',
- });
- }
- }
- async updateProgress(jobId, progress) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(jobId),
- this.queue.keys.events,
- this.queue.keys.meta,
- ];
- const progressJson = JSON.stringify(progress);
- const result = await this.execCommand(client, 'updateProgress', keys.concat([jobId, progressJson]));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'updateProgress',
- });
- }
- }
- async addLog(jobId, logRow, keepLogs) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(jobId),
- this.queue.toKey(jobId) + ':logs',
- ];
- const result = await this.execCommand(client, 'addLog', keys.concat([jobId, logRow, keepLogs ? keepLogs : '']));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'addLog',
- });
- }
- return result;
- }
- moveToFinishedArgs(job, val, propVal, shouldRemove, target, token, timestamp, fetchNext = true, fieldsToUpdate) {
- var _a, _b, _c, _d, _e, _f, _g;
- const queueKeys = this.queue.keys;
- const opts = this.queue.opts;
- const workerKeepJobs = target === 'completed' ? opts.removeOnComplete : opts.removeOnFail;
- const metricsKey = this.queue.toKey(`metrics:${target}`);
- const keys = this.moveToFinishedKeys;
- keys[10] = queueKeys[target];
- keys[11] = this.queue.toKey((_a = job.id) !== null && _a !== void 0 ? _a : '');
- keys[12] = metricsKey;
- keys[13] = this.queue.keys.marker;
- const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);
- const args = [
- job.id,
- timestamp,
- propVal,
- typeof val === 'undefined' ? 'null' : val,
- target,
- !fetchNext || this.queue.closing ? 0 : 1,
- queueKeys[''],
- pack({
- token,
- name: opts.name,
- keepJobs,
- limiter: opts.limiter,
- lockDuration: opts.lockDuration,
- attempts: job.opts.attempts,
- maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints)
- ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints
- : '',
- fpof: !!((_d = job.opts) === null || _d === void 0 ? void 0 : _d.failParentOnFailure),
- cpof: !!((_e = job.opts) === null || _e === void 0 ? void 0 : _e.continueParentOnFailure),
- idof: !!((_f = job.opts) === null || _f === void 0 ? void 0 : _f.ignoreDependencyOnFailure),
- rdof: !!((_g = job.opts) === null || _g === void 0 ? void 0 : _g.removeDependencyOnFailure),
- }),
- fieldsToUpdate ? pack((0, utils_1.objectToFlatArray)(fieldsToUpdate)) : void 0,
- ];
- return keys.concat(args);
- }
- getKeepJobs(shouldRemove, workerKeepJobs) {
- if (typeof shouldRemove === 'undefined') {
- return workerKeepJobs || { count: shouldRemove ? 0 : -1 };
- }
- return typeof shouldRemove === 'object'
- ? shouldRemove
- : typeof shouldRemove === 'number'
- ? { count: shouldRemove }
- : { count: shouldRemove ? 0 : -1 };
- }
- async moveToFinished(jobId, args) {
- const client = await this.queue.client;
- const result = await this.execCommand(client, 'moveToFinished', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveToFinished',
- state: 'active',
- });
- }
- else {
- if (typeof result !== 'undefined') {
- return raw2NextJobData(result);
- }
- }
- }
- drainArgs(delayed) {
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.wait,
- queueKeys.paused,
- queueKeys.delayed,
- queueKeys.prioritized,
- queueKeys.repeat,
- ];
- const args = [queueKeys[''], delayed ? '1' : '0'];
- return keys.concat(args);
- }
- async drain(delayed) {
- const client = await this.queue.client;
- const args = this.drainArgs(delayed);
- return this.execCommand(client, 'drain', args);
- }
- removeChildDependencyArgs(jobId, parentKey) {
- const queueKeys = this.queue.keys;
- const keys = [queueKeys['']];
- const args = [this.queue.toKey(jobId), parentKey];
- return keys.concat(args);
- }
- async removeChildDependency(jobId, parentKey) {
- const client = await this.queue.client;
- const args = this.removeChildDependencyArgs(jobId, parentKey);
- const result = await this.execCommand(client, 'removeChildDependency', args);
- switch (result) {
- case 0:
- return true;
- case 1:
- return false;
- default:
- throw this.finishedErrors({
- code: result,
- jobId,
- parentKey,
- command: 'removeChildDependency',
- });
- }
- }
- getRangesArgs(types, start, end, asc) {
- const queueKeys = this.queue.keys;
- const transformedTypes = types.map(type => {
- return type === 'waiting' ? 'wait' : type;
- });
- const keys = [queueKeys['']];
- const args = [start, end, asc ? '1' : '0', ...transformedTypes];
- return keys.concat(args);
- }
- async getRanges(types, start = 0, end = 1, asc = false) {
- const client = await this.queue.client;
- const args = this.getRangesArgs(types, start, end, asc);
- return await this.execCommand(client, 'getRanges', args);
- }
- getCountsArgs(types) {
- const queueKeys = this.queue.keys;
- const transformedTypes = types.map(type => {
- return type === 'waiting' ? 'wait' : type;
- });
- const keys = [queueKeys['']];
- const args = [...transformedTypes];
- return keys.concat(args);
- }
- async getCounts(types) {
- const client = await this.queue.client;
- const args = this.getCountsArgs(types);
- return await this.execCommand(client, 'getCounts', args);
- }
- getCountsPerPriorityArgs(priorities) {
- const keys = [
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.prioritized,
- ];
- const args = priorities;
- return keys.concat(args);
- }
- async getCountsPerPriority(priorities) {
- const client = await this.queue.client;
- const args = this.getCountsPerPriorityArgs(priorities);
- return await this.execCommand(client, 'getCountsPerPriority', args);
- }
- getDependencyCountsArgs(jobId, types) {
- const keys = [
- `${jobId}:processed`,
- `${jobId}:dependencies`,
- `${jobId}:failed`,
- `${jobId}:unsuccessful`,
- ].map(name => {
- return this.queue.toKey(name);
- });
- const args = types;
- return keys.concat(args);
- }
- async getDependencyCounts(jobId, types) {
- const client = await this.queue.client;
- const args = this.getDependencyCountsArgs(jobId, types);
- return await this.execCommand(client, 'getDependencyCounts', args);
- }
- moveToCompletedArgs(job, returnvalue, removeOnComplete, token, fetchNext = false) {
- const timestamp = Date.now();
- return this.moveToFinishedArgs(job, returnvalue, 'returnvalue', removeOnComplete, 'completed', token, timestamp, fetchNext);
- }
- moveToFailedArgs(job, failedReason, removeOnFailed, token, fetchNext = false, fieldsToUpdate) {
- const timestamp = Date.now();
- return this.moveToFinishedArgs(job, failedReason, 'failedReason', removeOnFailed, 'failed', token, timestamp, fetchNext, fieldsToUpdate);
- }
- async isFinished(jobId, returnValue = false) {
- const client = await this.queue.client;
- const keys = ['completed', 'failed', jobId].map((key) => {
- return this.queue.toKey(key);
- });
- return this.execCommand(client, 'isFinished', keys.concat([jobId, returnValue ? '1' : '']));
- }
- async getState(jobId) {
- const client = await this.queue.client;
- const keys = [
- 'completed',
- 'failed',
- 'delayed',
- 'active',
- 'wait',
- 'paused',
- 'waiting-children',
- 'prioritized',
- ].map((key) => {
- return this.queue.toKey(key);
- });
- if ((0, utils_1.isRedisVersionLowerThan)(this.queue.redisVersion, '6.0.6', this.queue.databaseType)) {
- return this.execCommand(client, 'getState', keys.concat([jobId]));
- }
- return this.execCommand(client, 'getStateV2', keys.concat([jobId]));
- }
- /**
- * Change delay of a delayed job.
- *
- * Reschedules a delayed job by setting a new delay from the current time.
- * For example, calling changeDelay(5000) will reschedule the job to execute
- * 5000 milliseconds (5 seconds) from now, regardless of the original delay.
- *
- * @param jobId - the ID of the job to change the delay for.
- * @param delay - milliseconds from now when the job should be processed.
- * @returns delay in milliseconds.
- * @throws JobNotExist
- * This exception is thrown if jobId is missing.
- * @throws JobNotInState
- * This exception is thrown if job is not in delayed state.
- */
- async changeDelay(jobId, delay) {
- const client = await this.queue.client;
- const args = this.changeDelayArgs(jobId, delay);
- const result = await this.execCommand(client, 'changeDelay', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'changeDelay',
- state: 'delayed',
- });
- }
- }
- changeDelayArgs(jobId, delay) {
- const timestamp = Date.now();
- const keys = [
- this.queue.keys.delayed,
- this.queue.keys.meta,
- this.queue.keys.marker,
- this.queue.keys.events,
- ];
- return keys.concat([
- delay,
- JSON.stringify(timestamp),
- jobId,
- this.queue.toKey(jobId),
- ]);
- }
- async changePriority(jobId, priority = 0, lifo = false) {
- const client = await this.queue.client;
- const args = this.changePriorityArgs(jobId, priority, lifo);
- const result = await this.execCommand(client, 'changePriority', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'changePriority',
- });
- }
- }
- changePriorityArgs(jobId, priority = 0, lifo = false) {
- const keys = [
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.prioritized,
- this.queue.keys.active,
- this.queue.keys.pc,
- this.queue.keys.marker,
- ];
- return keys.concat([priority, this.queue.toKey(''), jobId, lifo ? 1 : 0]);
- }
- moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) {
- const queueKeys = this.queue.keys;
- const workerOpts = this.queue.opts;
- const keys = [
- queueKeys.marker,
- queueKeys.active,
- queueKeys.prioritized,
- queueKeys.delayed,
- this.queue.toKey(jobId),
- queueKeys.events,
- queueKeys.meta,
- queueKeys.stalled,
- queueKeys.wait,
- queueKeys.limiter,
- queueKeys.paused,
- queueKeys.pc,
- ];
- const fetchNext = opts.fetchNext && !this.queue.closing ? 1 : 0;
- return keys.concat([
- this.queue.keys[''],
- timestamp,
- jobId,
- token,
- delay,
- opts.skipAttempt ? '1' : '0',
- opts.fieldsToUpdate
- ? pack((0, utils_1.objectToFlatArray)(opts.fieldsToUpdate))
- : void 0,
- fetchNext,
- fetchNext
- ? pack({
- token,
- lockDuration: workerOpts.lockDuration,
- limiter: workerOpts.limiter,
- name: workerOpts.name,
- })
- : void 0,
- ]);
- }
- moveToWaitingChildrenArgs(jobId, token, opts) {
- const timestamp = Date.now();
- const childKey = (0, utils_1.getParentKey)(opts.child);
- const keys = [
- 'active',
- 'waiting-children',
- jobId,
- `${jobId}:dependencies`,
- `${jobId}:unsuccessful`,
- 'stalled',
- 'events',
- ].map(name => {
- return this.queue.toKey(name);
- });
- return keys.concat([
- token,
- childKey !== null && childKey !== void 0 ? childKey : '',
- JSON.stringify(timestamp),
- jobId,
- this.queue.toKey(''),
- ]);
- }
- isMaxedArgs() {
- const queueKeys = this.queue.keys;
- const keys = [queueKeys.meta, queueKeys.active];
- return keys;
- }
- async isMaxed() {
- const client = await this.queue.client;
- const args = this.isMaxedArgs();
- return !!(await this.execCommand(client, 'isMaxed', args));
- }
- async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) {
- const client = await this.queue.client;
- const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
- const result = await this.execCommand(client, 'moveToDelayed', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveToDelayed',
- state: 'active',
- });
- }
- else if (typeof result !== 'undefined') {
- return raw2NextJobData(result);
- }
- }
- /**
- * Move parent job to waiting-children state.
- *
- * @returns true if job is successfully moved, false if there are pending dependencies.
- * @throws JobNotExist
- * This exception is thrown if jobId is missing.
- * @throws JobLockNotExist
- * This exception is thrown if job lock is missing.
- * @throws JobNotInState
- * This exception is thrown if job is not in active state.
- */
- async moveToWaitingChildren(jobId, token, opts = {}) {
- const client = await this.queue.client;
- const args = this.moveToWaitingChildrenArgs(jobId, token, opts);
- const result = await this.execCommand(client, 'moveToWaitingChildren', args);
- switch (result) {
- case 0:
- return true;
- case 1:
- return false;
- default:
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveToWaitingChildren',
- state: 'active',
- });
- }
- }
- getRateLimitTtlArgs(maxJobs) {
- const keys = [
- this.queue.keys.limiter,
- this.queue.keys.meta,
- ];
- return keys.concat([maxJobs !== null && maxJobs !== void 0 ? maxJobs : '0']);
- }
- async getRateLimitTtl(maxJobs) {
- const client = await this.queue.client;
- const args = this.getRateLimitTtlArgs(maxJobs);
- return this.execCommand(client, 'getRateLimitTtl', args);
- }
- /**
- * Remove jobs in a specific state.
- *
- * @returns Id jobs from the deleted records.
- */
- async cleanJobsInSet(set, timestamp, limit = 0) {
- const client = await this.queue.client;
- return this.execCommand(client, 'cleanJobsInSet', [
- this.queue.toKey(set),
- this.queue.toKey('events'),
- this.queue.toKey('repeat'),
- this.queue.toKey(''),
- timestamp,
- limit,
- set,
- ]);
- }
- getJobSchedulerArgs(id) {
- const keys = [this.queue.keys.repeat];
- return keys.concat([id]);
- }
- async getJobScheduler(id) {
- const client = await this.queue.client;
- const args = this.getJobSchedulerArgs(id);
- return this.execCommand(client, 'getJobScheduler', args);
- }
- retryJobArgs(jobId, lifo, token, opts = {}) {
- const keys = [
- this.queue.keys.active,
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.toKey(jobId),
- this.queue.keys.meta,
- this.queue.keys.events,
- this.queue.keys.delayed,
- this.queue.keys.prioritized,
- this.queue.keys.pc,
- this.queue.keys.marker,
- this.queue.keys.stalled,
- ];
- const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
- return keys.concat([
- this.queue.toKey(''),
- Date.now(),
- pushCmd,
- jobId,
- token,
- opts.fieldsToUpdate
- ? pack((0, utils_1.objectToFlatArray)(opts.fieldsToUpdate))
- : void 0,
- ]);
- }
- async retryJob(jobId, lifo, token = '0', opts = {}) {
- const client = await this.queue.client;
- const args = this.retryJobArgs(jobId, lifo, token, opts);
- const result = await this.execCommand(client, 'retryJob', args);
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'retryJob',
- state: 'active',
- });
- }
- }
- moveJobsToWaitArgs(state, count, timestamp) {
- const keys = [
- this.queue.toKey(''),
- this.queue.keys.events,
- this.queue.toKey(state),
- this.queue.toKey('wait'),
- this.queue.toKey('paused'),
- this.queue.keys.meta,
- this.queue.keys.active,
- this.queue.keys.marker,
- ];
- const args = [count, timestamp, state];
- return keys.concat(args);
- }
- async retryJobs(state = 'failed', count = 1000, timestamp = new Date().getTime()) {
- const client = await this.queue.client;
- const args = this.moveJobsToWaitArgs(state, count, timestamp);
- return this.execCommand(client, 'moveJobsToWait', args);
- }
- async promoteJobs(count = 1000) {
- const client = await this.queue.client;
- const args = this.moveJobsToWaitArgs('delayed', count, Number.MAX_VALUE);
- return this.execCommand(client, 'moveJobsToWait', args);
- }
- /**
- * Attempts to reprocess a job
- *
- * @param job - The job to reprocess
- * @param state - The expected job state. If the job is not found
- * on the provided state, then it's not reprocessed. Supported states: 'failed', 'completed'
- *
- * @returns A promise that resolves when the job has been successfully moved to the wait queue.
- * @throws Will throw an error with a code property indicating the failure reason:
- * - code 0: Job does not exist
- * - code -1: Job is currently locked and can't be retried
- * - code -2: Job was not found in the expected set
- */
- async reprocessJob(job, state, opts = {}) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(job.id),
- this.queue.keys.events,
- this.queue.toKey(state),
- this.queue.keys.wait,
- this.queue.keys.meta,
- this.queue.keys.paused,
- this.queue.keys.active,
- this.queue.keys.marker,
- ];
- const args = [
- job.id,
- (job.opts.lifo ? 'R' : 'L') + 'PUSH',
- state === 'failed' ? 'failedReason' : 'returnvalue',
- state,
- opts.resetAttemptsMade ? '1' : '0',
- opts.resetAttemptsStarted ? '1' : '0',
- ];
- const result = await this.execCommand(client, 'reprocessJob', keys.concat(args));
- switch (result) {
- case 1:
- return;
- default:
- throw this.finishedErrors({
- code: result,
- jobId: job.id,
- command: 'reprocessJob',
- state,
- });
- }
- }
- async getMetrics(type, start = 0, end = -1) {
- const client = await this.queue.client;
- const keys = [
- this.queue.toKey(`metrics:${type}`),
- this.queue.toKey(`metrics:${type}:data`),
- ];
- const args = [start, end];
- const result = await this.execCommand(client, 'getMetrics', keys.concat(args));
- return result;
- }
- async moveToActive(client, token, name) {
- const opts = this.queue.opts;
- const queueKeys = this.queue.keys;
- const keys = [
- queueKeys.wait,
- queueKeys.active,
- queueKeys.prioritized,
- queueKeys.events,
- queueKeys.stalled,
- queueKeys.limiter,
- queueKeys.delayed,
- queueKeys.paused,
- queueKeys.meta,
- queueKeys.pc,
- queueKeys.marker,
- ];
- const args = [
- queueKeys[''],
- Date.now(),
- pack({
- token,
- lockDuration: opts.lockDuration,
- limiter: opts.limiter,
- name,
- }),
- ];
- const result = await this.execCommand(client, 'moveToActive', keys.concat(args));
- return raw2NextJobData(result);
- }
- async promote(jobId) {
- const client = await this.queue.client;
- const keys = [
- this.queue.keys.delayed,
- this.queue.keys.wait,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.prioritized,
- this.queue.keys.active,
- this.queue.keys.pc,
- this.queue.keys.events,
- this.queue.keys.marker,
- ];
- const args = [this.queue.toKey(''), jobId];
- const code = await this.execCommand(client, 'promote', keys.concat(args));
- if (code < 0) {
- throw this.finishedErrors({
- code,
- jobId,
- command: 'promote',
- state: 'delayed',
- });
- }
- }
- moveStalledJobsToWaitArgs() {
- const opts = this.queue.opts;
- const keys = [
- this.queue.keys.stalled,
- this.queue.keys.wait,
- this.queue.keys.active,
- this.queue.keys['stalled-check'],
- this.queue.keys.meta,
- this.queue.keys.paused,
- this.queue.keys.marker,
- this.queue.keys.events,
- ];
- const args = [
- opts.maxStalledCount,
- this.queue.toKey(''),
- Date.now(),
- opts.stalledInterval,
- ];
- return keys.concat(args);
- }
- /**
- * Looks for unlocked jobs in the active queue.
- *
- * The job was being worked on, but the worker process died and it failed to renew the lock.
- * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them
- * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait,
- * (e.g. if the job handler keeps crashing),
- * we limit the number stalled job recoveries to settings.maxStalledCount.
- */
- async moveStalledJobsToWait() {
- const client = await this.queue.client;
- const args = this.moveStalledJobsToWaitArgs();
- return this.execCommand(client, 'moveStalledJobsToWait', args);
- }
- /**
- * Moves a job back from Active to Wait.
- * This script is used when a job has been manually rate limited and needs
- * to be moved back to wait from active status.
- *
- * @param client - Redis client
- * @param jobId - Job id
- * @returns
- */
- async moveJobFromActiveToWait(jobId, token = '0') {
- const client = await this.queue.client;
- const keys = [
- this.queue.keys.active,
- this.queue.keys.wait,
- this.queue.keys.stalled,
- this.queue.keys.paused,
- this.queue.keys.meta,
- this.queue.keys.limiter,
- this.queue.keys.prioritized,
- this.queue.keys.marker,
- this.queue.keys.events,
- ];
- const args = [jobId, token, this.queue.toKey(jobId)];
- const result = await this.execCommand(client, 'moveJobFromActiveToWait', keys.concat(args));
- if (result < 0) {
- throw this.finishedErrors({
- code: result,
- jobId,
- command: 'moveJobFromActiveToWait',
- state: 'active',
- });
- }
- return result;
- }
- async obliterate(opts) {
- const client = await this.queue.client;
- const keys = [
- this.queue.keys.meta,
- this.queue.toKey(''),
- ];
- const args = [opts.count, opts.force ? 'force' : null];
- const result = await this.execCommand(client, 'obliterate', keys.concat(args));
- if (result < 0) {
- switch (result) {
- case -1:
- throw new Error('Cannot obliterate non-paused queue');
- case -2:
- throw new Error('Cannot obliterate queue with active jobs');
- }
- }
- return result;
- }
- /**
- * Paginate a set or hash keys.
- * @param opts - options to define the pagination behaviour
- *
- */
- async paginate(key, opts) {
- const client = await this.queue.client;
- const keys = [key];
- const maxIterations = 5;
- const pageSize = opts.end >= 0 ? opts.end - opts.start + 1 : Infinity;
- let cursor = '0', offset = 0, items, total, rawJobs, page = [], jobs = [];
- do {
- const args = [
- opts.start + page.length,
- opts.end,
- cursor,
- offset,
- maxIterations,
- ];
- if (opts.fetchJobs) {
- args.push(1);
- }
- [cursor, offset, items, total, rawJobs] = await this.execCommand(client, 'paginate', keys.concat(args));
- page = page.concat(items);
- if (rawJobs && rawJobs.length) {
- jobs = jobs.concat(rawJobs.map(utils_1.array2obj));
- }
- // Important to keep this coercive inequality (!=) instead of strict inequality (!==)
- } while (cursor != '0' && page.length < pageSize);
- // If we get an array of arrays, it means we are paginating a hash
- if (page.length && Array.isArray(page[0])) {
- const result = [];
- for (let index = 0; index < page.length; index++) {
- const [id, value] = page[index];
- try {
- result.push({ id, v: JSON.parse(value) });
- }
- catch (err) {
- result.push({ id, err: err.message });
- }
- }
- return {
- cursor,
- items: result,
- total,
- jobs,
- };
- }
- else {
- return {
- cursor,
- items: page.map(item => ({ id: item })),
- total,
- jobs,
- };
- }
- }
- finishedErrors({ code, jobId, parentKey, command, state, }) {
- let error;
- switch (code) {
- case enums_1.ErrorCode.JobNotExist:
- error = new Error(`Missing key for job ${jobId}. ${command}`);
- break;
- case enums_1.ErrorCode.JobLockNotExist:
- error = new Error(`Missing lock for job ${jobId}. ${command}`);
- break;
- case enums_1.ErrorCode.JobNotInState:
- error = new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
- break;
- case enums_1.ErrorCode.JobPendingChildren:
- error = new Error(`Job ${jobId} has pending dependencies. ${command}`);
- break;
- case enums_1.ErrorCode.ParentJobNotExist:
- error = new Error(`Missing key for parent job ${parentKey}. ${command}`);
- break;
- case enums_1.ErrorCode.JobLockMismatch:
- error = new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
- break;
- case enums_1.ErrorCode.ParentJobCannotBeReplaced:
- error = new Error(`The parent job ${parentKey} cannot be replaced. ${command}`);
- break;
- case enums_1.ErrorCode.JobBelongsToJobScheduler:
- error = new Error(`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`);
- break;
- case enums_1.ErrorCode.JobHasFailedChildren:
- error = new errors_1.UnrecoverableError(`Cannot complete job ${jobId} because it has at least one failed child. ${command}`);
- break;
- case enums_1.ErrorCode.SchedulerJobIdCollision:
- error = new Error(`Cannot create job scheduler iteration - job ID already exists. ${command}`);
- break;
- case enums_1.ErrorCode.SchedulerJobSlotsBusy:
- error = new Error(`Cannot create job scheduler iteration - current and next time slots already have jobs. ${command}`);
- break;
- default:
- error = new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
- }
- // Add the code property to the error object
- error.code = code;
- return error;
- }
- async removeOrphanedJobs(candidateJobIds, stateKeySuffixes, jobSubKeySuffixes) {
- const client = await this.queue.client;
- const args = [
- this.queue.toKey(''),
- stateKeySuffixes.length,
- ...stateKeySuffixes,
- jobSubKeySuffixes.length,
- ...jobSubKeySuffixes,
- ...candidateJobIds,
- ];
- return this.execCommand(client, 'removeOrphanedJobs', args);
- }
- }
- exports.Scripts = Scripts;
- function raw2NextJobData(raw) {
- if (raw) {
- const result = [null, raw[1], raw[2], raw[3]];
- if (raw[0]) {
- result[0] = (0, utils_1.array2obj)(raw[0]);
- }
- return result;
- }
- return [];
- }
- //# sourceMappingURL=scripts.js.map
|