summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnatoli Papirovski <apapirovski@mac.com>2017-10-28 20:25:35 -0400
committerGibson Fahnestock <gibfahn@gmail.com>2017-10-31 00:15:11 +0000
commit95a61cbb1e8ef38f62a8b6c92670c283b31225a1 (patch)
treebdddabafe33a3b03c9b0adc4686666c64b05128c
parent528edb2ea88c870f70d09ab01fe4b7ae70f4f75f (diff)
downloadnode-new-95a61cbb1e8ef38f62a8b6c92670c283b31225a1.tar.gz
http2: fix stream reading resumption
_read should always resume the underlying code that is attempting to push data to a readable stream. Adjust http2 core code to resume its reading appropriately. Some other general cleanup around reading, resuming & draining. PR-URL: https://github.com/nodejs/node/pull/16580 Fixes: https://github.com/nodejs/node/issues/16578 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--lib/internal/http2/core.js26
-rw-r--r--src/node_http2.cc2
-rw-r--r--src/node_http2_core-inl.h10
-rw-r--r--src/node_http2_core.h3
-rw-r--r--test/parallel/parallel.status2
-rw-r--r--test/parallel/test-http2-compat-serverrequest-pipe.js6
-rw-r--r--test/parallel/test-http2-pipe.js49
7 files changed, 79 insertions, 19 deletions
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 869a5c8ce9..7f5e615a7a 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -282,8 +282,13 @@ function onSessionRead(nread, buf, handle) {
'report this as a bug in Node.js');
_unrefActive(owner); // Reset the session timeout timer
_unrefActive(stream); // Reset the stream timeout timer
- if (nread >= 0 && !stream.destroyed)
- return stream.push(buf);
+ if (nread >= 0 && !stream.destroyed) {
+ // prevent overflowing the buffer while pause figures out the
+ // stream needs to actually pause and streamOnPause runs
+ if (!stream.push(buf))
+ owner[kHandle].streamReadStop(id);
+ return;
+ }
// Last chunk was received. End the readable side.
stream.push(null);
@@ -1276,8 +1281,6 @@ function onStreamClosed(code) {
}
function streamOnResume() {
- if (this._paused)
- return this.pause();
if (this[kID] === undefined) {
this.once('ready', streamOnResume);
return;
@@ -1299,12 +1302,10 @@ function streamOnPause() {
}
}
-function streamOnDrain() {
- const needPause = 0 > this._writableState.highWaterMark;
- if (this._paused && !needPause) {
- this._paused = false;
- this.resume();
- }
+function handleFlushData(handle, streamID) {
+ assert(handle.flushData(streamID) === undefined,
+ `HTTP/2 Stream ${streamID} does not exist. Please report this as ` +
+ 'a bug in Node.js');
}
function streamOnSessionConnect() {
@@ -1357,7 +1358,6 @@ class Http2Stream extends Duplex {
this.once('finish', onHandleFinish);
this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
- this.on('drain', streamOnDrain);
session.once('close', state.closeHandler);
if (session[kState].connecting) {
@@ -1507,9 +1507,7 @@ class Http2Stream extends Duplex {
return;
}
_unrefActive(this);
- assert(this[kSession][kHandle].flushData(this[kID]) === undefined,
- 'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' +
- 'a bug in Node.js');
+ process.nextTick(handleFlushData, this[kSession][kHandle], this[kID]);
}
// Submits an RST-STREAM frame to shutdown this stream.
diff --git a/src/node_http2.cc b/src/node_http2.cc
index ec65e0052f..d12b9ca000 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
if (!(stream = session->FindStream(id))) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}
- stream->FlushDataChunks();
+ stream->ReadResume();
}
void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
diff --git a/src/node_http2_core-inl.h b/src/node_http2_core-inl.h
index 5f6016a510..67bacfbbf0 100644
--- a/src/node_http2_core-inl.h
+++ b/src/node_http2_core-inl.h
@@ -510,7 +510,7 @@ inline void Nghttp2Session::SendPendingData() {
// the proceed with the rest.
while (srcRemaining > destRemaining) {
DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n",
- TypeName(), destRemaining);
+ TypeName(), destLength + destRemaining);
memcpy(dest.base + destOffset, src + srcOffset, destRemaining);
destLength += destRemaining;
Send(&dest, destLength);
@@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() {
FlushDataChunks();
}
+inline void Nghttp2Stream::ReadResume() {
+ DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
+ flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
+
+ // Flush any queued data chunks immediately out to the JS layer
+ FlushDataChunks();
+}
+
inline void Nghttp2Stream::ReadStop() {
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
if (!IsReading())
diff --git a/src/node_http2_core.h b/src/node_http2_core.h
index a7808ea049..91c58e78bb 100644
--- a/src/node_http2_core.h
+++ b/src/node_http2_core.h
@@ -384,6 +384,9 @@ class Nghttp2Stream {
// the session to be emitted at the JS side
inline void ReadStart();
+ // Resume Reading
+ inline void ReadResume();
+
// Stop/Pause Reading.
inline void ReadStop();
diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status
index 2297aad9c8..2958fad4d0 100644
--- a/test/parallel/parallel.status
+++ b/test/parallel/parallel.status
@@ -18,5 +18,7 @@ test-npm-install: PASS,FLAKY
[$system==solaris] # Also applies to SmartOS
[$system==freebsd]
+test-http2-compat-serverrequest-pipe: PASS,FLAKY
+test-http2-pipe: PASS,FLAKY
[$system==aix]
diff --git a/test/parallel/test-http2-compat-serverrequest-pipe.js b/test/parallel/test-http2-compat-serverrequest-pipe.js
index a19b319187..04c8cfe546 100644
--- a/test/parallel/test-http2-compat-serverrequest-pipe.js
+++ b/test/parallel/test-http2-compat-serverrequest-pipe.js
@@ -11,9 +11,9 @@ const path = require('path');
// piping should work as expected with createWriteStream
-const loc = fixtures.path('person.jpg');
-const fn = path.join(common.tmpDir, 'http2pipe.jpg');
common.refreshTmpDir();
+const loc = fixtures.path('url-tests.js');
+const fn = path.join(common.tmpDir, 'http2-url-tests.js');
const server = http2.createServer();
@@ -21,7 +21,7 @@ server.on('request', common.mustCall((req, res) => {
const dest = req.pipe(fs.createWriteStream(fn));
dest.on('finish', common.mustCall(() => {
assert.strictEqual(req.complete, true);
- assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
+ assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
fs.unlinkSync(fn);
res.end();
}));
diff --git a/test/parallel/test-http2-pipe.js b/test/parallel/test-http2-pipe.js
new file mode 100644
index 0000000000..819fab5154
--- /dev/null
+++ b/test/parallel/test-http2-pipe.js
@@ -0,0 +1,49 @@
+'use strict';
+
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+const fixtures = require('../common/fixtures');
+const assert = require('assert');
+const http2 = require('http2');
+const fs = require('fs');
+const path = require('path');
+
+// piping should work as expected with createWriteStream
+
+common.refreshTmpDir();
+const loc = fixtures.path('url-tests.js');
+const fn = path.join(common.tmpDir, 'http2-url-tests.js');
+
+const server = http2.createServer();
+
+server.on('stream', common.mustCall((stream) => {
+ const dest = stream.pipe(fs.createWriteStream(fn));
+ dest.on('finish', common.mustCall(() => {
+ assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length);
+ fs.unlinkSync(fn);
+ stream.respond();
+ stream.end();
+ }));
+}));
+
+server.listen(0, common.mustCall(() => {
+ const port = server.address().port;
+ const client = http2.connect(`http://localhost:${port}`);
+
+ let remaining = 2;
+ function maybeClose() {
+ if (--remaining === 0) {
+ server.close();
+ client.destroy();
+ }
+ }
+
+ const req = client.request({ ':method': 'POST' });
+ req.on('response', common.mustCall());
+ req.resume();
+ req.on('end', common.mustCall(maybeClose));
+ const str = fs.createReadStream(loc);
+ str.on('end', common.mustCall(maybeClose));
+ str.pipe(req);
+}));