// queue.d.ts (17 KB)
  import { BaseJobOptions, BulkJobOptions, IoredisListener, JobSchedulerJson, QueueOptions, RepeatableJob, RepeatOptions } from '../interfaces';
  import { FinishedStatus, JobsOptions, JobSchedulerTemplateOptions, JobProgress } from '../types';
  import { Job } from './job';
  import { QueueGetters } from './queue-getters';
  import { Repeat } from './repeat';
  import { RedisConnection } from './redis-connection';
  import { JobScheduler } from './job-scheduler';
  /**
   * Options controlling how a queue is obliterated.
   */
  export interface ObliterateOpts {
    /**
     * Use force = true to force obliteration even with active jobs in the queue.
     * @defaultValue false
     */
    force?: boolean;
    /**
     * Maximum number of keys deleted per iteration.
     * @defaultValue 1000
     */
    count?: number;
  }
  /**
   * Map of event names to listener signatures, extending the events already
   * declared by `IoredisListener`.
   *
   * @typeParam JobBase - Job type passed to listeners that receive a job
   * (currently only `waiting`).
   */
  export interface QueueListener<JobBase extends Job = Job> extends IoredisListener {
    /**
     * Listen to 'cleaned' event.
     *
     * This event is triggered when the queue calls clean method.
     */
    cleaned: (jobs: string[], type: string) => void;
    /**
     * Listen to 'error' event.
     *
     * This event is triggered when an error is thrown.
     */
    error: (err: Error) => void;
    /**
     * Listen to 'paused' event.
     *
     * This event is triggered when the queue is paused.
     */
    paused: () => void;
    /**
     * Listen to 'progress' event.
     *
     * This event is triggered when the job updates its progress.
     */
    progress: (jobId: string, progress: JobProgress) => void;
    /**
     * Listen to 'removed' event.
     *
     * This event is triggered when a job is removed.
     */
    removed: (jobId: string) => void;
    /**
     * Listen to 'resumed' event.
     *
     * This event is triggered when the queue is resumed.
     */
    resumed: () => void;
    /**
     * Listen to 'waiting' event.
     *
     * This event is triggered when the queue creates a new job.
     */
    waiting: (job: JobBase) => void;
  }
  /**
   * IsAny<T> A type helper to determine if a given type `T` is `any`.
   *
   * This works by intersecting `T` with the literal type `1`. If `T` is `any`,
   * then `1 & T` resolves to `any`, and since `0` is assignable to `any`, the
   * conditional type returns `true`. For every other `T`, `0` is not assignable
   * to `1 & T`, so the result is `false`.
   */
  type IsAny<T> = 0 extends 1 & T ? true : false;
  /**
   * Resolves the concrete job type from a "data type or job" type argument.
   *
   * - If `T` is `any`, the result is `Job<T, ResultType, NameType>`. This case
   *   is checked first because a conditional type applied directly to `any`
   *   would evaluate to the union of both branches.
   * - If `T` is already a `Job` type, it is used unchanged.
   * - Otherwise `T` is treated as the job data type and wrapped as
   *   `Job<T, ResultType, NameType>`.
   */
  type JobBase<T, ResultType, NameType extends string> = IsAny<T> extends true ? Job<T, ResultType, NameType> : T extends Job<any, any, any> ? T : Job<T, ResultType, NameType>;
  /**
   * Data type of `DataTypeOrJob` when it is a `Job` type, otherwise `Default`.
   */
  type ExtractDataType<DataTypeOrJob, Default> = DataTypeOrJob extends Job<infer D, any, any> ? D : Default;
  /**
   * Result type of `DataTypeOrJob` when it is a `Job` type, otherwise `Default`.
   */
  type ExtractResultType<DataTypeOrJob, Default> = DataTypeOrJob extends Job<any, infer R, any> ? R : Default;
  /**
   * Name type of `DataTypeOrJob` when it is a `Job` type, otherwise `Default`.
   */
  type ExtractNameType<DataTypeOrJob, Default extends string> = DataTypeOrJob extends Job<any, any, infer N> ? N : Default;
  /**
   * Queue
   *
   * This class provides methods to add jobs to a queue and some other high-level
   * administration such as pausing or deleting queues.
   *
   * @typeParam DataTypeOrJob - Either the type of the data that the job will
   * process, or a `Job` type from which the data, result and name types are
   * extracted.
   * @typeParam DefaultResultType - The type of the result of the job, used when
   * `DataTypeOrJob` is not a `Job` type.
   * @typeParam DefaultNameType - The type of the name of the job, used when
   * `DataTypeOrJob` is not a `Job` type.
   * @typeParam DataType - Derived job data type; defaults to the data type
   * extracted from `DataTypeOrJob`, or `DataTypeOrJob` itself.
   * @typeParam ResultType - Derived job result type; defaults to the result type
   * extracted from `DataTypeOrJob`, or `DefaultResultType`.
   * @typeParam NameType - Derived job name type; defaults to the name type
   * extracted from `DataTypeOrJob`, or `DefaultNameType`.
   *
   * @example
   *
   * ```typescript
   * import { Queue } from 'bullmq';
   *
   * interface MyDataType {
   *   foo: string;
   * }
   *
   * interface MyResultType {
   *   bar: string;
   * }
   *
   * const queue = new Queue<MyDataType, MyResultType, "blue" | "brown">('myQueue');
   * ```
   */
  export declare class Queue<DataTypeOrJob = any, DefaultResultType = any, DefaultNameType extends string = string, DataType = ExtractDataType<DataTypeOrJob, DataTypeOrJob>, ResultType = ExtractResultType<DataTypeOrJob, DefaultResultType>, NameType extends string = ExtractNameType<DataTypeOrJob, DefaultNameType>> extends QueueGetters<JobBase<DataTypeOrJob, ResultType, NameType>> {
    token: string;
    jobsOpts: BaseJobOptions;
    opts: QueueOptions;
    protected libName: string;
    protected _repeat?: Repeat;
    protected _jobScheduler?: JobScheduler;
    constructor(name: string, opts?: QueueOptions, Connection?: typeof RedisConnection);
    // Event-emitter methods narrowed to the events declared in `QueueListener`,
    // so event names and listener signatures are type-checked together.
    emit<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(event: U, ...args: Parameters<QueueListener<JobBase<DataType, ResultType, NameType>>[U]>): boolean;
    off<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(eventName: U, listener: QueueListener<JobBase<DataType, ResultType, NameType>>[U]): this;
    on<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(event: U, listener: QueueListener<JobBase<DataType, ResultType, NameType>>[U]): this;
    once<U extends keyof QueueListener<JobBase<DataType, ResultType, NameType>>>(event: U, listener: QueueListener<JobBase<DataType, ResultType, NameType>>[U]): this;
    /**
     * Returns this instance current default job options.
     */
    get defaultJobOptions(): JobsOptions;
    get metaValues(): Record<string, string | number>;
    /**
     * Get library version.
     *
     * @returns the content of the meta.library field.
     */
    getVersion(): Promise<string>;
    get repeat(): Promise<Repeat>;
    get jobScheduler(): Promise<JobScheduler>;
    /**
     * Enable and set global concurrency value.
     * @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
     * For instance, setting this value to 1 ensures that no more than one job
     * is processed at any given time. If this limit is not defined, there will be no
     * restriction on the number of concurrent jobs.
     */
    setGlobalConcurrency(concurrency: number): Promise<number>;
    /**
     * Enable and set rate limit.
     * @param max - Max number of jobs to process in the time period specified in `duration`
     * @param duration - Time in milliseconds. During this time, a maximum of `max` jobs will be processed.
     */
    setGlobalRateLimit(max: number, duration: number): Promise<number>;
    /**
     * Remove global concurrency value.
     */
    removeGlobalConcurrency(): Promise<number>;
    /**
     * Remove global rate limit values.
     */
    removeGlobalRateLimit(): Promise<number>;
    /**
     * Adds a new job to the queue.
     *
     * @param name - Name of the job to be added to the queue.
     * @param data - Arbitrary data to append to the job.
     * @param opts - Job options that affects how the job is going to be processed.
     */
    add(name: NameType, data: DataType, opts?: JobsOptions): Promise<Job<DataType, ResultType, NameType>>;
    /**
     * addJob is a telemetry free version of the add method, useful in order to wrap it
     * with custom telemetry on subclasses.
     *
     * @param name - Name of the job to be added to the queue.
     * @param data - Arbitrary data to append to the job.
     * @param opts - Job options that affects how the job is going to be processed.
     *
     * @returns Job
     */
    protected addJob(name: NameType, data: DataType, opts?: JobsOptions): Promise<Job<DataType, ResultType, NameType>>;
    /**
     * Adds an array of jobs to the queue. This method may be faster than adding
     * one job at a time in a sequence.
     *
     * @param jobs - The array of jobs to add to the queue. Each job is defined by 3
     * properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
     */
    addBulk(jobs: {
      name: NameType;
      data: DataType;
      opts?: BulkJobOptions;
    }[]): Promise<Job<DataType, ResultType, NameType>[]>;
    /**
     * Upserts a scheduler.
     *
     * A scheduler is a job factory that creates jobs at a given interval.
     * Upserting a scheduler will create a new job scheduler or update an existing one.
     * It will also create the first job based on the repeat options and delayed accordingly.
     *
     * @param jobSchedulerId - Unique key for the repeatable job meta.
     * @param repeatOpts - Repeat options
     * @param jobTemplate - Job template. If provided it will be used for all the jobs
     * created by the scheduler.
     *
     * @returns The next job to be scheduled (would normally be in delayed state).
     */
    upsertJobScheduler(jobSchedulerId: NameType, repeatOpts: Omit<RepeatOptions, 'key'>, jobTemplate?: {
      name?: NameType;
      data?: DataType;
      opts?: JobSchedulerTemplateOptions;
    }): Promise<Job<DataType, ResultType, NameType>>;
    /**
     * Pauses the processing of this queue globally.
     *
     * We use an atomic RENAME operation on the wait queue. Since
     * we have blocking calls with BRPOPLPUSH on the wait queue, as long as the queue
     * is renamed to 'paused', no new jobs will be processed (the current ones
     * will run until finalized).
     *
     * Adding jobs requires a LUA script to check first if the paused list exists
     * and in that case it will add it there instead of the wait list.
     */
    pause(): Promise<void>;
    /**
     * Close the queue instance.
     *
     */
    close(): Promise<void>;
    /**
     * Overrides the rate limit to be active for the next jobs.
     *
     * @param expireTimeMs - expire time in ms of this rate limit.
     */
    rateLimit(expireTimeMs: number): Promise<void>;
    /**
     * Resumes the processing of this queue globally.
     *
     * The method reverses the pause operation by resuming the processing of the
     * queue.
     */
    resume(): Promise<void>;
    /**
     * Returns true if the queue is currently paused.
     */
    isPaused(): Promise<boolean>;
    /**
     * Returns true if the queue is currently maxed.
     */
    isMaxed(): Promise<boolean>;
    /**
     * Get all repeatable meta jobs.
     *
     * @deprecated This method is deprecated and will be removed in v6. Use getJobSchedulers instead.
     *
     * @param start - Offset of first job to return.
     * @param end - Offset of last job to return.
     * @param asc - Determine the order in which jobs are returned based on their
     * next execution time.
     */
    getRepeatableJobs(start?: number, end?: number, asc?: boolean): Promise<RepeatableJob[]>;
    /**
     * Get Job Scheduler by id
     *
     * @param id - identifier of scheduler.
     */
    getJobScheduler(id: string): Promise<JobSchedulerJson<DataType> | undefined>;
    /**
     * Get all Job Schedulers
     *
     * @param start - Offset of first scheduler to return.
     * @param end - Offset of last scheduler to return.
     * @param asc - Determine the order in which schedulers are returned based on their
     * next execution time.
     */
    getJobSchedulers(start?: number, end?: number, asc?: boolean): Promise<JobSchedulerJson<DataType>[]>;
    /**
     *
     * Get the number of job schedulers.
     *
     * @returns The number of job schedulers.
     */
    getJobSchedulersCount(): Promise<number>;
    /**
     * Removes a repeatable job.
     *
     * Note: you need to use the exact same repeatOpts when deleting a repeatable job
     * as when adding it.
     *
     * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead.
     *
     * @see removeRepeatableByKey
     *
     * @param name - Job name
     * @param repeatOpts - Repeat options
     * @param jobId - Job id to remove. If not provided, all jobs with the same repeatOpts
     * are presumably targeted (the original sentence was truncated — TODO confirm).
     * @returns
     */
    removeRepeatable(name: NameType, repeatOpts: RepeatOptions, jobId?: string): Promise<boolean>;
    /**
     *
     * Removes a job scheduler.
     *
     * @param jobSchedulerId - identifier of the job scheduler.
     *
     * @returns
     */
    removeJobScheduler(jobSchedulerId: string): Promise<boolean>;
    /**
     * Removes a debounce key.
     * @deprecated use removeDeduplicationKey
     *
     * @param id - debounce identifier
     */
    removeDebounceKey(id: string): Promise<number>;
    /**
     * Removes a deduplication key.
     *
     * @param id - identifier
     */
    removeDeduplicationKey(id: string): Promise<number>;
    /**
     * Removes rate limit key.
     */
    removeRateLimitKey(): Promise<number>;
    /**
     * Removes a repeatable job by its key. Note that the key is the one used
     * to store the repeatable job metadata and not one of the job iterations
     * themselves. You can use "getRepeatableJobs" in order to get the keys.
     *
     * @see getRepeatableJobs
     *
     * @deprecated This method is deprecated and will be removed in v6. Use removeJobScheduler instead.
     *
     * @param key - Key of the repeatable job.
     * @returns
     */
    removeRepeatableByKey(key: string): Promise<boolean>;
    /**
     * Removes the given job from the queue as well as all its
     * dependencies.
     *
     * @param jobId - The id of the job to remove
     * @param opts - Options to remove a job
     * @returns 1 if it managed to remove the job or 0 if the job or
     * any of its dependencies were locked.
     */
    remove(jobId: string, { removeChildren }?: {
      removeChildren?: boolean;
    }): Promise<number>;
    /**
     * Updates the given job's progress.
     *
     * @param jobId - The id of the job to update
     * @param progress - Number or object to be saved as progress.
     */
    updateJobProgress(jobId: string, progress: JobProgress): Promise<void>;
    /**
     * Logs one row of job's log data.
     *
     * @param jobId - The job id to log against.
     * @param logRow - String with log data to be logged.
     * @param keepLogs - Max number of log entries to keep (0 for unlimited).
     *
     * @returns The total number of log entries for this job so far.
     */
    addJobLog(jobId: string, logRow: string, keepLogs?: number): Promise<number>;
    /**
     * Drains the queue, i.e., removes all jobs that are waiting
     * or delayed, but not active, completed or failed.
     *
     * @param delayed - Pass true if it should also clean the
     * delayed jobs.
     */
    drain(delayed?: boolean): Promise<void>;
    /**
     * Cleans jobs from a queue. Similar to drain but keeps jobs within a certain
     * grace period.
     *
     * @param grace - The grace period in milliseconds
     * @param limit - Max number of jobs to clean
     * @param type - The type of job to clean
     * Possible values are completed, wait, waiting, active, paused, prioritized,
     * delayed, failed. Defaults to completed.
     * @returns Ids of the jobs from the deleted records
     */
    clean(grace: number, limit: number, type?: 'completed' | 'wait' | 'waiting' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed'): Promise<string[]>;
    /**
     * Completely destroys the queue and all of its contents irreversibly.
     * This method will *pause* the queue and requires that there are no
     * active jobs. It is possible to bypass this requirement, i.e. not
     * having active jobs using the "force" option.
     *
     * Note: This operation requires to iterate on all the jobs stored in the queue
     * and can be slow for very large queues.
     *
     * @param opts - Obliterate options.
     */
    obliterate(opts?: ObliterateOpts): Promise<void>;
    /**
     * Retry all the failed or completed jobs.
     *
     * @param opts - An object with the following properties:
     * - count number to limit how many jobs will be moved to wait status per iteration,
     * - state failed by default or completed.
     * - timestamp from which timestamp to start moving jobs to wait status, default Date.now().
     *
     * @returns
     */
    retryJobs(opts?: {
      count?: number;
      state?: FinishedStatus;
      timestamp?: number;
    }): Promise<void>;
    /**
     * Promote all the delayed jobs.
     *
     * @param opts - An object with the following properties:
     * - count number to limit how many jobs will be moved to wait status per iteration
     *
     * @returns
     */
    promoteJobs(opts?: {
      count?: number;
    }): Promise<void>;
    /**
     * Trim the event stream to an approximately maxLength.
     *
     * @param maxLength - Approximate length to trim the event stream to.
     */
    trimEvents(maxLength: number): Promise<number>;
    /**
     * Delete old priority helper key.
     */
    removeDeprecatedPriorityKey(): Promise<number>;
    /**
     * Removes orphaned job keys that exist in Redis but are not referenced
     * in any queue state set.
     *
     * Orphaned keys can occur in rare cases when the removal-by-max-age logic
     * removes sorted-set entries without fully cleaning up the corresponding
     * job hash data (a regression introduced in v5.66.6 via #3694).
     * Under normal operation this method is
     * **not needed** — it is provided only as a one-time migration helper for
     * users who were affected by that specific bug and want to reclaim the
     * leaked memory.
     *
     * The method uses a Lua script so that every check-and-delete cycle is
     * atomic (per SCAN iteration). State keys are derived dynamically from
     * the queue's key map and their Redis TYPE is checked at runtime, so newly
     * introduced states are picked up automatically.
     *
     * @param count - Approximate number of keys to SCAN per iteration (default 1000).
     * @param limit - Maximum number of orphaned jobs to remove (0 = unlimited).
     * When set, the method returns as soon as the limit is reached.
     * Users with a very large number of orphans can call this method
     * in a loop: `while (await queue.removeOrphanedJobs(1000, 10000)) {}`
     * @returns The total number of orphaned jobs that were removed.
     */
    removeOrphanedJobs(count?: number, limit?: number): Promise<number>;
  }
  export {};