job.js 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
  1. import { __rest } from "tslib";
  2. import { debuglog } from 'util';
  3. import { errorObject, isEmpty, getParentKey, lengthInUtf8Bytes, optsDecodeMap, optsEncodeMap, parseObjectValues, tryCatch, removeUndefinedFields, } from '../utils';
  4. import { createScripts } from '../utils/create-scripts';
  5. import { Backoffs } from './backoffs';
  6. import { UnrecoverableError } from './errors/unrecoverable-error';
  7. import { SpanKind, TelemetryAttributes, MetricNames } from '../enums';
  8. const logger = debuglog('bull');
  9. export const PRIORITY_LIMIT = 2 ** 21;
  10. /**
  11. * Job
  12. *
  13. * This class represents a Job in the queue. Normally job are implicitly created when
  14. * you add a job to the queue with methods such as Queue.addJob( ... )
  15. *
  16. * A Job instance is also passed to the Worker's process function.
  17. *
  18. */
  19. export class Job {
  20. constructor(queue,
  21. /**
  22. * The name of the Job
  23. */
  24. name,
  25. /**
  26. * The payload for this job.
  27. */
  28. data,
  29. /**
  30. * The options object for this job.
  31. */
  32. opts = {}, id) {
  33. this.queue = queue;
  34. this.name = name;
  35. this.data = data;
  36. this.opts = opts;
  37. this.id = id;
  38. /**
  39. * The progress a job has performed so far.
  40. * @defaultValue 0
  41. */
  42. this.progress = 0;
  43. /**
  44. * The value returned by the processor when processing this job.
  45. * @defaultValue null
  46. */
  47. this.returnvalue = null;
  48. /**
  49. * Stacktrace for the error (for failed jobs).
  50. * @defaultValue null
  51. */
  52. this.stacktrace = null;
  53. /**
  54. * An amount of milliseconds to wait until this job can be processed.
  55. * @defaultValue 0
  56. */
  57. this.delay = 0;
  58. /**
  59. * Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
  60. * using priorities has a slight impact on performance,
  61. * so do not use it if not required.
  62. * @defaultValue 0
  63. */
  64. this.priority = 0;
  65. /**
  66. * Number of attempts when job is moved to active.
  67. * @defaultValue 0
  68. */
  69. this.attemptsStarted = 0;
  70. /**
  71. * Number of attempts after the job has failed.
  72. * @defaultValue 0
  73. */
  74. this.attemptsMade = 0;
  75. /**
  76. * Number of times where job has stalled.
  77. * @defaultValue 0
  78. */
  79. this.stalledCounter = 0;
  80. const _a = this.opts, { repeatJobKey } = _a, restOpts = __rest(_a, ["repeatJobKey"]);
  81. this.opts = Object.assign({
  82. attempts: 0,
  83. }, restOpts);
  84. this.delay = this.opts.delay;
  85. this.priority = this.opts.priority || 0;
  86. this.repeatJobKey = repeatJobKey;
  87. this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
  88. this.opts.backoff = Backoffs.normalize(opts.backoff);
  89. this.parentKey = getParentKey(opts.parent);
  90. if (opts.parent) {
  91. this.parent = { id: opts.parent.id, queueKey: opts.parent.queue };
  92. if (opts.failParentOnFailure) {
  93. this.parent.fpof = true;
  94. }
  95. if (opts.removeDependencyOnFailure) {
  96. this.parent.rdof = true;
  97. }
  98. if (opts.ignoreDependencyOnFailure) {
  99. this.parent.idof = true;
  100. }
  101. if (opts.continueParentOnFailure) {
  102. this.parent.cpof = true;
  103. }
  104. }
  105. this.debounceId = opts.debounce ? opts.debounce.id : undefined;
  106. this.deduplicationId = opts.deduplication
  107. ? opts.deduplication.id
  108. : this.debounceId;
  109. this.toKey = queue.toKey.bind(queue);
  110. this.createScripts();
  111. this.queueQualifiedName = queue.qualifiedName;
  112. }
  113. /**
  114. * Creates a new job and adds it to the queue.
  115. *
  116. * @param queue - the queue where to add the job.
  117. * @param name - the name of the job.
  118. * @param data - the payload of the job.
  119. * @param opts - the options bag for this job.
  120. * @returns The created Job instance
  121. */
  122. static async create(queue, name, data, opts) {
  123. const client = await queue.client;
  124. const job = new this(queue, name, data, opts, opts && opts.jobId);
  125. job.id = await job.addJob(client, {
  126. parentKey: job.parentKey,
  127. parentDependenciesKey: job.parentKey
  128. ? `${job.parentKey}:dependencies`
  129. : '',
  130. });
  131. return job;
  132. }
  133. /**
  134. * Creates a bulk of jobs and adds them atomically to the given queue.
  135. *
  136. * @param queue - the queue where to add the jobs.
  137. * @param jobs - an array of jobs to be added to the queue.
  138. * @returns The created Job instances
  139. */
  140. static async createBulk(queue, jobs) {
  141. const client = await queue.client;
  142. const jobInstances = jobs.map(job => { var _a; return new this(queue, job.name, job.data, job.opts, (_a = job.opts) === null || _a === void 0 ? void 0 : _a.jobId); });
  143. const pipeline = client.pipeline();
  144. for (const job of jobInstances) {
  145. job.addJob(pipeline, {
  146. parentKey: job.parentKey,
  147. parentDependenciesKey: job.parentKey
  148. ? `${job.parentKey}:dependencies`
  149. : '',
  150. });
  151. }
  152. const results = (await pipeline.exec());
  153. for (let index = 0; index < results.length; ++index) {
  154. const [err, id] = results[index];
  155. if (err) {
  156. throw err;
  157. }
  158. jobInstances[index].id = id;
  159. }
  160. return jobInstances;
  161. }
  162. /**
  163. * Instantiates a Job from a JobJsonRaw object (coming from a deserialized JSON object)
  164. *
  165. * @param queue - the queue where the job belongs to.
  166. * @param json - the plain object containing the job.
  167. * @param jobId - an optional job id (overrides the id coming from the JSON object)
  168. * @returns A Job instance reconstructed from the JSON data
  169. */
  170. static fromJSON(queue, json, jobId) {
  171. const data = JSON.parse(json.data || '{}');
  172. const opts = Job.optsFromJSON(json.opts);
  173. const job = new this(queue, json.name, data, opts, json.id || jobId);
  174. job.progress = JSON.parse(json.progress || '0');
  175. job.delay = parseInt(json.delay);
  176. job.priority = parseInt(json.priority);
  177. job.timestamp = parseInt(json.timestamp);
  178. if (json.finishedOn) {
  179. job.finishedOn = parseInt(json.finishedOn);
  180. }
  181. if (json.processedOn) {
  182. job.processedOn = parseInt(json.processedOn);
  183. }
  184. if (json.rjk) {
  185. job.repeatJobKey = json.rjk;
  186. }
  187. if (json.deid) {
  188. job.debounceId = json.deid;
  189. job.deduplicationId = json.deid;
  190. }
  191. if (json.failedReason) {
  192. job.failedReason = json.failedReason;
  193. }
  194. job.attemptsStarted = parseInt(json.ats || '0');
  195. job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0');
  196. job.stalledCounter = parseInt(json.stc || '0');
  197. if (json.defa) {
  198. job.deferredFailure = json.defa;
  199. }
  200. job.stacktrace = getTraces(json.stacktrace);
  201. if (typeof json.returnvalue === 'string') {
  202. job.returnvalue = getReturnValue(json.returnvalue);
  203. }
  204. if (json.parentKey) {
  205. job.parentKey = json.parentKey;
  206. }
  207. if (json.parent) {
  208. job.parent = JSON.parse(json.parent);
  209. }
  210. if (json.pb) {
  211. job.processedBy = json.pb;
  212. }
  213. if (json.nrjid) {
  214. job.nextRepeatableJobId = json.nrjid;
  215. }
  216. return job;
  217. }
  218. createScripts() {
  219. this.scripts = createScripts(this.queue);
  220. }
  221. static optsFromJSON(rawOpts, optsDecode = optsDecodeMap) {
  222. const opts = JSON.parse(rawOpts || '{}');
  223. const optionEntries = Object.entries(opts);
  224. const options = {};
  225. for (const item of optionEntries) {
  226. const [attributeName, value] = item;
  227. if (optsDecode[attributeName]) {
  228. options[optsDecode[attributeName]] =
  229. value;
  230. }
  231. else {
  232. if (attributeName === 'tm') {
  233. options.telemetry = Object.assign(Object.assign({}, options.telemetry), { metadata: value });
  234. }
  235. else if (attributeName === 'omc') {
  236. options.telemetry = Object.assign(Object.assign({}, options.telemetry), { omitContext: value });
  237. }
  238. else {
  239. options[attributeName] = value;
  240. }
  241. }
  242. }
  243. return options;
  244. }
  245. /**
  246. * Fetches a Job from the queue given the passed job id.
  247. *
  248. * @param queue - the queue where the job belongs to.
  249. * @param jobId - the job id.
  250. * @returns
  251. */
  252. static async fromId(queue, jobId) {
  253. // jobId can be undefined if moveJob returns undefined
  254. if (jobId) {
  255. const client = await queue.client;
  256. const jobData = await client.hgetall(queue.toKey(jobId));
  257. return isEmpty(jobData)
  258. ? undefined
  259. : this.fromJSON(queue, jobData, jobId);
  260. }
  261. }
  262. /**
  263. * addJobLog
  264. *
  265. * @param queue - A minimal queue instance
  266. * @param jobId - Job id
  267. * @param logRow - String with a row of log data to be logged
  268. * @param keepLogs - The optional amount of log entries to preserve
  269. *
  270. * @returns The total number of log entries for this job so far.
  271. */
  272. static addJobLog(queue, jobId, logRow, keepLogs) {
  273. const scripts = queue.scripts;
  274. return scripts.addLog(jobId, logRow, keepLogs);
  275. }
  276. toJSON() {
  277. const _a = this, { queue, scripts } = _a, withoutQueueAndScripts = __rest(_a, ["queue", "scripts"]);
  278. return withoutQueueAndScripts;
  279. }
  280. /**
  281. * Prepares a job to be serialized for storage in Redis.
  282. * @returns
  283. */
  284. asJSON() {
  285. return removeUndefinedFields({
  286. id: this.id,
  287. name: this.name,
  288. data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
  289. opts: Job.optsAsJSON(this.opts),
  290. parent: this.parent ? Object.assign({}, this.parent) : undefined,
  291. parentKey: this.parentKey,
  292. progress: this.progress,
  293. attemptsMade: this.attemptsMade,
  294. attemptsStarted: this.attemptsStarted,
  295. stalledCounter: this.stalledCounter,
  296. finishedOn: this.finishedOn,
  297. processedOn: this.processedOn,
  298. timestamp: this.timestamp,
  299. failedReason: JSON.stringify(this.failedReason),
  300. stacktrace: JSON.stringify(this.stacktrace),
  301. debounceId: this.debounceId,
  302. deduplicationId: this.deduplicationId,
  303. repeatJobKey: this.repeatJobKey,
  304. returnvalue: JSON.stringify(this.returnvalue),
  305. nrjid: this.nextRepeatableJobId,
  306. });
  307. }
  308. static optsAsJSON(opts = {}, optsEncode = optsEncodeMap) {
  309. const optionEntries = Object.entries(opts);
  310. const options = {};
  311. for (const [attributeName, value] of optionEntries) {
  312. if (typeof value === 'undefined') {
  313. continue;
  314. }
  315. if (attributeName in optsEncode) {
  316. const compressableAttribute = attributeName;
  317. const key = optsEncode[compressableAttribute];
  318. options[key] = value;
  319. }
  320. else {
  321. // Handle complex compressable fields separately
  322. if (attributeName === 'telemetry') {
  323. if (value.metadata !== undefined) {
  324. options.tm = value.metadata;
  325. }
  326. if (value.omitContext !== undefined) {
  327. options.omc = value.omitContext;
  328. }
  329. }
  330. else {
  331. options[attributeName] = value;
  332. }
  333. }
  334. }
  335. return options;
  336. }
  337. /**
  338. * Prepares a job to be passed to Sandbox.
  339. * @returns
  340. */
  341. asJSONSandbox() {
  342. return Object.assign(Object.assign({}, this.asJSON()), { queueName: this.queueName, queueQualifiedName: this.queueQualifiedName, prefix: this.prefix });
  343. }
  344. /**
  345. * Updates a job's data
  346. *
  347. * @param data - the data that will replace the current jobs data.
  348. */
  349. updateData(data) {
  350. this.data = data;
  351. return this.scripts.updateData(this, data);
  352. }
  353. /**
  354. * Updates a job's progress
  355. *
  356. * @param progress - number or object to be saved as progress.
  357. */
  358. async updateProgress(progress) {
  359. this.progress = progress;
  360. await this.scripts.updateProgress(this.id, progress);
  361. this.queue.emit('progress', this, progress);
  362. }
  363. /**
  364. * Logs one row of log data.
  365. *
  366. * @param logRow - string with log data to be logged.
  367. * @returns The total number of log entries for this job so far.
  368. */
  369. async log(logRow) {
  370. return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
  371. }
  372. /**
  373. * Removes child dependency from parent when child is not yet finished
  374. *
  375. * @returns True if the relationship existed and if it was removed.
  376. */
  377. async removeChildDependency() {
  378. const childDependencyIsRemoved = await this.scripts.removeChildDependency(this.id, this.parentKey);
  379. if (childDependencyIsRemoved) {
  380. this.parent = undefined;
  381. this.parentKey = undefined;
  382. return true;
  383. }
  384. return false;
  385. }
  386. /**
  387. * Clears job's logs
  388. *
  389. * @param keepLogs - the amount of log entries to preserve
  390. */
  391. async clearLogs(keepLogs) {
  392. const client = await this.queue.client;
  393. const logsKey = this.toKey(this.id) + ':logs';
  394. if (keepLogs) {
  395. await client.ltrim(logsKey, -keepLogs, -1);
  396. }
  397. else {
  398. await client.del(logsKey);
  399. }
  400. }
  401. /**
  402. * Completely remove the job from the queue.
  403. * Note, this call will throw an exception if the job
  404. * is being processed when the call is performed.
  405. *
  406. * @param opts - Options to remove a job
  407. */
  408. async remove({ removeChildren = true } = {}) {
  409. await this.queue.waitUntilReady();
  410. const queue = this.queue;
  411. const job = this;
  412. const removed = await this.scripts.remove(job.id, removeChildren);
  413. if (removed) {
  414. queue.emit('removed', job);
  415. }
  416. else {
  417. throw new Error(`Job ${this.id} could not be removed because it is locked by another worker`);
  418. }
  419. }
  420. /**
  421. * Remove all children from this job that are not yet processed,
  422. * in other words that are in any other state than completed, failed or active.
  423. *
  424. * @remarks
  425. * - Jobs with locks (most likely active) are ignored.
  426. * - This method can be slow if the number of children is large (\> 1000).
  427. */
  428. async removeUnprocessedChildren() {
  429. const jobId = this.id;
  430. await this.scripts.removeUnprocessedChildren(jobId);
  431. }
  432. /**
  433. * Extend the lock for this job.
  434. *
  435. * @param token - unique token for the lock
  436. * @param duration - lock duration in milliseconds
  437. */
  438. extendLock(token, duration) {
  439. return this.scripts.extendLock(this.id, token, duration);
  440. }
  441. /**
  442. * Moves a job to the completed queue.
  443. * Returned job to be used with Queue.prototype.nextJobFromJobData.
  444. *
  445. * @param returnValue - The jobs success message.
  446. * @param token - Worker token used to acquire completed job.
  447. * @param fetchNext - True when wanting to fetch the next job.
  448. * @returns Returns the jobData of the next job in the waiting queue or void.
  449. */
  450. async moveToCompleted(returnValue, token, fetchNext = true) {
  451. return this.queue.trace(SpanKind.INTERNAL, 'complete', this.queue.name, async (span) => {
  452. this.setSpanJobAttributes(span);
  453. await this.queue.waitUntilReady();
  454. this.returnvalue = returnValue || void 0;
  455. const stringifiedReturnValue = tryCatch(JSON.stringify, JSON, [
  456. returnValue,
  457. ]);
  458. if (stringifiedReturnValue === errorObject) {
  459. throw errorObject.value;
  460. }
  461. const args = this.scripts.moveToCompletedArgs(this, stringifiedReturnValue, this.opts.removeOnComplete, token, fetchNext);
  462. const result = await this.scripts.moveToFinished(this.id, args);
  463. this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
  464. this.attemptsMade += 1;
  465. this.recordJobMetrics('completed');
  466. return result;
  467. });
  468. }
  469. /**
  470. * Moves a job to the wait or prioritized state.
  471. *
  472. * @param token - Worker token used to acquire completed job.
  473. * @returns Returns pttl.
  474. */
  475. async moveToWait(token) {
  476. const result = await this.scripts.moveJobFromActiveToWait(this.id, token);
  477. this.recordJobMetrics('waiting');
  478. return result;
  479. }
  480. async shouldRetryJob(err) {
  481. if (this.attemptsMade + 1 < this.opts.attempts &&
  482. !this.discarded &&
  483. !(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')) {
  484. const opts = this.queue.opts;
  485. const delay = await Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy);
  486. return [delay == -1 ? false : true, delay == -1 ? 0 : delay];
  487. }
  488. else {
  489. return [false, 0];
  490. }
  491. }
  492. /**
  493. * Moves a job to the failed queue.
  494. *
  495. * @param err - the jobs error message.
  496. * @param token - token to check job is locked by current worker
  497. * @param fetchNext - true when wanting to fetch the next job
  498. * @returns Returns the jobData of the next job in the waiting queue or void.
  499. */
  500. async moveToFailed(err, token, fetchNext = false) {
  501. this.failedReason = err === null || err === void 0 ? void 0 : err.message;
  502. // Check if an automatic retry should be performed
  503. const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);
  504. return this.queue.trace(SpanKind.INTERNAL, this.getSpanOperation(shouldRetry, retryDelay), this.queue.name, async (span, dstPropagationMetadata) => {
  505. var _a, _b;
  506. this.setSpanJobAttributes(span);
  507. let tm;
  508. if (!((_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.omitContext) && dstPropagationMetadata) {
  509. tm = dstPropagationMetadata;
  510. }
  511. let result;
  512. this.updateStacktrace(err);
  513. const fieldsToUpdate = {
  514. failedReason: this.failedReason,
  515. stacktrace: JSON.stringify(this.stacktrace),
  516. tm,
  517. };
  518. let finishedOn;
  519. if (shouldRetry) {
  520. if (retryDelay) {
  521. // Retry with delay
  522. result = await this.scripts.moveToDelayed(this.id, Date.now(), retryDelay, token, { fieldsToUpdate, fetchNext });
  523. this.recordJobMetrics('delayed');
  524. }
  525. else {
  526. // Retry immediately
  527. result = await this.scripts.retryJob(this.id, this.opts.lifo, token, {
  528. fieldsToUpdate,
  529. });
  530. this.recordJobMetrics('retried');
  531. }
  532. }
  533. else {
  534. const args = this.scripts.moveToFailedArgs(this, this.failedReason, this.opts.removeOnFail, token, fetchNext, fieldsToUpdate);
  535. result = await this.scripts.moveToFinished(this.id, args);
  536. finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
  537. // Only record failed metrics when job is not retrying
  538. this.recordJobMetrics('failed');
  539. }
  540. if (finishedOn && typeof finishedOn === 'number') {
  541. this.finishedOn = finishedOn;
  542. }
  543. if (retryDelay && typeof retryDelay === 'number') {
  544. this.delay = retryDelay;
  545. }
  546. this.attemptsMade += 1;
  547. return result;
  548. });
  549. }
  550. getSpanOperation(shouldRetry, retryDelay) {
  551. if (shouldRetry) {
  552. if (retryDelay) {
  553. return 'delay';
  554. }
  555. return 'retry';
  556. }
  557. return 'fail';
  558. }
  559. /**
  560. * Records job metrics if a meter is configured in telemetry options.
  561. *
  562. * @param status - The job status
  563. */
  564. recordJobMetrics(status) {
  565. var _a, _b;
  566. const meter = (_b = (_a = this.queue.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.meter;
  567. if (!meter) {
  568. return;
  569. }
  570. const attributes = {
  571. [TelemetryAttributes.QueueName]: this.queue.name,
  572. [TelemetryAttributes.JobName]: this.name,
  573. [TelemetryAttributes.JobStatus]: status,
  574. };
  575. // Record counter metric based on status
  576. const statusToCounterName = {
  577. completed: MetricNames.JobsCompleted,
  578. failed: MetricNames.JobsFailed,
  579. delayed: MetricNames.JobsDelayed,
  580. retried: MetricNames.JobsRetried,
  581. waiting: MetricNames.JobsWaiting,
  582. 'waiting-children': MetricNames.JobsWaitingChildren,
  583. };
  584. const counterName = statusToCounterName[status];
  585. const counter = meter.createCounter(counterName, {
  586. description: `Number of jobs ${status}`,
  587. unit: '1',
  588. });
  589. counter.add(1, attributes);
  590. // Record duration histogram if processedOn is available
  591. if (this.processedOn) {
  592. const duration = Date.now() - this.processedOn;
  593. const histogram = meter.createHistogram(MetricNames.JobDuration, {
  594. description: 'Job processing duration',
  595. unit: 'ms',
  596. });
  597. histogram.record(duration, attributes);
  598. }
  599. }
  600. /**
  601. * @returns true if the job has completed.
  602. */
  603. isCompleted() {
  604. return this.isInZSet('completed');
  605. }
  606. /**
  607. * @returns true if the job has failed.
  608. */
  609. isFailed() {
  610. return this.isInZSet('failed');
  611. }
  612. /**
  613. * @returns true if the job is delayed.
  614. */
  615. isDelayed() {
  616. return this.isInZSet('delayed');
  617. }
  618. /**
  619. * @returns true if the job is waiting for children.
  620. */
  621. isWaitingChildren() {
  622. return this.isInZSet('waiting-children');
  623. }
  624. /**
  625. * @returns true of the job is active.
  626. */
  627. isActive() {
  628. return this.isInList('active');
  629. }
  630. /**
  631. * @returns true if the job is waiting.
  632. */
  633. async isWaiting() {
  634. return (await this.isInList('wait')) || (await this.isInList('paused'));
  635. }
  636. /**
  637. * @returns the queue name this job belongs to.
  638. */
  639. get queueName() {
  640. return this.queue.name;
  641. }
  642. /**
  643. * @returns the prefix that is used.
  644. */
  645. get prefix() {
  646. return this.queue.opts.prefix;
  647. }
  648. /**
  649. * Get current state.
  650. *
  651. * @returns Returns one of these values:
  652. * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
  653. */
  654. getState() {
  655. return this.scripts.getState(this.id);
  656. }
  657. /**
  658. * Change delay of a delayed job.
  659. *
  660. * Reschedules a delayed job by setting a new delay from the current time.
  661. * For example, calling changeDelay(5000) will reschedule the job to execute
  662. * 5000 milliseconds (5 seconds) from now, regardless of the original delay.
  663. *
  664. * @param delay - milliseconds from now when the job should be processed.
  665. * @returns void
  666. * @throws JobNotExist
  667. * This exception is thrown if jobId is missing.
  668. * @throws JobNotInState
  669. * This exception is thrown if job is not in delayed state.
  670. */
  671. async changeDelay(delay) {
  672. await this.scripts.changeDelay(this.id, delay);
  673. this.delay = delay;
  674. }
  675. /**
  676. * Change job priority.
  677. *
  678. * @param opts - options containing priority and lifo values.
  679. * @returns void
  680. */
  681. async changePriority(opts) {
  682. await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
  683. this.priority = opts.priority || 0;
  684. }
  685. /**
  686. * Get this jobs children result values if any.
  687. *
  688. * @returns Object mapping children job keys with their values.
  689. */
  690. async getChildrenValues() {
  691. const client = await this.queue.client;
  692. const result = (await client.hgetall(this.toKey(`${this.id}:processed`)));
  693. if (result) {
  694. return parseObjectValues(result);
  695. }
  696. }
  697. /**
  698. * Retrieves the failures of child jobs that were explicitly ignored while using ignoreDependencyOnFailure option.
  699. * This method is useful for inspecting which child jobs were intentionally ignored when an error occurred.
  700. * @see {@link https://docs.bullmq.io/guide/flows/ignore-dependency}
  701. *
  702. * @returns Object mapping children job keys with their failure values.
  703. */
  704. async getIgnoredChildrenFailures() {
  705. const client = await this.queue.client;
  706. return client.hgetall(this.toKey(`${this.id}:failed`));
  707. }
  708. /**
  709. * Get job's children failure values that were ignored if any.
  710. *
  711. * @deprecated This method is deprecated and will be removed in v6. Use getIgnoredChildrenFailures instead.
  712. *
  713. * @returns Object mapping children job keys with their failure values.
  714. */
  715. async getFailedChildrenValues() {
  716. const client = await this.queue.client;
  717. return client.hgetall(this.toKey(`${this.id}:failed`));
  718. }
  719. /**
  720. * Get children job keys if this job is a parent and has children.
  721. * @remarks
  722. * Count options before Redis v7.2 works as expected with any quantity of entries
  723. * on processed/unprocessed dependencies, since v7.2 you must consider that count
  724. * won't have any effect until processed/unprocessed dependencies have a length
  725. * greater than 127
  726. * @see {@link https://redis.io/docs/management/optimization/memory-optimization/#redis--72}
  727. * @see {@link https://docs.bullmq.io/guide/flows#getters}
  728. * @returns dependencies separated by processed, unprocessed, ignored and failed.
  729. */
  730. async getDependencies(opts = {}) {
  731. const client = await this.queue.client;
  732. const multi = client.multi();
  733. if (!opts.processed && !opts.unprocessed && !opts.ignored && !opts.failed) {
  734. multi.hgetall(this.toKey(`${this.id}:processed`));
  735. multi.smembers(this.toKey(`${this.id}:dependencies`));
  736. multi.hgetall(this.toKey(`${this.id}:failed`));
  737. multi.zrange(this.toKey(`${this.id}:unsuccessful`), 0, -1);
  738. const [[err1, processed], [err2, unprocessed], [err3, ignored], [err4, failed],] = (await multi.exec());
  739. return {
  740. processed: parseObjectValues(processed),
  741. unprocessed,
  742. failed,
  743. ignored,
  744. };
  745. }
  746. else {
  747. const defaultOpts = {
  748. cursor: 0,
  749. count: 20,
  750. };
  751. const childrenResultOrder = [];
  752. if (opts.processed) {
  753. childrenResultOrder.push('processed');
  754. const processedOpts = Object.assign(Object.assign({}, defaultOpts), opts.processed);
  755. multi.hscan(this.toKey(`${this.id}:processed`), processedOpts.cursor, 'COUNT', processedOpts.count);
  756. }
  757. if (opts.unprocessed) {
  758. childrenResultOrder.push('unprocessed');
  759. const unprocessedOpts = Object.assign(Object.assign({}, defaultOpts), opts.unprocessed);
  760. multi.sscan(this.toKey(`${this.id}:dependencies`), unprocessedOpts.cursor, 'COUNT', unprocessedOpts.count);
  761. }
  762. if (opts.ignored) {
  763. childrenResultOrder.push('ignored');
  764. const ignoredOpts = Object.assign(Object.assign({}, defaultOpts), opts.ignored);
  765. multi.hscan(this.toKey(`${this.id}:failed`), ignoredOpts.cursor, 'COUNT', ignoredOpts.count);
  766. }
  767. let failedCursor;
  768. if (opts.failed) {
  769. childrenResultOrder.push('failed');
  770. const failedOpts = Object.assign(Object.assign({}, defaultOpts), opts.failed);
  771. failedCursor = failedOpts.cursor + failedOpts.count;
  772. multi.zrange(this.toKey(`${this.id}:unsuccessful`), failedOpts.cursor, failedOpts.count - 1);
  773. }
  774. const results = (await multi.exec());
  775. let processedCursor, processed, unprocessedCursor, unprocessed, failed, ignoredCursor, ignored;
  776. childrenResultOrder.forEach((key, index) => {
  777. switch (key) {
  778. case 'processed': {
  779. processedCursor = results[index][1][0];
  780. const rawProcessed = results[index][1][1];
  781. const transformedProcessed = {};
  782. for (let ind = 0; ind < rawProcessed.length; ++ind) {
  783. if (ind % 2) {
  784. transformedProcessed[rawProcessed[ind - 1]] = JSON.parse(rawProcessed[ind]);
  785. }
  786. }
  787. processed = transformedProcessed;
  788. break;
  789. }
  790. case 'failed': {
  791. failed = results[index][1];
  792. break;
  793. }
  794. case 'ignored': {
  795. ignoredCursor = results[index][1][0];
  796. const rawIgnored = results[index][1][1];
  797. const transformedIgnored = {};
  798. for (let ind = 0; ind < rawIgnored.length; ++ind) {
  799. if (ind % 2) {
  800. transformedIgnored[rawIgnored[ind - 1]] = rawIgnored[ind];
  801. }
  802. }
  803. ignored = transformedIgnored;
  804. break;
  805. }
  806. case 'unprocessed': {
  807. unprocessedCursor = results[index][1][0];
  808. unprocessed = results[index][1][1];
  809. break;
  810. }
  811. }
  812. });
  813. return Object.assign(Object.assign(Object.assign(Object.assign({}, (processedCursor
  814. ? {
  815. processed,
  816. nextProcessedCursor: Number(processedCursor),
  817. }
  818. : {})), (ignoredCursor
  819. ? {
  820. ignored,
  821. nextIgnoredCursor: Number(ignoredCursor),
  822. }
  823. : {})), (failedCursor
  824. ? {
  825. failed,
  826. nextFailedCursor: failedCursor,
  827. }
  828. : {})), (unprocessedCursor
  829. ? { unprocessed, nextUnprocessedCursor: Number(unprocessedCursor) }
  830. : {}));
  831. }
  832. }
  833. /**
  834. * Get children job counts if this job is a parent and has children.
  835. *
  836. * @returns dependencies count separated by processed, unprocessed, ignored and failed.
  837. */
  838. async getDependenciesCount(opts = {}) {
  839. const types = [];
  840. Object.entries(opts).forEach(([key, value]) => {
  841. if (value) {
  842. types.push(key);
  843. }
  844. });
  845. const finalTypes = types.length
  846. ? types
  847. : ['processed', 'unprocessed', 'ignored', 'failed'];
  848. const responses = await this.scripts.getDependencyCounts(this.id, finalTypes);
  849. const counts = {};
  850. responses.forEach((res, index) => {
  851. counts[`${finalTypes[index]}`] = res || 0;
  852. });
  853. return counts;
  854. }
  855. /**
  856. * Returns a promise the resolves when the job has completed (containing the return value of the job),
  857. * or rejects when the job has failed (containing the failedReason).
  858. *
  859. * @param queueEvents - Instance of QueueEvents.
  860. * @param ttl - Time in milliseconds to wait for job to finish before timing out.
  861. */
  862. async waitUntilFinished(queueEvents, ttl) {
  863. await this.queue.waitUntilReady();
  864. const jobId = this.id;
  865. return new Promise(async (resolve, reject) => {
  866. let timeout;
  867. if (ttl) {
  868. timeout = setTimeout(() => onFailed(
  869. /* eslint-disable max-len */
  870. `Job wait ${this.name} timed out before finishing, no finish notification arrived after ${ttl}ms (id=${jobId})`), ttl);
  871. }
  872. function onCompleted(args) {
  873. removeListeners();
  874. resolve(args.returnvalue);
  875. }
  876. function onFailed(args) {
  877. removeListeners();
  878. reject(new Error(args.failedReason || args));
  879. }
  880. const completedEvent = `completed:${jobId}`;
  881. const failedEvent = `failed:${jobId}`;
  882. queueEvents.on(completedEvent, onCompleted);
  883. queueEvents.on(failedEvent, onFailed);
  884. this.queue.on('closing', onFailed);
  885. const removeListeners = () => {
  886. clearInterval(timeout);
  887. queueEvents.removeListener(completedEvent, onCompleted);
  888. queueEvents.removeListener(failedEvent, onFailed);
  889. this.queue.removeListener('closing', onFailed);
  890. };
  891. // Poll once right now to see if the job has already finished. The job may have been completed before we were able
  892. // to register the event handlers on the QueueEvents, so we check here to make sure we're not waiting for an event
  893. // that has already happened. We block checking the job until the queue events object is actually listening to
  894. // Redis so there's no chance that it will miss events.
  895. await queueEvents.waitUntilReady();
  896. const [status, result] = (await this.scripts.isFinished(jobId, true));
  897. const finished = status != 0;
  898. if (finished) {
  899. if (status == -1 || status == 2) {
  900. onFailed({ failedReason: result });
  901. }
  902. else {
  903. onCompleted({ returnvalue: getReturnValue(result) });
  904. }
  905. }
  906. });
  907. }
  908. /**
  909. * Moves the job to the delay set.
  910. *
  911. * @param timestamp - timestamp when the job should be moved back to "wait"
  912. * @param token - token to check job is locked by current worker
  913. * @returns
  914. */
  915. async moveToDelayed(timestamp, token) {
  916. const now = Date.now();
  917. const delay = timestamp - now;
  918. const finalDelay = delay > 0 ? delay : 0;
  919. await this.scripts.moveToDelayed(this.id, now, finalDelay, token, {
  920. skipAttempt: true,
  921. });
  922. this.delay = finalDelay;
  923. this.recordJobMetrics('delayed');
  924. }
  925. /**
  926. * Moves the job to the waiting-children set.
  927. *
  928. * @param token - Token to check job is locked by current worker
  929. * @param opts - The options bag for moving a job to waiting-children.
  930. * @returns true if the job was moved
  931. */
  932. async moveToWaitingChildren(token, opts = {}) {
  933. const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts);
  934. if (movedToWaitingChildren) {
  935. this.recordJobMetrics('waiting-children');
  936. }
  937. return movedToWaitingChildren;
  938. }
  939. /**
  940. * Promotes a delayed job so that it starts to be processed as soon as possible.
  941. */
  942. async promote() {
  943. const jobId = this.id;
  944. await this.scripts.promote(jobId);
  945. this.delay = 0;
  946. }
  947. /**
  948. * Attempts to retry the job. Only a job that has failed or completed can be retried.
  949. *
  950. * @param state - completed / failed
  951. * @param opts - options to retry a job
  952. * @returns A promise that resolves when the job has been successfully moved to the wait queue.
  953. * The queue emits a waiting event when the job is successfully moved.
  954. * @throws Will throw an error if the job does not exist, is locked, or is not in the expected state.
  955. */
  956. async retry(state = 'failed', opts = {}) {
  957. await this.scripts.reprocessJob(this, state, opts);
  958. this.failedReason = null;
  959. this.finishedOn = null;
  960. this.processedOn = null;
  961. this.returnvalue = null;
  962. if (opts.resetAttemptsMade) {
  963. this.attemptsMade = 0;
  964. }
  965. if (opts.resetAttemptsStarted) {
  966. this.attemptsStarted = 0;
  967. }
  968. }
  969. /**
  970. * Marks a job to not be retried if it fails (even if attempts has been configured)
  971. * @deprecated use UnrecoverableError
  972. */
  973. discard() {
  974. this.discarded = true;
  975. }
  976. async isInZSet(set) {
  977. const client = await this.queue.client;
  978. const score = await client.zscore(this.queue.toKey(set), this.id);
  979. return score !== null;
  980. }
  981. async isInList(list) {
  982. return this.scripts.isJobInList(this.queue.toKey(list), this.id);
  983. }
  984. /**
  985. * Adds the job to Redis.
  986. *
  987. * @param client - The Redis client to use for adding the job.
  988. * @param parentOpts - Options for the parent-child relationship.
  989. * @returns The job ID
  990. */
  991. addJob(client, parentOpts) {
  992. const jobData = this.asJSON();
  993. this.validateOptions(jobData);
  994. return this.scripts.addJob(client, jobData, jobData.opts, this.id, parentOpts);
  995. }
  996. /**
  997. * Removes a deduplication key if job is still the cause of deduplication.
  998. * @returns true if the deduplication key was removed.
  999. */
  1000. async removeDeduplicationKey() {
  1001. if (this.deduplicationId) {
  1002. const result = await this.scripts.removeDeduplicationKey(this.deduplicationId, this.id);
  1003. return result > 0;
  1004. }
  1005. return false;
  1006. }
  1007. validateOptions(jobData) {
  1008. var _a, _b, _c, _d, _e, _f, _g, _h;
  1009. const exclusiveOptions = [
  1010. 'removeDependencyOnFailure',
  1011. 'failParentOnFailure',
  1012. 'continueParentOnFailure',
  1013. 'ignoreDependencyOnFailure',
  1014. ];
  1015. const exceedLimit = this.opts.sizeLimit &&
  1016. lengthInUtf8Bytes(jobData.data) > this.opts.sizeLimit;
  1017. if (exceedLimit) {
  1018. throw new Error(`The size of job ${this.name} exceeds the limit ${this.opts.sizeLimit} bytes`);
  1019. }
  1020. if (this.opts.delay && this.opts.repeat && !((_a = this.opts.repeat) === null || _a === void 0 ? void 0 : _a.count)) {
  1021. throw new Error(`Delay and repeat options cannot be used together`);
  1022. }
  1023. const enabledExclusiveOptions = exclusiveOptions.filter(opt => this.opts[opt]);
  1024. if (enabledExclusiveOptions.length > 1) {
  1025. const optionsList = enabledExclusiveOptions.join(', ');
  1026. throw new Error(`The following options cannot be used together: ${optionsList}`);
  1027. }
  1028. if ((_b = this.opts) === null || _b === void 0 ? void 0 : _b.jobId) {
  1029. if (`${parseInt(this.opts.jobId, 10)}` === ((_c = this.opts) === null || _c === void 0 ? void 0 : _c.jobId)) {
  1030. throw new Error('Custom Id cannot be integers');
  1031. }
  1032. // TODO: replace this check in next breaking check with include(':')
  1033. // By using split we are still keeping compatibility with old repeatable jobs
  1034. if (((_d = this.opts) === null || _d === void 0 ? void 0 : _d.jobId.includes(':')) &&
  1035. ((_f = (_e = this.opts) === null || _e === void 0 ? void 0 : _e.jobId) === null || _f === void 0 ? void 0 : _f.split(':').length) !== 3) {
  1036. throw new Error('Custom Id cannot contain :');
  1037. }
  1038. }
  1039. if (this.opts.priority) {
  1040. if (Math.trunc(this.opts.priority) !== this.opts.priority) {
  1041. throw new Error(`Priority should not be float`);
  1042. }
  1043. if (this.opts.priority > PRIORITY_LIMIT) {
  1044. throw new Error(`Priority should be between 0 and ${PRIORITY_LIMIT}`);
  1045. }
  1046. }
  1047. if (this.opts.deduplication) {
  1048. if (!((_g = this.opts.deduplication) === null || _g === void 0 ? void 0 : _g.id)) {
  1049. throw new Error('Deduplication id must be provided');
  1050. }
  1051. if (this.parentKey) {
  1052. throw new Error('Deduplication and parent options cannot be used together');
  1053. }
  1054. }
  1055. // TODO: remove in v6
  1056. if (this.opts.debounce) {
  1057. if (!((_h = this.opts.debounce) === null || _h === void 0 ? void 0 : _h.id)) {
  1058. throw new Error('Debounce id must be provided');
  1059. }
  1060. if (this.parentKey) {
  1061. throw new Error('Debounce and parent options cannot be used together');
  1062. }
  1063. }
  1064. if (typeof this.opts.backoff === 'object' &&
  1065. typeof this.opts.backoff.jitter === 'number') {
  1066. if (this.opts.backoff.jitter < 0 || this.opts.backoff.jitter > 1) {
  1067. throw new Error(`Jitter should be between 0 and 1`);
  1068. }
  1069. }
  1070. }
  1071. updateStacktrace(err) {
  1072. this.stacktrace = this.stacktrace || [];
  1073. if (err === null || err === void 0 ? void 0 : err.stack) {
  1074. this.stacktrace.push(err.stack);
  1075. if (this.opts.stackTraceLimit === 0) {
  1076. this.stacktrace = [];
  1077. }
  1078. else if (this.opts.stackTraceLimit) {
  1079. this.stacktrace = this.stacktrace.slice(-this.opts.stackTraceLimit);
  1080. }
  1081. }
  1082. }
  1083. setSpanJobAttributes(span) {
  1084. span === null || span === void 0 ? void 0 : span.setAttributes({
  1085. [TelemetryAttributes.JobName]: this.name,
  1086. [TelemetryAttributes.JobId]: this.id,
  1087. });
  1088. }
  1089. }
  1090. function getTraces(stacktrace) {
  1091. if (!stacktrace) {
  1092. return [];
  1093. }
  1094. const traces = tryCatch(JSON.parse, JSON, [stacktrace]);
  1095. if (traces === errorObject || !(traces instanceof Array)) {
  1096. return [];
  1097. }
  1098. else {
  1099. return traces;
  1100. }
  1101. }
  1102. function getReturnValue(_value) {
  1103. const value = tryCatch(JSON.parse, JSON, [_value]);
  1104. if (value !== errorObject) {
  1105. return value;
  1106. }
  1107. else {
  1108. logger('corrupted returnvalue: ' + _value, value);
  1109. }
  1110. }
  1111. //# sourceMappingURL=job.js.map