queue-events.js 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.QueueEvents = void 0;
  4. const tslib_1 = require("tslib");
  5. const utils_1 = require("../utils");
  6. const queue_base_1 = require("./queue-base");
  /**
   * QueueEvents listens to the global events emitted by a given queue and
   * re-emits them on this instance.
   *
   * This class requires a dedicated redis connection.
   */
  class QueueEvents extends queue_base_1.QueueBase {
    /**
     * @param name - Forwarded as-is to the QueueBase constructor.
     * @param _a - Options object. `connection` and `autorun` (default `true`)
     *   are consumed here; every other option is forwarded to QueueBase.
     * @param Connection - Forwarded as-is to the QueueBase constructor.
     */
    constructor(name, _a = { connection: {} }, Connection) {
      const { connection, autorun = true } = _a;
      const forwardedOpts = tslib_1.__rest(_a, ["connection", "autorun"]);
      const isRedisInstance = utils_1.isRedisInstance;
      // When handed a live Redis instance, work on a duplicate of it instead of
      // the instance itself; plain connection options are passed through.
      let dedicatedConnection;
      if (!isRedisInstance(connection)) {
        dedicatedConnection = connection;
      } else if (connection.isCluster) {
        const sourceOptions = connection.options;
        const redisOptions =
          sourceOptions === null || sourceOptions === undefined
            ? undefined
            : sourceOptions.redisOptions;
        dedicatedConnection = connection.duplicate(undefined, { redisOptions });
      } else {
        dedicatedConnection = connection.duplicate();
      }
      // NOTE(review): the meaning of the trailing `true` is defined by QueueBase,
      // which is not visible here; close() suggests it relates to the connection
      // being flagged as "shared" - confirm against QueueBase.
      super(
        name,
        Object.assign({}, forwardedOpts, { connection: dedicatedConnection }),
        Connection,
        true,
      );
      this.running = false;
      this.blocking = false;
      // Caller-supplied options win over the default blocking timeout, which is
      // later handed to XREAD as its BLOCK argument.
      this.opts = Object.assign({ blockingTimeout: 10000 }, this.opts);
      if (autorun) {
        this.run().catch(error => this.emit('error', error));
      }
    }

    /** Emits `event` with the given arguments through the parent implementation. */
    emit(event, ...args) {
      return super.emit(event, ...args);
    }

    /** Removes `listener` from `eventName`; returns this instance for chaining. */
    off(eventName, listener) {
      super.off(eventName, listener);
      return this;
    }

    /** Registers `listener` for `event`; returns this instance for chaining. */
    on(event, listener) {
      super.on(event, listener);
      return this;
    }

    /** Registers a one-shot `listener` for `event`; returns this instance for chaining. */
    once(event, listener) {
      super.once(event, listener);
      return this;
    }

    /**
     * Manually starts the event consuming loop. Use this when the default
     * "autorun" option was disabled in the constructor.
     *
     * Rejects with 'Queue Events is already running.' if the loop was already
     * started. On any other failure the `running` flag is reset before the
     * error is rethrown.
     */
    async run() {
      if (this.running) {
        throw new Error('Queue Events is already running.');
      }
      try {
        this.running = true;
        const client = await this.client;
        // TODO: Planned for deprecation as it has no real use case.
        try {
          await client.client('SETNAME', this.clientName(utils_1.QUEUE_EVENT_SUFFIX));
        }
        catch (err) {
          // Only errors matching clientCommandMessageReg are tolerated.
          const isTolerated = utils_1.clientCommandMessageReg.test(err.message);
          if (!isTolerated) {
            throw err;
          }
        }
        await this.consumeEvents(client);
      }
      catch (error) {
        this.running = false;
        throw error;
      }
    }

    /**
     * Reads the queue's events stream with blocking XREAD calls until the
     * instance starts closing, re-emitting every entry as an event.
     *
     * Reading starts at `opts.lastEventId`, or at '$' when that is not set.
     * 'drained' is emitted with the entry id only. Every other event is emitted
     * with its remaining fields plus the id, and once more as `<event>:<jobId>`
     * when a jobId is present.
     *
     * @param client - Redis client used to issue the XREAD calls.
     */
    async consumeEvents(client) {
      const opts = this.opts;
      const streamKey = this.keys.events;
      let id = opts.lastEventId || '$';
      while (!this.closing) {
        // `blocking` is forwarded to connection.close() by close().
        this.blocking = true;
        // Cast to actual return type, see: https://github.com/DefinitelyTyped/DefinitelyTyped/issues/44301
        const data = await this.checkConnectionError(() => client.xread('BLOCK', opts.blockingTimeout, 'STREAMS', streamKey, id));
        this.blocking = false;
        if (!data) {
          continue;
        }
        // Only one stream is requested, so only the first result is used.
        const entries = data[0][1];
        for (const entry of entries) {
          id = entry[0];
          const args = (0, utils_1.array2obj)(entry[1]);
          //
          // TODO: we may need to have a separate stream for progress data
          // to avoid this hack.
          if (args.event === 'progress') {
            args.data = JSON.parse(args.data);
          }
          else if (args.event === 'completed') {
            args.returnvalue = JSON.parse(args.returnvalue);
          }
          const { event } = args;
          const restArgs = tslib_1.__rest(args, ["event"]);
          if (event === 'drained') {
            this.emit(event, id);
            continue;
          }
          this.emit(event, restArgs, id);
          if (restArgs.jobId) {
            this.emit(`${event}:${restArgs.jobId}`, restArgs, id);
          }
        }
      }
    }

    /**
     * Stops consuming events and closes the underlying Redis connection if
     * necessary. Repeated calls return the same in-flight promise.
     *
     * @returns the promise tracking the shutdown.
     */
    async close() {
      if (this.closing) {
        return this.closing;
      }
      const shutdown = async () => {
        try {
          // As the connection has been wrongly marked as "shared" by QueueBase,
          // we need to forcibly close it here. We should fix QueueBase to avoid
          // this in the future.
          const client = await this.client;
          client.disconnect();
          await this.connection.close(this.blocking);
        }
        finally {
          this.closed = true;
        }
      };
      this.closing = shutdown();
      return this.closing;
    }
  }
  143. exports.QueueEvents = QueueEvents;
  144. //# sourceMappingURL=queue-events.js.map