summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/internal/main/worker_thread.js116
-rw-r--r--lib/internal/process/worker_thread_only.js107
2 files changed, 103 insertions, 120 deletions
diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js
index 62a4bb1b7d..94d0e613e8 100644
--- a/lib/internal/main/worker_thread.js
+++ b/lib/internal/main/worker_thread.js
@@ -10,30 +10,118 @@ const {
} = require('internal/bootstrap/pre_execution');
const {
- getEnvMessagePort,
- threadId
+ threadId,
+ getEnvMessagePort
} = internalBinding('worker');
const {
- createMessageHandler,
- createWorkerFatalExeception
-} = require('internal/process/worker_thread_only');
+ messageTypes: {
+ // Messages that may be received by workers
+ LOAD_SCRIPT,
+ // Messages that may be posted from workers
+ UP_AND_RUNNING,
+ ERROR_MESSAGE,
+ COULD_NOT_SERIALIZE_ERROR,
+ // Messages that may be either received or posted
+ STDIO_PAYLOAD,
+ STDIO_WANTS_MORE_DATA,
+ },
+ kStdioWantsMoreDataCallback
+} = require('internal/worker/io');
+const {
+ fatalException: originalFatalException
+} = require('internal/process/execution');
+
+const publicWorker = require('worker_threads');
const debug = require('util').debuglog('worker');
-debug(`[${threadId}] is setting up worker child environment`);
-function prepareUserCodeExecution() {
- initializeClusterIPC();
- initializeESMLoader();
- loadPreloadModules();
-}
+debug(`[${threadId}] is setting up worker child environment`);
// Set up the message port and start listening
const port = getEnvMessagePort();
-port.on('message', createMessageHandler(port, prepareUserCodeExecution));
-port.start();
+
+port.on('message', (message) => {
+ if (message.type === LOAD_SCRIPT) {
+ const {
+ filename,
+ doEval,
+ workerData,
+ publicPort,
+ manifestSrc,
+ manifestURL,
+ hasStdin
+ } = message;
+ if (manifestSrc) {
+ require('internal/process/policy').setup(manifestSrc, manifestURL);
+ }
+ initializeClusterIPC();
+ initializeESMLoader();
+ loadPreloadModules();
+ publicWorker.parentPort = publicPort;
+ publicWorker.workerData = workerData;
+
+ if (!hasStdin)
+ process.stdin.push(null);
+
+ debug(`[${threadId}] starts worker script ${filename} ` +
+ `(eval = ${eval}) at cwd = ${process.cwd()}`);
+ port.unref();
+ port.postMessage({ type: UP_AND_RUNNING });
+ if (doEval) {
+ const { evalScript } = require('internal/process/execution');
+ evalScript('[worker eval]', filename);
+ } else {
+ process.argv[1] = filename; // script filename
+ require('module').runMain();
+ }
+ return;
+ } else if (message.type === STDIO_PAYLOAD) {
+ const { stream, chunk, encoding } = message;
+ process[stream].push(chunk, encoding);
+ return;
+ } else if (message.type === STDIO_WANTS_MORE_DATA) {
+ const { stream } = message;
+ process[stream][kStdioWantsMoreDataCallback]();
+ return;
+ }
+
+ require('assert').fail(`Unknown worker message type ${message.type}`);
+});
// Overwrite fatalException
-process._fatalException = createWorkerFatalExeception(port);
+process._fatalException = (error) => {
+ debug(`[${threadId}] gets fatal exception`);
+ let caught = false;
+ try {
+ caught = originalFatalException.call(this, error);
+ } catch (e) {
+ error = e;
+ }
+ debug(`[${threadId}] fatal exception caught = ${caught}`);
+
+ if (!caught) {
+ let serialized;
+ try {
+ const { serializeError } = require('internal/error-serdes');
+ serialized = serializeError(error);
+ } catch {}
+ debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
+ if (serialized)
+ port.postMessage({
+ type: ERROR_MESSAGE,
+ error: serialized
+ });
+ else
+ port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });
+
+ const { clearAsyncIdStack } = require('internal/async_hooks');
+ clearAsyncIdStack();
+
+ process.exit();
+ }
+};
markBootstrapComplete();
+
+port.start();
diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js
index 2171d2b586..f05d5e932b 100644
--- a/lib/internal/process/worker_thread_only.js
+++ b/lib/internal/process/worker_thread_only.js
@@ -3,13 +3,10 @@
// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
- getEnvMessagePort,
- threadId
+ getEnvMessagePort
} = internalBinding('worker');
const {
- messageTypes,
- kStdioWantsMoreDataCallback,
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
@@ -18,15 +15,6 @@ const {
const {
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
} = require('internal/errors');
-
-let debuglog;
-function debug(...args) {
- if (!debuglog) {
- debuglog = require('util').debuglog('worker');
- }
- return debuglog(...args);
-}
-
const workerStdio = {};
function initializeWorkerStdio() {
@@ -43,97 +31,6 @@ function initializeWorkerStdio() {
};
}
-function createMessageHandler(port, prepareUserCodeExecution) {
- const publicWorker = require('worker_threads');
-
- return function(message) {
- if (message.type === messageTypes.LOAD_SCRIPT) {
- const {
- filename,
- doEval,
- workerData,
- publicPort,
- manifestSrc,
- manifestURL,
- hasStdin
- } = message;
- if (manifestSrc) {
- require('internal/process/policy').setup(manifestSrc, manifestURL);
- }
- prepareUserCodeExecution();
- publicWorker.parentPort = publicPort;
- publicWorker.workerData = workerData;
-
- if (!hasStdin)
- workerStdio.stdin.push(null);
-
- debug(`[${threadId}] starts worker script ${filename} ` +
- `(eval = ${eval}) at cwd = ${process.cwd()}`);
- port.unref();
- port.postMessage({ type: messageTypes.UP_AND_RUNNING });
- if (doEval) {
- const { evalScript } = require('internal/process/execution');
- evalScript('[worker eval]', filename);
- } else {
- process.argv[1] = filename; // script filename
- require('module').runMain();
- }
- return;
- } else if (message.type === messageTypes.STDIO_PAYLOAD) {
- const { stream, chunk, encoding } = message;
- workerStdio[stream].push(chunk, encoding);
- return;
- } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) {
- const { stream } = message;
- workerStdio[stream][kStdioWantsMoreDataCallback]();
- return;
- }
-
- require('assert').fail(`Unknown worker message type ${message.type}`);
- };
-}
-
-// XXX(joyeecheung): this has to be returned as an anonymous function
-// wrapped in a closure, see the comment of the original
-// process._fatalException in lib/internal/process/execution.js
-function createWorkerFatalExeception(port) {
- const {
- fatalException: originalFatalException
- } = require('internal/process/execution');
-
- return (error) => {
- debug(`[${threadId}] gets fatal exception`);
- let caught = false;
- try {
- caught = originalFatalException.call(this, error);
- } catch (e) {
- error = e;
- }
- debug(`[${threadId}] fatal exception caught = ${caught}`);
-
- if (!caught) {
- let serialized;
- try {
- const { serializeError } = require('internal/error-serdes');
- serialized = serializeError(error);
- } catch {}
- debug(`[${threadId}] fatal exception serialized = ${!!serialized}`);
- if (serialized)
- port.postMessage({
- type: messageTypes.ERROR_MESSAGE,
- error: serialized
- });
- else
- port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR });
-
- const { clearAsyncIdStack } = require('internal/async_hooks');
- clearAsyncIdStack();
-
- process.exit();
- }
- };
-}
-
// The execution of this function itself should not cause any side effects.
function wrapProcessMethods(binding) {
function umask(mask) {
@@ -150,7 +47,5 @@ function wrapProcessMethods(binding) {
module.exports = {
initializeWorkerStdio,
- createMessageHandler,
- createWorkerFatalExeception,
wrapProcessMethods
};