diff options
author | Fedor Indutny <fedor@indutny.com> | 2015-02-23 23:09:44 +0300 |
---|---|---|
committer | Fedor Indutny <fedor@indutny.com> | 2015-02-24 22:38:21 +0300 |
commit | 1738c7783526868d86cb213414cb4d40c5a89662 (patch) | |
tree | 956d199d9e1b7f793bfe60db58bef27583ca1ebb /lib/_stream_wrap.js | |
parent | e00c938d246c29897344be3b3060533bb4ad7806 (diff) | |
download | node-new-1738c7783526868d86cb213414cb4d40c5a89662.tar.gz |
streams: introduce StreamWrap and JSStream
Introduce a way to wrap plain-js `stream.Duplex` streams into C++
StreamBase's child class. With such method at hand it is now possible to
pass `stream.Duplex` instance as a `socket` parameter to
`tls.connect()`.
PR-URL: https://github.com/iojs/io.js/pull/926
Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
Diffstat (limited to 'lib/_stream_wrap.js')
-rw-r--r-- | lib/_stream_wrap.js | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js new file mode 100644 index 0000000000..c3dcfe51b6 --- /dev/null +++ b/lib/_stream_wrap.js @@ -0,0 +1,118 @@ +const util = require('util'); +const Socket = require('net').Socket; +const JSStream = process.binding('js_stream').JSStream; +const uv = process.binding('uv'); + +function StreamWrap(stream) { + var handle = new JSStream(); + + this.stream = stream; + + var self = this; + handle.close = function(cb) { + cb(); + }; + handle.isAlive = function() { + return self.isAlive(); + }; + handle.isClosing = function() { + return self.isClosing(); + }; + handle.onreadstart = function() { + return self.readStart(); + }; + handle.onreadstop = function() { + return self.readStop(); + }; + handle.onshutdown = function(req) { + return self.shutdown(req); + }; + handle.onwrite = function(req, bufs) { + return self.write(req, bufs); + }; + + this.stream.pause(); + this.stream.on('data', function(chunk) { + self._handle.readBuffer(chunk); + }); + this.stream.once('end', function() { + self._handle.emitEOF(); + }); + this.stream.on('error', function(err) { + self.emit('error', err); + }); + + Socket.call(this, { + handle: handle + }); +} +util.inherits(StreamWrap, Socket); +module.exports = StreamWrap; + +// require('_stream_wrap').StreamWrap +StreamWrap.StreamWrap = StreamWrap; + +StreamWrap.prototype.isAlive = function isAlive() { + return this.readable && this.writable; +}; + +StreamWrap.prototype.isClosing = function isClosing() { + return !this.isAlive(); +}; + +StreamWrap.prototype.readStart = function readStart() { + this.stream.resume(); + return 0; +}; + +StreamWrap.prototype.readStop = function readStop() { + this.stream.pause(); + return 0; +}; + +StreamWrap.prototype.shutdown = function shutdown(req) { + var self = this; + + this.stream.end(function() { + // Ensure that write was dispatched + setImmediate(function() { + self._handle.finishShutdown(req, 0); + }); + }); + return 0; +}; + +StreamWrap.prototype.write = function write(req, bufs) { + var pending = bufs.length; + var self = this; + + self.stream.cork(); + bufs.forEach(function(buf) { + self.stream.write(buf, done); + }); + self.stream.uncork(); + + function done(err) { + if (!err && --pending !== 0) + return; + + // Ensure that this is called once in case of error + pending = 0; + + // Ensure that write was dispatched + setImmediate(function() { + var errCode = 0; + if (err) { + if (err.code && uv['UV_' + err.code]) + errCode = uv['UV_' + err.code]; + else + errCode = uv.UV_EPIPE; + } + + self._handle.doAfterWrite(req); + self._handle.finishWrite(req, errCode); + }); + } + + return 0; +}; |