lock-manager.js 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import { AbortController } from './abort-controller';
  2. import { SpanKind, TelemetryAttributes } from '../enums';
  3. /**
  4. * Manages lock renewal for BullMQ workers.
  5. * It periodically extends locks for active jobs to prevent them from being
  6. * considered stalled by other workers.
  7. */
  8. export class LockManager {
  9. constructor(worker, opts) {
  10. this.worker = worker;
  11. this.opts = opts;
  12. // Maps job ids with their tokens, timestamps, and abort controllers
  13. this.trackedJobs = new Map();
  14. this.closed = false;
  15. }
  16. /**
  17. * Starts the lock manager timers for lock renewal.
  18. */
  19. start() {
  20. if (this.closed) {
  21. return;
  22. }
  23. // Start lock renewal timer if not disabled
  24. if (this.opts.lockRenewTime > 0) {
  25. this.startLockExtenderTimer();
  26. }
  27. }
  28. async extendLocks(jobIds) {
  29. await this.worker.trace(SpanKind.INTERNAL, 'extendLocks', this.worker.name, async (span) => {
  30. span === null || span === void 0 ? void 0 : span.setAttributes({
  31. [TelemetryAttributes.WorkerId]: this.opts.workerId,
  32. [TelemetryAttributes.WorkerName]: this.opts.workerName,
  33. [TelemetryAttributes.WorkerJobsToExtendLocks]: jobIds,
  34. });
  35. try {
  36. const jobTokens = jobIds.map(id => { var _a; return ((_a = this.trackedJobs.get(id)) === null || _a === void 0 ? void 0 : _a.token) || ''; });
  37. const erroredJobIds = await this.worker.extendJobLocks(jobIds, jobTokens, this.opts.lockDuration);
  38. if (erroredJobIds.length > 0) {
  39. this.worker.emit('lockRenewalFailed', erroredJobIds);
  40. for (const jobId of erroredJobIds) {
  41. this.worker.emit('error', new Error(`could not renew lock for job ${jobId}`));
  42. }
  43. }
  44. const succeededJobIds = jobIds.filter(id => !erroredJobIds.includes(id));
  45. if (succeededJobIds.length > 0) {
  46. this.worker.emit('locksRenewed', {
  47. count: succeededJobIds.length,
  48. jobIds: succeededJobIds,
  49. });
  50. }
  51. }
  52. catch (err) {
  53. this.worker.emit('error', err);
  54. }
  55. });
  56. }
  57. startLockExtenderTimer() {
  58. clearTimeout(this.lockRenewalTimer);
  59. if (!this.closed) {
  60. this.lockRenewalTimer = setTimeout(async () => {
  61. // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime
  62. const now = Date.now();
  63. const jobsToExtend = [];
  64. for (const jobId of this.trackedJobs.keys()) {
  65. const tracked = this.trackedJobs.get(jobId);
  66. const { ts, token, abortController } = tracked;
  67. if (!ts) {
  68. this.trackedJobs.set(jobId, { token, ts: now, abortController });
  69. continue;
  70. }
  71. if (ts + this.opts.lockRenewTime / 2 < now) {
  72. this.trackedJobs.set(jobId, { token, ts: now, abortController });
  73. jobsToExtend.push(jobId);
  74. }
  75. }
  76. if (jobsToExtend.length) {
  77. await this.extendLocks(jobsToExtend);
  78. }
  79. this.startLockExtenderTimer();
  80. }, this.opts.lockRenewTime / 2);
  81. }
  82. }
  83. /**
  84. * Stops the lock manager and clears all timers.
  85. */
  86. async close() {
  87. if (this.closed) {
  88. return;
  89. }
  90. this.closed = true;
  91. if (this.lockRenewalTimer) {
  92. clearTimeout(this.lockRenewalTimer);
  93. this.lockRenewalTimer = undefined;
  94. }
  95. this.trackedJobs.clear();
  96. }
  97. /**
  98. * Adds a job to be tracked for lock renewal.
  99. * Returns an AbortController if shouldCreateController is true, undefined otherwise.
  100. */
  101. trackJob(jobId, token, ts, shouldCreateController = false) {
  102. const abortController = shouldCreateController
  103. ? new AbortController()
  104. : undefined;
  105. if (!this.closed && jobId) {
  106. this.trackedJobs.set(jobId, { token, ts, abortController });
  107. }
  108. return abortController;
  109. }
  110. /**
  111. * Removes a job from lock renewal tracking.
  112. */
  113. untrackJob(jobId) {
  114. this.trackedJobs.delete(jobId);
  115. }
  116. /**
  117. * Gets the number of jobs currently being tracked.
  118. */
  119. getActiveJobCount() {
  120. return this.trackedJobs.size;
  121. }
  122. /**
  123. * Checks if the lock manager is running.
  124. */
  125. isRunning() {
  126. return !this.closed && this.lockRenewalTimer !== undefined;
  127. }
  128. /**
  129. * Cancels a specific job by aborting its signal.
  130. * @param jobId - The ID of the job to cancel
  131. * @param reason - Optional reason for the cancellation
  132. * @returns true if the job was found and cancelled, false otherwise
  133. */
  134. cancelJob(jobId, reason) {
  135. const tracked = this.trackedJobs.get(jobId);
  136. if (tracked === null || tracked === void 0 ? void 0 : tracked.abortController) {
  137. tracked.abortController.abort(reason);
  138. return true;
  139. }
  140. return false;
  141. }
  142. /**
  143. * Cancels all tracked jobs by aborting their signals.
  144. * @param reason - Optional reason for the cancellation
  145. */
  146. cancelAllJobs(reason) {
  147. for (const tracked of this.trackedJobs.values()) {
  148. if (tracked.abortController) {
  149. tracked.abortController.abort(reason);
  150. }
  151. }
  152. }
  153. /**
  154. * Gets a list of all tracked job IDs.
  155. * @returns Array of job IDs currently being tracked
  156. */
  157. getTrackedJobIds() {
  158. return Array.from(this.trackedJobs.keys());
  159. }
  160. }
  161. //# sourceMappingURL=lock-manager.js.map