job.js 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Job = exports.PRIORITY_LIMIT = void 0;
  4. const tslib_1 = require("tslib");
  5. const util_1 = require("util");
  6. const utils_1 = require("../utils");
  7. const create_scripts_1 = require("../utils/create-scripts");
  8. const backoffs_1 = require("./backoffs");
  9. const unrecoverable_error_1 = require("./errors/unrecoverable-error");
  10. const enums_1 = require("../enums");
  11. const logger = (0, util_1.debuglog)('bull');
  12. exports.PRIORITY_LIMIT = 2 ** 21;
  13. /**
  14. * Job
  15. *
  16. * This class represents a Job in the queue. Normally job are implicitly created when
  17. * you add a job to the queue with methods such as Queue.addJob( ... )
  18. *
  19. * A Job instance is also passed to the Worker's process function.
  20. *
  21. */
  22. class Job {
  23. constructor(queue,
  24. /**
  25. * The name of the Job
  26. */
  27. name,
  28. /**
  29. * The payload for this job.
  30. */
  31. data,
  32. /**
  33. * The options object for this job.
  34. */
  35. opts = {}, id) {
  36. this.queue = queue;
  37. this.name = name;
  38. this.data = data;
  39. this.opts = opts;
  40. this.id = id;
  41. /**
  42. * The progress a job has performed so far.
  43. * @defaultValue 0
  44. */
  45. this.progress = 0;
  46. /**
  47. * The value returned by the processor when processing this job.
  48. * @defaultValue null
  49. */
  50. this.returnvalue = null;
  51. /**
  52. * Stacktrace for the error (for failed jobs).
  53. * @defaultValue null
  54. */
  55. this.stacktrace = null;
  56. /**
  57. * An amount of milliseconds to wait until this job can be processed.
  58. * @defaultValue 0
  59. */
  60. this.delay = 0;
  61. /**
  62. * Ranges from 0 (highest priority) to 2 097 152 (lowest priority). Note that
  63. * using priorities has a slight impact on performance,
  64. * so do not use it if not required.
  65. * @defaultValue 0
  66. */
  67. this.priority = 0;
  68. /**
  69. * Number of attempts when job is moved to active.
  70. * @defaultValue 0
  71. */
  72. this.attemptsStarted = 0;
  73. /**
  74. * Number of attempts after the job has failed.
  75. * @defaultValue 0
  76. */
  77. this.attemptsMade = 0;
  78. /**
  79. * Number of times where job has stalled.
  80. * @defaultValue 0
  81. */
  82. this.stalledCounter = 0;
  83. const _a = this.opts, { repeatJobKey } = _a, restOpts = tslib_1.__rest(_a, ["repeatJobKey"]);
  84. this.opts = Object.assign({
  85. attempts: 0,
  86. }, restOpts);
  87. this.delay = this.opts.delay;
  88. this.priority = this.opts.priority || 0;
  89. this.repeatJobKey = repeatJobKey;
  90. this.timestamp = opts.timestamp ? opts.timestamp : Date.now();
  91. this.opts.backoff = backoffs_1.Backoffs.normalize(opts.backoff);
  92. this.parentKey = (0, utils_1.getParentKey)(opts.parent);
  93. if (opts.parent) {
  94. this.parent = { id: opts.parent.id, queueKey: opts.parent.queue };
  95. if (opts.failParentOnFailure) {
  96. this.parent.fpof = true;
  97. }
  98. if (opts.removeDependencyOnFailure) {
  99. this.parent.rdof = true;
  100. }
  101. if (opts.ignoreDependencyOnFailure) {
  102. this.parent.idof = true;
  103. }
  104. if (opts.continueParentOnFailure) {
  105. this.parent.cpof = true;
  106. }
  107. }
  108. this.debounceId = opts.debounce ? opts.debounce.id : undefined;
  109. this.deduplicationId = opts.deduplication
  110. ? opts.deduplication.id
  111. : this.debounceId;
  112. this.toKey = queue.toKey.bind(queue);
  113. this.createScripts();
  114. this.queueQualifiedName = queue.qualifiedName;
  115. }
  116. /**
  117. * Creates a new job and adds it to the queue.
  118. *
  119. * @param queue - the queue where to add the job.
  120. * @param name - the name of the job.
  121. * @param data - the payload of the job.
  122. * @param opts - the options bag for this job.
  123. * @returns The created Job instance
  124. */
  125. static async create(queue, name, data, opts) {
  126. const client = await queue.client;
  127. const job = new this(queue, name, data, opts, opts && opts.jobId);
  128. job.id = await job.addJob(client, {
  129. parentKey: job.parentKey,
  130. parentDependenciesKey: job.parentKey
  131. ? `${job.parentKey}:dependencies`
  132. : '',
  133. });
  134. return job;
  135. }
  136. /**
  137. * Creates a bulk of jobs and adds them atomically to the given queue.
  138. *
  139. * @param queue - the queue where to add the jobs.
  140. * @param jobs - an array of jobs to be added to the queue.
  141. * @returns The created Job instances
  142. */
  143. static async createBulk(queue, jobs) {
  144. const client = await queue.client;
  145. const jobInstances = jobs.map(job => { var _a; return new this(queue, job.name, job.data, job.opts, (_a = job.opts) === null || _a === void 0 ? void 0 : _a.jobId); });
  146. const pipeline = client.pipeline();
  147. for (const job of jobInstances) {
  148. job.addJob(pipeline, {
  149. parentKey: job.parentKey,
  150. parentDependenciesKey: job.parentKey
  151. ? `${job.parentKey}:dependencies`
  152. : '',
  153. });
  154. }
  155. const results = (await pipeline.exec());
  156. for (let index = 0; index < results.length; ++index) {
  157. const [err, id] = results[index];
  158. if (err) {
  159. throw err;
  160. }
  161. jobInstances[index].id = id;
  162. }
  163. return jobInstances;
  164. }
  165. /**
  166. * Instantiates a Job from a JobJsonRaw object (coming from a deserialized JSON object)
  167. *
  168. * @param queue - the queue where the job belongs to.
  169. * @param json - the plain object containing the job.
  170. * @param jobId - an optional job id (overrides the id coming from the JSON object)
  171. * @returns A Job instance reconstructed from the JSON data
  172. */
  173. static fromJSON(queue, json, jobId) {
  174. const data = JSON.parse(json.data || '{}');
  175. const opts = Job.optsFromJSON(json.opts);
  176. const job = new this(queue, json.name, data, opts, json.id || jobId);
  177. job.progress = JSON.parse(json.progress || '0');
  178. job.delay = parseInt(json.delay);
  179. job.priority = parseInt(json.priority);
  180. job.timestamp = parseInt(json.timestamp);
  181. if (json.finishedOn) {
  182. job.finishedOn = parseInt(json.finishedOn);
  183. }
  184. if (json.processedOn) {
  185. job.processedOn = parseInt(json.processedOn);
  186. }
  187. if (json.rjk) {
  188. job.repeatJobKey = json.rjk;
  189. }
  190. if (json.deid) {
  191. job.debounceId = json.deid;
  192. job.deduplicationId = json.deid;
  193. }
  194. if (json.failedReason) {
  195. job.failedReason = json.failedReason;
  196. }
  197. job.attemptsStarted = parseInt(json.ats || '0');
  198. job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0');
  199. job.stalledCounter = parseInt(json.stc || '0');
  200. if (json.defa) {
  201. job.deferredFailure = json.defa;
  202. }
  203. job.stacktrace = getTraces(json.stacktrace);
  204. if (typeof json.returnvalue === 'string') {
  205. job.returnvalue = getReturnValue(json.returnvalue);
  206. }
  207. if (json.parentKey) {
  208. job.parentKey = json.parentKey;
  209. }
  210. if (json.parent) {
  211. job.parent = JSON.parse(json.parent);
  212. }
  213. if (json.pb) {
  214. job.processedBy = json.pb;
  215. }
  216. if (json.nrjid) {
  217. job.nextRepeatableJobId = json.nrjid;
  218. }
  219. return job;
  220. }
  221. createScripts() {
  222. this.scripts = (0, create_scripts_1.createScripts)(this.queue);
  223. }
  224. static optsFromJSON(rawOpts, optsDecode = utils_1.optsDecodeMap) {
  225. const opts = JSON.parse(rawOpts || '{}');
  226. const optionEntries = Object.entries(opts);
  227. const options = {};
  228. for (const item of optionEntries) {
  229. const [attributeName, value] = item;
  230. if (optsDecode[attributeName]) {
  231. options[optsDecode[attributeName]] =
  232. value;
  233. }
  234. else {
  235. if (attributeName === 'tm') {
  236. options.telemetry = Object.assign(Object.assign({}, options.telemetry), { metadata: value });
  237. }
  238. else if (attributeName === 'omc') {
  239. options.telemetry = Object.assign(Object.assign({}, options.telemetry), { omitContext: value });
  240. }
  241. else {
  242. options[attributeName] = value;
  243. }
  244. }
  245. }
  246. return options;
  247. }
  248. /**
  249. * Fetches a Job from the queue given the passed job id.
  250. *
  251. * @param queue - the queue where the job belongs to.
  252. * @param jobId - the job id.
  253. * @returns
  254. */
  255. static async fromId(queue, jobId) {
  256. // jobId can be undefined if moveJob returns undefined
  257. if (jobId) {
  258. const client = await queue.client;
  259. const jobData = await client.hgetall(queue.toKey(jobId));
  260. return (0, utils_1.isEmpty)(jobData)
  261. ? undefined
  262. : this.fromJSON(queue, jobData, jobId);
  263. }
  264. }
  265. /**
  266. * addJobLog
  267. *
  268. * @param queue - A minimal queue instance
  269. * @param jobId - Job id
  270. * @param logRow - String with a row of log data to be logged
  271. * @param keepLogs - The optional amount of log entries to preserve
  272. *
  273. * @returns The total number of log entries for this job so far.
  274. */
  275. static addJobLog(queue, jobId, logRow, keepLogs) {
  276. const scripts = queue.scripts;
  277. return scripts.addLog(jobId, logRow, keepLogs);
  278. }
  279. toJSON() {
  280. const _a = this, { queue, scripts } = _a, withoutQueueAndScripts = tslib_1.__rest(_a, ["queue", "scripts"]);
  281. return withoutQueueAndScripts;
  282. }
  283. /**
  284. * Prepares a job to be serialized for storage in Redis.
  285. * @returns
  286. */
  287. asJSON() {
  288. return (0, utils_1.removeUndefinedFields)({
  289. id: this.id,
  290. name: this.name,
  291. data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
  292. opts: Job.optsAsJSON(this.opts),
  293. parent: this.parent ? Object.assign({}, this.parent) : undefined,
  294. parentKey: this.parentKey,
  295. progress: this.progress,
  296. attemptsMade: this.attemptsMade,
  297. attemptsStarted: this.attemptsStarted,
  298. stalledCounter: this.stalledCounter,
  299. finishedOn: this.finishedOn,
  300. processedOn: this.processedOn,
  301. timestamp: this.timestamp,
  302. failedReason: JSON.stringify(this.failedReason),
  303. stacktrace: JSON.stringify(this.stacktrace),
  304. debounceId: this.debounceId,
  305. deduplicationId: this.deduplicationId,
  306. repeatJobKey: this.repeatJobKey,
  307. returnvalue: JSON.stringify(this.returnvalue),
  308. nrjid: this.nextRepeatableJobId,
  309. });
  310. }
  311. static optsAsJSON(opts = {}, optsEncode = utils_1.optsEncodeMap) {
  312. const optionEntries = Object.entries(opts);
  313. const options = {};
  314. for (const [attributeName, value] of optionEntries) {
  315. if (typeof value === 'undefined') {
  316. continue;
  317. }
  318. if (attributeName in optsEncode) {
  319. const compressableAttribute = attributeName;
  320. const key = optsEncode[compressableAttribute];
  321. options[key] = value;
  322. }
  323. else {
  324. // Handle complex compressable fields separately
  325. if (attributeName === 'telemetry') {
  326. if (value.metadata !== undefined) {
  327. options.tm = value.metadata;
  328. }
  329. if (value.omitContext !== undefined) {
  330. options.omc = value.omitContext;
  331. }
  332. }
  333. else {
  334. options[attributeName] = value;
  335. }
  336. }
  337. }
  338. return options;
  339. }
  340. /**
  341. * Prepares a job to be passed to Sandbox.
  342. * @returns
  343. */
  344. asJSONSandbox() {
  345. return Object.assign(Object.assign({}, this.asJSON()), { queueName: this.queueName, queueQualifiedName: this.queueQualifiedName, prefix: this.prefix });
  346. }
  347. /**
  348. * Updates a job's data
  349. *
  350. * @param data - the data that will replace the current jobs data.
  351. */
  352. updateData(data) {
  353. this.data = data;
  354. return this.scripts.updateData(this, data);
  355. }
  356. /**
  357. * Updates a job's progress
  358. *
  359. * @param progress - number or object to be saved as progress.
  360. */
  361. async updateProgress(progress) {
  362. this.progress = progress;
  363. await this.scripts.updateProgress(this.id, progress);
  364. this.queue.emit('progress', this, progress);
  365. }
  366. /**
  367. * Logs one row of log data.
  368. *
  369. * @param logRow - string with log data to be logged.
  370. * @returns The total number of log entries for this job so far.
  371. */
  372. async log(logRow) {
  373. return Job.addJobLog(this.queue, this.id, logRow, this.opts.keepLogs);
  374. }
  375. /**
  376. * Removes child dependency from parent when child is not yet finished
  377. *
  378. * @returns True if the relationship existed and if it was removed.
  379. */
  380. async removeChildDependency() {
  381. const childDependencyIsRemoved = await this.scripts.removeChildDependency(this.id, this.parentKey);
  382. if (childDependencyIsRemoved) {
  383. this.parent = undefined;
  384. this.parentKey = undefined;
  385. return true;
  386. }
  387. return false;
  388. }
  389. /**
  390. * Clears job's logs
  391. *
  392. * @param keepLogs - the amount of log entries to preserve
  393. */
  394. async clearLogs(keepLogs) {
  395. const client = await this.queue.client;
  396. const logsKey = this.toKey(this.id) + ':logs';
  397. if (keepLogs) {
  398. await client.ltrim(logsKey, -keepLogs, -1);
  399. }
  400. else {
  401. await client.del(logsKey);
  402. }
  403. }
  404. /**
  405. * Completely remove the job from the queue.
  406. * Note, this call will throw an exception if the job
  407. * is being processed when the call is performed.
  408. *
  409. * @param opts - Options to remove a job
  410. */
  411. async remove({ removeChildren = true } = {}) {
  412. await this.queue.waitUntilReady();
  413. const queue = this.queue;
  414. const job = this;
  415. const removed = await this.scripts.remove(job.id, removeChildren);
  416. if (removed) {
  417. queue.emit('removed', job);
  418. }
  419. else {
  420. throw new Error(`Job ${this.id} could not be removed because it is locked by another worker`);
  421. }
  422. }
  423. /**
  424. * Remove all children from this job that are not yet processed,
  425. * in other words that are in any other state than completed, failed or active.
  426. *
  427. * @remarks
  428. * - Jobs with locks (most likely active) are ignored.
  429. * - This method can be slow if the number of children is large (\> 1000).
  430. */
  431. async removeUnprocessedChildren() {
  432. const jobId = this.id;
  433. await this.scripts.removeUnprocessedChildren(jobId);
  434. }
  435. /**
  436. * Extend the lock for this job.
  437. *
  438. * @param token - unique token for the lock
  439. * @param duration - lock duration in milliseconds
  440. */
  441. extendLock(token, duration) {
  442. return this.scripts.extendLock(this.id, token, duration);
  443. }
  444. /**
  445. * Moves a job to the completed queue.
  446. * Returned job to be used with Queue.prototype.nextJobFromJobData.
  447. *
  448. * @param returnValue - The jobs success message.
  449. * @param token - Worker token used to acquire completed job.
  450. * @param fetchNext - True when wanting to fetch the next job.
  451. * @returns Returns the jobData of the next job in the waiting queue or void.
  452. */
  453. async moveToCompleted(returnValue, token, fetchNext = true) {
  454. return this.queue.trace(enums_1.SpanKind.INTERNAL, 'complete', this.queue.name, async (span) => {
  455. this.setSpanJobAttributes(span);
  456. await this.queue.waitUntilReady();
  457. this.returnvalue = returnValue || void 0;
  458. const stringifiedReturnValue = (0, utils_1.tryCatch)(JSON.stringify, JSON, [
  459. returnValue,
  460. ]);
  461. if (stringifiedReturnValue === utils_1.errorObject) {
  462. throw utils_1.errorObject.value;
  463. }
  464. const args = this.scripts.moveToCompletedArgs(this, stringifiedReturnValue, this.opts.removeOnComplete, token, fetchNext);
  465. const result = await this.scripts.moveToFinished(this.id, args);
  466. this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
  467. this.attemptsMade += 1;
  468. this.recordJobMetrics('completed');
  469. return result;
  470. });
  471. }
  472. /**
  473. * Moves a job to the wait or prioritized state.
  474. *
  475. * @param token - Worker token used to acquire completed job.
  476. * @returns Returns pttl.
  477. */
  478. async moveToWait(token) {
  479. const result = await this.scripts.moveJobFromActiveToWait(this.id, token);
  480. this.recordJobMetrics('waiting');
  481. return result;
  482. }
  483. async shouldRetryJob(err) {
  484. if (this.attemptsMade + 1 < this.opts.attempts &&
  485. !this.discarded &&
  486. !(err instanceof unrecoverable_error_1.UnrecoverableError || err.name == 'UnrecoverableError')) {
  487. const opts = this.queue.opts;
  488. const delay = await backoffs_1.Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy);
  489. return [delay == -1 ? false : true, delay == -1 ? 0 : delay];
  490. }
  491. else {
  492. return [false, 0];
  493. }
  494. }
  495. /**
  496. * Moves a job to the failed queue.
  497. *
  498. * @param err - the jobs error message.
  499. * @param token - token to check job is locked by current worker
  500. * @param fetchNext - true when wanting to fetch the next job
  501. * @returns Returns the jobData of the next job in the waiting queue or void.
  502. */
  503. async moveToFailed(err, token, fetchNext = false) {
  504. this.failedReason = err === null || err === void 0 ? void 0 : err.message;
  505. // Check if an automatic retry should be performed
  506. const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);
  507. return this.queue.trace(enums_1.SpanKind.INTERNAL, this.getSpanOperation(shouldRetry, retryDelay), this.queue.name, async (span, dstPropagationMetadata) => {
  508. var _a, _b;
  509. this.setSpanJobAttributes(span);
  510. let tm;
  511. if (!((_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.omitContext) && dstPropagationMetadata) {
  512. tm = dstPropagationMetadata;
  513. }
  514. let result;
  515. this.updateStacktrace(err);
  516. const fieldsToUpdate = {
  517. failedReason: this.failedReason,
  518. stacktrace: JSON.stringify(this.stacktrace),
  519. tm,
  520. };
  521. let finishedOn;
  522. if (shouldRetry) {
  523. if (retryDelay) {
  524. // Retry with delay
  525. result = await this.scripts.moveToDelayed(this.id, Date.now(), retryDelay, token, { fieldsToUpdate, fetchNext });
  526. this.recordJobMetrics('delayed');
  527. }
  528. else {
  529. // Retry immediately
  530. result = await this.scripts.retryJob(this.id, this.opts.lifo, token, {
  531. fieldsToUpdate,
  532. });
  533. this.recordJobMetrics('retried');
  534. }
  535. }
  536. else {
  537. const args = this.scripts.moveToFailedArgs(this, this.failedReason, this.opts.removeOnFail, token, fetchNext, fieldsToUpdate);
  538. result = await this.scripts.moveToFinished(this.id, args);
  539. finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
  540. // Only record failed metrics when job is not retrying
  541. this.recordJobMetrics('failed');
  542. }
  543. if (finishedOn && typeof finishedOn === 'number') {
  544. this.finishedOn = finishedOn;
  545. }
  546. if (retryDelay && typeof retryDelay === 'number') {
  547. this.delay = retryDelay;
  548. }
  549. this.attemptsMade += 1;
  550. return result;
  551. });
  552. }
  553. getSpanOperation(shouldRetry, retryDelay) {
  554. if (shouldRetry) {
  555. if (retryDelay) {
  556. return 'delay';
  557. }
  558. return 'retry';
  559. }
  560. return 'fail';
  561. }
  562. /**
  563. * Records job metrics if a meter is configured in telemetry options.
  564. *
  565. * @param status - The job status
  566. */
  567. recordJobMetrics(status) {
  568. var _a, _b;
  569. const meter = (_b = (_a = this.queue.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.meter;
  570. if (!meter) {
  571. return;
  572. }
  573. const attributes = {
  574. [enums_1.TelemetryAttributes.QueueName]: this.queue.name,
  575. [enums_1.TelemetryAttributes.JobName]: this.name,
  576. [enums_1.TelemetryAttributes.JobStatus]: status,
  577. };
  578. // Record counter metric based on status
  579. const statusToCounterName = {
  580. completed: enums_1.MetricNames.JobsCompleted,
  581. failed: enums_1.MetricNames.JobsFailed,
  582. delayed: enums_1.MetricNames.JobsDelayed,
  583. retried: enums_1.MetricNames.JobsRetried,
  584. waiting: enums_1.MetricNames.JobsWaiting,
  585. 'waiting-children': enums_1.MetricNames.JobsWaitingChildren,
  586. };
  587. const counterName = statusToCounterName[status];
  588. const counter = meter.createCounter(counterName, {
  589. description: `Number of jobs ${status}`,
  590. unit: '1',
  591. });
  592. counter.add(1, attributes);
  593. // Record duration histogram if processedOn is available
  594. if (this.processedOn) {
  595. const duration = Date.now() - this.processedOn;
  596. const histogram = meter.createHistogram(enums_1.MetricNames.JobDuration, {
  597. description: 'Job processing duration',
  598. unit: 'ms',
  599. });
  600. histogram.record(duration, attributes);
  601. }
  602. }
  603. /**
  604. * @returns true if the job has completed.
  605. */
  606. isCompleted() {
  607. return this.isInZSet('completed');
  608. }
  609. /**
  610. * @returns true if the job has failed.
  611. */
  612. isFailed() {
  613. return this.isInZSet('failed');
  614. }
  615. /**
  616. * @returns true if the job is delayed.
  617. */
  618. isDelayed() {
  619. return this.isInZSet('delayed');
  620. }
  621. /**
  622. * @returns true if the job is waiting for children.
  623. */
  624. isWaitingChildren() {
  625. return this.isInZSet('waiting-children');
  626. }
  627. /**
  628. * @returns true of the job is active.
  629. */
  630. isActive() {
  631. return this.isInList('active');
  632. }
  633. /**
  634. * @returns true if the job is waiting.
  635. */
  636. async isWaiting() {
  637. return (await this.isInList('wait')) || (await this.isInList('paused'));
  638. }
  639. /**
  640. * @returns the queue name this job belongs to.
  641. */
  642. get queueName() {
  643. return this.queue.name;
  644. }
  645. /**
  646. * @returns the prefix that is used.
  647. */
  648. get prefix() {
  649. return this.queue.opts.prefix;
  650. }
  651. /**
  652. * Get current state.
  653. *
  654. * @returns Returns one of these values:
  655. * 'completed', 'failed', 'delayed', 'active', 'waiting', 'waiting-children', 'unknown'.
  656. */
  657. getState() {
  658. return this.scripts.getState(this.id);
  659. }
  660. /**
  661. * Change delay of a delayed job.
  662. *
  663. * Reschedules a delayed job by setting a new delay from the current time.
  664. * For example, calling changeDelay(5000) will reschedule the job to execute
  665. * 5000 milliseconds (5 seconds) from now, regardless of the original delay.
  666. *
  667. * @param delay - milliseconds from now when the job should be processed.
  668. * @returns void
  669. * @throws JobNotExist
  670. * This exception is thrown if jobId is missing.
  671. * @throws JobNotInState
  672. * This exception is thrown if job is not in delayed state.
  673. */
  674. async changeDelay(delay) {
  675. await this.scripts.changeDelay(this.id, delay);
  676. this.delay = delay;
  677. }
  678. /**
  679. * Change job priority.
  680. *
  681. * @param opts - options containing priority and lifo values.
  682. * @returns void
  683. */
  684. async changePriority(opts) {
  685. await this.scripts.changePriority(this.id, opts.priority, opts.lifo);
  686. this.priority = opts.priority || 0;
  687. }
  688. /**
  689. * Get this jobs children result values if any.
  690. *
  691. * @returns Object mapping children job keys with their values.
  692. */
  693. async getChildrenValues() {
  694. const client = await this.queue.client;
  695. const result = (await client.hgetall(this.toKey(`${this.id}:processed`)));
  696. if (result) {
  697. return (0, utils_1.parseObjectValues)(result);
  698. }
  699. }
  700. /**
  701. * Retrieves the failures of child jobs that were explicitly ignored while using ignoreDependencyOnFailure option.
  702. * This method is useful for inspecting which child jobs were intentionally ignored when an error occurred.
  703. * @see {@link https://docs.bullmq.io/guide/flows/ignore-dependency}
  704. *
  705. * @returns Object mapping children job keys with their failure values.
  706. */
  707. async getIgnoredChildrenFailures() {
  708. const client = await this.queue.client;
  709. return client.hgetall(this.toKey(`${this.id}:failed`));
  710. }
  711. /**
  712. * Get job's children failure values that were ignored if any.
  713. *
  714. * @deprecated This method is deprecated and will be removed in v6. Use getIgnoredChildrenFailures instead.
  715. *
  716. * @returns Object mapping children job keys with their failure values.
  717. */
  718. async getFailedChildrenValues() {
  719. const client = await this.queue.client;
  720. return client.hgetall(this.toKey(`${this.id}:failed`));
  721. }
  722. /**
  723. * Get children job keys if this job is a parent and has children.
  724. * @remarks
  725. * Count options before Redis v7.2 works as expected with any quantity of entries
  726. * on processed/unprocessed dependencies, since v7.2 you must consider that count
  727. * won't have any effect until processed/unprocessed dependencies have a length
  728. * greater than 127
  729. * @see {@link https://redis.io/docs/management/optimization/memory-optimization/#redis--72}
  730. * @see {@link https://docs.bullmq.io/guide/flows#getters}
  731. * @returns dependencies separated by processed, unprocessed, ignored and failed.
  732. */
  733. async getDependencies(opts = {}) {
  734. const client = await this.queue.client;
  735. const multi = client.multi();
  736. if (!opts.processed && !opts.unprocessed && !opts.ignored && !opts.failed) {
  737. multi.hgetall(this.toKey(`${this.id}:processed`));
  738. multi.smembers(this.toKey(`${this.id}:dependencies`));
  739. multi.hgetall(this.toKey(`${this.id}:failed`));
  740. multi.zrange(this.toKey(`${this.id}:unsuccessful`), 0, -1);
  741. const [[err1, processed], [err2, unprocessed], [err3, ignored], [err4, failed],] = (await multi.exec());
  742. return {
  743. processed: (0, utils_1.parseObjectValues)(processed),
  744. unprocessed,
  745. failed,
  746. ignored,
  747. };
  748. }
  749. else {
  750. const defaultOpts = {
  751. cursor: 0,
  752. count: 20,
  753. };
  754. const childrenResultOrder = [];
  755. if (opts.processed) {
  756. childrenResultOrder.push('processed');
  757. const processedOpts = Object.assign(Object.assign({}, defaultOpts), opts.processed);
  758. multi.hscan(this.toKey(`${this.id}:processed`), processedOpts.cursor, 'COUNT', processedOpts.count);
  759. }
  760. if (opts.unprocessed) {
  761. childrenResultOrder.push('unprocessed');
  762. const unprocessedOpts = Object.assign(Object.assign({}, defaultOpts), opts.unprocessed);
  763. multi.sscan(this.toKey(`${this.id}:dependencies`), unprocessedOpts.cursor, 'COUNT', unprocessedOpts.count);
  764. }
  765. if (opts.ignored) {
  766. childrenResultOrder.push('ignored');
  767. const ignoredOpts = Object.assign(Object.assign({}, defaultOpts), opts.ignored);
  768. multi.hscan(this.toKey(`${this.id}:failed`), ignoredOpts.cursor, 'COUNT', ignoredOpts.count);
  769. }
  770. let failedCursor;
  771. if (opts.failed) {
  772. childrenResultOrder.push('failed');
  773. const failedOpts = Object.assign(Object.assign({}, defaultOpts), opts.failed);
  774. failedCursor = failedOpts.cursor + failedOpts.count;
  775. multi.zrange(this.toKey(`${this.id}:unsuccessful`), failedOpts.cursor, failedOpts.count - 1);
  776. }
  777. const results = (await multi.exec());
  778. let processedCursor, processed, unprocessedCursor, unprocessed, failed, ignoredCursor, ignored;
  779. childrenResultOrder.forEach((key, index) => {
  780. switch (key) {
  781. case 'processed': {
  782. processedCursor = results[index][1][0];
  783. const rawProcessed = results[index][1][1];
  784. const transformedProcessed = {};
  785. for (let ind = 0; ind < rawProcessed.length; ++ind) {
  786. if (ind % 2) {
  787. transformedProcessed[rawProcessed[ind - 1]] = JSON.parse(rawProcessed[ind]);
  788. }
  789. }
  790. processed = transformedProcessed;
  791. break;
  792. }
  793. case 'failed': {
  794. failed = results[index][1];
  795. break;
  796. }
  797. case 'ignored': {
  798. ignoredCursor = results[index][1][0];
  799. const rawIgnored = results[index][1][1];
  800. const transformedIgnored = {};
  801. for (let ind = 0; ind < rawIgnored.length; ++ind) {
  802. if (ind % 2) {
  803. transformedIgnored[rawIgnored[ind - 1]] = rawIgnored[ind];
  804. }
  805. }
  806. ignored = transformedIgnored;
  807. break;
  808. }
  809. case 'unprocessed': {
  810. unprocessedCursor = results[index][1][0];
  811. unprocessed = results[index][1][1];
  812. break;
  813. }
  814. }
  815. });
  816. return Object.assign(Object.assign(Object.assign(Object.assign({}, (processedCursor
  817. ? {
  818. processed,
  819. nextProcessedCursor: Number(processedCursor),
  820. }
  821. : {})), (ignoredCursor
  822. ? {
  823. ignored,
  824. nextIgnoredCursor: Number(ignoredCursor),
  825. }
  826. : {})), (failedCursor
  827. ? {
  828. failed,
  829. nextFailedCursor: failedCursor,
  830. }
  831. : {})), (unprocessedCursor
  832. ? { unprocessed, nextUnprocessedCursor: Number(unprocessedCursor) }
  833. : {}));
  834. }
  835. }
  836. /**
  837. * Get children job counts if this job is a parent and has children.
  838. *
  839. * @returns dependencies count separated by processed, unprocessed, ignored and failed.
  840. */
  841. async getDependenciesCount(opts = {}) {
  842. const types = [];
  843. Object.entries(opts).forEach(([key, value]) => {
  844. if (value) {
  845. types.push(key);
  846. }
  847. });
  848. const finalTypes = types.length
  849. ? types
  850. : ['processed', 'unprocessed', 'ignored', 'failed'];
  851. const responses = await this.scripts.getDependencyCounts(this.id, finalTypes);
  852. const counts = {};
  853. responses.forEach((res, index) => {
  854. counts[`${finalTypes[index]}`] = res || 0;
  855. });
  856. return counts;
  857. }
  858. /**
  859. * Returns a promise the resolves when the job has completed (containing the return value of the job),
  860. * or rejects when the job has failed (containing the failedReason).
  861. *
  862. * @param queueEvents - Instance of QueueEvents.
  863. * @param ttl - Time in milliseconds to wait for job to finish before timing out.
  864. */
  865. async waitUntilFinished(queueEvents, ttl) {
  866. await this.queue.waitUntilReady();
  867. const jobId = this.id;
  868. return new Promise(async (resolve, reject) => {
  869. let timeout;
  870. if (ttl) {
  871. timeout = setTimeout(() => onFailed(
  872. /* eslint-disable max-len */
  873. `Job wait ${this.name} timed out before finishing, no finish notification arrived after ${ttl}ms (id=${jobId})`), ttl);
  874. }
  875. function onCompleted(args) {
  876. removeListeners();
  877. resolve(args.returnvalue);
  878. }
  879. function onFailed(args) {
  880. removeListeners();
  881. reject(new Error(args.failedReason || args));
  882. }
  883. const completedEvent = `completed:${jobId}`;
  884. const failedEvent = `failed:${jobId}`;
  885. queueEvents.on(completedEvent, onCompleted);
  886. queueEvents.on(failedEvent, onFailed);
  887. this.queue.on('closing', onFailed);
  888. const removeListeners = () => {
  889. clearInterval(timeout);
  890. queueEvents.removeListener(completedEvent, onCompleted);
  891. queueEvents.removeListener(failedEvent, onFailed);
  892. this.queue.removeListener('closing', onFailed);
  893. };
  894. // Poll once right now to see if the job has already finished. The job may have been completed before we were able
  895. // to register the event handlers on the QueueEvents, so we check here to make sure we're not waiting for an event
  896. // that has already happened. We block checking the job until the queue events object is actually listening to
  897. // Redis so there's no chance that it will miss events.
  898. await queueEvents.waitUntilReady();
  899. const [status, result] = (await this.scripts.isFinished(jobId, true));
  900. const finished = status != 0;
  901. if (finished) {
  902. if (status == -1 || status == 2) {
  903. onFailed({ failedReason: result });
  904. }
  905. else {
  906. onCompleted({ returnvalue: getReturnValue(result) });
  907. }
  908. }
  909. });
  910. }
  911. /**
  912. * Moves the job to the delay set.
  913. *
  914. * @param timestamp - timestamp when the job should be moved back to "wait"
  915. * @param token - token to check job is locked by current worker
  916. * @returns
  917. */
  918. async moveToDelayed(timestamp, token) {
  919. const now = Date.now();
  920. const delay = timestamp - now;
  921. const finalDelay = delay > 0 ? delay : 0;
  922. await this.scripts.moveToDelayed(this.id, now, finalDelay, token, {
  923. skipAttempt: true,
  924. });
  925. this.delay = finalDelay;
  926. this.recordJobMetrics('delayed');
  927. }
  928. /**
  929. * Moves the job to the waiting-children set.
  930. *
  931. * @param token - Token to check job is locked by current worker
  932. * @param opts - The options bag for moving a job to waiting-children.
  933. * @returns true if the job was moved
  934. */
  935. async moveToWaitingChildren(token, opts = {}) {
  936. const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts);
  937. if (movedToWaitingChildren) {
  938. this.recordJobMetrics('waiting-children');
  939. }
  940. return movedToWaitingChildren;
  941. }
  942. /**
  943. * Promotes a delayed job so that it starts to be processed as soon as possible.
  944. */
  945. async promote() {
  946. const jobId = this.id;
  947. await this.scripts.promote(jobId);
  948. this.delay = 0;
  949. }
  950. /**
  951. * Attempts to retry the job. Only a job that has failed or completed can be retried.
  952. *
  953. * @param state - completed / failed
  954. * @param opts - options to retry a job
  955. * @returns A promise that resolves when the job has been successfully moved to the wait queue.
  956. * The queue emits a waiting event when the job is successfully moved.
  957. * @throws Will throw an error if the job does not exist, is locked, or is not in the expected state.
  958. */
  959. async retry(state = 'failed', opts = {}) {
  960. await this.scripts.reprocessJob(this, state, opts);
  961. this.failedReason = null;
  962. this.finishedOn = null;
  963. this.processedOn = null;
  964. this.returnvalue = null;
  965. if (opts.resetAttemptsMade) {
  966. this.attemptsMade = 0;
  967. }
  968. if (opts.resetAttemptsStarted) {
  969. this.attemptsStarted = 0;
  970. }
  971. }
  972. /**
  973. * Marks a job to not be retried if it fails (even if attempts has been configured)
  974. * @deprecated use UnrecoverableError
  975. */
  976. discard() {
  977. this.discarded = true;
  978. }
  979. async isInZSet(set) {
  980. const client = await this.queue.client;
  981. const score = await client.zscore(this.queue.toKey(set), this.id);
  982. return score !== null;
  983. }
  984. async isInList(list) {
  985. return this.scripts.isJobInList(this.queue.toKey(list), this.id);
  986. }
  987. /**
  988. * Adds the job to Redis.
  989. *
  990. * @param client - The Redis client to use for adding the job.
  991. * @param parentOpts - Options for the parent-child relationship.
  992. * @returns The job ID
  993. */
  994. addJob(client, parentOpts) {
  995. const jobData = this.asJSON();
  996. this.validateOptions(jobData);
  997. return this.scripts.addJob(client, jobData, jobData.opts, this.id, parentOpts);
  998. }
  999. /**
  1000. * Removes a deduplication key if job is still the cause of deduplication.
  1001. * @returns true if the deduplication key was removed.
  1002. */
  1003. async removeDeduplicationKey() {
  1004. if (this.deduplicationId) {
  1005. const result = await this.scripts.removeDeduplicationKey(this.deduplicationId, this.id);
  1006. return result > 0;
  1007. }
  1008. return false;
  1009. }
  1010. validateOptions(jobData) {
  1011. var _a, _b, _c, _d, _e, _f, _g, _h;
  1012. const exclusiveOptions = [
  1013. 'removeDependencyOnFailure',
  1014. 'failParentOnFailure',
  1015. 'continueParentOnFailure',
  1016. 'ignoreDependencyOnFailure',
  1017. ];
  1018. const exceedLimit = this.opts.sizeLimit &&
  1019. (0, utils_1.lengthInUtf8Bytes)(jobData.data) > this.opts.sizeLimit;
  1020. if (exceedLimit) {
  1021. throw new Error(`The size of job ${this.name} exceeds the limit ${this.opts.sizeLimit} bytes`);
  1022. }
  1023. if (this.opts.delay && this.opts.repeat && !((_a = this.opts.repeat) === null || _a === void 0 ? void 0 : _a.count)) {
  1024. throw new Error(`Delay and repeat options cannot be used together`);
  1025. }
  1026. const enabledExclusiveOptions = exclusiveOptions.filter(opt => this.opts[opt]);
  1027. if (enabledExclusiveOptions.length > 1) {
  1028. const optionsList = enabledExclusiveOptions.join(', ');
  1029. throw new Error(`The following options cannot be used together: ${optionsList}`);
  1030. }
  1031. if ((_b = this.opts) === null || _b === void 0 ? void 0 : _b.jobId) {
  1032. if (`${parseInt(this.opts.jobId, 10)}` === ((_c = this.opts) === null || _c === void 0 ? void 0 : _c.jobId)) {
  1033. throw new Error('Custom Id cannot be integers');
  1034. }
  1035. // TODO: replace this check in next breaking check with include(':')
  1036. // By using split we are still keeping compatibility with old repeatable jobs
  1037. if (((_d = this.opts) === null || _d === void 0 ? void 0 : _d.jobId.includes(':')) &&
  1038. ((_f = (_e = this.opts) === null || _e === void 0 ? void 0 : _e.jobId) === null || _f === void 0 ? void 0 : _f.split(':').length) !== 3) {
  1039. throw new Error('Custom Id cannot contain :');
  1040. }
  1041. }
  1042. if (this.opts.priority) {
  1043. if (Math.trunc(this.opts.priority) !== this.opts.priority) {
  1044. throw new Error(`Priority should not be float`);
  1045. }
  1046. if (this.opts.priority > exports.PRIORITY_LIMIT) {
  1047. throw new Error(`Priority should be between 0 and ${exports.PRIORITY_LIMIT}`);
  1048. }
  1049. }
  1050. if (this.opts.deduplication) {
  1051. if (!((_g = this.opts.deduplication) === null || _g === void 0 ? void 0 : _g.id)) {
  1052. throw new Error('Deduplication id must be provided');
  1053. }
  1054. if (this.parentKey) {
  1055. throw new Error('Deduplication and parent options cannot be used together');
  1056. }
  1057. }
  1058. // TODO: remove in v6
  1059. if (this.opts.debounce) {
  1060. if (!((_h = this.opts.debounce) === null || _h === void 0 ? void 0 : _h.id)) {
  1061. throw new Error('Debounce id must be provided');
  1062. }
  1063. if (this.parentKey) {
  1064. throw new Error('Debounce and parent options cannot be used together');
  1065. }
  1066. }
  1067. if (typeof this.opts.backoff === 'object' &&
  1068. typeof this.opts.backoff.jitter === 'number') {
  1069. if (this.opts.backoff.jitter < 0 || this.opts.backoff.jitter > 1) {
  1070. throw new Error(`Jitter should be between 0 and 1`);
  1071. }
  1072. }
  1073. }
  1074. updateStacktrace(err) {
  1075. this.stacktrace = this.stacktrace || [];
  1076. if (err === null || err === void 0 ? void 0 : err.stack) {
  1077. this.stacktrace.push(err.stack);
  1078. if (this.opts.stackTraceLimit === 0) {
  1079. this.stacktrace = [];
  1080. }
  1081. else if (this.opts.stackTraceLimit) {
  1082. this.stacktrace = this.stacktrace.slice(-this.opts.stackTraceLimit);
  1083. }
  1084. }
  1085. }
  1086. setSpanJobAttributes(span) {
  1087. span === null || span === void 0 ? void 0 : span.setAttributes({
  1088. [enums_1.TelemetryAttributes.JobName]: this.name,
  1089. [enums_1.TelemetryAttributes.JobId]: this.id,
  1090. });
  1091. }
  1092. }
  1093. exports.Job = Job;
  1094. function getTraces(stacktrace) {
  1095. if (!stacktrace) {
  1096. return [];
  1097. }
  1098. const traces = (0, utils_1.tryCatch)(JSON.parse, JSON, [stacktrace]);
  1099. if (traces === utils_1.errorObject || !(traces instanceof Array)) {
  1100. return [];
  1101. }
  1102. else {
  1103. return traces;
  1104. }
  1105. }
  1106. function getReturnValue(_value) {
  1107. const value = (0, utils_1.tryCatch)(JSON.parse, JSON, [_value]);
  1108. if (value !== utils_1.errorObject) {
  1109. return value;
  1110. }
  1111. else {
  1112. logger('corrupted returnvalue: ' + _value, value);
  1113. }
  1114. }
  1115. //# sourceMappingURL=job.js.map