summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorran <abbshrsoufii@gmail.com>2019-08-26 17:00:06 +0800
committerAnna Henningsen <anna@addaleax.net>2019-08-26 15:20:50 +0200
commit698a29420f92844478101ec1fccdc81b46954e2e (patch)
tree47e4f9c728981a0e9377d622a957632c39ccf61c
parent627bf59e8ddd9826720c45f430c2a2e489df6e66 (diff)
downloadnode-new-698a29420f92844478101ec1fccdc81b46954e2e.tar.gz
stream: fix readable state `awaitDrain` increase in recursion
PR-URL: https://github.com/nodejs/node/pull/27572 Reviewed-By: Anna Henningsen <anna@addaleax.net>
-rw-r--r--lib/_stream_readable.js65
-rw-r--r--test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js28
-rw-r--r--test/parallel/test-stream-pipe-await-drain-manual-resume.js25
-rw-r--r--test/parallel/test-stream-pipe-await-drain-push-while-write.js6
-rw-r--r--test/parallel/test-stream-pipe-await-drain.js12
-rw-r--r--test/parallel/test-stream2-basic.js1
6 files changed, 101 insertions, 36 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index ac37930d12..cfa36731e3 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -134,8 +134,10 @@ function ReadableState(options, stream, isDuplex) {
// Everything else in the universe uses 'utf8', though.
this.defaultEncoding = options.defaultEncoding || 'utf8';
- // The number of writers that are awaiting a drain event in .pipe()s
- this.awaitDrain = 0;
+ // Ref the piped dest which we need a drain event on it
+ // type: null | Writable | Set<Writable>
+ this.awaitDrainWriters = null;
+ this.multiAwaitDrain = false;
// If true, a maybeReadMore has been scheduled
this.readingMore = false;
@@ -310,7 +312,13 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
function addChunk(stream, state, chunk, addToFront) {
if (state.flowing && state.length === 0 && !state.sync) {
- state.awaitDrain = 0;
+ // Use the guard to avoid creating `Set()` repeatedly
+ // when we have multiple pipes.
+ if (state.multiAwaitDrain) {
+ state.awaitDrainWriters.clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
stream.emit('data', chunk);
} else {
// Update the buffer info.
@@ -511,7 +519,11 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
- state.awaitDrain = 0;
+ if (state.multiAwaitDrain) {
+ state.awaitDrainWriters.clear();
+ } else {
+ state.awaitDrainWriters = null;
+ }
}
if (state.length === 0) {
@@ -656,6 +668,15 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const src = this;
const state = this._readableState;
+ if (state.pipes.length === 1) {
+ if (!state.multiAwaitDrain) {
+ state.multiAwaitDrain = true;
+ state.awaitDrainWriters = new Set(
+ state.awaitDrainWriters ? [state.awaitDrainWriters] : []
+ );
+ }
+ }
+
state.pipes.push(dest);
debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);
@@ -709,7 +730,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
- if (ondrain && state.awaitDrain &&
+ if (ondrain && state.awaitDrainWriters &&
(!dest._writableState || dest._writableState.needDrain))
ondrain();
}
@@ -724,16 +745,22 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
// to get stuck in a permanently paused state if that write
// also returned false.
// => Check whether `dest` is still a piping destination.
- if (state.pipes.length > 0 && state.pipes.includes(dest) && !cleanedUp) {
- debug('false write response, pause', state.awaitDrain);
- state.awaitDrain++;
+ if (!cleanedUp) {
+ if (state.pipes.length === 1 && state.pipes[0] === dest) {
+ debug('false write response, pause', 0);
+ state.awaitDrainWriters = dest;
+ state.multiAwaitDrain = false;
+ } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
+ debug('false write response, pause', state.awaitDrainWriters.size);
+ state.awaitDrainWriters.add(dest);
+ }
}
if (!ondrain) {
// When the dest drains, it reduces the awaitDrain counter
// on the source. This would be more elegant with a .once()
// handler in flow(), but adding and removing repeatedly is
// too slow.
- ondrain = pipeOnDrain(src);
+ ondrain = pipeOnDrain(src, dest);
dest.on('drain', ondrain);
}
src.pause();
@@ -783,13 +810,23 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
return dest;
};
-function pipeOnDrain(src) {
+function pipeOnDrain(src, dest) {
return function pipeOnDrainFunctionResult() {
const state = src._readableState;
- debug('pipeOnDrain', state.awaitDrain);
- if (state.awaitDrain)
- state.awaitDrain--;
- if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+
+ // `ondrain` will call directly,
+ // `this` maybe not a reference to dest,
+ // so we use the real dest here.
+ if (state.awaitDrainWriters === dest) {
+ debug('pipeOnDrain', 1);
+ state.awaitDrainWriters = null;
+ } else if (state.multiAwaitDrain) {
+ debug('pipeOnDrain', state.awaitDrainWriters.size);
+ state.awaitDrainWriters.delete(dest);
+ }
+
+ if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
+ EE.listenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
diff --git a/test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js b/test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js
new file mode 100644
index 0000000000..110d46bb9f
--- /dev/null
+++ b/test/parallel/test-stream-await-drain-writers-in-synchronously-recursion-write.js
@@ -0,0 +1,28 @@
+'use strict';
+const common = require('../common');
+const { PassThrough } = require('stream');
+
+const encode = new PassThrough({
+ highWaterMark: 1
+});
+
+const decode = new PassThrough({
+ highWaterMark: 1
+});
+
+const send = common.mustCall((buf) => {
+ encode.write(buf);
+}, 4);
+
+let i = 0;
+const onData = common.mustCall(() => {
+ if (++i === 2) {
+ send(Buffer.from([0x3]));
+ send(Buffer.from([0x4]));
+ }
+}, 4);
+
+encode.pipe(decode).on('data', onData);
+
+send(Buffer.from([0x1]));
+send(Buffer.from([0x2]));
diff --git a/test/parallel/test-stream-pipe-await-drain-manual-resume.js b/test/parallel/test-stream-pipe-await-drain-manual-resume.js
index 37acead996..a95a5e05ae 100644
--- a/test/parallel/test-stream-pipe-await-drain-manual-resume.js
+++ b/test/parallel/test-stream-pipe-await-drain-manual-resume.js
@@ -28,10 +28,10 @@ readable.pipe(writable);
readable.once('pause', common.mustCall(() => {
assert.strictEqual(
- readable._readableState.awaitDrain,
- 1,
- 'Expected awaitDrain to equal 1 but instead got ' +
- `${readable._readableState.awaitDrain}`
+ readable._readableState.awaitDrainWriters,
+ writable,
+ 'Expected awaitDrainWriters to be a Writable but instead got ' +
+ `${readable._readableState.awaitDrainWriters}`
);
// First pause, resume manually. The next write() to writable will still
// return false, because chunks are still being buffered, so it will increase
@@ -43,10 +43,10 @@ readable.once('pause', common.mustCall(() => {
readable.once('pause', common.mustCall(() => {
assert.strictEqual(
- readable._readableState.awaitDrain,
- 1,
- '.resume() should not reset the counter but instead got ' +
- `${readable._readableState.awaitDrain}`
+ readable._readableState.awaitDrainWriters,
+ writable,
+ '.resume() should not reset the awaitDrainWriters, but instead got ' +
+ `${readable._readableState.awaitDrainWriters}`
);
// Second pause, handle all chunks from now on. Once all callbacks that
// are currently queued up are handled, the awaitDrain drain counter should
@@ -65,10 +65,11 @@ readable.push(null);
writable.on('finish', common.mustCall(() => {
assert.strictEqual(
- readable._readableState.awaitDrain,
- 0,
- 'awaitDrain should equal 0 after all chunks are written but instead got' +
- `${readable._readableState.awaitDrain}`
+ readable._readableState.awaitDrainWriters,
+ null,
+ `awaitDrainWriters should be reset to null
+ after all chunks are written but instead got
+ ${readable._readableState.awaitDrainWriters}`
);
// Everything okay, all chunks were written.
}));
diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js
index d14ad46cb0..6dbf3c669b 100644
--- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js
+++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js
@@ -6,8 +6,8 @@ const assert = require('assert');
const writable = new stream.Writable({
write: common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
- readable._readableState.awaitDrain,
- 0
+ readable._readableState.awaitDrainWriters,
+ null,
);
if (chunk.length === 32 * 1024) { // first chunk
@@ -15,7 +15,7 @@ const writable = new stream.Writable({
// We should check if awaitDrain counter is increased in the next
// tick, because awaitDrain is incremented after this method finished
process.nextTick(() => {
- assert.strictEqual(readable._readableState.awaitDrain, 1);
+ assert.strictEqual(readable._readableState.awaitDrainWriters, writable);
});
}
diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js
index 9286ceb791..3ae248e08b 100644
--- a/test/parallel/test-stream-pipe-await-drain.js
+++ b/test/parallel/test-stream-pipe-await-drain.js
@@ -24,10 +24,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
writer1.once('chunk-received', () => {
assert.strictEqual(
- reader._readableState.awaitDrain,
+ reader._readableState.awaitDrainWriters.size,
0,
'awaitDrain initial value should be 0, actual is ' +
- reader._readableState.awaitDrain
+ reader._readableState.awaitDrainWriters
);
setImmediate(() => {
// This one should *not* get through to writer1 because writer2 is not
@@ -39,10 +39,10 @@ writer1.once('chunk-received', () => {
// A "slow" consumer:
writer2._write = common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(
- reader._readableState.awaitDrain,
+ reader._readableState.awaitDrainWriters.size,
1,
'awaitDrain should be 1 after first push, actual is ' +
- reader._readableState.awaitDrain
+ reader._readableState.awaitDrainWriters
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
@@ -51,10 +51,10 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
writer3._write = common.mustCall((chunk, encoding, cb) => {
assert.strictEqual(
- reader._readableState.awaitDrain,
+ reader._readableState.awaitDrainWriters.size,
2,
'awaitDrain should be 2 after second push, actual is ' +
- reader._readableState.awaitDrain
+ reader._readableState.awaitDrainWriters
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
diff --git a/test/parallel/test-stream2-basic.js b/test/parallel/test-stream2-basic.js
index 5e0f9c6e91..7121f7bda7 100644
--- a/test/parallel/test-stream2-basic.js
+++ b/test/parallel/test-stream2-basic.js
@@ -355,7 +355,6 @@ class TestWriter extends EE {
assert.strictEqual(v, null);
const w = new R();
-
w.write = function(buffer) {
written = true;
assert.strictEqual(ended, false);