summaryrefslogtreecommitdiff
path: root/lib/_stream_readable.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/_stream_readable.js')
-rw-r--r--lib/_stream_readable.js12
1 files changed, 3 insertions, 9 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index dd483adb41..c7b356ed44 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -258,6 +258,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
+ state.awaitDrain = 0;
stream.emit('data', chunk);
} else {
// update the buffer info.
@@ -456,6 +457,7 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
+ state.awaitDrain = 0;
}
if (state.length === 0) {
@@ -637,18 +639,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
ondrain();
}
- // If the user pushes more data while we're writing to dest then we'll end up
- // in ondata again. However, we only want to increase awaitDrain once because
- // dest will only emit one 'drain' event for the multiple writes.
- // => Introduce a guard on increasing awaitDrain.
- var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
- increasedAwaitDrain = false;
var ret = dest.write(chunk);
debug('dest.write', ret);
- if (false === ret && !increasedAwaitDrain) {
+ if (ret === false) {
// If the user unpiped during `dest.write()`, it is possible
// to get stuck in a permanently paused state if that write
// also returned false.
@@ -658,7 +654,6 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
!cleanedUp) {
debug('false write response, pause', state.awaitDrain);
state.awaitDrain++;
- increasedAwaitDrain = true;
}
src.pause();
}
@@ -834,7 +829,6 @@ function resume_(stream, state) {
}
state.resumeScheduled = false;
- state.awaitDrain = 0;
stream.emit('resume');
flow(stream);
if (state.flowing && !state.reading)