queue-base.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. import { EventEmitter } from 'events';
  2. import { delay, DELAY_TIME_5, isNotConnectionError, isRedisInstance, trace, } from '../utils';
  3. import { createScripts } from '../utils/create-scripts';
  4. import { RedisConnection } from './redis-connection';
  5. import { Job } from './job';
  6. import { QueueKeys } from './queue-keys';
  7. /**
  8. * Base class for all classes that need to interact with queues.
  9. * This class is normally not used directly, but extended by the other classes.
  10. *
  11. */
  12. export class QueueBase extends EventEmitter {
  13. /**
  14. *
  15. * @param name - The name of the queue.
  16. * @param opts - Options for the queue.
  17. * @param Connection - An optional "Connection" class used to instantiate a Connection. This is useful for
  18. * testing with mockups and/or extending the Connection class and passing an alternate implementation.
  19. */
  20. constructor(name, opts = { connection: {} }, Connection = RedisConnection, hasBlockingConnection = false) {
  21. super();
  22. this.name = name;
  23. this.opts = opts;
  24. this.closed = false;
  25. this.hasBlockingConnection = false;
  26. this.hasBlockingConnection = hasBlockingConnection;
  27. this.opts = Object.assign({ prefix: 'bull' }, opts);
  28. if (!name) {
  29. throw new Error('Queue name must be provided');
  30. }
  31. if (name.includes(':')) {
  32. throw new Error('Queue name cannot contain :');
  33. }
  34. this.connection = new Connection(opts.connection, {
  35. shared: isRedisInstance(opts.connection),
  36. blocking: hasBlockingConnection,
  37. skipVersionCheck: opts.skipVersionCheck,
  38. skipWaitingForReady: opts.skipWaitingForReady,
  39. });
  40. this.connection.on('error', (error) => this.emit('error', error));
  41. this.connection.on('close', () => {
  42. if (!this.closing) {
  43. this.emit('ioredis:close');
  44. }
  45. });
  46. const queueKeys = new QueueKeys(opts.prefix);
  47. this.qualifiedName = queueKeys.getQueueQualifiedName(name);
  48. this.keys = queueKeys.getKeys(name);
  49. this.toKey = (type) => queueKeys.toKey(name, type);
  50. this.createScripts();
  51. }
  52. /**
  53. * Returns a promise that resolves to a redis client. Normally used only by subclasses.
  54. */
  55. get client() {
  56. return this.connection.client;
  57. }
  58. createScripts() {
  59. this.scripts = createScripts(this);
  60. }
  61. /**
  62. * Returns the version of the Redis instance the client is connected to,
  63. */
  64. get redisVersion() {
  65. return this.connection.redisVersion;
  66. }
  67. /**
  68. * Returns the database type of the Redis instance the client is connected to,
  69. */
  70. get databaseType() {
  71. return this.connection.databaseType;
  72. }
  73. /**
  74. * Helper to easily extend Job class calls.
  75. */
  76. get Job() {
  77. return Job;
  78. }
  79. /**
  80. * Emits an event. Normally used by subclasses to emit events.
  81. *
  82. * @param event - The emitted event.
  83. * @param args -
  84. * @returns
  85. */
  86. emit(event, ...args) {
  87. try {
  88. return super.emit(event, ...args);
  89. }
  90. catch (err) {
  91. try {
  92. return super.emit('error', err);
  93. }
  94. catch (err) {
  95. // We give up if the error event also throws an exception.
  96. console.error(err);
  97. return false;
  98. }
  99. }
  100. }
  101. waitUntilReady() {
  102. return this.client;
  103. }
  104. base64Name() {
  105. return Buffer.from(this.name).toString('base64');
  106. }
  107. clientName(suffix = '') {
  108. const queueNameBase64 = this.base64Name();
  109. return `${this.opts.prefix}:${queueNameBase64}${suffix}`;
  110. }
  111. /**
  112. *
  113. * Closes the connection and returns a promise that resolves when the connection is closed.
  114. */
  115. async close() {
  116. if (!this.closing) {
  117. this.closing = this.connection.close();
  118. }
  119. await this.closing;
  120. this.closed = true;
  121. }
  122. /**
  123. *
  124. * Force disconnects a connection.
  125. */
  126. disconnect() {
  127. return this.connection.disconnect();
  128. }
  129. async checkConnectionError(fn, delayInMs = DELAY_TIME_5) {
  130. try {
  131. return await fn();
  132. }
  133. catch (error) {
  134. if (isNotConnectionError(error)) {
  135. this.emit('error', error);
  136. }
  137. if (!this.closing && delayInMs) {
  138. await delay(delayInMs);
  139. }
  140. else {
  141. return;
  142. }
  143. }
  144. }
  145. /**
  146. * Wraps the code with telemetry and provides a span for configuration.
  147. *
  148. * @param spanKind - kind of the span: Producer, Consumer, Internal
  149. * @param operation - operation name (such as add, process, etc)
  150. * @param destination - destination name (normally the queue name)
  151. * @param callback - code to wrap with telemetry
  152. * @param srcPropagationMetadata -
  153. * @returns
  154. */
  155. trace(spanKind, operation, destination, callback, srcPropagationMetadata) {
  156. return trace(this.opts.telemetry, spanKind, this.name, operation, destination, callback, srcPropagationMetadata);
  157. }
  158. }
  159. //# sourceMappingURL=queue-base.js.map