// ClusterSubscriberGroup.js (15 KB)
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const utils_1 = require("../utils");
  4. const util_1 = require("./util");
  5. const calculateSlot = require("cluster-key-slot");
  6. const ShardedSubscriber_1 = require("./ShardedSubscriber");
  7. const debug = (0, utils_1.Debug)("cluster:subscriberGroup");
  8. /**
  9. * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
  10. * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
  11. * messages between shards. Sharded PubSub removes this limitation by making each shard
  12. * responsible for its own messages.
  13. *
  14. * This class coordinates one ShardedSubscriber per master node in the cluster, providing
  15. * sharded PubSub support while keeping the public API backward compatible.
  16. */
  class ClusterSubscriberGroup {
    /**
     * Creates an empty subscriber group. No subscriber exists until reset() is called
     * with a slots map and the list of cluster nodes.
     *
     * @param subscriberGroupEmitter Object whose emit() is used to publish the
     * "+subscriber", "-subscriber", "subscribersReady" and "subscriberConnectFailed"
     * events; it is also handed to every ShardedSubscriber created by this group.
     * @param options Group options; `options.redisOptions` is passed to every
     * ShardedSubscriber created by this group.
     */
    constructor(subscriberGroupEmitter, options) {
      this.subscriberGroupEmitter = subscriberGroupEmitter;
      this.options = options;
      // nodeKey -> ShardedSubscriber
      this.shardedSubscribers = new Map();
      // Indexed by slot; element [0] of each entry is treated as the owning node key
      this.clusterSlots = [];
      // nodeKey -> list of slot numbers.
      // Simple [min, max] slot ranges aren't enough because you can migrate single slots
      this.subscriberToSlotsIndex = new Map();
      // slot -> list of channel names registered through addChannels()
      this.channels = new Map();
      // nodeKey -> number of failed connection attempts since the last successful connect
      this.failedAttemptsByNode = new Map();
      // Only latest pending reset kept; throttled by refreshSlotsCache's isRefreshing + backoff delay
      this.isResetting = false;
      this.pendingReset = null;
      /**
       * Handles failed subscriber connections by emitting an event to refresh the slots cache
       * after a backoff period.
       *
       * The backoff doubles with every failed attempt of the node (the exponent is capped at
       * MAX_RETRY_ATTEMPTS, the result at MAX_BACKOFF_MS) and gets up to +/-25% random jitter.
       *
       * @param error The error the failed start() rejected with
       * @param nodeKey Key of the node whose subscriber failed to connect
       */
      this.handleSubscriberConnectFailed = (error, nodeKey) => {
        const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0;
        const failedAttempts = currentAttempts + 1;
        this.failedAttemptsByNode.set(nodeKey, failedAttempts);
        const attempts = Math.min(failedAttempts, ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS);
        const backoff = Math.min(ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts, ClusterSubscriberGroup.MAX_BACKOFF_MS);
        const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5));
        const delay = Math.max(0, backoff + jitter);
        debug("Failed to connect subscriber for %s. Refreshing slots in %dms", nodeKey, delay);
        this.subscriberGroupEmitter.emit("subscriberConnectFailed", {
          delay,
          error,
        });
      };
      /**
       * Handles successful subscriber connections by resetting the failed attempts counter.
       *
       * @param nodeKey Key of the node whose subscriber connected
       */
      this.handleSubscriberConnectSucceeded = (nodeKey) => {
        this.failedAttemptsByNode.delete(nodeKey);
      };
    }
    /**
     * Starts a subscriber and wires the outcome into the connect bookkeeping:
     * success clears the node's failure counter, failure schedules a backoff event.
     *
     * @param sub The subscriber to start
     * @param nodeKey Optional node key to report; when omitted, sub.getNodeKey() is
     * read at the time the start promise settles
     * @returns the chained promise of sub.start()
     * @private
     */
    _startSubscriber(sub, nodeKey) {
      const resolveNodeKey = () => (nodeKey !== undefined ? nodeKey : sub.getNodeKey());
      return sub
        .start()
        .then(() => {
          this.handleSubscriberConnectSucceeded(resolveNodeKey());
        })
        .catch((err) => {
          this.handleSubscriberConnectFailed(err, resolveNodeKey());
        });
    }
    /**
     * Sums the number of channels registered over all slots.
     *
     * @private
     */
    _countChannels() {
      let total = 0;
      for (const slotChannels of this.channels.values()) {
        total += slotChannels.length;
      }
      return total;
    }
    /**
     * Get the responsible subscriber.
     *
     * A subscriber whose subscriberStatus is "idle" is started in the background
     * (the start promise is not awaited).
     *
     * @param slot
     * @returns the subscriber registered for the slot's owning node, or undefined when
     * the slot has no entry in the cached slots map or no subscriber exists for its node
     */
    getResponsibleSubscriber(slot) {
      const slotNodes = this.clusterSlots[slot];
      // The cached map is empty before the first reset() and may contain unassigned slots
      if (!slotNodes) {
        return undefined;
      }
      const sub = this.shardedSubscribers.get(slotNodes[0]);
      if (sub && sub.subscriberStatus === "idle") {
        this._startSubscriber(sub);
      }
      return sub;
    }
    /**
     * Adds a channel for which this subscriber group is responsible
     *
     * All channels of one call have to hash to the same slot. Channels that are already
     * registered are not added a second time, and the passed array is never stored itself.
     *
     * @param channels
     * @returns -1 when the channels span more than one slot, otherwise the total number of
     * registered channels over all slots (an empty list changes nothing)
     */
    addChannels(channels) {
      if (channels.length === 0) {
        return this._countChannels();
      }
      const slot = calculateSlot(channels[0]);
      // Check if the all channels belong to the same slot and otherwise reject the operation
      if (channels.some((c) => calculateSlot(c) !== slot)) {
        return -1;
      }
      // A Set keeps insertion order, so existing channels stay in front
      const merged = new Set(this.channels.get(slot));
      for (const c of channels) {
        merged.add(c);
      }
      this.channels.set(slot, Array.from(merged));
      return this._countChannels();
    }
    /**
     * Removes channels for which the subscriber group is responsible
     *
     * All channels of one call have to hash to the same slot. A slot without remaining
     * channels is dropped from the internal map.
     *
     * @param channels
     * @returns -1 when the channels span more than one slot, otherwise the total number of
     * registered channels over all slots (an empty list changes nothing)
     */
    removeChannels(channels) {
      if (channels.length === 0) {
        return this._countChannels();
      }
      const slot = calculateSlot(channels[0]);
      // Check if the all channels belong to the same slot and otherwise reject the operation
      if (channels.some((c) => calculateSlot(c) !== slot)) {
        return -1;
      }
      const slotChannels = this.channels.get(slot);
      if (slotChannels) {
        const toRemove = new Set(channels);
        const remaining = slotChannels.filter((c) => !toRemove.has(c));
        if (remaining.length > 0) {
          this.channels.set(slot, remaining);
        }
        else {
          this.channels.delete(slot);
        }
      }
      return this._countChannels();
    }
    /**
     * Disconnect all subscribers and clear some of the internal state.
     */
    stop() {
      for (const s of this.shardedSubscribers.values()) {
        s.stop();
      }
      // Clear subscriber instances and pending operations.
      // Channels are preserved for resubscription on reconnect.
      this.pendingReset = null;
      this.shardedSubscribers.clear();
      this.subscriberToSlotsIndex.clear();
    }
    /**
     * Start all not yet started subscribers
     *
     * Emits "+subscriber" for every subscriber that gets started.
     *
     * @returns a promise that settles once every triggered start attempt has settled
     */
    start() {
      const startPromises = [];
      for (const s of this.shardedSubscribers.values()) {
        if (this.shouldStartSubscriber(s)) {
          startPromises.push(this._startSubscriber(s));
          this.subscriberGroupEmitter.emit("+subscriber");
        }
      }
      return Promise.all(startPromises);
    }
    /**
     * Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
     *
     * Only one reset runs at a time. A call made while another reset is in progress is
     * remembered (latest call wins) and executed after the running one has finished.
     *
     * @param clusterSlots Slots map, indexed by slot; element [0] of an entry is the owning node key
     * @param clusterNodes Node objects; a node matches a key when getNodeKey(node.options) equals it
     */
    async reset(clusterSlots, clusterNodes) {
      if (this.isResetting) {
        this.pendingReset = { slots: clusterSlots, nodes: clusterNodes };
        return;
      }
      this.isResetting = true;
      try {
        const hasTopologyChanged = this._refreshSlots(clusterSlots);
        const hasFailedSubscribers = this.hasUnhealthySubscribers();
        if (!hasTopologyChanged && !hasFailedSubscribers) {
          debug("No topology change detected or failed subscribers. Skipping reset.");
          return;
        }
        this._removeObsoleteSubscribers();
        const startPromises = this._ensureSubscribers(clusterNodes);
        // It's vital to await the start promises before resubscribing
        // Otherwise we might try to resubscribe to a subscriber that is not yet connected
        // This can cause a race condition
        await Promise.all(startPromises);
        this._resubscribe();
        this.subscriberGroupEmitter.emit("subscribersReady");
      }
      finally {
        this.isResetting = false;
        if (this.pendingReset) {
          const { slots, nodes } = this.pendingReset;
          this.pendingReset = null;
          await this.reset(slots, nodes);
        }
      }
    }
    /**
     * Stops and removes every subscriber that no longer owns a slot or is not healthy,
     * emitting "-subscriber" for each removed one.
     *
     * @private
     */
    _removeObsoleteSubscribers() {
      for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
        // If the subscriber is still responsible for a slot range and is healthy then keep it
        if (this.subscriberToSlotsIndex.has(nodeKey) && shardedSubscriber.isHealthy()) {
          debug("Skipping deleting subscriber for %s", nodeKey);
          continue;
        }
        debug("Removing subscriber for %s", nodeKey);
        // Otherwise stop the subscriber and remove it
        shardedSubscriber.stop();
        this.shardedSubscribers.delete(nodeKey);
        this.subscriberGroupEmitter.emit("-subscriber");
      }
    }
    /**
     * Makes sure every node of the slots index has a subscriber: healthy ones are kept
     * (and started if required), unhealthy ones are replaced, missing ones are created.
     *
     * @param clusterNodes Node objects used to look up the options of new subscribers
     * @returns the promises of all start attempts that were triggered
     * @private
     */
    _ensureSubscribers(clusterNodes) {
      const startPromises = [];
      for (const nodeKey of this.subscriberToSlotsIndex.keys()) {
        const existingSubscriber = this.shardedSubscribers.get(nodeKey);
        // If we already have a healthy subscriber for this node, only make sure it is
        // started when shouldStartSubscriber() asks for it.
        if (existingSubscriber && existingSubscriber.isHealthy()) {
          debug("Skipping creating new subscriber for %s", nodeKey);
          if (!existingSubscriber.isStarted() &&
            this.shouldStartSubscriber(existingSubscriber)) {
            startPromises.push(this._startSubscriber(existingSubscriber, nodeKey));
          }
          continue;
        }
        // An existing subscriber that reaches this point is not healthy, stop it
        if (existingSubscriber) {
          debug("Replacing subscriber for %s", nodeKey);
          existingSubscriber.stop();
          this.shardedSubscribers.delete(nodeKey);
          this.subscriberGroupEmitter.emit("-subscriber");
        }
        debug("Creating new subscriber for %s", nodeKey);
        // Otherwise create a new subscriber
        const redis = clusterNodes.find((node) => (0, util_1.getNodeKey)(node.options) === nodeKey);
        if (!redis) {
          debug("Failed to find node for key %s", nodeKey);
          continue;
        }
        const sub = new ShardedSubscriber_1.default(this.subscriberGroupEmitter, redis.options, this.options.redisOptions);
        this.shardedSubscribers.set(nodeKey, sub);
        if (this.shouldStartSubscriber(sub)) {
          startPromises.push(this._startSubscriber(sub, nodeKey));
        }
        this.subscriberGroupEmitter.emit("+subscriber");
      }
      return startPromises;
    }
    /**
     * Refreshes the subscriber-related slot ranges
     *
     * Returns false if no refresh was needed
     *
     * Slots without an owner (holes, null entries or entries without a first node) are
     * skipped when the index is rebuilt.
     *
     * @param targetSlots
     */
    _refreshSlots(targetSlots) {
      // If there was an actual change, then reassign the slot ranges
      // Also rebuild if subscriberToSlotsIndex is empty (e.g., after stop() was called)
      if (this._slotsAreEqual(targetSlots) && this.subscriberToSlotsIndex.size > 0) {
        debug("Nothing to refresh because the new cluster map is equal to the previous one.");
        return false;
      }
      debug("Refreshing the slots of the subscriber group.");
      // Rebuild the slots index
      const slotsIndex = new Map();
      for (let slot = 0; slot < targetSlots.length; slot++) {
        const slotNodes = targetSlots[slot];
        const node = slotNodes ? slotNodes[0] : undefined;
        if (node == null) {
          continue;
        }
        const ownedSlots = slotsIndex.get(node);
        if (ownedSlots) {
          ownedSlots.push(slot);
        }
        else {
          slotsIndex.set(node, [slot]);
        }
      }
      this.subscriberToSlotsIndex = slotsIndex;
      // Update the cached slots map (deep copy, so later changes of the input don't leak in)
      this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));
      return true;
    }
    /**
     * Resubscribes to the previous channels
     *
     * For every slot with registered channels, ssubscribe is issued on the connection of
     * the subscriber owning that slot: right away when the connection status is "ready",
     * otherwise once it emits "ready". Connections that are missing or have status "end"
     * are skipped.
     *
     * @private
     */
    _resubscribe() {
      this.shardedSubscribers.forEach((s, nodeKey) => {
        const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
        if (!subscriberSlots) {
          return;
        }
        // Resubscribe on the underlying connection
        for (const slot of subscriberSlots) {
          // Might return null if being disconnected
          const redis = s.getInstance();
          const channels = this.channels.get(slot);
          if (!channels || channels.length === 0) {
            continue;
          }
          if (!redis || redis.status === "end") {
            continue;
          }
          const subscribe = () => {
            redis.ssubscribe(...channels).catch((err) => {
              // TODO: Should we emit an error event here?
              debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
            });
          };
          if (redis.status === "ready") {
            subscribe();
          }
          else {
            redis.once("ready", subscribe);
          }
        }
      });
    }
    /**
     * Deep equality of the cluster slots objects
     *
     * The comparison is done on the JSON serialization of both maps.
     *
     * @param other
     * @private
     */
    _slotsAreEqual(other) {
      if (this.clusterSlots === undefined) {
        return false;
      }
      return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
    }
    /**
     * Checks if any subscribers are in an unhealthy state.
     *
     * The group is considered unhealthy if:
     * - a subscriber exists whose isHealthy() returns false
     * - a node of the slots index has no subscriber at all
     *
     * @returns true if any subscribers need to be recreated
     */
    hasUnhealthySubscribers() {
      for (const sub of this.shardedSubscribers.values()) {
        if (!sub.isHealthy()) {
          return true;
        }
      }
      for (const nodeKey of this.subscriberToSlotsIndex.keys()) {
        if (!this.shardedSubscribers.has(nodeKey)) {
          return true;
        }
      }
      return false;
    }
    /**
     * Decides whether a subscriber has to be started now.
     *
     * An already started subscriber is never started again. A subscriber that is not
     * lazily connecting is always started. A lazily connecting one is only started when
     * at least one of the slots of its node has registered channels.
     *
     * @param sub
     */
    shouldStartSubscriber(sub) {
      if (sub.isStarted()) {
        return false;
      }
      if (!sub.isLazyConnect()) {
        return true;
      }
      const subscriberSlots = this.subscriberToSlotsIndex.get(sub.getNodeKey());
      if (!subscriberSlots) {
        return false;
      }
      return subscriberSlots.some((slot) => {
        const channels = this.channels.get(slot);
        return Boolean(channels && channels.length > 0);
      });
    }
  }
  exports.default = ClusterSubscriberGroup;
  // Retry strategy used by handleSubscriberConnectFailed:
  // delay ~ min(BASE_BACKOFF_MS * 2 ** min(attempts, MAX_RETRY_ATTEMPTS), MAX_BACKOFF_MS), plus jitter
  Object.assign(ClusterSubscriberGroup, {
    MAX_RETRY_ATTEMPTS: 10,
    MAX_BACKOFF_MS: 2000,
    BASE_BACKOFF_MS: 100,
  });