diff options
-rw-r--r-- | lib/child_process_uv.js | 23 | ||||
-rw-r--r-- | lib/net_uv.js | 94 | ||||
-rw-r--r-- | test/fixtures/fork2.js | 21 | ||||
-rw-r--r-- | test/simple/test-child-process-fork2.js | 2 |
4 files changed, 68 insertions, 72 deletions
diff --git a/lib/child_process_uv.js b/lib/child_process_uv.js index 2e6c914178..da712a4d31 100644 --- a/lib/child_process_uv.js +++ b/lib/child_process_uv.js @@ -73,7 +73,7 @@ function setupChannel(target, channel) { var jsonBuffer = ''; - channel.onread = function(pool, offset, length, recvStream) { + channel.onread = function(pool, offset, length, recvHandle) { if (pool) { jsonBuffer += pool.toString('ascii', offset, offset + length); @@ -83,14 +83,7 @@ function setupChannel(target, channel) { var message = JSON.parse(json); jsonBuffer = jsonBuffer.slice(i + 1); - if (recvStream) { - // TODO support other types of stream. - // TODO probably need a queue of recvStreams - var server = new net.Server(); - server._handle = recvStream; - } - - target.emit('message', message, server); + target.emit('message', message, recvHandle); } } else { @@ -99,17 +92,9 @@ function setupChannel(target, channel) { } }; - target.send = function(message, sendStream) { + target.send = function(message, sendHandle) { if (!target._channel) throw new Error("channel closed"); - // Open up net.Server instances - if (sendStream) { - if (false == sendStream instanceof net.Server) { - throw new Error("sendStream must be instance of net.Server"); - } - sendStream = sendStream._handle; - } - // For overflow protection don't write if channel queue is too deep. if (channel.writeQueueSize > 1024 * 1024) { return false; @@ -117,7 +102,7 @@ function setupChannel(target, channel) { var buffer = Buffer(JSON.stringify(message) + '\n'); - var writeReq = channel.write(buffer, 0, buffer.length, sendStream); + var writeReq = channel.write(buffer, 0, buffer.length, sendHandle); if (!writeReq) { throw new Error(errno + " cannot write to IPC channel."); diff --git a/lib/net_uv.js b/lib/net_uv.js index 1caacf2a58..8d82877ce8 100644 --- a/lib/net_uv.js +++ b/lib/net_uv.js @@ -59,7 +59,7 @@ exports.connect = exports.createConnection = function(port /* [host], [cb] */) { /* called when creating new Socket, or when re-using a closed Socket */ function initSocketHandle(self) { - self._pendingWriteReqs = 0; + self._writeRequests = []; self._flags = 0; self._connectQueueSize = 0; @@ -237,7 +237,7 @@ Socket.prototype.destroySoon = function() { this.writable = false; this._flags |= FLAG_DESTROY_SOON; - if (this._pendingWriteReqs == 0) { + if (this._writeRequests.length == 0) { this.destroy(); } }; @@ -342,42 +342,29 @@ Socket.prototype.setEncoding = function(encoding) { }; -Socket.prototype._getpeername = function() { - if (!this._handle || !this._handle.getpeername) { - return {}; - } - if (!this._peername) { - this._peername = this._handle.getpeername(); - } - return this._peername; -}; - - -Socket.prototype.__defineGetter__('remoteAddress', function() { - return this._getpeername().address; -}); - - -Socket.prototype.__defineGetter__('remotePort', function() { - return this._getpeername().port; -}); - - -/* - * Arguments data, [encoding], [cb] - */ -Socket.prototype.write = function(data, arg1, arg2) { - var encoding, cb; +Socket.prototype.write = function(data /* [encoding], [fd], [cb] */) { + var encoding, fd, cb; // parse arguments - if (arg1) { - if (typeof arg1 === 'string') { - encoding = arg1; - cb = arg2; - } else if (typeof arg1 === 'function') { - cb = arg1; + if (typeof arguments[3] == 'function') { + cb = arguments[3]; + fd = arguments[2]; + encoding = arguments[1]; + } else if (typeof arguments[2] == 'function') { + cb = arguments[2]; + if (typeof arguments[1] == 'number') { + fd = arguments[1]; } else { - throw new Error("bad arg"); + encoding = arguments[1]; + } + } else if (typeof arguments[1] == 'function') { + cb = arguments[1]; + } else { + if (typeof arguments[1] == 'number') { + fd = arguments[1]; + } else { + encoding = arguments[1]; + fd = arguments[2]; } } @@ -392,9 +379,9 @@ Socket.prototype.write = function(data, arg1, arg2) { if (this._connecting) { this._connectQueueSize += data.length; if (this._connectQueue) { - this._connectQueue.push([data, encoding, cb]); + this._connectQueue.push([data, encoding, fd, cb]); } else { - this._connectQueue = [[data, encoding, cb]]; + this._connectQueue = [[data, encoding, fd, cb]]; } return false; } @@ -408,7 +395,7 @@ Socket.prototype.write = function(data, arg1, arg2) { writeReq.oncomplete = afterWrite; writeReq.cb = cb; - this._pendingWriteReqs++; + this._writeRequests.push(writeReq); return this._handle.writeQueueSize == 0; }; @@ -423,9 +410,10 @@ function afterWrite(status, handle, req, buffer) { } // TODO check status. - self._pendingWriteReqs--; + var req_ = self._writeRequests.shift(); + assert.equal(req, req_); - if (self._pendingWriteReqs == 0) { + if (self._writeRequests.length == 0) { // TODO remove all uses of ondrain - this is not a good hack. if (self.ondrain) self.ondrain(); self.emit('drain'); @@ -433,7 +421,7 @@ function afterWrite(status, handle, req, buffer) { if (req.cb) req.cb(); - if (self._pendingWriteReqs == 0 && self._flags & FLAG_DESTROY_SOON) { + if (self._writeRequests.length == 0 && self._flags & FLAG_DESTROY_SOON) { self.destroy(); } } @@ -679,11 +667,17 @@ Server.prototype.listen = function() { var port = toPort(arguments[0]); + var TCP = process.binding('tcp_wrap').TCP + if (arguments.length == 0 || typeof arguments[0] == 'function') { // Don't bind(). OS will assign a port with INADDR_ANY. // The port can be found with server.address() listen(self, null, null); + } else if (arguments[0] instanceof TCP) { + self._handle = arguments[0]; + listen(self, null, -1, -1); + } else if (isPipeName(arguments[0])) { // UNIX socket or Windows pipe. listen(self, arguments[0], -1, -1); @@ -712,6 +706,7 @@ Server.prototype.address = function() { function onconnection(clientHandle) { var handle = this; var self = handle.socket; + var peername; debug('onconnection'); @@ -725,12 +720,29 @@ function onconnection(clientHandle) { return; } + // Todo: implement this for unix sockets + if (clientHandle.getpeername) { + peername = clientHandle.getpeername(); + if (!peername.address || !peername.port) { + var err = errnoException(errno, 'accept'); + clientHandle.close(); + self.emit('error', err); + return; + } + } + var socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen }); socket.readable = socket.writable = true; + if (peername) { + socket.remoteAddress = peername.address; + socket.remotePort = peername.port; + // TODO: set family as well + } + socket.resume(); self.connections++; diff --git a/test/fixtures/fork2.js b/test/fixtures/fork2.js index 8aa579c31b..d8a39f0b64 100644 --- a/test/fixtures/fork2.js +++ b/test/fixtures/fork2.js @@ -3,25 +3,24 @@ var net = require('net'); var connections = 0; -process.on('message', function(m, server) { +process.on('message', function(m, serverHandle) { console.log('CHILD got message:', m); assert.ok(m.hello); - assert.ok(server); - assert.ok(server instanceof net.Server); + assert.ok(serverHandle); - // TODO need better API for this. - server._backlog = 9; - - server.listen(function() { - process.send({ gotHandle: true }); - }); - - server.on('connection', function(c) { + var server = new net.Server(function(c) { connections++; console.log('CHILD got connection'); c.destroy(); process.send({ childConnections: connections }); }); + + // TODO need better API for this. + server._backlog = 9; + + server.listen(serverHandle, function() { + process.send({ gotHandle: true }); + }); }); diff --git a/test/simple/test-child-process-fork2.js b/test/simple/test-child-process-fork2.js index ef4867da76..f0f73217aa 100644 --- a/test/simple/test-child-process-fork2.js +++ b/test/simple/test-child-process-fork2.js @@ -20,7 +20,7 @@ server._backlog = 9; server.listen(common.PORT, function() { console.log('PARENT send child server handle'); - n.send({ hello: 'world' }, server); + n.send({ hello: 'world' }, server._handle); }); function makeConnections() { |