event_handler.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.readyHandler = exports.errorHandler = exports.closeHandler = exports.connectHandler = void 0;
  4. const redis_errors_1 = require("redis-errors");
  5. const Command_1 = require("../Command");
  6. const errors_1 = require("../errors");
  7. const utils_1 = require("../utils");
  8. const DataHandler_1 = require("../DataHandler");
  9. const debug = (0, utils_1.Debug)("connection");
  10. function connectHandler(self) {
  11. return function () {
  12. var _a;
  13. self.setStatus("connect");
  14. self.resetCommandQueue();
  15. // AUTH command should be processed before any other commands
  16. let flushed = false;
  17. const { connectionEpoch } = self;
  18. if (self.condition.auth) {
  19. self.auth(self.condition.auth, function (err) {
  20. if (connectionEpoch !== self.connectionEpoch) {
  21. return;
  22. }
  23. if (err) {
  24. if (err.message.indexOf("no password is set") !== -1) {
  25. console.warn("[WARN] Redis server does not require a password, but a password was supplied.");
  26. }
  27. else if (err.message.indexOf("without any password configured for the default user") !== -1) {
  28. console.warn("[WARN] This Redis server's `default` user does not require a password, but a password was supplied");
  29. }
  30. else if (err.message.indexOf("wrong number of arguments for 'auth' command") !== -1) {
  31. console.warn(`[ERROR] The server returned "wrong number of arguments for 'auth' command". You are probably passing both username and password to Redis version 5 or below. You should only pass the 'password' option for Redis version 5 and under.`);
  32. }
  33. else {
  34. flushed = true;
  35. self.recoverFromFatalError(err, err);
  36. }
  37. }
  38. });
  39. }
  40. if (self.condition.select) {
  41. self.select(self.condition.select).catch((err) => {
  42. // If the node is in cluster mode, select is disallowed.
  43. // In this case, reconnect won't help.
  44. self.silentEmit("error", err);
  45. });
  46. }
  47. /*
  48. No need to keep the reference of DataHandler here
  49. because we don't need to do the cleanup.
  50. `Stream#end()` will remove all listeners for us.
  51. */
  52. new DataHandler_1.default(self, {
  53. stringNumbers: self.options.stringNumbers,
  54. });
  55. const clientCommandPromises = [];
  56. if (self.options.connectionName) {
  57. debug("set the connection name [%s]", self.options.connectionName);
  58. clientCommandPromises.push(self.client("setname", self.options.connectionName).catch(utils_1.noop));
  59. }
  60. if (!self.options.disableClientInfo) {
  61. debug("set the client info");
  62. clientCommandPromises.push((0, utils_1.getPackageMeta)()
  63. .then((packageMeta) => {
  64. return self
  65. .client("SETINFO", "LIB-VER", packageMeta.version)
  66. .catch(utils_1.noop);
  67. })
  68. .catch(utils_1.noop));
  69. clientCommandPromises.push(self
  70. .client("SETINFO", "LIB-NAME", ((_a = self.options) === null || _a === void 0 ? void 0 : _a.clientInfoTag)
  71. ? `ioredis(${self.options.clientInfoTag})`
  72. : "ioredis")
  73. .catch(utils_1.noop));
  74. }
  75. Promise.all(clientCommandPromises)
  76. .catch(utils_1.noop)
  77. .finally(() => {
  78. if (!self.options.enableReadyCheck) {
  79. exports.readyHandler(self)();
  80. }
  81. if (self.options.enableReadyCheck) {
  82. self._readyCheck(function (err, info) {
  83. if (connectionEpoch !== self.connectionEpoch) {
  84. return;
  85. }
  86. if (err) {
  87. if (!flushed) {
  88. self.recoverFromFatalError(new Error("Ready check failed: " + err.message), err);
  89. }
  90. }
  91. else {
  92. if (self.connector.check(info)) {
  93. exports.readyHandler(self)();
  94. }
  95. else {
  96. self.disconnect(true);
  97. }
  98. }
  99. });
  100. }
  101. });
  102. };
  103. }
  104. exports.connectHandler = connectHandler;
  105. function abortError(command) {
  106. const err = new redis_errors_1.AbortError("Command aborted due to connection close");
  107. err.command = {
  108. name: command.name,
  109. args: command.args,
  110. };
  111. return err;
  112. }
  113. // If a contiguous set of pipeline commands starts from index zero then they
  114. // can be safely reattempted. If however we have a chain of pipelined commands
  115. // starting at index 1 or more it means we received a partial response before
  116. // the connection close and those pipelined commands must be aborted. For
  117. // example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
  118. // aborting and purging we'll have a queue that looks like this: [0, 1, 2]
  119. function abortIncompletePipelines(commandQueue) {
  120. var _a;
  121. let expectedIndex = 0;
  122. for (let i = 0; i < commandQueue.length;) {
  123. const command = (_a = commandQueue.peekAt(i)) === null || _a === void 0 ? void 0 : _a.command;
  124. const pipelineIndex = command.pipelineIndex;
  125. if (pipelineIndex === undefined || pipelineIndex === 0) {
  126. expectedIndex = 0;
  127. }
  128. if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
  129. commandQueue.remove(i, 1);
  130. command.reject(abortError(command));
  131. continue;
  132. }
  133. i++;
  134. }
  135. }
  136. // If only a partial transaction result was received before connection close,
  137. // we have to abort any transaction fragments that may have ended up in the
  138. // offline queue
  139. function abortTransactionFragments(commandQueue) {
  140. var _a;
  141. for (let i = 0; i < commandQueue.length;) {
  142. const command = (_a = commandQueue.peekAt(i)) === null || _a === void 0 ? void 0 : _a.command;
  143. if (command.name === "multi") {
  144. break;
  145. }
  146. if (command.name === "exec") {
  147. commandQueue.remove(i, 1);
  148. command.reject(abortError(command));
  149. break;
  150. }
  151. if (command.inTransaction) {
  152. commandQueue.remove(i, 1);
  153. command.reject(abortError(command));
  154. }
  155. else {
  156. i++;
  157. }
  158. }
  159. }
  160. function closeHandler(self) {
  161. return function () {
  162. const prevStatus = self.status;
  163. self.setStatus("close");
  164. if (self.commandQueue.length) {
  165. abortIncompletePipelines(self.commandQueue);
  166. }
  167. if (self.offlineQueue.length) {
  168. abortTransactionFragments(self.offlineQueue);
  169. }
  170. if (prevStatus === "ready") {
  171. if (!self.prevCondition) {
  172. self.prevCondition = self.condition;
  173. }
  174. if (self.commandQueue.length) {
  175. self.prevCommandQueue = self.commandQueue;
  176. }
  177. }
  178. if (self.manuallyClosing) {
  179. self.manuallyClosing = false;
  180. debug("skip reconnecting since the connection is manually closed.");
  181. return close();
  182. }
  183. if (typeof self.options.retryStrategy !== "function") {
  184. debug("skip reconnecting because `retryStrategy` is not a function");
  185. return close();
  186. }
  187. const retryDelay = self.options.retryStrategy(++self.retryAttempts);
  188. if (typeof retryDelay !== "number") {
  189. debug("skip reconnecting because `retryStrategy` doesn't return a number");
  190. return close();
  191. }
  192. debug("reconnect in %sms", retryDelay);
  193. self.setStatus("reconnecting", retryDelay);
  194. self.reconnectTimeout = setTimeout(function () {
  195. self.reconnectTimeout = null;
  196. self.connect().catch(utils_1.noop);
  197. }, retryDelay);
  198. const { maxRetriesPerRequest } = self.options;
  199. if (typeof maxRetriesPerRequest === "number") {
  200. if (maxRetriesPerRequest < 0) {
  201. debug("maxRetriesPerRequest is negative, ignoring...");
  202. }
  203. else {
  204. const remainder = self.retryAttempts % (maxRetriesPerRequest + 1);
  205. if (remainder === 0) {
  206. debug("reach maxRetriesPerRequest limitation, flushing command queue...");
  207. self.flushQueue(new errors_1.MaxRetriesPerRequestError(maxRetriesPerRequest));
  208. }
  209. }
  210. }
  211. };
  212. function close() {
  213. self.setStatus("end");
  214. self.flushQueue(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  215. }
  216. }
  217. exports.closeHandler = closeHandler;
  218. function errorHandler(self) {
  219. return function (error) {
  220. debug("error: %s", error);
  221. self.silentEmit("error", error);
  222. };
  223. }
  224. exports.errorHandler = errorHandler;
  225. function readyHandler(self) {
  226. return function () {
  227. self.setStatus("ready");
  228. self.retryAttempts = 0;
  229. if (self.options.monitor) {
  230. self.call("monitor").then(() => self.setStatus("monitoring"), (error) => self.emit("error", error));
  231. const { sendCommand } = self;
  232. self.sendCommand = function (command) {
  233. if (Command_1.default.checkFlag("VALID_IN_MONITOR_MODE", command.name)) {
  234. return sendCommand.call(self, command);
  235. }
  236. command.reject(new Error("Connection is in monitoring mode, can't process commands."));
  237. return command.promise;
  238. };
  239. self.once("close", function () {
  240. delete self.sendCommand;
  241. });
  242. return;
  243. }
  244. const finalSelect = self.prevCondition
  245. ? self.prevCondition.select
  246. : self.condition.select;
  247. if (self.options.readOnly) {
  248. debug("set the connection to readonly mode");
  249. self.readonly().catch(utils_1.noop);
  250. }
  251. if (self.prevCondition) {
  252. const condition = self.prevCondition;
  253. self.prevCondition = null;
  254. if (condition.subscriber && self.options.autoResubscribe) {
  255. // We re-select the previous db first since
  256. // `SELECT` command is not valid in sub mode.
  257. if (self.condition.select !== finalSelect) {
  258. debug("connect to db [%d]", finalSelect);
  259. self.select(finalSelect);
  260. }
  261. const subscribeChannels = condition.subscriber.channels("subscribe");
  262. if (subscribeChannels.length) {
  263. debug("subscribe %d channels", subscribeChannels.length);
  264. self.subscribe(subscribeChannels);
  265. }
  266. const psubscribeChannels = condition.subscriber.channels("psubscribe");
  267. if (psubscribeChannels.length) {
  268. debug("psubscribe %d channels", psubscribeChannels.length);
  269. self.psubscribe(psubscribeChannels);
  270. }
  271. const ssubscribeChannels = condition.subscriber.channels("ssubscribe");
  272. if (ssubscribeChannels.length) {
  273. debug("ssubscribe %s", ssubscribeChannels.length);
  274. for (const channel of ssubscribeChannels) {
  275. self.ssubscribe(channel);
  276. }
  277. }
  278. }
  279. }
  280. if (self.prevCommandQueue) {
  281. if (self.options.autoResendUnfulfilledCommands) {
  282. debug("resend %d unfulfilled commands", self.prevCommandQueue.length);
  283. while (self.prevCommandQueue.length > 0) {
  284. const item = self.prevCommandQueue.shift();
  285. if (item.select !== self.condition.select &&
  286. item.command.name !== "select") {
  287. self.select(item.select);
  288. }
  289. self.sendCommand(item.command, item.stream);
  290. }
  291. }
  292. else {
  293. self.prevCommandQueue = null;
  294. }
  295. }
  296. if (self.offlineQueue.length) {
  297. debug("send %d commands in offline queue", self.offlineQueue.length);
  298. const offlineQueue = self.offlineQueue;
  299. self.resetOfflineQueue();
  300. while (offlineQueue.length > 0) {
  301. const item = offlineQueue.shift();
  302. if (item.select !== self.condition.select &&
  303. item.command.name !== "select") {
  304. self.select(item.select);
  305. }
  306. self.sendCommand(item.command, item.stream);
  307. }
  308. }
  309. if (self.condition.select !== finalSelect) {
  310. debug("connect to db [%d]", finalSelect);
  311. self.select(finalSelect);
  312. }
  313. };
  314. }
  315. exports.readyHandler = readyHandler;