queue.js 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738
  1. import { v4 } from 'uuid';
  2. import { Job } from './job';
  3. import { QueueGetters } from './queue-getters';
  4. import { Repeat } from './repeat';
  5. import { SpanKind, TelemetryAttributes } from '../enums';
  6. import { JobScheduler } from './job-scheduler';
  7. import { version } from '../version';
  8. /**
  9. * Queue
  10. *
  11. * This class provides methods to add jobs to a queue and some other high-level
  12. * administration such as pausing or deleting queues.
  13. *
  14. * @typeParam DataType - The type of the data that the job will process.
  15. * @typeParam ResultType - The type of the result of the job.
  16. * @typeParam NameType - The type of the name of the job.
  17. *
  18. * @example
  19. *
  20. * ```typescript
  21. * import { Queue } from 'bullmq';
  22. *
  23. * interface MyDataType {
  24. * foo: string;
  25. * }
  26. *
  27. * interface MyResultType {
  28. * bar: string;
  29. * }
  30. *
  31. * const queue = new Queue<MyDataType, MyResultType, "blue" | "brown">('myQueue');
  32. * ```
  33. */
  34. export class Queue extends QueueGetters {
  35. constructor(name, opts, Connection) {
  36. var _a;
  37. super(name, Object.assign({}, opts), Connection);
  38. this.token = v4();
  39. this.libName = 'bullmq';
  40. this.jobsOpts = (_a = opts === null || opts === void 0 ? void 0 : opts.defaultJobOptions) !== null && _a !== void 0 ? _a : {};
  41. this.waitUntilReady()
  42. .then(client => {
  43. if (!this.closing && !(opts === null || opts === void 0 ? void 0 : opts.skipMetasUpdate)) {
  44. return client.hmset(this.keys.meta, this.metaValues);
  45. }
  46. })
  47. .catch(err => {
  48. // We ignore this error to avoid warnings. The error can still
  49. // be received by listening to event 'error'
  50. });
  51. }
  52. emit(event, ...args) {
  53. return super.emit(event, ...args);
  54. }
  55. off(eventName, listener) {
  56. super.off(eventName, listener);
  57. return this;
  58. }
  59. on(event, listener) {
  60. super.on(event, listener);
  61. return this;
  62. }
  63. once(event, listener) {
  64. super.once(event, listener);
  65. return this;
  66. }
  67. /**
  68. * Returns this instance current default job options.
  69. */
  70. get defaultJobOptions() {
  71. return Object.assign({}, this.jobsOpts);
  72. }
  73. get metaValues() {
  74. var _a, _b, _c, _d;
  75. return {
  76. 'opts.maxLenEvents': (_d = (_c = (_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.streams) === null || _b === void 0 ? void 0 : _b.events) === null || _c === void 0 ? void 0 : _c.maxLen) !== null && _d !== void 0 ? _d : 10000,
  77. version: `${this.libName}:${version}`,
  78. };
  79. }
  80. /**
  81. * Get library version.
  82. *
  83. * @returns the content of the meta.library field.
  84. */
  85. async getVersion() {
  86. const client = await this.client;
  87. return await client.hget(this.keys.meta, 'version');
  88. }
  89. get repeat() {
  90. return new Promise(async (resolve) => {
  91. if (!this._repeat) {
  92. this._repeat = new Repeat(this.name, Object.assign(Object.assign({}, this.opts), { connection: await this.client }));
  93. this._repeat.on('error', this.emit.bind(this, 'error'));
  94. }
  95. resolve(this._repeat);
  96. });
  97. }
  98. get jobScheduler() {
  99. return new Promise(async (resolve) => {
  100. if (!this._jobScheduler) {
  101. this._jobScheduler = new JobScheduler(this.name, Object.assign(Object.assign({}, this.opts), { connection: await this.client }));
  102. this._jobScheduler.on('error', this.emit.bind(this, 'error'));
  103. }
  104. resolve(this._jobScheduler);
  105. });
  106. }
  107. /**
  108. * Enable and set global concurrency value.
  109. * @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
  110. * For instance, setting this value to 1 ensures that no more than one job
  111. * is processed at any given time. If this limit is not defined, there will be no
  112. * restriction on the number of concurrent jobs.
  113. */
  114. async setGlobalConcurrency(concurrency) {
  115. const client = await this.client;
  116. return client.hset(this.keys.meta, 'concurrency', concurrency);
  117. }
  118. /**
  119. * Enable and set rate limit.
  120. * @param max - Max number of jobs to process in the time period specified in `duration`
  121. * @param duration - Time in milliseconds. During this time, a maximum of `max` jobs will be processed.
  122. */
  123. async setGlobalRateLimit(max, duration) {
  124. const client = await this.client;
  125. return client.hset(this.keys.meta, 'max', max, 'duration', duration);
  126. }
  127. /**
  128. * Remove global concurrency value.
  129. */
  130. async removeGlobalConcurrency() {
  131. const client = await this.client;
  132. return client.hdel(this.keys.meta, 'concurrency');
  133. }
  134. /**
  135. * Remove global rate limit values.
  136. */
  137. async removeGlobalRateLimit() {
  138. const client = await this.client;
  139. return client.hdel(this.keys.meta, 'max', 'duration');
  140. }
  141. /**
  142. * Adds a new job to the queue.
  143. *
  144. * @param name - Name of the job to be added to the queue.
  145. * @param data - Arbitrary data to append to the job.
  146. * @param opts - Job options that affects how the job is going to be processed.
  147. */
  148. async add(name, data, opts) {
  149. return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${name}`, async (span, srcPropagationMetadata) => {
  150. var _a;
  151. if (srcPropagationMetadata && !((_a = opts === null || opts === void 0 ? void 0 : opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext)) {
  152. const telemetry = {
  153. metadata: srcPropagationMetadata,
  154. };
  155. opts = Object.assign(Object.assign({}, opts), { telemetry });
  156. }
  157. const job = await this.addJob(name, data, opts);
  158. span === null || span === void 0 ? void 0 : span.setAttributes({
  159. [TelemetryAttributes.JobName]: name,
  160. [TelemetryAttributes.JobId]: job.id,
  161. });
  162. return job;
  163. });
  164. }
  165. /**
  166. * addJob is a telemetry free version of the add method, useful in order to wrap it
  167. * with custom telemetry on subclasses.
  168. *
  169. * @param name - Name of the job to be added to the queue.
  170. * @param data - Arbitrary data to append to the job.
  171. * @param opts - Job options that affects how the job is going to be processed.
  172. *
  173. * @returns Job
  174. */
  175. async addJob(name, data, opts) {
  176. if (opts && opts.repeat) {
  177. if (opts.repeat.endDate) {
  178. if (+new Date(opts.repeat.endDate) < Date.now()) {
  179. throw new Error('End date must be greater than current timestamp');
  180. }
  181. }
  182. return (await this.repeat).updateRepeatableJob(name, data, Object.assign(Object.assign({}, this.jobsOpts), opts), { override: true });
  183. }
  184. else {
  185. const jobId = opts === null || opts === void 0 ? void 0 : opts.jobId;
  186. if (jobId == '0' || (jobId === null || jobId === void 0 ? void 0 : jobId.startsWith('0:'))) {
  187. throw new Error("JobId cannot be '0' or start with 0:");
  188. }
  189. const mergedOpts = Object.assign(Object.assign(Object.assign({}, this.jobsOpts), opts), { jobId });
  190. const job = await this.Job.create(this, name, data, mergedOpts);
  191. this.emit('waiting', job);
  192. return job;
  193. }
  194. }
  195. /**
  196. * Adds an array of jobs to the queue. This method may be faster than adding
  197. * one job at a time in a sequence.
  198. *
  199. * @param jobs - The array of jobs to add to the queue. Each job is defined by 3
  200. * properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
  201. */
  202. async addBulk(jobs) {
  203. return this.trace(SpanKind.PRODUCER, 'addBulk', this.name, async (span, srcPropagationMetadata) => {
  204. if (span) {
  205. span.setAttributes({
  206. [TelemetryAttributes.BulkNames]: jobs.map(job => job.name),
  207. [TelemetryAttributes.BulkCount]: jobs.length,
  208. });
  209. }
  210. return await this.Job.createBulk(this, jobs.map(job => {
  211. var _a, _b, _c, _d, _e, _f;
  212. let telemetry = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry;
  213. if (srcPropagationMetadata) {
  214. const omitContext = (_c = (_b = job.opts) === null || _b === void 0 ? void 0 : _b.telemetry) === null || _c === void 0 ? void 0 : _c.omitContext;
  215. const telemetryMetadata = ((_e = (_d = job.opts) === null || _d === void 0 ? void 0 : _d.telemetry) === null || _e === void 0 ? void 0 : _e.metadata) ||
  216. (!omitContext && srcPropagationMetadata);
  217. if (telemetryMetadata || omitContext) {
  218. telemetry = {
  219. metadata: telemetryMetadata,
  220. omitContext,
  221. };
  222. }
  223. }
  224. const mergedOpts = Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), { jobId: (_f = job.opts) === null || _f === void 0 ? void 0 : _f.jobId, telemetry });
  225. return {
  226. name: job.name,
  227. data: job.data,
  228. opts: mergedOpts,
  229. };
  230. }));
  231. });
  232. }
  233. /**
  234. * Upserts a scheduler.
  235. *
  236. * A scheduler is a job factory that creates jobs at a given interval.
  237. * Upserting a scheduler will create a new job scheduler or update an existing one.
  238. * It will also create the first job based on the repeat options and delayed accordingly.
  239. *
  240. * @param key - Unique key for the repeatable job meta.
  241. * @param repeatOpts - Repeat options
  242. * @param jobTemplate - Job template. If provided it will be used for all the jobs
  243. * created by the scheduler.
  244. *
  245. * @returns The next job to be scheduled (would normally be in delayed state).
  246. */
  247. async upsertJobScheduler(jobSchedulerId, repeatOpts, jobTemplate) {
  248. var _a, _b;
  249. if (repeatOpts.endDate) {
  250. if (+new Date(repeatOpts.endDate) < Date.now()) {
  251. throw new Error('End date must be greater than current timestamp');
  252. }
  253. }
  254. return (await this.jobScheduler).upsertJobScheduler(jobSchedulerId, repeatOpts, (_a = jobTemplate === null || jobTemplate === void 0 ? void 0 : jobTemplate.name) !== null && _a !== void 0 ? _a : jobSchedulerId, (_b = jobTemplate === null || jobTemplate === void 0 ? void 0 : jobTemplate.data) !== null && _b !== void 0 ? _b : {}, Object.assign(Object.assign({}, this.jobsOpts), jobTemplate === null || jobTemplate === void 0 ? void 0 : jobTemplate.opts), { override: true });
  255. }
  256. /**
  257. * Pauses the processing of this queue globally.
  258. *
  259. * We use an atomic RENAME operation on the wait queue. Since
  260. * we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
  261. * is renamed to 'paused', no new jobs will be processed (the current ones
  262. * will run until finalized).
  263. *
  264. * Adding jobs requires a LUA script to check first if the paused list exist
  265. * and in that case it will add it there instead of the wait list.
  266. */
  267. async pause() {
  268. await this.trace(SpanKind.INTERNAL, 'pause', this.name, async () => {
  269. await this.scripts.pause(true);
  270. this.emit('paused');
  271. });
  272. }
  273. /**
  274. * Close the queue instance.
  275. *
  276. */
  277. async close() {
  278. await this.trace(SpanKind.INTERNAL, 'close', this.name, async () => {
  279. if (!this.closing) {
  280. if (this._repeat) {
  281. await this._repeat.close();
  282. }
  283. }
  284. await super.close();
  285. });
  286. }
  287. /**
  288. * Overrides the rate limit to be active for the next jobs.
  289. *
  290. * @param expireTimeMs - expire time in ms of this rate limit.
  291. */
  292. async rateLimit(expireTimeMs) {
  293. await this.trace(SpanKind.INTERNAL, 'rateLimit', this.name, async (span) => {
  294. span === null || span === void 0 ? void 0 : span.setAttributes({
  295. [TelemetryAttributes.QueueRateLimit]: expireTimeMs,
  296. });
  297. await this.client.then(client => client.set(this.keys.limiter, Number.MAX_SAFE_INTEGER, 'PX', expireTimeMs));
  298. });
  299. }
  300. /**
  301. * Resumes the processing of this queue globally.
  302. *
  303. * The method reverses the pause operation by resuming the processing of the
  304. * queue.
  305. */
  306. async resume() {
  307. await this.trace(SpanKind.INTERNAL, 'resume', this.name, async () => {
  308. await this.scripts.pause(false);
  309. this.emit('resumed');
  310. });
  311. }
  312. /**
  313. * Returns true if the queue is currently paused.
  314. */
  315. async isPaused() {
  316. const client = await this.client;
  317. const pausedKeyExists = await client.hexists(this.keys.meta, 'paused');
  318. return pausedKeyExists === 1;
  319. }
  320. /**
  321. * Returns true if the queue is currently maxed.
  322. */
  323. isMaxed() {
  324. return this.scripts.isMaxed();
  325. }
  326. /**
  327. * Get all repeatable meta jobs.
  328. *
  329. * @deprecated This method is deprecated and will be removed in v6. Use getJobSchedulers instead.
  330. *
  331. * @param start - Offset of first job to return.
  332. * @param end - Offset of last job to return.
  333. * @param asc - Determine the order in which jobs are returned based on their
  334. * next execution time.
  335. */
  336. async getRepeatableJobs(start, end, asc) {
  337. return (await this.repeat).getRepeatableJobs(start, end, asc);
  338. }
  339. /**
  340. * Get Job Scheduler by id
  341. *
  342. * @param id - identifier of scheduler.
  343. */
  344. async getJobScheduler(id) {
  345. return (await this.jobScheduler).getScheduler(id);
  346. }
  347. /**
  348. * Get all Job Schedulers
  349. *
  350. * @param start - Offset of first scheduler to return.
  351. * @param end - Offset of last scheduler to return.
  352. * @param asc - Determine the order in which schedulers are returned based on their
  353. * next execution time.
  354. */
  355. async getJobSchedulers(start, end, asc) {
  356. return (await this.jobScheduler).getJobSchedulers(start, end, asc);
  357. }
  358. /**
  359. *
  360. * Get the number of job schedulers.
  361. *
  362. * @returns The number of job schedulers.
  363. */
  364. async getJobSchedulersCount() {
  365. return (await this.jobScheduler).getSchedulersCount();
  366. }
  367. /**
  368. * Removes a repeatable job.
  369. *
  370. * Note: you need to use the exact same repeatOpts when deleting a repeatable job
  371. * than when adding it.
  372. *
  373. * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead.
  374. *
  375. * @see removeRepeatableByKey
  376. *
  377. * @param name - Job name
  378. * @param repeatOpts - Repeat options
  379. * @param jobId - Job id to remove. If not provided, all jobs with the same repeatOpts
  380. * @returns
  381. */
  382. async removeRepeatable(name, repeatOpts, jobId) {
  383. return this.trace(SpanKind.INTERNAL, 'removeRepeatable', `${this.name}.${name}`, async (span) => {
  384. span === null || span === void 0 ? void 0 : span.setAttributes({
  385. [TelemetryAttributes.JobName]: name,
  386. [TelemetryAttributes.JobId]: jobId,
  387. });
  388. const repeat = await this.repeat;
  389. const removed = await repeat.removeRepeatable(name, repeatOpts, jobId);
  390. return !removed;
  391. });
  392. }
  393. /**
  394. *
  395. * Removes a job scheduler.
  396. *
  397. * @param jobSchedulerId - identifier of the job scheduler.
  398. *
  399. * @returns
  400. */
  401. async removeJobScheduler(jobSchedulerId) {
  402. const jobScheduler = await this.jobScheduler;
  403. const removed = await jobScheduler.removeJobScheduler(jobSchedulerId);
  404. return !removed;
  405. }
  406. /**
  407. * Removes a debounce key.
  408. * @deprecated use removeDeduplicationKey
  409. *
  410. * @param id - debounce identifier
  411. */
  412. async removeDebounceKey(id) {
  413. return this.trace(SpanKind.INTERNAL, 'removeDebounceKey', `${this.name}`, async (span) => {
  414. span === null || span === void 0 ? void 0 : span.setAttributes({
  415. [TelemetryAttributes.JobKey]: id,
  416. });
  417. const client = await this.client;
  418. return await client.del(`${this.keys.de}:${id}`);
  419. });
  420. }
  421. /**
  422. * Removes a deduplication key.
  423. *
  424. * @param id - identifier
  425. */
  426. async removeDeduplicationKey(id) {
  427. return this.trace(SpanKind.INTERNAL, 'removeDeduplicationKey', `${this.name}`, async (span) => {
  428. span === null || span === void 0 ? void 0 : span.setAttributes({
  429. [TelemetryAttributes.DeduplicationKey]: id,
  430. });
  431. const client = await this.client;
  432. return client.del(`${this.keys.de}:${id}`);
  433. });
  434. }
  435. /**
  436. * Removes rate limit key.
  437. */
  438. async removeRateLimitKey() {
  439. const client = await this.client;
  440. return client.del(this.keys.limiter);
  441. }
  442. /**
  443. * Removes a repeatable job by its key. Note that the key is the one used
  444. * to store the repeatable job metadata and not one of the job iterations
  445. * themselves. You can use "getRepeatableJobs" in order to get the keys.
  446. *
  447. * @see getRepeatableJobs
  448. *
  449. * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead.
  450. *
  451. * @param repeatJobKey - To the repeatable job.
  452. * @returns
  453. */
  454. async removeRepeatableByKey(key) {
  455. return this.trace(SpanKind.INTERNAL, 'removeRepeatableByKey', `${this.name}`, async (span) => {
  456. span === null || span === void 0 ? void 0 : span.setAttributes({
  457. [TelemetryAttributes.JobKey]: key,
  458. });
  459. const repeat = await this.repeat;
  460. const removed = await repeat.removeRepeatableByKey(key);
  461. return !removed;
  462. });
  463. }
  464. /**
  465. * Removes the given job from the queue as well as all its
  466. * dependencies.
  467. *
  468. * @param jobId - The id of the job to remove
  469. * @param opts - Options to remove a job
  470. * @returns 1 if it managed to remove the job or 0 if the job or
  471. * any of its dependencies were locked.
  472. */
  473. async remove(jobId, { removeChildren = true } = {}) {
  474. return this.trace(SpanKind.INTERNAL, 'remove', this.name, async (span) => {
  475. span === null || span === void 0 ? void 0 : span.setAttributes({
  476. [TelemetryAttributes.JobId]: jobId,
  477. [TelemetryAttributes.JobOptions]: JSON.stringify({
  478. removeChildren,
  479. }),
  480. });
  481. const code = await this.scripts.remove(jobId, removeChildren);
  482. if (code === 1) {
  483. this.emit('removed', jobId);
  484. }
  485. return code;
  486. });
  487. }
  488. /**
  489. * Updates the given job's progress.
  490. *
  491. * @param jobId - The id of the job to update
  492. * @param progress - Number or object to be saved as progress.
  493. */
  494. async updateJobProgress(jobId, progress) {
  495. await this.trace(SpanKind.INTERNAL, 'updateJobProgress', this.name, async (span) => {
  496. span === null || span === void 0 ? void 0 : span.setAttributes({
  497. [TelemetryAttributes.JobId]: jobId,
  498. [TelemetryAttributes.JobProgress]: JSON.stringify(progress),
  499. });
  500. await this.scripts.updateProgress(jobId, progress);
  501. this.emit('progress', jobId, progress);
  502. });
  503. }
  504. /**
  505. * Logs one row of job's log data.
  506. *
  507. * @param jobId - The job id to log against.
  508. * @param logRow - String with log data to be logged.
  509. * @param keepLogs - Max number of log entries to keep (0 for unlimited).
  510. *
  511. * @returns The total number of log entries for this job so far.
  512. */
  513. async addJobLog(jobId, logRow, keepLogs) {
  514. return Job.addJobLog(this, jobId, logRow, keepLogs);
  515. }
  516. /**
  517. * Drains the queue, i.e., removes all jobs that are waiting
  518. * or delayed, but not active, completed or failed.
  519. *
  520. * @param delayed - Pass true if it should also clean the
  521. * delayed jobs.
  522. */
  523. async drain(delayed = false) {
  524. await this.trace(SpanKind.INTERNAL, 'drain', this.name, async (span) => {
  525. span === null || span === void 0 ? void 0 : span.setAttributes({
  526. [TelemetryAttributes.QueueDrainDelay]: delayed,
  527. });
  528. await this.scripts.drain(delayed);
  529. });
  530. }
  531. /**
  532. * Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
  533. * grace period.
  534. *
  535. * @param grace - The grace period in milliseconds
  536. * @param limit - Max number of jobs to clean
  537. * @param type - The type of job to clean
  538. * Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed.
  539. * @returns Id jobs from the deleted records
  540. */
  541. async clean(grace, limit, type = 'completed') {
  542. return this.trace(SpanKind.INTERNAL, 'clean', this.name, async (span) => {
  543. const maxCount = limit || Infinity;
  544. const maxCountPerCall = Math.min(10000, maxCount);
  545. const timestamp = Date.now() - grace;
  546. let deletedCount = 0;
  547. const deletedJobsIds = [];
  548. // Normalize 'waiting' to 'wait' for consistency with internal Redis keys
  549. const normalizedType = type === 'waiting' ? 'wait' : type;
  550. while (deletedCount < maxCount) {
  551. const jobsIds = await this.scripts.cleanJobsInSet(normalizedType, timestamp, maxCountPerCall);
  552. this.emit('cleaned', jobsIds, normalizedType);
  553. deletedCount += jobsIds.length;
  554. deletedJobsIds.push(...jobsIds);
  555. if (jobsIds.length < maxCountPerCall) {
  556. break;
  557. }
  558. }
  559. span === null || span === void 0 ? void 0 : span.setAttributes({
  560. [TelemetryAttributes.QueueGrace]: grace,
  561. [TelemetryAttributes.JobType]: type,
  562. [TelemetryAttributes.QueueCleanLimit]: maxCount,
  563. [TelemetryAttributes.JobIds]: deletedJobsIds,
  564. });
  565. return deletedJobsIds;
  566. });
  567. }
  568. /**
  569. * Completely destroys the queue and all of its contents irreversibly.
  570. * This method will *pause* the queue and requires that there are no
  571. * active jobs. It is possible to bypass this requirement, i.e. not
  572. * having active jobs using the "force" option.
  573. *
  574. * Note: This operation requires to iterate on all the jobs stored in the queue
  575. * and can be slow for very large queues.
  576. *
  577. * @param opts - Obliterate options.
  578. */
  579. async obliterate(opts) {
  580. await this.trace(SpanKind.INTERNAL, 'obliterate', this.name, async () => {
  581. await this.pause();
  582. let cursor = 0;
  583. do {
  584. cursor = await this.scripts.obliterate(Object.assign({ force: false, count: 1000 }, opts));
  585. } while (cursor);
  586. });
  587. }
  588. /**
  589. * Retry all the failed or completed jobs.
  590. *
  591. * @param opts - An object with the following properties:
  592. * - count number to limit how many jobs will be moved to wait status per iteration,
  593. * - state failed by default or completed.
  594. * - timestamp from which timestamp to start moving jobs to wait status, default Date.now().
  595. *
  596. * @returns
  597. */
  598. async retryJobs(opts = {}) {
  599. await this.trace(SpanKind.PRODUCER, 'retryJobs', this.name, async (span) => {
  600. span === null || span === void 0 ? void 0 : span.setAttributes({
  601. [TelemetryAttributes.QueueOptions]: JSON.stringify(opts),
  602. });
  603. let cursor = 0;
  604. do {
  605. cursor = await this.scripts.retryJobs(opts.state, opts.count, opts.timestamp);
  606. } while (cursor);
  607. });
  608. }
  609. /**
  610. * Promote all the delayed jobs.
  611. *
  612. * @param opts - An object with the following properties:
  613. * - count number to limit how many jobs will be moved to wait status per iteration
  614. *
  615. * @returns
  616. */
  617. async promoteJobs(opts = {}) {
  618. await this.trace(SpanKind.INTERNAL, 'promoteJobs', this.name, async (span) => {
  619. span === null || span === void 0 ? void 0 : span.setAttributes({
  620. [TelemetryAttributes.QueueOptions]: JSON.stringify(opts),
  621. });
  622. let cursor = 0;
  623. do {
  624. cursor = await this.scripts.promoteJobs(opts.count);
  625. } while (cursor);
  626. });
  627. }
  628. /**
  629. * Trim the event stream to an approximately maxLength.
  630. *
  631. * @param maxLength -
  632. */
  633. async trimEvents(maxLength) {
  634. return this.trace(SpanKind.INTERNAL, 'trimEvents', this.name, async (span) => {
  635. span === null || span === void 0 ? void 0 : span.setAttributes({
  636. [TelemetryAttributes.QueueEventMaxLength]: maxLength,
  637. });
  638. const client = await this.client;
  639. return await client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength);
  640. });
  641. }
  642. /**
  643. * Delete old priority helper key.
  644. */
  645. async removeDeprecatedPriorityKey() {
  646. const client = await this.client;
  647. return client.del(this.toKey('priority'));
  648. }
  649. /**
  650. * Removes orphaned job keys that exist in Redis but are not referenced
  651. * in any queue state set.
  652. *
  653. * Orphaned keys can occur in rare cases when the removal-by-max-age logic
  654. * removes sorted-set entries without fully cleaning up the corresponding
  655. * job hash data (a regression introduced in v5.66.6 via #3694).
  656. * Under normal operation this method is
  657. * **not needed** — it is provided only as a one-time migration helper for
  658. * users who were affected by that specific bug and want to reclaim the
  659. * leaked memory.
  660. *
  661. * The method uses a Lua script so that every check-and-delete cycle is
  662. * atomic (per SCAN iteration). State keys are derived dynamically from
  663. * the queue's key map and their Redis TYPE is checked at runtime, so newly
  664. * introduced states are picked up automatically.
  665. *
  666. * @param count - Approximate number of keys to SCAN per iteration (default 1000).
  667. * @param limit - Maximum number of orphaned jobs to remove (0 = unlimited).
  668. * When set, the method returns as soon as the limit is reached.
  669. * Users with a very large number of orphans can call this method
  670. * in a loop: `while (await queue.removeOrphanedJobs(1000, 10000)) {}`
  671. * @returns The total number of orphaned jobs that were removed.
  672. */
  673. async removeOrphanedJobs(count = 1000, limit = 0) {
  674. const client = await this.client;
  675. // Derive infrastructure suffixes dynamically from the queue key map
  676. // so any future keys are automatically excluded without code changes.
  677. const knownSuffixes = new Set(Object.keys(this.keys));
  678. // State key suffixes (excluding '') — passed to the Lua script which
  679. // uses TYPE to decide whether a key is a list / zset / set and picks
  680. // the right membership command automatically.
  681. const stateKeySuffixes = Object.keys(this.keys).filter(s => s !== '');
  682. // Known job sub-key suffixes (cleaned up during deletion).
  683. const jobSubKeySuffixes = [
  684. 'logs',
  685. 'dependencies',
  686. 'processed',
  687. 'failed',
  688. 'unsuccessful',
  689. 'lock',
  690. ];
  691. const basePrefix = this.qualifiedName + ':';
  692. const scanPattern = basePrefix + '*';
  693. let totalRemoved = 0;
  694. let cursor = '0';
  695. do {
  696. const [nextCursor, keys] = await client.scan(cursor, 'MATCH', scanPattern, 'COUNT', count);
  697. cursor = nextCursor;
  698. // Extract unique potential job IDs from this batch.
  699. const candidateJobIds = new Set();
  700. for (const key of keys) {
  701. const suffix = key.slice(basePrefix.length);
  702. // Skip infrastructure keys (derived from this.keys).
  703. if (knownSuffixes.has(suffix)) {
  704. continue;
  705. }
  706. // Skip sub-keys of infrastructure prefixes (e.g. repeat:xxx, de:xxx).
  707. const colonIdx = suffix.indexOf(':');
  708. if (colonIdx !== -1) {
  709. const prefixPart = suffix.slice(0, colonIdx);
  710. if (knownSuffixes.has(prefixPart)) {
  711. continue;
  712. }
  713. }
  714. // Extract the job ID portion (before first colon, or the whole suffix).
  715. const jobId = colonIdx === -1 ? suffix : suffix.slice(0, colonIdx);
  716. // For sub-keys, only consider known job sub-key suffixes.
  717. if (colonIdx !== -1) {
  718. const subKey = suffix.slice(colonIdx + 1);
  719. if (!jobSubKeySuffixes.includes(subKey)) {
  720. continue;
  721. }
  722. }
  723. candidateJobIds.add(jobId);
  724. }
  725. if (candidateJobIds.size === 0) {
  726. continue;
  727. }
  728. // Run the Lua script atomically for this batch of candidates.
  729. const result = await this.scripts.removeOrphanedJobs([...candidateJobIds], stateKeySuffixes, jobSubKeySuffixes);
  730. totalRemoved += result || 0;
  731. if (limit > 0 && totalRemoved >= limit) {
  732. break;
  733. }
  734. } while (cursor !== '0');
  735. return totalRemoved;
  736. }
  737. }
  738. //# sourceMappingURL=queue.js.map