child-processor.js 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.ChildProcessor = void 0;
  4. const abort_controller_1 = require("./abort-controller");
  5. const enums_1 = require("../enums");
  6. const utils_1 = require("../utils");
  /**
   * Lifecycle states of the child processor, stored as a two-way map:
   * `ChildStatus.Idle === 0` and `ChildStatus[0] === 'Idle'`.
   */
  const ChildStatus = {};
  ['Idle', 'Started', 'Terminating', 'Errored'].forEach((label, code) => {
    ChildStatus[label] = code;
    ChildStatus[code] = label;
  });
  // Milliseconds to wait for the parent's reply to a request; shorter when NODE_ENV is 'test'.
  const RESPONSE_TIMEOUT = process.env.NODE_ENV !== 'test' ? 5000 : 500;
  15. /**
  16. * ChildProcessor
  17. *
  18. * This class acts as the interface between a child process and its parent process
  19. * so that jobs can be processed in different processes.
  20. *
  21. */
  class ChildProcessor {
    /**
     * @param send - Async function used to post a message object to the parent process.
     * @param receiver - Emitter whose 'message' events carry the parent's replies.
     */
    constructor(send, receiver) {
      this.send = send;
      this.receiver = receiver;
    }
    /**
     * Loads the processor module and reports the outcome to the parent.
     *
     * Sends `InitCompleted` on success. On any failure (import error, or no
     * function exported) the status becomes `Errored` and `InitFailed` is sent
     * with the serialized error.
     *
     * @param processorFile - Module specifier passed to dynamic `import()`.
     */
    async init(processorFile) {
      let processor;
      try {
        const { default: processorFn } = await import(processorFile);
        // Support es2015 modules whose function is nested under a second `default`.
        // The guard keeps a module without any default export from raising an
        // unrelated TypeError instead of the explicit error below.
        processor = processorFn && processorFn.default ? processorFn.default : processorFn;
        if (typeof processor !== 'function') {
          throw new Error('No function is exported in processor file');
        }
      }
      catch (err) {
        this.status = ChildStatus.Errored;
        return this.send({
          cmd: enums_1.ParentCommand.InitFailed,
          err: (0, utils_1.errorToJSON)(err),
        });
      }
      const origProcessor = processor;
      // Normalize the processor so it always returns a promise, even when the
      // user function is synchronous or throws synchronously.
      this.processor = function (job, token, signal) {
        try {
          return Promise.resolve(origProcessor(job, token, signal));
        }
        catch (err) {
          return Promise.reject(err);
        }
      };
      this.status = ChildStatus.Idle;
      await this.send({
        cmd: enums_1.ParentCommand.InitCompleted,
      });
    }
    /**
     * Starts processing a job without waiting for it to finish.
     *
     * The job runs in the background (tracked by `currentJobPromise`); its
     * outcome is reported to the parent as `Completed` or `Failed`. If the child
     * is not idle, an `Error` message is sent instead and nothing is started.
     *
     * @param jobJson - Deserialized job message; wrapped via `wrapJob` before use.
     * @param token - Token forwarded unchanged to the processor.
     */
    async start(jobJson, token) {
      if (this.status !== ChildStatus.Idle) {
        return this.send({
          cmd: enums_1.ParentCommand.Error,
          err: (0, utils_1.errorToJSON)(new Error('cannot start a not idling child process')),
        });
      }
      this.status = ChildStatus.Started;
      this.abortController = new abort_controller_1.AbortController();
      this.currentJobPromise = (async () => {
        try {
          const job = this.wrapJob(jobJson, this.send);
          const result = await this.processor(job, token, this.abortController.signal);
          await this.send({
            cmd: enums_1.ParentCommand.Completed,
            value: typeof result === 'undefined' ? null : result,
          });
        }
        catch (err) {
          // Anything thrown without a message (strings, numbers, null, undefined)
          // is wrapped in an Error. The null guard matters: reading `.message`
          // off `undefined` here would throw inside the catch, so the parent
          // would never be told the job failed.
          const failure = err && err.message ? err : new Error(err);
          await this.send({
            cmd: enums_1.ParentCommand.Failed,
            value: (0, utils_1.errorToJSON)(failure),
          });
        }
        finally {
          this.status = ChildStatus.Idle;
          this.currentJobPromise = undefined;
          this.abortController = undefined;
        }
      })();
    }
    /**
     * Cancels the currently running job by aborting its signal.
     * Does nothing when no job is running.
     * @param reason - Optional reason for the cancellation
     */
    cancel(reason) {
      if (this.abortController) {
        this.abortController.abort(reason);
      }
    }
    /** Intentionally a no-op. */
    async stop() { }
    /**
     * Marks the child as terminating, waits for the in-flight job (if any) to
     * settle, then exits the process with `process.exitCode` (or 0).
     */
    async waitForCurrentJobAndExit() {
      this.status = ChildStatus.Terminating;
      try {
        await this.currentJobPromise;
      }
      finally {
        process.exit(process.exitCode || 0);
      }
    }
    /**
     * Enhance the given job argument with some functions
     * that can be called from the sandboxed job processor.
     *
     * Note, the `job` argument is a JSON deserialized message
     * from the main node process to this forked child process,
     * the functions on the original job object are not intact.
     * The wrapped job adds back some of those original functions.
     *
     * @param job - Deserialized job; `data` and `returnvalue` are JSON strings
     * (a missing value is treated as '{}').
     * @param send - Async function used to post messages to the parent.
     * @returns A copy of `job` with parsed `data`/`returnValue` and proxy functions.
     */
    wrapJob(job, send) {
      /*
       * Sends a request tagged with a random id and resolves with the parent's
       * reply to that id. `label` only appears in the timeout error text.
       * `fields` holds optional extra message properties, so requests that
       * carry no value do not get a `value` key at all.
       */
      const request = async (cmd, label, fields) => {
        const requestId = Math.random().toString(36).substring(2, 15);
        await send(Object.assign({ requestId, cmd }, fields));
        return waitResponse(requestId, this.receiver, RESPONSE_TIMEOUT, label);
      };
      const wrappedJob = Object.assign(Object.assign({}, job), {
        queueQualifiedName: job.queueQualifiedName,
        data: JSON.parse(job.data || '{}'),
        opts: job.opts,
        returnValue: JSON.parse(job.returnvalue || '{}'),
        /*
         * Proxy `updateProgress` function, should work like the `progress` function.
         */
        updateProgress: async (progress) => {
          // Locally store reference to new progress value
          // so that we can return it from this process synchronously.
          // Written to `wrappedJob` (not `this`) so the function still works
          // when it is destructured from the job.
          wrappedJob.progress = progress;
          // Send message to update job progress.
          await send({
            cmd: enums_1.ParentCommand.Progress,
            value: progress,
          });
        },
        /*
         * Proxy job `log` function.
         */
        log: async (row) => {
          await send({
            cmd: enums_1.ParentCommand.Log,
            value: row,
          });
        },
        /*
         * Proxy `moveToDelayed` function.
         */
        moveToDelayed: async (timestamp, token) => {
          await send({
            cmd: enums_1.ParentCommand.MoveToDelayed,
            value: { timestamp, token },
          });
        },
        /*
         * Proxy `moveToWait` function.
         */
        moveToWait: async (token) => {
          await send({
            cmd: enums_1.ParentCommand.MoveToWait,
            value: { token },
          });
        },
        /*
         * Proxy `moveToWaitingChildren` function.
         */
        moveToWaitingChildren: (token, opts) => request(enums_1.ParentCommand.MoveToWaitingChildren, 'moveToWaitingChildren', { value: { token, opts } }),
        /*
         * Proxy `updateData` function.
         */
        updateData: async (data) => {
          await send({
            cmd: enums_1.ParentCommand.Update,
            value: data,
          });
          wrappedJob.data = data;
        },
        /**
         * Proxy `getChildrenValues` function.
         */
        getChildrenValues: () => request(enums_1.ParentCommand.GetChildrenValues, 'getChildrenValues'),
        /**
         * Proxy `getIgnoredChildrenFailures` function.
         *
         * This method sends a request to retrieve the failures of ignored children
         * and waits for a response from the parent process.
         *
         * @returns - A promise that resolves with the ignored children failures.
         * The exact structure of the returned data depends on the parent process implementation.
         */
        getIgnoredChildrenFailures: () => request(enums_1.ParentCommand.GetIgnoredChildrenFailures, 'getIgnoredChildrenFailures'),
        /**
         * Proxy `getDependenciesCount` function.
         */
        getDependenciesCount: (opts) => request(enums_1.ParentCommand.GetDependenciesCount, 'getDependenciesCount', { value: opts }),
        /**
         * Proxy `getDependencies` function.
         */
        getDependencies: (opts) => request(enums_1.ParentCommand.GetDependencies, 'getDependencies', { value: opts }),
      });
      return wrappedJob;
    }
  }
  240. exports.ChildProcessor = ChildProcessor;
  /**
   * Waits for a 'message' event on `receiver` whose `requestId` matches.
   *
   * @param requestId - Id the awaited reply must carry.
   * @param receiver - Emitter with `on`/`off` that emits 'message' events.
   * @param timeout - Milliseconds to wait before giving up.
   * @param cmd - Command name, used only in the timeout error text.
   * @returns Promise resolving with the reply's `value`, or rejecting with a
   * timeout Error if no matching reply arrives in time.
   */
  const waitResponse = async (requestId, receiver, timeout, cmd) => {
    return new Promise((resolve, reject) => {
      let timer;
      const listener = (msg) => {
        // Ignore unrelated traffic, including non-object messages.
        if (!msg || msg.requestId !== requestId) {
          return;
        }
        // Cancel the pending timeout so it does not linger (and keep the
        // event loop busy) after the reply has already arrived.
        clearTimeout(timer);
        receiver.off('message', listener);
        resolve(msg.value);
      };
      receiver.on('message', listener);
      timer = setTimeout(() => {
        receiver.off('message', listener);
        reject(new Error(`TimeoutError: ${cmd} timed out in (${timeout}ms)`));
      }, timeout);
    });
  };
  256. //# sourceMappingURL=child-processor.js.map