// job-scheduler.js (11 KB)
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.defaultRepeatStrategy = exports.JobScheduler = void 0;
  4. const tslib_1 = require("tslib");
  5. const cron_parser_1 = require("cron-parser");
  6. const job_1 = require("./job");
  7. const queue_base_1 = require("./queue-base");
  8. const enums_1 = require("../enums");
  9. const utils_1 = require("../utils");
  /**
   * Creates, updates and reads job schedulers (repeatable job definitions)
   * through the queue's scripts and the `repeat` keys of the queue.
   */
  class JobScheduler extends queue_base_1.QueueBase {
    /**
     * @param name - Forwarded to the QueueBase constructor.
     * @param opts - Forwarded to the QueueBase constructor. When
     *   `opts.settings.repeatStrategy` is set it replaces the default strategy
     *   used to compute the next run time of pattern based schedulers.
     * @param Connection - Forwarded to the QueueBase constructor.
     */
    constructor(name, opts, Connection) {
      super(name, opts, Connection);
      const customStrategy = opts.settings && opts.settings.repeatStrategy;
      this.repeatStrategy = customStrategy || exports.defaultRepeatStrategy;
    }

    /**
     * Creates or advances a job scheduler and builds the Job for its next
     * iteration.
     *
     * @param jobSchedulerId - Identifier of the scheduler.
     * @param repeatOpts - Repeat options. Exactly one of `pattern` / `every`
     *   must be set; `immediately` cannot be combined with `startDate`.
     * @param jobName - Name given to the produced job.
     * @param jobData - Job payload; `undefined` is serialized as `{}`.
     * @param opts - Job options; `prevMillis` and `telemetry` are read here.
     * @param options - `override` selects the `addJobScheduler` script instead
     *   of `updateJobSchedulerNextMillis`; `producerId` is passed to the script.
     * @returns Whatever `this.trace` resolves with (presumably the callback's
     *   result: the created Job, or undefined when the update script returns no
     *   job id). Resolves with undefined without calling any script when the
     *   iteration limit is exceeded, the end date has passed, or a pattern
     *   scheduler has no next run time.
     * @throws Error when the repeat options are inconsistent.
     */
    async upsertJobScheduler(jobSchedulerId, repeatOpts, jobName, jobData, opts, { override, producerId }) {
      const { every, limit, pattern } = repeatOpts;
      if (pattern && every) {
        throw new Error('Both .pattern and .every options are defined for this repeatable job');
      }
      if (!pattern && !every) {
        throw new Error('Either .pattern or .every options must be defined for this repeatable job');
      }
      if (repeatOpts.immediately && repeatOpts.startDate) {
        throw new Error('Both .immediately and .startDate options are defined for this repeatable job');
      }
      if (repeatOpts.immediately && every) {
        console.warn("Using option immediately with every does not affect the job's schedule. Job will run immediately anyway.");
      }

      // Stop once the configured number of iterations has been exceeded.
      const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
      if (typeof limit !== 'undefined' && iterationCount > limit) {
        return;
      }

      // Stop once the end date of the scheduler has passed.
      let now = Date.now();
      const { endDate } = repeatOpts;
      if (endDate && now > new Date(endDate).getTime()) {
        return;
      }

      // Never compute the next run from a point earlier than the previous one.
      const prevMillis = opts.prevMillis || 0;
      now = prevMillis < now ? now : prevMillis;

      // `immediately` is handed to the repeat strategy (via repeatOpts) but is
      // left out of the repeat options stored on the produced job.
      const filteredRepeatOpts = tslib_1.__rest(repeatOpts, ['immediately']);
      const newOffset = null;

      // Only pattern schedulers get their next run computed here; for `every`
      // schedulers nextMillis stays undefined when the scripts are called.
      let nextMillis;
      if (pattern) {
        nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
        if (nextMillis < now) {
          nextMillis = now;
        }
      }
      if (!nextMillis && !every) {
        return;
      }

      return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMetadata) => {
        // Explicit telemetry metadata wins; otherwise the propagated context
        // is used unless the caller asked to omit it.
        let telemetry = opts.telemetry;
        if (srcPropagationMetadata) {
          const sourceTelemetry = opts.telemetry;
          const omitContext = sourceTelemetry == null ? undefined : sourceTelemetry.omitContext;
          const explicitMetadata = sourceTelemetry == null ? undefined : sourceTelemetry.metadata;
          const telemetryMetadata = explicitMetadata || (!omitContext && srcPropagationMetadata);
          if (telemetryMetadata || omitContext) {
            telemetry = {
              metadata: telemetryMetadata,
              omitContext,
            };
          }
        }

        const mergedOpts = this.getNextJobOpts(
          nextMillis,
          jobSchedulerId,
          Object.assign({}, opts, { repeat: filteredRepeatOpts, telemetry }),
          iterationCount,
          newOffset,
        );
        const serializedData = JSON.stringify(typeof jobData === 'undefined' ? {} : jobData);

        // Builds the Job returned to the caller and tags the span with its ids.
        const buildJob = (jobOpts, jobId) => {
          const job = new this.Job(this, jobName, jobData, jobOpts, jobId);
          job.id = jobId;
          if (span != null) {
            span.setAttributes({
              [enums_1.TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
              [enums_1.TelemetryAttributes.JobId]: job.id,
            });
          }
          return job;
        };

        if (override) {
          // nextMillis needs no further clamping here: it was already clamped
          // to `now` right after the repeat strategy ran.
          const schedulerOpts = {
            name: jobName,
            startDate: repeatOpts.startDate
              ? new Date(repeatOpts.startDate).getTime()
              : undefined,
            endDate: endDate ? new Date(endDate).getTime() : undefined,
            tz: repeatOpts.tz,
            pattern,
            every,
            limit,
            offset: newOffset,
          };
          const [jobId, delay] = await this.scripts.addJobScheduler(
            jobSchedulerId,
            nextMillis,
            serializedData,
            job_1.Job.optsAsJSON(opts),
            schedulerOpts,
            job_1.Job.optsAsJSON(mergedOpts),
            producerId,
          );
          // Ensure delay is a number (Dragonflydb may return it as a string)
          const numericDelay = typeof delay === 'string' ? parseInt(delay, 10) : delay;
          return buildJob(Object.assign({}, mergedOpts, { delay: numericDelay }), jobId);
        }

        const jobId = await this.scripts.updateJobSchedulerNextMillis(
          jobSchedulerId,
          nextMillis,
          serializedData,
          job_1.Job.optsAsJSON(mergedOpts),
          producerId,
        );
        if (jobId) {
          return buildJob(mergedOpts, jobId);
        }
        return undefined;
      });
    }

    /**
     * Builds the options of the job for the next iteration of a scheduler.
     *
     * @param nextMillis - Timestamp of the next run; also stored as `prevMillis`.
     * @param jobSchedulerId - Scheduler id; stored as `repeatJobKey`.
     * @param opts - Base job options, copied (not mutated).
     * @param currentCount - Iteration count stored in `repeat.count`.
     * @param offset - Offset added to `nextMillis` when computing the delay and
     *   stored in `repeat.offset`.
     * @returns A new options object with `jobId`, `delay` (negative values
     *   become 0), `timestamp`, `prevMillis`, `repeatJobKey` and a `repeat`
     *   object whose `startDate` / `endDate` are epoch milliseconds or undefined.
     */
    getNextJobOpts(nextMillis, jobSchedulerId, opts, currentCount, offset) {
      // Generate unique job id for this iteration.
      const jobId = this.getSchedulerNextJobId({
        jobSchedulerId,
        nextMillis,
      });
      const now = Date.now();
      const delay = nextMillis + offset - now;
      const repeat = opts.repeat;
      const toMillis = (date) => (date ? new Date(date).getTime() : undefined);

      const mergedOpts = Object.assign({}, opts, {
        jobId,
        delay: delay < 0 ? 0 : delay,
        timestamp: now,
        prevMillis: nextMillis,
        repeatJobKey: jobSchedulerId,
      });
      mergedOpts.repeat = Object.assign({}, repeat, {
        offset,
        count: currentCount,
        startDate: toMillis(repeat == null ? undefined : repeat.startDate),
        endDate: toMillis(repeat == null ? undefined : repeat.endDate),
      });
      return mergedOpts;
    }

    /**
     * Removes a scheduler by delegating to the `removeJobScheduler` script.
     *
     * @param jobSchedulerId - Identifier of the scheduler to remove.
     * @returns The script's result.
     */
    async removeJobScheduler(jobSchedulerId) {
      return this.scripts.removeJobScheduler(jobSchedulerId);
    }

    /**
     * Loads the hash stored under `repeat:<key>` and converts it to scheduler
     * data.
     *
     * @param client - Client exposing `hgetall`.
     * @param key - Scheduler key.
     * @param next - Value reported as the scheduler's `next` field.
     */
    async getSchedulerData(client, key, next) {
      const jobData = await client.hgetall(this.toKey('repeat:' + key));
      return this.transformSchedulerData(key, jobData, next);
    }

    /**
     * Converts a raw scheduler hash into a scheduler description.
     *
     * @param key - Scheduler key.
     * @param jobData - Raw hash fields, or a nullish / empty object.
     * @param next - Value reported as the scheduler's `next` field.
     * @returns The scheduler description; for an empty hash, the data decoded
     *   from a legacy key containing `:`; otherwise undefined.
     */
    transformSchedulerData(key, jobData, next) {
      if (jobData && Object.keys(jobData).length > 0) {
        const toInt = (value) => parseInt(value, 10);
        const asIs = (value) => value;
        // [stored field, exposed property, converter], listed in output order.
        const fieldMap = [
          ['ic', 'iterationCount', toInt],
          ['limit', 'limit', toInt],
          ['startDate', 'startDate', toInt],
          ['endDate', 'endDate', toInt],
          ['tz', 'tz', asIs],
          ['pattern', 'pattern', asIs],
          ['every', 'every', toInt],
          ['offset', 'offset', toInt],
        ];
        const jobSchedulerData = {
          key,
          name: jobData.name,
          next,
        };
        for (const [source, target, convert] of fieldMap) {
          if (jobData[source]) {
            jobSchedulerData[target] = convert(jobData[source]);
          }
        }
        if (jobData.data || jobData.opts) {
          jobSchedulerData.template = this.getTemplateFromJSON(jobData.data, jobData.opts);
        }
        return jobSchedulerData;
      }
      // TODO: remove this check and keyToData as it is here only to support legacy code
      if (key.includes(':')) {
        return this.keyToData(key, next);
      }
      return undefined;
    }

    /**
     * Decodes a legacy scheduler key of the form
     * `name:id:endDate:tz:pattern` (the pattern itself may contain `:`).
     *
     * @param key - Legacy key.
     * @param next - Value reported as the scheduler's `next` field.
     * @returns The decoded fields; missing or empty parts become null.
     */
    keyToData(key, next) {
      const [name, id, endDate, tz, ...patternParts] = key.split(':');
      return {
        key,
        name,
        id: id || null,
        endDate: parseInt(endDate, 10) || null,
        tz: tz || null,
        pattern: patternParts.join(':') || null,
        next,
      };
    }

    /**
     * Fetches one scheduler through the `getJobScheduler` script.
     *
     * @param id - Scheduler id.
     * @returns The scheduler description, see transformSchedulerData.
     */
    async getScheduler(id) {
      const [rawJobData, next] = await this.scripts.getJobScheduler(id);
      const jobData = rawJobData ? (0, utils_1.array2obj)(rawJobData) : null;
      return this.transformSchedulerData(id, jobData, next ? parseInt(next, 10) : null);
    }

    /**
     * Builds a job template from the serialized data / options of a scheduler.
     *
     * @param rawData - JSON string with the job data, if any.
     * @param rawOpts - Serialized job options, if any.
     * @returns An object with `data` and/or `opts` set for the inputs provided.
     */
    getTemplateFromJSON(rawData, rawOpts) {
      const template = {};
      if (rawData) {
        template.data = JSON.parse(rawData);
      }
      if (rawOpts) {
        template.opts = job_1.Job.optsFromJSON(rawOpts);
      }
      return template;
    }

    /**
     * Lists schedulers from the `repeat` sorted set together with their data.
     *
     * @param start - First index of the range (default 0).
     * @param end - Last index of the range (default -1).
     * @param asc - Uses `zrange` when true, `zrevrange` otherwise.
     * @returns One scheduler description per returned member; the member's
     *   score is reported as `next`.
     */
    async getJobSchedulers(start = 0, end = -1, asc = false) {
      const client = await this.client;
      const jobSchedulersKey = this.keys.repeat;
      const command = asc ? 'zrange' : 'zrevrange';
      // WITHSCORES yields a flat [member, score, member, score, ...] list.
      const result = await client[command](jobSchedulersKey, start, end, 'WITHSCORES');
      const pending = [];
      for (let i = 0; i < result.length; i += 2) {
        pending.push(this.getSchedulerData(client, result[i], parseInt(result[i + 1], 10)));
      }
      return Promise.all(pending);
    }

    /**
     * @returns The cardinality (`zcard`) of the `repeat` sorted set.
     */
    async getSchedulersCount() {
      const jobSchedulersKey = this.keys.repeat;
      const client = await this.client;
      return client.zcard(jobSchedulersKey);
    }

    /**
     * @returns The job id for one iteration:
     *   `repeat:<jobSchedulerId>:<nextMillis>`.
     */
    getSchedulerNextJobId({ nextMillis, jobSchedulerId, }) {
      return `repeat:${jobSchedulerId}:${nextMillis}`;
    }
  }
  227. exports.JobScheduler = JobScheduler;
  /**
   * Default strategy used to compute the next run of a pattern scheduler.
   *
   * The cron expression in `opts.pattern` is parsed relative to the later of
   * `millis` and `opts.startDate`; all other options are forwarded to the
   * parser unchanged.
   *
   * @param millis - Reference timestamp in epoch milliseconds.
   * @param opts - Repeat options (`pattern`, optional `startDate`,
   *   `immediately`, plus anything the cron parser accepts).
   * @returns The current time when `opts.immediately` is truthy, otherwise the
   *   timestamp of the next occurrence; undefined when the iterator throws
   *   while producing the next occurrence.
   */
  const defaultRepeatStrategy = (millis, opts) => {
    const reference = new Date(millis);
    const start = opts.startDate && new Date(opts.startDate);
    const parserOpts = Object.assign({}, opts, {
      currentDate: start > reference ? start : reference,
    });
    // The expression is parsed before anything else, so an invalid pattern
    // still throws even when `immediately` is requested.
    const schedule = (0, cron_parser_1.parseExpression)(opts.pattern, parserOpts);
    if (opts.immediately) {
      return Date.now();
    }
    try {
      return schedule.next().getTime();
    }
    catch (err) {
      // Deliberately swallowed: the caller treats a missing value as
      // "nothing left to schedule".
      return undefined;
    }
  };
  246. exports.defaultRepeatStrategy = defaultRepeatStrategy;
  247. //# sourceMappingURL=job-scheduler.js.map