worker.d.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. import { URL } from 'url';
  2. import { AbortController } from './abort-controller';
  3. import { GetNextJobOptions, IoredisListener, JobJsonRaw, RedisClient, Span, WorkerOptions } from '../interfaces';
  4. import { JobProgress } from '../types';
  5. import { Processor } from '../types/processor';
  6. import { QueueBase } from './queue-base';
  7. import { Repeat } from './repeat';
  8. import { Job } from './job';
  9. import { RedisConnection } from './redis-connection';
  10. import { JobScheduler } from './job-scheduler';
  11. import { LockManager } from './lock-manager';
  /**
  * Maps each worker event name to the signature of the listener for that event.
  * The typed emit/off/on/once methods of Worker take their event names and
  * listener signatures from this interface.
  */
  12. export interface WorkerListener<DataType = any, ResultType = any, NameType extends string = string> extends IoredisListener {
  13. /**
  14. * Listen to 'active' event.
  15. *
  16. * This event is triggered when a job enters the 'active' state.
  17. */
  18. active: (job: Job<DataType, ResultType, NameType>, prev: string) => void;
  19. /**
  20. * Listen to 'closed' event.
  21. *
  22. * This event is triggered when the worker is closed.
  23. */
  24. closed: () => void;
  25. /**
  26. * Listen to 'closing' event.
  27. *
  28. * This event is triggered when the worker is closing.
  29. */
  30. closing: (msg: string) => void;
  31. /**
  32. * Listen to 'completed' event.
  33. *
  34. * This event is triggered when a job has successfully completed.
  35. */
  36. completed: (job: Job<DataType, ResultType, NameType>, result: ResultType, prev: string) => void;
  37. /**
  38. * Listen to 'drained' event.
  39. *
  40. * This event is triggered when the queue has drained the waiting list.
  41. * Note that there could still be delayed jobs waiting for their timers to expire
  42. * and this event will still be triggered as long as the waiting list has emptied.
  43. */
  44. drained: () => void;
  45. /**
  46. * Listen to 'error' event.
  47. *
  48. * This event is triggered when an error is thrown.
  49. */
  50. error: (failedReason: Error) => void;
  51. /**
  52. * Listen to 'failed' event.
  53. *
  54. * This event is triggered when a job has thrown an exception.
  55. * Note: the job parameter could be received as undefined when a stalled job
  56. * reaches the stalled limit and is deleted by the removeOnFail option.
  57. */
  58. failed: (job: Job<DataType, ResultType, NameType> | undefined, error: Error, prev: string) => void;
  59. /**
  60. * Listen to 'paused' event.
  61. *
  62. * This event is triggered when the queue is paused.
  63. */
  64. paused: () => void;
  65. /**
  66. * Listen to 'progress' event.
  67. *
  68. * This event is triggered when a job updates its progress, i.e. the
  69. * Job#updateProgress() method is called. This is useful to notify
  70. * progress or any other data from within a processor to the rest of the
  71. * world.
  72. */
  73. progress: (job: Job<DataType, ResultType, NameType>, progress: JobProgress) => void;
  74. /**
  75. * Listen to 'ready' event.
  76. *
  77. * This event is triggered when blockingConnection is ready.
  78. */
  79. ready: () => void;
  80. /**
  81. * Listen to 'resumed' event.
  82. *
  83. * This event is triggered when the queue is resumed.
  84. */
  85. resumed: () => void;
  86. /**
  87. * Listen to 'stalled' event.
  88. *
  89. * This event is triggered when a job has stalled and
  90. * has been moved back to the wait list.
  91. */
  92. stalled: (jobId: string, prev: string) => void;
  93. /**
  94. * Listen to 'lockRenewalFailed' event.
  95. *
  96. * This event is triggered when lock renewal fails for one or more jobs.
  97. */
  98. lockRenewalFailed: (jobIds: string[]) => void;
  99. /**
  100. * Listen to 'locksRenewed' event.
  101. *
  102. * This event is triggered when locks are successfully renewed.
  103. */
  104. locksRenewed: (data: {
  105. count: number;
  106. jobIds: string[];
  107. }) => void;
  108. }
  109. /**
  110. *
  111. * This class represents a worker that is able to process jobs from the queue.
  112. * As soon as the class is instantiated and a connection to Redis is established
  113. * it will start processing jobs.
  114. *
  115. */
  116. export declare class Worker<DataType = any, ResultType = any, NameType extends string = string> extends QueueBase {
  117. readonly opts: WorkerOptions;
  118. readonly id: string;
  119. private abortDelayController;
  120. private blockingConnection;
  121. private blockUntil;
  122. private _concurrency;
  123. private childPool;
  124. private drained;
  125. private limitUntil;
  126. protected lockManager: LockManager;
  127. private processorAcceptsSignal;
  128. private stalledCheckerRunning;
  129. private stalledCheckStopper?;
  130. private waiting;
  131. private _repeat;
  132. protected _jobScheduler: JobScheduler;
  133. protected paused: boolean;
  134. protected processFn: Processor<DataType, ResultType, NameType>;
  135. protected running: boolean;
  136. protected mainLoopRunning: Promise<void> | null;
  137. static RateLimitError(): Error;
  138. constructor(name: string, processor?: string | URL | null | Processor<DataType, ResultType, NameType>, opts?: WorkerOptions, Connection?: typeof RedisConnection);
  139. /**
  140. * Creates and configures the lock manager for processing jobs.
  141. * This method can be overridden in subclasses to customize lock manager behavior.
  142. */
  143. protected createLockManager(): void;
  144. /**
  145. * Creates and configures the sandbox for processing jobs.
  146. * This method can be overridden in subclasses to customize sandbox behavior.
  147. *
  148. * @param processor - The processor file path, URL, or function to be sandboxed
  149. */
  150. protected createSandbox(processor: string | URL | null | Processor<DataType, ResultType, NameType>): void;
  151. /**
  152. * Public accessor method for LockManager to extend locks.
  153. * This delegates to the protected scripts object.
  154. */
  155. extendJobLocks(jobIds: string[], tokens: string[], duration: number): Promise<string[]>;
  /**
  * Typed event methods: the event name must be a key of WorkerListener, and the
  * arguments / listener must match that key's listener signature.
  */
  156. emit<U extends keyof WorkerListener<DataType, ResultType, NameType>>(event: U, ...args: Parameters<WorkerListener<DataType, ResultType, NameType>[U]>): boolean;
  157. off<U extends keyof WorkerListener<DataType, ResultType, NameType>>(eventName: U, listener: WorkerListener<DataType, ResultType, NameType>[U]): this;
  158. on<U extends keyof WorkerListener<DataType, ResultType, NameType>>(event: U, listener: WorkerListener<DataType, ResultType, NameType>[U]): this;
  159. once<U extends keyof WorkerListener<DataType, ResultType, NameType>>(event: U, listener: WorkerListener<DataType, ResultType, NameType>[U]): this;
  160. protected callProcessJob(job: Job<DataType, ResultType, NameType>, token: string, signal?: AbortSignal): Promise<ResultType>;
  161. protected createJob(data: JobJsonRaw, jobId: string): Job<DataType, ResultType, NameType>;
  162. /**
  163. *
  164. * Waits until the worker is ready to start processing jobs.
  165. * In general only useful when writing tests.
  166. *
  167. */
  168. waitUntilReady(): Promise<RedisClient>;
  169. /**
  170. * Cancels a specific job currently being processed by this worker.
  171. * The job's processor function will receive an abort signal.
  172. *
  173. * @param jobId - The ID of the job to cancel
  174. * @param reason - Optional reason for the cancellation
  175. * @returns true if the job was found and cancelled, false otherwise
  176. */
  177. cancelJob(jobId: string, reason?: string): boolean;
  178. /**
  179. * Cancels all jobs currently being processed by this worker.
  180. * All active job processor functions will receive abort signals.
  181. *
  182. * @param reason - Optional reason for the cancellation
  183. */
  184. cancelAllJobs(reason?: string): void;
  185. set concurrency(concurrency: number);
  186. get concurrency(): number;
  187. get repeat(): Promise<Repeat>;
  188. get jobScheduler(): Promise<JobScheduler>;
  189. run(): Promise<void>;
  190. private waitForRateLimit;
  191. /**
  192. * This is the main loop in BullMQ. Its goals are to fetch jobs from the queue
  193. * as efficiently as possible, providing concurrency and minimal unnecessary calls
  194. * to Redis.
  195. */
  196. private mainLoop;
  197. /**
  198. * Returns a promise that resolves to the next job in queue.
  199. * @param token - worker token to be assigned to retrieved job
  200. * @returns a Job or undefined if no job was available in the queue.
  * NOTE(review): the declared return type below does not include undefined,
  * although the description above says it can be; callers should confirm
  * whether they need to handle a missing job.
  201. */
  202. getNextJob(token: string, { block }?: GetNextJobOptions): Promise<Job<DataType, ResultType, NameType>>;
  203. private _getNextJob;
  204. /**
  205. * Overrides the rate limit to be active for the next jobs.
  206. * @deprecated This method is deprecated and will be removed in v6. Use queue.rateLimit method instead.
  207. * @param expireTimeMs - expire time in ms of this rate limit.
  208. */
  209. rateLimit(expireTimeMs: number): Promise<void>;
  210. get minimumBlockTimeout(): number;
  211. private isRateLimited;
  212. protected moveToActive(client: RedisClient, token: string, name?: string): Promise<Job<DataType, ResultType, NameType>>;
  213. private waitForJob;
  214. protected getBlockTimeout(blockUntil: number): number;
  215. protected getRateLimitDelay(delay: number): number;
  216. /**
  217. *
  218. * This function is exposed only for testing purposes.
  219. */
  220. delay(milliseconds?: number, abortController?: AbortController): Promise<void>;
  221. private updateDelays;
  222. protected nextJobFromJobData(jobData?: JobJsonRaw, jobId?: string, token?: string): Promise<Job<DataType, ResultType, NameType>>;
  223. processJob(job: Job<DataType, ResultType, NameType>, token: string, fetchNextCallback?: () => boolean): Promise<void | Job<DataType, ResultType, NameType>>;
  224. private getUnrecoverableErrorMessage;
  225. protected handleCompleted(result: ResultType, job: Job<DataType, ResultType, NameType>, token: string, fetchNextCallback?: () => boolean, span?: Span): Promise<Job<DataType, ResultType, NameType>>;
  226. protected handleFailed(err: Error, job: Job<DataType, ResultType, NameType>, token: string, fetchNextCallback?: () => boolean, span?: Span): Promise<Job<DataType, ResultType, NameType>>;
  227. /**
  228. *
  229. * Pauses the processing of this queue only for this worker.
  *
  * @param doNotWaitActive - NOTE(review): presumably skips waiting for
  * currently active jobs to finish; confirm against the implementation.
  230. */
  231. pause(doNotWaitActive?: boolean): Promise<void>;
  232. /**
  233. *
  234. * Resumes processing of this worker (if paused).
  235. */
  236. resume(): void;
  237. /**
  238. *
  239. * Checks if worker is paused.
  240. *
  241. * @returns true if worker is paused, false otherwise.
  242. */
  243. isPaused(): boolean;
  244. /**
  245. *
  246. * Checks if worker is currently running.
  247. *
  248. * @returns true if worker is running, false otherwise.
  249. */
  250. isRunning(): boolean;
  251. /**
  252. *
  253. * Closes the worker and related redis connections.
  254. *
  255. * This method waits for current jobs to finalize before returning.
  256. *
  257. * @param force - Use force boolean parameter if you do not want to wait for
  258. * current jobs to be processed. When using telemetry, be mindful that it can
  259. * interfere with the proper closure of spans, potentially preventing them from being exported.
  260. *
  261. * @returns Promise that resolves when the worker has been closed.
  262. */
  263. close(force?: boolean): Promise<void>;
  264. /**
  265. *
  266. * Manually starts the stalled checker.
  267. * The check will run once as soon as this method is called, and
  268. * then every opts.stalledInterval milliseconds until the worker is closed.
  269. * Note: Normally you do not need to call this method, since the stalled checker
  270. * is automatically started when the worker starts processing jobs after
  271. * calling run. However, if you want to process the jobs manually you need
  272. * to call this method to start the stalled checker.
  273. *
  274. * @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs}
  275. */
  276. startStalledCheckTimer(): Promise<void>;
  277. private stalledChecker;
  278. /**
  279. * Returns a promise that resolves when active jobs are cleared
  280. *
  281. * @returns
  282. */
  283. private whenCurrentJobsFinished;
  284. private retryIfFailed;
  285. private moveStalledJobsToWait;
  286. private moveLimitedBackToWait;
  287. }