lock-manager.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.LockManager = void 0;
  4. const abort_controller_1 = require("./abort-controller");
  5. const enums_1 = require("../enums");
  6. /**
  7. * Manages lock renewal for BullMQ workers.
  8. * It periodically extends locks for active jobs to prevent them from being
  9. * considered stalled by other workers.
  10. */
  11. class LockManager {
  12. constructor(worker, opts) {
  13. this.worker = worker;
  14. this.opts = opts;
  15. // Maps job ids with their tokens, timestamps, and abort controllers
  16. this.trackedJobs = new Map();
  17. this.closed = false;
  18. }
  19. /**
  20. * Starts the lock manager timers for lock renewal.
  21. */
  22. start() {
  23. if (this.closed) {
  24. return;
  25. }
  26. // Start lock renewal timer if not disabled
  27. if (this.opts.lockRenewTime > 0) {
  28. this.startLockExtenderTimer();
  29. }
  30. }
  31. async extendLocks(jobIds) {
  32. await this.worker.trace(enums_1.SpanKind.INTERNAL, 'extendLocks', this.worker.name, async (span) => {
  33. span === null || span === void 0 ? void 0 : span.setAttributes({
  34. [enums_1.TelemetryAttributes.WorkerId]: this.opts.workerId,
  35. [enums_1.TelemetryAttributes.WorkerName]: this.opts.workerName,
  36. [enums_1.TelemetryAttributes.WorkerJobsToExtendLocks]: jobIds,
  37. });
  38. try {
  39. const jobTokens = jobIds.map(id => { var _a; return ((_a = this.trackedJobs.get(id)) === null || _a === void 0 ? void 0 : _a.token) || ''; });
  40. const erroredJobIds = await this.worker.extendJobLocks(jobIds, jobTokens, this.opts.lockDuration);
  41. if (erroredJobIds.length > 0) {
  42. this.worker.emit('lockRenewalFailed', erroredJobIds);
  43. for (const jobId of erroredJobIds) {
  44. this.worker.emit('error', new Error(`could not renew lock for job ${jobId}`));
  45. }
  46. }
  47. const succeededJobIds = jobIds.filter(id => !erroredJobIds.includes(id));
  48. if (succeededJobIds.length > 0) {
  49. this.worker.emit('locksRenewed', {
  50. count: succeededJobIds.length,
  51. jobIds: succeededJobIds,
  52. });
  53. }
  54. }
  55. catch (err) {
  56. this.worker.emit('error', err);
  57. }
  58. });
  59. }
  60. startLockExtenderTimer() {
  61. clearTimeout(this.lockRenewalTimer);
  62. if (!this.closed) {
  63. this.lockRenewalTimer = setTimeout(async () => {
  64. // Get all the jobs whose locks expire in less than 1/2 of the lockRenewTime
  65. const now = Date.now();
  66. const jobsToExtend = [];
  67. for (const jobId of this.trackedJobs.keys()) {
  68. const tracked = this.trackedJobs.get(jobId);
  69. const { ts, token, abortController } = tracked;
  70. if (!ts) {
  71. this.trackedJobs.set(jobId, { token, ts: now, abortController });
  72. continue;
  73. }
  74. if (ts + this.opts.lockRenewTime / 2 < now) {
  75. this.trackedJobs.set(jobId, { token, ts: now, abortController });
  76. jobsToExtend.push(jobId);
  77. }
  78. }
  79. if (jobsToExtend.length) {
  80. await this.extendLocks(jobsToExtend);
  81. }
  82. this.startLockExtenderTimer();
  83. }, this.opts.lockRenewTime / 2);
  84. }
  85. }
  86. /**
  87. * Stops the lock manager and clears all timers.
  88. */
  89. async close() {
  90. if (this.closed) {
  91. return;
  92. }
  93. this.closed = true;
  94. if (this.lockRenewalTimer) {
  95. clearTimeout(this.lockRenewalTimer);
  96. this.lockRenewalTimer = undefined;
  97. }
  98. this.trackedJobs.clear();
  99. }
  100. /**
  101. * Adds a job to be tracked for lock renewal.
  102. * Returns an AbortController if shouldCreateController is true, undefined otherwise.
  103. */
  104. trackJob(jobId, token, ts, shouldCreateController = false) {
  105. const abortController = shouldCreateController
  106. ? new abort_controller_1.AbortController()
  107. : undefined;
  108. if (!this.closed && jobId) {
  109. this.trackedJobs.set(jobId, { token, ts, abortController });
  110. }
  111. return abortController;
  112. }
  113. /**
  114. * Removes a job from lock renewal tracking.
  115. */
  116. untrackJob(jobId) {
  117. this.trackedJobs.delete(jobId);
  118. }
  119. /**
  120. * Gets the number of jobs currently being tracked.
  121. */
  122. getActiveJobCount() {
  123. return this.trackedJobs.size;
  124. }
  125. /**
  126. * Checks if the lock manager is running.
  127. */
  128. isRunning() {
  129. return !this.closed && this.lockRenewalTimer !== undefined;
  130. }
  131. /**
  132. * Cancels a specific job by aborting its signal.
  133. * @param jobId - The ID of the job to cancel
  134. * @param reason - Optional reason for the cancellation
  135. * @returns true if the job was found and cancelled, false otherwise
  136. */
  137. cancelJob(jobId, reason) {
  138. const tracked = this.trackedJobs.get(jobId);
  139. if (tracked === null || tracked === void 0 ? void 0 : tracked.abortController) {
  140. tracked.abortController.abort(reason);
  141. return true;
  142. }
  143. return false;
  144. }
  145. /**
  146. * Cancels all tracked jobs by aborting their signals.
  147. * @param reason - Optional reason for the cancellation
  148. */
  149. cancelAllJobs(reason) {
  150. for (const tracked of this.trackedJobs.values()) {
  151. if (tracked.abortController) {
  152. tracked.abortController.abort(reason);
  153. }
  154. }
  155. }
  156. /**
  157. * Gets a list of all tracked job IDs.
  158. * @returns Array of job IDs currently being tracked
  159. */
  160. getTrackedJobIds() {
  161. return Array.from(this.trackedJobs.keys());
  162. }
  163. }
  164. exports.LockManager = LockManager;
  165. //# sourceMappingURL=lock-manager.js.map