diff options
author | Anna Henningsen <anna@addaleax.net> | 2018-02-08 04:59:10 +0100 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2018-02-14 10:00:29 +0100 |
commit | 0e7b61229aa602e55c5fb034a63d7da97eecff3b (patch) | |
tree | 0e64305591fd94e1b609c5fd4ba1ae1bd19ea66a /src/stream_base.cc | |
parent | 0ed9ea861b847579478457b7f5aab430fb6d77cb (diff) | |
download | node-new-0e7b61229aa602e55c5fb034a63d7da97eecff3b.tar.gz |
src: refactor WriteWrap and ShutdownWraps
Encapsulate stream requests more:
- `WriteWrap` and `ShutdownWrap` classes are now tailored to the
streams on which they are used. In particular, for most streams
these are now plain `AsyncWrap`s and do not carry the overhead
of unused libuv request data.
- Provide generic `Write()` and `Shutdown()` methods that wrap
around the actual implementations, and make *usage* of streams
easier, rather than implementing; for example, wrap objects
don’t need to be provided by callers anymore.
- Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to
call the corresponding JS handlers, rather than always trying
to call them. This makes usage of streams by other C++ code
easier and leaner.
Also fix up some tests that were previously not actually testing
asynchronicity when the comments indicated that they would.
PR-URL: https://github.com/nodejs/node/pull/18676
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/stream_base.cc')
-rw-r--r-- | src/stream_base.cc | 300 |
1 files changed, 98 insertions, 202 deletions
diff --git a/src/stream_base.cc b/src/stream_base.cc index 8bdcebe88a..9ad9fd5bcb 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -34,6 +34,11 @@ template int StreamBase::WriteString<LATIN1>( const FunctionCallbackInfo<Value>& args); +struct Free { + void operator()(char* ptr) const { free(ptr); } +}; + + int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) { return ReadStart(); } @@ -45,45 +50,10 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) { int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) { - Environment* env = Environment::GetCurrent(args); - CHECK(args[0]->IsObject()); Local<Object> req_wrap_obj = args[0].As<Object>(); - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id()); - ShutdownWrap* req_wrap = new ShutdownWrap(env, - req_wrap_obj, - this); - - int err = DoShutdown(req_wrap); - if (err) - delete req_wrap; - return err; -} - - -void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - Environment* env = req_wrap->env(); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - Local<Object> req_wrap_obj = req_wrap->object(); - Local<Value> argv[3] = { - Integer::New(env->isolate(), status), - GetObject(), - req_wrap_obj - }; - - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) - req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); - - delete req_wrap; + return Shutdown(req_wrap_obj); } @@ -104,19 +74,14 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { count = chunks->Length() >> 1; MaybeStackBuffer<uv_buf_t, 16> bufs(count); - uv_buf_t* buf_list = *bufs; size_t storage_size = 0; uint32_t bytes = 0; size_t offset; - WriteWrap* req_wrap; - int err; if (!all_buffers) { // Determine storage size first for (size_t i = 0; i < count; i++) { - storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize); - Local<Value> chunk = chunks->Get(i * 2); if (Buffer::HasInstance(chunk)) @@ -145,20 +110,11 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { bufs[i].len = Buffer::Length(chunk); bytes += bufs[i].len; } - - // Try writing immediately without allocation - err = DoTryWrite(&buf_list, &count); - if (err != 0 || count == 0) - goto done; } - { - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, - wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); - } + std::unique_ptr<char[], Free> storage; + if (storage_size > 0) + storage = std::unique_ptr<char[], Free>(Malloc(storage_size)); offset = 0; if (!all_buffers) { @@ -174,9 +130,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { } // Write string - offset = ROUND_UP(offset, WriteWrap::kAlignSize); CHECK_LE(offset, storage_size); - char* str_storage = req_wrap->Extra(offset); + char* str_storage = storage.get() + offset; size_t str_size = storage_size - offset; Local<String> string = chunk->ToString(env->context()).ToLocalChecked(); @@ -192,35 +147,17 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) { offset += str_size; bytes += str_size; } - - err = DoTryWrite(&buf_list, &count); - if (err != 0 || count == 0) { - req_wrap->Dispatched(); - req_wrap->Dispose(); - goto done; - } } - err = DoWrite(req_wrap, buf_list, count, nullptr); - req_wrap_obj->Set(env->async(), True(env->isolate())); - - if (err) - req_wrap->Dispose(); - - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } + StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes)); - - return err; + if (res.wrap != nullptr && storage) { + res.wrap->SetAllocatedStorage(storage.release(), storage_size); + } + return res.err; } - - int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { CHECK(args[0]->IsObject()); @@ -232,49 +169,20 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { } Local<Object> req_wrap_obj = args[0].As<Object>(); - const char* data = Buffer::Data(args[1]); - size_t length = Buffer::Length(args[1]); - WriteWrap* req_wrap; uv_buf_t buf; - buf.base = const_cast<char*>(data); - buf.len = length; - - // Try writing immediately without allocation - uv_buf_t* bufs = &buf; - size_t count = 1; - int err = DoTryWrite(&bufs, &count); - if (err != 0) - goto done; - if (count == 0) - goto done; - CHECK_EQ(count, 1); - - // Allocate, or write rest - { - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, - wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this); - } + buf.base = Buffer::Data(args[1]); + buf.len = Buffer::Length(args[1]); - err = DoWrite(req_wrap, bufs, count, nullptr); - req_wrap_obj->Set(env->async(), True(env->isolate())); - req_wrap_obj->Set(env->buffer_string(), args[1]); + StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj); - if (err) - req_wrap->Dispose(); + if (res.async) + req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust(); + req_wrap_obj->Set(env->context(), env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), buf.len)) + .FromJust(); - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); - } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), length)); - return err; + return res.err; } @@ -305,8 +213,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { return UV_ENOBUFS; // Try writing immediately if write size isn't too big - WriteWrap* req_wrap; - char* data; char stack_storage[16384]; // 16kb size_t data_size; uv_buf_t buf; @@ -325,36 +231,33 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { size_t count = 1; err = DoTryWrite(&bufs, &count); - // Failure - if (err != 0) - goto done; - - // Success - if (count == 0) - goto done; + // Immediate failure or success + if (err != 0 || count == 0) { + req_wrap_obj->Set(env->context(), env->async(), False(env->isolate())) + .FromJust(); + req_wrap_obj->Set(env->context(), + env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)) + .FromJust(); + return err; + } // Partial write CHECK_EQ(count, 1); } - { - AsyncWrap* wrap = GetAsyncWrap(); - CHECK_NE(wrap, nullptr); - AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, - wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); - } - - data = req_wrap->Extra(); + std::unique_ptr<char[], Free> data; if (try_write) { // Copy partial data - memcpy(data, buf.base, buf.len); + data = std::unique_ptr<char[], Free>(Malloc(buf.len)); + memcpy(data.get(), buf.base, buf.len); data_size = buf.len; } else { // Write it + data = std::unique_ptr<char[], Free>(Malloc(storage_size)); data_size = StringBytes::Write(env->isolate(), - data, + data.get(), storage_size, string, enc); @@ -362,78 +265,36 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { CHECK_LE(data_size, storage_size); - buf = uv_buf_init(data, data_size); - - if (!IsIPCPipe()) { - err = DoWrite(req_wrap, &buf, 1, nullptr); - } else { - uv_handle_t* send_handle = nullptr; - - if (!send_handle_obj.IsEmpty()) { - HandleWrap* wrap; - ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); - send_handle = wrap->GetHandle(); - // Reference LibuvStreamWrap instance to prevent it from being garbage - // collected before `AfterWrite` is called. - CHECK_EQ(false, req_wrap->persistent().IsEmpty()); - req_wrap_obj->Set(env->handle_string(), send_handle_obj); - } - - err = DoWrite( - req_wrap, - &buf, - 1, - reinterpret_cast<uv_stream_t*>(send_handle)); + buf = uv_buf_init(data.get(), data_size); + + uv_stream_t* send_handle = nullptr; + + if (IsIPCPipe() && !send_handle_obj.IsEmpty()) { + // TODO(addaleax): This relies on the fact that HandleWrap comes first + // as a superclass of each individual subclass. + // There are similar assumptions in other places in the code base. + // A better idea would be having all BaseObject's internal pointers + // refer to the BaseObject* itself; this would require refactoring + // throughout the code base but makes Node rely much less on C++ quirks. + HandleWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); + send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); + // Reference LibuvStreamWrap instance to prevent it from being garbage + // collected before `AfterWrite` is called. + req_wrap_obj->Set(env->handle_string(), send_handle_obj); } - req_wrap_obj->Set(env->async(), True(env->isolate())); + StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); - if (err) - req_wrap->Dispose(); + req_wrap_obj->Set(env->context(), env->bytes_string(), + Integer::NewFromUnsigned(env->isolate(), data_size)) + .FromJust(); - done: - const char* msg = Error(); - if (msg != nullptr) { - req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); - ClearError(); + if (res.wrap != nullptr) { + res.wrap->SetAllocatedStorage(data.release(), data_size); } - req_wrap_obj->Set(env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), data_size)); - return err; -} - - -void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - Environment* env = req_wrap->env(); - - HandleScope handle_scope(env->isolate()); - Context::Scope context_scope(env->context()); - - // The wrap and request objects should still be there. - CHECK_EQ(req_wrap->persistent().IsEmpty(), false); - - // Unref handle property - Local<Object> req_wrap_obj = req_wrap->object(); - req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust(); - EmitAfterWrite(req_wrap, status); - - Local<Value> argv[] = { - Integer::New(env->isolate(), status), - GetObject(), - req_wrap_obj, - Undefined(env->isolate()) - }; - - const char* msg = Error(); - if (msg != nullptr) { - argv[3] = OneByteString(env->isolate(), msg); - ClearError(); - } - - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) - req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); - req_wrap->Dispose(); + return res.err; } @@ -510,4 +371,39 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { stream->CallJSOnreadMethod(nread, obj); } + +void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( + StreamReq* req_wrap, int status) { + StreamBase* stream = static_cast<StreamBase*>(stream_); + Environment* env = stream->stream_env(); + AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); + Local<Object> req_wrap_obj = async_wrap->object(); + + Local<Value> argv[] = { + Integer::New(env->isolate(), status), + stream->GetObject(), + Undefined(env->isolate()) + }; + + const char* msg = stream->Error(); + if (msg != nullptr) { + argv[2] = OneByteString(env->isolate(), msg); + stream->ClearError(); + } + + if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) + async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); +} + +void ReportWritesToJSStreamListener::OnStreamAfterWrite( + WriteWrap* req_wrap, int status) { + OnStreamAfterReqFinished(req_wrap, status); +} + +void ReportWritesToJSStreamListener::OnStreamAfterShutdown( + ShutdownWrap* req_wrap, int status) { + OnStreamAfterReqFinished(req_wrap, status); +} + + } // namespace node |