summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_stream_wrap.js134
-rw-r--r--lib/_tls_wrap.js32
-rw-r--r--src/js_stream.cc2
-rw-r--r--src/tls_wrap.cc7
-rw-r--r--test/parallel/test-stream-wrap.js39
-rw-r--r--test/parallel/test-tls-connect-given-socket.js29
-rw-r--r--test/parallel/test-tls-destroy-whilst-write.js29
7 files changed, 239 insertions, 33 deletions
diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js
index 1eac2ddf94..924d07a986 100644
--- a/lib/_stream_wrap.js
+++ b/lib/_stream_wrap.js
@@ -1,18 +1,23 @@
'use strict';
+const assert = require('assert');
const util = require('util');
const Socket = require('net').Socket;
const JSStream = process.binding('js_stream').JSStream;
const uv = process.binding('uv');
+const debug = util.debuglog('stream_wrap');
function StreamWrap(stream) {
- var handle = new JSStream();
+ const handle = new JSStream();
this.stream = stream;
- var self = this;
+ this._list = null;
+
+ const self = this;
handle.close = function(cb) {
- cb();
+ debug('close');
+ self.doClose(cb);
};
handle.isAlive = function() {
return self.isAlive();
@@ -27,21 +32,25 @@ function StreamWrap(stream) {
return self.readStop();
};
handle.onshutdown = function(req) {
- return self.shutdown(req);
+ return self.doShutdown(req);
};
handle.onwrite = function(req, bufs) {
- return self.write(req, bufs);
+ return self.doWrite(req, bufs);
};
this.stream.pause();
+ this.stream.on('error', function(err) {
+ self.emit('error', err);
+ });
this.stream.on('data', function(chunk) {
- self._handle.readBuffer(chunk);
+ debug('data', chunk.length);
+ if (self._handle)
+ self._handle.readBuffer(chunk);
});
this.stream.once('end', function() {
- self._handle.emitEOF();
- });
- this.stream.on('error', function(err) {
- self.emit('error', err);
+ debug('end');
+ if (self._handle)
+ self._handle.emitEOF();
});
Socket.call(this, {
@@ -55,11 +64,11 @@ module.exports = StreamWrap;
StreamWrap.StreamWrap = StreamWrap;
StreamWrap.prototype.isAlive = function isAlive() {
- return this.readable && this.writable;
+ return true;
};
StreamWrap.prototype.isClosing = function isClosing() {
- return !this.isAlive();
+ return !this.readable || !this.writable;
};
StreamWrap.prototype.readStart = function readStart() {
@@ -72,21 +81,31 @@ StreamWrap.prototype.readStop = function readStop() {
return 0;
};
-StreamWrap.prototype.shutdown = function shutdown(req) {
- var self = this;
+StreamWrap.prototype.doShutdown = function doShutdown(req) {
+ const self = this;
+ const handle = this._handle;
+ const item = this._enqueue('shutdown', req);
this.stream.end(function() {
// Ensure that write was dispatched
setImmediate(function() {
- self._handle.finishShutdown(req, 0);
+ if (!self._dequeue(item))
+ return;
+
+ handle.finishShutdown(req, 0);
});
});
return 0;
};
-StreamWrap.prototype.write = function write(req, bufs) {
+StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
+ const self = this;
+ const handle = self._handle;
+
var pending = bufs.length;
- var self = this;
+
+ // Queue the request to be able to cancel it
+ const item = self._enqueue('write', req);
self.stream.cork();
bufs.forEach(function(buf) {
@@ -103,6 +122,10 @@ StreamWrap.prototype.write = function write(req, bufs) {
// Ensure that write was dispatched
setImmediate(function() {
+ // Do not invoke callback twice
+ if (!self._dequeue(item))
+ return;
+
var errCode = 0;
if (err) {
if (err.code && uv['UV_' + err.code])
@@ -111,10 +134,83 @@ StreamWrap.prototype.write = function write(req, bufs) {
errCode = uv.UV_EPIPE;
}
- self._handle.doAfterWrite(req);
- self._handle.finishWrite(req, errCode);
+ handle.doAfterWrite(req);
+ handle.finishWrite(req, errCode);
});
}
return 0;
};
+
+function QueueItem(type, req) {
+ this.type = type;
+ this.req = req;
+ this.prev = this;
+ this.next = this;
+}
+
+StreamWrap.prototype._enqueue = function enqueue(type, req) {
+ const item = new QueueItem(type, req);
+ if (this._list === null) {
+ this._list = item;
+ return item;
+ }
+
+ item.next = this._list.next;
+ item.prev = this._list;
+ item.next.prev = item;
+ item.prev.next = item;
+
+ return item;
+};
+
+StreamWrap.prototype._dequeue = function dequeue(item) {
+ assert(item instanceof QueueItem);
+
+ var next = item.next;
+ var prev = item.prev;
+
+ if (next === null && prev === null)
+ return false;
+
+ item.next = null;
+ item.prev = null;
+
+ if (next === item) {
+ prev = null;
+ next = null;
+ } else {
+ prev.next = next;
+ next.prev = prev;
+ }
+
+ if (this._list === item)
+ this._list = next;
+
+ return true;
+};
+
+StreamWrap.prototype.doClose = function doClose(cb) {
+ const self = this;
+ const handle = self._handle;
+
+ setImmediate(function() {
+ while (self._list !== null) {
+ const item = self._list;
+ const req = item.req;
+ self._dequeue(item);
+
+ const errCode = uv.UV_ECANCELED;
+ if (item.type === 'write') {
+ handle.doAfterWrite(req);
+ handle.finishWrite(req, errCode);
+ } else if (item.type === 'shutdown') {
+ handle.finishShutdown(req, errCode);
+ }
+ }
+
+ // Should be already set by net.js
+ assert(self._handle === null);
+ cb();
+ });
+};
diff --git a/lib/_tls_wrap.js b/lib/_tls_wrap.js
index 8ccee2379d..b96e577a76 100644
--- a/lib/_tls_wrap.js
+++ b/lib/_tls_wrap.js
@@ -254,7 +254,7 @@ function TLSSocket(socket, options) {
this.encrypted = true;
net.Socket.call(this, {
- handle: this._wrapHandle(wrap && wrap._handle),
+ handle: this._wrapHandle(wrap),
allowHalfOpen: socket && socket.allowHalfOpen,
readable: false,
writable: false
@@ -279,7 +279,7 @@ util.inherits(TLSSocket, net.Socket);
exports.TLSSocket = TLSSocket;
var proxiedMethods = [
- 'close', 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6',
+ 'ref', 'unref', 'open', 'bind', 'listen', 'connect', 'bind6',
'connect6', 'getsockname', 'getpeername', 'setNoDelay', 'setKeepAlive',
'setSimultaneousAccepts', 'setBlocking',
@@ -295,8 +295,20 @@ proxiedMethods.forEach(function(name) {
};
});
-TLSSocket.prototype._wrapHandle = function(handle) {
+tls_wrap.TLSWrap.prototype.close = function closeProxy(cb) {
+ if (this._parentWrap && this._parentWrap._handle === this._parent) {
+ setImmediate(cb);
+ return this._parentWrap.destroy();
+ }
+ return this._parent.close(cb);
+};
+
+TLSSocket.prototype._wrapHandle = function(wrap) {
var res;
+ var handle;
+
+ if (wrap)
+ handle = wrap._handle;
var options = this._tlsOptions;
if (!handle) {
@@ -310,6 +322,7 @@ TLSSocket.prototype._wrapHandle = function(handle) {
tls.createSecureContext();
res = tls_wrap.wrap(handle, context.context, options.isServer);
res._parent = handle;
+ res._parentWrap = wrap;
res._secureContext = context;
res.reading = handle.reading;
Object.defineProperty(handle, 'reading', {
@@ -355,7 +368,13 @@ TLSSocket.prototype._init = function(socket, wrap) {
// represent real writeQueueSize during regular writes.
ssl.writeQueueSize = 1;
- this.server = options.server || null;
+ this.server = options.server;
+
+ // Move the server to TLSSocket, otherwise both `socket.destroy()` and
+ // `TLSSocket.destroy()` will decrement number of connections of the TLS
+ // server, leading to misfiring `server.close()` callback
+ if (socket && socket.server === this.server)
+ socket.server = null;
// For clients, we will always have either a given ca list or be using
// default one
@@ -418,6 +437,7 @@ TLSSocket.prototype._init = function(socket, wrap) {
// set `.onsniselect` callback.
if (process.features.tls_sni &&
options.isServer &&
+ options.SNICallback &&
options.server &&
(options.SNICallback !== SNICallback ||
options.server._contexts.length)) {
@@ -554,6 +574,10 @@ TLSSocket.prototype._start = function() {
return;
}
+ // Socket was destroyed before the connection was established
+ if (!this._handle)
+ return;
+
debug('start');
if (this._tlsOptions.requestOCSP)
this._handle.requestOCSP();
diff --git a/src/js_stream.cc b/src/js_stream.cc
index 6b7c4063e0..91041d0201 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -163,7 +163,7 @@ template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
Wrap* w = Unwrap<Wrap>(args[0].As<Object>());
- w->Done(args[0]->Int32Value());
+ w->Done(args[1]->Int32Value());
}
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index b8a648de92..d4c7c9055d 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -320,6 +320,10 @@ void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) {
TLSWrap* wrap = req_wrap->wrap()->Cast<TLSWrap>();
req_wrap->Dispose();
+ // We should not be getting here after `DestroySSL`, because all queued writes
+ // must be invoked with UV_ECANCELED
+ CHECK_NE(wrap->ssl_, nullptr);
+
// Handle error
if (status) {
// Ignore errors after shutdown
@@ -331,9 +335,6 @@ void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) {
return;
}
- if (wrap->ssl_ == nullptr)
- return;
-
// Commit
NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_);
diff --git a/test/parallel/test-stream-wrap.js b/test/parallel/test-stream-wrap.js
new file mode 100644
index 0000000000..e7a7ecddd2
--- /dev/null
+++ b/test/parallel/test-stream-wrap.js
@@ -0,0 +1,39 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+
+const StreamWrap = require('_stream_wrap');
+const Duplex = require('stream').Duplex;
+const ShutdownWrap = process.binding('stream_wrap').ShutdownWrap;
+
+var done = false;
+
+function testShutdown(callback) {
+ var stream = new Duplex({
+ read: function() {
+ },
+ write: function() {
+ }
+ });
+
+ var wrap = new StreamWrap(stream);
+
+ var req = new ShutdownWrap();
+ req.oncomplete = function(code) {
+ assert(code < 0);
+ callback();
+ };
+ req.handle = wrap._handle;
+
+ // Close the handle to simulate
+ wrap.destroy();
+ req.handle.shutdown(req);
+}
+
+testShutdown(function() {
+ done = true;
+});
+
+process.on('exit', function() {
+ assert(done);
+});
diff --git a/test/parallel/test-tls-connect-given-socket.js b/test/parallel/test-tls-connect-given-socket.js
index 9e8170b13a..902b67aa51 100644
--- a/test/parallel/test-tls-connect-given-socket.js
+++ b/test/parallel/test-tls-connect-given-socket.js
@@ -43,16 +43,33 @@ var server = tls.createServer(options, function(socket) {
});
assert(client.readable);
assert(client.writable);
+
+ return client;
}
- // Already connected socket
- var connected = net.connect(common.PORT, function() {
- establish(connected);
+ // Immediate death socket
+ var immediateDeath = net.connect(common.PORT);
+ establish(immediateDeath).destroy();
+
+ // Outliving
+ var outlivingTCP = net.connect(common.PORT);
+ outlivingTCP.on('connect', function() {
+ outlivingTLS.destroy();
+ next();
});
+ var outlivingTLS = establish(outlivingTCP);
+
+ function next() {
+ // Already connected socket
+ var connected = net.connect(common.PORT, function() {
+ establish(connected);
+ });
+
+ // Connecting socket
+ var connecting = net.connect(common.PORT);
+ establish(connecting);
- // Connecting socket
- var connecting = net.connect(common.PORT);
- establish(connecting);
+ }
});
process.on('exit', function() {
diff --git a/test/parallel/test-tls-destroy-whilst-write.js b/test/parallel/test-tls-destroy-whilst-write.js
new file mode 100644
index 0000000000..8b865fab17
--- /dev/null
+++ b/test/parallel/test-tls-destroy-whilst-write.js
@@ -0,0 +1,29 @@
+'use strict';
+var assert = require('assert');
+var common = require('../common');
+
+if (!common.hasCrypto) {
+ console.log('1..0 # Skipped: missing crypto');
+ process.exit();
+}
+var tls = require('tls');
+var stream = require('stream');
+
+var delay = new stream.Duplex({
+ read: function read() {
+ },
+ write: function write(data, enc, cb) {
+ console.log('pending');
+ setTimeout(function() {
+ console.log('done');
+ cb();
+ }, 200);
+ }
+});
+
+var secure = tls.connect({
+ socket: delay
+});
+setImmediate(function() {
+ secure.destroy();
+});