child-processor.js 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. import { AbortController } from './abort-controller';
  2. import { ParentCommand } from '../enums';
  3. import { errorToJSON } from '../utils';
  4. var ChildStatus;
  5. (function (ChildStatus) {
  6. ChildStatus[ChildStatus["Idle"] = 0] = "Idle";
  7. ChildStatus[ChildStatus["Started"] = 1] = "Started";
  8. ChildStatus[ChildStatus["Terminating"] = 2] = "Terminating";
  9. ChildStatus[ChildStatus["Errored"] = 3] = "Errored";
  10. })(ChildStatus || (ChildStatus = {}));
  11. const RESPONSE_TIMEOUT = process.env.NODE_ENV === 'test' ? 500 : 5000;
  12. /**
  13. * ChildProcessor
  14. *
  15. * This class acts as the interface between a child process and it parent process
  16. * so that jobs can be processed in different processes.
  17. *
  18. */
  19. export class ChildProcessor {
  20. constructor(send, receiver) {
  21. this.send = send;
  22. this.receiver = receiver;
  23. }
  24. async init(processorFile) {
  25. let processor;
  26. try {
  27. const { default: processorFn } = await import(processorFile);
  28. processor = processorFn;
  29. if (processor.default) {
  30. // support es2015 module.
  31. processor = processor.default;
  32. }
  33. if (typeof processor !== 'function') {
  34. throw new Error('No function is exported in processor file');
  35. }
  36. }
  37. catch (err) {
  38. this.status = ChildStatus.Errored;
  39. return this.send({
  40. cmd: ParentCommand.InitFailed,
  41. err: errorToJSON(err),
  42. });
  43. }
  44. const origProcessor = processor;
  45. processor = function (job, token, signal) {
  46. try {
  47. return Promise.resolve(origProcessor(job, token, signal));
  48. }
  49. catch (err) {
  50. return Promise.reject(err);
  51. }
  52. };
  53. this.processor = processor;
  54. this.status = ChildStatus.Idle;
  55. await this.send({
  56. cmd: ParentCommand.InitCompleted,
  57. });
  58. }
  59. async start(jobJson, token) {
  60. if (this.status !== ChildStatus.Idle) {
  61. return this.send({
  62. cmd: ParentCommand.Error,
  63. err: errorToJSON(new Error('cannot start a not idling child process')),
  64. });
  65. }
  66. this.status = ChildStatus.Started;
  67. this.abortController = new AbortController();
  68. this.currentJobPromise = (async () => {
  69. try {
  70. const job = this.wrapJob(jobJson, this.send);
  71. const result = await this.processor(job, token, this.abortController.signal);
  72. await this.send({
  73. cmd: ParentCommand.Completed,
  74. value: typeof result === 'undefined' ? null : result,
  75. });
  76. }
  77. catch (err) {
  78. await this.send({
  79. cmd: ParentCommand.Failed,
  80. value: errorToJSON(!err.message ? new Error(err) : err),
  81. });
  82. }
  83. finally {
  84. this.status = ChildStatus.Idle;
  85. this.currentJobPromise = undefined;
  86. this.abortController = undefined;
  87. }
  88. })();
  89. }
  90. /**
  91. * Cancels the currently running job by aborting its signal.
  92. * @param reason - Optional reason for the cancellation
  93. */
  94. cancel(reason) {
  95. if (this.abortController) {
  96. this.abortController.abort(reason);
  97. }
  98. }
  99. async stop() { }
  100. async waitForCurrentJobAndExit() {
  101. this.status = ChildStatus.Terminating;
  102. try {
  103. await this.currentJobPromise;
  104. }
  105. finally {
  106. process.exit(process.exitCode || 0);
  107. }
  108. }
  109. /**
  110. * Enhance the given job argument with some functions
  111. * that can be called from the sandboxed job processor.
  112. *
  113. * Note, the `job` argument is a JSON deserialized message
  114. * from the main node process to this forked child process,
  115. * the functions on the original job object are not in tact.
  116. * The wrapped job adds back some of those original functions.
  117. */
  118. wrapJob(job, send) {
  119. const wrappedJob = Object.assign(Object.assign({}, job), { queueQualifiedName: job.queueQualifiedName, data: JSON.parse(job.data || '{}'), opts: job.opts, returnValue: JSON.parse(job.returnvalue || '{}'),
  120. /*
  121. * Proxy `updateProgress` function, should works as `progress` function.
  122. */
  123. async updateProgress(progress) {
  124. // Locally store reference to new progress value
  125. // so that we can return it from this process synchronously.
  126. this.progress = progress;
  127. // Send message to update job progress.
  128. await send({
  129. cmd: ParentCommand.Progress,
  130. value: progress,
  131. });
  132. },
  133. /*
  134. * Proxy job `log` function.
  135. */
  136. log: async (row) => {
  137. await send({
  138. cmd: ParentCommand.Log,
  139. value: row,
  140. });
  141. },
  142. /*
  143. * Proxy `moveToDelayed` function.
  144. */
  145. moveToDelayed: async (timestamp, token) => {
  146. await send({
  147. cmd: ParentCommand.MoveToDelayed,
  148. value: { timestamp, token },
  149. });
  150. },
  151. /*
  152. * Proxy `moveToWait` function.
  153. */
  154. moveToWait: async (token) => {
  155. await send({
  156. cmd: ParentCommand.MoveToWait,
  157. value: { token },
  158. });
  159. },
  160. /*
  161. * Proxy `moveToWaitingChildren` function.
  162. */
  163. moveToWaitingChildren: async (token, opts) => {
  164. const requestId = Math.random().toString(36).substring(2, 15);
  165. await send({
  166. requestId,
  167. cmd: ParentCommand.MoveToWaitingChildren,
  168. value: { token, opts },
  169. });
  170. return waitResponse(requestId, this.receiver, RESPONSE_TIMEOUT, 'moveToWaitingChildren');
  171. },
  172. /*
  173. * Proxy `updateData` function.
  174. */
  175. updateData: async (data) => {
  176. await send({
  177. cmd: ParentCommand.Update,
  178. value: data,
  179. });
  180. wrappedJob.data = data;
  181. },
  182. /**
  183. * Proxy `getChildrenValues` function.
  184. */
  185. getChildrenValues: async () => {
  186. const requestId = Math.random().toString(36).substring(2, 15);
  187. await send({
  188. requestId,
  189. cmd: ParentCommand.GetChildrenValues,
  190. });
  191. return waitResponse(requestId, this.receiver, RESPONSE_TIMEOUT, 'getChildrenValues');
  192. },
  193. /**
  194. * Proxy `getIgnoredChildrenFailures` function.
  195. *
  196. * This method sends a request to retrieve the failures of ignored children
  197. * and waits for a response from the parent process.
  198. *
  199. * @returns - A promise that resolves with the ignored children failures.
  200. * The exact structure of the returned data depends on the parent process implementation.
  201. */
  202. getIgnoredChildrenFailures: async () => {
  203. const requestId = Math.random().toString(36).substring(2, 15);
  204. await send({
  205. requestId,
  206. cmd: ParentCommand.GetIgnoredChildrenFailures,
  207. });
  208. return waitResponse(requestId, this.receiver, RESPONSE_TIMEOUT, 'getIgnoredChildrenFailures');
  209. },
  210. /**
  211. * Proxy `getDependenciesCount` function.
  212. */
  213. getDependenciesCount: async (opts) => {
  214. const requestId = Math.random().toString(36).substring(2, 15);
  215. await send({
  216. requestId,
  217. cmd: ParentCommand.GetDependenciesCount,
  218. value: opts,
  219. });
  220. return waitResponse(requestId, this.receiver, RESPONSE_TIMEOUT, 'getDependenciesCount');
  221. },
  222. /**
  223. * Proxy `getDependencies` function.
  224. */
  225. getDependencies: async (opts) => {
  226. const requestId = Math.random().toString(36).substring(2, 15);
  227. await send({
  228. requestId,
  229. cmd: ParentCommand.GetDependencies,
  230. value: opts,
  231. });
  232. return waitResponse(requestId, this.receiver, RESPONSE_TIMEOUT, 'getDependencies');
  233. } });
  234. return wrappedJob;
  235. }
  236. }
  237. const waitResponse = async (requestId, receiver, timeout, cmd) => {
  238. return new Promise((resolve, reject) => {
  239. const listener = (msg) => {
  240. if (msg.requestId === requestId) {
  241. resolve(msg.value);
  242. receiver.off('message', listener);
  243. }
  244. };
  245. receiver.on('message', listener);
  246. setTimeout(() => {
  247. receiver.off('message', listener);
  248. reject(new Error(`TimeoutError: ${cmd} timed out in (${timeout}ms)`));
  249. }, timeout);
  250. });
  251. };
  252. //# sourceMappingURL=child-processor.js.map