diff options
author | Fedor Indutny <fedor.indutny@gmail.com> | 2014-01-29 02:48:10 +0400 |
---|---|---|
committer | Fedor Indutny <fedor.indutny@gmail.com> | 2014-01-29 02:49:03 +0400 |
commit | 9836a4eeda1e2d43aad0923f1f72b364792629bc (patch) | |
tree | f9a43115eaca3a49f83c910d20ea363bef3b2b29 | |
parent | eaf76648a6ba05932465fdb2478a16ca4b6c17a6 (diff) | |
download | node-new-9836a4eeda1e2d43aad0923f1f72b364792629bc.tar.gz |
stream_wrap: use `uv_try_write` where possible
Use `uv_try_write` for string and buffer writes, thus avoiding to do
allocations and copying in some of the cases.
-rw-r--r-- | benchmark/net/tcp-raw-pipe.js | 2 | ||||
-rw-r--r-- | benchmark/net/tcp-raw-s2c.js | 9 | ||||
-rw-r--r-- | lib/net.js | 8 | ||||
-rw-r--r-- | src/env.h | 1 | ||||
-rw-r--r-- | src/stream_wrap.cc | 146 | ||||
-rw-r--r-- | src/stream_wrap.h | 3 | ||||
-rw-r--r-- | src/tls_wrap.cc | 6 | ||||
-rw-r--r-- | src/tls_wrap.h | 1 | ||||
-rw-r--r-- | test/simple/test-tcp-wrap-listen.js | 9 |
9 files changed, 148 insertions, 37 deletions
diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js index 91c69e9b6c..bda683985d 100644 --- a/benchmark/net/tcp-raw-pipe.js +++ b/benchmark/net/tcp-raw-pipe.js @@ -51,7 +51,7 @@ function server() { if (nread < 0) fail(nread, 'read'); - var writeReq = {}; + var writeReq = { async: false }; err = clientHandle.writeBuffer(writeReq, buffer); if (err) diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js index 6fb6568527..500be1b72b 100644 --- a/benchmark/net/tcp-raw-s2c.js +++ b/benchmark/net/tcp-raw-s2c.js @@ -68,7 +68,7 @@ function server() { write(); function write() { - var writeReq = { oncomplete: afterWrite }; + var writeReq = { async: false, oncomplete: afterWrite }; var err; switch (type) { case 'buf': @@ -82,8 +82,13 @@ function server() { break; } - if (err) + if (err) { fail(err, 'write'); + } else if (!writeReq.async) { + process.nextTick(function() { + afterWrite(null, clientHandle, writeReq); + }); + } } function afterWrite(err, handle, req) { diff --git a/lib/net.js b/lib/net.js index 800bae38cf..8e6dcbf7bf 100644 --- a/lib/net.js +++ b/lib/net.js @@ -626,7 +626,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { return false; } - var req = { oncomplete: afterWrite }; + var req = { oncomplete: afterWrite, async: false }; var err; if (writev) { @@ -660,10 +660,10 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { // If it was entirely flushed, we can write some more right now. // However, if more is left in the queue, then wait until that clears. - if (this._handle.writeQueueSize === 0) - cb(); - else + if (req.async && this._handle.writeQueueSize != 0) req.cb = cb; + else + cb(); }; @@ -53,6 +53,7 @@ namespace node { #define PER_ISOLATE_STRING_PROPERTIES(V) \ V(address_string, "address") \ V(atime_string, "atime") \ + V(async, "async") \ V(async_queue_string, "_asyncQueue") \ V(birthtime_string, "birthtime") \ V(blksize_string, "blksize") \ diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index e0079b8136..848abefd5e 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -33,6 +33,7 @@ #include "util-inl.h" #include <stdlib.h> // abort() +#include <string.h> // memcpy() #include <limits.h> // INT_MAX @@ -49,6 +50,7 @@ using v8::Number; using v8::Object; using v8::PropertyCallbackInfo; using v8::String; +using v8::True; using v8::Undefined; using v8::Value; @@ -200,30 +202,43 @@ void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) { Local<Object> buf_obj = args[1].As<Object>(); size_t length = Buffer::Length(buf_obj); - char* storage = new char[sizeof(WriteWrap)]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); + char* storage; + WriteWrap* req_wrap; uv_buf_t buf; WriteBuffer(buf_obj, &buf); - int err = wrap->callbacks()->DoWrite(req_wrap, - &buf, - 1, - NULL, - StreamWrap::AfterWrite); + // Try writing immediately without allocation + uv_buf_t* bufs = &buf; + size_t count = 1; + int err = wrap->callbacks()->TryWrite(&bufs, &count); + if (err == 0) + goto done; + assert(count == 1); + + // Allocate, or write rest + storage = new char[sizeof(WriteWrap)]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); + + err = wrap->callbacks()->DoWrite(req_wrap, + bufs, + count, + NULL, + StreamWrap::AfterWrite); req_wrap->Dispatched(); - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(length, node_isolate)); - const char* msg = wrap->callbacks()->Error(); - if (msg != NULL) - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->async(), True(node_isolate)); if (err) { req_wrap->~WriteWrap(); delete[] storage; } + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != NULL) + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(length, node_isolate)); args.GetReturnValue().Set(err); } @@ -256,22 +271,53 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) { return; } - char* storage = new char[sizeof(WriteWrap) + storage_size + 15]; - WriteWrap* req_wrap = - new(storage) WriteWrap(env, req_wrap_obj, wrap); + // Try writing immediately if write size isn't too big + char* storage; + WriteWrap* req_wrap; + char* data; + char stack_storage[16384]; // 16kb + size_t data_size; + uv_buf_t buf; + + bool try_write = storage_size + 15 <= sizeof(stack_storage) && + (!wrap->is_named_pipe_ipc() || !args[2]->IsObject()); + if (try_write) { + data_size = StringBytes::Write(stack_storage, + storage_size, + string, + encoding); + buf = uv_buf_init(stack_storage, data_size); + + uv_buf_t* bufs = &buf; + size_t count = 1; + err = wrap->callbacks()->TryWrite(&bufs, &count); + + // Success + if (err == 0) + goto done; + + // Failure, or partial write + assert(count == 1); + } + + storage = new char[sizeof(WriteWrap) + storage_size + 15]; + req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap); - char* data = reinterpret_cast<char*>(ROUND_UP( + data = reinterpret_cast<char*>(ROUND_UP( reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16)); - size_t data_size; - data_size = StringBytes::Write(data, storage_size, string, encoding); + if (try_write) { + // Copy partial data + memcpy(data, buf.base, buf.len); + data_size = buf.len; + } else { + // Write it + data_size = StringBytes::Write(data, storage_size, string, encoding); + } assert(data_size <= storage_size); - uv_buf_t buf; - - buf.base = data; - buf.len = data_size; + buf = uv_buf_init(data, data_size); if (!wrap->is_named_pipe_ipc()) { err = wrap->callbacks()->DoWrite(req_wrap, @@ -301,17 +347,19 @@ void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) { } req_wrap->Dispatched(); - req_wrap->object()->Set(env->bytes_string(), - Integer::NewFromUnsigned(data_size, node_isolate)); - const char* msg = wrap->callbacks()->Error(); - if (msg != NULL) - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap->object()->Set(env->async(), True(node_isolate)); if (err) { req_wrap->~WriteWrap(); delete[] storage; } + done: + const char* msg = wrap->callbacks()->Error(); + if (msg != NULL) + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + req_wrap_obj->Set(env->bytes_string(), + Integer::NewFromUnsigned(data_size, node_isolate)); args.GetReturnValue().Set(err); } @@ -405,6 +453,7 @@ void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) { delete[] bufs; req_wrap->Dispatched(); + req_wrap->object()->Set(env->async(), True(node_isolate)); req_wrap->object()->Set(env->bytes_string(), Number::New(node_isolate, bytes)); const char* msg = wrap->callbacks()->Error(); @@ -518,6 +567,47 @@ const char* StreamWrapCallbacks::Error() { } +// NOTE: Call to this function could change both `buf`'s and `count`'s +// values, shifting their base and decrementing their length. This is +// required in order to skip the data that was successfully written via +// uv_try_write(). +int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { + int err; + size_t written; + uv_buf_t* vbufs = *bufs; + size_t vcount = *count; + + err = uv_try_write(wrap()->stream(), vbufs, vcount); + if (err < 0) + return err; + + // Slice off the buffers: skip all written buffers and slice the one that + // was partially written. + written = err; + for (; written != 0 && vcount > 0; vbufs++, vcount--) { + // Slice + if (vbufs[0].len > written) { + vbufs[0].base += written; + vbufs[0].len -= written; + written = 0; + break; + + // Discard + } else { + written -= vbufs[0].len; + } + } + + *bufs = vbufs; + *count = vcount; + + if (vcount == 0) + return 0; + else + return -1; +} + + int StreamWrapCallbacks::DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/src/stream_wrap.h b/src/stream_wrap.h index d1a94fb422..f91bb8ba55 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -74,6 +74,9 @@ class StreamWrapCallbacks { } virtual const char* Error(); + + virtual int TryWrite(uv_buf_t** bufs, size_t* count); + virtual int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 6d9425c040..92febc15b8 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -511,6 +511,12 @@ const char* TLSCallbacks::Error() { } +int TLSCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) { + // TODO(indutny): Support it + return -1; +} + + int TLSCallbacks::DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/src/tls_wrap.h b/src/tls_wrap.h index db78009ede..946cc1c64d 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -51,6 +51,7 @@ class TLSCallbacks : public crypto::SSLWrap<TLSCallbacks>, v8::Handle<v8::Context> context); const char* Error(); + int TryWrite(uv_buf_t** bufs, size_t* count); int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count, diff --git a/test/simple/test-tcp-wrap-listen.js b/test/simple/test-tcp-wrap-listen.js index 1a3dc12ee9..940ea8b4b8 100644 --- a/test/simple/test-tcp-wrap-listen.js +++ b/test/simple/test-tcp-wrap-listen.js @@ -55,7 +55,7 @@ server.onconnection = function(err, client) { assert.equal(0, client.writeQueueSize); - var req = {}; + var req = { async: false }; var err = client.writeBuffer(req, buffer); assert.equal(err, 0); client.pendingWrites.push(req); @@ -64,7 +64,12 @@ server.onconnection = function(err, client) { // 11 bytes should flush assert.equal(0, client.writeQueueSize); - req.oncomplete = function(status, client_, req_) { + if (req.async && client.writeQueueSize != 0) + req.oncomplete = done; + else + process.nextTick(done.bind(null, 0, client, req)); + + function done(status, client_, req_) { assert.equal(req, client.pendingWrites.shift()); // Check parameters. |