diff options
author | Matteo Collina <hello@matteocollina.com> | 2019-05-30 17:58:55 +0200 |
---|---|---|
committer | Matteo Collina <hello@matteocollina.com> | 2019-12-23 09:29:01 +0100 |
commit | 38a593b0f3bc4fa52ed9216d75a98bbf7ab5bd9e (patch) | |
tree | 2980edbf0dba55d57590640e29fe4164845e600b /lib/events.js | |
parent | 5707ed21a253b2ed1e2f5944f622c20092b866d6 (diff) | |
download | node-new-38a593b0f3bc4fa52ed9216d75a98bbf7ab5bd9e.tar.gz |
events: add EventEmitter.on to async iterate over events
Fixes: https://github.com/nodejs/node/issues/27847
PR-URL: https://github.com/nodejs/node/pull/27994
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Diffstat (limited to 'lib/events.js')
-rw-r--r-- | lib/events.js | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/lib/events.js b/lib/events.js index 6f3739c67f..7889f62d90 100644 --- a/lib/events.js +++ b/lib/events.js @@ -29,12 +29,16 @@ const { ObjectCreate, ObjectDefineProperty, ObjectGetPrototypeOf, + ObjectSetPrototypeOf, ObjectKeys, Promise, + PromiseReject, + PromiseResolve, ReflectApply, ReflectOwnKeys, Symbol, SymbolFor, + SymbolAsyncIterator } = primordials; const kRejection = SymbolFor('nodejs.rejection'); @@ -62,6 +66,7 @@ function EventEmitter(opts) { } module.exports = EventEmitter; module.exports.once = once; +module.exports.on = on; // Backwards-compat with node 0.10.x EventEmitter.EventEmitter = EventEmitter; @@ -657,3 +662,102 @@ function once(emitter, name) { emitter.once(name, eventListener); }); } + +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype); + +function createIterResult(value, done) { + return { value, done }; +} + +function on(emitter, event) { + const unconsumedEvents = []; + const unconsumedPromises = []; + let error = null; + let finished = false; + + const iterator = ObjectSetPrototypeOf({ + next() { + // First, we consume all unread events + const value = unconsumedEvents.shift(); + if (value) { + return PromiseResolve(createIterResult(value, false)); + } + + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error) { + const p = PromiseReject(error); + // Only the first element errors + error = null; + return p; + } + + // If the iterator is finished, resolve to done + if (finished) { + return PromiseResolve(createIterResult(undefined, true)); + } + + // Wait until an event happens + return new Promise(function(resolve, reject) { + unconsumedPromises.push({ resolve, reject }); + }); + }, + + return() { + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + finished = true; + + for (const promise of unconsumedPromises) { + promise.resolve(createIterResult(undefined, true)); + } + + return PromiseResolve(createIterResult(undefined, true)); + }, + + throw(err) { + if (!err || !(err instanceof Error)) { + throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator', + 'Error', err); + } + error = err; + emitter.removeListener(event, eventHandler); + emitter.removeListener('error', errorHandler); + }, + + [SymbolAsyncIterator]() { + return this; + } + }, AsyncIteratorPrototype); + + emitter.on(event, eventHandler); + emitter.on('error', errorHandler); + + return iterator; + + function eventHandler(...args) { + const promise = unconsumedPromises.shift(); + if (promise) { + promise.resolve(createIterResult(args, false)); + } else { + unconsumedEvents.push(args); + } + } + + function errorHandler(err) { + finished = true; + + const toError = unconsumedPromises.shift(); + + if (toError) { + toError.reject(err); + } else { + // The next time we call next() + error = err; + } + + iterator.return(); + } +} |