job-scheduler.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. import { __rest } from "tslib";
  2. import { parseExpression } from 'cron-parser';
  3. import { Job } from './job';
  4. import { QueueBase } from './queue-base';
  5. import { SpanKind, TelemetryAttributes } from '../enums';
  6. import { array2obj } from '../utils';
  7. export class JobScheduler extends QueueBase {
  8. constructor(name, opts, Connection) {
  9. super(name, opts, Connection);
  10. this.repeatStrategy =
  11. (opts.settings && opts.settings.repeatStrategy) || defaultRepeatStrategy;
  12. }
  13. async upsertJobScheduler(jobSchedulerId, repeatOpts, jobName, jobData, opts, { override, producerId }) {
  14. const { every, limit, pattern, offset } = repeatOpts;
  15. if (pattern && every) {
  16. throw new Error('Both .pattern and .every options are defined for this repeatable job');
  17. }
  18. if (!pattern && !every) {
  19. throw new Error('Either .pattern or .every options must be defined for this repeatable job');
  20. }
  21. if (repeatOpts.immediately && repeatOpts.startDate) {
  22. throw new Error('Both .immediately and .startDate options are defined for this repeatable job');
  23. }
  24. if (repeatOpts.immediately && repeatOpts.every) {
  25. console.warn("Using option immediately with every does not affect the job's schedule. Job will run immediately anyway.");
  26. }
  27. // Check if we reached the limit of the repeatable job's iterations
  28. const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
  29. if (typeof repeatOpts.limit !== 'undefined' &&
  30. iterationCount > repeatOpts.limit) {
  31. return;
  32. }
  33. // Check if we reached the end date of the repeatable job
  34. let now = Date.now();
  35. const { endDate } = repeatOpts;
  36. if (endDate && now > new Date(endDate).getTime()) {
  37. return;
  38. }
  39. const prevMillis = opts.prevMillis || 0;
  40. now = prevMillis < now ? now : prevMillis;
  41. // Check if we have a start date for the repeatable job
  42. const { immediately } = repeatOpts, filteredRepeatOpts = __rest(repeatOpts, ["immediately"]);
  43. let nextMillis;
  44. const newOffset = null;
  45. if (pattern) {
  46. nextMillis = await this.repeatStrategy(now, repeatOpts, jobName);
  47. if (nextMillis < now) {
  48. nextMillis = now;
  49. }
  50. }
  51. if (nextMillis || every) {
  52. return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMetadata) => {
  53. var _a, _b;
  54. let telemetry = opts.telemetry;
  55. if (srcPropagationMetadata) {
  56. const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
  57. const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
  58. (!omitContext && srcPropagationMetadata);
  59. if (telemetryMetadata || omitContext) {
  60. telemetry = {
  61. metadata: telemetryMetadata,
  62. omitContext,
  63. };
  64. }
  65. }
  66. const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), iterationCount, newOffset);
  67. if (override) {
  68. // Clamp nextMillis to now if it's in the past
  69. if (nextMillis < now) {
  70. nextMillis = now;
  71. }
  72. const [jobId, delay] = await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), Job.optsAsJSON(opts), {
  73. name: jobName,
  74. startDate: repeatOpts.startDate
  75. ? new Date(repeatOpts.startDate).getTime()
  76. : undefined,
  77. endDate: endDate ? new Date(endDate).getTime() : undefined,
  78. tz: repeatOpts.tz,
  79. pattern,
  80. every,
  81. limit,
  82. offset: newOffset,
  83. }, Job.optsAsJSON(mergedOpts), producerId);
  84. // Ensure delay is a number (Dragonflydb may return it as a string)
  85. const numericDelay = typeof delay === 'string' ? parseInt(delay, 10) : delay;
  86. const job = new this.Job(this, jobName, jobData, Object.assign(Object.assign({}, mergedOpts), { delay: numericDelay }), jobId);
  87. job.id = jobId;
  88. span === null || span === void 0 ? void 0 : span.setAttributes({
  89. [TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
  90. [TelemetryAttributes.JobId]: job.id,
  91. });
  92. return job;
  93. }
  94. else {
  95. const jobId = await this.scripts.updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), Job.optsAsJSON(mergedOpts), producerId);
  96. if (jobId) {
  97. const job = new this.Job(this, jobName, jobData, mergedOpts, jobId);
  98. job.id = jobId;
  99. span === null || span === void 0 ? void 0 : span.setAttributes({
  100. [TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
  101. [TelemetryAttributes.JobId]: job.id,
  102. });
  103. return job;
  104. }
  105. }
  106. });
  107. }
  108. }
  109. getNextJobOpts(nextMillis, jobSchedulerId, opts, currentCount, offset) {
  110. var _a, _b;
  111. //
  112. // Generate unique job id for this iteration.
  113. //
  114. const jobId = this.getSchedulerNextJobId({
  115. jobSchedulerId,
  116. nextMillis,
  117. });
  118. const now = Date.now();
  119. const delay = nextMillis + offset - now;
  120. const mergedOpts = Object.assign(Object.assign({}, opts), { jobId, delay: delay < 0 ? 0 : delay, timestamp: now, prevMillis: nextMillis, repeatJobKey: jobSchedulerId });
  121. mergedOpts.repeat = Object.assign(Object.assign({}, opts.repeat), { offset, count: currentCount, startDate: ((_a = opts.repeat) === null || _a === void 0 ? void 0 : _a.startDate)
  122. ? new Date(opts.repeat.startDate).getTime()
  123. : undefined, endDate: ((_b = opts.repeat) === null || _b === void 0 ? void 0 : _b.endDate)
  124. ? new Date(opts.repeat.endDate).getTime()
  125. : undefined });
  126. return mergedOpts;
  127. }
  128. async removeJobScheduler(jobSchedulerId) {
  129. return this.scripts.removeJobScheduler(jobSchedulerId);
  130. }
  131. async getSchedulerData(client, key, next) {
  132. const jobData = await client.hgetall(this.toKey('repeat:' + key));
  133. return this.transformSchedulerData(key, jobData, next);
  134. }
  135. transformSchedulerData(key, jobData, next) {
  136. if (jobData && Object.keys(jobData).length > 0) {
  137. const jobSchedulerData = {
  138. key,
  139. name: jobData.name,
  140. next,
  141. };
  142. if (jobData.ic) {
  143. jobSchedulerData.iterationCount = parseInt(jobData.ic);
  144. }
  145. if (jobData.limit) {
  146. jobSchedulerData.limit = parseInt(jobData.limit);
  147. }
  148. if (jobData.startDate) {
  149. jobSchedulerData.startDate = parseInt(jobData.startDate);
  150. }
  151. if (jobData.endDate) {
  152. jobSchedulerData.endDate = parseInt(jobData.endDate);
  153. }
  154. if (jobData.tz) {
  155. jobSchedulerData.tz = jobData.tz;
  156. }
  157. if (jobData.pattern) {
  158. jobSchedulerData.pattern = jobData.pattern;
  159. }
  160. if (jobData.every) {
  161. jobSchedulerData.every = parseInt(jobData.every);
  162. }
  163. if (jobData.offset) {
  164. jobSchedulerData.offset = parseInt(jobData.offset);
  165. }
  166. if (jobData.data || jobData.opts) {
  167. jobSchedulerData.template = this.getTemplateFromJSON(jobData.data, jobData.opts);
  168. }
  169. return jobSchedulerData;
  170. }
  171. // TODO: remove this check and keyToData as it is here only to support legacy code
  172. if (key.includes(':')) {
  173. return this.keyToData(key, next);
  174. }
  175. }
  176. keyToData(key, next) {
  177. const data = key.split(':');
  178. const pattern = data.slice(4).join(':') || null;
  179. return {
  180. key,
  181. name: data[0],
  182. id: data[1] || null,
  183. endDate: parseInt(data[2]) || null,
  184. tz: data[3] || null,
  185. pattern,
  186. next,
  187. };
  188. }
  189. async getScheduler(id) {
  190. const [rawJobData, next] = await this.scripts.getJobScheduler(id);
  191. return this.transformSchedulerData(id, rawJobData ? array2obj(rawJobData) : null, next ? parseInt(next) : null);
  192. }
  193. getTemplateFromJSON(rawData, rawOpts) {
  194. const template = {};
  195. if (rawData) {
  196. template.data = JSON.parse(rawData);
  197. }
  198. if (rawOpts) {
  199. template.opts = Job.optsFromJSON(rawOpts);
  200. }
  201. return template;
  202. }
  203. async getJobSchedulers(start = 0, end = -1, asc = false) {
  204. const client = await this.client;
  205. const jobSchedulersKey = this.keys.repeat;
  206. const result = asc
  207. ? await client.zrange(jobSchedulersKey, start, end, 'WITHSCORES')
  208. : await client.zrevrange(jobSchedulersKey, start, end, 'WITHSCORES');
  209. const jobs = [];
  210. for (let i = 0; i < result.length; i += 2) {
  211. jobs.push(this.getSchedulerData(client, result[i], parseInt(result[i + 1])));
  212. }
  213. return Promise.all(jobs);
  214. }
  215. async getSchedulersCount() {
  216. const jobSchedulersKey = this.keys.repeat;
  217. const client = await this.client;
  218. return client.zcard(jobSchedulersKey);
  219. }
  220. getSchedulerNextJobId({ nextMillis, jobSchedulerId, }) {
  221. return `repeat:${jobSchedulerId}:${nextMillis}`;
  222. }
  223. }
  224. export const defaultRepeatStrategy = (millis, opts) => {
  225. const { pattern } = opts;
  226. const dateFromMillis = new Date(millis);
  227. const startDate = opts.startDate && new Date(opts.startDate);
  228. const currentDate = startDate > dateFromMillis ? startDate : dateFromMillis;
  229. const interval = parseExpression(pattern, Object.assign(Object.assign({}, opts), { currentDate }));
  230. try {
  231. if (opts.immediately) {
  232. return new Date().getTime();
  233. }
  234. else {
  235. return interval.next().getTime();
  236. }
  237. }
  238. catch (e) {
  239. // Ignore error
  240. }
  241. };
  242. //# sourceMappingURL=job-scheduler.js.map