index.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.SentinelIterator = void 0;
  4. const net_1 = require("net");
  5. const utils_1 = require("../../utils");
  6. const tls_1 = require("tls");
  7. const SentinelIterator_1 = require("./SentinelIterator");
  8. exports.SentinelIterator = SentinelIterator_1.default;
  9. const AbstractConnector_1 = require("../AbstractConnector");
  10. const Redis_1 = require("../../Redis");
  11. const FailoverDetector_1 = require("./FailoverDetector");
  12. const debug = (0, utils_1.Debug)("SentinelConnector");
  13. class SentinelConnector extends AbstractConnector_1.default {
  14. constructor(options) {
  15. super(options.disconnectTimeout);
  16. this.options = options;
  17. this.emitter = null;
  18. this.failoverDetector = null;
  19. if (!this.options.sentinels.length) {
  20. throw new Error("Requires at least one sentinel to connect to.");
  21. }
  22. if (!this.options.name) {
  23. throw new Error("Requires the name of master.");
  24. }
  25. this.sentinelIterator = new SentinelIterator_1.default(this.options.sentinels);
  26. }
  27. check(info) {
  28. const roleMatches = !info.role || this.options.role === info.role;
  29. if (!roleMatches) {
  30. debug("role invalid, expected %s, but got %s", this.options.role, info.role);
  31. // Start from the next item.
  32. // Note that `reset` will move the cursor to the previous element,
  33. // so we advance two steps here.
  34. this.sentinelIterator.next();
  35. this.sentinelIterator.next();
  36. this.sentinelIterator.reset(true);
  37. }
  38. return roleMatches;
  39. }
  40. disconnect() {
  41. super.disconnect();
  42. if (this.failoverDetector) {
  43. this.failoverDetector.cleanup();
  44. }
  45. }
  46. connect(eventEmitter) {
  47. this.connecting = true;
  48. this.retryAttempts = 0;
  49. let lastError;
  50. const connectToNext = async () => {
  51. const endpoint = this.sentinelIterator.next();
  52. if (endpoint.done) {
  53. this.sentinelIterator.reset(false);
  54. const retryDelay = typeof this.options.sentinelRetryStrategy === "function"
  55. ? this.options.sentinelRetryStrategy(++this.retryAttempts)
  56. : null;
  57. let errorMsg = typeof retryDelay !== "number"
  58. ? "All sentinels are unreachable and retry is disabled."
  59. : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.`;
  60. if (lastError) {
  61. errorMsg += ` Last error: ${lastError.message}`;
  62. }
  63. debug(errorMsg);
  64. const error = new Error(errorMsg);
  65. if (typeof retryDelay === "number") {
  66. eventEmitter("error", error);
  67. await new Promise((resolve) => setTimeout(resolve, retryDelay));
  68. return connectToNext();
  69. }
  70. else {
  71. throw error;
  72. }
  73. }
  74. let resolved = null;
  75. let err = null;
  76. try {
  77. resolved = await this.resolve(endpoint.value);
  78. }
  79. catch (error) {
  80. err = error;
  81. }
  82. if (!this.connecting) {
  83. throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
  84. }
  85. const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
  86. if (resolved) {
  87. debug("resolved: %s:%s from sentinel %s", resolved.host, resolved.port, endpointAddress);
  88. if (this.options.enableTLSForSentinelMode && this.options.tls) {
  89. Object.assign(resolved, this.options.tls);
  90. this.stream = (0, tls_1.connect)(resolved);
  91. this.stream.once("secureConnect", this.initFailoverDetector.bind(this));
  92. }
  93. else {
  94. this.stream = (0, net_1.createConnection)(resolved);
  95. this.stream.once("connect", this.initFailoverDetector.bind(this));
  96. }
  97. this.stream.once("error", (err) => {
  98. this.firstError = err;
  99. });
  100. return this.stream;
  101. }
  102. else {
  103. const errorMsg = err
  104. ? "failed to connect to sentinel " +
  105. endpointAddress +
  106. " because " +
  107. err.message
  108. : "connected to sentinel " +
  109. endpointAddress +
  110. " successfully, but got an invalid reply: " +
  111. resolved;
  112. debug(errorMsg);
  113. eventEmitter("sentinelError", new Error(errorMsg));
  114. if (err) {
  115. lastError = err;
  116. }
  117. return connectToNext();
  118. }
  119. };
  120. return connectToNext();
  121. }
  122. async updateSentinels(client) {
  123. if (!this.options.updateSentinels) {
  124. return;
  125. }
  126. const result = await client.sentinel("sentinels", this.options.name);
  127. if (!Array.isArray(result)) {
  128. return;
  129. }
  130. result
  131. .map(utils_1.packObject)
  132. .forEach((sentinel) => {
  133. const flags = sentinel.flags ? sentinel.flags.split(",") : [];
  134. if (flags.indexOf("disconnected") === -1 &&
  135. sentinel.ip &&
  136. sentinel.port) {
  137. const endpoint = this.sentinelNatResolve(addressResponseToAddress(sentinel));
  138. if (this.sentinelIterator.add(endpoint)) {
  139. debug("adding sentinel %s:%s", endpoint.host, endpoint.port);
  140. }
  141. }
  142. });
  143. debug("Updated internal sentinels: %s", this.sentinelIterator);
  144. }
  145. async resolveMaster(client) {
  146. const result = await client.sentinel("get-master-addr-by-name", this.options.name);
  147. await this.updateSentinels(client);
  148. return this.sentinelNatResolve(Array.isArray(result)
  149. ? { host: result[0], port: Number(result[1]) }
  150. : null);
  151. }
  152. async resolveSlave(client) {
  153. const result = await client.sentinel("slaves", this.options.name);
  154. if (!Array.isArray(result)) {
  155. return null;
  156. }
  157. const availableSlaves = result
  158. .map(utils_1.packObject)
  159. .filter((slave) => slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/));
  160. return this.sentinelNatResolve(selectPreferredSentinel(availableSlaves, this.options.preferredSlaves));
  161. }
  162. sentinelNatResolve(item) {
  163. if (!item || !this.options.natMap)
  164. return item;
  165. const key = `${item.host}:${item.port}`;
  166. let result = item;
  167. if (typeof this.options.natMap === "function") {
  168. result = this.options.natMap(key) || item;
  169. }
  170. else if (typeof this.options.natMap === "object") {
  171. result = this.options.natMap[key] || item;
  172. }
  173. return result;
  174. }
  175. connectToSentinel(endpoint, options) {
  176. const redis = new Redis_1.default({
  177. port: endpoint.port || 26379,
  178. host: endpoint.host,
  179. username: this.options.sentinelUsername || null,
  180. password: this.options.sentinelPassword || null,
  181. family: endpoint.family ||
  182. // @ts-expect-error
  183. ("path" in this.options && this.options.path
  184. ? undefined
  185. : // @ts-expect-error
  186. this.options.family),
  187. tls: this.options.sentinelTLS,
  188. retryStrategy: null,
  189. enableReadyCheck: false,
  190. connectTimeout: this.options.connectTimeout,
  191. commandTimeout: this.options.sentinelCommandTimeout,
  192. ...options,
  193. });
  194. // @ts-expect-error
  195. return redis;
  196. }
  197. async resolve(endpoint) {
  198. const client = this.connectToSentinel(endpoint);
  199. // ignore the errors since resolve* methods will handle them
  200. client.on("error", noop);
  201. try {
  202. if (this.options.role === "slave") {
  203. return await this.resolveSlave(client);
  204. }
  205. else {
  206. return await this.resolveMaster(client);
  207. }
  208. }
  209. finally {
  210. client.disconnect();
  211. }
  212. }
  213. async initFailoverDetector() {
  214. var _a;
  215. if (!this.options.failoverDetector) {
  216. return;
  217. }
  218. // Move the current sentinel to the first position
  219. this.sentinelIterator.reset(true);
  220. const sentinels = [];
  221. // In case of a large amount of sentinels, limit the number of concurrent connections
  222. while (sentinels.length < this.options.sentinelMaxConnections) {
  223. const { done, value } = this.sentinelIterator.next();
  224. if (done) {
  225. break;
  226. }
  227. const client = this.connectToSentinel(value, {
  228. lazyConnect: true,
  229. retryStrategy: this.options.sentinelReconnectStrategy,
  230. });
  231. client.on("reconnecting", () => {
  232. var _a;
  233. // Tests listen to this event
  234. (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("sentinelReconnecting");
  235. });
  236. sentinels.push({ address: value, client });
  237. }
  238. this.sentinelIterator.reset(false);
  239. if (this.failoverDetector) {
  240. // Clean up previous detector
  241. this.failoverDetector.cleanup();
  242. }
  243. this.failoverDetector = new FailoverDetector_1.FailoverDetector(this, sentinels);
  244. await this.failoverDetector.subscribe();
  245. // Tests listen to this event
  246. (_a = this.emitter) === null || _a === void 0 ? void 0 : _a.emit("failoverSubscribed");
  247. }
  248. }
  249. exports.default = SentinelConnector;
  250. function selectPreferredSentinel(availableSlaves, preferredSlaves) {
  251. if (availableSlaves.length === 0) {
  252. return null;
  253. }
  254. let selectedSlave;
  255. if (typeof preferredSlaves === "function") {
  256. selectedSlave = preferredSlaves(availableSlaves);
  257. }
  258. else if (preferredSlaves !== null && typeof preferredSlaves === "object") {
  259. const preferredSlavesArray = Array.isArray(preferredSlaves)
  260. ? preferredSlaves
  261. : [preferredSlaves];
  262. // sort by priority
  263. preferredSlavesArray.sort((a, b) => {
  264. // default the priority to 1
  265. if (!a.prio) {
  266. a.prio = 1;
  267. }
  268. if (!b.prio) {
  269. b.prio = 1;
  270. }
  271. // lowest priority first
  272. if (a.prio < b.prio) {
  273. return -1;
  274. }
  275. if (a.prio > b.prio) {
  276. return 1;
  277. }
  278. return 0;
  279. });
  280. // loop over preferred slaves and return the first match
  281. for (let p = 0; p < preferredSlavesArray.length; p++) {
  282. for (let a = 0; a < availableSlaves.length; a++) {
  283. const slave = availableSlaves[a];
  284. if (slave.ip === preferredSlavesArray[p].ip) {
  285. if (slave.port === preferredSlavesArray[p].port) {
  286. selectedSlave = slave;
  287. break;
  288. }
  289. }
  290. }
  291. if (selectedSlave) {
  292. break;
  293. }
  294. }
  295. }
  296. // if none of the preferred slaves are available, a random available slave is returned
  297. if (!selectedSlave) {
  298. selectedSlave = (0, utils_1.sample)(availableSlaves);
  299. }
  300. return addressResponseToAddress(selectedSlave);
  301. }
  302. function addressResponseToAddress(input) {
  303. return { host: input.ip, port: Number(input.port) };
  304. }
  305. function noop() { }