repeat.js 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.getNextMillis = exports.Repeat = void 0;
  4. const tslib_1 = require("tslib");
  5. const cron_parser_1 = require("cron-parser");
  6. const crypto_1 = require("crypto");
  7. const queue_base_1 = require("./queue-base");
  /**
   * Queue helper that schedules and manages repeatable jobs.
   *
   * Each repeatable job is identified by a "repeat key". Legacy keys are the
   * colon-separated string built by `getRepeatConcatOptions`
   * (`name:jobId:endDate:tz:pattern-or-every`); newer keys are either a
   * user-supplied `repeat.key` or a hash of the legacy string.
   */
  class Repeat extends queue_base_1.QueueBase {
    /**
     * @param name - Queue name, forwarded to the base class.
     * @param opts - Queue options. `opts.settings.repeatStrategy` overrides the
     *   function used to compute the next run time (defaults to
     *   `exports.getNextMillis`); `opts.settings.repeatKeyHashAlgorithm`
     *   overrides the hash algorithm used for repeat keys (defaults to 'md5').
     * @param Connection - Connection class, forwarded to the base class.
     */
    constructor(name, opts, Connection) {
      super(name, opts, Connection);
      const settings = opts.settings;
      this.repeatStrategy =
        (settings && settings.repeatStrategy) || exports.getNextMillis;
      this.repeatKeyHashAlgorithm =
        (settings && settings.repeatKeyHashAlgorithm) || 'md5';
    }

    /**
     * Computes the next iteration of a repeatable job, records it through the
     * scripts layer and creates the corresponding job.
     *
     * @param name - Job name.
     * @param data - Job payload.
     * @param opts - Job options; `opts.repeat` holds the repeat settings and
     *   `opts.prevMillis` the timestamp of the previous iteration, if any.
     * @param options - `{ override }`: when truthy the repeatable job
     *   definition is (re)written via `scripts.addRepeatableJob`, otherwise
     *   only its next timestamp is updated via
     *   `scripts.updateRepeatableJobMillis`.
     * @returns The value returned by `createNextJob`, or `undefined` when the
     *   iteration limit is exceeded, the end date has passed, or the repeat
     *   strategy yields no next time.
     */
    async updateRepeatableJob(name, data, opts, { override }) {
      // Backwards compatibility for repeatable jobs for versions <= 3.0.0
      const repeatOpts = Object.assign({}, opts.repeat);
      if (repeatOpts.pattern === null || repeatOpts.pattern === undefined) {
        repeatOpts.pattern = repeatOpts.cron;
      }
      delete repeatOpts.cron;

      // Check if we reached the limit of the repeatable job's iterations
      const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
      if (
        typeof repeatOpts.limit !== 'undefined' &&
        iterationCount > repeatOpts.limit
      ) {
        return;
      }

      // Check if we reached the end date of the repeatable job
      let now = Date.now();
      const { endDate } = repeatOpts;
      if (endDate && now > new Date(endDate).getTime()) {
        return;
      }

      // Never schedule relative to a moment earlier than the previous iteration.
      const prevMillis = opts.prevMillis || 0;
      now = prevMillis < now ? now : prevMillis;

      const nextMillis = await this.repeatStrategy(now, repeatOpts, name);
      const { every, pattern } = repeatOpts;
      const hasImmediately = Boolean(
        (every || pattern) && repeatOpts.immediately
      );
      const offset = hasImmediately && every ? now - nextMillis : undefined;

      if (!nextMillis) {
        return;
      }

      // We store the undecorated opts.jobId into the repeat options
      if (!prevMillis && opts.jobId) {
        repeatOpts.jobId = opts.jobId;
      }

      const legacyRepeatKey = getRepeatConcatOptions(name, repeatOpts);
      const customKey = opts.repeat.key;
      const newRepeatKey =
        customKey !== null && customKey !== undefined
          ? customKey
          : this.hash(legacyRepeatKey);

      let repeatJobKey;
      if (override) {
        repeatJobKey = await this.scripts.addRepeatableJob(
          newRepeatKey,
          nextMillis,
          {
            name,
            endDate: endDate ? new Date(endDate).getTime() : undefined,
            tz: repeatOpts.tz,
            pattern,
            every,
          },
          legacyRepeatKey
        );
      } else {
        const client = await this.client;
        repeatJobKey = await this.scripts.updateRepeatableJobMillis(
          client,
          newRepeatKey,
          nextMillis,
          legacyRepeatKey
        );
      }

      // `immediately` only applies to the first iteration, so it is not
      // carried over into the options of the job being created.
      const filteredRepeatOpts = tslib_1.__rest(repeatOpts, ['immediately']);
      const nextJobOpts = Object.assign({}, opts, {
        repeat: Object.assign({ offset }, filteredRepeatOpts),
      });
      return this.createNextJob(
        name,
        nextMillis,
        repeatJobKey,
        nextJobOpts,
        data,
        iterationCount,
        hasImmediately
      );
    }

    /**
     * Creates the job for one iteration of a repeatable job.
     *
     * @param name - Job name.
     * @param nextMillis - Timestamp (ms) at which the iteration should run.
     * @param repeatJobKey - Repeat key returned by the scripts layer.
     * @param opts - Job options; `opts.repeat.offset` is added to the delay.
     * @param data - Job payload.
     * @param currentCount - Iteration counter stored as `repeat.count`.
     * @param hasImmediately - When truthy the job is created with no delay.
     * @returns The value returned by `this.Job.create`.
     */
    async createNextJob(
      name,
      nextMillis,
      repeatJobKey,
      opts,
      data,
      currentCount,
      hasImmediately
    ) {
      //
      // Generate unique job id for this iteration.
      //
      const jobId = this.getRepeatJobKey(name, nextMillis, repeatJobKey, data);
      const now = Date.now();
      const offset = opts.repeat.offset ? opts.repeat.offset : 0;
      const delay = nextMillis + offset - now;
      const mergedOpts = Object.assign({}, opts, {
        jobId,
        delay: delay < 0 || hasImmediately ? 0 : delay,
        timestamp: now,
        prevMillis: nextMillis,
        repeatJobKey,
      });
      mergedOpts.repeat = Object.assign({}, opts.repeat, {
        count: currentCount,
      });
      return this.Job.create(this, name, data, mergedOpts);
    }

    // TODO: remove legacy code in next breaking change
    /**
     * Builds the job id for one iteration.
     *
     * Keys with more than two colon-separated segments are treated as legacy
     * keys and produce a hashed id via `getRepeatJobId`; any other key is
     * embedded verbatim via `getRepeatDelayedJobId`.
     */
    getRepeatJobKey(name, nextMillis, repeatJobKey, data) {
      const isLegacyKey = repeatJobKey.split(':').length > 2;
      if (isLegacyKey) {
        return this.getRepeatJobId({
          name,
          nextMillis,
          namespace: this.hash(repeatJobKey),
          jobId: data === null || data === undefined ? undefined : data.id,
        });
      }
      return this.getRepeatDelayedJobId({
        customKey: repeatJobKey,
        nextMillis,
      });
    }

    /**
     * Removes a repeatable job identified by its name and repeat options.
     *
     * @param name - Job name.
     * @param repeat - Repeat options the job was registered with.
     * @param jobId - Optional job id; falls back to `repeat.jobId` for the
     *   legacy job id.
     * @returns The value returned by `scripts.removeRepeatable`.
     */
    async removeRepeatable(name, repeat, jobId) {
      const repeatConcatOptions = getRepeatConcatOptions(
        name,
        Object.assign({}, repeat, { jobId })
      );
      const repeatJobKey =
        repeat.key !== null && repeat.key !== undefined
          ? repeat.key
          : this.hash(repeatConcatOptions);
      const legacyRepeatJobId = this.getRepeatJobId({
        name,
        nextMillis: '',
        namespace: this.hash(repeatConcatOptions),
        jobId: jobId !== null && jobId !== undefined ? jobId : repeat.jobId,
        key: repeat.key,
      });
      return this.scripts.removeRepeatable(
        legacyRepeatJobId,
        repeatConcatOptions,
        repeatJobKey
      );
    }

    /**
     * Removes a repeatable job identified by its repeat key.
     *
     * @param repeatJobKey - Repeat key; it is also parsed as a legacy key to
     *   derive the legacy job id.
     * @returns The value returned by `scripts.removeRepeatable`.
     */
    async removeRepeatableByKey(repeatJobKey) {
      const data = this.keyToData(repeatJobKey);
      const legacyRepeatJobId = this.getRepeatJobId({
        name: data.name,
        nextMillis: '',
        namespace: this.hash(repeatJobKey),
        jobId: data.id,
      });
      return this.scripts.removeRepeatable(legacyRepeatJobId, '', repeatJobKey);
    }

    /**
     * Loads the description of a repeatable job.
     *
     * @param client - Redis client exposing `hgetall`.
     * @param key - Repeat key.
     * @param next - Timestamp (ms) of the next iteration, passed through.
     * @returns The stored description when the `repeat:<key>` hash has
     *   fields, otherwise the description parsed from the key itself.
     */
    async getRepeatableData(client, key, next) {
      const jobData = await client.hgetall(this.toKey('repeat:' + key));
      // HGETALL yields an empty (but truthy) object for a missing key, so a
      // plain truthiness check would never reach the legacy fallback below.
      const hasStoredFields =
        Boolean(jobData) && Object.keys(jobData).length > 0;
      if (hasStoredFields) {
        return {
          key,
          name: jobData.name,
          endDate: parseInt(jobData.endDate, 10) || null,
          tz: jobData.tz || null,
          pattern: jobData.pattern || null,
          every: jobData.every || null,
          next,
        };
      }
      return this.keyToData(key, next);
    }

    /**
     * Parses a legacy repeat key (`name:id:endDate:tz:pattern`) into its
     * parts. Everything after the fourth colon is kept as the pattern.
     */
    keyToData(key, next) {
      const parts = key.split(':');
      return {
        key,
        name: parts[0],
        id: parts[1] || null,
        endDate: parseInt(parts[2], 10) || null,
        tz: parts[3] || null,
        pattern: parts.slice(4).join(':') || null,
        next,
      };
    }

    /**
     * Lists repeatable jobs from the sorted set at `this.keys.repeat`.
     *
     * @param start - First index of the range (default 0).
     * @param end - Last index of the range (default -1).
     * @param asc - Use ZRANGE when true, ZREVRANGE otherwise.
     * @returns Descriptions produced by `getRepeatableData`, in range order.
     */
    async getRepeatableJobs(start = 0, end = -1, asc = false) {
      const client = await this.client;
      const key = this.keys.repeat;
      const result = asc
        ? await client.zrange(key, start, end, 'WITHSCORES')
        : await client.zrevrange(key, start, end, 'WITHSCORES');
      // The reply alternates member, score, member, score, ...
      const jobs = [];
      for (let i = 0; i < result.length; i += 2) {
        const next = parseInt(result[i + 1], 10);
        jobs.push(this.getRepeatableData(client, result[i], next));
      }
      return Promise.all(jobs);
    }

    /**
     * @returns The cardinality of the sorted set at `this.toKey('repeat')`.
     */
    async getRepeatableCount() {
      const client = await this.client;
      return client.zcard(this.toKey('repeat'));
    }

    /**
     * Hex digest of `str` using the configured repeat-key hash algorithm.
     */
    hash(str) {
      return crypto_1
        .createHash(this.repeatKeyHashAlgorithm)
        .update(str)
        .digest('hex');
    }

    /**
     * Job id for an iteration of a job addressed by a non-legacy key.
     */
    getRepeatDelayedJobId({ nextMillis, customKey }) {
      return `repeat:${customKey}:${nextMillis}`;
    }

    /**
     * Job id for an iteration of a job addressed by a legacy key: uses `key`
     * when given, otherwise a hash of name, job id and namespace.
     */
    getRepeatJobId({ name, nextMillis, namespace, jobId, key }) {
      const checksum =
        key !== null && key !== undefined
          ? key
          : this.hash(`${name}${jobId || ''}${namespace}`);
      return `repeat:${checksum}:${nextMillis}`;
    }
  }
  169. exports.Repeat = Repeat;
  /**
   * Builds the legacy repeat key for a repeatable job:
   * `name:jobId:endDate:tz:suffix`.
   *
   * - `jobId` and `tz` become empty segments when falsy.
   * - `endDate` is converted to epoch milliseconds, or left empty when falsy.
   * - `suffix` is the pattern when truthy, otherwise the stringified `every`.
   *
   * @param name - Job name.
   * @param repeat - Repeat options (`jobId`, `endDate`, `tz`, `pattern`, `every`).
   * @returns The colon-separated key.
   */
  function getRepeatConcatOptions(name, repeat) {
    const { endDate, tz, pattern, every, jobId } = repeat;
    const endMillis = endDate ? new Date(endDate).getTime() : '';
    const schedule = pattern || String(every) || '';
    return `${name}:${jobId || ''}:${endMillis}:${tz || ''}:${schedule}`;
  }
  /**
   * Default repeat strategy: computes when a repeatable job should run next.
   *
   * With `opts.every`, the result is `millis` rounded down to a multiple of
   * `every`, plus one more `every` unless `opts.immediately` is set.
   * With `opts.pattern`, the pattern is parsed as a cron expression starting
   * from the later of `opts.startDate` and `millis`; the result is the current
   * time when `opts.immediately` is set, otherwise the next cron occurrence.
   *
   * @param millis - Reference timestamp in milliseconds.
   * @param opts - Repeat options (`pattern`, `every`, `immediately`,
   *   `startDate`, plus anything forwarded to the cron parser).
   * @returns The next timestamp in milliseconds, or `undefined` when the cron
   *   iterator throws while producing the next occurrence.
   * @throws {Error} When both `pattern` and `every` are set.
   */
  const getNextMillis = (millis, opts) => {
    const { pattern, every } = opts;
    if (pattern && every) {
      throw new Error('Both .pattern and .every options are defined for this repeatable job');
    }

    if (every) {
      const slotStart = Math.floor(millis / every) * every;
      return slotStart + (opts.immediately ? 0 : every);
    }

    // Start iterating from whichever is later: the requested start date or
    // the reference time.
    const reference = new Date(millis);
    const requestedStart = opts.startDate && new Date(opts.startDate);
    const currentDate =
      requestedStart && requestedStart > reference ? requestedStart : reference;

    // Parsing happens before the `immediately` shortcut so that an invalid
    // pattern is still reported to the caller.
    const interval = cron_parser_1.parseExpression(
      pattern,
      Object.assign({}, opts, { currentDate })
    );

    if (opts.immediately) {
      return Date.now();
    }

    try {
      return interval.next().getTime();
    } catch (err) {
      // Ignore error: no next occurrence could be produced.
      return undefined;
    }
  };
  203. exports.getNextMillis = getNextMillis;
  204. //# sourceMappingURL=repeat.js.map