flow-producer.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.FlowProducer = void 0;
  4. const events_1 = require("events");
  5. const uuid_1 = require("uuid");
  6. const utils_1 = require("../utils");
  7. const job_1 = require("./job");
  8. const queue_keys_1 = require("./queue-keys");
  9. const redis_connection_1 = require("./redis-connection");
  10. const enums_1 = require("../enums");
  11. /**
  12. * This class allows to add jobs with dependencies between them in such
  13. * a way that it is possible to build complex flows.
  14. * Note: A flow is a tree-like structure of jobs that depend on each other.
  15. * Whenever the children of a given parent are completed, the parent
  16. * will be processed, being able to access the children's result data.
  17. * All Jobs can be in different queues, either children or parents,
  18. */
  19. class FlowProducer extends events_1.EventEmitter {
  20. constructor(opts = { connection: {} }, Connection = redis_connection_1.RedisConnection) {
  21. super();
  22. this.opts = opts;
  23. this.opts = Object.assign({ prefix: 'bull' }, opts);
  24. this.connection = new Connection(opts.connection, {
  25. shared: (0, utils_1.isRedisInstance)(opts.connection),
  26. blocking: false,
  27. skipVersionCheck: opts.skipVersionCheck,
  28. skipWaitingForReady: opts.skipWaitingForReady,
  29. });
  30. this.connection.on('error', (error) => this.emit('error', error));
  31. this.connection.on('close', () => {
  32. if (!this.closing) {
  33. this.emit('ioredis:close');
  34. }
  35. });
  36. this.queueKeys = new queue_keys_1.QueueKeys(opts.prefix);
  37. if (opts === null || opts === void 0 ? void 0 : opts.telemetry) {
  38. this.telemetry = opts.telemetry;
  39. }
  40. }
  41. emit(event, ...args) {
  42. return super.emit(event, ...args);
  43. }
  44. off(eventName, listener) {
  45. super.off(eventName, listener);
  46. return this;
  47. }
  48. on(event, listener) {
  49. super.on(event, listener);
  50. return this;
  51. }
  52. once(event, listener) {
  53. super.once(event, listener);
  54. return this;
  55. }
  56. /**
  57. * Returns a promise that resolves to a redis client. Normally used only by subclasses.
  58. */
  59. get client() {
  60. return this.connection.client;
  61. }
  62. /**
  63. * Helper to easily extend Job class calls.
  64. */
  65. get Job() {
  66. return job_1.Job;
  67. }
  68. waitUntilReady() {
  69. return this.client;
  70. }
  71. /**
  72. * Adds a flow.
  73. *
  74. * This call would be atomic, either it fails and no jobs will
  75. * be added to the queues, or it succeeds and all jobs will be added.
  76. *
  77. * @param flow - an object with a tree-like structure where children jobs
  78. * will be processed before their parents.
  79. * @param opts - options that will be applied to the flow object.
  80. */
  81. async add(flow, opts) {
  82. var _a;
  83. if (this.closing) {
  84. return;
  85. }
  86. const client = await this.connection.client;
  87. const multi = client.multi();
  88. const parentOpts = (_a = flow === null || flow === void 0 ? void 0 : flow.opts) === null || _a === void 0 ? void 0 : _a.parent;
  89. const parentKey = (0, utils_1.getParentKey)(parentOpts);
  90. const parentDependenciesKey = parentKey
  91. ? `${parentKey}:dependencies`
  92. : undefined;
  93. return (0, utils_1.trace)(this.telemetry, enums_1.SpanKind.PRODUCER, flow.queueName, 'addFlow', flow.queueName, async (span) => {
  94. span === null || span === void 0 ? void 0 : span.setAttributes({
  95. [enums_1.TelemetryAttributes.FlowName]: flow.name,
  96. });
  97. const jobsTree = await this.addNode({
  98. multi,
  99. node: flow,
  100. queuesOpts: opts === null || opts === void 0 ? void 0 : opts.queuesOptions,
  101. parent: {
  102. parentOpts,
  103. parentDependenciesKey,
  104. },
  105. });
  106. const results = (await multi.exec());
  107. const [result] = results || [];
  108. if (result) {
  109. const [err, jobId] = result;
  110. if (!err && typeof jobId === 'string') {
  111. jobsTree.job.id = jobId;
  112. }
  113. }
  114. return jobsTree;
  115. });
  116. }
  117. /**
  118. * Get a flow.
  119. *
  120. * @param opts - an object with options for getting a JobNode.
  121. */
  122. async getFlow(opts) {
  123. if (this.closing) {
  124. return;
  125. }
  126. const client = await this.connection.client;
  127. const updatedOpts = Object.assign({
  128. depth: 10,
  129. maxChildren: 20,
  130. prefix: this.opts.prefix,
  131. }, opts);
  132. const jobsTree = this.getNode(client, updatedOpts);
  133. return jobsTree;
  134. }
  135. /**
  136. * Adds multiple flows.
  137. *
  138. * A flow is a tree-like structure of jobs that depend on each other.
  139. * Whenever the children of a given parent are completed, the parent
  140. * will be processed, being able to access the children's result data.
  141. *
  142. * All Jobs can be in different queues, either children or parents,
  143. * however this call would be atomic, either it fails and no jobs will
  144. * be added to the queues, or it succeeds and all jobs will be added.
  145. *
  146. * @param flows - an array of objects with a tree-like structure where children jobs
  147. * will be processed before their parents.
  148. */
  149. async addBulk(flows) {
  150. if (this.closing) {
  151. return;
  152. }
  153. const client = await this.connection.client;
  154. const multi = client.multi();
  155. return (0, utils_1.trace)(this.telemetry, enums_1.SpanKind.PRODUCER, '', 'addBulkFlows', '', async (span) => {
  156. span === null || span === void 0 ? void 0 : span.setAttributes({
  157. [enums_1.TelemetryAttributes.BulkCount]: flows.length,
  158. [enums_1.TelemetryAttributes.BulkNames]: flows
  159. .map(flow => flow.name)
  160. .join(','),
  161. });
  162. const jobsTrees = await this.addNodes(multi, flows);
  163. const results = (await multi.exec());
  164. for (let index = 0; index < jobsTrees.length; ++index) {
  165. const result = results === null || results === void 0 ? void 0 : results[index];
  166. if (!result) {
  167. continue;
  168. }
  169. const [err, jobId] = result;
  170. if (!err && typeof jobId === 'string') {
  171. jobsTrees[index].job.id = jobId;
  172. }
  173. }
  174. return jobsTrees;
  175. });
  176. }
  177. /**
  178. * Add a node (job) of a flow to the queue. This method will recursively
  179. * add all its children as well. Note that a given job can potentially be
  180. * a parent and a child job at the same time depending on where it is located
  181. * in the tree hierarchy.
  182. *
  183. * @param multi - ioredis ChainableCommander
  184. * @param node - the node representing a job to be added to some queue
  185. * @param parent - parent data sent to children to create the "links" to their parent
  186. * @returns
  187. */
  188. async addNode({ multi, node, parent, queuesOpts, }) {
  189. var _a, _b;
  190. const prefix = node.prefix || this.opts.prefix;
  191. const queue = this.queueFromNode(node, new queue_keys_1.QueueKeys(prefix), prefix);
  192. const queueOpts = queuesOpts && queuesOpts[node.queueName];
  193. const jobsOpts = (_a = queueOpts === null || queueOpts === void 0 ? void 0 : queueOpts.defaultJobOptions) !== null && _a !== void 0 ? _a : {};
  194. const jobId = ((_b = node.opts) === null || _b === void 0 ? void 0 : _b.jobId) || (0, uuid_1.v4)();
  195. return (0, utils_1.trace)(this.telemetry, enums_1.SpanKind.PRODUCER, node.queueName, 'addNode', node.queueName, async (span, srcPropagationMetadata) => {
  196. var _a, _b;
  197. span === null || span === void 0 ? void 0 : span.setAttributes({
  198. [enums_1.TelemetryAttributes.JobName]: node.name,
  199. [enums_1.TelemetryAttributes.JobId]: jobId,
  200. });
  201. const opts = node.opts;
  202. let telemetry = opts === null || opts === void 0 ? void 0 : opts.telemetry;
  203. if (srcPropagationMetadata && opts) {
  204. const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
  205. const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
  206. (!omitContext && srcPropagationMetadata);
  207. if (telemetryMetadata || omitContext) {
  208. telemetry = {
  209. metadata: telemetryMetadata,
  210. omitContext,
  211. };
  212. }
  213. }
  214. const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, jobsOpts), opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts, telemetry }), jobId);
  215. const parentKey = (0, utils_1.getParentKey)(parent === null || parent === void 0 ? void 0 : parent.parentOpts);
  216. if (node.children && node.children.length > 0) {
  217. // Create the parent job, it will be a job in status "waiting-children".
  218. const parentId = jobId;
  219. const queueKeysParent = new queue_keys_1.QueueKeys(node.prefix || this.opts.prefix);
  220. await job.addJob(multi, {
  221. parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey,
  222. addToWaitingChildren: true,
  223. parentKey,
  224. });
  225. const parentDependenciesKey = `${queueKeysParent.toKey(node.queueName, parentId)}:dependencies`;
  226. const children = await this.addChildren({
  227. multi,
  228. nodes: node.children,
  229. parent: {
  230. parentOpts: {
  231. id: parentId,
  232. queue: queueKeysParent.getQueueQualifiedName(node.queueName),
  233. },
  234. parentDependenciesKey,
  235. },
  236. queuesOpts,
  237. });
  238. return { job, children };
  239. }
  240. else {
  241. await job.addJob(multi, {
  242. parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey,
  243. parentKey,
  244. });
  245. return { job };
  246. }
  247. });
  248. }
  249. /**
  250. * Adds nodes (jobs) of multiple flows to the queue. This method will recursively
  251. * add all its children as well. Note that a given job can potentially be
  252. * a parent and a child job at the same time depending on where it is located
  253. * in the tree hierarchy.
  254. *
  255. * @param multi - ioredis ChainableCommander
  256. * @param nodes - the nodes representing jobs to be added to some queue
  257. * @returns
  258. */
  259. addNodes(multi, nodes) {
  260. return Promise.all(nodes.map(node => {
  261. var _a;
  262. const parentOpts = (_a = node === null || node === void 0 ? void 0 : node.opts) === null || _a === void 0 ? void 0 : _a.parent;
  263. const parentKey = (0, utils_1.getParentKey)(parentOpts);
  264. const parentDependenciesKey = parentKey
  265. ? `${parentKey}:dependencies`
  266. : undefined;
  267. return this.addNode({
  268. multi,
  269. node,
  270. parent: {
  271. parentOpts,
  272. parentDependenciesKey,
  273. },
  274. });
  275. }));
  276. }
  277. async getNode(client, node) {
  278. const queue = this.queueFromNode(node, new queue_keys_1.QueueKeys(node.prefix), node.prefix);
  279. const job = await this.Job.fromId(queue, node.id);
  280. if (job) {
  281. const { processed = {}, unprocessed = [], failed = [], ignored = {}, } = await job.getDependencies({
  282. failed: {
  283. count: node.maxChildren,
  284. },
  285. processed: {
  286. count: node.maxChildren,
  287. },
  288. unprocessed: {
  289. count: node.maxChildren,
  290. },
  291. ignored: {
  292. count: node.maxChildren,
  293. },
  294. });
  295. const processedKeys = Object.keys(processed);
  296. const ignoredKeys = Object.keys(ignored);
  297. const childrenCount = processedKeys.length +
  298. unprocessed.length +
  299. ignoredKeys.length +
  300. failed.length;
  301. const newDepth = node.depth - 1;
  302. if (childrenCount > 0 && newDepth) {
  303. const children = await this.getChildren(client, [...processedKeys, ...unprocessed, ...failed, ...ignoredKeys], newDepth, node.maxChildren);
  304. return { job, children };
  305. }
  306. else {
  307. return { job };
  308. }
  309. }
  310. }
  311. addChildren({ multi, nodes, parent, queuesOpts }) {
  312. return Promise.all(nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })));
  313. }
  314. getChildren(client, childrenKeys, depth, maxChildren) {
  315. const getChild = (key) => {
  316. const [prefix, queueName, id] = key.split(':');
  317. return this.getNode(client, {
  318. id,
  319. queueName,
  320. prefix,
  321. depth,
  322. maxChildren,
  323. });
  324. };
  325. return Promise.all([...childrenKeys.map(getChild)]);
  326. }
  327. /**
  328. * Helper factory method that creates a queue-like object
  329. * required to create jobs in any queue.
  330. *
  331. * @param node -
  332. * @param queueKeys -
  333. * @returns
  334. */
  335. queueFromNode(node, queueKeys, prefix) {
  336. return {
  337. client: this.connection.client,
  338. name: node.queueName,
  339. keys: queueKeys.getKeys(node.queueName),
  340. toKey: (type) => queueKeys.toKey(node.queueName, type),
  341. opts: { prefix, connection: {} },
  342. qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),
  343. closing: this.closing,
  344. waitUntilReady: async () => this.connection.client,
  345. removeListener: this.removeListener.bind(this),
  346. emit: this.emit.bind(this),
  347. on: this.on.bind(this),
  348. redisVersion: this.connection.redisVersion,
  349. databaseType: this.connection.databaseType,
  350. trace: async () => { },
  351. };
  352. }
  353. /**
  354. *
  355. * Closes the connection and returns a promise that resolves when the connection is closed.
  356. */
  357. async close() {
  358. if (!this.closing) {
  359. this.closing = this.connection.close();
  360. }
  361. await this.closing;
  362. }
  363. /**
  364. *
  365. * Force disconnects a connection.
  366. */
  367. disconnect() {
  368. return this.connection.disconnect();
  369. }
  370. }
  371. exports.FlowProducer = FlowProducer;
  372. //# sourceMappingURL=flow-producer.js.map