summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2017-10-12 07:17:48 +0200
committerGibson Fahnestock <gibfahn@gmail.com>2017-10-31 00:15:16 +0000
commit87fd5b798f17f30531ea095b875d84b6b9a1f9b9 (patch)
tree2c7d714ff5da7a2cfa14b547734fc27d04a5de5f
parent9bea207e83a4b0be306bba8876402ab7dee7a391 (diff)
downloadnode-new-87fd5b798f17f30531ea095b875d84b6b9a1f9b9.tar.gz
lib: move _stream_wrap into internals
This makes a subsequent possible deprecation easier. PR-URL: https://github.com/nodejs/node/pull/16158 Backport-PR-URL: https://github.com/nodejs/node/pull/16626 Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Tobias Nießen <tniessen@tnie.de>
-rw-r--r--lib/_stream_wrap.js226
-rw-r--r--lib/internal/wrap_js_stream.js227
-rw-r--r--node.gyp1
3 files changed, 229 insertions, 225 deletions
diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js
index 6da778ccb6..10a0cf57e7 100644
--- a/lib/_stream_wrap.js
+++ b/lib/_stream_wrap.js
@@ -1,227 +1,3 @@
'use strict';
-const assert = require('assert');
-const util = require('util');
-// TODO(bmeurer): Change this back to const once hole checks are
-// properly optimized away early in Ignition+TurboFan.
-var Buffer = require('buffer').Buffer;
-const { Socket } = require('net');
-const { JSStream } = process.binding('js_stream');
-const uv = process.binding('uv');
-const debug = util.debuglog('stream_wrap');
-
-function StreamWrap(stream) {
- const handle = new JSStream();
-
- this.stream = stream;
-
- this._list = null;
-
- const self = this;
- handle.close = function(cb) {
- debug('close');
- self.doClose(cb);
- };
- handle.isAlive = function() {
- return self.isAlive();
- };
- handle.isClosing = function() {
- return self.isClosing();
- };
- handle.onreadstart = function() {
- return self.readStart();
- };
- handle.onreadstop = function() {
- return self.readStop();
- };
- handle.onshutdown = function(req) {
- return self.doShutdown(req);
- };
- handle.onwrite = function(req, bufs) {
- return self.doWrite(req, bufs);
- };
-
- this.stream.pause();
- this.stream.on('error', function onerror(err) {
- self.emit('error', err);
- });
- this.stream.on('data', function ondata(chunk) {
- if (!(chunk instanceof Buffer)) {
- // Make sure that no further `data` events will happen
- this.pause();
- this.removeListener('data', ondata);
-
- self.emit('error', new Error('Stream has StringDecoder'));
- return;
- }
-
- debug('data', chunk.length);
- if (self._handle)
- self._handle.readBuffer(chunk);
- });
- this.stream.once('end', function onend() {
- debug('end');
- if (self._handle)
- self._handle.emitEOF();
- });
-
- Socket.call(this, {
- handle: handle
- });
-}
-util.inherits(StreamWrap, Socket);
-module.exports = StreamWrap;
-
-// require('_stream_wrap').StreamWrap
-StreamWrap.StreamWrap = StreamWrap;
-
-StreamWrap.prototype.isAlive = function isAlive() {
- return true;
-};
-
-StreamWrap.prototype.isClosing = function isClosing() {
- return !this.readable || !this.writable;
-};
-
-StreamWrap.prototype.readStart = function readStart() {
- this.stream.resume();
- return 0;
-};
-
-StreamWrap.prototype.readStop = function readStop() {
- this.stream.pause();
- return 0;
-};
-
-StreamWrap.prototype.doShutdown = function doShutdown(req) {
- const self = this;
- const handle = this._handle;
- const item = this._enqueue('shutdown', req);
-
- this.stream.end(function() {
- // Ensure that write was dispatched
- setImmediate(function() {
- if (!self._dequeue(item))
- return;
-
- handle.finishShutdown(req, 0);
- });
- });
- return 0;
-};
-
-StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
- const self = this;
- const handle = self._handle;
-
- var pending = bufs.length;
-
- // Queue the request to be able to cancel it
- const item = self._enqueue('write', req);
-
- self.stream.cork();
- for (var n = 0; n < bufs.length; n++)
- self.stream.write(bufs[n], done);
- self.stream.uncork();
-
- function done(err) {
- if (!err && --pending !== 0)
- return;
-
- // Ensure that this is called once in case of error
- pending = 0;
-
- // Ensure that write was dispatched
- setImmediate(function() {
- // Do not invoke callback twice
- if (!self._dequeue(item))
- return;
-
- var errCode = 0;
- if (err) {
- if (err.code && uv['UV_' + err.code])
- errCode = uv['UV_' + err.code];
- else
- errCode = uv.UV_EPIPE;
- }
-
- handle.doAfterWrite(req);
- handle.finishWrite(req, errCode);
- });
- }
-
- return 0;
-};
-
-function QueueItem(type, req) {
- this.type = type;
- this.req = req;
- this.prev = this;
- this.next = this;
-}
-
-StreamWrap.prototype._enqueue = function _enqueue(type, req) {
- const item = new QueueItem(type, req);
- if (this._list === null) {
- this._list = item;
- return item;
- }
-
- item.next = this._list.next;
- item.prev = this._list;
- item.next.prev = item;
- item.prev.next = item;
-
- return item;
-};
-
-StreamWrap.prototype._dequeue = function _dequeue(item) {
- assert(item instanceof QueueItem);
-
- var next = item.next;
- var prev = item.prev;
-
- if (next === null && prev === null)
- return false;
-
- item.next = null;
- item.prev = null;
-
- if (next === item) {
- prev = null;
- next = null;
- } else {
- prev.next = next;
- next.prev = prev;
- }
-
- if (this._list === item)
- this._list = next;
-
- return true;
-};
-
-StreamWrap.prototype.doClose = function doClose(cb) {
- const self = this;
- const handle = self._handle;
-
- setImmediate(function() {
- while (self._list !== null) {
- const item = self._list;
- const req = item.req;
- self._dequeue(item);
-
- const errCode = uv.UV_ECANCELED;
- if (item.type === 'write') {
- handle.doAfterWrite(req);
- handle.finishWrite(req, errCode);
- } else if (item.type === 'shutdown') {
- handle.finishShutdown(req, errCode);
- }
- }
-
- // Should be already set by net.js
- assert(self._handle === null);
- cb();
- });
-};
+module.exports = require('internal/wrap_js_stream');
diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js
new file mode 100644
index 0000000000..25a77b8c4a
--- /dev/null
+++ b/lib/internal/wrap_js_stream.js
@@ -0,0 +1,227 @@
+'use strict';
+
+const assert = require('assert');
+const util = require('util');
+const Socket = require('net').Socket;
+const JSStream = process.binding('js_stream').JSStream;
+// TODO(bmeurer): Change this back to const once hole checks are
+// properly optimized away early in Ignition+TurboFan.
+var Buffer = require('buffer').Buffer;
+const uv = process.binding('uv');
+const debug = util.debuglog('stream_wrap');
+
+function StreamWrap(stream) {
+ const handle = new JSStream();
+
+ this.stream = stream;
+
+ this._list = null;
+
+ const self = this;
+ handle.close = function(cb) {
+ debug('close');
+ self.doClose(cb);
+ };
+ handle.isAlive = function() {
+ return self.isAlive();
+ };
+ handle.isClosing = function() {
+ return self.isClosing();
+ };
+ handle.onreadstart = function() {
+ return self.readStart();
+ };
+ handle.onreadstop = function() {
+ return self.readStop();
+ };
+ handle.onshutdown = function(req) {
+ return self.doShutdown(req);
+ };
+ handle.onwrite = function(req, bufs) {
+ return self.doWrite(req, bufs);
+ };
+
+ this.stream.pause();
+ this.stream.on('error', function onerror(err) {
+ self.emit('error', err);
+ });
+ this.stream.on('data', function ondata(chunk) {
+ if (!(chunk instanceof Buffer)) {
+ // Make sure that no further `data` events will happen
+ this.pause();
+ this.removeListener('data', ondata);
+
+ self.emit('error', new Error('Stream has StringDecoder'));
+ return;
+ }
+
+ debug('data', chunk.length);
+ if (self._handle)
+ self._handle.readBuffer(chunk);
+ });
+ this.stream.once('end', function onend() {
+ debug('end');
+ if (self._handle)
+ self._handle.emitEOF();
+ });
+
+ Socket.call(this, {
+ handle: handle
+ });
+}
+util.inherits(StreamWrap, Socket);
+module.exports = StreamWrap;
+
+// require('_stream_wrap').StreamWrap
+StreamWrap.StreamWrap = StreamWrap;
+
+StreamWrap.prototype.isAlive = function isAlive() {
+ return true;
+};
+
+StreamWrap.prototype.isClosing = function isClosing() {
+ return !this.readable || !this.writable;
+};
+
+StreamWrap.prototype.readStart = function readStart() {
+ this.stream.resume();
+ return 0;
+};
+
+StreamWrap.prototype.readStop = function readStop() {
+ this.stream.pause();
+ return 0;
+};
+
+StreamWrap.prototype.doShutdown = function doShutdown(req) {
+ const self = this;
+ const handle = this._handle;
+ const item = this._enqueue('shutdown', req);
+
+ this.stream.end(function() {
+ // Ensure that write was dispatched
+ setImmediate(function() {
+ if (!self._dequeue(item))
+ return;
+
+ handle.finishShutdown(req, 0);
+ });
+ });
+ return 0;
+};
+
+StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
+ const self = this;
+ const handle = self._handle;
+
+ var pending = bufs.length;
+
+ // Queue the request to be able to cancel it
+ const item = self._enqueue('write', req);
+
+ self.stream.cork();
+ for (var n = 0; n < bufs.length; n++)
+ self.stream.write(bufs[n], done);
+ self.stream.uncork();
+
+ function done(err) {
+ if (!err && --pending !== 0)
+ return;
+
+ // Ensure that this is called once in case of error
+ pending = 0;
+
+ // Ensure that write was dispatched
+ setImmediate(function() {
+ // Do not invoke callback twice
+ if (!self._dequeue(item))
+ return;
+
+ var errCode = 0;
+ if (err) {
+ if (err.code && uv['UV_' + err.code])
+ errCode = uv['UV_' + err.code];
+ else
+ errCode = uv.UV_EPIPE;
+ }
+
+ handle.doAfterWrite(req);
+ handle.finishWrite(req, errCode);
+ });
+ }
+
+ return 0;
+};
+
+function QueueItem(type, req) {
+ this.type = type;
+ this.req = req;
+ this.prev = this;
+ this.next = this;
+}
+
+StreamWrap.prototype._enqueue = function _enqueue(type, req) {
+ const item = new QueueItem(type, req);
+ if (this._list === null) {
+ this._list = item;
+ return item;
+ }
+
+ item.next = this._list.next;
+ item.prev = this._list;
+ item.next.prev = item;
+ item.prev.next = item;
+
+ return item;
+};
+
+StreamWrap.prototype._dequeue = function _dequeue(item) {
+ assert(item instanceof QueueItem);
+
+ var next = item.next;
+ var prev = item.prev;
+
+ if (next === null && prev === null)
+ return false;
+
+ item.next = null;
+ item.prev = null;
+
+ if (next === item) {
+ prev = null;
+ next = null;
+ } else {
+ prev.next = next;
+ next.prev = prev;
+ }
+
+ if (this._list === item)
+ this._list = next;
+
+ return true;
+};
+
+StreamWrap.prototype.doClose = function doClose(cb) {
+ const self = this;
+ const handle = self._handle;
+
+ setImmediate(function() {
+ while (self._list !== null) {
+ const item = self._list;
+ const req = item.req;
+ self._dequeue(item);
+
+ const errCode = uv.UV_ECANCELED;
+ if (item.type === 'write') {
+ handle.doAfterWrite(req);
+ handle.finishWrite(req, errCode);
+ } else if (item.type === 'shutdown') {
+ handle.finishShutdown(req, errCode);
+ }
+ }
+
+ // Should be already set by net.js
+ assert(self._handle === null);
+ cb();
+ });
+};
diff --git a/node.gyp b/node.gyp
index 1d62c26cdb..acaf822ea8 100644
--- a/node.gyp
+++ b/node.gyp
@@ -124,6 +124,7 @@
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
+ 'lib/internal/wrap_js_stream.js',
'deps/v8/tools/splaytree.js',
'deps/v8/tools/codemap.js',
'deps/v8/tools/consarray.js',