queue-base.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.QueueBase = void 0;
  4. const events_1 = require("events");
  5. const utils_1 = require("../utils");
  6. const create_scripts_1 = require("../utils/create-scripts");
  7. const redis_connection_1 = require("./redis-connection");
  8. const job_1 = require("./job");
  9. const queue_keys_1 = require("./queue-keys");
  10. /**
  11. * Base class for all classes that need to interact with queues.
  12. * This class is normally not used directly, but extended by the other classes.
  13. *
  14. */
  15. class QueueBase extends events_1.EventEmitter {
  16. /**
  17. *
  18. * @param name - The name of the queue.
  19. * @param opts - Options for the queue.
  20. * @param Connection - An optional "Connection" class used to instantiate a Connection. This is useful for
  21. * testing with mockups and/or extending the Connection class and passing an alternate implementation.
  22. */
  23. constructor(name, opts = { connection: {} }, Connection = redis_connection_1.RedisConnection, hasBlockingConnection = false) {
  24. super();
  25. this.name = name;
  26. this.opts = opts;
  27. this.closed = false;
  28. this.hasBlockingConnection = false;
  29. this.hasBlockingConnection = hasBlockingConnection;
  30. this.opts = Object.assign({ prefix: 'bull' }, opts);
  31. if (!name) {
  32. throw new Error('Queue name must be provided');
  33. }
  34. if (name.includes(':')) {
  35. throw new Error('Queue name cannot contain :');
  36. }
  37. this.connection = new Connection(opts.connection, {
  38. shared: (0, utils_1.isRedisInstance)(opts.connection),
  39. blocking: hasBlockingConnection,
  40. skipVersionCheck: opts.skipVersionCheck,
  41. skipWaitingForReady: opts.skipWaitingForReady,
  42. });
  43. this.connection.on('error', (error) => this.emit('error', error));
  44. this.connection.on('close', () => {
  45. if (!this.closing) {
  46. this.emit('ioredis:close');
  47. }
  48. });
  49. const queueKeys = new queue_keys_1.QueueKeys(opts.prefix);
  50. this.qualifiedName = queueKeys.getQueueQualifiedName(name);
  51. this.keys = queueKeys.getKeys(name);
  52. this.toKey = (type) => queueKeys.toKey(name, type);
  53. this.createScripts();
  54. }
  55. /**
  56. * Returns a promise that resolves to a redis client. Normally used only by subclasses.
  57. */
  58. get client() {
  59. return this.connection.client;
  60. }
  61. createScripts() {
  62. this.scripts = (0, create_scripts_1.createScripts)(this);
  63. }
  64. /**
  65. * Returns the version of the Redis instance the client is connected to,
  66. */
  67. get redisVersion() {
  68. return this.connection.redisVersion;
  69. }
  70. /**
  71. * Returns the database type of the Redis instance the client is connected to,
  72. */
  73. get databaseType() {
  74. return this.connection.databaseType;
  75. }
  76. /**
  77. * Helper to easily extend Job class calls.
  78. */
  79. get Job() {
  80. return job_1.Job;
  81. }
  82. /**
  83. * Emits an event. Normally used by subclasses to emit events.
  84. *
  85. * @param event - The emitted event.
  86. * @param args -
  87. * @returns
  88. */
  89. emit(event, ...args) {
  90. try {
  91. return super.emit(event, ...args);
  92. }
  93. catch (err) {
  94. try {
  95. return super.emit('error', err);
  96. }
  97. catch (err) {
  98. // We give up if the error event also throws an exception.
  99. console.error(err);
  100. return false;
  101. }
  102. }
  103. }
  104. waitUntilReady() {
  105. return this.client;
  106. }
  107. base64Name() {
  108. return Buffer.from(this.name).toString('base64');
  109. }
  110. clientName(suffix = '') {
  111. const queueNameBase64 = this.base64Name();
  112. return `${this.opts.prefix}:${queueNameBase64}${suffix}`;
  113. }
  114. /**
  115. *
  116. * Closes the connection and returns a promise that resolves when the connection is closed.
  117. */
  118. async close() {
  119. if (!this.closing) {
  120. this.closing = this.connection.close();
  121. }
  122. await this.closing;
  123. this.closed = true;
  124. }
  125. /**
  126. *
  127. * Force disconnects a connection.
  128. */
  129. disconnect() {
  130. return this.connection.disconnect();
  131. }
  132. async checkConnectionError(fn, delayInMs = utils_1.DELAY_TIME_5) {
  133. try {
  134. return await fn();
  135. }
  136. catch (error) {
  137. if ((0, utils_1.isNotConnectionError)(error)) {
  138. this.emit('error', error);
  139. }
  140. if (!this.closing && delayInMs) {
  141. await (0, utils_1.delay)(delayInMs);
  142. }
  143. else {
  144. return;
  145. }
  146. }
  147. }
  148. /**
  149. * Wraps the code with telemetry and provides a span for configuration.
  150. *
  151. * @param spanKind - kind of the span: Producer, Consumer, Internal
  152. * @param operation - operation name (such as add, process, etc)
  153. * @param destination - destination name (normally the queue name)
  154. * @param callback - code to wrap with telemetry
  155. * @param srcPropagationMetadata -
  156. * @returns
  157. */
  158. trace(spanKind, operation, destination, callback, srcPropagationMetadata) {
  159. return (0, utils_1.trace)(this.opts.telemetry, spanKind, this.name, operation, destination, callback, srcPropagationMetadata);
  160. }
  161. }
  162. exports.QueueBase = QueueBase;
  163. //# sourceMappingURL=queue-base.js.map