queue.js 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.Queue = void 0;
  4. const uuid_1 = require("uuid");
  5. const job_1 = require("./job");
  6. const queue_getters_1 = require("./queue-getters");
  7. const repeat_1 = require("./repeat");
  8. const enums_1 = require("../enums");
  9. const job_scheduler_1 = require("./job-scheduler");
  10. const version_1 = require("../version");
  11. /**
  12. * Queue
  13. *
  14. * This class provides methods to add jobs to a queue and some other high-level
  15. * administration such as pausing or deleting queues.
  16. *
  17. * @typeParam DataType - The type of the data that the job will process.
  18. * @typeParam ResultType - The type of the result of the job.
  19. * @typeParam NameType - The type of the name of the job.
  20. *
  21. * @example
  22. *
  23. * ```typescript
  24. * import { Queue } from 'bullmq';
  25. *
  26. * interface MyDataType {
  27. * foo: string;
  28. * }
  29. *
  30. * interface MyResultType {
  31. * bar: string;
  32. * }
  33. *
  34. * const queue = new Queue<MyDataType, MyResultType, "blue" | "brown">('myQueue');
  35. * ```
  36. */
  37. class Queue extends queue_getters_1.QueueGetters {
  38. constructor(name, opts, Connection) {
  39. var _a;
  40. super(name, Object.assign({}, opts), Connection);
  41. this.token = (0, uuid_1.v4)();
  42. this.libName = 'bullmq';
  43. this.jobsOpts = (_a = opts === null || opts === void 0 ? void 0 : opts.defaultJobOptions) !== null && _a !== void 0 ? _a : {};
  44. this.waitUntilReady()
  45. .then(client => {
  46. if (!this.closing && !(opts === null || opts === void 0 ? void 0 : opts.skipMetasUpdate)) {
  47. return client.hmset(this.keys.meta, this.metaValues);
  48. }
  49. })
  50. .catch(err => {
  51. // We ignore this error to avoid warnings. The error can still
  52. // be received by listening to event 'error'
  53. });
  54. }
  55. emit(event, ...args) {
  56. return super.emit(event, ...args);
  57. }
  58. off(eventName, listener) {
  59. super.off(eventName, listener);
  60. return this;
  61. }
  62. on(event, listener) {
  63. super.on(event, listener);
  64. return this;
  65. }
  66. once(event, listener) {
  67. super.once(event, listener);
  68. return this;
  69. }
  70. /**
  71. * Returns this instance current default job options.
  72. */
  73. get defaultJobOptions() {
  74. return Object.assign({}, this.jobsOpts);
  75. }
  76. get metaValues() {
  77. var _a, _b, _c, _d;
  78. return {
  79. 'opts.maxLenEvents': (_d = (_c = (_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.streams) === null || _b === void 0 ? void 0 : _b.events) === null || _c === void 0 ? void 0 : _c.maxLen) !== null && _d !== void 0 ? _d : 10000,
  80. version: `${this.libName}:${version_1.version}`,
  81. };
  82. }
  83. /**
  84. * Get library version.
  85. *
  86. * @returns the content of the meta.library field.
  87. */
  88. async getVersion() {
  89. const client = await this.client;
  90. return await client.hget(this.keys.meta, 'version');
  91. }
  92. get repeat() {
  93. return new Promise(async (resolve) => {
  94. if (!this._repeat) {
  95. this._repeat = new repeat_1.Repeat(this.name, Object.assign(Object.assign({}, this.opts), { connection: await this.client }));
  96. this._repeat.on('error', this.emit.bind(this, 'error'));
  97. }
  98. resolve(this._repeat);
  99. });
  100. }
  101. get jobScheduler() {
  102. return new Promise(async (resolve) => {
  103. if (!this._jobScheduler) {
  104. this._jobScheduler = new job_scheduler_1.JobScheduler(this.name, Object.assign(Object.assign({}, this.opts), { connection: await this.client }));
  105. this._jobScheduler.on('error', this.emit.bind(this, 'error'));
  106. }
  107. resolve(this._jobScheduler);
  108. });
  109. }
  110. /**
  111. * Enable and set global concurrency value.
  112. * @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
  113. * For instance, setting this value to 1 ensures that no more than one job
  114. * is processed at any given time. If this limit is not defined, there will be no
  115. * restriction on the number of concurrent jobs.
  116. */
  117. async setGlobalConcurrency(concurrency) {
  118. const client = await this.client;
  119. return client.hset(this.keys.meta, 'concurrency', concurrency);
  120. }
  121. /**
  122. * Enable and set rate limit.
  123. * @param max - Max number of jobs to process in the time period specified in `duration`
  124. * @param duration - Time in milliseconds. During this time, a maximum of `max` jobs will be processed.
  125. */
  126. async setGlobalRateLimit(max, duration) {
  127. const client = await this.client;
  128. return client.hset(this.keys.meta, 'max', max, 'duration', duration);
  129. }
  130. /**
  131. * Remove global concurrency value.
  132. */
  133. async removeGlobalConcurrency() {
  134. const client = await this.client;
  135. return client.hdel(this.keys.meta, 'concurrency');
  136. }
  137. /**
  138. * Remove global rate limit values.
  139. */
  140. async removeGlobalRateLimit() {
  141. const client = await this.client;
  142. return client.hdel(this.keys.meta, 'max', 'duration');
  143. }
  144. /**
  145. * Adds a new job to the queue.
  146. *
  147. * @param name - Name of the job to be added to the queue.
  148. * @param data - Arbitrary data to append to the job.
  149. * @param opts - Job options that affects how the job is going to be processed.
  150. */
  151. async add(name, data, opts) {
  152. return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${name}`, async (span, srcPropagationMetadata) => {
  153. var _a;
  154. if (srcPropagationMetadata && !((_a = opts === null || opts === void 0 ? void 0 : opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext)) {
  155. const telemetry = {
  156. metadata: srcPropagationMetadata,
  157. };
  158. opts = Object.assign(Object.assign({}, opts), { telemetry });
  159. }
  160. const job = await this.addJob(name, data, opts);
  161. span === null || span === void 0 ? void 0 : span.setAttributes({
  162. [enums_1.TelemetryAttributes.JobName]: name,
  163. [enums_1.TelemetryAttributes.JobId]: job.id,
  164. });
  165. return job;
  166. });
  167. }
  168. /**
  169. * addJob is a telemetry free version of the add method, useful in order to wrap it
  170. * with custom telemetry on subclasses.
  171. *
  172. * @param name - Name of the job to be added to the queue.
  173. * @param data - Arbitrary data to append to the job.
  174. * @param opts - Job options that affects how the job is going to be processed.
  175. *
  176. * @returns Job
  177. */
  178. async addJob(name, data, opts) {
  179. if (opts && opts.repeat) {
  180. if (opts.repeat.endDate) {
  181. if (+new Date(opts.repeat.endDate) < Date.now()) {
  182. throw new Error('End date must be greater than current timestamp');
  183. }
  184. }
  185. return (await this.repeat).updateRepeatableJob(name, data, Object.assign(Object.assign({}, this.jobsOpts), opts), { override: true });
  186. }
  187. else {
  188. const jobId = opts === null || opts === void 0 ? void 0 : opts.jobId;
  189. if (jobId == '0' || (jobId === null || jobId === void 0 ? void 0 : jobId.startsWith('0:'))) {
  190. throw new Error("JobId cannot be '0' or start with 0:");
  191. }
  192. const mergedOpts = Object.assign(Object.assign(Object.assign({}, this.jobsOpts), opts), { jobId });
  193. const job = await this.Job.create(this, name, data, mergedOpts);
  194. this.emit('waiting', job);
  195. return job;
  196. }
  197. }
  198. /**
  199. * Adds an array of jobs to the queue. This method may be faster than adding
  200. * one job at a time in a sequence.
  201. *
  202. * @param jobs - The array of jobs to add to the queue. Each job is defined by 3
  203. * properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
  204. */
  205. async addBulk(jobs) {
  206. return this.trace(enums_1.SpanKind.PRODUCER, 'addBulk', this.name, async (span, srcPropagationMetadata) => {
  207. if (span) {
  208. span.setAttributes({
  209. [enums_1.TelemetryAttributes.BulkNames]: jobs.map(job => job.name),
  210. [enums_1.TelemetryAttributes.BulkCount]: jobs.length,
  211. });
  212. }
  213. return await this.Job.createBulk(this, jobs.map(job => {
  214. var _a, _b, _c, _d, _e, _f;
  215. let telemetry = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry;
  216. if (srcPropagationMetadata) {
  217. const omitContext = (_c = (_b = job.opts) === null || _b === void 0 ? void 0 : _b.telemetry) === null || _c === void 0 ? void 0 : _c.omitContext;
  218. const telemetryMetadata = ((_e = (_d = job.opts) === null || _d === void 0 ? void 0 : _d.telemetry) === null || _e === void 0 ? void 0 : _e.metadata) ||
  219. (!omitContext && srcPropagationMetadata);
  220. if (telemetryMetadata || omitContext) {
  221. telemetry = {
  222. metadata: telemetryMetadata,
  223. omitContext,
  224. };
  225. }
  226. }
  227. const mergedOpts = Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), { jobId: (_f = job.opts) === null || _f === void 0 ? void 0 : _f.jobId, telemetry });
  228. return {
  229. name: job.name,
  230. data: job.data,
  231. opts: mergedOpts,
  232. };
  233. }));
  234. });
  235. }
  236. /**
  237. * Upserts a scheduler.
  238. *
  239. * A scheduler is a job factory that creates jobs at a given interval.
  240. * Upserting a scheduler will create a new job scheduler or update an existing one.
  241. * It will also create the first job based on the repeat options and delayed accordingly.
  242. *
  243. * @param key - Unique key for the repeatable job meta.
  244. * @param repeatOpts - Repeat options
  245. * @param jobTemplate - Job template. If provided it will be used for all the jobs
  246. * created by the scheduler.
  247. *
  248. * @returns The next job to be scheduled (would normally be in delayed state).
  249. */
  250. async upsertJobScheduler(jobSchedulerId, repeatOpts, jobTemplate) {
  251. var _a, _b;
  252. if (repeatOpts.endDate) {
  253. if (+new Date(repeatOpts.endDate) < Date.now()) {
  254. throw new Error('End date must be greater than current timestamp');
  255. }
  256. }
  257. return (await this.jobScheduler).upsertJobScheduler(jobSchedulerId, repeatOpts, (_a = jobTemplate === null || jobTemplate === void 0 ? void 0 : jobTemplate.name) !== null && _a !== void 0 ? _a : jobSchedulerId, (_b = jobTemplate === null || jobTemplate === void 0 ? void 0 : jobTemplate.data) !== null && _b !== void 0 ? _b : {}, Object.assign(Object.assign({}, this.jobsOpts), jobTemplate === null || jobTemplate === void 0 ? void 0 : jobTemplate.opts), { override: true });
  258. }
  259. /**
  260. * Pauses the processing of this queue globally.
  261. *
  262. * We use an atomic RENAME operation on the wait queue. Since
  263. * we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
  264. * is renamed to 'paused', no new jobs will be processed (the current ones
  265. * will run until finalized).
  266. *
  267. * Adding jobs requires a LUA script to check first if the paused list exist
  268. * and in that case it will add it there instead of the wait list.
  269. */
  270. async pause() {
  271. await this.trace(enums_1.SpanKind.INTERNAL, 'pause', this.name, async () => {
  272. await this.scripts.pause(true);
  273. this.emit('paused');
  274. });
  275. }
  276. /**
  277. * Close the queue instance.
  278. *
  279. */
  280. async close() {
  281. await this.trace(enums_1.SpanKind.INTERNAL, 'close', this.name, async () => {
  282. if (!this.closing) {
  283. if (this._repeat) {
  284. await this._repeat.close();
  285. }
  286. }
  287. await super.close();
  288. });
  289. }
  290. /**
  291. * Overrides the rate limit to be active for the next jobs.
  292. *
  293. * @param expireTimeMs - expire time in ms of this rate limit.
  294. */
  295. async rateLimit(expireTimeMs) {
  296. await this.trace(enums_1.SpanKind.INTERNAL, 'rateLimit', this.name, async (span) => {
  297. span === null || span === void 0 ? void 0 : span.setAttributes({
  298. [enums_1.TelemetryAttributes.QueueRateLimit]: expireTimeMs,
  299. });
  300. await this.client.then(client => client.set(this.keys.limiter, Number.MAX_SAFE_INTEGER, 'PX', expireTimeMs));
  301. });
  302. }
  303. /**
  304. * Resumes the processing of this queue globally.
  305. *
  306. * The method reverses the pause operation by resuming the processing of the
  307. * queue.
  308. */
  309. async resume() {
  310. await this.trace(enums_1.SpanKind.INTERNAL, 'resume', this.name, async () => {
  311. await this.scripts.pause(false);
  312. this.emit('resumed');
  313. });
  314. }
  315. /**
  316. * Returns true if the queue is currently paused.
  317. */
  318. async isPaused() {
  319. const client = await this.client;
  320. const pausedKeyExists = await client.hexists(this.keys.meta, 'paused');
  321. return pausedKeyExists === 1;
  322. }
  323. /**
  324. * Returns true if the queue is currently maxed.
  325. */
  326. isMaxed() {
  327. return this.scripts.isMaxed();
  328. }
  329. /**
  330. * Get all repeatable meta jobs.
  331. *
  332. * @deprecated This method is deprecated and will be removed in v6. Use getJobSchedulers instead.
  333. *
  334. * @param start - Offset of first job to return.
  335. * @param end - Offset of last job to return.
  336. * @param asc - Determine the order in which jobs are returned based on their
  337. * next execution time.
  338. */
  339. async getRepeatableJobs(start, end, asc) {
  340. return (await this.repeat).getRepeatableJobs(start, end, asc);
  341. }
  342. /**
  343. * Get Job Scheduler by id
  344. *
  345. * @param id - identifier of scheduler.
  346. */
  347. async getJobScheduler(id) {
  348. return (await this.jobScheduler).getScheduler(id);
  349. }
  350. /**
  351. * Get all Job Schedulers
  352. *
  353. * @param start - Offset of first scheduler to return.
  354. * @param end - Offset of last scheduler to return.
  355. * @param asc - Determine the order in which schedulers are returned based on their
  356. * next execution time.
  357. */
  358. async getJobSchedulers(start, end, asc) {
  359. return (await this.jobScheduler).getJobSchedulers(start, end, asc);
  360. }
  361. /**
  362. *
  363. * Get the number of job schedulers.
  364. *
  365. * @returns The number of job schedulers.
  366. */
  367. async getJobSchedulersCount() {
  368. return (await this.jobScheduler).getSchedulersCount();
  369. }
  370. /**
  371. * Removes a repeatable job.
  372. *
  373. * Note: you need to use the exact same repeatOpts when deleting a repeatable job
  374. * than when adding it.
  375. *
  376. * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead.
  377. *
  378. * @see removeRepeatableByKey
  379. *
  380. * @param name - Job name
  381. * @param repeatOpts - Repeat options
  382. * @param jobId - Job id to remove. If not provided, all jobs with the same repeatOpts
  383. * @returns
  384. */
  385. async removeRepeatable(name, repeatOpts, jobId) {
  386. return this.trace(enums_1.SpanKind.INTERNAL, 'removeRepeatable', `${this.name}.${name}`, async (span) => {
  387. span === null || span === void 0 ? void 0 : span.setAttributes({
  388. [enums_1.TelemetryAttributes.JobName]: name,
  389. [enums_1.TelemetryAttributes.JobId]: jobId,
  390. });
  391. const repeat = await this.repeat;
  392. const removed = await repeat.removeRepeatable(name, repeatOpts, jobId);
  393. return !removed;
  394. });
  395. }
  396. /**
  397. *
  398. * Removes a job scheduler.
  399. *
  400. * @param jobSchedulerId - identifier of the job scheduler.
  401. *
  402. * @returns
  403. */
  404. async removeJobScheduler(jobSchedulerId) {
  405. const jobScheduler = await this.jobScheduler;
  406. const removed = await jobScheduler.removeJobScheduler(jobSchedulerId);
  407. return !removed;
  408. }
  409. /**
  410. * Removes a debounce key.
  411. * @deprecated use removeDeduplicationKey
  412. *
  413. * @param id - debounce identifier
  414. */
  415. async removeDebounceKey(id) {
  416. return this.trace(enums_1.SpanKind.INTERNAL, 'removeDebounceKey', `${this.name}`, async (span) => {
  417. span === null || span === void 0 ? void 0 : span.setAttributes({
  418. [enums_1.TelemetryAttributes.JobKey]: id,
  419. });
  420. const client = await this.client;
  421. return await client.del(`${this.keys.de}:${id}`);
  422. });
  423. }
  424. /**
  425. * Removes a deduplication key.
  426. *
  427. * @param id - identifier
  428. */
  429. async removeDeduplicationKey(id) {
  430. return this.trace(enums_1.SpanKind.INTERNAL, 'removeDeduplicationKey', `${this.name}`, async (span) => {
  431. span === null || span === void 0 ? void 0 : span.setAttributes({
  432. [enums_1.TelemetryAttributes.DeduplicationKey]: id,
  433. });
  434. const client = await this.client;
  435. return client.del(`${this.keys.de}:${id}`);
  436. });
  437. }
  438. /**
  439. * Removes rate limit key.
  440. */
  441. async removeRateLimitKey() {
  442. const client = await this.client;
  443. return client.del(this.keys.limiter);
  444. }
  445. /**
  446. * Removes a repeatable job by its key. Note that the key is the one used
  447. * to store the repeatable job metadata and not one of the job iterations
  448. * themselves. You can use "getRepeatableJobs" in order to get the keys.
  449. *
  450. * @see getRepeatableJobs
  451. *
  452. * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead.
  453. *
  454. * @param repeatJobKey - To the repeatable job.
  455. * @returns
  456. */
  457. async removeRepeatableByKey(key) {
  458. return this.trace(enums_1.SpanKind.INTERNAL, 'removeRepeatableByKey', `${this.name}`, async (span) => {
  459. span === null || span === void 0 ? void 0 : span.setAttributes({
  460. [enums_1.TelemetryAttributes.JobKey]: key,
  461. });
  462. const repeat = await this.repeat;
  463. const removed = await repeat.removeRepeatableByKey(key);
  464. return !removed;
  465. });
  466. }
  467. /**
  468. * Removes the given job from the queue as well as all its
  469. * dependencies.
  470. *
  471. * @param jobId - The id of the job to remove
  472. * @param opts - Options to remove a job
  473. * @returns 1 if it managed to remove the job or 0 if the job or
  474. * any of its dependencies were locked.
  475. */
  476. async remove(jobId, { removeChildren = true } = {}) {
  477. return this.trace(enums_1.SpanKind.INTERNAL, 'remove', this.name, async (span) => {
  478. span === null || span === void 0 ? void 0 : span.setAttributes({
  479. [enums_1.TelemetryAttributes.JobId]: jobId,
  480. [enums_1.TelemetryAttributes.JobOptions]: JSON.stringify({
  481. removeChildren,
  482. }),
  483. });
  484. const code = await this.scripts.remove(jobId, removeChildren);
  485. if (code === 1) {
  486. this.emit('removed', jobId);
  487. }
  488. return code;
  489. });
  490. }
  491. /**
  492. * Updates the given job's progress.
  493. *
  494. * @param jobId - The id of the job to update
  495. * @param progress - Number or object to be saved as progress.
  496. */
  497. async updateJobProgress(jobId, progress) {
  498. await this.trace(enums_1.SpanKind.INTERNAL, 'updateJobProgress', this.name, async (span) => {
  499. span === null || span === void 0 ? void 0 : span.setAttributes({
  500. [enums_1.TelemetryAttributes.JobId]: jobId,
  501. [enums_1.TelemetryAttributes.JobProgress]: JSON.stringify(progress),
  502. });
  503. await this.scripts.updateProgress(jobId, progress);
  504. this.emit('progress', jobId, progress);
  505. });
  506. }
  507. /**
  508. * Logs one row of job's log data.
  509. *
  510. * @param jobId - The job id to log against.
  511. * @param logRow - String with log data to be logged.
  512. * @param keepLogs - Max number of log entries to keep (0 for unlimited).
  513. *
  514. * @returns The total number of log entries for this job so far.
  515. */
  516. async addJobLog(jobId, logRow, keepLogs) {
  517. return job_1.Job.addJobLog(this, jobId, logRow, keepLogs);
  518. }
  519. /**
  520. * Drains the queue, i.e., removes all jobs that are waiting
  521. * or delayed, but not active, completed or failed.
  522. *
  523. * @param delayed - Pass true if it should also clean the
  524. * delayed jobs.
  525. */
  526. async drain(delayed = false) {
  527. await this.trace(enums_1.SpanKind.INTERNAL, 'drain', this.name, async (span) => {
  528. span === null || span === void 0 ? void 0 : span.setAttributes({
  529. [enums_1.TelemetryAttributes.QueueDrainDelay]: delayed,
  530. });
  531. await this.scripts.drain(delayed);
  532. });
  533. }
  534. /**
  535. * Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
  536. * grace period.
  537. *
  538. * @param grace - The grace period in milliseconds
  539. * @param limit - Max number of jobs to clean
  540. * @param type - The type of job to clean
  541. * Possible values are completed, wait, active, paused, delayed, failed. Defaults to completed.
  542. * @returns Id jobs from the deleted records
  543. */
  544. async clean(grace, limit, type = 'completed') {
  545. return this.trace(enums_1.SpanKind.INTERNAL, 'clean', this.name, async (span) => {
  546. const maxCount = limit || Infinity;
  547. const maxCountPerCall = Math.min(10000, maxCount);
  548. const timestamp = Date.now() - grace;
  549. let deletedCount = 0;
  550. const deletedJobsIds = [];
  551. // Normalize 'waiting' to 'wait' for consistency with internal Redis keys
  552. const normalizedType = type === 'waiting' ? 'wait' : type;
  553. while (deletedCount < maxCount) {
  554. const jobsIds = await this.scripts.cleanJobsInSet(normalizedType, timestamp, maxCountPerCall);
  555. this.emit('cleaned', jobsIds, normalizedType);
  556. deletedCount += jobsIds.length;
  557. deletedJobsIds.push(...jobsIds);
  558. if (jobsIds.length < maxCountPerCall) {
  559. break;
  560. }
  561. }
  562. span === null || span === void 0 ? void 0 : span.setAttributes({
  563. [enums_1.TelemetryAttributes.QueueGrace]: grace,
  564. [enums_1.TelemetryAttributes.JobType]: type,
  565. [enums_1.TelemetryAttributes.QueueCleanLimit]: maxCount,
  566. [enums_1.TelemetryAttributes.JobIds]: deletedJobsIds,
  567. });
  568. return deletedJobsIds;
  569. });
  570. }
  571. /**
  572. * Completely destroys the queue and all of its contents irreversibly.
  573. * This method will *pause* the queue and requires that there are no
  574. * active jobs. It is possible to bypass this requirement, i.e. not
  575. * having active jobs using the "force" option.
  576. *
  577. * Note: This operation requires to iterate on all the jobs stored in the queue
  578. * and can be slow for very large queues.
  579. *
  580. * @param opts - Obliterate options.
  581. */
  582. async obliterate(opts) {
  583. await this.trace(enums_1.SpanKind.INTERNAL, 'obliterate', this.name, async () => {
  584. await this.pause();
  585. let cursor = 0;
  586. do {
  587. cursor = await this.scripts.obliterate(Object.assign({ force: false, count: 1000 }, opts));
  588. } while (cursor);
  589. });
  590. }
  591. /**
  592. * Retry all the failed or completed jobs.
  593. *
  594. * @param opts - An object with the following properties:
  595. * - count number to limit how many jobs will be moved to wait status per iteration,
  596. * - state failed by default or completed.
  597. * - timestamp from which timestamp to start moving jobs to wait status, default Date.now().
  598. *
  599. * @returns
  600. */
  601. async retryJobs(opts = {}) {
  602. await this.trace(enums_1.SpanKind.PRODUCER, 'retryJobs', this.name, async (span) => {
  603. span === null || span === void 0 ? void 0 : span.setAttributes({
  604. [enums_1.TelemetryAttributes.QueueOptions]: JSON.stringify(opts),
  605. });
  606. let cursor = 0;
  607. do {
  608. cursor = await this.scripts.retryJobs(opts.state, opts.count, opts.timestamp);
  609. } while (cursor);
  610. });
  611. }
  612. /**
  613. * Promote all the delayed jobs.
  614. *
  615. * @param opts - An object with the following properties:
  616. * - count number to limit how many jobs will be moved to wait status per iteration
  617. *
  618. * @returns
  619. */
  620. async promoteJobs(opts = {}) {
  621. await this.trace(enums_1.SpanKind.INTERNAL, 'promoteJobs', this.name, async (span) => {
  622. span === null || span === void 0 ? void 0 : span.setAttributes({
  623. [enums_1.TelemetryAttributes.QueueOptions]: JSON.stringify(opts),
  624. });
  625. let cursor = 0;
  626. do {
  627. cursor = await this.scripts.promoteJobs(opts.count);
  628. } while (cursor);
  629. });
  630. }
  631. /**
  632. * Trim the event stream to an approximately maxLength.
  633. *
  634. * @param maxLength -
  635. */
  636. async trimEvents(maxLength) {
  637. return this.trace(enums_1.SpanKind.INTERNAL, 'trimEvents', this.name, async (span) => {
  638. span === null || span === void 0 ? void 0 : span.setAttributes({
  639. [enums_1.TelemetryAttributes.QueueEventMaxLength]: maxLength,
  640. });
  641. const client = await this.client;
  642. return await client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength);
  643. });
  644. }
  645. /**
  646. * Delete old priority helper key.
  647. */
  648. async removeDeprecatedPriorityKey() {
  649. const client = await this.client;
  650. return client.del(this.toKey('priority'));
  651. }
  652. /**
  653. * Removes orphaned job keys that exist in Redis but are not referenced
  654. * in any queue state set.
  655. *
  656. * Orphaned keys can occur in rare cases when the removal-by-max-age logic
  657. * removes sorted-set entries without fully cleaning up the corresponding
  658. * job hash data (a regression introduced in v5.66.6 via #3694).
  659. * Under normal operation this method is
  660. * **not needed** — it is provided only as a one-time migration helper for
  661. * users who were affected by that specific bug and want to reclaim the
  662. * leaked memory.
  663. *
  664. * The method uses a Lua script so that every check-and-delete cycle is
  665. * atomic (per SCAN iteration). State keys are derived dynamically from
  666. * the queue's key map and their Redis TYPE is checked at runtime, so newly
  667. * introduced states are picked up automatically.
  668. *
  669. * @param count - Approximate number of keys to SCAN per iteration (default 1000).
  670. * @param limit - Maximum number of orphaned jobs to remove (0 = unlimited).
  671. * When set, the method returns as soon as the limit is reached.
  672. * Users with a very large number of orphans can call this method
  673. * in a loop: `while (await queue.removeOrphanedJobs(1000, 10000)) {}`
  674. * @returns The total number of orphaned jobs that were removed.
  675. */
  676. async removeOrphanedJobs(count = 1000, limit = 0) {
  677. const client = await this.client;
  678. // Derive infrastructure suffixes dynamically from the queue key map
  679. // so any future keys are automatically excluded without code changes.
  680. const knownSuffixes = new Set(Object.keys(this.keys));
  681. // State key suffixes (excluding '') — passed to the Lua script which
  682. // uses TYPE to decide whether a key is a list / zset / set and picks
  683. // the right membership command automatically.
  684. const stateKeySuffixes = Object.keys(this.keys).filter(s => s !== '');
  685. // Known job sub-key suffixes (cleaned up during deletion).
  686. const jobSubKeySuffixes = [
  687. 'logs',
  688. 'dependencies',
  689. 'processed',
  690. 'failed',
  691. 'unsuccessful',
  692. 'lock',
  693. ];
  694. const basePrefix = this.qualifiedName + ':';
  695. const scanPattern = basePrefix + '*';
  696. let totalRemoved = 0;
  697. let cursor = '0';
  698. do {
  699. const [nextCursor, keys] = await client.scan(cursor, 'MATCH', scanPattern, 'COUNT', count);
  700. cursor = nextCursor;
  701. // Extract unique potential job IDs from this batch.
  702. const candidateJobIds = new Set();
  703. for (const key of keys) {
  704. const suffix = key.slice(basePrefix.length);
  705. // Skip infrastructure keys (derived from this.keys).
  706. if (knownSuffixes.has(suffix)) {
  707. continue;
  708. }
  709. // Skip sub-keys of infrastructure prefixes (e.g. repeat:xxx, de:xxx).
  710. const colonIdx = suffix.indexOf(':');
  711. if (colonIdx !== -1) {
  712. const prefixPart = suffix.slice(0, colonIdx);
  713. if (knownSuffixes.has(prefixPart)) {
  714. continue;
  715. }
  716. }
  717. // Extract the job ID portion (before first colon, or the whole suffix).
  718. const jobId = colonIdx === -1 ? suffix : suffix.slice(0, colonIdx);
  719. // For sub-keys, only consider known job sub-key suffixes.
  720. if (colonIdx !== -1) {
  721. const subKey = suffix.slice(colonIdx + 1);
  722. if (!jobSubKeySuffixes.includes(subKey)) {
  723. continue;
  724. }
  725. }
  726. candidateJobIds.add(jobId);
  727. }
  728. if (candidateJobIds.size === 0) {
  729. continue;
  730. }
  731. // Run the Lua script atomically for this batch of candidates.
  732. const result = await this.scripts.removeOrphanedJobs([...candidateJobIds], stateKeySuffixes, jobSubKeySuffixes);
  733. totalRemoved += result || 0;
  734. if (limit > 0 && totalRemoved >= limit) {
  735. break;
  736. }
  737. } while (cursor !== '0');
  738. return totalRemoved;
  739. }
  740. }
  741. exports.Queue = Queue;
  742. //# sourceMappingURL=queue.js.map