| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- import { EventEmitter } from 'events';
- import { v4 } from 'uuid';
- import { getParentKey, isRedisInstance, trace } from '../utils';
- import { Job } from './job';
- import { QueueKeys } from './queue-keys';
- import { RedisConnection } from './redis-connection';
- import { SpanKind, TelemetryAttributes } from '../enums';
/**
 * This class allows adding jobs with dependencies between them in such
 * a way that it is possible to build complex flows.
 *
 * Note: A flow is a tree-like structure of jobs that depend on each other.
 * Whenever the children of a given parent are completed, the parent
 * will be processed, being able to access the children's result data.
 * All jobs can be in different queues, either children or parents.
 */
export class FlowProducer extends EventEmitter {
  /**
   * @param opts - producer options. `connection`, `prefix`, `telemetry`,
   * `skipVersionCheck` and `skipWaitingForReady` are read here; `prefix`
   * falls back to `'bull'` in `this.opts` when the key is absent.
   * @param Connection - connection class to instantiate
   * (defaults to `RedisConnection`).
   */
  constructor(opts = { connection: {} }, Connection = RedisConnection) {
    super();
    this.opts = Object.assign({ prefix: 'bull' }, opts);
    this.connection = new Connection(opts.connection, {
      shared: isRedisInstance(opts.connection),
      blocking: false,
      skipVersionCheck: opts.skipVersionCheck,
      skipWaitingForReady: opts.skipWaitingForReady,
    });
    // Re-emit connection errors on the producer itself.
    this.connection.on('error', (error) => this.emit('error', error));
    // Only report a close that was not requested through close().
    this.connection.on('close', () => {
      if (!this.closing) {
        this.emit('ioredis:close');
      }
    });
    this.queueKeys = new QueueKeys(opts.prefix);
    if (opts.telemetry) {
      this.telemetry = opts.telemetry;
    }
  }

  /** Emits `event` with `args`; delegates to EventEmitter. */
  emit(event, ...args) {
    return super.emit(event, ...args);
  }

  /** Removes `listener` for `eventName`; returns this producer for chaining. */
  off(eventName, listener) {
    super.off(eventName, listener);
    return this;
  }

  /** Registers `listener` for `event`; returns this producer for chaining. */
  on(event, listener) {
    super.on(event, listener);
    return this;
  }

  /** Registers a one-shot `listener` for `event`; returns this producer for chaining. */
  once(event, listener) {
    super.once(event, listener);
    return this;
  }

  /**
   * Returns a promise that resolves to a redis client. Normally used only by subclasses.
   */
  get client() {
    return this.connection.client;
  }

  /**
   * Helper to easily extend Job class calls.
   */
  get Job() {
    return Job;
  }

  /** Returns the same value as the `client` getter. */
  waitUntilReady() {
    return this.client;
  }

  /**
   * Adds a flow.
   *
   * This call would be atomic, either it fails and no jobs will
   * be added to the queues, or it succeeds and all jobs will be added.
   *
   * @param flow - an object with a tree-like structure where children jobs
   * will be processed before their parents.
   * @param opts - options that will be applied to the flow object;
   * `opts.queuesOptions` is forwarded to every node.
   * @returns the tree of created jobs, or `undefined` when the producer is closing.
   */
  async add(flow, opts) {
    if (this.closing) {
      return;
    }
    const client = await this.connection.client;
    const multi = client.multi();
    const parent = rootParentLink(flow);
    const queuesOpts = opts != null ? opts.queuesOptions : undefined;
    return trace(this.telemetry, SpanKind.PRODUCER, flow.queueName, 'addFlow', flow.queueName, async (span) => {
      if (span != null) {
        span.setAttributes({
          [TelemetryAttributes.FlowName]: flow.name,
        });
      }
      const jobsTree = await this.addNode({
        multi,
        node: flow,
        queuesOpts,
        parent,
      });
      const results = await multi.exec();
      // The first reply is taken as the root job's; a string reply without
      // an error replaces the locally generated id.
      const [result] = results || [];
      if (result) {
        const [err, jobId] = result;
        if (!err && typeof jobId === 'string') {
          jobsTree.job.id = jobId;
        }
      }
      return jobsTree;
    });
  }

  /**
   * Get a flow.
   *
   * @param opts - an object with options for getting a JobNode. `depth`
   * defaults to 10, `maxChildren` to 20 and `prefix` to this producer's prefix.
   * @returns the job tree, or `undefined` when the producer is closing or
   * the job cannot be found.
   */
  async getFlow(opts) {
    if (this.closing) {
      return;
    }
    const client = await this.connection.client;
    const nodeOpts = Object.assign({
      depth: 10,
      maxChildren: 20,
      prefix: this.opts.prefix,
    }, opts);
    return this.getNode(client, nodeOpts);
  }

