| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111 |
- import { __rest } from "tslib";
- import { debuglog } from 'util';
- import { errorObject, isEmpty, getParentKey, lengthInUtf8Bytes, optsDecodeMap, optsEncodeMap, parseObjectValues, tryCatch, removeUndefinedFields, } from '../utils';
- import { createScripts } from '../utils/create-scripts';
- import { Backoffs } from './backoffs';
- import { UnrecoverableError } from './errors/unrecoverable-error';
- import { SpanKind, TelemetryAttributes, MetricNames } from '../enums';
// Debug logger for the 'bull' section (see util.debuglog).
const logger = debuglog('bull');
/**
 * Upper bound for job priority values: 2^21 = 2 097 152.
 */
export const PRIORITY_LIMIT = Math.pow(2, 21);
- /**
- * Job
- *
- * This class represents a Job in the queue. Normally jobs are implicitly created when
- * you add a job to the queue with methods such as Queue.addJob( ... )
- *
- * A Job instance is also passed to the Worker's process function.
- *
- */
- export class Job {
- constructor(queue,
- /**
- * The name of the Job
- */
- name,
- /**
- * The payload for this job.
- */
- data,
- /**
- * The options object for this job.
- */
- opts = {}, id) {
- this.queue = queue;
- this.name = name;
- this.data = data;
- this.opts = opts;
- this.id = id;
- /**
- * The progress a job has performed so far.
- * @defaultValue 0
- */
- this.progress = 0;
- /**
- * The value returned by the processor when processing this job.
- * @defaultValue null
- */
- this.returnvalue = null;
- /**
- * Stacktrace for the error (for failed jobs).
- * @defaultValue null
- */
- this.stacktrace = null;
- /**
- * An amount of milliseconds to wait until this job can be processed.
- * @defaultValue 0
- */
- this.delay = 0;
- /**
- * Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
- * using priorities has a slight impact on performance,
- * so do not use it if not required.
- * @defaultValue 0
- */
- this.priority = 0;
- /**
- * Number of attempts when job is moved to active.
- * @defaultValue 0
- */
- this.attemptsStarted = 0;
- /**
- * Number of attempts after the job has failed.
- * @defaultValue 0
- */
- this.attemptsMade = 0;
- /**
- * Number of times where job has stalled.
- * @defaultValue 0
- */
- this.stalledCounter = 0;
- const _a = this.opts, { repeatJobKey } = _a, restOpts = __rest(_a, ["repeatJobKey"]);
- this.opts = Object.assign({
- attempts: 0,
- }, restOpts);
- this.delay = this.opts.delay;
- this.priority = this.opts.priority || 0;
- this.repeatJobKey = repeatJobKey;
- this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
- this.opts.backoff = Backoffs.normalize(opts.backoff);
- this.parentKey = getParentKey(opts.parent);
- if (opts.parent) {
- this.parent = { id: opts.parent.id, queueKey: opts.parent.queue };
- if (opts.failParentOnFailure) {
- this.parent.fpof = true;
- }
- if (opts.removeDependencyOnFailure) {
- this.parent.rdof = true;
- }
- if (opts.ignoreDependencyOnFailure) {
- this.parent.idof = true;
- }
- if (opts.continueParentOnFailure) {
- this.parent.cpof = true;
- }
- }
- this.debounceId = opts.debounce ? opts.debounce.id : undefined;
- this.deduplicationId = opts.deduplication
- ? opts.deduplication.id
- : this.debounceId;
- this.toKey = queue.toKey.bind(queue);
- this.createScripts();
- this.queueQualifiedName = queue.qualifiedName;
- }
- /**
- * Creates a new job and adds it to the queue.
- *
- * @param queue - the queue where to add the job.
- * @param name - the name of the job.
- * @param data - the payload of the job.
- * @param opts - the options bag for this job.
- * @returns The created Job instance
- */
- static async create(queue, name, data, opts) {
- const client = await queue.client;
- const job = new this(queue, name, data, opts, opts && opts.jobId);
- job.id = await job.addJob(client, {
- parentKey: job.parentKey,
- parentDependenciesKey: job.parentKey
- ? `${job.parentKey}:dependencies`
- : '',
- });
- return job;
- }
- /**
- * Creates a bulk of jobs and adds them atomically to the given queue.
- *
- * @param queue - the queue where to add the jobs.
- * @param jobs - an array of jobs to be added to the queue.
- * @returns The created Job instances
- */
- static async createBulk(queue, jobs) {
- const client = await queue.client;
- const jobInstances = jobs.map(job => { var _a; return new this(queue, job.name, job.data, job.opts, (_a = job.opts) === null || _a === void 0 ? void 0 : _a.jobId); });
- const pipeline = client.pipeline();
- for (const job of jobInstances) {
- job.addJob(pipeline, {
- parentKey: job.parentKey,
- parentDependenciesKey: job.parentKey
- ? `${job.parentKey}:dependencies`
- : '',
- });
- }
- const results = (await pipeline.exec());
- for (let index = 0; index < results.length; ++index) {
- const [err, id] = results[index];
- if (err) {
- throw err;
- }
- jobInstances[index].id = id;
- }
- return jobInstances;
- }
- /**
- * Instantiates a Job from a JobJsonRaw object (coming from a deserialized JSON object)
- *
- * @param queue - the queue where the job belongs to.
- * @param json - the plain object containing the job.
- * @param jobId - an optional job id (overrides the id coming from the JSON object)
- * @returns A Job instance reconstructed from the JSON data
- */
- static fromJSON(queue, json, jobId) {
- const data = JSON.parse(json.data || '{}');
- const opts = Job.optsFromJSON(json.opts);
- const job = new this(queue, json.name, data, opts, json.id || jobId);
- job.progress = JSON.parse(json.progress || '0');
- job.delay = parseInt(json.delay);
- job.priority = parseInt(json.priority);
- job.timestamp = parseInt(json.timestamp);
- if (json.finishedOn) {
- job.finishedOn = parseInt(json.finishedOn);
- }
- if (json.processedOn) {
- job.processedOn = parseInt(json.processedOn);
- }
- if (json.rjk) {
- job.repeatJobKey = json.rjk;
- }
- if (json.deid) {
- job.debounceId = json.deid;
- job.deduplicationId = json.deid;
- }
- if (json.failedReason) {
- job.failedReason = json.failedReason;
- }
- job.attemptsStarted = parseInt(json.ats || '0');
- job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0');
- job.stalledCounter = parseInt(json.stc || '0');
- if (json.defa) {
- job.deferredFailure = json.defa;
- }
- job.stacktrace = getTraces(json.stacktrace);
- if (typeof json.returnvalue === 'string') {
- job.returnvalue = getReturnValue(json.returnvalue);
- }
- if (json.parentKey) {
- job.parentKey = json.parentKey;
- }
- if (json.parent) {
- job.parent = JSON.parse(json.parent);
- }
- if (json.pb) {
- job.processedBy = json.pb;
- }
- if (json.nrjid) {
- job.nextRepeatableJobId = json.nrjid;
- }
- return job;
- }
- createScripts() {
- this.scripts = createScripts(this.queue);
- }
- static optsFromJSON(rawOpts, optsDecode = optsDecodeMap) {
- const opts = JSON.parse(rawOpts || '{}');
- const optionEntries = Object.entries(opts);
- const options = {};
- for (const item of optionEntries) {
- const [attributeName, value] = item;
- if (optsDecode[attributeName]) {
- options[optsDecode[attributeName]] =
- value;
- }
- else {
- if (attributeName === 'tm') {
- options.telemetry = Object.assign(Object.assign({}, options.telemetry), { metadata: value });
- }
- else if (attributeName === 'omc') {
- options.telemetry = Object.assign(Object.assign({}, options.telemetry), { omitContext: value });
- }
- else {
- options[attributeName] = value;
- }
- }
- }
- return options;
- }
- /**
- * Fetches a Job from the queue given the passed job id.
- *
- * @param queue - the queue where the job belongs to.
- * @param jobId - the job id.
- * @returns
- */
- static async fromId(queue, jobId) {
- // jobId can be undefined if moveJob returns undefined
- if (jobId) {
- const client = await queue.client;
- const jobData = await client.hgetall(queue.toKey(jobId));
- return isEmpty(jobData)
- ? undefined
- : this.fromJSON(queue, jobData, jobId);
- }
- }
- /**
- * addJobLog
- *
- * @param queue - A minimal queue instance
- * @param jobId - Job id
- * @param logRow - String with a row of log data to be logged
- * @param keepLogs - The optional amount of log entries to preserve
- *
- * @returns The total number of log entries for this job so far.
- */
- static addJobLog(queue, jobId, logRow, keepLogs) {
- const scripts = queue.scripts;
- return scripts.addLog(jobId, logRow, keepLogs);
- }
- toJSON() {
- const _a = this, { queue, scripts } = _a, withoutQueueAndScripts = __rest(_a, ["queue", "scripts"]);
- return withoutQueueAndScripts;
- }
- /**
- * Prepares a job to be serialized for storage in Redis.
- * @returns
- */
- asJSON() {
- return removeUndefinedFields({
- id: this.id,
- name: this.name,
- data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
- opts: Job.optsAsJSON(this.opts),
- parent: this.parent ? Object.assign({}, this.parent) : undefined,
- parentKey: this.parentKey,
- progress: this.progress,
- attemptsMade: this.attemptsMade,
- attemptsStarted: this.attemptsStarted,
- stalledCounter: this.stalledCounter,
- finishedOn: this.finishedOn,
- processedOn: this.processedOn,
- timestamp: this.timestamp,
- failedReason: JSON.stringify(this.failedReason),
- stacktrace: JSON.stringify(this.stacktrace),
- debounceId: this.debounceId,
- deduplicationId: this.deduplicationId,
- repeatJobKey: this.repeatJobKey,
- returnvalue: JSON.stringify(this.returnvalue),
- nrjid: this.nextRepeatableJobId,
- });
- }
- static optsAsJSON(opts = {}, optsEncode = optsEncodeMap) {
- const optionEntries = Object.entries(opts);
- const options = {};
- for (const [attributeName, value] of optionEntries) {
- if (typeof value === 'undefined') {
- continue;
- }
- if (attributeName in optsEncode) {
- const compressableAttribute = attributeName;
- const key = optsEncode[compressableAttribute];
- options[key] = value;
- }
- else {
- // Handle complex compressable fields separately
- if (attributeName === 'telemetry') {
- if (value.metadata !== undefined) {
- options.tm = value.metadata;
- }
- if (value.omitContext !== undefined) {
- options.omc = value.omitContext;
- }
- }
- else {
- options[attributeName] = value;
- }
- }
- }
- return options;
- }
- /**
- * Prepares a job to be passed to Sandbox.
- * @returns
- */
- asJSONSandbox() {
- return Object.assign(Object.assign({}, this.asJSON()), { queueName: this.queueName, queueQualifiedName: this.queueQualifiedName, prefix: this.prefix });
- }
- /**
- * Updates a job's data
- *
- * @param data - the data that will replace the current jobs data.
- */
- updateData(data) {
- this.data = data;
- return this.scripts.updateData(this, data);
- }
- /**
- * Updates a job's progress
- *
- * @param progress - number or object to be saved as progress.
- */
- async updateProgress(progress) {
- this.progress = progress;
- await this.scripts.updateProgress(this.id, progress);
- this.queue.emit('progress', this, progress);
- }
- /**
- * Logs one row of log data.
- *
- * @param logRow - string with log data to be logged.
- * @returns The total number of log entries for this job so far.
- */
- async log(logRow) {
- return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
- }
- /**
- * Removes child dependency from parent when child is not yet finished
- *
- * @returns True if the relationship existed and if it was removed.
- */
- async removeChildDependency() {
- const childDependencyIsRemoved = await this.scripts.removeChildDependency(this.id, this.parentKey);
- if (childDependencyIsRemoved) {
- this.parent = undefined;
- this.parentKey = undefined;
- return true;
- }
- return false;
- }
- /**
- * Clears job's logs
- *
- * @param keepLogs - the amount of log entries to preserve
- */
- async clearLogs(keepLogs) {
- const client = await this.queue.client;
- const logsKey = this.toKey(this.id) + ':logs';
- if (keepLogs) {
- await client.ltrim(logsKey, -keepLogs, -1);
- }
- else {
- await client.del(logsKey);
- }
- }
- /**
- * Completely remove the job from the queue.
- * Note, this call will throw an exception if the job
- * is being processed when the call is performed.
- *
- * @param opts - Options to remove a job
- */
- async remove({ removeChildren = true } = {}) {
- await this.queue.waitUntilReady();
- const queue = this.queue;
- const job = this;
- const removed = await this.scripts.remove(job.id, removeChildren);
- if (removed) {
- queue.emit('removed', job);
- }
- else {
- throw new Error(`Job ${this.id} could not be removed because it is locked by another worker`);
- }
- }
- /**
- * Remove all children from this job that are not yet processed,
- * in other words that are in any other state than completed, failed or active.
- *
- * @remarks
- * - Jobs with locks (most likely active) are ignored.
- * - This method can be slow if the number of children is large (\> 1000).
- */
- async removeUnprocessedChildren() {
- const jobId = this.id;
- await this.scripts.removeUnprocessedChildren(jobId);
- }
- /**
- * Extend the lock for this job.
- *
- * @param token - unique token for the lock
- * @param duration - lock duration in milliseconds
- */
- extendLock(token, duration) {
- return this.scripts.extendLock(this.id, token, duration);
- }
- /**
- * Moves a job to the completed queue.
- * Returned job to be used with Queue.prototype.nextJobFromJobData.
- *
- * @param returnValue - The jobs success message.
- * @param token - Worker token used to acquire completed job.
- * @param fetchNext - True when wanting to fetch the next job.
- * @returns Returns the jobData of the next job in the waiting queue or void.
- */
- async moveToCompleted(returnValue, token, fetchNext = true) {
- return this.queue.trace(SpanKind.INTERNAL, 'complete', this.queue.name, async (span) => {
- this.setSpanJobAttributes(span);
- await this.queue.waitUntilReady();
- this.returnvalue = returnValue || void 0;
- const stringifiedReturnValue = tryCatch(JSON.stringify, JSON, [
- returnValue,
- ]);
- if (stringifiedReturnValue === errorObject) {
- throw errorObject.value;
- }
- const args = this.scripts.moveToCompletedArgs(this, stringifiedReturnValue, this.opts.removeOnComplete, token, fetchNext);
- const result = await this.scripts.moveToFinished(this.id, args);
- this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
- this.attemptsMade += 1;
- this.recordJobMetrics('completed');
- return result;
- });
- }
- /**
- * Moves a job to the wait or prioritized state.
- *
- * @param token - Worker token used to acquire completed job.
- * @returns Returns pttl.
- */
- async moveToWait(token) {
- const result = await this.scripts.moveJobFromActiveToWait(this.id, token);
- this.recordJobMetrics('waiting');
- return result;
- }
- async shouldRetryJob(err) {
- if (this.attemptsMade + 1 < this.opts.attempts &&
- !this.discarded &&
- !(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')) {
- const opts = this.queue.opts;
- const delay = await Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy);
- return [delay == -1 ? false : true, delay == -1 ? 0 : delay];
- }
- else {
- return [false, 0];
- }
- }
- /**
- * Moves a job to the failed queue.
- *
- * @param err - the jobs error message.
- * @param token - token to check job is locked by current worker
- * @param fetchNext - true when wanting to fetch the next job
- * @returns Returns the jobData of the next job in the waiting queue or void.
- */
- async moveToFailed(err, token, fetchNext = false) {
- this.failedReason = err === null || err === void 0 ? void 0 : err.message;
- // Check if an automatic retry should be performed
- const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);
- return this.queue.trace(SpanKind.INTERNAL, this.getSpanOperation(shouldRetry, retryDelay), this.queue.name, async (span, dstPropagationMetadata) => {
- var _a, _b;
- this.setSpanJobAttributes(span);
- let tm;
- if (!((_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.omitContext) && dstPropagationMetadata) {
- tm = dstPropagationMetadata;
- }
- let result;
- this.updateStacktrace(err);
- const fieldsToUpdate = {
- failedReason: this.failedReason,
- stacktrace: JSON.stringify(this.stacktrace),
- tm,
- };
- let finishedOn;
- if (shouldRetry) {
- if (retryDelay) {
- // Retry with delay
- result = await this.scripts.moveToDelayed(this.id, Date.now(), retryDelay, token, { fieldsToUpdate, fetchNext });
- this.recordJobMetrics('delayed');
- }
- else {
- // Retry immediately
- result = await this.scripts.retryJob(this.id, this.opts.lifo, token, {
- fieldsToUpdate,
- });
- this.recordJobMetrics('retried');
- }
- }
- else {
- const args = this.scripts.moveToFailedArgs(this, this.failedReason, this.opts.removeOnFail, token, fetchNext, fieldsToUpdate);
- result = await this.scripts.moveToFinished(this.id, args);
- finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
- // Only record failed metrics when job is not retrying
- this.recordJobMetrics('failed');
- }
- if (finishedOn && typeof finishedOn === 'number') {
- this.finishedOn = finishedOn;
- }
- if (retryDelay && typeof retryDelay === 'number') {
- this.delay = retryDelay;
- }
- this.attemptsMade += 1;
- return result;
- });
- }
- getSpanOperation(shouldRetry, retryDelay) {
- if (shouldRetry) {
- if (retryDelay) {
- return 'delay';
- }
- return 'retry';
- }
- return 'fail';
- }
- /**
- * Records job metrics if a meter is configured in telemetry options.
- *
- * @param status - The job status
- */
- recordJobMetrics(status) {
- var _a, _b;
- const meter = (_b = (_a = this.queue.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.meter;
- if (!meter) {
- return;
- }
- const attributes = {
- [TelemetryAttributes.QueueName]: this.queue.name,
- [TelemetryAttributes.JobName]: this.name,
- [TelemetryAttributes.JobStatus]: status,
- };
- // Record counter metric based on status
- const statusToCounterName = {
- completed: MetricNames.JobsCompleted,
- failed: MetricNames.JobsFailed,
- delayed: MetricNames.JobsDelayed,
- retried: MetricNames.JobsRetried,
- waiting: MetricNames.JobsWaiting,
- 'waiting-children': MetricNames.JobsWaitingChildren,
- };
- const counterName = statusToCounterName[status];
- const counter = meter.createCounter(counterName, {
- description: `Number of jobs ${status}`,
- unit: '1',
- });
- counter.add(1, attributes);
- // Record duration histogram if processedOn is available
- if (this.processedOn) {
- const duration = Date.now() - this.processedOn;
- const histogram = meter.createHistogram(MetricNames.JobDuration, {
- description: 'Job processing duration',
- unit: 'ms',
- });
- histogram.record(duration, attributes);
- }
- }
- /**
- * @returns true if the job has completed.
- */
- isCompleted() {
- return this.isInZSet('completed');
- }
- /**
- * @returns true if the job has failed.
- */
- isFailed() {
- return this.isInZSet('failed');
- }
- /**
- * @returns true if the job is delayed.
- */
- isDelayed() {
- return this.isInZSet('delayed');
- }
- /**
- * @returns true if the job is waiting for children.
- */
- isWaitingChildren() {
- return this.isInZSet('waiting-children');
- }
- /**
- * @returns true of the job is active.
- */
- isActive() {
- return this.isInList('active');
- }
- /**
- * @returns true if the job is waiting.
- */
- async isWaiting() {
- return (await this.isInList('wait')) || (await this.isInList('paused'));
- }
- /**
- * @returns the queue name this job belongs to.
- */
- get queueName() {
- return this.queue.name;
- }
- /**
- * @returns the prefix that is used.
- */
- get prefix() {
- return this.queue.opts.prefix;
- }
- /**
- * Get current state.
- *
- * @returns Returns one of these values:
- * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
- */
- getState() {
- return this.scripts.getState(this.id);
- }
- /**
- * Change delay of a delayed job.
- *
- * Reschedules a delayed job by setting a new delay from the current time.
- * For example, calling changeDelay(5000) will reschedule the job to execute
- * 5000 milliseconds (5 seconds) from now, regardless of the original delay.
- *
- * @param delay - milliseconds from now when the job should be processed.
- * @returns void
- * @throws JobNotExist
- * This exception is thrown if jobId is missing.
- * @throws JobNotInState
- * This exception is thrown if job is not in delayed state.
- */
- async changeDelay(delay) {
- await this.scripts.changeDelay(this.id, delay);
- this.delay = delay;
- }
- /**
- * Change job priority.
- *
- * @param opts - options containing priority and lifo values.
- * @returns void
- */
- async changePriority(opts) {
- await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
- this.priority = opts.priority || 0;
- }
- /**
- * Get this jobs children result values if any.
- *
- * @returns Object mapping children job keys with their values.
- */
- async getChildrenValues() {
- const client = await this.queue.client;
- const result = (await client.hgetall(this.toKey(`${this.id}:processed`)));
- if (result) {
- return parseObjectValues(result);
- }
- }
- /**
- * Retrieves the failures of child jobs that were explicitly ignored while using ignoreDependencyOnFailure option.
- * This method is useful for inspecting which child jobs were intentionally ignored when an error occurred.
- * @see {@link https://docs.bullmq.io/guide/flows/ignore-dependency}
- *
- * @returns Object mapping children job keys with their failure values.
- */
- async getIgnoredChildrenFailures() {
- const client = await this.queue.client;
- return client.hgetall(this.toKey(`${this.id}:failed`));
- }
- /**
- * Get job's children failure values that were ignored if any.
- *
- * @deprecated This method is deprecated and will be removed in v6. Use getIgnoredChildrenFailures instead.
- *
- * @returns Object mapping children job keys with their failure values.
- */
- async getFailedChildrenValues() {
- const client = await this.queue.client;
- return client.hgetall(this.toKey(`${this.id}:failed`));
- }
- /**
- * Get children job keys if this job is a parent and has children.
- * @remarks
- * Count options before Redis v7.2 works as expected with any quantity of entries
- * on processed/unprocessed dependencies, since v7.2 you must consider that count
- * won't have any effect until processed/unprocessed dependencies have a length
- * greater than 127
- * @see {@link https://redis.io/docs/management/optimization/memory-optimization/#redis--72}
- * @see {@link https://docs.bullmq.io/guide/flows#getters}
- * @returns dependencies separated by processed, unprocessed, ignored and failed.
- */
- async getDependencies(opts = {}) {
- const client = await this.queue.client;
- const multi = client.multi();
- if (!opts.processed && !opts.unprocessed && !opts.ignored && !opts.failed) {
- multi.hgetall(this.toKey(`${this.id}:processed`));
- multi.smembers(this.toKey(`${this.id}:dependencies`));
- multi.hgetall(this.toKey(`${this.id}:failed`));
- multi.zrange(this.toKey(`${this.id}:unsuccessful`), 0, -1);
- const [[err1, processed], [err2, unprocessed], [err3, ignored], [err4, failed],] = (await multi.exec());
- return {
- processed: parseObjectValues(processed),
- unprocessed,
- failed,
- ignored,
- };
- }
- else {
- const defaultOpts = {
- cursor: 0,
- count: 20,
- };
- const childrenResultOrder = [];
- if (opts.processed) {
- childrenResultOrder.push('processed');
- const processedOpts = Object.assign(Object.assign({}, defaultOpts), opts.processed);
- multi.hscan(this.toKey(`${this.id}:processed`), processedOpts.cursor, 'COUNT', processedOpts.count);
- }
- if (opts.unprocessed) {
- childrenResultOrder.push('unprocessed');
- const unprocessedOpts = Object.assign(Object.assign({}, defaultOpts), opts.unprocessed);
- multi.sscan(this.toKey(`${this.id}:dependencies`), unprocessedOpts.cursor, 'COUNT', unprocessedOpts.count);
- }
- if (opts.ignored) {
- childrenResultOrder.push('ignored');
- const ignoredOpts = Object.assign(Object.assign({}, defaultOpts), opts.ignored);
- multi.hscan(this.toKey(`${this.id}:failed`), ignoredOpts.cursor, 'COUNT', ignoredOpts.count);
- }
- let failedCursor;
- if (opts.failed) {
- childrenResultOrder.push('failed');
- const failedOpts = Object.assign(Object.assign({}, defaultOpts), opts.failed);
- failedCursor = failedOpts.cursor + failedOpts.count;
- multi.zrange(this.toKey(`${this.id}:unsuccessful`), failedOpts.cursor, failedOpts.count - 1);
- }
- const results = (await multi.exec());
- let processedCursor, processed, unprocessedCursor, unprocessed, failed, ignoredCursor, ignored;
- childrenResultOrder.forEach((key, index) => {
- switch (key) {
- case 'processed': {
- processedCursor = results[index][1][0];
- const rawProcessed = results[index][1][1];
- const transformedProcessed = {};
- for (let ind = 0; ind < rawProcessed.length; ++ind) {
- if (ind % 2) {
- transformedProcessed[rawProcessed[ind - 1]] = JSON.parse(rawProcessed[ind]);
- }
- }
- processed = transformedProcessed;
- break;
- }
- case 'failed': {
- failed = results[index][1];
- break;
- }
- case 'ignored': {
- ignoredCursor = results[index][1][0];
- const rawIgnored = results[index][1][1];
- const transformedIgnored = {};
- for (let ind = 0; ind < rawIgnored.length; ++ind) {
- if (ind % 2) {
- transformedIgnored[rawIgnored[ind - 1]] = rawIgnored[ind];
- }
- }
- ignored = transformedIgnored;
- break;
- }
- case 'unprocessed': {
- unprocessedCursor = results[index][1][0];
- unprocessed = results[index][1][1];
- break;
- }
- }
- });
- return Object.assign(Object.assign(Object.assign(Object.assign({}, (processedCursor
- ? {
- processed,
- nextProcessedCursor: Number(processedCursor),
- }
- : {})), (ignoredCursor
- ? {
- ignored,
- nextIgnoredCursor: Number(ignoredCursor),
- }
- : {})), (failedCursor
- ? {
- failed,
- nextFailedCursor: failedCursor,
- }
- : {})), (unprocessedCursor
- ? { unprocessed, nextUnprocessedCursor: Number(unprocessedCursor) }
- : {}));
- }
- }
- /**
- * Get children job counts if this job is a parent and has children.
- *
- * @returns dependencies count separated by processed, unprocessed, ignored and failed.
- */
- async getDependenciesCount(opts = {}) {
- const types = [];
- Object.entries(opts).forEach(([key, value]) => {
- if (value) {
- types.push(key);
- }
- });
- const finalTypes = types.length
- ? types
- : ['processed', 'unprocessed', 'ignored', 'failed'];
- const responses = await this.scripts.getDependencyCounts(this.id, finalTypes);
- const counts = {};
- responses.forEach((res, index) => {
- counts[`${finalTypes[index]}`] = res || 0;
- });
- return counts;
- }
- /**
- * Returns a promise the resolves when the job has completed (containing the return value of the job),
- * or rejects when the job has failed (containing the failedReason).
- *
- * @param queueEvents - Instance of QueueEvents.
- * @param ttl - Time in milliseconds to wait for job to finish before timing out.
- */
- async waitUntilFinished(queueEvents, ttl) {
- await this.queue.waitUntilReady();
- const jobId = this.id;
- return new Promise(async (resolve, reject) => {
- let timeout;
- if (ttl) {
- timeout = setTimeout(() => onFailed(
- /* eslint-disable max-len */
- `Job wait ${this.name} timed out before finishing, no finish notification arrived after ${ttl}ms (id=${jobId})`), ttl);
- }
- function onCompleted(args) {
- removeListeners();
- resolve(args.returnvalue);
- }
- function onFailed(args) {
- removeListeners();
- reject(new Error(args.failedReason || args));
- }
- const completedEvent = `completed:${jobId}`;
- const failedEvent = `failed:${jobId}`;
- queueEvents.on(completedEvent, onCompleted);
- queueEvents.on(failedEvent, onFailed);
- this.queue.on('closing', onFailed);
- const removeListeners = () => {
- clearInterval(timeout);
- queueEvents.removeListener(completedEvent, onCompleted);
- queueEvents.removeListener(failedEvent, onFailed);
- this.queue.removeListener('closing', onFailed);
- };
- // Poll once right now to see if the job has already finished. The job may have been completed before we were able
- // to register the event handlers on the QueueEvents, so we check here to make sure we're not waiting for an event
- // that has already happened. We block checking the job until the queue events object is actually listening to
- // Redis so there's no chance that it will miss events.
- await queueEvents.waitUntilReady();
- const [status, result] = (await this.scripts.isFinished(jobId, true));
- const finished = status != 0;
- if (finished) {
- if (status == -1 || status == 2) {
- onFailed({ failedReason: result });
- }
- else {
- onCompleted({ returnvalue: getReturnValue(result) });
- }
- }
- });
- }
- /**
- * Moves the job to the delay set.
- *
- * @param timestamp - timestamp when the job should be moved back to "wait"
- * @param token - token to check job is locked by current worker
- * @returns
- */
- async moveToDelayed(timestamp, token) {
- const now = Date.now();
- const delay = timestamp - now;
- const finalDelay = delay > 0 ? delay : 0;
- await this.scripts.moveToDelayed(this.id, now, finalDelay, token, {
- skipAttempt: true,
- });
- this.delay = finalDelay;
- this.recordJobMetrics('delayed');
- }
- /**
- * Moves the job to the waiting-children set.
- *
- * @param token - Token to check job is locked by current worker
- * @param opts - The options bag for moving a job to waiting-children.
- * @returns true if the job was moved
- */
- async moveToWaitingChildren(token, opts = {}) {
- const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts);
- if (movedToWaitingChildren) {
- this.recordJobMetrics('waiting-children');
- }
- return movedToWaitingChildren;
- }
- /**
- * Promotes a delayed job so that it starts to be processed as soon as possible.
- */
- async promote() {
- const jobId = this.id;
- await this.scripts.promote(jobId);
- this.delay = 0;
- }
- /**
- * Attempts to retry the job. Only a job that has failed or completed can be retried.
- *
- * @param state - completed / failed
- * @param opts - options to retry a job
- * @returns A promise that resolves when the job has been successfully moved to the wait queue.
- * The queue emits a waiting event when the job is successfully moved.
- * @throws Will throw an error if the job does not exist, is locked, or is not in the expected state.
- */
- async retry(state = 'failed', opts = {}) {
- await this.scripts.reprocessJob(this, state, opts);
- this.failedReason = null;
- this.finishedOn = null;
- this.processedOn = null;
- this.returnvalue = null;
- if (opts.resetAttemptsMade) {
- this.attemptsMade = 0;
- }
- if (opts.resetAttemptsStarted) {
- this.attemptsStarted = 0;
- }
- }
- /**
- * Marks a job to not be retried if it fails (even if attempts has been configured)
- * @deprecated use UnrecoverableError
- */
- discard() {
- this.discarded = true;
- }
- async isInZSet(set) {
- const client = await this.queue.client;
- const score = await client.zscore(this.queue.toKey(set), this.id);
- return score !== null;
- }
- async isInList(list) {
- return this.scripts.isJobInList(this.queue.toKey(list), this.id);
- }
- /**
- * Adds the job to Redis.
- *
- * @param client - The Redis client to use for adding the job.
- * @param parentOpts - Options for the parent-child relationship.
- * @returns The job ID
- */
- addJob(client, parentOpts) {
- const jobData = this.asJSON();
- this.validateOptions(jobData);
- return this.scripts.addJob(client, jobData, jobData.opts, this.id, parentOpts);
- }
- /**
- * Removes a deduplication key if job is still the cause of deduplication.
- * @returns true if the deduplication key was removed.
- */
- async removeDeduplicationKey() {
- if (this.deduplicationId) {
- const result = await this.scripts.removeDeduplicationKey(this.deduplicationId, this.id);
- return result > 0;
- }
- return false;
- }
- validateOptions(jobData) {
- var _a, _b, _c, _d, _e, _f, _g, _h;
- const exclusiveOptions = [
- 'removeDependencyOnFailure',
- 'failParentOnFailure',
- 'continueParentOnFailure',
- 'ignoreDependencyOnFailure',
- ];
- const exceedLimit = this.opts.sizeLimit &&
- lengthInUtf8Bytes(jobData.data) > this.opts.sizeLimit;
- if (exceedLimit) {
- throw new Error(`The size of job ${this.name} exceeds the limit ${this.opts.sizeLimit} bytes`);
- }
- if (this.opts.delay && this.opts.repeat && !((_a = this.opts.repeat) === null || _a === void 0 ? void 0 : _a.count)) {
- throw new Error(`Delay and repeat options cannot be used together`);
- }
- const enabledExclusiveOptions = exclusiveOptions.filter(opt => this.opts[opt]);
- if (enabledExclusiveOptions.length > 1) {
- const optionsList = enabledExclusiveOptions.join(', ');
- throw new Error(`The following options cannot be used together: ${optionsList}`);
- }
- if ((_b = this.opts) === null || _b === void 0 ? void 0 : _b.jobId) {
- if (`${parseInt(this.opts.jobId, 10)}` === ((_c = this.opts) === null || _c === void 0 ? void 0 : _c.jobId)) {
- throw new Error('Custom Id cannot be integers');
- }
- // TODO: replace this check in next breaking check with include(':')
- // By using split we are still keeping compatibility with old repeatable jobs
- if (((_d = this.opts) === null || _d === void 0 ? void 0 : _d.jobId.includes(':')) &&
- ((_f = (_e = this.opts) === null || _e === void 0 ? void 0 : _e.jobId) === null || _f === void 0 ? void 0 : _f.split(':').length) !== 3) {
- throw new Error('Custom Id cannot contain :');
- }
- }
- if (this.opts.priority) {
- if (Math.trunc(this.opts.priority) !== this.opts.priority) {
- throw new Error(`Priority should not be float`);
- }
- if (this.opts.priority > PRIORITY_LIMIT) {
- throw new Error(`Priority should be between 0 and ${PRIORITY_LIMIT}`);
- }
- }
- if (this.opts.deduplication) {
- if (!((_g = this.opts.deduplication) === null || _g === void 0 ? void 0 : _g.id)) {
- throw new Error('Deduplication id must be provided');
- }
- if (this.parentKey) {
- throw new Error('Deduplication and parent options cannot be used together');
- }
- }
- // TODO: remove in v6
- if (this.opts.debounce) {
- if (!((_h = this.opts.debounce) === null || _h === void 0 ? void 0 : _h.id)) {
- throw new Error('Debounce id must be provided');
- }
- if (this.parentKey) {
- throw new Error('Debounce and parent options cannot be used together');
- }
- }
- if (typeof this.opts.backoff === 'object' &&
- typeof this.opts.backoff.jitter === 'number') {
- if (this.opts.backoff.jitter < 0 || this.opts.backoff.jitter > 1) {
- throw new Error(`Jitter should be between 0 and 1`);
- }
- }
- }
- updateStacktrace(err) {
- this.stacktrace = this.stacktrace || [];
- if (err === null || err === void 0 ? void 0 : err.stack) {
- this.stacktrace.push(err.stack);
- if (this.opts.stackTraceLimit === 0) {
- this.stacktrace = [];
- }
- else if (this.opts.stackTraceLimit) {
- this.stacktrace = this.stacktrace.slice(-this.opts.stackTraceLimit);
- }
- }
- }
- setSpanJobAttributes(span) {
- span === null || span === void 0 ? void 0 : span.setAttributes({
- [TelemetryAttributes.JobName]: this.name,
- [TelemetryAttributes.JobId]: this.id,
- });
- }
- }
/**
 * Parses a serialized stacktrace into an array.
 *
 * @param stacktrace - JSON text expected to encode an array.
 * @returns The parsed array, or an empty array when the input is falsy,
 * parsing yields the error sentinel, or the parsed value is not an array.
 */
function getTraces(stacktrace) {
    if (!stacktrace) {
        return [];
    }
    // NOTE(review): tryCatch presumably returns the shared errorObject sentinel when JSON.parse throws; confirm in utils.
    const parsed = tryCatch(JSON.parse, JSON, [stacktrace]);
    const isTraceList = parsed !== errorObject && parsed instanceof Array;
    return isTraceList ? parsed : [];
}
/**
 * Parses a serialized job return value.
 *
 * @param _value - JSON text of the return value.
 * @returns The parsed value, or undefined (after logging) when parsing
 * yields the error sentinel.
 */
function getReturnValue(_value) {
    // NOTE(review): tryCatch presumably returns the shared errorObject sentinel when JSON.parse throws; confirm in utils.
    const parsed = tryCatch(JSON.parse, JSON, [_value]);
    if (parsed === errorObject) {
        logger('corrupted returnvalue: ' + _value, parsed);
        return undefined;
    }
    return parsed;
}
- //# sourceMappingURL=job.js.map
|