flow-producer.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. import { EventEmitter } from 'events';
  2. import { v4 } from 'uuid';
  3. import { getParentKey, isRedisInstance, trace } from '../utils';
  4. import { Job } from './job';
  5. import { QueueKeys } from './queue-keys';
  6. import { RedisConnection } from './redis-connection';
  7. import { SpanKind, TelemetryAttributes } from '../enums';
  8. /**
  9. * This class allows to add jobs with dependencies between them in such
  10. * a way that it is possible to build complex flows.
  11. * Note: A flow is a tree-like structure of jobs that depend on each other.
  12. * Whenever the children of a given parent are completed, the parent
  13. * will be processed, being able to access the children's result data.
  14. * All Jobs can be in different queues, either children or parents,
  15. */
  16. export class FlowProducer extends EventEmitter {
  17. constructor(opts = { connection: {} }, Connection = RedisConnection) {
  18. super();
  19. this.opts = opts;
  20. this.opts = Object.assign({ prefix: 'bull' }, opts);
  21. this.connection = new Connection(opts.connection, {
  22. shared: isRedisInstance(opts.connection),
  23. blocking: false,
  24. skipVersionCheck: opts.skipVersionCheck,
  25. skipWaitingForReady: opts.skipWaitingForReady,
  26. });
  27. this.connection.on('error', (error) => this.emit('error', error));
  28. this.connection.on('close', () => {
  29. if (!this.closing) {
  30. this.emit('ioredis:close');
  31. }
  32. });
  33. this.queueKeys = new QueueKeys(opts.prefix);
  34. if (opts === null || opts === void 0 ? void 0 : opts.telemetry) {
  35. this.telemetry = opts.telemetry;
  36. }
  37. }
  38. emit(event, ...args) {
  39. return super.emit(event, ...args);
  40. }
  41. off(eventName, listener) {
  42. super.off(eventName, listener);
  43. return this;
  44. }
  45. on(event, listener) {
  46. super.on(event, listener);
  47. return this;
  48. }
  49. once(event, listener) {
  50. super.once(event, listener);
  51. return this;
  52. }
  53. /**
  54. * Returns a promise that resolves to a redis client. Normally used only by subclasses.
  55. */
  56. get client() {
  57. return this.connection.client;
  58. }
  59. /**
  60. * Helper to easily extend Job class calls.
  61. */
  62. get Job() {
  63. return Job;
  64. }
  65. waitUntilReady() {
  66. return this.client;
  67. }
  68. /**
  69. * Adds a flow.
  70. *
  71. * This call would be atomic, either it fails and no jobs will
  72. * be added to the queues, or it succeeds and all jobs will be added.
  73. *
  74. * @param flow - an object with a tree-like structure where children jobs
  75. * will be processed before their parents.
  76. * @param opts - options that will be applied to the flow object.
  77. */
  78. async add(flow, opts) {
  79. var _a;
  80. if (this.closing) {
  81. return;
  82. }
  83. const client = await this.connection.client;
  84. const multi = client.multi();
  85. const parentOpts = (_a = flow === null || flow === void 0 ? void 0 : flow.opts) === null || _a === void 0 ? void 0 : _a.parent;
  86. const parentKey = getParentKey(parentOpts);
  87. const parentDependenciesKey = parentKey
  88. ? `${parentKey}:dependencies`
  89. : undefined;
  90. return trace(this.telemetry, SpanKind.PRODUCER, flow.queueName, 'addFlow', flow.queueName, async (span) => {
  91. span === null || span === void 0 ? void 0 : span.setAttributes({
  92. [TelemetryAttributes.FlowName]: flow.name,
  93. });
  94. const jobsTree = await this.addNode({
  95. multi,
  96. node: flow,
  97. queuesOpts: opts === null || opts === void 0 ? void 0 : opts.queuesOptions,
  98. parent: {
  99. parentOpts,
  100. parentDependenciesKey,
  101. },
  102. });
  103. const results = (await multi.exec());
  104. const [result] = results || [];
  105. if (result) {
  106. const [err, jobId] = result;
  107. if (!err && typeof jobId === 'string') {
  108. jobsTree.job.id = jobId;
  109. }
  110. }
  111. return jobsTree;
  112. });
  113. }
  114. /**
  115. * Get a flow.
  116. *
  117. * @param opts - an object with options for getting a JobNode.
  118. */
  119. async getFlow(opts) {
  120. if (this.closing) {
  121. return;
  122. }
  123. const client = await this.connection.client;
  124. const updatedOpts = Object.assign({
  125. depth: 10,
  126. maxChildren: 20,
  127. prefix: this.opts.prefix,
  128. }, opts);
  129. const jobsTree = this.getNode(client, updatedOpts);
  130. return jobsTree;
  131. }
  132. /**
  133. * Adds multiple flows.
  134. *
  135. * A flow is a tree-like structure of jobs that depend on each other.
  136. * Whenever the children of a given parent are completed, the parent
  137. * will be processed, being able to access the children's result data.
  138. *
  139. * All Jobs can be in different queues, either children or parents,
  140. * however this call would be atomic, either it fails and no jobs will
  141. * be added to the queues, or it succeeds and all jobs will be added.
  142. *
  143. * @param flows - an array of objects with a tree-like structure where children jobs
  144. * will be processed before their parents.
  145. */
  146. async addBulk(flows) {
  147. if (this.closing) {
  148. return;
  149. }
  150. const client = await this.connection.client;
  151. const multi = client.multi();
  152. return trace(this.telemetry, SpanKind.PRODUCER, '', 'addBulkFlows', '', async (span) => {
  153. span === null || span === void 0 ? void 0 : span.setAttributes({
  154. [TelemetryAttributes.BulkCount]: flows.length,
  155. [TelemetryAttributes.BulkNames]: flows
  156. .map(flow => flow.name)
  157. .join(','),
  158. });
  159. const jobsTrees = await this.addNodes(multi, flows);
  160. const results = (await multi.exec());
  161. for (let index = 0; index < jobsTrees.length; ++index) {
  162. const result = results === null || results === void 0 ? void 0 : results[index];
  163. if (!result) {
  164. continue;
  165. }
  166. const [err, jobId] = result;
  167. if (!err && typeof jobId === 'string') {
  168. jobsTrees[index].job.id = jobId;
  169. }
  170. }
  171. return jobsTrees;
  172. });
  173. }
  174. /**
  175. * Add a node (job) of a flow to the queue. This method will recursively
  176. * add all its children as well. Note that a given job can potentially be
  177. * a parent and a child job at the same time depending on where it is located
  178. * in the tree hierarchy.
  179. *
  180. * @param multi - ioredis ChainableCommander
  181. * @param node - the node representing a job to be added to some queue
  182. * @param parent - parent data sent to children to create the "links" to their parent
  183. * @returns
  184. */
  185. async addNode({ multi, node, parent, queuesOpts, }) {
  186. var _a, _b;
  187. const prefix = node.prefix || this.opts.prefix;
  188. const queue = this.queueFromNode(node, new QueueKeys(prefix), prefix);
  189. const queueOpts = queuesOpts && queuesOpts[node.queueName];
  190. const jobsOpts = (_a = queueOpts === null || queueOpts === void 0 ? void 0 : queueOpts.defaultJobOptions) !== null && _a !== void 0 ? _a : {};
  191. const jobId = ((_b = node.opts) === null || _b === void 0 ? void 0 : _b.jobId) || v4();
  192. return trace(this.telemetry, SpanKind.PRODUCER, node.queueName, 'addNode', node.queueName, async (span, srcPropagationMetadata) => {
  193. var _a, _b;
  194. span === null || span === void 0 ? void 0 : span.setAttributes({
  195. [TelemetryAttributes.JobName]: node.name,
  196. [TelemetryAttributes.JobId]: jobId,
  197. });
  198. const opts = node.opts;
  199. let telemetry = opts === null || opts === void 0 ? void 0 : opts.telemetry;
  200. if (srcPropagationMetadata && opts) {
  201. const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
  202. const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
  203. (!omitContext && srcPropagationMetadata);
  204. if (telemetryMetadata || omitContext) {
  205. telemetry = {
  206. metadata: telemetryMetadata,
  207. omitContext,
  208. };
  209. }
  210. }
  211. const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, jobsOpts), opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts, telemetry }), jobId);
  212. const parentKey = getParentKey(parent === null || parent === void 0 ? void 0 : parent.parentOpts);
  213. if (node.children && node.children.length > 0) {
  214. // Create the parent job, it will be a job in status "waiting-children".
  215. const parentId = jobId;
  216. const queueKeysParent = new QueueKeys(node.prefix || this.opts.prefix);
  217. await job.addJob(multi, {
  218. parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey,
  219. addToWaitingChildren: true,
  220. parentKey,
  221. });
  222. const parentDependenciesKey = `${queueKeysParent.toKey(node.queueName, parentId)}:dependencies`;
  223. const children = await this.addChildren({
  224. multi,
  225. nodes: node.children,
  226. parent: {
  227. parentOpts: {
  228. id: parentId,
  229. queue: queueKeysParent.getQueueQualifiedName(node.queueName),
  230. },
  231. parentDependenciesKey,
  232. },
  233. queuesOpts,
  234. });
  235. return { job, children };
  236. }
  237. else {
  238. await job.addJob(multi, {
  239. parentDependenciesKey: parent === null || parent === void 0 ? void 0 : parent.parentDependenciesKey,
  240. parentKey,
  241. });
  242. return { job };
  243. }
  244. });
  245. }
  246. /**
  247. * Adds nodes (jobs) of multiple flows to the queue. This method will recursively
  248. * add all its children as well. Note that a given job can potentially be
  249. * a parent and a child job at the same time depending on where it is located
  250. * in the tree hierarchy.
  251. *
  252. * @param multi - ioredis ChainableCommander
  253. * @param nodes - the nodes representing jobs to be added to some queue
  254. * @returns
  255. */
  256. addNodes(multi, nodes) {
  257. return Promise.all(nodes.map(node => {
  258. var _a;
  259. const parentOpts = (_a = node === null || node === void 0 ? void 0 : node.opts) === null || _a === void 0 ? void 0 : _a.parent;
  260. const parentKey = getParentKey(parentOpts);
  261. const parentDependenciesKey = parentKey
  262. ? `${parentKey}:dependencies`
  263. : undefined;
  264. return this.addNode({
  265. multi,
  266. node,
  267. parent: {
  268. parentOpts,
  269. parentDependenciesKey,
  270. },
  271. });
  272. }));
  273. }
  274. async getNode(client, node) {
  275. const queue = this.queueFromNode(node, new QueueKeys(node.prefix), node.prefix);
  276. const job = await this.Job.fromId(queue, node.id);
  277. if (job) {
  278. const { processed = {}, unprocessed = [], failed = [], ignored = {}, } = await job.getDependencies({
  279. failed: {
  280. count: node.maxChildren,
  281. },
  282. processed: {
  283. count: node.maxChildren,
  284. },
  285. unprocessed: {
  286. count: node.maxChildren,
  287. },
  288. ignored: {
  289. count: node.maxChildren,
  290. },
  291. });
  292. const processedKeys = Object.keys(processed);
  293. const ignoredKeys = Object.keys(ignored);
  294. const childrenCount = processedKeys.length +
  295. unprocessed.length +
  296. ignoredKeys.length +
  297. failed.length;
  298. const newDepth = node.depth - 1;
  299. if (childrenCount > 0 && newDepth) {
  300. const children = await this.getChildren(client, [...processedKeys, ...unprocessed, ...failed, ...ignoredKeys], newDepth, node.maxChildren);
  301. return { job, children };
  302. }
  303. else {
  304. return { job };
  305. }
  306. }
  307. }
  308. addChildren({ multi, nodes, parent, queuesOpts }) {
  309. return Promise.all(nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })));
  310. }
  311. getChildren(client, childrenKeys, depth, maxChildren) {
  312. const getChild = (key) => {
  313. const [prefix, queueName, id] = key.split(':');
  314. return this.getNode(client, {
  315. id,
  316. queueName,
  317. prefix,
  318. depth,
  319. maxChildren,
  320. });
  321. };
  322. return Promise.all([...childrenKeys.map(getChild)]);
  323. }
  324. /**
  325. * Helper factory method that creates a queue-like object
  326. * required to create jobs in any queue.
  327. *
  328. * @param node -
  329. * @param queueKeys -
  330. * @returns
  331. */
  332. queueFromNode(node, queueKeys, prefix) {
  333. return {
  334. client: this.connection.client,
  335. name: node.queueName,
  336. keys: queueKeys.getKeys(node.queueName),
  337. toKey: (type) => queueKeys.toKey(node.queueName, type),
  338. opts: { prefix, connection: {} },
  339. qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),
  340. closing: this.closing,
  341. waitUntilReady: async () => this.connection.client,
  342. removeListener: this.removeListener.bind(this),
  343. emit: this.emit.bind(this),
  344. on: this.on.bind(this),
  345. redisVersion: this.connection.redisVersion,
  346. databaseType: this.connection.databaseType,
  347. trace: async () => { },
  348. };
  349. }
  350. /**
  351. *
  352. * Closes the connection and returns a promise that resolves when the connection is closed.
  353. */
  354. async close() {
  355. if (!this.closing) {
  356. this.closing = this.connection.close();
  357. }
  358. await this.closing;
  359. }
  360. /**
  361. *
  362. * Force disconnects a connection.
  363. */
  364. disconnect() {
  365. return this.connection.disconnect();
  366. }
  367. }
  368. //# sourceMappingURL=flow-producer.js.map