ClusterSubscriberGroup.d.ts 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. /// <reference types="node" />
  2. import * as EventEmitter from "events";
  3. import ShardedSubscriber from "./ShardedSubscriber";
  4. import { ClusterOptions } from "./ClusterOptions";
  5. /**
  6. * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
  7. * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
  8. * messages between shards. Sharded PubSub removes this limitation by making each shard
  9. * responsible for its own messages.
  10. *
  11. * This class coordinates one ShardedSubscriber per master node in the cluster, providing
  12. * sharded PubSub support while keeping the public API backward compatible.
  13. */
  14. export default class ClusterSubscriberGroup {
  15. private readonly subscriberGroupEmitter; // private members are emitted without types in declaration files
  16. private readonly options;
  17. private shardedSubscribers;
  18. private clusterSlots;
  19. private subscriberToSlotsIndex;
  20. private channels;
  21. private failedAttemptsByNode;
  22. private isResetting;
  23. private pendingReset;
  24. private static readonly MAX_RETRY_ATTEMPTS; // value not visible in this declaration file
  25. private static readonly MAX_BACKOFF_MS; // value not visible in this declaration file
  26. private static readonly BASE_BACKOFF_MS; // value not visible in this declaration file
  27. /**
  28. * Creates the subscriber group and registers callbacks.
  29. * @param subscriberGroupEmitter EventEmitter for this group (NOTE(review): presumably where the callbacks are registered; confirm in the implementation)
  30. * @param options the cluster options (`ClusterOptions`)
  31. */
  32. constructor(subscriberGroupEmitter: EventEmitter, options: ClusterOptions);
  33. /**
  34. * Gets the subscriber responsible for the given slot.
  35. * @param slot slot number to look up
  36. * @returns the responsible ShardedSubscriber, or undefined (allowed by the declared return type)
  37. */
  38. getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined;
  39. /**
  40. * Adds channels for which this subscriber group is responsible.
  41. * @param channels channel names, given as strings or Buffers
  42. * @returns a number (NOTE(review): its meaning is not visible in this declaration; confirm in the implementation)
  43. */
  44. addChannels(channels: (string | Buffer)[]): number;
  45. /**
  46. * Removes channels for which the subscriber group is responsible by optionally unsubscribing
  47. * @param channels channel names (strings or Buffers); NOTE(review): meaning of the numeric return value is not visible here
  48. */
  49. removeChannels(channels: (string | Buffer)[]): number;
  50. /**
  51. * Disconnect all subscribers and clear some of the internal state.
  52. */
  53. stop(): void;
  54. /**
  55. * Starts all subscribers that are not yet started; the returned promise resolves to an array.
  56. */
  57. start(): Promise<any[]>;
  58. /**
  59. * Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
  60. */
  61. reset(clusterSlots: string[][], clusterNodes: any[]): Promise<void>;
  62. /**
  63. * Refreshes the subscriber-related slot ranges
  64. *
  65. * Returns false if no refresh was needed
  66. *
  67. * @param targetSlots
  68. */
  69. private _refreshSlots;
  70. /**
  71. * Resubscribes to the previous channels
  72. *
  73. * @private
  74. */
  75. private _resubscribe;
  76. /**
  77. * Deep equality of the cluster slots objects
  78. *
  79. * @param other
  80. * @private
  81. */
  82. private _slotsAreEqual;
  83. /**
  84. * Checks if any subscribers are in an unhealthy state.
  85. *
  86. * A subscriber is considered unhealthy if:
  87. * - It exists but is not started (failed/disconnected)
  88. * - It's missing entirely for a node that should have one
  89. *
  90. * @returns true if any subscribers need to be recreated
  91. */
  92. private hasUnhealthySubscribers;
  93. /**
  94. * Handles failed subscriber connections by emitting an event to refresh the slots cache
  95. * after a backoff period.
  96. *
  97. * @param error
  98. * @param nodeKey
  99. */
  100. private handleSubscriberConnectFailed;
  101. /**
  102. * Handles successful subscriber connections by resetting the failed attempts counter.
  103. *
  104. * @param nodeKey
  105. */
  106. private handleSubscriberConnectSucceeded;
  107. private shouldStartSubscriber; // NOTE(review): undocumented; presumably decides whether a subscriber should be started (confirm in the implementation)
  108. }