// ShardedSubscriber.js 4.9 KB
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const util_1 = require("./util");
  4. const utils_1 = require("../utils");
  5. const Redis_1 = require("../Redis");
  6. const debug = (0, utils_1.Debug)("cluster:subscriberGroup:shardedSubscriber");
  /**
   * Lifecycle states of a sharded subscriber connection.
   *
   * Frozen so this shared, module-level constant cannot be altered at runtime.
   */
  const SubscriberStatus = Object.freeze({
    IDLE: "idle",
    STARTING: "starting",
    CONNECTED: "connected",
    STOPPING: "stopping",
    ENDED: "ended",
  });
  /**
   * Transition table of the subscriber state machine: maps each status to the
   * list of statuses that are allowed to follow it. ENDED has no successors,
   * i.e. it is terminal.
   *
   * The table and every list in it are frozen so that the set of legal
   * transitions cannot be changed by accident.
   */
  const ALLOWED_STATUS_UPDATES = Object.freeze({
    [SubscriberStatus.IDLE]: Object.freeze([
      SubscriberStatus.STARTING,
      SubscriberStatus.STOPPING,
      SubscriberStatus.ENDED,
    ]),
    [SubscriberStatus.STARTING]: Object.freeze([
      SubscriberStatus.CONNECTED,
      SubscriberStatus.STOPPING,
      SubscriberStatus.ENDED,
    ]),
    [SubscriberStatus.CONNECTED]: Object.freeze([
      SubscriberStatus.STOPPING,
      SubscriberStatus.ENDED,
    ]),
    [SubscriberStatus.STOPPING]: Object.freeze([SubscriberStatus.ENDED]),
    [SubscriberStatus.ENDED]: Object.freeze([]),
  });
  /**
   * Owns one Redis connection used as a sharded pub/sub subscriber for a
   * single node and tracks its lifecycle through SubscriberStatus
   * (idle, starting, connected, stopping, ended).
   *
   * Events raised by the underlying connection are forwarded to the emitter
   * given to the constructor:
   *   - "smessage" / "smessageBuffer": re-emitted with the same arguments
   *   - "end":   status becomes ENDED, then "-node" is emitted with
   *              (instance, nodeKey)
   *   - "error": emitted as "nodeError" with (error, nodeKey)
   *   - "moved": emitted as "moved" without arguments
   *
   * Once the subscriber has ended it cannot be started again.
   */
  class ShardedSubscriber {
    /**
     * @param {object} emitter - Receiver of the forwarded events; only its
     *   `emit(event, ...args)` method is used.
     * @param {object} options - Node options. Used to derive the node key and
     *   the connection name, and merged into the connection options.
     * @param {object} [redisOptions] - Additional connection options. Its
     *   `lazyConnect` value (default `true`) is recorded and exposed through
     *   isLazyConnect().
     */
    constructor(emitter, options, redisOptions) {
      this.emitter = emitter;
      this.status = SubscriberStatus.IDLE;
      this.instance = null;
      this.connectPromise = null;
      // Store listener references for cleanup
      this.messageListeners = new Map();
      this.onEnd = () => {
        this.updateStatus(SubscriberStatus.ENDED);
        this.emitter.emit("-node", this.instance, this.nodeKey);
      };
      this.onError = (error) => {
        this.emitter.emit("nodeError", error, this.nodeKey);
      };
      this.onMoved = () => {
        this.emitter.emit("moved");
      };
      // NOTE(review): assumes `defaults` keeps the first defined value for a
      // key (lodash-style), so the literal below would take precedence over
      // `options` and `redisOptions` - confirm against ../utils.
      this.instance = new Redis_1.default((0, utils_1.defaults)({
        enableReadyCheck: false,
        enableOfflineQueue: true,
        connectionName: (0, util_1.getConnectionName)("ssubscriber", options.connectionName),
        /**
         * Disable auto reconnection for subscribers.
         * The ClusterSubscriberGroup will handle the reconnection.
         */
        retryStrategy: null,
        lazyConnect: true,
      }, options, redisOptions));
      // Remember the caller's lazyConnect preference; fall back to true when
      // redisOptions is missing or does not specify it.
      const requestedLazyConnect = redisOptions == null ? undefined : redisOptions.lazyConnect;
      this.lazyConnect = requestedLazyConnect == null ? true : requestedLazyConnect;
      this.nodeKey = (0, util_1.getNodeKey)(options);
      // Register listeners
      this.instance.on("end", this.onEnd);
      this.instance.on("error", this.onError);
      this.instance.on("moved", this.onMoved);
      for (const event of ["smessage", "smessageBuffer"]) {
        const listener = (...args) => {
          this.emitter.emit(event, ...args);
        };
        this.messageListeners.set(event, listener);
        this.instance.on(event, listener);
      }
    }
    /**
     * Connects the underlying instance.
     *
     * While a connect attempt is in flight, further calls settle with that
     * same attempt. Calls made while already starting or connected return
     * without doing anything. A failed connect moves the subscriber to ENDED
     * and the error is rethrown.
     *
     * @returns {Promise<void>}
     * @throws {Error} If the subscriber has ended or its instance was released.
     */
    async start() {
      if (this.connectPromise) {
        return this.connectPromise;
      }
      if (this.status === SubscriberStatus.STARTING ||
        this.status === SubscriberStatus.CONNECTED) {
        return;
      }
      if (this.status === SubscriberStatus.ENDED || !this.instance) {
        throw new Error(`Sharded subscriber ${this.nodeKey} cannot be restarted once ended.`);
      }
      this.updateStatus(SubscriberStatus.STARTING);
      this.connectPromise = this.instance.connect();
      try {
        await this.connectPromise;
        this.updateStatus(SubscriberStatus.CONNECTED);
      }
      catch (err) {
        this.updateStatus(SubscriberStatus.ENDED);
        throw err;
      }
      finally {
        // Allow a later start() to be evaluated on its own merits.
        this.connectPromise = null;
      }
    }
    /**
     * Disconnects and releases the underlying instance and marks the
     * subscriber as ENDED.
     *
     * Safe to call repeatedly: once the subscriber is ended and its instance
     * has been released, further calls do nothing.
     */
    stop() {
      const alreadyEnded = this.status === SubscriberStatus.ENDED;
      if (alreadyEnded && !this.instance) {
        // Fully stopped before; nothing left to release or report.
        return;
      }
      if (!alreadyEnded) {
        // ENDED -> STOPPING is not a legal transition, so only request it
        // when the connection has not already ended on its own. The instance
        // still has to be released below in that case.
        this.updateStatus(SubscriberStatus.STOPPING);
      }
      if (this.instance) {
        this.instance.disconnect();
        this.instance.removeAllListeners();
        this.messageListeners.clear();
        this.instance = null;
      }
      this.updateStatus(SubscriberStatus.ENDED);
      debug("stopped %s", this.nodeKey);
    }
    /**
     * @returns {boolean} True while the subscriber is starting or connected.
     */
    isStarted() {
      return (this.status === SubscriberStatus.CONNECTED ||
        this.status === SubscriberStatus.STARTING);
    }
    /**
     * Current lifecycle status (one of the SubscriberStatus values).
     */
    get subscriberStatus() {
      return this.status;
    }
    /**
     * @returns {boolean} True when the instance is still held and the status
     *   is idle, starting or connected.
     */
    isHealthy() {
      return ((this.status === SubscriberStatus.IDLE ||
        this.status === SubscriberStatus.CONNECTED ||
        this.status === SubscriberStatus.STARTING) &&
        this.instance !== null);
    }
    /**
     * @returns {object|null} The underlying connection, or null after stop().
     */
    getInstance() {
      return this.instance;
    }
    /**
     * @returns {*} The node key derived from the constructor's `options`.
     */
    getNodeKey() {
      return this.nodeKey;
    }
    /**
     * @returns {*} The `lazyConnect` value from `redisOptions`, or true when
     *   it was not provided.
     */
    isLazyConnect() {
      return this.lazyConnect;
    }
    /**
     * Moves to `nextStatus` when ALLOWED_STATUS_UPDATES permits it from the
     * current status. Setting the current status again is a silent no-op; a
     * disallowed transition is logged through `debug` and ignored.
     *
     * @param {string} nextStatus - Target SubscriberStatus value.
     */
    updateStatus(nextStatus) {
      if (this.status === nextStatus) {
        return;
      }
      if (!ALLOWED_STATUS_UPDATES[this.status].includes(nextStatus)) {
        debug("Invalid status transition for %s: %s -> %s", this.nodeKey, this.status, nextStatus);
        return;
      }
      this.status = nextStatus;
    }
  }
  147. exports.default = ShardedSubscriber;