| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.RedisConnection = void 0;
- const tslib_1 = require("tslib");
- const events_1 = require("events");
- const ioredis_1 = require("ioredis");
- // eslint-disable-next-line @typescript-eslint/ban-ts-comment
- // @ts-ignore
- const utils_1 = require("ioredis/built/utils");
- const utils_2 = require("../utils");
- const version_1 = require("../version");
- const scripts = require("../scripts");
- const overrideMessage = [
- 'BullMQ: WARNING! Your redis options maxRetriesPerRequest must be null',
- 'and will be overridden by BullMQ.',
- ].join(' ');
- const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.';
- class RedisConnection extends events_1.EventEmitter {
- constructor(opts, extraOptions) {
- super();
- this.extraOptions = extraOptions;
- this.capabilities = {
- canDoubleTimeout: false,
- canBlockFor1Ms: true,
- };
- this.status = 'initializing';
- this.dbType = 'redis';
- this.packageVersion = version_1.version;
- // Set extra options defaults
- this.extraOptions = Object.assign({ shared: false, blocking: true, skipVersionCheck: false, skipWaitingForReady: false }, extraOptions);
- if (!(0, utils_2.isRedisInstance)(opts)) {
- this.checkBlockingOptions(overrideMessage, opts);
- this.opts = Object.assign({ port: 6379, host: '127.0.0.1', retryStrategy: function (times) {
- return Math.max(Math.min(Math.exp(times), 20000), 1000);
- } }, opts);
- if (this.extraOptions.blocking) {
- this.opts.maxRetriesPerRequest = null;
- }
- }
- else {
- this._client = opts;
- // Test if the redis instance is using keyPrefix
- // and if so, throw an error.
- if (this._client.options.keyPrefix) {
- throw new Error('BullMQ: ioredis does not support ioredis prefixes, use the prefix option instead.');
- }
- if ((0, utils_2.isRedisCluster)(this._client)) {
- this.opts = this._client.options.redisOptions;
- }
- else {
- this.opts = this._client.options;
- }
- this.checkBlockingOptions(deprecationMessage, this.opts, true);
- }
- this.skipVersionCheck =
- (extraOptions === null || extraOptions === void 0 ? void 0 : extraOptions.skipVersionCheck) ||
- !!(this.opts && this.opts.skipVersionCheck);
- this.handleClientError = (err) => {
- this.emit('error', err);
- };
- this.handleClientClose = () => {
- this.emit('close');
- };
- this.handleClientReady = () => {
- this.emit('ready');
- };
- this.initializing = this.init();
- this.initializing.catch(err => this.emit('error', err));
- }
- checkBlockingOptions(msg, options, throwError = false) {
- if (this.extraOptions.blocking && options && options.maxRetriesPerRequest) {
- if (throwError) {
- throw new Error(msg);
- }
- else {
- console.error(msg);
- }
- }
- }
- /**
- * Waits for a redis client to be ready.
- * @param redis - client
- */
- static async waitUntilReady(client) {
- if (client.status === 'ready') {
- return;
- }
- if (client.status === 'wait') {
- return client.connect();
- }
- if (client.status === 'end') {
- throw new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG);
- }
- let handleReady;
- let handleEnd;
- let handleError;
- try {
- await new Promise((resolve, reject) => {
- let lastError;
- handleError = (err) => {
- lastError = err;
- };
- handleReady = () => {
- resolve();
- };
- handleEnd = () => {
- if (client.status !== 'end') {
- reject(lastError || new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
- }
- else {
- if (lastError) {
- reject(lastError);
- }
- else {
- // when custom 'end' status is set we already closed
- resolve();
- }
- }
- };
- (0, utils_2.increaseMaxListeners)(client, 3);
- client.once('ready', handleReady);
- client.on('end', handleEnd);
- client.once('error', handleError);
- });
- }
- finally {
- client.removeListener('end', handleEnd);
- client.removeListener('error', handleError);
- client.removeListener('ready', handleReady);
- (0, utils_2.decreaseMaxListeners)(client, 3);
- }
- }
- get client() {
- return this.initializing;
- }
- loadCommands(packageVersion, providedScripts) {
- const finalScripts = providedScripts || scripts;
- for (const property in finalScripts) {
- // Only define the command if not already defined
- const commandName = `${finalScripts[property].name}:${packageVersion}`;
- if (!this._client[commandName]) {
- this._client.defineCommand(commandName, {
- numberOfKeys: finalScripts[property].keys,
- lua: finalScripts[property].content,
- });
- }
- }
- }
- async init() {
- if (!this._client) {
- const _a = this.opts, { url } = _a, rest = tslib_1.__rest(_a, ["url"]);
- this._client = url ? new ioredis_1.default(url, rest) : new ioredis_1.default(rest);
- }
- (0, utils_2.increaseMaxListeners)(this._client, 3);
- this._client.on('error', this.handleClientError);
- // ioredis treats connection errors as a different event ('close')
- this._client.on('close', this.handleClientClose);
- this._client.on('ready', this.handleClientReady);
- if (!this.extraOptions.skipWaitingForReady) {
- await RedisConnection.waitUntilReady(this._client);
- }
- this.loadCommands(this.packageVersion);
- if (this._client['status'] !== 'end') {
- const versionResult = await this.getRedisVersionAndType();
- this.version = versionResult.version;
- this.dbType = versionResult.databaseType;
- if (this.skipVersionCheck !== true && !this.closing) {
- if ((0, utils_2.isRedisVersionLowerThan)(this.version, RedisConnection.minimumVersion, this.dbType)) {
- throw new Error(`Redis version needs to be greater or equal than ${RedisConnection.minimumVersion} ` +
- `Current: ${this.version}`);
- }
- if ((0, utils_2.isRedisVersionLowerThan)(this.version, RedisConnection.recommendedMinimumVersion, this.dbType)) {
- console.warn(`It is highly recommended to use a minimum Redis version of ${RedisConnection.recommendedMinimumVersion}
- Current: ${this.version}`);
- }
- }
- this.capabilities = {
- canDoubleTimeout: !(0, utils_2.isRedisVersionLowerThan)(this.version, '6.0.0', this.dbType),
- canBlockFor1Ms: !(0, utils_2.isRedisVersionLowerThan)(this.version, '7.0.8', this.dbType),
- };
- this.status = 'ready';
- }
- return this._client;
- }
- async disconnect(wait = true) {
- const client = await this.client;
- if (client.status !== 'end') {
- let _resolve, _reject;
- if (!wait) {
- return client.disconnect();
- }
- const disconnecting = new Promise((resolve, reject) => {
- (0, utils_2.increaseMaxListeners)(client, 2);
- client.once('end', resolve);
- client.once('error', reject);
- _resolve = resolve;
- _reject = reject;
- });
- client.disconnect();
- try {
- await disconnecting;
- }
- finally {
- (0, utils_2.decreaseMaxListeners)(client, 2);
- client.removeListener('end', _resolve);
- client.removeListener('error', _reject);
- }
- }
- }
- async reconnect() {
- const client = await this.client;
- return client.connect();
- }
- async close(force = false) {
- if (!this.closing) {
- const status = this.status;
- this.status = 'closing';
- this.closing = true;
- try {
- if (status === 'ready') {
- // Not sure if we need to wait for this
- await this.initializing;
- }
- if (!this.extraOptions.shared) {
- if (status == 'initializing' || force) {
- // If we have not still connected to Redis, we need to disconnect.
- this._client.disconnect();
- }
- else {
- await this._client.quit();
- }
- // As IORedis does not update this status properly, we do it ourselves.
- this._client['status'] = 'end';
- }
- }
- catch (error) {
- if ((0, utils_2.isNotConnectionError)(error)) {
- throw error;
- }
- }
- finally {
- this._client.off('error', this.handleClientError);
- this._client.off('close', this.handleClientClose);
- this._client.off('ready', this.handleClientReady);
- (0, utils_2.decreaseMaxListeners)(this._client, 3);
- this.removeAllListeners();
- this.status = 'closed';
- }
- }
- }
- async getRedisVersionAndType() {
- if (this.skipVersionCheck) {
- return {
- version: RedisConnection.minimumVersion,
- databaseType: 'redis',
- };
- }
- const doc = await this._client.info();
- const redisPrefix = 'redis_version:';
- const maxMemoryPolicyPrefix = 'maxmemory_policy:';
- const lines = doc.split(/\r?\n/);
- let redisVersion;
- let databaseType = 'redis';
- // Detect database type from server info
- for (let i = 0; i < lines.length; i++) {
- const line = lines[i];
- // Check for Dragonfly
- if (line.includes('dragonfly_version:') ||
- line.includes('server:Dragonfly')) {
- databaseType = 'dragonfly';
- // For Dragonfly, extract version from dragonfly_version field
- if (line.indexOf('dragonfly_version:') === 0) {
- redisVersion = line.substr('dragonfly_version:'.length);
- }
- }
- // Check for Valkey
- else if (line.includes('valkey_version:') ||
- line.includes('server:Valkey')) {
- databaseType = 'valkey';
- // For Valkey, extract version from valkey_version field
- if (line.indexOf('valkey_version:') === 0) {
- redisVersion = line.substr('valkey_version:'.length);
- }
- }
- // Standard Redis version detection
- else if (line.indexOf(redisPrefix) === 0) {
- redisVersion = line.substr(redisPrefix.length);
- // Keep Redis as default unless we find evidence of other databases above
- if (databaseType === 'redis') {
- databaseType = 'redis';
- }
- }
- if (line.indexOf(maxMemoryPolicyPrefix) === 0) {
- const maxMemoryPolicy = line.substr(maxMemoryPolicyPrefix.length);
- if (maxMemoryPolicy !== 'noeviction') {
- console.warn(`IMPORTANT! Eviction policy is ${maxMemoryPolicy}. It should be "noeviction"`);
- }
- }
- }
- // Fallback version detection if specific database version field wasn't found
- if (!redisVersion) {
- // Try to find any version field as fallback
- for (const line of lines) {
- if (line.includes('version:')) {
- const parts = line.split(':');
- if (parts.length >= 2) {
- redisVersion = parts[1];
- break;
- }
- }
- }
- }
- return {
- version: redisVersion || RedisConnection.minimumVersion,
- databaseType,
- };
- }
- get redisVersion() {
- return this.version;
- }
- get databaseType() {
- return this.dbType;
- }
- }
- exports.RedisConnection = RedisConnection;
- RedisConnection.minimumVersion = '5.0.0';
- RedisConnection.recommendedMinimumVersion = '6.2.0';
- //# sourceMappingURL=redis-connection.js.map
|