diff options
author | Anna Henningsen <anna@addaleax.net> | 2018-02-08 04:59:10 +0100 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2018-02-14 10:00:29 +0100 |
commit | 0e7b61229aa602e55c5fb034a63d7da97eecff3b (patch) | |
tree | 0e64305591fd94e1b609c5fd4ba1ae1bd19ea66a /src/stream_base-inl.h | |
parent | 0ed9ea861b847579478457b7f5aab430fb6d77cb (diff) | |
download | node-new-0e7b61229aa602e55c5fb034a63d7da97eecff3b.tar.gz |
src: refactor WriteWrap and ShutdownWraps
Encapsulate stream requests more:
- `WriteWrap` and `ShutdownWrap` classes are now tailored to the
streams on which they are used. In particular, for most streams
these are now plain `AsyncWrap`s and do not carry the overhead
of unused libuv request data.
- Provide generic `Write()` and `Shutdown()` methods that wrap
around the actual implementations, and make *usage* of streams
easier, rather than implementing; for example, wrap objects
don’t need to be provided by callers anymore.
- Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to
call the corresponding JS handlers, rather than always trying
to call them. This makes usage of streams by other C++ code
easier and leaner.
Also fix up some tests that were previously not actually testing
asynchronicity when the comments indicated that they would.
PR-URL: https://github.com/nodejs/node/pull/18676
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/stream_base-inl.h')
-rw-r--r-- | src/stream_base-inl.h | 216 |
1 files changed, 194 insertions, 22 deletions
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index 76922c1d8a..b479e04bae 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -25,6 +25,25 @@ using v8::Value; using AsyncHooks = Environment::AsyncHooks; +inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) { + CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField), + nullptr); + req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this); +} + +inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) { + return static_cast<StreamReq*>( + req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField)); +} + +inline void StreamReq::Dispose() { + object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr); + delete this; +} + +inline v8::Local<v8::Object> StreamReq::object() { + return GetAsyncWrap()->object(); +} inline StreamListener::~StreamListener() { if (stream_ != nullptr) @@ -36,6 +55,15 @@ inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); } +inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamAfterShutdown(w, status); +} + +inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamAfterWrite(w, status); +} inline StreamResource::~StreamResource() { while (listener_ != nullptr) { @@ -93,6 +121,9 @@ inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { listener_->OnStreamAfterWrite(w, status); } +inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) { + listener_->OnStreamAfterShutdown(w, status); +} inline StreamBase::StreamBase(Environment* env) : env_(env) { PushStreamListener(&default_listener_); @@ -102,6 +133,150 @@ inline Environment* StreamBase::stream_env() const { return env_; } +inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { + AfterRequest(req_wrap, [&]() { + EmitAfterWrite(req_wrap, status); + }); +} + +inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { + AfterRequest(req_wrap, [&]() { + EmitAfterShutdown(req_wrap, status); + }); +} + +template<typename Wrap, typename EmitEvent> +inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) { + Environment* env = stream_env(); + + v8::HandleScope handle_scope(env->isolate()); + v8::Context::Scope context_scope(env->context()); + + emit(); + req_wrap->Dispose(); +} + +inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) { + Environment* env = stream_env(); + if (req_wrap_obj.IsEmpty()) { + req_wrap_obj = + env->shutdown_wrap_constructor_function() + ->NewInstance(env->context()).ToLocalChecked(); + } + + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope( + env, GetAsyncWrap()->get_async_id()); + ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj); + int err = DoShutdown(req_wrap); + + if (err != 0) { + req_wrap->Dispose(); + } + + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + + return err; +} + +inline StreamWriteResult StreamBase::Write( + uv_buf_t* bufs, + size_t count, + uv_stream_t* send_handle, + v8::Local<v8::Object> req_wrap_obj) { + Environment* env = stream_env(); + int err; + if (send_handle == nullptr) { + err = DoTryWrite(&bufs, &count); + if (err != 0 || count == 0) { + return StreamWriteResult { false, err, nullptr }; + } + } + + if (req_wrap_obj.IsEmpty()) { + req_wrap_obj = + env->write_wrap_constructor_function() + ->NewInstance(env->context()).ToLocalChecked(); + } + + AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope( + env, GetAsyncWrap()->get_async_id()); + WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj); + + err = DoWrite(req_wrap, bufs, count, send_handle); + bool async = err == 0; + + if (!async) { + req_wrap->Dispose(); + req_wrap = nullptr; + } + + const char* msg = Error(); + if (msg != nullptr) { + req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg)); + ClearError(); + } + + req_wrap_obj->Set(env->async(), v8::Boolean::New(env->isolate(), async)); + + return StreamWriteResult { async, err, req_wrap }; +} + +template<typename OtherBase, bool kResetPersistent> +SimpleShutdownWrap<OtherBase, kResetPersistent>::SimpleShutdownWrap( + StreamBase* stream, + v8::Local<v8::Object> req_wrap_obj) + : ShutdownWrap(stream, req_wrap_obj), + OtherBase(stream->stream_env(), + req_wrap_obj, + AsyncWrap::PROVIDER_SHUTDOWNWRAP) { + Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this)); +} + +template<typename OtherBase, bool kResetPersistent> +SimpleShutdownWrap<OtherBase, kResetPersistent>::~SimpleShutdownWrap() { + ClearWrap(static_cast<AsyncWrap*>(this)->object()); + if (kResetPersistent) { + auto& persistent = static_cast<AsyncWrap*>(this)->persistent(); + CHECK_EQ(persistent.IsEmpty(), false); + persistent.Reset(); + } +} + +inline ShutdownWrap* StreamBase::CreateShutdownWrap( + v8::Local<v8::Object> object) { + return new SimpleShutdownWrap<AsyncWrap>(this, object); +} + +template<typename OtherBase, bool kResetPersistent> +SimpleWriteWrap<OtherBase, kResetPersistent>::SimpleWriteWrap( + StreamBase* stream, + v8::Local<v8::Object> req_wrap_obj) + : WriteWrap(stream, req_wrap_obj), + OtherBase(stream->stream_env(), + req_wrap_obj, + AsyncWrap::PROVIDER_WRITEWRAP) { + Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this)); +} + +template<typename OtherBase, bool kResetPersistent> +SimpleWriteWrap<OtherBase, kResetPersistent>::~SimpleWriteWrap() { + ClearWrap(static_cast<AsyncWrap*>(this)->object()); + if (kResetPersistent) { + auto& persistent = static_cast<AsyncWrap*>(this)->persistent(); + CHECK_EQ(persistent.IsEmpty(), false); + persistent.Reset(); + } +} + +inline WriteWrap* StreamBase::CreateWriteWrap( + v8::Local<v8::Object> object) { + return new SimpleWriteWrap<AsyncWrap>(this, object); +} + template <class Base> void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t, @@ -230,38 +405,35 @@ inline void ShutdownWrap::OnDone(int status) { stream()->AfterShutdown(this, status); } - -WriteWrap* WriteWrap::New(Environment* env, - Local<Object> obj, - StreamBase* wrap, - size_t extra) { - size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra; - char* storage = new char[storage_size]; - - return new(storage) WriteWrap(env, obj, wrap, storage_size); +inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) { + CHECK_EQ(storage_, nullptr); + storage_ = data; + storage_size_ = size; } - -void WriteWrap::Dispose() { - this->~WriteWrap(); - delete[] reinterpret_cast<char*>(this); -} - - -char* WriteWrap::Extra(size_t offset) { - return reinterpret_cast<char*>(this) + - ROUND_UP(sizeof(*this), kAlignSize) + - offset; +inline char* WriteWrap::Storage() { + return storage_; } -size_t WriteWrap::ExtraSize() const { - return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize); +inline size_t WriteWrap::StorageSize() const { + return storage_size_; } inline void WriteWrap::OnDone(int status) { stream()->AfterWrite(this, status); } +inline void StreamReq::Done(int status, const char* error_str) { + AsyncWrap* async_wrap = GetAsyncWrap(); + Environment* env = async_wrap->env(); + if (error_str != nullptr) { + async_wrap->object()->Set(env->error_string(), + OneByteString(env->isolate(), error_str)); + } + + OnDone(status); +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS |