summaryrefslogtreecommitdiff
path: root/lib/events.js
diff options
context:
space:
mode:
authorMatteo Collina <hello@matteocollina.com>2019-05-30 17:58:55 +0200
committerMatteo Collina <hello@matteocollina.com>2019-12-23 09:29:01 +0100
commit38a593b0f3bc4fa52ed9216d75a98bbf7ab5bd9e (patch)
tree2980edbf0dba55d57590640e29fe4164845e600b /lib/events.js
parent5707ed21a253b2ed1e2f5944f622c20092b866d6 (diff)
downloadnode-new-38a593b0f3bc4fa52ed9216d75a98bbf7ab5bd9e.tar.gz
events: add EventEmitter.on to async iterate over events
Fixes: https://github.com/nodejs/node/issues/27847 PR-URL: https://github.com/nodejs/node/pull/27994 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com>
Diffstat (limited to 'lib/events.js')
-rw-r--r--lib/events.js104
1 files changed, 104 insertions, 0 deletions
diff --git a/lib/events.js b/lib/events.js
index 6f3739c67f..7889f62d90 100644
--- a/lib/events.js
+++ b/lib/events.js
@@ -29,12 +29,16 @@ const {
ObjectCreate,
ObjectDefineProperty,
ObjectGetPrototypeOf,
+ ObjectSetPrototypeOf,
ObjectKeys,
Promise,
+ PromiseReject,
+ PromiseResolve,
ReflectApply,
ReflectOwnKeys,
Symbol,
SymbolFor,
+ SymbolAsyncIterator
} = primordials;
const kRejection = SymbolFor('nodejs.rejection');
@@ -62,6 +66,7 @@ function EventEmitter(opts) {
}
module.exports = EventEmitter;
module.exports.once = once;
+module.exports.on = on;
// Backwards-compat with node 0.10.x
EventEmitter.EventEmitter = EventEmitter;
@@ -657,3 +662,102 @@ function once(emitter, name) {
emitter.once(name, eventListener);
});
}
+
+const AsyncIteratorPrototype = ObjectGetPrototypeOf(
+ ObjectGetPrototypeOf(async function* () {}).prototype);
+
+function createIterResult(value, done) {
+ return { value, done };
+}
+
+function on(emitter, event) {
+ const unconsumedEvents = [];
+ const unconsumedPromises = [];
+ let error = null;
+ let finished = false;
+
+ const iterator = ObjectSetPrototypeOf({
+ next() {
+ // First, we consume all unread events
+ const value = unconsumedEvents.shift();
+ if (value) {
+ return PromiseResolve(createIterResult(value, false));
+ }
+
+ // Then we error, if an error happened
+ // This happens one time if at all, because after 'error'
+ // we stop listening
+ if (error) {
+ const p = PromiseReject(error);
+ // Only the first element errors
+ error = null;
+ return p;
+ }
+
+ // If the iterator is finished, resolve to done
+ if (finished) {
+ return PromiseResolve(createIterResult(undefined, true));
+ }
+
+ // Wait until an event happens
+ return new Promise(function(resolve, reject) {
+ unconsumedPromises.push({ resolve, reject });
+ });
+ },
+
+ return() {
+ emitter.removeListener(event, eventHandler);
+ emitter.removeListener('error', errorHandler);
+ finished = true;
+
+ for (const promise of unconsumedPromises) {
+ promise.resolve(createIterResult(undefined, true));
+ }
+
+ return PromiseResolve(createIterResult(undefined, true));
+ },
+
+ throw(err) {
+ if (!err || !(err instanceof Error)) {
+ throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
+ 'Error', err);
+ }
+ error = err;
+ emitter.removeListener(event, eventHandler);
+ emitter.removeListener('error', errorHandler);
+ },
+
+ [SymbolAsyncIterator]() {
+ return this;
+ }
+ }, AsyncIteratorPrototype);
+
+ emitter.on(event, eventHandler);
+ emitter.on('error', errorHandler);
+
+ return iterator;
+
+ function eventHandler(...args) {
+ const promise = unconsumedPromises.shift();
+ if (promise) {
+ promise.resolve(createIterResult(args, false));
+ } else {
+ unconsumedEvents.push(args);
+ }
+ }
+
+ function errorHandler(err) {
+ finished = true;
+
+ const toError = unconsumedPromises.shift();
+
+ if (toError) {
+ toError.reject(err);
+ } else {
+ // The next time we call next()
+ error = err;
+ }
+
+ iterator.return();
+ }
+}