  /**
   * Adds multiple flows.
   *
   * A flow is a tree-like structure of jobs that depend on each other.
   * Whenever the children of a given parent are completed, the parent
   * will be processed, being able to access the children's result data.
   *
   * All Jobs can be in different queues, either children or parents,
   * however this call would be atomic, either it fails and no jobs will
   * be added to the queues, or it succeeds and all jobs will be added.
   *
   * @param flows - an array of objects with a tree-like structure where children jobs
   * will be processed before their parents.
   * @returns one job tree per flow, or `undefined` when the producer is closing.
   */
  async addBulk(flows) {
    if (this.closing) {
      return;
    }
    const client = await this.connection.client;
    const multi = client.multi();
    return trace(this.telemetry, SpanKind.PRODUCER, '', 'addBulkFlows', '', async (span) => {
      if (span != null) {
        span.setAttributes({
          [TelemetryAttributes.BulkCount]: flows.length,
          [TelemetryAttributes.BulkNames]: flows
            .map(flow => flow.name)
            .join(','),
        });
      }
      const jobsTrees = await this.addNodes(multi, flows);
      const results = await multi.exec();
      // NOTE(review): replies are matched to root jobs purely by position
      // (results[i] <-> flows[i]). Confirm this still holds when flows have
      // children, since those queue additional commands on the same multi.
      jobsTrees.forEach((jobsTree, index) => {
        const result = results != null ? results[index] : undefined;
        if (!result) {
          return;
        }
        const [err, jobId] = result;
        if (!err && typeof jobId === 'string') {
          jobsTree.job.id = jobId;
        }
      });
      return jobsTrees;
    });
  }

  /**
   * Add a node (job) of a flow to the queue. This method will recursively
   * add all its children as well. Note that a given job can potentially be
   * a parent and a child job at the same time depending on where it is located
   * in the tree hierarchy.
   *
   * @param multi - ioredis ChainableCommander
   * @param node - the node representing a job to be added to some queue
   * @param parent - parent data sent to children to create the "links" to their parent
   * @param queuesOpts - per-queue options keyed by queue name; only
   * `defaultJobOptions` is read here.
   * @returns `{ job, children }` for a node with children, `{ job }` otherwise.
   */
  async addNode({ multi, node, parent, queuesOpts, }) {
    const prefix = node.prefix || this.opts.prefix;
    // One key builder serves both the queue-like object and the children links.
    const queueKeys = new QueueKeys(prefix);
    const queue = this.queueFromNode(node, queueKeys, prefix);
    const queueOpts = queuesOpts && queuesOpts[node.queueName];
    const jobsOpts = queueOpts != null && queueOpts.defaultJobOptions != null
      ? queueOpts.defaultJobOptions
      : {};
    const jobId = (node.opts != null && node.opts.jobId) || v4();
    return trace(this.telemetry, SpanKind.PRODUCER, node.queueName, 'addNode', node.queueName, async (span, srcPropagationMetadata) => {
      if (span != null) {
        span.setAttributes({
          [TelemetryAttributes.JobName]: node.name,
          [TelemetryAttributes.JobId]: jobId,
        });
      }
      const opts = node.opts;
      let telemetry = opts != null ? opts.telemetry : undefined;
      if (srcPropagationMetadata && opts) {
        const nodeTelemetry = opts.telemetry;
        const omitContext = nodeTelemetry != null ? nodeTelemetry.omitContext : undefined;
        // Explicit node metadata wins; otherwise use the propagated context
        // unless the node opted out of it.
        const telemetryMetadata = (nodeTelemetry != null ? nodeTelemetry.metadata : undefined) ||
          (!omitContext && srcPropagationMetadata);
        if (telemetryMetadata || omitContext) {
          telemetry = {
            metadata: telemetryMetadata,
            omitContext,
          };
        }
      }
      const parentOpts = parent != null ? parent.parentOpts : undefined;
      const inheritedDependenciesKey = parent != null ? parent.parentDependenciesKey : undefined;
      // Later sources win: queue defaults < node options < parent/telemetry.
      const job = new this.Job(queue, node.name, node.data, Object.assign({}, jobsOpts, opts, { parent: parentOpts, telemetry }), jobId);
      const parentKey = getParentKey(parentOpts);
      if (!(node.children && node.children.length > 0)) {
        await job.addJob(multi, {
          parentDependenciesKey: inheritedDependenciesKey,
          parentKey,
        });
        return { job };
      }
      // Create the parent job, it will be a job in status "waiting-children".
      await job.addJob(multi, {
        parentDependenciesKey: inheritedDependenciesKey,
        addToWaitingChildren: true,
        parentKey,
      });
      const children = await this.addChildren({
        multi,
        nodes: node.children,
        parent: {
          parentOpts: {
            id: jobId,
            queue: queueKeys.getQueueQualifiedName(node.queueName),
          },
          parentDependenciesKey: `${queueKeys.toKey(node.queueName, jobId)}:dependencies`,
        },
        queuesOpts,
      });
      return { job, children };
    });
  }

