redis-connection.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RedisConnection = void 0;
  4. const tslib_1 = require("tslib");
  5. const events_1 = require("events");
  6. const ioredis_1 = require("ioredis");
  7. // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  8. // @ts-ignore
  9. const utils_1 = require("ioredis/built/utils");
  10. const utils_2 = require("../utils");
  11. const version_1 = require("../version");
  12. const scripts = require("../scripts");
  13. const overrideMessage = [
  14. 'BullMQ: WARNING! Your redis options maxRetriesPerRequest must be null',
  15. 'and will be overridden by BullMQ.',
  16. ].join(' ');
  17. const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.';
  18. class RedisConnection extends events_1.EventEmitter {
  19. constructor(opts, extraOptions) {
  20. super();
  21. this.extraOptions = extraOptions;
  22. this.capabilities = {
  23. canDoubleTimeout: false,
  24. canBlockFor1Ms: true,
  25. };
  26. this.status = 'initializing';
  27. this.dbType = 'redis';
  28. this.packageVersion = version_1.version;
  29. // Set extra options defaults
  30. this.extraOptions = Object.assign({ shared: false, blocking: true, skipVersionCheck: false, skipWaitingForReady: false }, extraOptions);
  31. if (!(0, utils_2.isRedisInstance)(opts)) {
  32. this.checkBlockingOptions(overrideMessage, opts);
  33. this.opts = Object.assign({ port: 6379, host: '127.0.0.1', retryStrategy: function (times) {
  34. return Math.max(Math.min(Math.exp(times), 20000), 1000);
  35. } }, opts);
  36. if (this.extraOptions.blocking) {
  37. this.opts.maxRetriesPerRequest = null;
  38. }
  39. }
  40. else {
  41. this._client = opts;
  42. // Test if the redis instance is using keyPrefix
  43. // and if so, throw an error.
  44. if (this._client.options.keyPrefix) {
  45. throw new Error('BullMQ: ioredis does not support ioredis prefixes, use the prefix option instead.');
  46. }
  47. if ((0, utils_2.isRedisCluster)(this._client)) {
  48. this.opts = this._client.options.redisOptions;
  49. }
  50. else {
  51. this.opts = this._client.options;
  52. }
  53. this.checkBlockingOptions(deprecationMessage, this.opts, true);
  54. }
  55. this.skipVersionCheck =
  56. (extraOptions === null || extraOptions === void 0 ? void 0 : extraOptions.skipVersionCheck) ||
  57. !!(this.opts && this.opts.skipVersionCheck);
  58. this.handleClientError = (err) => {
  59. this.emit('error', err);
  60. };
  61. this.handleClientClose = () => {
  62. this.emit('close');
  63. };
  64. this.handleClientReady = () => {
  65. this.emit('ready');
  66. };
  67. this.initializing = this.init();
  68. this.initializing.catch(err => this.emit('error', err));
  69. }
  70. checkBlockingOptions(msg, options, throwError = false) {
  71. if (this.extraOptions.blocking && options && options.maxRetriesPerRequest) {
  72. if (throwError) {
  73. throw new Error(msg);
  74. }
  75. else {
  76. console.error(msg);
  77. }
  78. }
  79. }
  80. /**
  81. * Waits for a redis client to be ready.
  82. * @param redis - client
  83. */
  84. static async waitUntilReady(client) {
  85. if (client.status === 'ready') {
  86. return;
  87. }
  88. if (client.status === 'wait') {
  89. return client.connect();
  90. }
  91. if (client.status === 'end') {
  92. throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
  93. }
  94. let handleReady;
  95. let handleEnd;
  96. let handleError;
  97. try {
  98. await new Promise((resolve, reject) => {
  99. let lastError;
  100. handleError = (err) => {
  101. lastError = err;
  102. };
  103. handleReady = () => {
  104. resolve();
  105. };
  106. handleEnd = () => {
  107. if (client.status !== 'end') {
  108. reject(lastError || new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  109. }
  110. else {
  111. if (lastError) {
  112. reject(lastError);
  113. }
  114. else {
  115. // when custom 'end' status is set we already closed
  116. resolve();
  117. }
  118. }
  119. };
  120. (0, utils_2.increaseMaxListeners)(client, 3);
  121. client.once('ready', handleReady);
  122. client.on('end', handleEnd);
  123. client.once('error', handleError);
  124. });
  125. }
  126. finally {
  127. client.removeListener('end', handleEnd);
  128. client.removeListener('error', handleError);
  129. client.removeListener('ready', handleReady);
  130. (0, utils_2.decreaseMaxListeners)(client, 3);
  131. }
  132. }
  133. get client() {
  134. return this.initializing;
  135. }
  136. loadCommands(packageVersion, providedScripts) {
  137. const finalScripts = providedScripts || scripts;
  138. for (const property in finalScripts) {
  139. // Only define the command if not already defined
  140. const commandName = `${finalScripts[property].name}:${packageVersion}`;
  141. if (!this._client[commandName]) {
  142. this._client.defineCommand(commandName, {
  143. numberOfKeys: finalScripts[property].keys,
  144. lua: finalScripts[property].content,
  145. });
  146. }
  147. }
  148. }
  149. async init() {
  150. if (!this._client) {
  151. const _a = this.opts, { url } = _a, rest = tslib_1.__rest(_a, ["url"]);
  152. this._client = url ? new ioredis_1.default(url, rest) : new ioredis_1.default(rest);
  153. }
  154. (0, utils_2.increaseMaxListeners)(this._client, 3);
  155. this._client.on('error', this.handleClientError);
  156. // ioredis treats connection errors as a different event ('close')
  157. this._client.on('close', this.handleClientClose);
  158. this._client.on('ready', this.handleClientReady);
  159. if (!this.extraOptions.skipWaitingForReady) {
  160. await RedisConnection.waitUntilReady(this._client);
  161. }
  162. this.loadCommands(this.packageVersion);
  163. if (this._client['status'] !== 'end') {
  164. const versionResult = await this.getRedisVersionAndType();
  165. this.version = versionResult.version;
  166. this.dbType = versionResult.databaseType;
  167. if (this.skipVersionCheck !== true && !this.closing) {
  168. if ((0, utils_2.isRedisVersionLowerThan)(this.version, RedisConnection.minimumVersion, this.dbType)) {
  169. throw new Error(`Redis version needs to be greater or equal than ${RedisConnection.minimumVersion} ` +
  170. `Current: ${this.version}`);
  171. }
  172. if ((0, utils_2.isRedisVersionLowerThan)(this.version, RedisConnection.recommendedMinimumVersion, this.dbType)) {
  173. console.warn(`It is highly recommended to use a minimum Redis version of ${RedisConnection.recommendedMinimumVersion}
  174. Current: ${this.version}`);
  175. }
  176. }
  177. this.capabilities = {
  178. canDoubleTimeout: !(0, utils_2.isRedisVersionLowerThan)(this.version, '6.0.0', this.dbType),
  179. canBlockFor1Ms: !(0, utils_2.isRedisVersionLowerThan)(this.version, '7.0.8', this.dbType),
  180. };
  181. this.status = 'ready';
  182. }
  183. return this._client;
  184. }
  185. async disconnect(wait = true) {
  186. const client = await this.client;
  187. if (client.status !== 'end') {
  188. let _resolve, _reject;
  189. if (!wait) {
  190. return client.disconnect();
  191. }
  192. const disconnecting = new Promise((resolve, reject) => {
  193. (0, utils_2.increaseMaxListeners)(client, 2);
  194. client.once('end', resolve);
  195. client.once('error', reject);
  196. _resolve = resolve;
  197. _reject = reject;
  198. });
  199. client.disconnect();
  200. try {
  201. await disconnecting;
  202. }
  203. finally {
  204. (0, utils_2.decreaseMaxListeners)(client, 2);
  205. client.removeListener('end', _resolve);
  206. client.removeListener('error', _reject);
  207. }
  208. }
  209. }
  210. async reconnect() {
  211. const client = await this.client;
  212. return client.connect();
  213. }
  214. async close(force = false) {
  215. if (!this.closing) {
  216. const status = this.status;
  217. this.status = 'closing';
  218. this.closing = true;
  219. try {
  220. if (status === 'ready') {
  221. // Not sure if we need to wait for this
  222. await this.initializing;
  223. }
  224. if (!this.extraOptions.shared) {
  225. if (status == 'initializing' || force) {
  226. // If we have not still connected to Redis, we need to disconnect.
  227. this._client.disconnect();
  228. }
  229. else {
  230. await this._client.quit();
  231. }
  232. // As IORedis does not update this status properly, we do it ourselves.
  233. this._client['status'] = 'end';
  234. }
  235. }
  236. catch (error) {
  237. if ((0, utils_2.isNotConnectionError)(error)) {
  238. throw error;
  239. }
  240. }
  241. finally {
  242. this._client.off('error', this.handleClientError);
  243. this._client.off('close', this.handleClientClose);
  244. this._client.off('ready', this.handleClientReady);
  245. (0, utils_2.decreaseMaxListeners)(this._client, 3);
  246. this.removeAllListeners();
  247. this.status = 'closed';
  248. }
  249. }
  250. }
  251. async getRedisVersionAndType() {
  252. if (this.skipVersionCheck) {
  253. return {
  254. version: RedisConnection.minimumVersion,
  255. databaseType: 'redis',
  256. };
  257. }
  258. const doc = await this._client.info();
  259. const redisPrefix = 'redis_version:';
  260. const maxMemoryPolicyPrefix = 'maxmemory_policy:';
  261. const lines = doc.split(/\r?\n/);
  262. let redisVersion;
  263. let databaseType = 'redis';
  264. // Detect database type from server info
  265. for (let i = 0; i < lines.length; i++) {
  266. const line = lines[i];
  267. // Check for Dragonfly
  268. if (line.includes('dragonfly_version:') ||
  269. line.includes('server:Dragonfly')) {
  270. databaseType = 'dragonfly';
  271. // For Dragonfly, extract version from dragonfly_version field
  272. if (line.indexOf('dragonfly_version:') === 0) {
  273. redisVersion = line.substr('dragonfly_version:'.length);
  274. }
  275. }
  276. // Check for Valkey
  277. else if (line.includes('valkey_version:') ||
  278. line.includes('server:Valkey')) {
  279. databaseType = 'valkey';
  280. // For Valkey, extract version from valkey_version field
  281. if (line.indexOf('valkey_version:') === 0) {
  282. redisVersion = line.substr('valkey_version:'.length);
  283. }
  284. }
  285. // Standard Redis version detection
  286. else if (line.indexOf(redisPrefix) === 0) {
  287. redisVersion = line.substr(redisPrefix.length);
  288. // Keep Redis as default unless we find evidence of other databases above
  289. if (databaseType === 'redis') {
  290. databaseType = 'redis';
  291. }
  292. }
  293. if (line.indexOf(maxMemoryPolicyPrefix) === 0) {
  294. const maxMemoryPolicy = line.substr(maxMemoryPolicyPrefix.length);
  295. if (maxMemoryPolicy !== 'noeviction') {
  296. console.warn(`IMPORTANT! Eviction policy is ${maxMemoryPolicy}. It should be "noeviction"`);
  297. }
  298. }
  299. }
  300. // Fallback version detection if specific database version field wasn't found
  301. if (!redisVersion) {
  302. // Try to find any version field as fallback
  303. for (const line of lines) {
  304. if (line.includes('version:')) {
  305. const parts = line.split(':');
  306. if (parts.length >= 2) {
  307. redisVersion = parts[1];
  308. break;
  309. }
  310. }
  311. }
  312. }
  313. return {
  314. version: redisVersion || RedisConnection.minimumVersion,
  315. databaseType,
  316. };
  317. }
  318. get redisVersion() {
  319. return this.version;
  320. }
  321. get databaseType() {
  322. return this.dbType;
  323. }
  324. }
  325. exports.RedisConnection = RedisConnection;
  326. RedisConnection.minimumVersion = '5.0.0';
  327. RedisConnection.recommendedMinimumVersion = '6.2.0';
  328. //# sourceMappingURL=redis-connection.js.map