ClusterSubscriber.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const util_1 = require("./util");
  4. const utils_1 = require("../utils");
  5. const Redis_1 = require("../Redis");
  6. const debug = (0, utils_1.Debug)("cluster:subscriber");
  /**
   * Owns the single Redis connection a cluster client uses for pub/sub and
   * keeps it alive by re-selecting a node from the connection pool whenever the
   * current connection ends or its node leaves the pool.
   *
   * Messages received on that connection are re-emitted on `emitter`, and the
   * channels of the previously active connection are re-subscribed on every
   * newly selected connection.
   */
  class ClusterSubscriber {
      /**
       * @param connectionPool Pool of cluster nodes; must provide
       *   `on(event, listener)` for "-node"/"+node" and `getNodes()`.
       * @param emitter Object whose `emit` receives the forwarded events
       *   ("message", "pmessage", "smessage", their Buffer variants and
       *   "forceRefresh").
       * @param isSharded When truthy the connection is named "ssubscriber",
       *   slot ranges may be associated and "smessage" events are forwarded.
       */
      constructor(connectionPool, emitter, isSharded = false) {
          this.connectionPool = connectionPool;
          this.emitter = emitter;
          this.isSharded = isSharded;
          this.started = false;
          // There is only one connection for the entire pool
          this.subscriber = null;
          // The last connection known to hold the complete set of
          // subscriptions; its channel list is replayed on the next connection.
          this.lastActiveSubscriber = null;
          // The slot range for which this subscriber is responsible
          this.slotRange = [];
          this.onSubscriberEnd = () => {
              if (!this.started) {
                  debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting.");
                  return;
              }
              // If the subscriber closes whilst it's still the active connection,
              // we might as well try connecting to a new node if possible to
              // minimise the number of missed publishes.
              debug("subscriber has disconnected, selecting a new one...");
              this.selectSubscriber();
          };
          // If the current node we're using as the subscriber disappears
          // from the node pool for some reason, we will select a new one
          // to connect to.
          // Note that this event is only triggered if the connection to
          // the node has been used; cluster subscriptions are setup with
          // lazyConnect = true. It's possible for the subscriber node to
          // disappear without this method being called!
          // See https://github.com/luin/ioredis/pull/1589
          this.connectionPool.on("-node", (_, key) => {
              if (!this.started || !this.subscriber) {
                  return;
              }
              if ((0, util_1.getNodeKey)(this.subscriber.options) === key) {
                  debug("subscriber has left, selecting a new one...");
                  this.selectSubscriber();
              }
          });
          this.connectionPool.on("+node", () => {
              if (!this.started || this.subscriber) {
                  return;
              }
              debug("a new node is discovered and there is no subscriber, selecting a new one...");
              this.selectSubscriber();
          });
      }
      /**
       * @returns The current subscriber connection, or `null` when none is
       *   selected.
       */
      getInstance() {
          return this.subscriber;
      }
      /**
       * Associate this subscriber to a specific slot range.
       *
       * The range is only stored for sharded subscribers. Returns the stored
       * range, which is an empty array if no range has been associated.
       *
       * BTW: This is more for debugging and testing purposes.
       *
       * @param range
       */
      associateSlotRange(range) {
          if (this.isSharded) {
              this.slotRange = range;
          }
          return this.slotRange;
      }
      /**
       * Mark the subscriber as started and select a connection right away.
       */
      start() {
          this.started = true;
          this.selectSubscriber();
          debug("started");
      }
      /**
       * Mark the subscriber as stopped and drop the current connection.
       *
       * `lastActiveSubscriber` is kept on purpose so that a later `start()` can
       * replay its channels on the new connection.
       */
      stop() {
          this.started = false;
          if (this.subscriber) {
              this.subscriber.disconnect();
              this.subscriber = null;
          }
      }
      /**
       * @returns `true` between `start()` and `stop()`.
       */
      isStarted() {
          return this.started;
      }
      /**
       * Disconnect the current connection(s), pick a random node from the pool
       * and open a fresh subscriber connection to it. When the pool has no
       * nodes, `subscriber` is set to `null` and nothing else happens.
       */
      selectSubscriber() {
          const lastActiveSubscriber = this.lastActiveSubscriber;
          // Disconnect the previous subscriber even if there
          // will not be a new one.
          if (lastActiveSubscriber) {
              lastActiveSubscriber.off("end", this.onSubscriberEnd);
              lastActiveSubscriber.disconnect();
          }
          if (this.subscriber) {
              this.subscriber.off("end", this.onSubscriberEnd);
              this.subscriber.disconnect();
          }
          const sampleNode = (0, utils_1.sample)(this.connectionPool.getNodes());
          if (!sampleNode) {
              debug("selecting subscriber failed since there is no node discovered in the cluster yet");
              this.subscriber = null;
              return;
          }
          const { options } = sampleNode;
          debug("selected a subscriber %s:%s", options.host, options.port);
          const connectionPrefix = this.isSharded ? "ssubscriber" : "subscriber";
          /*
           * Create a specialized Redis connection for the subscription.
           *
           * `enableReadyCheck` is enabled because although subscription is allowed
           * while redis is loading data from the disk, we can check if the password
           * provided for the subscriber is correct, and if not, the current subscriber
           * will be disconnected and a new subscriber will be selected.
           */
          const subscriber = new Redis_1.default({
              port: options.port,
              host: options.host,
              username: options.username,
              password: options.password,
              enableReadyCheck: true,
              connectionName: (0, util_1.getConnectionName)(connectionPrefix, options.connectionName),
              lazyConnect: true,
              tls: options.tls,
              // Don't try to reconnect the subscriber connection. If the connection fails
              // we will get an end event (handled below), at which point we'll pick a new
              // node from the pool and try to connect to that as the subscriber connection.
              retryStrategy: null,
          });
          this.subscriber = subscriber;
          // Ignore the errors since they're handled in the connection pool.
          subscriber.on("error", utils_1.noop);
          subscriber.on("moved", () => {
              this.emitter.emit("forceRefresh");
          });
          // The node we lost connection to may not come back up in a
          // reasonable amount of time (e.g. a slave that's taken down
          // for maintenance), we could potentially miss many published
          // messages so we should reconnect as quickly as possible, to
          // a different node if needed.
          subscriber.once("end", this.onSubscriberEnd);
          this.resubscribePreviousChannels(subscriber, lastActiveSubscriber);
          this.forwardMessageEvents(subscriber);
      }
      /**
       * Replay the channels of `previousSubscriber` on `subscriber` and record
       * `subscriber` as `lastActiveSubscriber` once every replay has succeeded
       * (immediately when there is nothing to replay).
       *
       * @param subscriber The freshly created connection.
       * @param previousSubscriber The connection whose channels are replayed;
       *   may be `null`/`undefined`.
       */
      resubscribePreviousChannels(subscriber, previousSubscriber) {
          let condition = null;
          if (previousSubscriber) {
              condition = previousSubscriber.condition || previousSubscriber.prevCondition;
          }
          const channelSource = condition && condition.subscriber;
          const previousChannels = {
              subscribe: channelSource ? channelSource.channels("subscribe") : [],
              psubscribe: channelSource ? channelSource.channels("psubscribe") : [],
              ssubscribe: channelSource ? channelSource.channels("ssubscribe") : [],
          };
          const hasChannels = previousChannels.subscribe.length > 0 ||
              previousChannels.psubscribe.length > 0 ||
              previousChannels.ssubscribe.length > 0;
          if (!hasChannels) {
              this.lastActiveSubscriber = subscriber;
              return;
          }
          let pending = 0;
          const onResubscribed = () => {
              pending -= 1;
              // Only promote the connection these commands were issued on, and
              // only while it is still the current one. A late resolution after
              // stop() or after another selection must not clobber
              // lastActiveSubscriber, otherwise the channel list is lost.
              if (pending === 0 && this.subscriber === subscriber) {
                  this.lastActiveSubscriber = subscriber;
              }
          };
          for (const type of ["subscribe", "psubscribe", "ssubscribe"]) {
              const channels = previousChannels[type];
              if (channels.length === 0) {
                  continue;
              }
              debug("%s %d channels", type, channels.length);
              if (type === "ssubscribe") {
                  // Sharded channels are re-subscribed one command per channel.
                  for (const channel of channels) {
                      pending += 1;
                      subscriber[type](channel)
                          .then(onResubscribed)
                          .catch(() => {
                          // TODO: should probably disconnect the subscriber and try again.
                          debug("failed to ssubscribe to channel: %s", channel);
                      });
                  }
              }
              else {
                  pending += 1;
                  subscriber[type](channels)
                      .then(onResubscribed)
                      .catch(() => {
                      // TODO: should probably disconnect the subscriber and try again.
                      debug("failed to %s %d channels", type, channels.length);
                  });
              }
          }
      }
      /**
       * Re-emit pub/sub message events received on `subscriber` on the emitter.
       * "smessage" events are only forwarded for sharded subscribers.
       *
       * @param subscriber The connection to listen on.
       */
      forwardMessageEvents(subscriber) {
          for (const event of ["message", "messageBuffer"]) {
              subscriber.on(event, (arg1, arg2) => {
                  this.emitter.emit(event, arg1, arg2);
              });
          }
          for (const event of ["pmessage", "pmessageBuffer"]) {
              subscriber.on(event, (arg1, arg2, arg3) => {
                  this.emitter.emit(event, arg1, arg2, arg3);
              });
          }
          // Truthiness check, consistent with the other uses of `isSharded`.
          if (this.isSharded) {
              for (const event of ["smessage", "smessageBuffer"]) {
                  subscriber.on(event, (arg1, arg2) => {
                      this.emitter.emit(event, arg1, arg2);
                  });
              }
          }
      }
  }
  223. exports.default = ClusterSubscriber;