  /**
   * Adds nodes (jobs) of multiple flows to the queue. This method will recursively
   * add all its children as well. Note that a given job can potentially be
   * a parent and a child job at the same time depending on where it is located
   * in the tree hierarchy.
   *
   * @param multi - ioredis ChainableCommander
   * @param nodes - the nodes representing jobs to be added to some queue
   * @returns a promise for the array of job trees, in the order of `nodes`.
   */
  addNodes(multi, nodes) {
    return Promise.all(nodes.map(node => this.addNode({
      multi,
      node,
      parent: rootParentLink(node),
    })));
  }

  /**
   * Loads a job and, recursively, its dependencies.
   *
   * @param client - redis client, passed through to child lookups.
   * @param node - `{ id, queueName, prefix, depth, maxChildren }`.
   * `maxChildren` is used as the `count` for each dependency kind; `depth`
   * is the number of tree levels to return, this node included.
   * @returns `{ job, children }`, `{ job }` when there is nothing further
   * to load, or `undefined` when the job is not found.
   */
  async getNode(client, node) {
    const queue = this.queueFromNode(node, new QueueKeys(node.prefix), node.prefix);
    const job = await this.Job.fromId(queue, node.id);
    if (!job) {
      return;
    }
    const limit = { count: node.maxChildren };
    const { processed = {}, unprocessed = [], failed = [], ignored = {}, } = await job.getDependencies({
      failed: Object.assign({}, limit),
      processed: Object.assign({}, limit),
      unprocessed: Object.assign({}, limit),
      ignored: Object.assign({}, limit),
    });
    const childrenKeys = [
      ...Object.keys(processed),
      ...unprocessed,
      ...failed,
      ...Object.keys(ignored),
    ];
    const newDepth = node.depth - 1;
    // Compare explicitly: a truthiness test lets zero, negative or
    // fractional depths slip past the limit and recurse without bound.
    if (childrenKeys.length > 0 && newDepth > 0) {
      const children = await this.getChildren(client, childrenKeys, newDepth, node.maxChildren);
      return { job, children };
    }
    return { job };
  }

  /** Adds every node in `nodes` under the same `parent` link. */
  addChildren({ multi, nodes, parent, queuesOpts }) {
    return Promise.all(nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })));
  }

  /**
   * Resolves each dependency key to a job node.
   *
   * Keys are split on ':' and read as `prefix:queueName:id`; any further
   * segments are ignored.
   */
  getChildren(client, childrenKeys, depth, maxChildren) {
    const getChild = (key) => {
      const [prefix, queueName, id] = key.split(':');
      return this.getNode(client, {
        id,
        queueName,
        prefix,
        depth,
        maxChildren,
      });
    };
    return Promise.all(childrenKeys.map(getChild));
  }

  /**
   * Helper factory method that creates a queue-like object
   * required to create jobs in any queue.
   *
   * @param node - node whose `queueName` names the queue.
   * @param queueKeys - key builder used for `keys`, `toKey` and `qualifiedName`.
   * @param prefix - prefix exposed as `opts.prefix` on the returned object.
   * @returns a plain object backed by this producer's connection and event
   * methods. `closing` is the value of `this.closing` at creation time and
   * `trace` is a no-op.
   */
  queueFromNode(node, queueKeys, prefix) {
    return {
      client: this.connection.client,
      name: node.queueName,
      keys: queueKeys.getKeys(node.queueName),
      toKey: (type) => queueKeys.toKey(node.queueName, type),
      opts: { prefix, connection: {} },
      qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),
      closing: this.closing,
      waitUntilReady: async () => this.connection.client,
      removeListener: this.removeListener.bind(this),
      emit: this.emit.bind(this),
      on: this.on.bind(this),
      redisVersion: this.connection.redisVersion,
      databaseType: this.connection.databaseType,
      trace: async () => { },
    };
  }

  /**
   * Closes the connection and returns a promise that resolves when the
   * connection is closed. Repeated calls await the same close promise.
   */
  async close() {
    if (!this.closing) {
      this.closing = this.connection.close();
    }
    await this.closing;
  }

  /**
   * Force disconnects a connection.
   */
  disconnect() {
    return this.connection.disconnect();
  }
}

/**
 * Builds the parent link for a root node from its own `opts.parent`.
 * `parentDependenciesKey` is `undefined` when no parent key can be derived.
 */
function rootParentLink(node) {
  const parentOpts = node != null && node.opts != null ? node.opts.parent : undefined;
  const parentKey = getParentKey(parentOpts);
  return {
    parentOpts,
    parentDependenciesKey: parentKey ? `${parentKey}:dependencies` : undefined,
  };
}
- //# sourceMappingURL=flow-producer.js.map
|