scripts.js 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225
  1. /**
  2. * Includes all the scripts needed by the queue and jobs.
  3. */
  4. 'use strict';
  5. import { Packr } from 'msgpackr';
  // Module-wide msgpackr encoder, configured with records disabled and
  // `undefined` values encoded as nil.
  const packer = new Packr({ useRecords: false, encodeUndefinedAsNil: true });
  // Standalone reference to the encoder's `pack` function, used to serialise
  // command arguments throughout this file.
  const { pack } = packer;
  11. import { ErrorCode } from '../enums';
  12. import { array2obj, getParentKey, isRedisVersionLowerThan, objectToFlatArray, } from '../utils';
  13. import { version as packageVersion } from '../version';
  14. import { UnrecoverableError } from './errors';
  15. export class Scripts {
  16. constructor(queue) {
  17. this.queue = queue;
  18. this.version = packageVersion;
  19. const queueKeys = this.queue.keys;
  20. this.moveToFinishedKeys = [
  21. queueKeys.wait,
  22. queueKeys.active,
  23. queueKeys.prioritized,
  24. queueKeys.events,
  25. queueKeys.stalled,
  26. queueKeys.limiter,
  27. queueKeys.delayed,
  28. queueKeys.paused,
  29. queueKeys.meta,
  30. queueKeys.pc,
  31. undefined,
  32. undefined,
  33. undefined,
  34. undefined,
  35. ];
  36. }
  37. execCommand(client, commandName, args) {
  38. const commandNameWithVersion = `${commandName}:${this.version}`;
  39. return client[commandNameWithVersion](args);
  40. }
  41. async isJobInList(listKey, jobId) {
  42. const client = await this.queue.client;
  43. let result;
  44. if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6', this.queue.databaseType)) {
  45. result = await this.execCommand(client, 'isJobInList', [listKey, jobId]);
  46. }
  47. else {
  48. result = await client.lpos(listKey, jobId);
  49. }
  50. return Number.isInteger(result);
  51. }
  52. addDelayedJobArgs(job, encodedOpts, args) {
  53. const queueKeys = this.queue.keys;
  54. const keys = [
  55. queueKeys.marker,
  56. queueKeys.meta,
  57. queueKeys.id,
  58. queueKeys.delayed,
  59. queueKeys.completed,
  60. queueKeys.events,
  61. ];
  62. keys.push(pack(args), job.data, encodedOpts);
  63. return keys;
  64. }
  65. addDelayedJob(client, job, encodedOpts, args) {
  66. const argsList = this.addDelayedJobArgs(job, encodedOpts, args);
  67. return this.execCommand(client, 'addDelayedJob', argsList);
  68. }
  69. addPrioritizedJobArgs(job, encodedOpts, args) {
  70. const queueKeys = this.queue.keys;
  71. const keys = [
  72. queueKeys.marker,
  73. queueKeys.meta,
  74. queueKeys.id,
  75. queueKeys.prioritized,
  76. queueKeys.delayed,
  77. queueKeys.completed,
  78. queueKeys.active,
  79. queueKeys.events,
  80. queueKeys.pc,
  81. ];
  82. keys.push(pack(args), job.data, encodedOpts);
  83. return keys;
  84. }
  85. addPrioritizedJob(client, job, encodedOpts, args) {
  86. const argsList = this.addPrioritizedJobArgs(job, encodedOpts, args);
  87. return this.execCommand(client, 'addPrioritizedJob', argsList);
  88. }
  89. addParentJobArgs(job, encodedOpts, args) {
  90. const queueKeys = this.queue.keys;
  91. const keys = [
  92. queueKeys.meta,
  93. queueKeys.id,
  94. queueKeys.delayed,
  95. queueKeys['waiting-children'],
  96. queueKeys.completed,
  97. queueKeys.events,
  98. ];
  99. keys.push(pack(args), job.data, encodedOpts);
  100. return keys;
  101. }
  102. addParentJob(client, job, encodedOpts, args) {
  103. const argsList = this.addParentJobArgs(job, encodedOpts, args);
  104. return this.execCommand(client, 'addParentJob', argsList);
  105. }
  106. addStandardJobArgs(job, encodedOpts, args) {
  107. const queueKeys = this.queue.keys;
  108. const keys = [
  109. queueKeys.wait,
  110. queueKeys.paused,
  111. queueKeys.meta,
  112. queueKeys.id,
  113. queueKeys.completed,
  114. queueKeys.delayed,
  115. queueKeys.active,
  116. queueKeys.events,
  117. queueKeys.marker,
  118. ];
  119. keys.push(pack(args), job.data, encodedOpts);
  120. return keys;
  121. }
  122. addStandardJob(client, job, encodedOpts, args) {
  123. const argsList = this.addStandardJobArgs(job, encodedOpts, args);
  124. return this.execCommand(client, 'addStandardJob', argsList);
  125. }
  126. async addJob(client, job, opts, jobId, parentKeyOpts = {}) {
  127. const queueKeys = this.queue.keys;
  128. const parent = job.parent;
  129. const args = [
  130. queueKeys[''],
  131. typeof jobId !== 'undefined' ? jobId : '',
  132. job.name,
  133. job.timestamp,
  134. job.parentKey || null,
  135. parentKeyOpts.parentDependenciesKey || null,
  136. parent,
  137. job.repeatJobKey,
  138. job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null,
  139. ];
  140. let encodedOpts;
  141. if (opts.repeat) {
  142. const repeat = Object.assign({}, opts.repeat);
  143. if (repeat.startDate) {
  144. repeat.startDate = +new Date(repeat.startDate);
  145. }
  146. if (repeat.endDate) {
  147. repeat.endDate = +new Date(repeat.endDate);
  148. }
  149. encodedOpts = pack(Object.assign(Object.assign({}, opts), { repeat }));
  150. }
  151. else {
  152. encodedOpts = pack(opts);
  153. }
  154. let result;
  155. if (parentKeyOpts.addToWaitingChildren) {
  156. result = await this.addParentJob(client, job, encodedOpts, args);
  157. }
  158. else if (typeof opts.delay == 'number' && opts.delay > 0) {
  159. result = await this.addDelayedJob(client, job, encodedOpts, args);
  160. }
  161. else if (opts.priority) {
  162. result = await this.addPrioritizedJob(client, job, encodedOpts, args);
  163. }
  164. else {
  165. result = await this.addStandardJob(client, job, encodedOpts, args);
  166. }
  167. if (result < 0) {
  168. throw this.finishedErrors({
  169. code: result,
  170. parentKey: parentKeyOpts.parentKey,
  171. command: 'addJob',
  172. });
  173. }
  174. return result;
  175. }
  176. pauseArgs(pause) {
  177. let src = 'wait', dst = 'paused';
  178. if (!pause) {
  179. src = 'paused';
  180. dst = 'wait';
  181. }
  182. const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name));
  183. keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker);
  184. const args = [pause ? 'paused' : 'resumed'];
  185. return keys.concat(args);
  186. }
  187. async pause(pause) {
  188. const client = await this.queue.client;
  189. const args = this.pauseArgs(pause);
  190. return this.execCommand(client, 'pause', args);
  191. }
  192. addRepeatableJobArgs(customKey, nextMillis, opts, legacyCustomKey) {
  193. const queueKeys = this.queue.keys;
  194. const keys = [
  195. queueKeys.repeat,
  196. queueKeys.delayed,
  197. ];
  198. const args = [
  199. nextMillis,
  200. pack(opts),
  201. legacyCustomKey,
  202. customKey,
  203. queueKeys[''],
  204. ];
  205. return keys.concat(args);
  206. }
  207. async addRepeatableJob(customKey, nextMillis, opts, legacyCustomKey) {
  208. const client = await this.queue.client;
  209. const args = this.addRepeatableJobArgs(customKey, nextMillis, opts, legacyCustomKey);
  210. return this.execCommand(client, 'addRepeatableJob', args);
  211. }
  212. async removeDeduplicationKey(deduplicationId, jobId) {
  213. const client = await this.queue.client;
  214. const queueKeys = this.queue.keys;
  215. const keys = [`${queueKeys.de}:${deduplicationId}`];
  216. const args = [jobId];
  217. return this.execCommand(client, 'removeDeduplicationKey', keys.concat(args));
  218. }
  219. async addJobScheduler(jobSchedulerId, nextMillis, templateData, templateOpts, opts, delayedJobOpts,
  220. // The job id of the job that produced this next iteration
  221. producerId) {
  222. const client = await this.queue.client;
  223. const queueKeys = this.queue.keys;
  224. const keys = [
  225. queueKeys.repeat,
  226. queueKeys.delayed,
  227. queueKeys.wait,
  228. queueKeys.paused,
  229. queueKeys.meta,
  230. queueKeys.prioritized,
  231. queueKeys.marker,
  232. queueKeys.id,
  233. queueKeys.events,
  234. queueKeys.pc,
  235. queueKeys.active,
  236. ];
  237. const args = [
  238. nextMillis,
  239. pack(opts),
  240. jobSchedulerId,
  241. templateData,
  242. pack(templateOpts),
  243. pack(delayedJobOpts),
  244. Date.now(),
  245. queueKeys[''],
  246. producerId ? this.queue.toKey(producerId) : '',
  247. ];
  248. const result = await this.execCommand(client, 'addJobScheduler', keys.concat(args));
  249. if (typeof result === 'number' && result < 0) {
  250. throw this.finishedErrors({
  251. code: result,
  252. command: 'addJobScheduler',
  253. });
  254. }
  255. return result;
  256. }
  257. async updateRepeatableJobMillis(client, customKey, nextMillis, legacyCustomKey) {
  258. const args = [
  259. this.queue.keys.repeat,
  260. nextMillis,
  261. customKey,
  262. legacyCustomKey,
  263. ];
  264. return this.execCommand(client, 'updateRepeatableJobMillis', args);
  265. }
  266. async updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, templateData, delayedJobOpts,
  267. // The job id of the job that produced this next iteration - TODO: remove in next breaking change
  268. producerId) {
  269. const client = await this.queue.client;
  270. const queueKeys = this.queue.keys;
  271. const keys = [
  272. queueKeys.repeat,
  273. queueKeys.delayed,
  274. queueKeys.wait,
  275. queueKeys.paused,
  276. queueKeys.meta,
  277. queueKeys.prioritized,
  278. queueKeys.marker,
  279. queueKeys.id,
  280. queueKeys.events,
  281. queueKeys.pc,
  282. producerId ? this.queue.toKey(producerId) : '',
  283. queueKeys.active,
  284. ];
  285. const args = [
  286. nextMillis,
  287. jobSchedulerId,
  288. templateData,
  289. pack(delayedJobOpts),
  290. Date.now(),
  291. queueKeys[''],
  292. producerId,
  293. ];
  294. return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
  295. }
  296. removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {
  297. const queueKeys = this.queue.keys;
  298. const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events];
  299. const args = [
  300. legacyRepeatJobId,
  301. this.getRepeatConcatOptions(repeatConcatOptions, repeatJobKey),
  302. repeatJobKey,
  303. queueKeys[''],
  304. ];
  305. return keys.concat(args);
  306. }
  307. // TODO: remove this check in next breaking change
  308. getRepeatConcatOptions(repeatConcatOptions, repeatJobKey) {
  309. if (repeatJobKey && repeatJobKey.split(':').length > 2) {
  310. return repeatJobKey;
  311. }
  312. return repeatConcatOptions;
  313. }
  314. async removeRepeatable(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {
  315. const client = await this.queue.client;
  316. const args = this.removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey);
  317. return this.execCommand(client, 'removeRepeatable', args);
  318. }
  319. async removeJobScheduler(jobSchedulerId) {
  320. const client = await this.queue.client;
  321. const queueKeys = this.queue.keys;
  322. const keys = [queueKeys.repeat, queueKeys.delayed, queueKeys.events];
  323. const args = [jobSchedulerId, queueKeys['']];
  324. return this.execCommand(client, 'removeJobScheduler', keys.concat(args));
  325. }
  326. removeArgs(jobId, removeChildren) {
  327. const keys = [jobId, 'repeat'].map(name => this.queue.toKey(name));
  328. const args = [jobId, removeChildren ? 1 : 0, this.queue.toKey('')];
  329. return keys.concat(args);
  330. }
  331. async remove(jobId, removeChildren) {
  332. const client = await this.queue.client;
  333. const args = this.removeArgs(jobId, removeChildren);
  334. const result = await this.execCommand(client, 'removeJob', args);
  335. if (result < 0) {
  336. throw this.finishedErrors({
  337. code: result,
  338. jobId,
  339. command: 'removeJob',
  340. });
  341. }
  342. return result;
  343. }
  344. async removeUnprocessedChildren(jobId) {
  345. const client = await this.queue.client;
  346. const args = [
  347. this.queue.toKey(jobId),
  348. this.queue.keys.meta,
  349. this.queue.toKey(''),
  350. jobId,
  351. ];
  352. await this.execCommand(client, 'removeUnprocessedChildren', args);
  353. }
  354. async extendLock(jobId, token, duration, client) {
  355. client = client || (await this.queue.client);
  356. const args = [
  357. this.queue.toKey(jobId) + ':lock',
  358. this.queue.keys.stalled,
  359. token,
  360. duration,
  361. jobId,
  362. ];
  363. return this.execCommand(client, 'extendLock', args);
  364. }
  365. async extendLocks(jobIds, tokens, duration) {
  366. const client = await this.queue.client;
  367. const args = [
  368. this.queue.keys.stalled,
  369. this.queue.toKey(''),
  370. pack(tokens),
  371. pack(jobIds),
  372. duration,
  373. ];
  374. return this.execCommand(client, 'extendLocks', args);
  375. }
  376. async updateData(job, data) {
  377. const client = await this.queue.client;
  378. const keys = [this.queue.toKey(job.id)];
  379. const dataJson = JSON.stringify(data);
  380. const result = await this.execCommand(client, 'updateData', keys.concat([dataJson]));
  381. if (result < 0) {
  382. throw this.finishedErrors({
  383. code: result,
  384. jobId: job.id,
  385. command: 'updateData',
  386. });
  387. }
  388. }
  389. async updateProgress(jobId, progress) {
  390. const client = await this.queue.client;
  391. const keys = [
  392. this.queue.toKey(jobId),
  393. this.queue.keys.events,
  394. this.queue.keys.meta,
  395. ];
  396. const progressJson = JSON.stringify(progress);
  397. const result = await this.execCommand(client, 'updateProgress', keys.concat([jobId, progressJson]));
  398. if (result < 0) {
  399. throw this.finishedErrors({
  400. code: result,
  401. jobId,
  402. command: 'updateProgress',
  403. });
  404. }
  405. }
  406. async addLog(jobId, logRow, keepLogs) {
  407. const client = await this.queue.client;
  408. const keys = [
  409. this.queue.toKey(jobId),
  410. this.queue.toKey(jobId) + ':logs',
  411. ];
  412. const result = await this.execCommand(client, 'addLog', keys.concat([jobId, logRow, keepLogs ? keepLogs : '']));
  413. if (result < 0) {
  414. throw this.finishedErrors({
  415. code: result,
  416. jobId,
  417. command: 'addLog',
  418. });
  419. }
  420. return result;
  421. }
  422. moveToFinishedArgs(job, val, propVal, shouldRemove, target, token, timestamp, fetchNext = true, fieldsToUpdate) {
  423. var _a, _b, _c, _d, _e, _f, _g;
  424. const queueKeys = this.queue.keys;
  425. const opts = this.queue.opts;
  426. const workerKeepJobs = target === 'completed' ? opts.removeOnComplete : opts.removeOnFail;
  427. const metricsKey = this.queue.toKey(`metrics:${target}`);
  428. const keys = this.moveToFinishedKeys;
  429. keys[10] = queueKeys[target];
  430. keys[11] = this.queue.toKey((_a = job.id) !== null && _a !== void 0 ? _a : '');
  431. keys[12] = metricsKey;
  432. keys[13] = this.queue.keys.marker;
  433. const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);
  434. const args = [
  435. job.id,
  436. timestamp,
  437. propVal,
  438. typeof val === 'undefined' ? 'null' : val,
  439. target,
  440. !fetchNext || this.queue.closing ? 0 : 1,
  441. queueKeys[''],
  442. pack({
  443. token,
  444. name: opts.name,
  445. keepJobs,
  446. limiter: opts.limiter,
  447. lockDuration: opts.lockDuration,
  448. attempts: job.opts.attempts,
  449. maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints)
  450. ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints
  451. : '',
  452. fpof: !!((_d = job.opts) === null || _d === void 0 ? void 0 : _d.failParentOnFailure),
  453. cpof: !!((_e = job.opts) === null || _e === void 0 ? void 0 : _e.continueParentOnFailure),
  454. idof: !!((_f = job.opts) === null || _f === void 0 ? void 0 : _f.ignoreDependencyOnFailure),
  455. rdof: !!((_g = job.opts) === null || _g === void 0 ? void 0 : _g.removeDependencyOnFailure),
  456. }),
  457. fieldsToUpdate ? pack(objectToFlatArray(fieldsToUpdate)) : void 0,
  458. ];
  459. return keys.concat(args);
  460. }
  461. getKeepJobs(shouldRemove, workerKeepJobs) {
  462. if (typeof shouldRemove === 'undefined') {
  463. return workerKeepJobs || { count: shouldRemove ? 0 : -1 };
  464. }
  465. return typeof shouldRemove === 'object'
  466. ? shouldRemove
  467. : typeof shouldRemove === 'number'
  468. ? { count: shouldRemove }
  469. : { count: shouldRemove ? 0 : -1 };
  470. }
  471. async moveToFinished(jobId, args) {
  472. const client = await this.queue.client;
  473. const result = await this.execCommand(client, 'moveToFinished', args);
  474. if (result < 0) {
  475. throw this.finishedErrors({
  476. code: result,
  477. jobId,
  478. command: 'moveToFinished',
  479. state: 'active',
  480. });
  481. }
  482. else {
  483. if (typeof result !== 'undefined') {
  484. return raw2NextJobData(result);
  485. }
  486. }
  487. }
  488. drainArgs(delayed) {
  489. const queueKeys = this.queue.keys;
  490. const keys = [
  491. queueKeys.wait,
  492. queueKeys.paused,
  493. queueKeys.delayed,
  494. queueKeys.prioritized,
  495. queueKeys.repeat,
  496. ];
  497. const args = [queueKeys[''], delayed ? '1' : '0'];
  498. return keys.concat(args);
  499. }
  500. async drain(delayed) {
  501. const client = await this.queue.client;
  502. const args = this.drainArgs(delayed);
  503. return this.execCommand(client, 'drain', args);
  504. }
  505. removeChildDependencyArgs(jobId, parentKey) {
  506. const queueKeys = this.queue.keys;
  507. const keys = [queueKeys['']];
  508. const args = [this.queue.toKey(jobId), parentKey];
  509. return keys.concat(args);
  510. }
  511. async removeChildDependency(jobId, parentKey) {
  512. const client = await this.queue.client;
  513. const args = this.removeChildDependencyArgs(jobId, parentKey);
  514. const result = await this.execCommand(client, 'removeChildDependency', args);
  515. switch (result) {
  516. case 0:
  517. return true;
  518. case 1:
  519. return false;
  520. default:
  521. throw this.finishedErrors({
  522. code: result,
  523. jobId,
  524. parentKey,
  525. command: 'removeChildDependency',
  526. });
  527. }
  528. }
  529. getRangesArgs(types, start, end, asc) {
  530. const queueKeys = this.queue.keys;
  531. const transformedTypes = types.map(type => {
  532. return type === 'waiting' ? 'wait' : type;
  533. });
  534. const keys = [queueKeys['']];
  535. const args = [start, end, asc ? '1' : '0', ...transformedTypes];
  536. return keys.concat(args);
  537. }
  538. async getRanges(types, start = 0, end = 1, asc = false) {
  539. const client = await this.queue.client;
  540. const args = this.getRangesArgs(types, start, end, asc);
  541. return await this.execCommand(client, 'getRanges', args);
  542. }
  543. getCountsArgs(types) {
  544. const queueKeys = this.queue.keys;
  545. const transformedTypes = types.map(type => {
  546. return type === 'waiting' ? 'wait' : type;
  547. });
  548. const keys = [queueKeys['']];
  549. const args = [...transformedTypes];
  550. return keys.concat(args);
  551. }
  552. async getCounts(types) {
  553. const client = await this.queue.client;
  554. const args = this.getCountsArgs(types);
  555. return await this.execCommand(client, 'getCounts', args);
  556. }
  557. getCountsPerPriorityArgs(priorities) {
  558. const keys = [
  559. this.queue.keys.wait,
  560. this.queue.keys.paused,
  561. this.queue.keys.meta,
  562. this.queue.keys.prioritized,
  563. ];
  564. const args = priorities;
  565. return keys.concat(args);
  566. }
  567. async getCountsPerPriority(priorities) {
  568. const client = await this.queue.client;
  569. const args = this.getCountsPerPriorityArgs(priorities);
  570. return await this.execCommand(client, 'getCountsPerPriority', args);
  571. }
  572. getDependencyCountsArgs(jobId, types) {
  573. const keys = [
  574. `${jobId}:processed`,
  575. `${jobId}:dependencies`,
  576. `${jobId}:failed`,
  577. `${jobId}:unsuccessful`,
  578. ].map(name => {
  579. return this.queue.toKey(name);
  580. });
  581. const args = types;
  582. return keys.concat(args);
  583. }
  584. async getDependencyCounts(jobId, types) {
  585. const client = await this.queue.client;
  586. const args = this.getDependencyCountsArgs(jobId, types);
  587. return await this.execCommand(client, 'getDependencyCounts', args);
  588. }
  589. moveToCompletedArgs(job, returnvalue, removeOnComplete, token, fetchNext = false) {
  590. const timestamp = Date.now();
  591. return this.moveToFinishedArgs(job, returnvalue, 'returnvalue', removeOnComplete, 'completed', token, timestamp, fetchNext);
  592. }
  593. moveToFailedArgs(job, failedReason, removeOnFailed, token, fetchNext = false, fieldsToUpdate) {
  594. const timestamp = Date.now();
  595. return this.moveToFinishedArgs(job, failedReason, 'failedReason', removeOnFailed, 'failed', token, timestamp, fetchNext, fieldsToUpdate);
  596. }
  597. async isFinished(jobId, returnValue = false) {
  598. const client = await this.queue.client;
  599. const keys = ['completed', 'failed', jobId].map((key) => {
  600. return this.queue.toKey(key);
  601. });
  602. return this.execCommand(client, 'isFinished', keys.concat([jobId, returnValue ? '1' : '']));
  603. }
  604. async getState(jobId) {
  605. const client = await this.queue.client;
  606. const keys = [
  607. 'completed',
  608. 'failed',
  609. 'delayed',
  610. 'active',
  611. 'wait',
  612. 'paused',
  613. 'waiting-children',
  614. 'prioritized',
  615. ].map((key) => {
  616. return this.queue.toKey(key);
  617. });
  618. if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6', this.queue.databaseType)) {
  619. return this.execCommand(client, 'getState', keys.concat([jobId]));
  620. }
  621. return this.execCommand(client, 'getStateV2', keys.concat([jobId]));
  622. }
  623. /**
  624. * Change delay of a delayed job.
  625. *
  626. * Reschedules a delayed job by setting a new delay from the current time.
  627. * For example, calling changeDelay(5000) will reschedule the job to execute
  628. * 5000 milliseconds (5 seconds) from now, regardless of the original delay.
  629. *
  630. * @param jobId - the ID of the job to change the delay for.
  631. * @param delay - milliseconds from now when the job should be processed.
  632. * @returns delay in milliseconds.
  633. * @throws JobNotExist
  634. * This exception is thrown if jobId is missing.
  635. * @throws JobNotInState
  636. * This exception is thrown if job is not in delayed state.
  637. */
  638. async changeDelay(jobId, delay) {
  639. const client = await this.queue.client;
  640. const args = this.changeDelayArgs(jobId, delay);
  641. const result = await this.execCommand(client, 'changeDelay', args);
  642. if (result < 0) {
  643. throw this.finishedErrors({
  644. code: result,
  645. jobId,
  646. command: 'changeDelay',
  647. state: 'delayed',
  648. });
  649. }
  650. }
  651. changeDelayArgs(jobId, delay) {
  652. const timestamp = Date.now();
  653. const keys = [
  654. this.queue.keys.delayed,
  655. this.queue.keys.meta,
  656. this.queue.keys.marker,
  657. this.queue.keys.events,
  658. ];
  659. return keys.concat([
  660. delay,
  661. JSON.stringify(timestamp),
  662. jobId,
  663. this.queue.toKey(jobId),
  664. ]);
  665. }
  666. async changePriority(jobId, priority = 0, lifo = false) {
  667. const client = await this.queue.client;
  668. const args = this.changePriorityArgs(jobId, priority, lifo);
  669. const result = await this.execCommand(client, 'changePriority', args);
  670. if (result < 0) {
  671. throw this.finishedErrors({
  672. code: result,
  673. jobId,
  674. command: 'changePriority',
  675. });
  676. }
  677. }
  678. changePriorityArgs(jobId, priority = 0, lifo = false) {
  679. const keys = [
  680. this.queue.keys.wait,
  681. this.queue.keys.paused,
  682. this.queue.keys.meta,
  683. this.queue.keys.prioritized,
  684. this.queue.keys.active,
  685. this.queue.keys.pc,
  686. this.queue.keys.marker,
  687. ];
  688. return keys.concat([priority, this.queue.toKey(''), jobId, lifo ? 1 : 0]);
  689. }
  690. moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) {
  691. const queueKeys = this.queue.keys;
  692. const workerOpts = this.queue.opts;
  693. const keys = [
  694. queueKeys.marker,
  695. queueKeys.active,
  696. queueKeys.prioritized,
  697. queueKeys.delayed,
  698. this.queue.toKey(jobId),
  699. queueKeys.events,
  700. queueKeys.meta,
  701. queueKeys.stalled,
  702. queueKeys.wait,
  703. queueKeys.limiter,
  704. queueKeys.paused,
  705. queueKeys.pc,
  706. ];
  707. const fetchNext = opts.fetchNext && !this.queue.closing ? 1 : 0;
  708. return keys.concat([
  709. this.queue.keys[''],
  710. timestamp,
  711. jobId,
  712. token,
  713. delay,
  714. opts.skipAttempt ? '1' : '0',
  715. opts.fieldsToUpdate
  716. ? pack(objectToFlatArray(opts.fieldsToUpdate))
  717. : void 0,
  718. fetchNext,
  719. fetchNext
  720. ? pack({
  721. token,
  722. lockDuration: workerOpts.lockDuration,
  723. limiter: workerOpts.limiter,
  724. name: workerOpts.name,
  725. })
  726. : void 0,
  727. ]);
  728. }
  729. moveToWaitingChildrenArgs(jobId, token, opts) {
  730. const timestamp = Date.now();
  731. const childKey = getParentKey(opts.child);
  732. const keys = [
  733. 'active',
  734. 'waiting-children',
  735. jobId,
  736. `${jobId}:dependencies`,
  737. `${jobId}:unsuccessful`,
  738. 'stalled',
  739. 'events',
  740. ].map(name => {
  741. return this.queue.toKey(name);
  742. });
  743. return keys.concat([
  744. token,
  745. childKey !== null && childKey !== void 0 ? childKey : '',
  746. JSON.stringify(timestamp),
  747. jobId,
  748. this.queue.toKey(''),
  749. ]);
  750. }
  751. isMaxedArgs() {
  752. const queueKeys = this.queue.keys;
  753. const keys = [queueKeys.meta, queueKeys.active];
  754. return keys;
  755. }
  756. async isMaxed() {
  757. const client = await this.queue.client;
  758. const args = this.isMaxedArgs();
  759. return !!(await this.execCommand(client, 'isMaxed', args));
  760. }
  761. async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) {
  762. const client = await this.queue.client;
  763. const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
  764. const result = await this.execCommand(client, 'moveToDelayed', args);
  765. if (result < 0) {
  766. throw this.finishedErrors({
  767. code: result,
  768. jobId,
  769. command: 'moveToDelayed',
  770. state: 'active',
  771. });
  772. }
  773. else if (typeof result !== 'undefined') {
  774. return raw2NextJobData(result);
  775. }
  776. }
  777. /**
  778. * Move parent job to waiting-children state.
  779. *
  780. * @returns true if job is successfully moved, false if there are pending dependencies.
  781. * @throws JobNotExist
  782. * This exception is thrown if jobId is missing.
  783. * @throws JobLockNotExist
  784. * This exception is thrown if job lock is missing.
  785. * @throws JobNotInState
  786. * This exception is thrown if job is not in active state.
  787. */
  788. async moveToWaitingChildren(jobId, token, opts = {}) {
  789. const client = await this.queue.client;
  790. const args = this.moveToWaitingChildrenArgs(jobId, token, opts);
  791. const result = await this.execCommand(client, 'moveToWaitingChildren', args);
  792. switch (result) {
  793. case 0:
  794. return true;
  795. case 1:
  796. return false;
  797. default:
  798. throw this.finishedErrors({
  799. code: result,
  800. jobId,
  801. command: 'moveToWaitingChildren',
  802. state: 'active',
  803. });
  804. }
  805. }
  806. getRateLimitTtlArgs(maxJobs) {
  807. const keys = [
  808. this.queue.keys.limiter,
  809. this.queue.keys.meta,
  810. ];
  811. return keys.concat([maxJobs !== null && maxJobs !== void 0 ? maxJobs : '0']);
  812. }
  813. async getRateLimitTtl(maxJobs) {
  814. const client = await this.queue.client;
  815. const args = this.getRateLimitTtlArgs(maxJobs);
  816. return this.execCommand(client, 'getRateLimitTtl', args);
  817. }
  818. /**
  819. * Remove jobs in a specific state.
  820. *
  821. * @returns Id jobs from the deleted records.
  822. */
  823. async cleanJobsInSet(set, timestamp, limit = 0) {
  824. const client = await this.queue.client;
  825. return this.execCommand(client, 'cleanJobsInSet', [
  826. this.queue.toKey(set),
  827. this.queue.toKey('events'),
  828. this.queue.toKey('repeat'),
  829. this.queue.toKey(''),
  830. timestamp,
  831. limit,
  832. set,
  833. ]);
  834. }
  835. getJobSchedulerArgs(id) {
  836. const keys = [this.queue.keys.repeat];
  837. return keys.concat([id]);
  838. }
  839. async getJobScheduler(id) {
  840. const client = await this.queue.client;
  841. const args = this.getJobSchedulerArgs(id);
  842. return this.execCommand(client, 'getJobScheduler', args);
  843. }
  844. retryJobArgs(jobId, lifo, token, opts = {}) {
  845. const keys = [
  846. this.queue.keys.active,
  847. this.queue.keys.wait,
  848. this.queue.keys.paused,
  849. this.queue.toKey(jobId),
  850. this.queue.keys.meta,
  851. this.queue.keys.events,
  852. this.queue.keys.delayed,
  853. this.queue.keys.prioritized,
  854. this.queue.keys.pc,
  855. this.queue.keys.marker,
  856. this.queue.keys.stalled,
  857. ];
  858. const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
  859. return keys.concat([
  860. this.queue.toKey(''),
  861. Date.now(),
  862. pushCmd,
  863. jobId,
  864. token,
  865. opts.fieldsToUpdate
  866. ? pack(objectToFlatArray(opts.fieldsToUpdate))
  867. : void 0,
  868. ]);
  869. }
  870. async retryJob(jobId, lifo, token = '0', opts = {}) {
  871. const client = await this.queue.client;
  872. const args = this.retryJobArgs(jobId, lifo, token, opts);
  873. const result = await this.execCommand(client, 'retryJob', args);
  874. if (result < 0) {
  875. throw this.finishedErrors({
  876. code: result,
  877. jobId,
  878. command: 'retryJob',
  879. state: 'active',
  880. });
  881. }
  882. }
  883. moveJobsToWaitArgs(state, count, timestamp) {
  884. const keys = [
  885. this.queue.toKey(''),
  886. this.queue.keys.events,
  887. this.queue.toKey(state),
  888. this.queue.toKey('wait'),
  889. this.queue.toKey('paused'),
  890. this.queue.keys.meta,
  891. this.queue.keys.active,
  892. this.queue.keys.marker,
  893. ];
  894. const args = [count, timestamp, state];
  895. return keys.concat(args);
  896. }
  897. async retryJobs(state = 'failed', count = 1000, timestamp = new Date().getTime()) {
  898. const client = await this.queue.client;
  899. const args = this.moveJobsToWaitArgs(state, count, timestamp);
  900. return this.execCommand(client, 'moveJobsToWait', args);
  901. }
  902. async promoteJobs(count = 1000) {
  903. const client = await this.queue.client;
  904. const args = this.moveJobsToWaitArgs('delayed', count, Number.MAX_VALUE);
  905. return this.execCommand(client, 'moveJobsToWait', args);
  906. }
  907. /**
  908. * Attempts to reprocess a job
  909. *
  910. * @param job - The job to reprocess
  911. * @param state - The expected job state. If the job is not found
  912. * on the provided state, then it's not reprocessed. Supported states: 'failed', 'completed'
  913. *
  914. * @returns A promise that resolves when the job has been successfully moved to the wait queue.
  915. * @throws Will throw an error with a code property indicating the failure reason:
  916. * - code 0: Job does not exist
  917. * - code -1: Job is currently locked and can't be retried
  918. * - code -2: Job was not found in the expected set
  919. */
  920. async reprocessJob(job, state, opts = {}) {
  921. const client = await this.queue.client;
  922. const keys = [
  923. this.queue.toKey(job.id),
  924. this.queue.keys.events,
  925. this.queue.toKey(state),
  926. this.queue.keys.wait,
  927. this.queue.keys.meta,
  928. this.queue.keys.paused,
  929. this.queue.keys.active,
  930. this.queue.keys.marker,
  931. ];
  932. const args = [
  933. job.id,
  934. (job.opts.lifo ? 'R' : 'L') + 'PUSH',
  935. state === 'failed' ? 'failedReason' : 'returnvalue',
  936. state,
  937. opts.resetAttemptsMade ? '1' : '0',
  938. opts.resetAttemptsStarted ? '1' : '0',
  939. ];
  940. const result = await this.execCommand(client, 'reprocessJob', keys.concat(args));
  941. switch (result) {
  942. case 1:
  943. return;
  944. default:
  945. throw this.finishedErrors({
  946. code: result,
  947. jobId: job.id,
  948. command: 'reprocessJob',
  949. state,
  950. });
  951. }
  952. }
  953. async getMetrics(type, start = 0, end = -1) {
  954. const client = await this.queue.client;
  955. const keys = [
  956. this.queue.toKey(`metrics:${type}`),
  957. this.queue.toKey(`metrics:${type}:data`),
  958. ];
  959. const args = [start, end];
  960. const result = await this.execCommand(client, 'getMetrics', keys.concat(args));
  961. return result;
  962. }
  963. async moveToActive(client, token, name) {
  964. const opts = this.queue.opts;
  965. const queueKeys = this.queue.keys;
  966. const keys = [
  967. queueKeys.wait,
  968. queueKeys.active,
  969. queueKeys.prioritized,
  970. queueKeys.events,
  971. queueKeys.stalled,
  972. queueKeys.limiter,
  973. queueKeys.delayed,
  974. queueKeys.paused,
  975. queueKeys.meta,
  976. queueKeys.pc,
  977. queueKeys.marker,
  978. ];
  979. const args = [
  980. queueKeys[''],
  981. Date.now(),
  982. pack({
  983. token,
  984. lockDuration: opts.lockDuration,
  985. limiter: opts.limiter,
  986. name,
  987. }),
  988. ];
  989. const result = await this.execCommand(client, 'moveToActive', keys.concat(args));
  990. return raw2NextJobData(result);
  991. }
  992. async promote(jobId) {
  993. const client = await this.queue.client;
  994. const keys = [
  995. this.queue.keys.delayed,
  996. this.queue.keys.wait,
  997. this.queue.keys.paused,
  998. this.queue.keys.meta,
  999. this.queue.keys.prioritized,
  1000. this.queue.keys.active,
  1001. this.queue.keys.pc,
  1002. this.queue.keys.events,
  1003. this.queue.keys.marker,
  1004. ];
  1005. const args = [this.queue.toKey(''), jobId];
  1006. const code = await this.execCommand(client, 'promote', keys.concat(args));
  1007. if (code < 0) {
  1008. throw this.finishedErrors({
  1009. code,
  1010. jobId,
  1011. command: 'promote',
  1012. state: 'delayed',
  1013. });
  1014. }
  1015. }
  1016. moveStalledJobsToWaitArgs() {
  1017. const opts = this.queue.opts;
  1018. const keys = [
  1019. this.queue.keys.stalled,
  1020. this.queue.keys.wait,
  1021. this.queue.keys.active,
  1022. this.queue.keys['stalled-check'],
  1023. this.queue.keys.meta,
  1024. this.queue.keys.paused,
  1025. this.queue.keys.marker,
  1026. this.queue.keys.events,
  1027. ];
  1028. const args = [
  1029. opts.maxStalledCount,
  1030. this.queue.toKey(''),
  1031. Date.now(),
  1032. opts.stalledInterval,
  1033. ];
  1034. return keys.concat(args);
  1035. }
  1036. /**
  1037. * Looks for unlocked jobs in the active queue.
  1038. *
  1039. * The job was being worked on, but the worker process died and it failed to renew the lock.
  1040. * We call these jobs 'stalled'. This is the most common case. We resolve these by moving them
  1041. * back to wait to be re-processed. To prevent jobs from cycling endlessly between active and wait,
  1042. * (e.g. if the job handler keeps crashing),
  1043. * we limit the number stalled job recoveries to settings.maxStalledCount.
  1044. */
  1045. async moveStalledJobsToWait() {
  1046. const client = await this.queue.client;
  1047. const args = this.moveStalledJobsToWaitArgs();
  1048. return this.execCommand(client, 'moveStalledJobsToWait', args);
  1049. }
  1050. /**
  1051. * Moves a job back from Active to Wait.
  1052. * This script is used when a job has been manually rate limited and needs
  1053. * to be moved back to wait from active status.
  1054. *
  1055. * @param client - Redis client
  1056. * @param jobId - Job id
  1057. * @returns
  1058. */
  1059. async moveJobFromActiveToWait(jobId, token = '0') {
  1060. const client = await this.queue.client;
  1061. const keys = [
  1062. this.queue.keys.active,
  1063. this.queue.keys.wait,
  1064. this.queue.keys.stalled,
  1065. this.queue.keys.paused,
  1066. this.queue.keys.meta,
  1067. this.queue.keys.limiter,
  1068. this.queue.keys.prioritized,
  1069. this.queue.keys.marker,
  1070. this.queue.keys.events,
  1071. ];
  1072. const args = [jobId, token, this.queue.toKey(jobId)];
  1073. const result = await this.execCommand(client, 'moveJobFromActiveToWait', keys.concat(args));
  1074. if (result < 0) {
  1075. throw this.finishedErrors({
  1076. code: result,
  1077. jobId,
  1078. command: 'moveJobFromActiveToWait',
  1079. state: 'active',
  1080. });
  1081. }
  1082. return result;
  1083. }
  1084. async obliterate(opts) {
  1085. const client = await this.queue.client;
  1086. const keys = [
  1087. this.queue.keys.meta,
  1088. this.queue.toKey(''),
  1089. ];
  1090. const args = [opts.count, opts.force ? 'force' : null];
  1091. const result = await this.execCommand(client, 'obliterate', keys.concat(args));
  1092. if (result < 0) {
  1093. switch (result) {
  1094. case -1:
  1095. throw new Error('Cannot obliterate non-paused queue');
  1096. case -2:
  1097. throw new Error('Cannot obliterate queue with active jobs');
  1098. }
  1099. }
  1100. return result;
  1101. }
  1102. /**
  1103. * Paginate a set or hash keys.
  1104. * @param opts - options to define the pagination behaviour
  1105. *
  1106. */
  1107. async paginate(key, opts) {
  1108. const client = await this.queue.client;
  1109. const keys = [key];
  1110. const maxIterations = 5;
  1111. const pageSize = opts.end >= 0 ? opts.end - opts.start + 1 : Infinity;
  1112. let cursor = '0', offset = 0, items, total, rawJobs, page = [], jobs = [];
  1113. do {
  1114. const args = [
  1115. opts.start + page.length,
  1116. opts.end,
  1117. cursor,
  1118. offset,
  1119. maxIterations,
  1120. ];
  1121. if (opts.fetchJobs) {
  1122. args.push(1);
  1123. }
  1124. [cursor, offset, items, total, rawJobs] = await this.execCommand(client, 'paginate', keys.concat(args));
  1125. page = page.concat(items);
  1126. if (rawJobs && rawJobs.length) {
  1127. jobs = jobs.concat(rawJobs.map(array2obj));
  1128. }
  1129. // Important to keep this coercive inequality (!=) instead of strict inequality (!==)
  1130. } while (cursor != '0' && page.length < pageSize);
  1131. // If we get an array of arrays, it means we are paginating a hash
  1132. if (page.length && Array.isArray(page[0])) {
  1133. const result = [];
  1134. for (let index = 0; index < page.length; index++) {
  1135. const [id, value] = page[index];
  1136. try {
  1137. result.push({ id, v: JSON.parse(value) });
  1138. }
  1139. catch (err) {
  1140. result.push({ id, err: err.message });
  1141. }
  1142. }
  1143. return {
  1144. cursor,
  1145. items: result,
  1146. total,
  1147. jobs,
  1148. };
  1149. }
  1150. else {
  1151. return {
  1152. cursor,
  1153. items: page.map(item => ({ id: item })),
  1154. total,
  1155. jobs,
  1156. };
  1157. }
  1158. }
  1159. finishedErrors({ code, jobId, parentKey, command, state, }) {
  1160. let error;
  1161. switch (code) {
  1162. case ErrorCode.JobNotExist:
  1163. error = new Error(`Missing key for job ${jobId}. ${command}`);
  1164. break;
  1165. case ErrorCode.JobLockNotExist:
  1166. error = new Error(`Missing lock for job ${jobId}. ${command}`);
  1167. break;
  1168. case ErrorCode.JobNotInState:
  1169. error = new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
  1170. break;
  1171. case ErrorCode.JobPendingChildren:
  1172. error = new Error(`Job ${jobId} has pending dependencies. ${command}`);
  1173. break;
  1174. case ErrorCode.ParentJobNotExist:
  1175. error = new Error(`Missing key for parent job ${parentKey}. ${command}`);
  1176. break;
  1177. case ErrorCode.JobLockMismatch:
  1178. error = new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
  1179. break;
  1180. case ErrorCode.ParentJobCannotBeReplaced:
  1181. error = new Error(`The parent job ${parentKey} cannot be replaced. ${command}`);
  1182. break;
  1183. case ErrorCode.JobBelongsToJobScheduler:
  1184. error = new Error(`Job ${jobId} belongs to a job scheduler and cannot be removed directly. ${command}`);
  1185. break;
  1186. case ErrorCode.JobHasFailedChildren:
  1187. error = new UnrecoverableError(`Cannot complete job ${jobId} because it has at least one failed child. ${command}`);
  1188. break;
  1189. case ErrorCode.SchedulerJobIdCollision:
  1190. error = new Error(`Cannot create job scheduler iteration - job ID already exists. ${command}`);
  1191. break;
  1192. case ErrorCode.SchedulerJobSlotsBusy:
  1193. error = new Error(`Cannot create job scheduler iteration - current and next time slots already have jobs. ${command}`);
  1194. break;
  1195. default:
  1196. error = new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
  1197. }
  1198. // Add the code property to the error object
  1199. error.code = code;
  1200. return error;
  1201. }
  1202. async removeOrphanedJobs(candidateJobIds, stateKeySuffixes, jobSubKeySuffixes) {
  1203. const client = await this.queue.client;
  1204. const args = [
  1205. this.queue.toKey(''),
  1206. stateKeySuffixes.length,
  1207. ...stateKeySuffixes,
  1208. jobSubKeySuffixes.length,
  1209. ...jobSubKeySuffixes,
  1210. ...candidateJobIds,
  1211. ];
  1212. return this.execCommand(client, 'removeOrphanedJobs', args);
  1213. }
  1214. }
  /**
   * Converts a raw command reply into a four-element tuple.
   *
   * @param raw - Raw reply; entries 0..3 are read when it is truthy.
   * @returns An empty array when `raw` is falsy. Otherwise
   * `[job, raw[1], raw[2], raw[3]]`, where `job` is `array2obj(raw[0])` when
   * `raw[0]` is truthy and `null` otherwise.
   */
  export function raw2NextJobData(raw) {
    if (!raw) {
      return [];
    }
    const jobData = raw[0] ? array2obj(raw[0]) : null;
    return [jobData, raw[1], raw[2], raw[3]];
  }
  1225. //# sourceMappingURL=scripts.js.map