summaryrefslogtreecommitdiff
path: root/lib/_stream_readable.js
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-02-02 01:41:35 +0100
committerAnna Henningsen <anna@addaleax.net>2018-02-06 23:04:04 +0100
commite7cb694a609de41817ba7492fe6848ab6dcb4768 (patch)
treee0a2d1bc80f56981845a5a89c8ee15b45814168e /lib/_stream_readable.js
parent610cac26f0f0d98df2541bdc4251c624cca52702 (diff)
downloadnode-new-e7cb694a609de41817ba7492fe6848ab6dcb4768.tar.gz
stream: always reset awaitDrain when emitting data
The complicated `awaitDrain` machinery can be made a bit slimmer, and more correct, by just resetting the value each time `stream.emit('data')` is called. By resetting the value before emitting the data chunk, and seeing whether any pipe destinations return `.write() === false`, we always end up in a consistent state and don’t need to worry about odd situations (like `dest.write(chunk)` emitting more data). PR-URL: https://github.com/nodejs/node/pull/18516 Fixes: https://github.com/nodejs/node/issues/18484 Fixes: https://github.com/nodejs/node/issues/18512 Refs: https://github.com/nodejs/node/pull/18515 Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Minwoo Jung <minwoo@nodesource.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Diffstat (limited to 'lib/_stream_readable.js')
-rw-r--r--lib/_stream_readable.js12
1 files changed, 3 insertions, 9 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index dd483adb41..c7b356ed44 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -258,6 +258,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
+ state.awaitDrain = 0;
stream.emit('data', chunk);
} else {
// update the buffer info.
@@ -456,6 +457,7 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
+ state.awaitDrain = 0;
}
if (state.length === 0) {
@@ -637,18 +639,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
ondrain();
}
- // If the user pushes more data while we're writing to dest then we'll end up
- // in ondata again. However, we only want to increase awaitDrain once because
- // dest will only emit one 'drain' event for the multiple writes.
- // => Introduce a guard on increasing awaitDrain.
- var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
- increasedAwaitDrain = false;
var ret = dest.write(chunk);
debug('dest.write', ret);
- if (false === ret && !increasedAwaitDrain) {
+ if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
@@ -658,7 +654,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
!cleanedUp) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
- increasedAwaitDrain = true;
}
src.pause();
}
@@ -834,7 +829,6 @@ function resume_(stream, state) {
}
state.resumeScheduled = false;
- state.awaitDrain = 0;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading)