summaryrefslogtreecommitdiff
path: root/src/stream_base-inl.h
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-02-08 04:59:10 +0100
committerAnna Henningsen <anna@addaleax.net>2018-02-14 10:00:29 +0100
commit0e7b61229aa602e55c5fb034a63d7da97eecff3b (patch)
tree0e64305591fd94e1b609c5fd4ba1ae1bd19ea66a /src/stream_base-inl.h
parent0ed9ea861b847579478457b7f5aab430fb6d77cb (diff)
downloadnode-new-0e7b61229aa602e55c5fb034a63d7da97eecff3b.tar.gz
src: refactor WriteWrap and ShutdownWraps
Encapsulate stream requests more: - `WriteWrap` and `ShutdownWrap` classes are now tailored to the streams on which they are used. In particular, for most streams these are now plain `AsyncWrap`s and do not carry the overhead of unused libuv request data. - Provide generic `Write()` and `Shutdown()` methods that wrap around the actual implementations, and make *usage* of streams easier, rather than implementing; for example, wrap objects don’t need to be provided by callers anymore. - Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to call the corresponding JS handlers, rather than always trying to call them. This makes usage of streams by other C++ code easier and leaner. Also fix up some tests that were previously not actually testing asynchronicity when the comments indicated that they would. PR-URL: https://github.com/nodejs/node/pull/18676 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/stream_base-inl.h')
-rw-r--r--src/stream_base-inl.h216
1 files changed, 194 insertions, 22 deletions
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h
index 76922c1d8a..b479e04bae 100644
--- a/src/stream_base-inl.h
+++ b/src/stream_base-inl.h
@@ -25,6 +25,25 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;
+inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
+ CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField),
+ nullptr);
+ req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this);
+}
+
+inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
+ return static_cast<StreamReq*>(
+ req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField));
+}
+
+inline void StreamReq::Dispose() {
+ object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr);
+ delete this;
+}
+
+inline v8::Local<v8::Object> StreamReq::object() {
+ return GetAsyncWrap()->object();
+}
inline StreamListener::~StreamListener() {
if (stream_ != nullptr)
@@ -36,6 +55,15 @@ inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
}
+inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamAfterShutdown(w, status);
+}
+
+inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamAfterWrite(w, status);
+}
inline StreamResource::~StreamResource() {
while (listener_ != nullptr) {
@@ -93,6 +121,9 @@ inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
listener_->OnStreamAfterWrite(w, status);
}
+inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
+ listener_->OnStreamAfterShutdown(w, status);
+}
inline StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_);
@@ -102,6 +133,150 @@ inline Environment* StreamBase::stream_env() const {
return env_;
}
+inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
+ AfterRequest(req_wrap, [&]() {
+ EmitAfterWrite(req_wrap, status);
+ });
+}
+
+inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
+ AfterRequest(req_wrap, [&]() {
+ EmitAfterShutdown(req_wrap, status);
+ });
+}
+
+template<typename Wrap, typename EmitEvent>
+inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) {
+ Environment* env = stream_env();
+
+ v8::HandleScope handle_scope(env->isolate());
+ v8::Context::Scope context_scope(env->context());
+
+ emit();
+ req_wrap->Dispose();
+}
+
+inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
+ Environment* env = stream_env();
+ if (req_wrap_obj.IsEmpty()) {
+ req_wrap_obj =
+ env->shutdown_wrap_constructor_function()
+ ->NewInstance(env->context()).ToLocalChecked();
+ }
+
+ AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(
+ env, GetAsyncWrap()->get_async_id());
+ ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
+ int err = DoShutdown(req_wrap);
+
+ if (err != 0) {
+ req_wrap->Dispose();
+ }
+
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+
+ return err;
+}
+
+inline StreamWriteResult StreamBase::Write(
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle,
+ v8::Local<v8::Object> req_wrap_obj) {
+ Environment* env = stream_env();
+ int err;
+ if (send_handle == nullptr) {
+ err = DoTryWrite(&bufs, &count);
+ if (err != 0 || count == 0) {
+ return StreamWriteResult { false, err, nullptr };
+ }
+ }
+
+ if (req_wrap_obj.IsEmpty()) {
+ req_wrap_obj =
+ env->write_wrap_constructor_function()
+ ->NewInstance(env->context()).ToLocalChecked();
+ }
+
+ AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(
+ env, GetAsyncWrap()->get_async_id());
+ WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
+
+ err = DoWrite(req_wrap, bufs, count, send_handle);
+ bool async = err == 0;
+
+ if (!async) {
+ req_wrap->Dispose();
+ req_wrap = nullptr;
+ }
+
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+
+ req_wrap_obj->Set(env->async(), v8::Boolean::New(env->isolate(), async));
+
+ return StreamWriteResult { async, err, req_wrap };
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleShutdownWrap<OtherBase, kResetPersistent>::SimpleShutdownWrap(
+ StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj)
+ : ShutdownWrap(stream, req_wrap_obj),
+ OtherBase(stream->stream_env(),
+ req_wrap_obj,
+ AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
+ Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this));
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleShutdownWrap<OtherBase, kResetPersistent>::~SimpleShutdownWrap() {
+ ClearWrap(static_cast<AsyncWrap*>(this)->object());
+ if (kResetPersistent) {
+ auto& persistent = static_cast<AsyncWrap*>(this)->persistent();
+ CHECK_EQ(persistent.IsEmpty(), false);
+ persistent.Reset();
+ }
+}
+
+inline ShutdownWrap* StreamBase::CreateShutdownWrap(
+ v8::Local<v8::Object> object) {
+ return new SimpleShutdownWrap<AsyncWrap>(this, object);
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleWriteWrap<OtherBase, kResetPersistent>::SimpleWriteWrap(
+ StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj)
+ : WriteWrap(stream, req_wrap_obj),
+ OtherBase(stream->stream_env(),
+ req_wrap_obj,
+ AsyncWrap::PROVIDER_WRITEWRAP) {
+ Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this));
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleWriteWrap<OtherBase, kResetPersistent>::~SimpleWriteWrap() {
+ ClearWrap(static_cast<AsyncWrap*>(this)->object());
+ if (kResetPersistent) {
+ auto& persistent = static_cast<AsyncWrap*>(this)->persistent();
+ CHECK_EQ(persistent.IsEmpty(), false);
+ persistent.Reset();
+ }
+}
+
+inline WriteWrap* StreamBase::CreateWriteWrap(
+ v8::Local<v8::Object> object) {
+ return new SimpleWriteWrap<AsyncWrap>(this, object);
+}
+
template <class Base>
void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate> t,
@@ -230,38 +405,35 @@ inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status);
}
-
-WriteWrap* WriteWrap::New(Environment* env,
- Local<Object> obj,
- StreamBase* wrap,
- size_t extra) {
- size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
- char* storage = new char[storage_size];
-
- return new(storage) WriteWrap(env, obj, wrap, storage_size);
+inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) {
+ CHECK_EQ(storage_, nullptr);
+ storage_ = data;
+ storage_size_ = size;
}
-
-void WriteWrap::Dispose() {
- this->~WriteWrap();
- delete[] reinterpret_cast<char*>(this);
-}
-
-
-char* WriteWrap::Extra(size_t offset) {
- return reinterpret_cast<char*>(this) +
- ROUND_UP(sizeof(*this), kAlignSize) +
- offset;
+inline char* WriteWrap::Storage() {
+ return storage_;
}
-size_t WriteWrap::ExtraSize() const {
- return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
+inline size_t WriteWrap::StorageSize() const {
+ return storage_size_;
}
inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status);
}
+inline void StreamReq::Done(int status, const char* error_str) {
+ AsyncWrap* async_wrap = GetAsyncWrap();
+ Environment* env = async_wrap->env();
+ if (error_str != nullptr) {
+ async_wrap->object()->Set(env->error_string(),
+ OneByteString(env->isolate(), error_str));
+ }
+
+ OnDone(status);
+}
+
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS