index.js 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const commands_1 = require("@ioredis/commands");
  4. const events_1 = require("events");
  5. const redis_errors_1 = require("redis-errors");
  6. const standard_as_callback_1 = require("standard-as-callback");
  7. const Command_1 = require("../Command");
  8. const ClusterAllFailedError_1 = require("../errors/ClusterAllFailedError");
  9. const Redis_1 = require("../Redis");
  10. const ScanStream_1 = require("../ScanStream");
  11. const transaction_1 = require("../transaction");
  12. const utils_1 = require("../utils");
  13. const applyMixin_1 = require("../utils/applyMixin");
  14. const Commander_1 = require("../utils/Commander");
  15. const ClusterOptions_1 = require("./ClusterOptions");
  16. const ClusterSubscriber_1 = require("./ClusterSubscriber");
  17. const ConnectionPool_1 = require("./ConnectionPool");
  18. const DelayQueue_1 = require("./DelayQueue");
  19. const util_1 = require("./util");
  20. const Deque = require("denque");
  21. const ClusterSubscriberGroup_1 = require("./ClusterSubscriberGroup");
  // Debug logger created under the "cluster" namespace; used for all diagnostics in this module.
  const debug = (0, utils_1.Debug)("cluster");
  // Tracks commands whose `reject` has already been wrapped by Cluster#sendCommand,
  // so the redirection-aware wrapper is installed at most once per command.
  const REJECT_OVERWRITTEN_COMMANDS = new WeakSet();
  24. /**
  25. * Client for the official Redis Cluster
  26. */
  27. class Cluster extends Commander_1.default {
  28. /**
  29. * Creates an instance of Cluster.
  30. */
  31. //TODO: Add an option that enables or disables sharded PubSub
  32. constructor(startupNodes, options = {}) {
  33. super();
  34. this.slots = [];
  35. /**
  36. * @ignore
  37. */
  38. this._groupsIds = {};
  39. /**
  40. * @ignore
  41. */
  42. this._groupsBySlot = Array(16384);
  43. /**
  44. * @ignore
  45. */
  46. this.isCluster = true;
  47. this.retryAttempts = 0;
  48. this.delayQueue = new DelayQueue_1.default();
  49. this.offlineQueue = new Deque();
  50. this.isRefreshing = false;
  51. this._refreshSlotsCacheCallbacks = [];
  52. this._autoPipelines = new Map();
  53. this._runningAutoPipelines = new Set();
  54. this._readyDelayedCallbacks = [];
  55. /**
  56. * Every time Cluster#connect() is called, this value will be
  57. * auto-incrementing. The purpose of this value is used for
  58. * discarding previous connect attampts when creating a new
  59. * connection.
  60. */
  61. this.connectionEpoch = 0;
  62. events_1.EventEmitter.call(this);
  63. this.startupNodes = startupNodes;
  64. this.options = (0, utils_1.defaults)({}, options, ClusterOptions_1.DEFAULT_CLUSTER_OPTIONS, this.options);
  65. if (this.options.shardedSubscribers) {
  66. this.createShardedSubscriberGroup();
  67. }
  68. if (this.options.redisOptions &&
  69. this.options.redisOptions.keyPrefix &&
  70. !this.options.keyPrefix) {
  71. this.options.keyPrefix = this.options.redisOptions.keyPrefix;
  72. }
  73. // validate options
  74. if (typeof this.options.scaleReads !== "function" &&
  75. ["all", "master", "slave"].indexOf(this.options.scaleReads) === -1) {
  76. throw new Error('Invalid option scaleReads "' +
  77. this.options.scaleReads +
  78. '". Expected "all", "master", "slave" or a custom function');
  79. }
  80. this.connectionPool = new ConnectionPool_1.default(this.options.redisOptions);
  81. this.connectionPool.on("-node", (redis, key) => {
  82. this.emit("-node", redis);
  83. });
  84. this.connectionPool.on("+node", (redis) => {
  85. this.emit("+node", redis);
  86. });
  87. this.connectionPool.on("drain", () => {
  88. this.setStatus("close");
  89. });
  90. this.connectionPool.on("nodeError", (error, key) => {
  91. this.emit("node error", error, key);
  92. });
  93. this.subscriber = new ClusterSubscriber_1.default(this.connectionPool, this);
  94. if (this.options.scripts) {
  95. Object.entries(this.options.scripts).forEach(([name, definition]) => {
  96. this.defineCommand(name, definition);
  97. });
  98. }
  99. if (this.options.lazyConnect) {
  100. this.setStatus("wait");
  101. }
  102. else {
  103. this.connect().catch((err) => {
  104. debug("connecting failed: %s", err);
  105. });
  106. }
  107. }
  108. /**
  109. * Connect to a cluster
  110. */
  111. connect() {
  112. return new Promise((resolve, reject) => {
  113. if (this.status === "connecting" ||
  114. this.status === "connect" ||
  115. this.status === "ready") {
  116. reject(new Error("Redis is already connecting/connected"));
  117. return;
  118. }
  119. const epoch = ++this.connectionEpoch;
  120. this.setStatus("connecting");
  121. this.resolveStartupNodeHostnames()
  122. .then((nodes) => {
  123. if (this.connectionEpoch !== epoch) {
  124. debug("discard connecting after resolving startup nodes because epoch not match: %d != %d", epoch, this.connectionEpoch);
  125. reject(new redis_errors_1.RedisError("Connection is discarded because a new connection is made"));
  126. return;
  127. }
  128. if (this.status !== "connecting") {
  129. debug("discard connecting after resolving startup nodes because the status changed to %s", this.status);
  130. reject(new redis_errors_1.RedisError("Connection is aborted"));
  131. return;
  132. }
  133. this.connectionPool.reset(nodes);
  134. if (this.options.shardedSubscribers) {
  135. this.shardedSubscribers
  136. .reset(this.slots, this.connectionPool.getNodes("all"))
  137. .catch((err) => {
  138. // TODO should we emit an error event here?
  139. debug("Error while starting subscribers: %s", err);
  140. });
  141. }
  142. const readyHandler = () => {
  143. this.setStatus("ready");
  144. this.retryAttempts = 0;
  145. this.executeOfflineCommands();
  146. this.resetNodesRefreshInterval();
  147. resolve();
  148. };
  149. let closeListener = undefined;
  150. const refreshListener = () => {
  151. this.invokeReadyDelayedCallbacks(undefined);
  152. this.removeListener("close", closeListener);
  153. this.manuallyClosing = false;
  154. this.setStatus("connect");
  155. if (this.options.enableReadyCheck) {
  156. this.readyCheck((err, fail) => {
  157. if (err || fail) {
  158. debug("Ready check failed (%s). Reconnecting...", err || fail);
  159. if (this.status === "connect") {
  160. this.disconnect(true);
  161. }
  162. }
  163. else {
  164. readyHandler();
  165. }
  166. });
  167. }
  168. else {
  169. readyHandler();
  170. }
  171. };
  172. closeListener = () => {
  173. const error = new Error("None of startup nodes is available");
  174. this.removeListener("refresh", refreshListener);
  175. this.invokeReadyDelayedCallbacks(error);
  176. reject(error);
  177. };
  178. this.once("refresh", refreshListener);
  179. this.once("close", closeListener);
  180. this.once("close", this.handleCloseEvent.bind(this));
  181. this.refreshSlotsCache((err) => {
  182. if (err && err.message === ClusterAllFailedError_1.default.defaultMessage) {
  183. Redis_1.default.prototype.silentEmit.call(this, "error", err);
  184. this.connectionPool.reset([]);
  185. }
  186. });
  187. this.subscriber.start();
  188. if (this.options.shardedSubscribers) {
  189. this.shardedSubscribers.start().catch((err) => {
  190. // TODO should we emit an error event here?
  191. debug("Error while starting subscribers: %s", err);
  192. });
  193. }
  194. })
  195. .catch((err) => {
  196. this.setStatus("close");
  197. this.handleCloseEvent(err);
  198. this.invokeReadyDelayedCallbacks(err);
  199. reject(err);
  200. });
  201. });
  202. }
  203. /**
  204. * Disconnect from every node in the cluster.
  205. */
  206. disconnect(reconnect = false) {
  207. const status = this.status;
  208. this.setStatus("disconnecting");
  209. if (!reconnect) {
  210. this.manuallyClosing = true;
  211. }
  212. if (this.reconnectTimeout && !reconnect) {
  213. clearTimeout(this.reconnectTimeout);
  214. this.reconnectTimeout = null;
  215. debug("Canceled reconnecting attempts");
  216. }
  217. this.clearNodesRefreshInterval();
  218. this.subscriber.stop();
  219. if (this.options.shardedSubscribers) {
  220. this.shardedSubscribers.stop();
  221. }
  222. if (status === "wait") {
  223. this.setStatus("close");
  224. this.handleCloseEvent();
  225. }
  226. else {
  227. this.connectionPool.reset([]);
  228. }
  229. }
  230. /**
  231. * Quit the cluster gracefully.
  232. */
  233. quit(callback) {
  234. const status = this.status;
  235. this.setStatus("disconnecting");
  236. this.manuallyClosing = true;
  237. if (this.reconnectTimeout) {
  238. clearTimeout(this.reconnectTimeout);
  239. this.reconnectTimeout = null;
  240. }
  241. this.clearNodesRefreshInterval();
  242. this.subscriber.stop();
  243. if (this.options.shardedSubscribers) {
  244. this.shardedSubscribers.stop();
  245. }
  246. if (status === "wait") {
  247. const ret = (0, standard_as_callback_1.default)(Promise.resolve("OK"), callback);
  248. // use setImmediate to make sure "close" event
  249. // being emitted after quit() is returned
  250. setImmediate(function () {
  251. this.setStatus("close");
  252. this.handleCloseEvent();
  253. }.bind(this));
  254. return ret;
  255. }
  256. return (0, standard_as_callback_1.default)(Promise.all(this.nodes().map((node) => node.quit().catch((err) => {
  257. // Ignore the error caused by disconnecting since
  258. // we're disconnecting...
  259. if (err.message === utils_1.CONNECTION_CLOSED_ERROR_MSG) {
  260. return "OK";
  261. }
  262. throw err;
  263. }))).then(() => "OK"), callback);
  264. }
  265. /**
  266. * Create a new instance with the same startup nodes and options as the current one.
  267. *
  268. * @example
  269. * ```js
  270. * var cluster = new Redis.Cluster([{ host: "127.0.0.1", port: "30001" }]);
  271. * var anotherCluster = cluster.duplicate();
  272. * ```
  273. */
  274. duplicate(overrideStartupNodes = [], overrideOptions = {}) {
  275. const startupNodes = overrideStartupNodes.length > 0
  276. ? overrideStartupNodes
  277. : this.startupNodes.slice(0);
  278. const options = Object.assign({}, this.options, overrideOptions);
  279. return new Cluster(startupNodes, options);
  280. }
  281. /**
  282. * Get nodes with the specified role
  283. */
  284. nodes(role = "all") {
  285. if (role !== "all" && role !== "master" && role !== "slave") {
  286. throw new Error('Invalid role "' + role + '". Expected "all", "master" or "slave"');
  287. }
  288. return this.connectionPool.getNodes(role);
  289. }
  290. /**
  291. * This is needed in order not to install a listener for each auto pipeline
  292. *
  293. * @ignore
  294. */
  295. delayUntilReady(callback) {
  296. this._readyDelayedCallbacks.push(callback);
  297. }
  298. /**
  299. * Get the number of commands queued in automatic pipelines.
  300. *
  301. * This is not available (and returns 0) until the cluster is connected and slots information have been received.
  302. */
  303. get autoPipelineQueueSize() {
  304. let queued = 0;
  305. for (const pipeline of this._autoPipelines.values()) {
  306. queued += pipeline.length;
  307. }
  308. return queued;
  309. }
  310. /**
  311. * Refresh the slot cache
  312. *
  313. * @ignore
  314. */
  315. refreshSlotsCache(callback) {
  316. if (callback) {
  317. this._refreshSlotsCacheCallbacks.push(callback);
  318. }
  319. if (this.isRefreshing) {
  320. return;
  321. }
  322. this.isRefreshing = true;
  323. const _this = this;
  324. const wrapper = (error) => {
  325. this.isRefreshing = false;
  326. for (const callback of this._refreshSlotsCacheCallbacks) {
  327. callback(error);
  328. }
  329. this._refreshSlotsCacheCallbacks = [];
  330. };
  331. const nodes = (0, utils_1.shuffle)(this.connectionPool.getNodes());
  332. let lastNodeError = null;
  333. function tryNode(index) {
  334. if (index === nodes.length) {
  335. const error = new ClusterAllFailedError_1.default(ClusterAllFailedError_1.default.defaultMessage, lastNodeError);
  336. return wrapper(error);
  337. }
  338. const node = nodes[index];
  339. const key = `${node.options.host}:${node.options.port}`;
  340. debug("getting slot cache from %s", key);
  341. _this.getInfoFromNode(node, function (err) {
  342. switch (_this.status) {
  343. case "close":
  344. case "end":
  345. return wrapper(new Error("Cluster is disconnected."));
  346. case "disconnecting":
  347. return wrapper(new Error("Cluster is disconnecting."));
  348. }
  349. if (err) {
  350. _this.emit("node error", err, key);
  351. lastNodeError = err;
  352. tryNode(index + 1);
  353. }
  354. else {
  355. _this.emit("refresh");
  356. wrapper();
  357. }
  358. });
  359. }
  360. tryNode(0);
  361. }
  362. /**
  363. * @ignore
  364. */
  365. sendCommand(command, stream, node) {
  366. if (this.status === "wait") {
  367. this.connect().catch(utils_1.noop);
  368. }
  369. if (this.status === "end") {
  370. command.reject(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  371. return command.promise;
  372. }
  373. let to = this.options.scaleReads;
  374. if (to !== "master") {
  375. const isCommandReadOnly = command.isReadOnly ||
  376. ((0, commands_1.exists)(command.name) && (0, commands_1.hasFlag)(command.name, "readonly"));
  377. if (!isCommandReadOnly) {
  378. to = "master";
  379. }
  380. }
  381. let targetSlot = node ? node.slot : command.getSlot();
  382. const ttl = {};
  383. const _this = this;
  384. if (!node && !REJECT_OVERWRITTEN_COMMANDS.has(command)) {
  385. REJECT_OVERWRITTEN_COMMANDS.add(command);
  386. const reject = command.reject;
  387. command.reject = function (err) {
  388. const partialTry = tryConnection.bind(null, true);
  389. _this.handleError(err, ttl, {
  390. moved: function (slot, key) {
  391. debug("command %s is moved to %s", command.name, key);
  392. targetSlot = Number(slot);
  393. if (_this.slots[slot]) {
  394. _this.slots[slot][0] = key;
  395. }
  396. else {
  397. _this.slots[slot] = [key];
  398. }
  399. _this._groupsBySlot[slot] =
  400. _this._groupsIds[_this.slots[slot].join(";")];
  401. _this.connectionPool.findOrCreate(_this.natMapper(key));
  402. tryConnection();
  403. debug("refreshing slot caches... (triggered by MOVED error)");
  404. _this.refreshSlotsCache();
  405. },
  406. ask: function (slot, key) {
  407. debug("command %s is required to ask %s:%s", command.name, key);
  408. const mapped = _this.natMapper(key);
  409. _this.connectionPool.findOrCreate(mapped);
  410. tryConnection(false, `${mapped.host}:${mapped.port}`);
  411. },
  412. tryagain: partialTry,
  413. clusterDown: partialTry,
  414. connectionClosed: partialTry,
  415. maxRedirections: function (redirectionError) {
  416. reject.call(command, redirectionError);
  417. },
  418. defaults: function () {
  419. reject.call(command, err);
  420. },
  421. });
  422. };
  423. }
  424. tryConnection();
  425. function tryConnection(random, asking) {
  426. if (_this.status === "end") {
  427. command.reject(new redis_errors_1.AbortError("Cluster is ended."));
  428. return;
  429. }
  430. let redis;
  431. if (_this.status === "ready" || command.name === "cluster") {
  432. if (node && node.redis) {
  433. redis = node.redis;
  434. }
  435. else if (Command_1.default.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
  436. Command_1.default.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)) {
  437. if (_this.options.shardedSubscribers &&
  438. (command.name == "ssubscribe" || command.name == "sunsubscribe")) {
  439. const sub = _this.shardedSubscribers.getResponsibleSubscriber(targetSlot);
  440. if (!sub) {
  441. command.reject(new redis_errors_1.AbortError(`No sharded subscriber for slot: ${targetSlot}`));
  442. return;
  443. }
  444. let status = -1;
  445. if (command.name == "ssubscribe") {
  446. status = _this.shardedSubscribers.addChannels(command.getKeys());
  447. }
  448. if (command.name == "sunsubscribe") {
  449. status = _this.shardedSubscribers.removeChannels(command.getKeys());
  450. }
  451. if (status !== -1) {
  452. redis = sub.getInstance();
  453. }
  454. else {
  455. command.reject(new redis_errors_1.AbortError("Possible CROSSSLOT error: All channels must hash to the same slot"));
  456. }
  457. }
  458. else {
  459. redis = _this.subscriber.getInstance();
  460. }
  461. if (!redis) {
  462. command.reject(new redis_errors_1.AbortError("No subscriber for the cluster"));
  463. return;
  464. }
  465. }
  466. else {
  467. if (!random) {
  468. if (typeof targetSlot === "number" && _this.slots[targetSlot]) {
  469. const nodeKeys = _this.slots[targetSlot];
  470. if (typeof to === "function") {
  471. const nodes = nodeKeys.map(function (key) {
  472. return _this.connectionPool.getInstanceByKey(key);
  473. });
  474. redis = to(nodes, command);
  475. if (Array.isArray(redis)) {
  476. redis = (0, utils_1.sample)(redis);
  477. }
  478. if (!redis) {
  479. redis = nodes[0];
  480. }
  481. }
  482. else {
  483. let key;
  484. if (to === "all") {
  485. key = (0, utils_1.sample)(nodeKeys);
  486. }
  487. else if (to === "slave" && nodeKeys.length > 1) {
  488. key = (0, utils_1.sample)(nodeKeys, 1);
  489. }
  490. else {
  491. key = nodeKeys[0];
  492. }
  493. redis = _this.connectionPool.getInstanceByKey(key);
  494. }
  495. }
  496. if (asking) {
  497. redis = _this.connectionPool.getInstanceByKey(asking);
  498. redis.asking();
  499. }
  500. }
  501. if (!redis) {
  502. redis =
  503. (typeof to === "function"
  504. ? null
  505. : _this.connectionPool.getSampleInstance(to)) ||
  506. _this.connectionPool.getSampleInstance("all");
  507. }
  508. }
  509. if (node && !node.redis) {
  510. node.redis = redis;
  511. }
  512. }
  513. if (redis) {
  514. redis.sendCommand(command, stream);
  515. }
  516. else if (_this.options.enableOfflineQueue) {
  517. _this.offlineQueue.push({
  518. command: command,
  519. stream: stream,
  520. node: node,
  521. });
  522. }
  523. else {
  524. command.reject(new Error("Cluster isn't ready and enableOfflineQueue options is false"));
  525. }
  526. }
  527. return command.promise;
  528. }
  529. sscanStream(key, options) {
  530. return this.createScanStream("sscan", { key, options });
  531. }
  532. sscanBufferStream(key, options) {
  533. return this.createScanStream("sscanBuffer", { key, options });
  534. }
  535. hscanStream(key, options) {
  536. return this.createScanStream("hscan", { key, options });
  537. }
  538. hscanBufferStream(key, options) {
  539. return this.createScanStream("hscanBuffer", { key, options });
  540. }
  541. zscanStream(key, options) {
  542. return this.createScanStream("zscan", { key, options });
  543. }
  544. zscanBufferStream(key, options) {
  545. return this.createScanStream("zscanBuffer", { key, options });
  546. }
  547. /**
  548. * @ignore
  549. */
  550. handleError(error, ttl, handlers) {
  551. if (typeof ttl.value === "undefined") {
  552. ttl.value = this.options.maxRedirections;
  553. }
  554. else {
  555. ttl.value -= 1;
  556. }
  557. if (ttl.value <= 0) {
  558. handlers.maxRedirections(new Error("Too many Cluster redirections. Last error: " + error));
  559. return;
  560. }
  561. const errv = error.message.split(" ");
  562. if (errv[0] === "MOVED") {
  563. const timeout = this.options.retryDelayOnMoved;
  564. if (timeout && typeof timeout === "number") {
  565. this.delayQueue.push("moved", handlers.moved.bind(null, errv[1], errv[2]), { timeout });
  566. }
  567. else {
  568. handlers.moved(errv[1], errv[2]);
  569. }
  570. }
  571. else if (errv[0] === "ASK") {
  572. handlers.ask(errv[1], errv[2]);
  573. }
  574. else if (errv[0] === "TRYAGAIN") {
  575. this.delayQueue.push("tryagain", handlers.tryagain, {
  576. timeout: this.options.retryDelayOnTryAgain,
  577. });
  578. }
  579. else if (errv[0] === "CLUSTERDOWN" &&
  580. this.options.retryDelayOnClusterDown > 0) {
  581. this.delayQueue.push("clusterdown", handlers.connectionClosed, {
  582. timeout: this.options.retryDelayOnClusterDown,
  583. callback: this.refreshSlotsCache.bind(this),
  584. });
  585. }
  586. else if (error.message === utils_1.CONNECTION_CLOSED_ERROR_MSG &&
  587. this.options.retryDelayOnFailover > 0 &&
  588. this.status === "ready") {
  589. this.delayQueue.push("failover", handlers.connectionClosed, {
  590. timeout: this.options.retryDelayOnFailover,
  591. callback: this.refreshSlotsCache.bind(this),
  592. });
  593. }
  594. else {
  595. handlers.defaults();
  596. }
  597. }
  598. resetOfflineQueue() {
  599. this.offlineQueue = new Deque();
  600. }
  601. clearNodesRefreshInterval() {
  602. if (this.slotsTimer) {
  603. clearTimeout(this.slotsTimer);
  604. this.slotsTimer = null;
  605. }
  606. }
  607. resetNodesRefreshInterval() {
  608. if (this.slotsTimer || !this.options.slotsRefreshInterval) {
  609. return;
  610. }
  611. const nextRound = () => {
  612. this.slotsTimer = setTimeout(() => {
  613. debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)');
  614. this.refreshSlotsCache(() => {
  615. nextRound();
  616. });
  617. }, this.options.slotsRefreshInterval);
  618. };
  619. nextRound();
  620. }
  621. /**
  622. * Change cluster instance's status
  623. */
  624. setStatus(status) {
  625. debug("status: %s -> %s", this.status || "[empty]", status);
  626. this.status = status;
  627. process.nextTick(() => {
  628. this.emit(status);
  629. });
  630. }
  631. /**
  632. * Called when closed to check whether a reconnection should be made
  633. */
  634. handleCloseEvent(reason) {
  635. var _a;
  636. if (reason) {
  637. debug("closed because %s", reason);
  638. }
  639. let retryDelay;
  640. if (!this.manuallyClosing &&
  641. typeof this.options.clusterRetryStrategy === "function") {
  642. retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts, reason);
  643. }
  644. if (typeof retryDelay === "number") {
  645. this.setStatus("reconnecting");
  646. this.reconnectTimeout = setTimeout(() => {
  647. this.reconnectTimeout = null;
  648. debug("Cluster is disconnected. Retrying after %dms", retryDelay);
  649. this.connect().catch(function (err) {
  650. debug("Got error %s when reconnecting. Ignoring...", err);
  651. });
  652. }, retryDelay);
  653. }
  654. else {
  655. if (this.options.shardedSubscribers) {
  656. (_a = this.subscriberGroupEmitter) === null || _a === void 0 ? void 0 : _a.removeAllListeners();
  657. }
  658. this.setStatus("end");
  659. this.flushQueue(new Error("None of startup nodes is available"));
  660. }
  661. }
  662. /**
  663. * Flush offline queue with error.
  664. */
  665. flushQueue(error) {
  666. let item;
  667. while ((item = this.offlineQueue.shift())) {
  668. item.command.reject(error);
  669. }
  670. }
  671. executeOfflineCommands() {
  672. if (this.offlineQueue.length) {
  673. debug("send %d commands in offline queue", this.offlineQueue.length);
  674. const offlineQueue = this.offlineQueue;
  675. this.resetOfflineQueue();
  676. let item;
  677. while ((item = offlineQueue.shift())) {
  678. this.sendCommand(item.command, item.stream, item.node);
  679. }
  680. }
  681. }
  682. natMapper(nodeKey) {
  683. const key = typeof nodeKey === "string"
  684. ? nodeKey
  685. : `${nodeKey.host}:${nodeKey.port}`;
  686. let mapped = null;
  687. if (this.options.natMap && typeof this.options.natMap === "function") {
  688. mapped = this.options.natMap(key);
  689. }
  690. else if (this.options.natMap && typeof this.options.natMap === "object") {
  691. mapped = this.options.natMap[key];
  692. }
  693. if (mapped) {
  694. debug("NAT mapping %s -> %O", key, mapped);
  695. return Object.assign({}, mapped);
  696. }
  697. return typeof nodeKey === "string"
  698. ? (0, util_1.nodeKeyToRedisOptions)(nodeKey)
  699. : nodeKey;
  700. }
  701. getInfoFromNode(redis, callback) {
  702. if (!redis) {
  703. return callback(new Error("Node is disconnected"));
  704. }
  705. // Use a duplication of the connection to avoid
  706. // timeouts when the connection is in the blocking
  707. // mode (e.g. waiting for BLPOP).
  708. const duplicatedConnection = redis.duplicate({
  709. enableOfflineQueue: true,
  710. enableReadyCheck: false,
  711. retryStrategy: null,
  712. connectionName: (0, util_1.getConnectionName)("refresher", this.options.redisOptions && this.options.redisOptions.connectionName),
  713. });
  714. // Ignore error events since we will handle
  715. // exceptions for the CLUSTER SLOTS command.
  716. duplicatedConnection.on("error", utils_1.noop);
  717. duplicatedConnection.cluster("SLOTS", (0, utils_1.timeout)((err, result) => {
  718. duplicatedConnection.disconnect();
  719. if (err) {
  720. debug("error encountered running CLUSTER.SLOTS: %s", err);
  721. return callback(err);
  722. }
  723. if (this.status === "disconnecting" ||
  724. this.status === "close" ||
  725. this.status === "end") {
  726. debug("ignore CLUSTER.SLOTS results (count: %d) since cluster status is %s", result.length, this.status);
  727. callback();
  728. return;
  729. }
  730. const nodes = [];
  731. debug("cluster slots result count: %d", result.length);
  732. for (let i = 0; i < result.length; ++i) {
  733. const items = result[i];
  734. const slotRangeStart = items[0];
  735. const slotRangeEnd = items[1];
  736. const keys = [];
  737. for (let j = 2; j < items.length; j++) {
  738. if (!items[j][0]) {
  739. continue;
  740. }
  741. const node = this.natMapper({
  742. host: items[j][0],
  743. port: items[j][1],
  744. });
  745. node.readOnly = j !== 2;
  746. nodes.push(node);
  747. keys.push(node.host + ":" + node.port);
  748. }
  749. debug("cluster slots result [%d]: slots %d~%d served by %s", i, slotRangeStart, slotRangeEnd, keys);
  750. for (let slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
  751. this.slots[slot] = keys;
  752. }
  753. }
  754. // Assign to each node keys a numeric value to make autopipeline comparison faster.
  755. this._groupsIds = Object.create(null);
  756. let j = 0;
  757. for (let i = 0; i < 16384; i++) {
  758. const target = (this.slots[i] || []).join(";");
  759. if (!target.length) {
  760. this._groupsBySlot[i] = undefined;
  761. continue;
  762. }
  763. if (!this._groupsIds[target]) {
  764. this._groupsIds[target] = ++j;
  765. }
  766. this._groupsBySlot[i] = this._groupsIds[target];
  767. }
  768. this.connectionPool.reset(nodes);
  769. if (this.options.shardedSubscribers) {
  770. this.shardedSubscribers
  771. .reset(this.slots, this.connectionPool.getNodes("all"))
  772. .catch((err) => {
  773. // TODO should we emit an error event here?
  774. debug("Error while starting subscribers: %s", err);
  775. });
  776. }
  777. callback();
  778. }, this.options.slotsRefreshTimeout));
  779. }
  780. invokeReadyDelayedCallbacks(err) {
  781. for (const c of this._readyDelayedCallbacks) {
  782. process.nextTick(c, err);
  783. }
  784. this._readyDelayedCallbacks = [];
  785. }
  786. /**
  787. * Check whether Cluster is able to process commands
  788. */
  789. readyCheck(callback) {
  790. this.cluster("INFO", (err, res) => {
  791. if (err) {
  792. return callback(err);
  793. }
  794. if (typeof res !== "string") {
  795. return callback();
  796. }
  797. let state;
  798. const lines = res.split("\r\n");
  799. for (let i = 0; i < lines.length; ++i) {
  800. const parts = lines[i].split(":");
  801. if (parts[0] === "cluster_state") {
  802. state = parts[1];
  803. break;
  804. }
  805. }
  806. if (state === "fail") {
  807. debug("cluster state not ok (%s)", state);
  808. callback(null, state);
  809. }
  810. else {
  811. callback();
  812. }
  813. });
  814. }
  815. resolveSrv(hostname) {
  816. return new Promise((resolve, reject) => {
  817. this.options.resolveSrv(hostname, (err, records) => {
  818. if (err) {
  819. return reject(err);
  820. }
  821. const self = this, groupedRecords = (0, util_1.groupSrvRecords)(records), sortedKeys = Object.keys(groupedRecords).sort((a, b) => parseInt(a) - parseInt(b));
  822. function tryFirstOne(err) {
  823. if (!sortedKeys.length) {
  824. return reject(err);
  825. }
  826. const key = sortedKeys[0], group = groupedRecords[key], record = (0, util_1.weightSrvRecords)(group);
  827. if (!group.records.length) {
  828. sortedKeys.shift();
  829. }
  830. self.dnsLookup(record.name).then((host) => resolve({
  831. host,
  832. port: record.port,
  833. }), tryFirstOne);
  834. }
  835. tryFirstOne();
  836. });
  837. });
  838. }
  839. dnsLookup(hostname) {
  840. return new Promise((resolve, reject) => {
  841. this.options.dnsLookup(hostname, (err, address) => {
  842. if (err) {
  843. debug("failed to resolve hostname %s to IP: %s", hostname, err.message);
  844. reject(err);
  845. }
  846. else {
  847. debug("resolved hostname %s to IP %s", hostname, address);
  848. resolve(address);
  849. }
  850. });
  851. });
  852. }
  853. /**
  854. * Normalize startup nodes, and resolving hostnames to IPs.
  855. *
  856. * This process happens every time when #connect() is called since
  857. * #startupNodes and DNS records may chanage.
  858. */
  859. async resolveStartupNodeHostnames() {
  860. if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
  861. throw new Error("`startupNodes` should contain at least one node.");
  862. }
  863. const startupNodes = (0, util_1.normalizeNodeOptions)(this.startupNodes);
  864. const hostnames = (0, util_1.getUniqueHostnamesFromOptions)(startupNodes);
  865. if (hostnames.length === 0) {
  866. return startupNodes;
  867. }
  868. const configs = await Promise.all(hostnames.map((this.options.useSRVRecords ? this.resolveSrv : this.dnsLookup).bind(this)));
  869. const hostnameToConfig = (0, utils_1.zipMap)(hostnames, configs);
  870. return startupNodes.map((node) => {
  871. const config = hostnameToConfig.get(node.host);
  872. if (!config) {
  873. return node;
  874. }
  875. if (this.options.useSRVRecords) {
  876. return Object.assign({}, node, config);
  877. }
  878. return Object.assign({}, node, { host: config });
  879. });
  880. }
  881. createScanStream(command, { key, options = {} }) {
  882. return new ScanStream_1.default({
  883. objectMode: true,
  884. key: key,
  885. redis: this,
  886. command: command,
  887. ...options,
  888. });
  889. }
  890. createShardedSubscriberGroup() {
  891. this.subscriberGroupEmitter = new events_1.EventEmitter();
  892. this.shardedSubscribers = new ClusterSubscriberGroup_1.default(this.subscriberGroupEmitter, this.options);
  893. // Error handler used only for sharded-subscriber-triggered slots cache refreshes.
  894. // Normal (non-subscriber) connections are created with lazyConnect: true and can
  895. // become zombied. For sharded subscribers, a ClusterAllFailedError means
  896. // we have lost all nodes from the subscriber perspective and must tear down.
  897. const refreshSlotsCacheCallback = (err) => {
  898. // Disconnect only when refreshing the slots cache fails with ClusterAllFailedError
  899. if (err instanceof ClusterAllFailedError_1.default) {
  900. this.disconnect(true);
  901. }
  902. };
  903. this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => {
  904. this.emit("-node", redis, nodeKey);
  905. this.refreshSlotsCache(refreshSlotsCacheCallback);
  906. });
  907. this.subscriberGroupEmitter.on("subscriberConnectFailed", ({ delay, error }) => {
  908. this.emit("error", error);
  909. setTimeout(() => {
  910. this.refreshSlotsCache(refreshSlotsCacheCallback);
  911. }, delay);
  912. });
  913. this.subscriberGroupEmitter.on("moved", () => {
  914. this.refreshSlotsCache(refreshSlotsCacheCallback);
  915. });
  916. this.subscriberGroupEmitter.on("-subscriber", () => {
  917. this.emit("-subscriber");
  918. });
  919. this.subscriberGroupEmitter.on("+subscriber", () => {
  920. this.emit("+subscriber");
  921. });
  922. this.subscriberGroupEmitter.on("nodeError", (error, nodeKey) => {
  923. this.emit("nodeError", error, nodeKey);
  924. });
  925. this.subscriberGroupEmitter.on("subscribersReady", () => {
  926. this.emit("subscribersReady");
  927. });
  928. for (const event of ["smessage", "smessageBuffer"]) {
  929. this.subscriberGroupEmitter.on(event, (arg1, arg2, arg3) => {
  930. this.emit(event, arg1, arg2, arg3);
  931. });
  932. }
  933. }
  934. }
  // Mix EventEmitter into Cluster (the constructor calls EventEmitter.call(this)).
  // NOTE(review): applyMixin presumably copies EventEmitter's prototype members — confirm in ../utils/applyMixin.
  (0, applyMixin_1.default)(Cluster, events_1.EventEmitter);
  // Install transaction support on the Cluster prototype (see ../transaction).
  (0, transaction_1.addTransactionSupport)(Cluster.prototype);
  exports.default = Cluster;