ConnectionPool.js 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const events_1 = require("events");
  4. const utils_1 = require("../utils");
  5. const util_1 = require("./util");
  6. const Redis_1 = require("../Redis");
  7. const debug = (0, utils_1.Debug)("cluster:connectionPool");
  8. class ConnectionPool extends events_1.EventEmitter {
  9. constructor(redisOptions) {
  10. super();
  11. this.redisOptions = redisOptions;
  12. // master + slave = all
  13. this.nodes = {
  14. all: {},
  15. master: {},
  16. slave: {},
  17. };
  18. this.specifiedOptions = {};
  19. }
  20. getNodes(role = "all") {
  21. const nodes = this.nodes[role];
  22. return Object.keys(nodes).map((key) => nodes[key]);
  23. }
  24. getInstanceByKey(key) {
  25. return this.nodes.all[key];
  26. }
  27. getSampleInstance(role) {
  28. const keys = Object.keys(this.nodes[role]);
  29. const sampleKey = (0, utils_1.sample)(keys);
  30. return this.nodes[role][sampleKey];
  31. }
  32. /**
  33. * Add a master node to the pool
  34. * @param node
  35. */
  36. addMasterNode(node) {
  37. const key = (0, util_1.getNodeKey)(node.options);
  38. const redis = this.createRedisFromOptions(node, node.options.readOnly);
  39. //Master nodes aren't read-only
  40. if (!node.options.readOnly) {
  41. this.nodes.all[key] = redis;
  42. this.nodes.master[key] = redis;
  43. return true;
  44. }
  45. return false;
  46. }
  47. /**
  48. * Creates a Redis connection instance from the node options
  49. * @param node
  50. * @param readOnly
  51. */
  52. createRedisFromOptions(node, readOnly) {
  53. const redis = new Redis_1.default((0, utils_1.defaults)({
  54. // Never try to reconnect when a node is lose,
  55. // instead, waiting for a `MOVED` error and
  56. // fetch the slots again.
  57. retryStrategy: null,
  58. // Offline queue should be enabled so that
  59. // we don't need to wait for the `ready` event
  60. // before sending commands to the node.
  61. enableOfflineQueue: true,
  62. readOnly: readOnly,
  63. }, node, this.redisOptions, { lazyConnect: true }));
  64. return redis;
  65. }
  66. /**
  67. * Find or create a connection to the node
  68. */
  69. findOrCreate(node, readOnly = false) {
  70. const key = (0, util_1.getNodeKey)(node);
  71. readOnly = Boolean(readOnly);
  72. if (this.specifiedOptions[key]) {
  73. Object.assign(node, this.specifiedOptions[key]);
  74. }
  75. else {
  76. this.specifiedOptions[key] = node;
  77. }
  78. let redis;
  79. if (this.nodes.all[key]) {
  80. redis = this.nodes.all[key];
  81. if (redis.options.readOnly !== readOnly) {
  82. redis.options.readOnly = readOnly;
  83. debug("Change role of %s to %s", key, readOnly ? "slave" : "master");
  84. redis[readOnly ? "readonly" : "readwrite"]().catch(utils_1.noop);
  85. if (readOnly) {
  86. delete this.nodes.master[key];
  87. this.nodes.slave[key] = redis;
  88. }
  89. else {
  90. delete this.nodes.slave[key];
  91. this.nodes.master[key] = redis;
  92. }
  93. }
  94. }
  95. else {
  96. debug("Connecting to %s as %s", key, readOnly ? "slave" : "master");
  97. redis = this.createRedisFromOptions(node, readOnly);
  98. this.nodes.all[key] = redis;
  99. this.nodes[readOnly ? "slave" : "master"][key] = redis;
  100. redis.once("end", () => {
  101. this.removeNode(key);
  102. this.emit("-node", redis, key);
  103. if (!Object.keys(this.nodes.all).length) {
  104. this.emit("drain");
  105. }
  106. });
  107. this.emit("+node", redis, key);
  108. redis.on("error", function (error) {
  109. this.emit("nodeError", error, key);
  110. });
  111. }
  112. return redis;
  113. }
  114. /**
  115. * Reset the pool with a set of nodes.
  116. * The old node will be removed.
  117. */
  118. reset(nodes) {
  119. debug("Reset with %O", nodes);
  120. const newNodes = {};
  121. nodes.forEach((node) => {
  122. const key = (0, util_1.getNodeKey)(node);
  123. // Don't override the existing (master) node
  124. // when the current one is slave.
  125. if (!(node.readOnly && newNodes[key])) {
  126. newNodes[key] = node;
  127. }
  128. });
  129. Object.keys(this.nodes.all).forEach((key) => {
  130. if (!newNodes[key]) {
  131. debug("Disconnect %s because the node does not hold any slot", key);
  132. this.nodes.all[key].disconnect();
  133. this.removeNode(key);
  134. }
  135. });
  136. Object.keys(newNodes).forEach((key) => {
  137. const node = newNodes[key];
  138. this.findOrCreate(node, node.readOnly);
  139. });
  140. }
  141. /**
  142. * Remove a node from the pool.
  143. */
  144. removeNode(key) {
  145. const { nodes } = this;
  146. if (nodes.all[key]) {
  147. debug("Remove %s from the pool", key);
  148. delete nodes.all[key];
  149. }
  150. delete nodes.master[key];
  151. delete nodes.slave[key];
  152. }
  153. }
  154. exports.default = ConnectionPool;