redis-connection.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. import { __rest } from "tslib";
  2. import { EventEmitter } from 'events';
  3. import { default as IORedis } from 'ioredis';
  4. // eslint-disable-next-line @typescript-eslint/ban-ts-comment
  5. // @ts-ignore
  6. import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils';
  7. import { decreaseMaxListeners, increaseMaxListeners, isNotConnectionError, isRedisCluster, isRedisInstance, isRedisVersionLowerThan, } from '../utils';
  8. import { version as packageVersion } from '../version';
  9. import * as scripts from '../scripts';
  10. const overrideMessage = [
  11. 'BullMQ: WARNING! Your redis options maxRetriesPerRequest must be null',
  12. 'and will be overridden by BullMQ.',
  13. ].join(' ');
  14. const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.';
  15. export class RedisConnection extends EventEmitter {
  16. constructor(opts, extraOptions) {
  17. super();
  18. this.extraOptions = extraOptions;
  19. this.capabilities = {
  20. canDoubleTimeout: false,
  21. canBlockFor1Ms: true,
  22. };
  23. this.status = 'initializing';
  24. this.dbType = 'redis';
  25. this.packageVersion = packageVersion;
  26. // Set extra options defaults
  27. this.extraOptions = Object.assign({ shared: false, blocking: true, skipVersionCheck: false, skipWaitingForReady: false }, extraOptions);
  28. if (!isRedisInstance(opts)) {
  29. this.checkBlockingOptions(overrideMessage, opts);
  30. this.opts = Object.assign({ port: 6379, host: '127.0.0.1', retryStrategy: function (times) {
  31. return Math.max(Math.min(Math.exp(times), 20000), 1000);
  32. } }, opts);
  33. if (this.extraOptions.blocking) {
  34. this.opts.maxRetriesPerRequest = null;
  35. }
  36. }
  37. else {
  38. this._client = opts;
  39. // Test if the redis instance is using keyPrefix
  40. // and if so, throw an error.
  41. if (this._client.options.keyPrefix) {
  42. throw new Error('BullMQ: ioredis does not support ioredis prefixes, use the prefix option instead.');
  43. }
  44. if (isRedisCluster(this._client)) {
  45. this.opts = this._client.options.redisOptions;
  46. }
  47. else {
  48. this.opts = this._client.options;
  49. }
  50. this.checkBlockingOptions(deprecationMessage, this.opts, true);
  51. }
  52. this.skipVersionCheck =
  53. (extraOptions === null || extraOptions === void 0 ? void 0 : extraOptions.skipVersionCheck) ||
  54. !!(this.opts && this.opts.skipVersionCheck);
  55. this.handleClientError = (err) => {
  56. this.emit('error', err);
  57. };
  58. this.handleClientClose = () => {
  59. this.emit('close');
  60. };
  61. this.handleClientReady = () => {
  62. this.emit('ready');
  63. };
  64. this.initializing = this.init();
  65. this.initializing.catch(err => this.emit('error', err));
  66. }
  67. checkBlockingOptions(msg, options, throwError = false) {
  68. if (this.extraOptions.blocking && options && options.maxRetriesPerRequest) {
  69. if (throwError) {
  70. throw new Error(msg);
  71. }
  72. else {
  73. console.error(msg);
  74. }
  75. }
  76. }
  77. /**
  78. * Waits for a redis client to be ready.
  79. * @param redis - client
  80. */
  81. static async waitUntilReady(client) {
  82. if (client.status === 'ready') {
  83. return;
  84. }
  85. if (client.status === 'wait') {
  86. return client.connect();
  87. }
  88. if (client.status === 'end') {
  89. throw new Error(CONNECTION_CLOSED_ERROR_MSG);
  90. }
  91. let handleReady;
  92. let handleEnd;
  93. let handleError;
  94. try {
  95. await new Promise((resolve, reject) => {
  96. let lastError;
  97. handleError = (err) => {
  98. lastError = err;
  99. };
  100. handleReady = () => {
  101. resolve();
  102. };
  103. handleEnd = () => {
  104. if (client.status !== 'end') {
  105. reject(lastError || new Error(CONNECTION_CLOSED_ERROR_MSG));
  106. }
  107. else {
  108. if (lastError) {
  109. reject(lastError);
  110. }
  111. else {
  112. // when custom 'end' status is set we already closed
  113. resolve();
  114. }
  115. }
  116. };
  117. increaseMaxListeners(client, 3);
  118. client.once('ready', handleReady);
  119. client.on('end', handleEnd);
  120. client.once('error', handleError);
  121. });
  122. }
  123. finally {
  124. client.removeListener('end', handleEnd);
  125. client.removeListener('error', handleError);
  126. client.removeListener('ready', handleReady);
  127. decreaseMaxListeners(client, 3);
  128. }
  129. }
  130. get client() {
  131. return this.initializing;
  132. }
  133. loadCommands(packageVersion, providedScripts) {
  134. const finalScripts = providedScripts || scripts;
  135. for (const property in finalScripts) {
  136. // Only define the command if not already defined
  137. const commandName = `${finalScripts[property].name}:${packageVersion}`;
  138. if (!this._client[commandName]) {
  139. this._client.defineCommand(commandName, {
  140. numberOfKeys: finalScripts[property].keys,
  141. lua: finalScripts[property].content,
  142. });
  143. }
  144. }
  145. }
  146. async init() {
  147. if (!this._client) {
  148. const _a = this.opts, { url } = _a, rest = __rest(_a, ["url"]);
  149. this._client = url ? new IORedis(url, rest) : new IORedis(rest);
  150. }
  151. increaseMaxListeners(this._client, 3);
  152. this._client.on('error', this.handleClientError);
  153. // ioredis treats connection errors as a different event ('close')
  154. this._client.on('close', this.handleClientClose);
  155. this._client.on('ready', this.handleClientReady);
  156. if (!this.extraOptions.skipWaitingForReady) {
  157. await RedisConnection.waitUntilReady(this._client);
  158. }
  159. this.loadCommands(this.packageVersion);
  160. if (this._client['status'] !== 'end') {
  161. const versionResult = await this.getRedisVersionAndType();
  162. this.version = versionResult.version;
  163. this.dbType = versionResult.databaseType;
  164. if (this.skipVersionCheck !== true && !this.closing) {
  165. if (isRedisVersionLowerThan(this.version, RedisConnection.minimumVersion, this.dbType)) {
  166. throw new Error(`Redis version needs to be greater or equal than ${RedisConnection.minimumVersion} ` +
  167. `Current: ${this.version}`);
  168. }
  169. if (isRedisVersionLowerThan(this.version, RedisConnection.recommendedMinimumVersion, this.dbType)) {
  170. console.warn(`It is highly recommended to use a minimum Redis version of ${RedisConnection.recommendedMinimumVersion}
  171. Current: ${this.version}`);
  172. }
  173. }
  174. this.capabilities = {
  175. canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0', this.dbType),
  176. canBlockFor1Ms: !isRedisVersionLowerThan(this.version, '7.0.8', this.dbType),
  177. };
  178. this.status = 'ready';
  179. }
  180. return this._client;
  181. }
  182. async disconnect(wait = true) {
  183. const client = await this.client;
  184. if (client.status !== 'end') {
  185. let _resolve, _reject;
  186. if (!wait) {
  187. return client.disconnect();
  188. }
  189. const disconnecting = new Promise((resolve, reject) => {
  190. increaseMaxListeners(client, 2);
  191. client.once('end', resolve);
  192. client.once('error', reject);
  193. _resolve = resolve;
  194. _reject = reject;
  195. });
  196. client.disconnect();
  197. try {
  198. await disconnecting;
  199. }
  200. finally {
  201. decreaseMaxListeners(client, 2);
  202. client.removeListener('end', _resolve);
  203. client.removeListener('error', _reject);
  204. }
  205. }
  206. }
  207. async reconnect() {
  208. const client = await this.client;
  209. return client.connect();
  210. }
  211. async close(force = false) {
  212. if (!this.closing) {
  213. const status = this.status;
  214. this.status = 'closing';
  215. this.closing = true;
  216. try {
  217. if (status === 'ready') {
  218. // Not sure if we need to wait for this
  219. await this.initializing;
  220. }
  221. if (!this.extraOptions.shared) {
  222. if (status == 'initializing' || force) {
  223. // If we have not still connected to Redis, we need to disconnect.
  224. this._client.disconnect();
  225. }
  226. else {
  227. await this._client.quit();
  228. }
  229. // As IORedis does not update this status properly, we do it ourselves.
  230. this._client['status'] = 'end';
  231. }
  232. }
  233. catch (error) {
  234. if (isNotConnectionError(error)) {
  235. throw error;
  236. }
  237. }
  238. finally {
  239. this._client.off('error', this.handleClientError);
  240. this._client.off('close', this.handleClientClose);
  241. this._client.off('ready', this.handleClientReady);
  242. decreaseMaxListeners(this._client, 3);
  243. this.removeAllListeners();
  244. this.status = 'closed';
  245. }
  246. }
  247. }
  248. async getRedisVersionAndType() {
  249. if (this.skipVersionCheck) {
  250. return {
  251. version: RedisConnection.minimumVersion,
  252. databaseType: 'redis',
  253. };
  254. }
  255. const doc = await this._client.info();
  256. const redisPrefix = 'redis_version:';
  257. const maxMemoryPolicyPrefix = 'maxmemory_policy:';
  258. const lines = doc.split(/\r?\n/);
  259. let redisVersion;
  260. let databaseType = 'redis';
  261. // Detect database type from server info
  262. for (let i = 0; i < lines.length; i++) {
  263. const line = lines[i];
  264. // Check for Dragonfly
  265. if (line.includes('dragonfly_version:') ||
  266. line.includes('server:Dragonfly')) {
  267. databaseType = 'dragonfly';
  268. // For Dragonfly, extract version from dragonfly_version field
  269. if (line.indexOf('dragonfly_version:') === 0) {
  270. redisVersion = line.substr('dragonfly_version:'.length);
  271. }
  272. }
  273. // Check for Valkey
  274. else if (line.includes('valkey_version:') ||
  275. line.includes('server:Valkey')) {
  276. databaseType = 'valkey';
  277. // For Valkey, extract version from valkey_version field
  278. if (line.indexOf('valkey_version:') === 0) {
  279. redisVersion = line.substr('valkey_version:'.length);
  280. }
  281. }
  282. // Standard Redis version detection
  283. else if (line.indexOf(redisPrefix) === 0) {
  284. redisVersion = line.substr(redisPrefix.length);
  285. // Keep Redis as default unless we find evidence of other databases above
  286. if (databaseType === 'redis') {
  287. databaseType = 'redis';
  288. }
  289. }
  290. if (line.indexOf(maxMemoryPolicyPrefix) === 0) {
  291. const maxMemoryPolicy = line.substr(maxMemoryPolicyPrefix.length);
  292. if (maxMemoryPolicy !== 'noeviction') {
  293. console.warn(`IMPORTANT! Eviction policy is ${maxMemoryPolicy}. It should be "noeviction"`);
  294. }
  295. }
  296. }
  297. // Fallback version detection if specific database version field wasn't found
  298. if (!redisVersion) {
  299. // Try to find any version field as fallback
  300. for (const line of lines) {
  301. if (line.includes('version:')) {
  302. const parts = line.split(':');
  303. if (parts.length >= 2) {
  304. redisVersion = parts[1];
  305. break;
  306. }
  307. }
  308. }
  309. }
  310. return {
  311. version: redisVersion || RedisConnection.minimumVersion,
  312. databaseType,
  313. };
  314. }
  315. get redisVersion() {
  316. return this.version;
  317. }
  318. get databaseType() {
  319. return this.dbType;
  320. }
  321. }
  322. RedisConnection.minimumVersion = '5.0.0';
  323. RedisConnection.recommendedMinimumVersion = '6.2.0';
  324. //# sourceMappingURL=redis-connection.js.map