author     Anatoli Papirovski <apapirovski@mac.com>    2017-10-25 19:04:41 -0400
committer  Gibson Fahnestock <gibfahn@gmail.com>       2017-10-31 00:15:09 +0000
commit     e592c320ce3daad4745e076d3efb028e7d763638 (patch)
tree       831ef08fad22e3db36527abd62f40db6b04b04b7
parent     1b08ae853ed2a1ed2b6dfa4a38bd8611891b5975 (diff)
download   node-new-e592c320ce3daad4745e076d3efb028e7d763638.tar.gz
http2: fix several timeout related issues
* correctly reset write timers: timers on both the session & stream are
  now reset when a write starts and again when it ends.
* prevent large writes from timing out: when writing a large chunk of
  data in http2, once the data is handed off to C++, the JS session &
  stream lose all track of the write and will time out if the write
  doesn't complete within the timeout window.

Fix this issue by tracking whether a write request is ongoing and also
tracking how many chunks have been sent since the most recent write
started. (Since each write call resets the timer.)

PR-URL: https://github.com/nodejs/node/pull/16525
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: James M Snell <jasnell@gmail.com>
-rw-r--r--  lib/internal/http2/core.js                               78
-rw-r--r--  src/env.h                                                 1
-rw-r--r--  src/node_http2.cc                                        24
-rw-r--r--  src/node_http2.h                                          4
-rw-r--r--  test/sequential/test-http2-timeout-large-write-file.js   89
-rw-r--r--  test/sequential/test-http2-timeout-large-write.js        84
6 files changed, 274 insertions, 6 deletions
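
Before the per-file diffs, here is the essence of the fix as a simplified,
standalone sketch (not part of the patch; `entity` stands for either the
session or the stream; kState, kHandle, _unrefActive and emit are the
internals used in the core.js hunks below; for a stream, the handle lives
on its session and both the stream's and the session's timers get re-armed):

function onTimeout(entity) {
  // A write is in flight only while writeQueueSize > 0.
  if (entity[kState].writeQueueSize > 0) {
    const handle = entity[kHandle];
    const cached = handle !== undefined ?
      handle.chunksSentSinceLastWrite : null;
    // updateChunksSent() re-reads the native counter and returns it;
    // inequality means chunks went out since the last timeout check.
    if (cached !== null && cached !== handle.updateChunksSent()) {
      _unrefActive(entity); // still making progress: re-arm the timer
      return;
    }
  }
  process.nextTick(emit, entity, 'timeout');
}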
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 71489ba4ec..869a5c8ce9 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter {
shutdown: false,
shuttingDown: false,
pendingAck: 0,
- maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10)
+ maxPendingAck: Math.max(1, (options.maxPendingAck | 0) || 10),
+ writeQueueSize: 0
};
this[kType] = type;
@@ -1080,6 +1081,22 @@ class Http2Session extends EventEmitter {
}
_onTimeout() {
+ // This checks whether a write is currently in progress and also whether
+ // that write is actually sending data across the wire. The
+ // `chunksSentSinceLastWrite` value stored on the kHandle is only updated
+ // when a timeout event happens, meaning that if a write is ongoing it
+ // should never equal the newly fetched, updated value.
+ if (this[kState].writeQueueSize > 0) {
+ const handle = this[kHandle];
+ const chunksSentSinceLastWrite = handle !== undefined ?
+ handle.chunksSentSinceLastWrite : null;
+ if (chunksSentSinceLastWrite !== null &&
+ chunksSentSinceLastWrite !== handle.updateChunksSent()) {
+ _unrefActive(this);
+ return;
+ }
+ }
+
process.nextTick(emit, this, 'timeout');
}
}
@@ -1199,8 +1216,27 @@ function createWriteReq(req, handle, data, encoding) {
}
}
+function trackWriteState(stream, bytes) {
+ const session = stream[kSession];
+ stream[kState].writeQueueSize += bytes;
+ session[kState].writeQueueSize += bytes;
+ session[kHandle].chunksSentSinceLastWrite = 0;
+}
+
function afterDoStreamWrite(status, handle, req) {
- _unrefActive(handle[kOwner]);
+ const session = handle[kOwner];
+ _unrefActive(session);
+
+ const state = session[kState];
+ const { bytes } = req;
+ state.writeQueueSize -= bytes;
+
+ const stream = state.streams.get(req.stream);
+ if (stream !== undefined) {
+ _unrefActive(stream);
+ stream[kState].writeQueueSize -= bytes;
+ }
+
if (typeof req.callback === 'function')
req.callback();
this.handle = undefined;
@@ -1312,7 +1348,8 @@ class Http2Stream extends Duplex {
headersSent: false,
headRequest: false,
aborted: false,
- closeHandler: onSessionClose.bind(this)
+ closeHandler: onSessionClose.bind(this),
+ writeQueueSize: 0
};
this.once('ready', streamOnceReady);
@@ -1359,6 +1396,23 @@ class Http2Stream extends Duplex {
}
_onTimeout() {
+ // This checks whether a write is currently in progress and also whether
+ // that write is actually sending data across the wire. The
+ // `chunksSentSinceLastWrite` value stored on the kHandle is only updated
+ // when a timeout event happens, meaning that if a write is ongoing it
+ // should never equal the newly fetched, updated value.
+ if (this[kState].writeQueueSize > 0) {
+ const handle = this[kSession][kHandle];
+ const chunksSentSinceLastWrite = handle !== undefined ?
+ handle.chunksSentSinceLastWrite : null;
+ if (chunksSentSinceLastWrite !== null &&
+ chunksSentSinceLastWrite !== handle.updateChunksSent()) {
+ _unrefActive(this);
+ _unrefActive(this[kSession]);
+ return;
+ }
+ }
+
process.nextTick(emit, this, 'timeout');
}
@@ -1396,10 +1450,11 @@ class Http2Stream extends Duplex {
this.once('ready', this._write.bind(this, data, encoding, cb));
return;
}
- _unrefActive(this);
if (!this[kState].headersSent)
this[kProceed]();
const session = this[kSession];
+ _unrefActive(this);
+ _unrefActive(session);
const handle = session[kHandle];
const req = new WriteWrap();
req.stream = this[kID];
@@ -1410,7 +1465,7 @@ class Http2Stream extends Duplex {
const err = createWriteReq(req, handle, data, encoding);
if (err)
throw util._errnoException(err, 'write', req.error);
- this._bytesDispatched += req.bytes;
+ trackWriteState(this, req.bytes);
}
_writev(data, cb) {
@@ -1418,10 +1473,11 @@ class Http2Stream extends Duplex {
this.once('ready', this._writev.bind(this, data, cb));
return;
}
- _unrefActive(this);
if (!this[kState].headersSent)
this[kProceed]();
const session = this[kSession];
+ _unrefActive(this);
+ _unrefActive(session);
const handle = session[kHandle];
const req = new WriteWrap();
req.stream = this[kID];
@@ -1438,6 +1494,7 @@ class Http2Stream extends Duplex {
const err = handle.writev(req, chunks);
if (err)
throw util._errnoException(err, 'write', req.error);
+ trackWriteState(this, req.bytes);
}
_read(nread) {
@@ -1531,6 +1588,10 @@ class Http2Stream extends Duplex {
return;
}
+ const state = this[kState];
+ session[kState].writeQueueSize -= state.writeQueueSize;
+ state.writeQueueSize = 0;
+
const server = session[kServer];
if (server !== undefined && err) {
server.emit('streamError', err, this);
@@ -1625,7 +1686,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1,
if (ret < 0) {
err = new NghttpError(ret);
process.nextTick(emit, this, 'error', err);
+ break;
}
+ // exact length of the file doesn't matter here, since the
+ // stream is closing anyway - just use 1 to signify that
+ // a write does exist
+ trackWriteState(this, 1);
}
}
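
Taken together, the core.js changes above maintain a simple invariant:
trackWriteState() adds req.bytes to the writeQueueSize of both the stream
and its session when a write is dispatched to C++, and afterDoStreamWrite()
subtracts the same amount once it completes, so writeQueueSize > 0 holds
exactly while a write is in flight. A minimal runnable model of that
bookkeeping (illustrative only; the real counters live on kState internals):

// Model of the writeQueueSize accounting; not the actual implementation.
const state = { stream: { writeQueueSize: 0 }, session: { writeQueueSize: 0 } };

function trackWriteState(bytes) {    // write handed off to C++
  state.stream.writeQueueSize += bytes;
  state.session.writeQueueSize += bytes;
}

function afterDoStreamWrite(bytes) { // write completed
  state.session.writeQueueSize -= bytes;
  state.stream.writeQueueSize -= bytes;
}

trackWriteState(1048576);                      // dispatch a 1 MiB write
console.log(state.session.writeQueueSize > 0); // true: timeouts deferred
afterDoStreamWrite(1048576);
console.log(state.session.writeQueueSize > 0); // false: timeouts may fire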
diff --git a/src/env.h b/src/env.h
index 42781e8f23..3d9d074963 100644
--- a/src/env.h
+++ b/src/env.h
@@ -111,6 +111,7 @@ class ModuleWrap;
V(callback_string, "callback") \
V(change_string, "change") \
V(channel_string, "channel") \
+ V(chunks_sent_since_last_write_string, "chunksSentSinceLastWrite") \
V(constants_string, "constants") \
V(oncertcb_string, "oncertcb") \
V(onclose_string, "_onclose") \
diff --git a/src/node_http2.cc b/src/node_http2.cc
index 4b29013636..ec65e0052f 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -603,6 +603,8 @@ void Http2Session::SubmitFile(const FunctionCallbackInfo<Value>& args) {
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
}
+ session->chunks_sent_since_last_write_ = 0;
+
Headers list(isolate, context, headers);
args.GetReturnValue().Set(stream->SubmitFile(fd, *list, list.length(),
@@ -757,6 +759,23 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
stream->FlushDataChunks();
}
+void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {
+ Http2Session* session;
+ Environment* env = Environment::GetCurrent(args);
+ Isolate* isolate = env->isolate();
+ ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
+
+ HandleScope scope(isolate);
+
+ uint32_t length = session->chunks_sent_since_last_write_;
+
+ session->object()->Set(env->context(),
+ env->chunks_sent_since_last_write_string(),
+ Integer::NewFromUnsigned(isolate, length)).FromJust();
+
+ args.GetReturnValue().Set(length);
+}
+
void Http2Session::SubmitPushPromise(const FunctionCallbackInfo<Value>& args) {
Http2Session* session;
Environment* env = Environment::GetCurrent(args);
@@ -811,6 +830,8 @@ int Http2Session::DoWrite(WriteWrap* req_wrap,
}
}
+ chunks_sent_since_last_write_ = 0;
+
nghttp2_stream_write_t* req = new nghttp2_stream_write_t;
req->data = req_wrap;
@@ -846,6 +867,7 @@ void Http2Session::Send(uv_buf_t* buf, size_t length) {
this,
AfterWrite);
+ chunks_sent_since_last_write_++;
uv_buf_t actual = uv_buf_init(buf->base, length);
if (stream_->DoWrite(write_req, &actual, 1, nullptr)) {
write_req->Dispose();
@@ -1255,6 +1277,8 @@ void Initialize(Local<Object> target,
Http2Session::DestroyStream);
env->SetProtoMethod(session, "flushData",
Http2Session::FlushData);
+ env->SetProtoMethod(session, "updateChunksSent",
+ Http2Session::UpdateChunksSent);
StreamBase::AddMethods<Http2Session>(env, session,
StreamBase::kFlagHasWritev |
StreamBase::kFlagNoShutdown);
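
On the native side, the counter's lifecycle in the changes above is: Send()
increments chunks_sent_since_last_write_ each time a buffer is handed to
libuv, DoWrite() and SubmitFile() reset it to zero when a new write begins,
and UpdateChunksSent() copies the current value onto the JS wrapper (as
chunksSentSinceLastWrite) and returns it. A hedged sketch of how the JS
side consumes the binding (assuming `handle` is a session's native handle,
as in the core.js hunks above):

// Illustrative only: compare the cached property against a fresh sync.
const cached = handle.chunksSentSinceLastWrite; // value from the previous sync
const fresh = handle.updateChunksSent();        // re-reads from C++, updates it
if (cached !== fresh) {
  // At least one chunk was sent in the interim: the write is progressing.
}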
diff --git a/src/node_http2.h b/src/node_http2.h
index e35c189ea6..1b24b97ff0 100644
--- a/src/node_http2.h
+++ b/src/node_http2.h
@@ -475,6 +475,7 @@ class Http2Session : public AsyncWrap,
static void SubmitGoaway(const FunctionCallbackInfo<Value>& args);
static void DestroyStream(const FunctionCallbackInfo<Value>& args);
static void FlushData(const FunctionCallbackInfo<Value>& args);
+ static void UpdateChunksSent(const FunctionCallbackInfo<Value>& args);
template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);
@@ -493,6 +494,9 @@ class Http2Session : public AsyncWrap,
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
padding_strategy_type padding_strategy_ = PADDING_STRATEGY_NONE;
+ // use this to allow timeout tracking during long-lasting writes
+ uint32_t chunks_sent_since_last_write_ = 0;
+
char stream_buf_[kAllocBufferSize];
};
diff --git a/test/sequential/test-http2-timeout-large-write-file.js b/test/sequential/test-http2-timeout-large-write-file.js
new file mode 100644
index 0000000000..f52523780d
--- /dev/null
+++ b/test/sequential/test-http2-timeout-large-write-file.js
@@ -0,0 +1,89 @@
+'use strict';
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+const assert = require('assert');
+const fixtures = require('../common/fixtures');
+const fs = require('fs');
+const http2 = require('http2');
+const path = require('path');
+
+common.refreshTmpDir();
+
+// This test assesses whether long-running writes can complete, or
+// whether they time out because the session or stream is not aware
+// that the backing stream is still writing.
+// To simulate a slow client, we write a really large chunk and
+// then proceed through the following cycle:
+// 1) Receive first 'data' event and record currently written size
+// 2) Once we've read up to currently written size recorded above,
+// we pause the stream and wait longer than the server timeout
+// 3) Socket.prototype._onTimeout triggers and should confirm
+// that the backing stream is still active and writing
+// 4) Our timer fires, we resume the socket and start at 1)
+
+const writeSize = 3000000;
+const minReadSize = 500000;
+const serverTimeout = common.platformTimeout(500);
+let offsetTimeout = common.platformTimeout(100);
+let didReceiveData = false;
+
+const content = Buffer.alloc(writeSize, 0x44);
+const filepath = path.join(common.tmpDir, 'http2-large-write.tmp');
+fs.writeFileSync(filepath, content, 'binary');
+const fd = fs.openSync(filepath, 'r');
+
+const server = http2.createSecureServer({
+ key: fixtures.readKey('agent1-key.pem'),
+ cert: fixtures.readKey('agent1-cert.pem')
+});
+server.on('stream', common.mustCall((stream) => {
+ stream.respondWithFD(fd, {
+ 'Content-Type': 'application/octet-stream',
+ 'Content-Length': content.length.toString(),
+ 'Vary': 'Accept-Encoding'
+ });
+ stream.setTimeout(serverTimeout);
+ stream.on('timeout', () => {
+ assert.strictEqual(didReceiveData, false, 'Should not timeout');
+ });
+ stream.end();
+}));
+server.setTimeout(serverTimeout);
+server.on('timeout', () => {
+ assert.strictEqual(didReceiveData, false, 'Should not timeout');
+});
+
+server.listen(0, common.mustCall(() => {
+ const client = http2.connect(`https://localhost:${server.address().port}`,
+ { rejectUnauthorized: false });
+
+ const req = client.request({ ':path': '/' });
+ req.end();
+
+ const resume = () => req.resume();
+ let receivedBufferLength = 0;
+ let firstReceivedAt;
+ req.on('data', common.mustCallAtLeast((buf) => {
+ if (receivedBufferLength === 0) {
+ didReceiveData = false;
+ firstReceivedAt = Date.now();
+ }
+ receivedBufferLength += buf.length;
+ if (receivedBufferLength >= minReadSize &&
+ receivedBufferLength < writeSize) {
+ didReceiveData = true;
+ receivedBufferLength = 0;
+ req.pause();
+ setTimeout(
+ resume,
+ serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
+ );
+ offsetTimeout = 0;
+ }
+ }, 1));
+ req.on('end', common.mustCall(() => {
+ client.destroy();
+ server.close();
+ }));
+}));
diff --git a/test/sequential/test-http2-timeout-large-write.js b/test/sequential/test-http2-timeout-large-write.js
new file mode 100644
index 0000000000..f0a11b2e44
--- /dev/null
+++ b/test/sequential/test-http2-timeout-large-write.js
@@ -0,0 +1,84 @@
+'use strict';
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+const assert = require('assert');
+const fixtures = require('../common/fixtures');
+const http2 = require('http2');
+
+// This test assesses whether long-running writes can complete, or
+// whether they time out because the session or stream is not aware
+// that the backing stream is still writing.
+// To simulate a slow client, we write a really large chunk and
+// then proceed through the following cycle:
+// 1) Receive first 'data' event and record currently written size
+// 2) Once we've read up to currently written size recorded above,
+// we pause the stream and wait longer than the server timeout
+// 3) Socket.prototype._onTimeout triggers and should confirm
+// that the backing stream is still active and writing
+// 4) Our timer fires, we resume the socket and start at 1)
+
+const writeSize = 3000000;
+const minReadSize = 500000;
+const serverTimeout = common.platformTimeout(500);
+let offsetTimeout = common.platformTimeout(100);
+let didReceiveData = false;
+
+const server = http2.createSecureServer({
+ key: fixtures.readKey('agent1-key.pem'),
+ cert: fixtures.readKey('agent1-cert.pem')
+});
+server.on('stream', common.mustCall((stream) => {
+ const content = Buffer.alloc(writeSize, 0x44);
+
+ stream.respond({
+ 'Content-Type': 'application/octet-stream',
+ 'Content-Length': content.length.toString(),
+ 'Vary': 'Accept-Encoding'
+ });
+
+ stream.write(content);
+ stream.setTimeout(serverTimeout);
+ stream.on('timeout', () => {
+ assert.strictEqual(didReceiveData, false, 'Should not timeout');
+ });
+ stream.end();
+}));
+server.setTimeout(serverTimeout);
+server.on('timeout', () => {
+ assert.strictEqual(didReceiveData, false, 'Should not timeout');
+});
+
+server.listen(0, common.mustCall(() => {
+ const client = http2.connect(`https://localhost:${server.address().port}`,
+ { rejectUnauthorized: false });
+
+ const req = client.request({ ':path': '/' });
+ req.end();
+
+ const resume = () => req.resume();
+ let receivedBufferLength = 0;
+ let firstReceivedAt;
+ req.on('data', common.mustCallAtLeast((buf) => {
+ if (receivedBufferLength === 0) {
+ didReceiveData = false;
+ firstReceivedAt = Date.now();
+ }
+ receivedBufferLength += buf.length;
+ if (receivedBufferLength >= minReadSize &&
+ receivedBufferLength < writeSize) {
+ didReceiveData = true;
+ receivedBufferLength = 0;
+ req.pause();
+ setTimeout(
+ resume,
+ serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
+ );
+ offsetTimeout = 0;
+ }
+ }, 1));
+ req.on('end', common.mustCall(() => {
+ client.destroy();
+ server.close();
+ }));
+}));
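
For reference, a minimal server that exercises the fixed behavior (a
hypothetical sketch mirroring the tests above, using an unencrypted server
for brevity; before this patch, a slow reader could make the stream's
'timeout' event fire while the 3 MB write was still draining):

'use strict';
const http2 = require('http2');

const server = http2.createServer();
server.on('stream', (stream) => {
  stream.respond({ ':status': 200, 'content-type': 'application/octet-stream' });
  stream.setTimeout(500);
  stream.on('timeout', () => {
    // With the fix, this fires only if the write genuinely stalls,
    // not merely because draining 3 MB takes longer than 500 ms.
    stream.destroy();
  });
  stream.end(Buffer.alloc(3000000, 0x44)); // one large write, as in the tests
});
server.listen(0);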