author     Anatoli Papirovski <apapirovski@mac.com>   2017-12-11 17:55:17 -0500
committer  Anatoli Papirovski <apapirovski@mac.com>   2017-12-18 09:58:02 -0500
commit     d36e1b4fed57b34d93e70d3408d753e00b8ed754 (patch)
tree       03e0a12b194453231ad7391ff16b2c7cf3a489f2
parent     68c63a9fa362138bed852714862ac37b85c06adb (diff)
download   node-new-d36e1b4fed57b34d93e70d3408d753e00b8ed754.tar.gz
net,src: refactor writeQueueSize tracking
Currently, writeQueueSize is never used in C++ and barely used within JS.
Instead of constantly updating the value on the JS object, create a getter
that will retrieve the most up-to-date value from C++.

For the vast majority of cases though, create a new prop on
Socket.prototype[kLastWriteQueueSize] using a Symbol. Use this to track the
current write size, entirely in JS land.

PR-URL: https://github.com/nodejs/node/pull/17650
Reviewed-By: Anna Henningsen <anna@addaleax.net>
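
As a rough, self-contained sketch of the scheme the message describes (FakeHandle and FakeSocket are illustrative stand-ins, not actual Node.js internals; only kLastWriteQueueSize and the shape of _onTimeout follow the patch below), the JS-side bookkeeping plus the native read-only getter might look like this:

'use strict';

const kLastWriteQueueSize = Symbol('lastWriteQueueSize');

// Stand-in for the C++ stream handle: writeQueueSize is a read-only getter
// backed by native state (uv_stream_t::write_queue_size, or BIO_pending()
// for TLS) rather than a JS property that C++ keeps rewriting.
class FakeHandle {
  constructor() { this._queued = 0; }
  get writeQueueSize() { return this._queued; }
  write(bytes) { this._queued += bytes; }
  drain(bytes) { this._queued = Math.max(0, this._queued - bytes); }
}

// Stand-in for net.Socket: the size of the last dispatched write is kept
// under a Symbol key, entirely in JS land.
class FakeSocket {
  constructor(handle) {
    this._handle = handle;
    this[kLastWriteQueueSize] = 0;
  }

  write(bytes) {
    this._handle.write(bytes);
    this[kLastWriteQueueSize] = bytes;
  }

  // Same shape as Socket.prototype._onTimeout in the patch: a changed
  // writeQueueSize means the write is still making progress, so the
  // timeout is suppressed instead of fired.
  _onTimeout() {
    const handle = this._handle;
    const lastWriteQueueSize = this[kLastWriteQueueSize];
    if (lastWriteQueueSize > 0 && handle) {
      const writeQueueSize = handle.writeQueueSize;
      if (lastWriteQueueSize !== writeQueueSize) {
        this[kLastWriteQueueSize] = writeQueueSize;
        return 'suppressed';
      }
    }
    return 'timeout';
  }
}

const socket = new FakeSocket(new FakeHandle());
socket.write(1000);                // queue: 1000, last seen: 1000
socket._handle.drain(400);         // pretend libuv flushed part of the queue
console.log(socket._onTimeout());  // 'suppressed' (queue moved: 1000 -> 600)
console.log(socket._onTimeout());  // 'timeout' (queue unchanged at 600)

In the real patch the getter is installed from C++ via SetAccessorProperty and reads uv_stream_t::write_queue_size (or BIO_pending() for TLS), so JS only pays for the native lookup when it actually needs a fresh value.
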
-rw-r--r--  lib/_tls_wrap.js                                      |  5
-rw-r--r--  lib/net.js                                            | 33
-rw-r--r--  src/pipe_wrap.cc                                      |  1
-rw-r--r--  src/stream_base.cc                                    | 13
-rw-r--r--  src/stream_base.h                                     |  1
-rw-r--r--  src/stream_wrap.cc                                    | 48
-rw-r--r--  src/stream_wrap.h                                     |  6
-rw-r--r--  src/tcp_wrap.cc                                       |  1
-rw-r--r--  src/tls_wrap.cc                                       | 45
-rw-r--r--  src/tls_wrap.h                                        |  5
-rw-r--r--  src/tty_wrap.cc                                       |  3
-rw-r--r--  test/parallel/test-tls-buffersize.js                  | 10
-rw-r--r--  test/sequential/test-http-keep-alive-large-write.js   | 67
-rw-r--r--  test/sequential/test-https-keep-alive-large-write.js  | 70
14 files changed, 125 insertions, 183 deletions
diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js
index 3a1b8753f0..e30efa4159 100644
--- a/lib/_tls_wrap.js
+++ b/lib/_tls_wrap.js
@@ -460,11 +460,6 @@ TLSSocket.prototype._init = function(socket, wrap) {
var options = this._tlsOptions;
var ssl = this._handle;
- // lib/net.js expect this value to be non-zero if write hasn't been flushed
- // immediately. After the handshake is done this will represent the actual
- // write queue size
- ssl.writeQueueSize = 1;
-
this.server = options.server;
// For clients, we will always have either a given ca list or be using
diff --git a/lib/net.js b/lib/net.js
index 994c1e3390..245415d87c 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -48,6 +48,8 @@ const { nextTick } = require('internal/process/next_tick');
const errors = require('internal/errors');
const dns = require('dns');
+const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
+
// `cluster` is only used by `listenInCluster` so for startup performance
// reasons it's lazy loaded.
var cluster = null;
@@ -198,6 +200,7 @@ function Socket(options) {
this._handle = null;
this._parent = null;
this._host = null;
+ this[kLastWriteQueueSize] = 0;
if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
@@ -398,12 +401,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
Socket.prototype._onTimeout = function() {
- if (this._handle) {
- // `.prevWriteQueueSize` !== `.updateWriteQueueSize()` means there is
+ const handle = this._handle;
+ const lastWriteQueueSize = this[kLastWriteQueueSize];
+ if (lastWriteQueueSize > 0 && handle) {
+ // `lastWriteQueueSize !== writeQueueSize` means there is
// an active write in progress, so we suppress the timeout.
- const prevWriteQueueSize = this._handle.writeQueueSize;
- if (prevWriteQueueSize > 0 &&
- prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
+ const writeQueueSize = handle.writeQueueSize;
+ if (lastWriteQueueSize !== writeQueueSize) {
+ this[kLastWriteQueueSize] = writeQueueSize;
this._unrefTimer();
return;
}
@@ -473,7 +478,7 @@ Object.defineProperty(Socket.prototype, 'readyState', {
Object.defineProperty(Socket.prototype, 'bufferSize', {
get: function() {
if (this._handle) {
- return this._handle.writeQueueSize + this.writableLength;
+ return this[kLastWriteQueueSize] + this.writableLength;
}
}
});
@@ -764,12 +769,13 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
this._bytesDispatched += req.bytes;
- // If it was entirely flushed, we can write some more right now.
- // However, if more is left in the queue, then wait until that clears.
- if (req.async && this._handle.writeQueueSize !== 0)
- req.cb = cb;
- else
+ if (!req.async) {
cb();
+ return;
+ }
+
+ req.cb = cb;
+ this[kLastWriteQueueSize] = req.bytes;
};
@@ -853,6 +859,9 @@ function afterWrite(status, handle, req, err) {
if (self !== process.stderr && self !== process.stdout)
debug('afterWrite', status);
+ if (req.async)
+ self[kLastWriteQueueSize] = 0;
+
// callback may come after call to destroy.
if (self.destroyed) {
debug('afterWrite destroyed');
@@ -872,7 +881,7 @@ function afterWrite(status, handle, req, err) {
debug('afterWrite call cb');
if (req.cb)
- req.cb.call(self);
+ req.cb.call(undefined);
}
diff --git a/src/pipe_wrap.cc b/src/pipe_wrap.cc
index 76280f0ce7..c6dce756ce 100644
--- a/src/pipe_wrap.cc
+++ b/src/pipe_wrap.cc
@@ -165,7 +165,6 @@ PipeWrap::PipeWrap(Environment* env,
int r = uv_pipe_init(env->event_loop(), &handle_, ipc);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_pipe_init() returns void.
- UpdateWriteQueueSize();
}
diff --git a/src/stream_base.cc b/src/stream_base.cc
index bb25fc1cff..a48e77063e 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -193,7 +193,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}
err = DoWrite(req_wrap, buf_list, count, nullptr);
- req_wrap_obj->Set(env->async(), True(env->isolate()));
+ if (HasWriteQueue())
+ req_wrap_obj->Set(env->async(), True(env->isolate()));
if (err)
req_wrap->Dispose();
@@ -249,7 +250,8 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
req_wrap = WriteWrap::New(env, req_wrap_obj, this);
err = DoWrite(req_wrap, bufs, count, nullptr);
- req_wrap_obj->Set(env->async(), True(env->isolate()));
+ if (HasWriteQueue())
+ req_wrap_obj->Set(env->async(), True(env->isolate()));
req_wrap_obj->Set(env->buffer_string(), args[1]);
if (err)
@@ -373,7 +375,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
reinterpret_cast<uv_stream_t*>(send_handle));
}
- req_wrap_obj->Set(env->async(), True(env->isolate()));
+ if (HasWriteQueue())
+ req_wrap_obj->Set(env->async(), True(env->isolate()));
if (err)
req_wrap->Dispose();
@@ -467,6 +470,10 @@ int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
return 0;
}
+bool StreamResource::HasWriteQueue() {
+ return true;
+}
+
const char* StreamResource::Error() const {
return nullptr;
diff --git a/src/stream_base.h b/src/stream_base.h
index d063176b04..071627f3bf 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -162,6 +162,7 @@ class StreamResource {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
+ virtual bool HasWriteQueue();
virtual const char* Error() const;
virtual void ClearError();
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index f6cfba84c2..094991107b 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -40,13 +40,15 @@
namespace node {
using v8::Context;
+using v8::DontDelete;
using v8::EscapableHandleScope;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
-using v8::Integer;
using v8::Local;
using v8::Object;
+using v8::ReadOnly;
+using v8::Signature;
using v8::Value;
@@ -99,7 +101,16 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
void LibuvStreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
- env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
+ Local<FunctionTemplate> get_write_queue_size =
+ FunctionTemplate::New(env->isolate(),
+ GetWriteQueueSize,
+ env->as_external(),
+ Signature::New(env->isolate(), target));
+ target->PrototypeTemplate()->SetAccessorProperty(
+ env->write_queue_size_string(),
+ get_write_queue_size,
+ Local<FunctionTemplate>(),
+ static_cast<PropertyAttribute>(ReadOnly | DontDelete));
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
}
@@ -135,17 +146,6 @@ bool LibuvStreamWrap::IsIPCPipe() {
}
-uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
- HandleScope scope(env()->isolate());
- uint32_t write_queue_size = stream()->write_queue_size;
- object()->Set(env()->context(),
- env()->write_queue_size_string(),
- Integer::NewFromUnsigned(env()->isolate(),
- write_queue_size)).FromJust();
- return write_queue_size;
-}
-
-
int LibuvStreamWrap::ReadStart() {
return uv_read_start(stream(), OnAlloc, OnRead);
}
@@ -267,13 +267,18 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}
-void LibuvStreamWrap::UpdateWriteQueueSize(
- const FunctionCallbackInfo<Value>& args) {
+void LibuvStreamWrap::GetWriteQueueSize(
+ const FunctionCallbackInfo<Value>& info) {
LibuvStreamWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
+
+ if (wrap->stream() == nullptr) {
+ info.GetReturnValue().Set(0);
+ return;
+ }
- uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
- args.GetReturnValue().Set(write_queue_size);
+ uint32_t write_queue_size = wrap->stream()->write_queue_size;
+ info.GetReturnValue().Set(write_queue_size);
}
@@ -370,12 +375,16 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
}
w->Dispatched();
- UpdateWriteQueueSize();
return r;
}
+bool LibuvStreamWrap::HasWriteQueue() {
+ return stream()->write_queue_size > 0;
+}
+
+
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = WriteWrap::from_req(req);
CHECK_NE(req_wrap, nullptr);
@@ -387,7 +396,6 @@ void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) {
StreamBase::AfterWrite(w, status);
- UpdateWriteQueueSize();
}
} // namespace node
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index 414bad393f..a695f9a08a 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -55,6 +55,7 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
+ bool HasWriteQueue() override;
inline uv_stream_t* stream() const {
return stream_;
@@ -83,15 +84,14 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
}
AsyncWrap* GetAsyncWrap() override;
- uint32_t UpdateWriteQueueSize();
static void AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags = StreamBase::kFlagNone);
private:
- static void UpdateWriteQueueSize(
- const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void GetWriteQueueSize(
+ const v8::FunctionCallbackInfo<v8::Value>& info);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
// Callbacks for libuv
diff --git a/src/tcp_wrap.cc b/src/tcp_wrap.cc
index 8dd14e2e16..aa0cb7ed17 100644
--- a/src/tcp_wrap.cc
+++ b/src/tcp_wrap.cc
@@ -169,7 +169,6 @@ TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
int r = uv_tcp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
// Suggestion: uv_tcp_init() returns void.
- UpdateWriteQueueSize();
}
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index c661a0ac32..b2ef5184e0 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -35,14 +35,16 @@ namespace node {
using crypto::SecureContext;
using crypto::SSLWrap;
using v8::Context;
+using v8::DontDelete;
using v8::EscapableHandleScope;
using v8::Exception;
using v8::Function;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
-using v8::Integer;
using v8::Local;
using v8::Object;
+using v8::ReadOnly;
+using v8::Signature;
using v8::String;
using v8::Value;
@@ -309,7 +311,6 @@ void TLSWrap::EncOut() {
// No data to write
if (BIO_pending(enc_out_) == 0) {
- UpdateWriteQueueSize();
if (clear_in_->Length() == 0)
InvokeQueued(0);
return;
@@ -555,17 +556,6 @@ bool TLSWrap::IsClosing() {
}
-uint32_t TLSWrap::UpdateWriteQueueSize(uint32_t write_queue_size) {
- HandleScope scope(env()->isolate());
- if (write_queue_size == 0)
- write_queue_size = BIO_pending(enc_out_);
- object()->Set(env()->context(),
- env()->write_queue_size_string(),
- Integer::NewFromUnsigned(env()->isolate(),
- write_queue_size)).FromJust();
- return write_queue_size;
-}
-
int TLSWrap::ReadStart() {
if (stream_ != nullptr)
@@ -612,9 +602,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0) {
- // net.js expects writeQueueSize to be > 0 if the write isn't
- // immediately flushed
- UpdateWriteQueueSize(1);
return stream_->DoWrite(w, bufs, count, send_handle);
}
}
@@ -666,7 +653,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
// Try writing data immediately
EncOut();
- UpdateWriteQueueSize();
return 0;
}
@@ -938,12 +924,17 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
-void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
+void TLSWrap::GetWriteQueueSize(const FunctionCallbackInfo<Value>& info) {
TLSWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, info.This());
- uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
- args.GetReturnValue().Set(write_queue_size);
+ if (wrap->clear_in_ == nullptr) {
+ info.GetReturnValue().Set(0);
+ return;
+ }
+
+ uint32_t write_queue_size = BIO_pending(wrap->enc_out_);
+ info.GetReturnValue().Set(write_queue_size);
}
@@ -966,6 +957,17 @@ void TLSWrap::Initialize(Local<Object> target,
t->InstanceTemplate()->SetInternalFieldCount(1);
t->SetClassName(tlsWrapString);
+ Local<FunctionTemplate> get_write_queue_size =
+ FunctionTemplate::New(env->isolate(),
+ GetWriteQueueSize,
+ env->as_external(),
+ Signature::New(env->isolate(), t));
+ t->PrototypeTemplate()->SetAccessorProperty(
+ env->write_queue_size_string(),
+ get_write_queue_size,
+ Local<FunctionTemplate>(),
+ static_cast<PropertyAttribute>(ReadOnly | DontDelete));
+
AsyncWrap::AddWrapMethods(env, t, AsyncWrap::kFlagHasReset);
env->SetProtoMethod(t, "receive", Receive);
env->SetProtoMethod(t, "start", Start);
@@ -973,7 +975,6 @@ void TLSWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
- env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index 87eac75779..2ca9d53137 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -131,7 +131,6 @@ class TLSWrap : public AsyncWrap,
AsyncWrap* GetAsyncWrap() override;
bool IsIPCPipe() override;
- uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0);
// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx);
@@ -189,8 +188,8 @@ class TLSWrap : public AsyncWrap,
bool eof_;
private:
- static void UpdateWriteQueueSize(
- const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void GetWriteQueueSize(
+ const v8::FunctionCallbackInfo<v8::Value>& info);
};
} // namespace node
diff --git a/src/tty_wrap.cc b/src/tty_wrap.cc
index df512bfcd3..fae39158ef 100644
--- a/src/tty_wrap.cc
+++ b/src/tty_wrap.cc
@@ -153,12 +153,11 @@ void TTYWrap::New(const FunctionCallbackInfo<Value>& args) {
CHECK_GE(fd, 0);
int err = 0;
- TTYWrap* wrap = new TTYWrap(env, args.This(), fd, args[1]->IsTrue(), &err);
+ new TTYWrap(env, args.This(), fd, args[1]->IsTrue(), &err);
if (err != 0) {
env->CollectUVExceptionInfo(args[2], err, "uv_tty_init");
args.GetReturnValue().SetUndefined();
}
- wrap->UpdateWriteQueueSize();
}
diff --git a/test/parallel/test-tls-buffersize.js b/test/parallel/test-tls-buffersize.js
index 49848cd865..c94b95d7b3 100644
--- a/test/parallel/test-tls-buffersize.js
+++ b/test/parallel/test-tls-buffersize.js
@@ -7,17 +7,17 @@ const fixtures = require('../common/fixtures');
const tls = require('tls');
const iter = 10;
-const overhead = 30;
const server = tls.createServer({
key: fixtures.readKey('agent2-key.pem'),
cert: fixtures.readKey('agent2-cert.pem')
}, common.mustCall((socket) => {
- socket.on('readable', common.mustCallAtLeast(() => {
- socket.read();
- }, 1));
+ let str = '';
+ socket.setEncoding('utf-8');
+ socket.on('data', (chunk) => { str += chunk; });
socket.on('end', common.mustCall(() => {
+ assert.strictEqual(str, 'a'.repeat(iter - 1));
server.close();
}));
}));
@@ -31,7 +31,7 @@ server.listen(0, common.mustCall(() => {
for (let i = 1; i < iter; i++) {
client.write('a');
- assert.strictEqual(client.bufferSize, i + overhead);
+ assert.strictEqual(client.bufferSize, i + 1);
}
client.on('finish', common.mustCall(() => {
diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js
index 2cdf539e76..4119c2353d 100644
--- a/test/sequential/test-http-keep-alive-large-write.js
+++ b/test/sequential/test-http-keep-alive-large-write.js
@@ -6,26 +6,12 @@ const http = require('http');
// This test assesses whether long-running writes can complete
// or timeout because the socket is not aware that the backing
// stream is still writing.
-// To simulate a slow client, we write a really large chunk and
-// then proceed through the following cycle:
-// 1) Receive first 'data' event and record currently written size
-// 2) Once we've read up to currently written size recorded above,
-// we pause the stream and wait longer than the server timeout
-// 3) Socket.prototype._onTimeout triggers and should confirm
-// that the backing stream is still active and writing
-// 4) Our timer fires, we resume the socket and start at 1)
-const minReadSize = 250000;
-const serverTimeout = common.platformTimeout(500);
-let offsetTimeout = common.platformTimeout(100);
-let serverConnectionHandle;
-let writeSize = 3000000;
-let didReceiveData = false;
-// this represents each cycles write size, where the cycle consists
-// of `write > read > _onTimeout`
-let currentWriteSize = 0;
+const writeSize = 3000000;
+let socket;
const server = http.createServer(common.mustCall((req, res) => {
+ server.close();
const content = Buffer.alloc(writeSize, 0x44);
res.writeHead(200, {
@@ -34,47 +20,28 @@ const server = http.createServer(common.mustCall((req, res) => {
'Vary': 'Accept-Encoding'
});
- serverConnectionHandle = res.socket._handle;
+ socket = res.socket;
+ const onTimeout = socket._onTimeout;
+ socket._onTimeout = common.mustCallAtLeast(() => onTimeout.call(socket), 1);
res.write(content);
res.end();
}));
-server.setTimeout(serverTimeout);
server.on('timeout', () => {
- assert.strictEqual(didReceiveData, false, 'Should not timeout');
+ // TODO(apapirovski): This test is faulty on certain Windows systems
+ // as no queue is ever created
+ assert(!socket._handle || socket._handle.writeQueueSize === 0,
+ 'Should not timeout');
});
server.listen(0, common.mustCall(() => {
http.get({
path: '/',
port: server.address().port
- }, common.mustCall((res) => {
- const resume = () => res.resume();
- let receivedBufferLength = 0;
- let firstReceivedAt;
- res.on('data', common.mustCallAtLeast((buf) => {
- if (receivedBufferLength === 0) {
- currentWriteSize = Math.max(
- minReadSize,
- writeSize - serverConnectionHandle.writeQueueSize
- );
- didReceiveData = false;
- firstReceivedAt = Date.now();
- }
- receivedBufferLength += buf.length;
- if (receivedBufferLength >= currentWriteSize) {
- didReceiveData = true;
- writeSize = serverConnectionHandle.writeQueueSize;
- receivedBufferLength = 0;
- res.pause();
- setTimeout(
- resume,
- serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
- );
- offsetTimeout = 0;
- }
- }, 1));
- res.on('end', common.mustCall(() => {
- server.close();
- }));
- }));
+ }, (res) => {
+ res.once('data', () => {
+ socket._onTimeout();
+ res.on('data', () => {});
+ });
+ res.on('end', () => server.close());
+ });
}));
diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js
index 5048f4f951..79381ba873 100644
--- a/test/sequential/test-https-keep-alive-large-write.js
+++ b/test/sequential/test-https-keep-alive-large-write.js
@@ -2,31 +2,15 @@
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
-const assert = require('assert');
const fixtures = require('../common/fixtures');
const https = require('https');
// This test assesses whether long-running writes can complete
// or timeout because the socket is not aware that the backing
// stream is still writing.
-// To simulate a slow client, we write a really large chunk and
-// then proceed through the following cycle:
-// 1) Receive first 'data' event and record currently written size
-// 2) Once we've read up to currently written size recorded above,
-// we pause the stream and wait longer than the server timeout
-// 3) Socket.prototype._onTimeout triggers and should confirm
-// that the backing stream is still active and writing
-// 4) Our timer fires, we resume the socket and start at 1)
-const minReadSize = 250000;
-const serverTimeout = common.platformTimeout(500);
-let offsetTimeout = common.platformTimeout(100);
-let serverConnectionHandle;
-let writeSize = 2000000;
-let didReceiveData = false;
-// this represents each cycles write size, where the cycle consists
-// of `write > read > _onTimeout`
-let currentWriteSize = 0;
+const writeSize = 30000000;
+let socket;
const server = https.createServer({
key: fixtures.readKey('agent1-key.pem'),
@@ -40,50 +24,24 @@ const server = https.createServer({
'Vary': 'Accept-Encoding'
});
- serverConnectionHandle = res.socket._handle;
- res.write(content, () => {
- assert.strictEqual(serverConnectionHandle.writeQueueSize, 0);
- });
+ socket = res.socket;
+ const onTimeout = socket._onTimeout;
+ socket._onTimeout = common.mustCallAtLeast(() => onTimeout.call(socket), 1);
+ res.write(content);
res.end();
}));
-server.setTimeout(serverTimeout);
-server.on('timeout', () => {
- assert.strictEqual(didReceiveData, false, 'Should not timeout');
-});
+server.on('timeout', common.mustNotCall());
server.listen(0, common.mustCall(() => {
https.get({
path: '/',
port: server.address().port,
rejectUnauthorized: false
- }, common.mustCall((res) => {
- const resume = () => res.resume();
- let receivedBufferLength = 0;
- let firstReceivedAt;
- res.on('data', common.mustCallAtLeast((buf) => {
- if (receivedBufferLength === 0) {
- currentWriteSize = Math.max(
- minReadSize,
- writeSize - serverConnectionHandle.writeQueueSize
- );
- didReceiveData = false;
- firstReceivedAt = Date.now();
- }
- receivedBufferLength += buf.length;
- if (receivedBufferLength >= currentWriteSize) {
- didReceiveData = true;
- writeSize = serverConnectionHandle.writeQueueSize;
- receivedBufferLength = 0;
- res.pause();
- setTimeout(
- resume,
- serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
- );
- offsetTimeout = 0;
- }
- }, 1));
- res.on('end', common.mustCall(() => {
- server.close();
- }));
- }));
+ }, (res) => {
+ res.once('data', () => {
+ socket._onTimeout();
+ res.on('data', () => {});
+ });
+ res.on('end', () => server.close());
+ });
}));