summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2020-06-26 00:40:50 +0200
committerRuy Adorno <ruyadorno@hotmail.com>2020-07-29 11:20:46 -0400
commit0aa3809b6be1c296977ae2fe66b3519af092fc86 (patch)
tree17f4883aec580f5e08f80aa72c15969169ae2cda
parentc93a898028efb64855f9f5948defe1201c740704 (diff)
downloadnode-new-0aa3809b6be1c296977ae2fe66b3519af092fc86.tar.gz
worker: make MessagePort inherit from EventTarget
Use `NodeEventTarget` to provide a mixed `EventEmitter`/`EventTarget` API interface. PR-URL: https://github.com/nodejs/node/pull/34057 Refs: https://twitter.com/addaleax/status/1276289101671608320 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: David Carlier <devnexen@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--benchmark/worker/messageport.js17
-rw-r--r--lib/internal/per_context/messageport.js27
-rw-r--r--lib/internal/worker/io.js111
-rw-r--r--src/node_messaging.cc17
-rw-r--r--test/parallel/test-bootstrap-modules.js2
-rw-r--r--test/parallel/test-crypto-key-objects-messageport.js6
-rw-r--r--test/parallel/test-worker-message-port-inspect-during-init-hook.js5
-rw-r--r--test/parallel/test-worker-message-port-transfer-filehandle.js6
-rw-r--r--test/parallel/test-worker-message-port.js4
9 files changed, 126 insertions, 69 deletions
diff --git a/benchmark/worker/messageport.js b/benchmark/worker/messageport.js
index 8e2ddae73f..2f0d6f0621 100644
--- a/benchmark/worker/messageport.js
+++ b/benchmark/worker/messageport.js
@@ -4,6 +4,7 @@ const common = require('../common.js');
const { MessageChannel } = require('worker_threads');
const bench = common.createBenchmark(main, {
payload: ['string', 'object'],
+ style: ['eventtarget', 'eventemitter'],
n: [1e6]
});
@@ -25,14 +26,26 @@ function main(conf) {
const { port1, port2 } = new MessageChannel();
let messages = 0;
- port2.onmessage = () => {
+ function listener() {
if (messages++ === n) {
bench.end(n);
port1.close();
} else {
write();
}
- };
+ }
+
+ switch (conf.style) {
+ case 'eventtarget':
+ port2.onmessage = listener;
+ break;
+ case 'eventemitter':
+ port2.on('message', listener);
+ break;
+ default:
+ throw new Error('Unsupported listener type');
+ }
+
bench.start();
write();
diff --git a/lib/internal/per_context/messageport.js b/lib/internal/per_context/messageport.js
index ee08a59612..43587319b0 100644
--- a/lib/internal/per_context/messageport.js
+++ b/lib/internal/per_context/messageport.js
@@ -1,12 +1,31 @@
'use strict';
+const {
+ SymbolFor,
+} = primordials;
+
class MessageEvent {
- constructor(data, target) {
+ constructor(data, target, type) {
this.data = data;
this.target = target;
+ this.type = type;
}
}
-exports.emitMessage = function(data) {
- if (typeof this.onmessage === 'function')
- this.onmessage(new MessageEvent(data, this));
+const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
+
+exports.emitMessage = function(data, type) {
+ if (typeof this[kHybridDispatch] === 'function') {
+ this[kHybridDispatch](data, type, undefined);
+ return;
+ }
+
+ const event = new MessageEvent(data, this, type);
+ if (type === 'message') {
+ if (typeof this.onmessage === 'function')
+ this.onmessage(event);
+ } else {
+ // eslint-disable-next-line no-lonely-if
+ if (typeof this.onmessageerror === 'function')
+ this.onmessageerror(event);
+ }
};
diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js
index 60dd8cd67d..5b5118a1d7 100644
--- a/lib/internal/worker/io.js
+++ b/lib/internal/worker/io.js
@@ -24,20 +24,23 @@ const {
stopMessagePort
} = internalBinding('messaging');
const {
- threadId,
getEnvMessagePort
} = internalBinding('worker');
const { Readable, Writable } = require('stream');
-const EventEmitter = require('events');
+const {
+ Event,
+ NodeEventTarget,
+ defineEventHandler,
+ initNodeEventTarget,
+ kCreateEvent,
+ kNewListener,
+ kRemoveListener,
+} = require('internal/event_target');
const { inspect } = require('internal/util/inspect');
-let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
- debug = fn;
-});
const kIncrementsPortRef = Symbol('kIncrementsPortRef');
const kName = Symbol('kName');
-const kOnMessageListener = Symbol('kOnMessageListener');
const kPort = Symbol('kPort');
const kWaitingStreams = Symbol('kWaitingStreams');
const kWritableCallbacks = Symbol('kWritableCallbacks');
@@ -54,7 +57,7 @@ const messageTypes = {
};
// We have to mess with the MessagePort prototype a bit, so that a) we can make
-// it inherit from EventEmitter, even though it is a C++ class, and b) we do
+// it inherit from NodeEventTarget, even though it is a C++ class, and b) we do
// not provide methods that are not present in the Browser and not documented
// on our side (e.g. hasRef).
// Save a copy of the original set of methods as a shallow clone.
@@ -62,47 +65,39 @@ const MessagePortPrototype = ObjectCreate(
ObjectGetPrototypeOf(MessagePort.prototype),
ObjectGetOwnPropertyDescriptors(MessagePort.prototype));
// Set up the new inheritance chain.
-ObjectSetPrototypeOf(MessagePort, EventEmitter);
-ObjectSetPrototypeOf(MessagePort.prototype, EventEmitter.prototype);
+ObjectSetPrototypeOf(MessagePort, NodeEventTarget);
+ObjectSetPrototypeOf(MessagePort.prototype, NodeEventTarget.prototype);
// Copy methods that are inherited from HandleWrap, because
// changing the prototype of MessagePort.prototype implicitly removed them.
MessagePort.prototype.ref = MessagePortPrototype.ref;
MessagePort.prototype.unref = MessagePortPrototype.unref;
-// A communication channel consisting of a handle (that wraps around an
-// uv_async_t) which can receive information from other threads and emits
-// .onmessage events, and a function used for sending data to a MessagePort
-// in some other thread.
-MessagePort.prototype[kOnMessageListener] = function onmessage(event) {
- if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA)
- debug(`[${threadId}] received message`, event);
- // Emit the deserialized object to userland.
- this.emit('message', event.data);
-};
-
-// This is for compatibility with the Web's MessagePort API. It makes sense to
-// provide it as an `EventEmitter` in Node.js, but if somebody overrides
-// `onmessage`, we'll switch over to the Web API model.
-ObjectDefineProperty(MessagePort.prototype, 'onmessage', {
- enumerable: true,
- configurable: true,
- get() {
- return this[kOnMessageListener];
- },
- set(value) {
- this[kOnMessageListener] = value;
- if (typeof value === 'function') {
- this.ref();
- MessagePortPrototype.start.call(this);
- } else {
- this.unref();
- stopMessagePort(this);
- }
+class MessageEvent extends Event {
+ constructor(data, target, type) {
+ super(type);
+ this.data = data;
}
-});
+}
+
+ObjectDefineProperty(
+ MessagePort.prototype,
+ kCreateEvent,
+ {
+ value: function(data, type) {
+ return new MessageEvent(data, this, type);
+ },
+ configurable: false,
+ writable: false,
+ enumerable: false,
+ });
// This is called from inside the `MessagePort` constructor.
function oninit() {
+ initNodeEventTarget(this);
+ // TODO(addaleax): This should be on MessagePort.prototype, but
+ // defineEventHandler() does not support that.
+ defineEventHandler(this, 'message');
+ defineEventHandler(this, 'messageerror');
setupPortReferencing(this, this, 'message');
}
@@ -112,9 +107,15 @@ ObjectDefineProperty(MessagePort.prototype, onInitSymbol, {
value: oninit
});
+class MessagePortCloseEvent extends Event {
+ constructor() {
+ super('close');
+ }
+}
+
// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
- this.emit('close');
+ this.dispatchEvent(new MessagePortCloseEvent());
}
ObjectDefineProperty(MessagePort.prototype, handleOnCloseSymbol, {
@@ -156,18 +157,36 @@ function setupPortReferencing(port, eventEmitter, eventName) {
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
- eventEmitter.on('newListener', (name) => {
- if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
+ eventEmitter.on('newListener', function(name) {
+ if (name === eventName) newListener(eventEmitter.listenerCount(name));
+ });
+ eventEmitter.on('removeListener', function(name) {
+ if (name === eventName) removeListener(eventEmitter.listenerCount(name));
+ });
+ const origNewListener = eventEmitter[kNewListener];
+ eventEmitter[kNewListener] = function(size, type, ...args) {
+ if (type === eventName) newListener(size - 1);
+ return origNewListener.call(this, size, type, ...args);
+ };
+ const origRemoveListener = eventEmitter[kRemoveListener];
+ eventEmitter[kRemoveListener] = function(size, type, ...args) {
+ if (type === eventName) removeListener(size);
+ return origRemoveListener.call(this, size, type, ...args);
+ };
+
+ function newListener(size) {
+ if (size === 0) {
port.ref();
MessagePortPrototype.start.call(port);
}
- });
- eventEmitter.on('removeListener', (name) => {
- if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
+ }
+
+ function removeListener(size) {
+ if (size === 0) {
stopMessagePort(port);
port.unref();
}
- });
+ }
}
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 91927ebbd9..55f7b0b2cb 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -747,6 +747,8 @@ void MessagePort::OnMessage() {
Local<Value> payload;
Local<Value> message_error;
+ Local<Value> argv[2];
+
{
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
@@ -765,16 +767,15 @@ void MessagePort::OnMessage() {
continue;
}
- if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
+ argv[0] = payload;
+ argv[1] = env()->message_string();
+
+ if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
reschedule:
if (!message_error.IsEmpty()) {
- // This should become a `messageerror` event in the sense of the
- // EventTarget API at some point.
- Local<Value> argv[] = {
- env()->messageerror_string(),
- message_error
- };
- USE(MakeCallback(env()->emit_string(), arraysize(argv), argv));
+ argv[0] = message_error;
+ argv[1] = env()->messageerror_string();
+ USE(MakeCallback(emit_message, arraysize(argv), argv));
}
// Re-schedule OnMessage() execution in case of failure.
diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js
index 33c421c2f1..f7f1d2583d 100644
--- a/test/parallel/test-bootstrap-modules.js
+++ b/test/parallel/test-bootstrap-modules.js
@@ -99,6 +99,7 @@ if (!common.isMainThread) {
'NativeModule _stream_transform',
'NativeModule _stream_writable',
'NativeModule internal/error_serdes',
+ 'NativeModule internal/event_target',
'NativeModule internal/process/worker_thread_only',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/destroy',
@@ -109,6 +110,7 @@ if (!common.isMainThread) {
'NativeModule internal/worker',
'NativeModule internal/worker/io',
'NativeModule stream',
+ 'NativeModule util',
'NativeModule worker_threads',
].forEach(expectedModules.add.bind(expectedModules));
}
diff --git a/test/parallel/test-crypto-key-objects-messageport.js b/test/parallel/test-crypto-key-objects-messageport.js
index 1b910b571f..f38e20da42 100644
--- a/test/parallel/test-crypto-key-objects-messageport.js
+++ b/test/parallel/test-crypto-key-objects-messageport.js
@@ -75,9 +75,9 @@ for (const [key, repr] of keys) {
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
// implements EventTarget fully and in a cross-context manner.
- port2moved.emit = common.mustCall((name, err) => {
- assert.strictEqual(name, 'messageerror');
- assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
+ port2moved.onmessageerror = common.mustCall((event) => {
+ assert.strictEqual(event.data.code,
+ 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
});
port2moved.start();
diff --git a/test/parallel/test-worker-message-port-inspect-during-init-hook.js b/test/parallel/test-worker-message-port-inspect-during-init-hook.js
index 30b90710a6..8f9678de1e 100644
--- a/test/parallel/test-worker-message-port-inspect-during-init-hook.js
+++ b/test/parallel/test-worker-message-port-inspect-during-init-hook.js
@@ -10,8 +10,9 @@ const { MessageChannel } = require('worker_threads');
async_hooks.createHook({
init: common.mustCall((id, type, triggerId, resource) => {
- assert.strictEqual(util.inspect(resource),
- 'MessagePort { active: true, refed: false }');
+ assert.strictEqual(
+ util.inspect(resource),
+ 'MessagePort [EventTarget] { active: true, refed: false }');
}, 2)
}).enable();
diff --git a/test/parallel/test-worker-message-port-transfer-filehandle.js b/test/parallel/test-worker-message-port-transfer-filehandle.js
index 157acb22e8..c42d2e8b40 100644
--- a/test/parallel/test-worker-message-port-transfer-filehandle.js
+++ b/test/parallel/test-worker-message-port-transfer-filehandle.js
@@ -57,9 +57,9 @@ const { once } = require('events');
});
// TODO(addaleax): Switch this to a 'messageerror' event once MessagePort
// implements EventTarget fully and in a cross-context manner.
- port2moved.emit = common.mustCall((name, err) => {
- assert.strictEqual(name, 'messageerror');
- assert.strictEqual(err.code, 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
+ port2moved.onmessageerror = common.mustCall((event) => {
+ assert.strictEqual(event.data.code,
+ 'ERR_MESSAGE_TARGET_CONTEXT_UNAVAILABLE');
});
port2moved.start();
diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js
index d128dc7edb..4f4863c45e 100644
--- a/test/parallel/test-worker-message-port.js
+++ b/test/parallel/test-worker-message-port.js
@@ -154,7 +154,9 @@ const { MessageChannel, MessagePort } = require('worker_threads');
assert.deepStrictEqual(
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
[
- 'close', 'constructor', 'onmessage', 'postMessage', 'ref', 'start',
+ // TODO(addaleax): This should include onmessage (and eventually
+ // onmessageerror).
+ 'close', 'constructor', 'postMessage', 'ref', 'start',
'unref'
]);
}