queue-events.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import { __rest } from "tslib";
  2. import { array2obj, clientCommandMessageReg, isRedisInstance, QUEUE_EVENT_SUFFIX, } from '../utils';
  3. import { QueueBase } from './queue-base';
  4. /**
  5. * The QueueEvents class is used for listening to the global events
  6. * emitted by a given queue.
  7. *
  8. * This class requires a dedicated redis connection.
  9. *
  10. */
  11. export class QueueEvents extends QueueBase {
  12. constructor(name, _a = {
  13. connection: {},
  14. }, Connection) {
  15. var _b;
  16. var { connection, autorun = true } = _a, opts = __rest(_a, ["connection", "autorun"]);
  17. super(name, Object.assign(Object.assign({}, opts), { connection: isRedisInstance(connection)
  18. ? connection.isCluster
  19. ? connection.duplicate(undefined, {
  20. redisOptions: (_b = connection.options) === null || _b === void 0 ? void 0 : _b.redisOptions,
  21. })
  22. : connection.duplicate()
  23. : connection }), Connection, true);
  24. this.running = false;
  25. this.blocking = false;
  26. this.opts = Object.assign({
  27. blockingTimeout: 10000,
  28. }, this.opts);
  29. if (autorun) {
  30. this.run().catch(error => this.emit('error', error));
  31. }
  32. }
  33. emit(event, ...args) {
  34. return super.emit(event, ...args);
  35. }
  36. off(eventName, listener) {
  37. super.off(eventName, listener);
  38. return this;
  39. }
  40. on(event, listener) {
  41. super.on(event, listener);
  42. return this;
  43. }
  44. once(event, listener) {
  45. super.once(event, listener);
  46. return this;
  47. }
  48. /**
  49. * Manually starts running the event consumming loop. This shall be used if you do not
  50. * use the default "autorun" option on the constructor.
  51. */
  52. async run() {
  53. if (!this.running) {
  54. try {
  55. this.running = true;
  56. const client = await this.client;
  57. // TODO: Planed for deprecation as it has no really a use case
  58. try {
  59. await client.client('SETNAME', this.clientName(QUEUE_EVENT_SUFFIX));
  60. }
  61. catch (err) {
  62. if (!clientCommandMessageReg.test(err.message)) {
  63. throw err;
  64. }
  65. }
  66. await this.consumeEvents(client);
  67. }
  68. catch (error) {
  69. this.running = false;
  70. throw error;
  71. }
  72. }
  73. else {
  74. throw new Error('Queue Events is already running.');
  75. }
  76. }
  77. async consumeEvents(client) {
  78. const opts = this.opts;
  79. const key = this.keys.events;
  80. let id = opts.lastEventId || '$';
  81. while (!this.closing) {
  82. this.blocking = true;
  83. // Cast to actual return type, see: https://github.com/DefinitelyTyped/DefinitelyTyped/issues/44301
  84. const data = await this.checkConnectionError(() => client.xread('BLOCK', opts.blockingTimeout, 'STREAMS', key, id));
  85. this.blocking = false;
  86. if (data) {
  87. const stream = data[0];
  88. const events = stream[1];
  89. for (let i = 0; i < events.length; i++) {
  90. id = events[i][0];
  91. const args = array2obj(events[i][1]);
  92. //
  93. // TODO: we may need to have a separate xtream for progress data
  94. // to avoid this hack.
  95. switch (args.event) {
  96. case 'progress':
  97. args.data = JSON.parse(args.data);
  98. break;
  99. case 'completed':
  100. args.returnvalue = JSON.parse(args.returnvalue);
  101. break;
  102. }
  103. const { event } = args, restArgs = __rest(args, ["event"]);
  104. if (event === 'drained') {
  105. this.emit(event, id);
  106. }
  107. else {
  108. this.emit(event, restArgs, id);
  109. if (restArgs.jobId) {
  110. this.emit(`${event}:${restArgs.jobId}`, restArgs, id);
  111. }
  112. }
  113. }
  114. }
  115. }
  116. }
  117. /**
  118. * Stops consuming events and close the underlying Redis connection if necessary.
  119. *
  120. * @returns
  121. */
  122. async close() {
  123. if (!this.closing) {
  124. this.closing = (async () => {
  125. try {
  126. // As the connection has been wrongly markes as "shared" by QueueBase,
  127. // we need to forcibly close it here. We should fix QueueBase to avoid this in the future.
  128. const client = await this.client;
  129. client.disconnect();
  130. await this.connection.close(this.blocking);
  131. }
  132. finally {
  133. this.closed = true;
  134. }
  135. })();
  136. }
  137. return this.closing;
  138. }
  139. }
  140. //# sourceMappingURL=queue-events.js.map