summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-02-08 04:59:10 +0100
committerAnna Henningsen <anna@addaleax.net>2018-02-14 10:00:29 +0100
commit0e7b61229aa602e55c5fb034a63d7da97eecff3b (patch)
tree0e64305591fd94e1b609c5fd4ba1ae1bd19ea66a /src
parent0ed9ea861b847579478457b7f5aab430fb6d77cb (diff)
downloadnode-new-0e7b61229aa602e55c5fb034a63d7da97eecff3b.tar.gz
src: refactor WriteWrap and ShutdownWraps
Encapsulate stream requests more: - `WriteWrap` and `ShutdownWrap` classes are now tailored to the streams on which they are used. In particular, for most streams these are now plain `AsyncWrap`s and do not carry the overhead of unused libuv request data. - Provide generic `Write()` and `Shutdown()` methods that wrap around the actual implementations, and make *usage* of streams easier, rather than implementing; for example, wrap objects don’t need to be provided by callers anymore. - Use `EmitAfterWrite()` and `EmitAfterShutdown()` handlers to call the corresponding JS handlers, rather than always trying to call them. This makes usage of streams by other C++ code easier and leaner. Also fix up some tests that were previously not actually testing asynchronicity when the comments indicated that they would. PR-URL: https://github.com/nodejs/node/pull/18676 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/env.h1
-rw-r--r--src/js_stream.cc7
-rw-r--r--src/node_http2.cc44
-rw-r--r--src/node_http2.h9
-rw-r--r--src/req_wrap-inl.h5
-rw-r--r--src/req_wrap.h2
-rw-r--r--src/stream_base-inl.h216
-rw-r--r--src/stream_base.cc300
-rw-r--r--src/stream_base.h246
-rw-r--r--src/stream_wrap.cc36
-rw-r--r--src/stream_wrap.h3
-rw-r--r--src/tls_wrap.cc61
-rw-r--r--src/tls_wrap.h6
13 files changed, 533 insertions, 403 deletions
diff --git a/src/env.h b/src/env.h
index 0c3cfe2ced..68b674f4fd 100644
--- a/src/env.h
+++ b/src/env.h
@@ -306,6 +306,7 @@ class ModuleWrap;
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
+ V(shutdown_wrap_constructor_function, v8::Function) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tick_callback_function, v8::Function) \
V(timers_callback_function, v8::Function) \
diff --git a/src/js_stream.cc b/src/js_stream.cc
index 9e67a2094d..3ba6a254cf 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -91,8 +91,6 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object()
};
- req_wrap->Dispatched();
-
TryCatch try_catch(env()->isolate());
Local<Value> value;
int value_int = UV_EPROTO;
@@ -127,8 +125,6 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr
};
- w->Dispatched();
-
TryCatch try_catch(env()->isolate());
Local<Value> value;
int value_int = UV_EPROTO;
@@ -154,9 +150,8 @@ void JSStream::New(const FunctionCallbackInfo<Value>& args) {
template <class Wrap>
void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
- Wrap* w;
CHECK(args[0]->IsObject());
- ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());
+ Wrap* w = static_cast<Wrap*>(StreamReq::FromObject(args[0].As<Object>()));
w->Done(args[1]->Int32Value());
}
diff --git a/src/node_http2.cc b/src/node_http2.cc
index 2f688a4b35..6f59c119e5 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -1552,18 +1552,9 @@ void Http2Session::SendPendingData() {
chunks_sent_since_last_write_++;
- // DoTryWrite may modify both the buffer list start itself and the
- // base pointers/length of the individual buffers.
- uv_buf_t* writebufs = *bufs;
- if (stream_->DoTryWrite(&writebufs, &count) != 0 || count == 0) {
- // All writes finished synchronously, nothing more to do here.
- ClearOutgoing(0);
- return;
- }
-
- WriteWrap* req = AllocateSend();
- if (stream_->DoWrite(req, writebufs, count, nullptr) != 0) {
- req->Dispose();
+ StreamWriteResult res = underlying_stream()->Write(*bufs, count);
+ if (!res.async) {
+ ClearOutgoing(res.err);
}
DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
@@ -1649,15 +1640,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {
chunks_sent_since_last_write_ = n;
}
-// Allocates the data buffer used to pass outbound data to the i/o stream.
-WriteWrap* Http2Session::AllocateSend() {
- HandleScope scope(env()->isolate());
- Local<Object> obj =
- env()->write_wrap_constructor_function()
- ->NewInstance(env()->context()).ToLocalChecked();
- return WriteWrap::New(env(), obj, static_cast<StreamBase*>(stream_));
-}
-
// Callback used to receive inbound data from the i/o stream
void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
Http2Scope h2scope(this);
@@ -1833,20 +1815,15 @@ inline void Http2Stream::Close(int32_t code) {
DEBUG_HTTP2STREAM2(this, "closed with code %d", code);
}
-
-inline void Http2Stream::Shutdown() {
- CHECK(!this->IsDestroyed());
- Http2Scope h2scope(this);
- flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
- CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
- NGHTTP2_ERR_NOMEM);
- DEBUG_HTTP2STREAM(this, "writable side shutdown");
-}
-
int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
CHECK(!this->IsDestroyed());
- req_wrap->Dispatched();
- Shutdown();
+ {
+ Http2Scope h2scope(this);
+ flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
+ CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
+ NGHTTP2_ERR_NOMEM);
+ DEBUG_HTTP2STREAM(this, "writable side shutdown");
+ }
req_wrap->Done(0);
return 0;
}
@@ -2038,7 +2015,6 @@ inline int Http2Stream::DoWrite(WriteWrap* req_wrap,
CHECK_EQ(send_handle, nullptr);
Http2Scope h2scope(this);
session_->SetChunksSinceLastWrite();
- req_wrap->Dispatched();
if (!IsWritable()) {
req_wrap->Done(UV_EOF);
return 0;
diff --git a/src/node_http2.h b/src/node_http2.h
index bf41d74ed4..0e81eaac6c 100644
--- a/src/node_http2.h
+++ b/src/node_http2.h
@@ -601,9 +601,6 @@ class Http2Stream : public AsyncWrap,
inline void Close(int32_t code);
- // Shutdown the writable side of the stream
- inline void Shutdown();
-
// Destroy this stream instance and free all held memory.
inline void Destroy();
@@ -818,6 +815,10 @@ class Http2Session : public AsyncWrap, public StreamListener {
inline void EmitStatistics();
+ inline StreamBase* underlying_stream() {
+ return static_cast<StreamBase*>(stream_);
+ }
+
void Start();
void Stop();
void Close(uint32_t code = NGHTTP2_NO_ERROR,
@@ -907,8 +908,6 @@ class Http2Session : public AsyncWrap, public StreamListener {
template <get_setting fn>
static void GetSettings(const FunctionCallbackInfo<Value>& args);
- WriteWrap* AllocateSend();
-
uv_loop_t* event_loop() const {
return env()->event_loop();
}
diff --git a/src/req_wrap-inl.h b/src/req_wrap-inl.h
index 6d0c57cd81..4a7984e649 100644
--- a/src/req_wrap-inl.h
+++ b/src/req_wrap-inl.h
@@ -33,6 +33,11 @@ void ReqWrap<T>::Dispatched() {
req_.data = this;
}
+template <typename T>
+ReqWrap<T>* ReqWrap<T>::from_req(T* req) {
+ return ContainerOf(&ReqWrap<T>::req_, req);
+}
+
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
diff --git a/src/req_wrap.h b/src/req_wrap.h
index ddd0840aad..656be38dce 100644
--- a/src/req_wrap.h
+++ b/src/req_wrap.h
@@ -20,6 +20,8 @@ class ReqWrap : public AsyncWrap {
inline void Dispatched(); // Call this after the req has been dispatched.
T* req() { return &req_; }
+ static ReqWrap* from_req(T* req);
+
private:
friend class Environment;
friend int GenDebugSymbols();
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h
index 76922c1d8a..b479e04bae 100644
--- a/src/stream_base-inl.h
+++ b/src/stream_base-inl.h
@@ -25,6 +25,25 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;
+inline void StreamReq::AttachToObject(v8::Local<v8::Object> req_wrap_obj) {
+ CHECK_EQ(req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField),
+ nullptr);
+ req_wrap_obj->SetAlignedPointerInInternalField(kStreamReqField, this);
+}
+
+inline StreamReq* StreamReq::FromObject(v8::Local<v8::Object> req_wrap_obj) {
+ return static_cast<StreamReq*>(
+ req_wrap_obj->GetAlignedPointerFromInternalField(kStreamReqField));
+}
+
+inline void StreamReq::Dispose() {
+ object()->SetAlignedPointerInInternalField(kStreamReqField, nullptr);
+ delete this;
+}
+
+inline v8::Local<v8::Object> StreamReq::object() {
+ return GetAsyncWrap()->object();
+}
inline StreamListener::~StreamListener() {
if (stream_ != nullptr)
@@ -36,6 +55,15 @@ inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0));
}
+inline void StreamListener::OnStreamAfterShutdown(ShutdownWrap* w, int status) {
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamAfterShutdown(w, status);
+}
+
+inline void StreamListener::OnStreamAfterWrite(WriteWrap* w, int status) {
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamAfterWrite(w, status);
+}
inline StreamResource::~StreamResource() {
while (listener_ != nullptr) {
@@ -93,6 +121,9 @@ inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
listener_->OnStreamAfterWrite(w, status);
}
+inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
+ listener_->OnStreamAfterShutdown(w, status);
+}
inline StreamBase::StreamBase(Environment* env) : env_(env) {
PushStreamListener(&default_listener_);
@@ -102,6 +133,150 @@ inline Environment* StreamBase::stream_env() const {
return env_;
}
+inline void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
+ AfterRequest(req_wrap, [&]() {
+ EmitAfterWrite(req_wrap, status);
+ });
+}
+
+inline void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
+ AfterRequest(req_wrap, [&]() {
+ EmitAfterShutdown(req_wrap, status);
+ });
+}
+
+template<typename Wrap, typename EmitEvent>
+inline void StreamBase::AfterRequest(Wrap* req_wrap, EmitEvent emit) {
+ Environment* env = stream_env();
+
+ v8::HandleScope handle_scope(env->isolate());
+ v8::Context::Scope context_scope(env->context());
+
+ emit();
+ req_wrap->Dispose();
+}
+
+inline int StreamBase::Shutdown(v8::Local<v8::Object> req_wrap_obj) {
+ Environment* env = stream_env();
+ if (req_wrap_obj.IsEmpty()) {
+ req_wrap_obj =
+ env->shutdown_wrap_constructor_function()
+ ->NewInstance(env->context()).ToLocalChecked();
+ }
+
+ AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(
+ env, GetAsyncWrap()->get_async_id());
+ ShutdownWrap* req_wrap = CreateShutdownWrap(req_wrap_obj);
+ int err = DoShutdown(req_wrap);
+
+ if (err != 0) {
+ req_wrap->Dispose();
+ }
+
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+
+ return err;
+}
+
+inline StreamWriteResult StreamBase::Write(
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle,
+ v8::Local<v8::Object> req_wrap_obj) {
+ Environment* env = stream_env();
+ int err;
+ if (send_handle == nullptr) {
+ err = DoTryWrite(&bufs, &count);
+ if (err != 0 || count == 0) {
+ return StreamWriteResult { false, err, nullptr };
+ }
+ }
+
+ if (req_wrap_obj.IsEmpty()) {
+ req_wrap_obj =
+ env->write_wrap_constructor_function()
+ ->NewInstance(env->context()).ToLocalChecked();
+ }
+
+ AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(
+ env, GetAsyncWrap()->get_async_id());
+ WriteWrap* req_wrap = CreateWriteWrap(req_wrap_obj);
+
+ err = DoWrite(req_wrap, bufs, count, send_handle);
+ bool async = err == 0;
+
+ if (!async) {
+ req_wrap->Dispose();
+ req_wrap = nullptr;
+ }
+
+ const char* msg = Error();
+ if (msg != nullptr) {
+ req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
+ ClearError();
+ }
+
+ req_wrap_obj->Set(env->async(), v8::Boolean::New(env->isolate(), async));
+
+ return StreamWriteResult { async, err, req_wrap };
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleShutdownWrap<OtherBase, kResetPersistent>::SimpleShutdownWrap(
+ StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj)
+ : ShutdownWrap(stream, req_wrap_obj),
+ OtherBase(stream->stream_env(),
+ req_wrap_obj,
+ AsyncWrap::PROVIDER_SHUTDOWNWRAP) {
+ Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this));
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleShutdownWrap<OtherBase, kResetPersistent>::~SimpleShutdownWrap() {
+ ClearWrap(static_cast<AsyncWrap*>(this)->object());
+ if (kResetPersistent) {
+ auto& persistent = static_cast<AsyncWrap*>(this)->persistent();
+ CHECK_EQ(persistent.IsEmpty(), false);
+ persistent.Reset();
+ }
+}
+
+inline ShutdownWrap* StreamBase::CreateShutdownWrap(
+ v8::Local<v8::Object> object) {
+ return new SimpleShutdownWrap<AsyncWrap>(this, object);
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleWriteWrap<OtherBase, kResetPersistent>::SimpleWriteWrap(
+ StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj)
+ : WriteWrap(stream, req_wrap_obj),
+ OtherBase(stream->stream_env(),
+ req_wrap_obj,
+ AsyncWrap::PROVIDER_WRITEWRAP) {
+ Wrap(req_wrap_obj, static_cast<AsyncWrap*>(this));
+}
+
+template<typename OtherBase, bool kResetPersistent>
+SimpleWriteWrap<OtherBase, kResetPersistent>::~SimpleWriteWrap() {
+ ClearWrap(static_cast<AsyncWrap*>(this)->object());
+ if (kResetPersistent) {
+ auto& persistent = static_cast<AsyncWrap*>(this)->persistent();
+ CHECK_EQ(persistent.IsEmpty(), false);
+ persistent.Reset();
+ }
+}
+
+inline WriteWrap* StreamBase::CreateWriteWrap(
+ v8::Local<v8::Object> object) {
+ return new SimpleWriteWrap<AsyncWrap>(this, object);
+}
+
template <class Base>
void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate> t,
@@ -230,38 +405,35 @@ inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status);
}
-
-WriteWrap* WriteWrap::New(Environment* env,
- Local<Object> obj,
- StreamBase* wrap,
- size_t extra) {
- size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
- char* storage = new char[storage_size];
-
- return new(storage) WriteWrap(env, obj, wrap, storage_size);
+inline void WriteWrap::SetAllocatedStorage(char* data, size_t size) {
+ CHECK_EQ(storage_, nullptr);
+ storage_ = data;
+ storage_size_ = size;
}
-
-void WriteWrap::Dispose() {
- this->~WriteWrap();
- delete[] reinterpret_cast<char*>(this);
-}
-
-
-char* WriteWrap::Extra(size_t offset) {
- return reinterpret_cast<char*>(this) +
- ROUND_UP(sizeof(*this), kAlignSize) +
- offset;
+inline char* WriteWrap::Storage() {
+ return storage_;
}
-size_t WriteWrap::ExtraSize() const {
- return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
+inline size_t WriteWrap::StorageSize() const {
+ return storage_size_;
}
inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status);
}
+inline void StreamReq::Done(int status, const char* error_str) {
+ AsyncWrap* async_wrap = GetAsyncWrap();
+ Environment* env = async_wrap->env();
+ if (error_str != nullptr) {
+ async_wrap->object()->Set(env->error_string(),
+ OneByteString(env->isolate(), error_str));
+ }
+
+ OnDone(status);
+}
+
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 8bdcebe88a..9ad9fd5bcb 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -34,6 +34,11 @@ template int StreamBase::WriteString<LATIN1>(
const FunctionCallbackInfo<Value>& args);
+struct Free {
+ void operator()(char* ptr) const { free(ptr); }
+};
+
+
int StreamBase::ReadStartJS(const FunctionCallbackInfo<Value>& args) {
return ReadStart();
}
@@ -45,45 +50,10 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
- Environment* env = Environment::GetCurrent(args);
-
CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>();
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id());
- ShutdownWrap* req_wrap = new ShutdownWrap(env,
- req_wrap_obj,
- this);
-
- int err = DoShutdown(req_wrap);
- if (err)
- delete req_wrap;
- return err;
-}
-
-
-void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
- Environment* env = req_wrap->env();
-
- // The wrap and request objects should still be there.
- CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
-
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- Local<Object> req_wrap_obj = req_wrap->object();
- Local<Value> argv[3] = {
- Integer::New(env->isolate(), status),
- GetObject(),
- req_wrap_obj
- };
-
- if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
- req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
-
- delete req_wrap;
+ return Shutdown(req_wrap_obj);
}
@@ -104,19 +74,14 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
count = chunks->Length() >> 1;
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
- uv_buf_t* buf_list = *bufs;
size_t storage_size = 0;
uint32_t bytes = 0;
size_t offset;
- WriteWrap* req_wrap;
- int err;
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
- storage_size = ROUND_UP(storage_size, WriteWrap::kAlignSize);
-
Local<Value> chunk = chunks->Get(i * 2);
if (Buffer::HasInstance(chunk))
@@ -145,20 +110,11 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
bufs[i].len = Buffer::Length(chunk);
bytes += bufs[i].len;
}
-
- // Try writing immediately without allocation
- err = DoTryWrite(&buf_list, &count);
- if (err != 0 || count == 0)
- goto done;
}
- {
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
- wrap->get_async_id());
- req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
- }
+ std::unique_ptr<char[], Free> storage;
+ if (storage_size > 0)
+ storage = std::unique_ptr<char[], Free>(Malloc(storage_size));
offset = 0;
if (!all_buffers) {
@@ -174,9 +130,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}
// Write string
- offset = ROUND_UP(offset, WriteWrap::kAlignSize);
CHECK_LE(offset, storage_size);
- char* str_storage = req_wrap->Extra(offset);
+ char* str_storage = storage.get() + offset;
size_t str_size = storage_size - offset;
Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
@@ -192,35 +147,17 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
offset += str_size;
bytes += str_size;
}
-
- err = DoTryWrite(&buf_list, &count);
- if (err != 0 || count == 0) {
- req_wrap->Dispatched();
- req_wrap->Dispose();
- goto done;
- }
}
- err = DoWrite(req_wrap, buf_list, count, nullptr);
- req_wrap_obj->Set(env->async(), True(env->isolate()));
-
- if (err)
- req_wrap->Dispose();
-
- done:
- const char* msg = Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- ClearError();
- }
+ StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
req_wrap_obj->Set(env->bytes_string(), Number::New(env->isolate(), bytes));
-
- return err;
+ if (res.wrap != nullptr && storage) {
+ res.wrap->SetAllocatedStorage(storage.release(), storage_size);
+ }
+ return res.err;
}
-
-
int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
@@ -232,49 +169,20 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
}
Local<Object> req_wrap_obj = args[0].As<Object>();
- const char* data = Buffer::Data(args[1]);
- size_t length = Buffer::Length(args[1]);
- WriteWrap* req_wrap;
uv_buf_t buf;
- buf.base = const_cast<char*>(data);
- buf.len = length;
-
- // Try writing immediately without allocation
- uv_buf_t* bufs = &buf;
- size_t count = 1;
- int err = DoTryWrite(&bufs, &count);
- if (err != 0)
- goto done;
- if (count == 0)
- goto done;
- CHECK_EQ(count, 1);
-
- // Allocate, or write rest
- {
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
- wrap->get_async_id());
- req_wrap = WriteWrap::New(env, req_wrap_obj, this);
- }
+ buf.base = Buffer::Data(args[1]);
+ buf.len = Buffer::Length(args[1]);
- err = DoWrite(req_wrap, bufs, count, nullptr);
- req_wrap_obj->Set(env->async(), True(env->isolate()));
- req_wrap_obj->Set(env->buffer_string(), args[1]);
+ StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
- if (err)
- req_wrap->Dispose();
+ if (res.async)
+ req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
+ req_wrap_obj->Set(env->context(), env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), buf.len))
+ .FromJust();
- done:
- const char* msg = Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- ClearError();
- }
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), length));
- return err;
+ return res.err;
}
@@ -305,8 +213,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
return UV_ENOBUFS;
// Try writing immediately if write size isn't too big
- WriteWrap* req_wrap;
- char* data;
char stack_storage[16384]; // 16kb
size_t data_size;
uv_buf_t buf;
@@ -325,36 +231,33 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
size_t count = 1;
err = DoTryWrite(&bufs, &count);
- // Failure
- if (err != 0)
- goto done;
-
- // Success
- if (count == 0)
- goto done;
+ // Immediate failure or success
+ if (err != 0 || count == 0) {
+ req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
+ .FromJust();
+ req_wrap_obj->Set(env->context(),
+ env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), data_size))
+ .FromJust();
+ return err;
+ }
// Partial write
CHECK_EQ(count, 1);
}
- {
- AsyncWrap* wrap = GetAsyncWrap();
- CHECK_NE(wrap, nullptr);
- AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env,
- wrap->get_async_id());
- req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);
- }
-
- data = req_wrap->Extra();
+ std::unique_ptr<char[], Free> data;
if (try_write) {
// Copy partial data
- memcpy(data, buf.base, buf.len);
+ data = std::unique_ptr<char[], Free>(Malloc(buf.len));
+ memcpy(data.get(), buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
+ data = std::unique_ptr<char[], Free>(Malloc(storage_size));
data_size = StringBytes::Write(env->isolate(),
- data,
+ data.get(),
storage_size,
string,
enc);
@@ -362,78 +265,36 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
CHECK_LE(data_size, storage_size);
- buf = uv_buf_init(data, data_size);
-
- if (!IsIPCPipe()) {
- err = DoWrite(req_wrap, &buf, 1, nullptr);
- } else {
- uv_handle_t* send_handle = nullptr;
-
- if (!send_handle_obj.IsEmpty()) {
- HandleWrap* wrap;
- ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
- send_handle = wrap->GetHandle();
- // Reference LibuvStreamWrap instance to prevent it from being garbage
- // collected before `AfterWrite` is called.
- CHECK_EQ(false, req_wrap->persistent().IsEmpty());
- req_wrap_obj->Set(env->handle_string(), send_handle_obj);
- }
-
- err = DoWrite(
- req_wrap,
- &buf,
- 1,
- reinterpret_cast<uv_stream_t*>(send_handle));
+ buf = uv_buf_init(data.get(), data_size);
+
+ uv_stream_t* send_handle = nullptr;
+
+ if (IsIPCPipe() && !send_handle_obj.IsEmpty()) {
+ // TODO(addaleax): This relies on the fact that HandleWrap comes first
+ // as a superclass of each individual subclass.
+ // There are similar assumptions in other places in the code base.
+ // A better idea would be having all BaseObject's internal pointers
+ // refer to the BaseObject* itself; this would require refactoring
+ // throughout the code base but makes Node rely much less on C++ quirks.
+ HandleWrap* wrap;
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
+ send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
+ // Reference LibuvStreamWrap instance to prevent it from being garbage
+ // collected before `AfterWrite` is called.
+ req_wrap_obj->Set(env->handle_string(), send_handle_obj);
}
- req_wrap_obj->Set(env->async(), True(env->isolate()));
+ StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
- if (err)
- req_wrap->Dispose();
+ req_wrap_obj->Set(env->context(), env->bytes_string(),
+ Integer::NewFromUnsigned(env->isolate(), data_size))
+ .FromJust();
- done:
- const char* msg = Error();
- if (msg != nullptr) {
- req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
- ClearError();
+ if (res.wrap != nullptr) {
+ res.wrap->SetAllocatedStorage(data.release(), data_size);
}
- req_wrap_obj->Set(env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), data_size));
- return err;
-}
-
-
-void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
- Environment* env = req_wrap->env();
-
- HandleScope handle_scope(env->isolate());
- Context::Scope context_scope(env->context());
-
- // The wrap and request objects should still be there.
- CHECK_EQ(req_wrap->persistent().IsEmpty(), false);
-
- // Unref handle property
- Local<Object> req_wrap_obj = req_wrap->object();
- req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
- EmitAfterWrite(req_wrap, status);
-
- Local<Value> argv[] = {
- Integer::New(env->isolate(), status),
- GetObject(),
- req_wrap_obj,
- Undefined(env->isolate())
- };
-
- const char* msg = Error();
- if (msg != nullptr) {
- argv[3] = OneByteString(env->isolate(), msg);
- ClearError();
- }
-
- if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
- req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
- req_wrap->Dispose();
+ return res.err;
}
@@ -510,4 +371,39 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
stream->CallJSOnreadMethod(nread, obj);
}
+
+void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
+ StreamReq* req_wrap, int status) {
+ StreamBase* stream = static_cast<StreamBase*>(stream_);
+ Environment* env = stream->stream_env();
+ AsyncWrap* async_wrap = req_wrap->GetAsyncWrap();
+ Local<Object> req_wrap_obj = async_wrap->object();
+
+ Local<Value> argv[] = {
+ Integer::New(env->isolate(), status),
+ stream->GetObject(),
+ Undefined(env->isolate())
+ };
+
+ const char* msg = stream->Error();
+ if (msg != nullptr) {
+ argv[2] = OneByteString(env->isolate(), msg);
+ stream->ClearError();
+ }
+
+ if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
+ async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
+}
+
+void ReportWritesToJSStreamListener::OnStreamAfterWrite(
+ WriteWrap* req_wrap, int status) {
+ OnStreamAfterReqFinished(req_wrap, status);
+}
+
+void ReportWritesToJSStreamListener::OnStreamAfterShutdown(
+ ShutdownWrap* req_wrap, int status) {
+ OnStreamAfterReqFinished(req_wrap, status);
+}
+
+
} // namespace node
diff --git a/src/stream_base.h b/src/stream_base.h
index f18b6bda0a..59b8ee7b72 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -14,114 +14,75 @@
namespace node {
// Forward declarations
+class ShutdownWrap;
+class WriteWrap;
class StreamBase;
class StreamResource;
-template<typename Base>
+struct StreamWriteResult {
+ bool async;
+ int err;
+ WriteWrap* wrap;
+};
+
+
class StreamReq {
public:
- explicit StreamReq(StreamBase* stream) : stream_(stream) {
+ static constexpr int kStreamReqField = 1;
+
+ explicit StreamReq(StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj) : stream_(stream) {
+ AttachToObject(req_wrap_obj);
}
- inline void Done(int status, const char* error_str = nullptr) {
- Base* req = static_cast<Base*>(this);
- Environment* env = req->env();
- if (error_str != nullptr) {
- req->object()->Set(env->error_string(),
- OneByteString(env->isolate(), error_str));
- }
+ virtual ~StreamReq() {}
+ virtual AsyncWrap* GetAsyncWrap() = 0;
+ v8::Local<v8::Object> object();
- req->OnDone(status);
- }
+ void Done(int status, const char* error_str = nullptr);
+ void Dispose();
inline StreamBase* stream() const { return stream_; }
+ static StreamReq* FromObject(v8::Local<v8::Object> req_wrap_obj);
+
+ protected:
+ virtual void OnDone(int status) = 0;
+
+ void AttachToObject(v8::Local<v8::Object> req_wrap_obj);
+
private:
StreamBase* const stream_;
};
-class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
- public StreamReq<ShutdownWrap> {
+class ShutdownWrap : public StreamReq {
public:
- ShutdownWrap(Environment* env,
- v8::Local<v8::Object> req_wrap_obj,
- StreamBase* stream)
- : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
- StreamReq<ShutdownWrap>(stream) {
- Wrap(req_wrap_obj, this);
- }
+ ShutdownWrap(StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj)
+ : StreamReq(stream, req_wrap_obj) { }
- ~ShutdownWrap() {
- ClearWrap(object());
- }
-
- static ShutdownWrap* from_req(uv_shutdown_t* req) {
- return ContainerOf(&ShutdownWrap::req_, req);
- }
-
- size_t self_size() const override { return sizeof(*this); }
-
- inline void OnDone(int status); // Just calls stream()->AfterShutdown()
+ void OnDone(int status) override; // Just calls stream()->AfterShutdown()
};
-class WriteWrap : public ReqWrap<uv_write_t>,
- public StreamReq<WriteWrap> {
+class WriteWrap : public StreamReq {
public:
- static inline WriteWrap* New(Environment* env,
- v8::Local<v8::Object> obj,
- StreamBase* stream,
- size_t extra = 0);
- inline void Dispose();
- inline char* Extra(size_t offset = 0);
- inline size_t ExtraSize() const;
-
- size_t self_size() const override { return storage_size_; }
-
- static WriteWrap* from_req(uv_write_t* req) {
- return ContainerOf(&WriteWrap::req_, req);
- }
+ char* Storage();
+ size_t StorageSize() const;
+ void SetAllocatedStorage(char* data, size_t size);
- static const size_t kAlignSize = 16;
-
- WriteWrap(Environment* env,
- v8::Local<v8::Object> obj,
- StreamBase* stream)
- : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
- StreamReq<WriteWrap>(stream),
- storage_size_(0) {
- Wrap(obj, this);
- }
-
- inline void OnDone(int status); // Just calls stream()->AfterWrite()
-
- protected:
- WriteWrap(Environment* env,
- v8::Local<v8::Object> obj,
- StreamBase* stream,
- size_t storage_size)
- : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
- StreamReq<WriteWrap>(stream),
- storage_size_(storage_size) {
- Wrap(obj, this);
- }
+ WriteWrap(StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj)
+ : StreamReq(stream, req_wrap_obj) { }
~WriteWrap() {
- ClearWrap(object());
+ free(storage_);
}
- void* operator new(size_t size) = delete;
- void* operator new(size_t size, char* storage) { return storage; }
-
- // This is just to keep the compiler happy. It should never be called, since
- // we don't use exceptions in node.
- void operator delete(void* ptr, char* storage) { UNREACHABLE(); }
+ void OnDone(int status) override; // Just calls stream()->AfterWrite()
private:
- // People should not be using the non-placement new and delete operator on a
- // WriteWrap. Ensure this never happens.
- void operator delete(void* ptr) { UNREACHABLE(); }
-
- const size_t storage_size_;
+ char* storage_ = nullptr;
+ size_t storage_size_ = 0;
};
@@ -147,15 +108,23 @@ class StreamListener {
// `OnStreamRead()` is called when data is available on the socket and has
// been read into the buffer provided by `OnStreamAlloc()`.
// The `buf` argument is the return value of `uv_buf_t`, or may be a buffer
- // with base nullpptr in case of an error.
+ // with base nullptr in case of an error.
// `nread` is the number of read bytes (which is at most the buffer length),
// or, if negative, a libuv error code.
virtual void OnStreamRead(ssize_t nread,
const uv_buf_t& buf) = 0;
- // This is called once a Write has finished. `status` may be 0 or,
+ // This is called once a write has finished. `status` may be 0 or,
// if negative, a libuv error code.
- virtual void OnStreamAfterWrite(WriteWrap* w, int status) {}
+ // By default, this is simply passed on to the previous listener
+ // (and raises an assertion if there is none).
+ virtual void OnStreamAfterWrite(WriteWrap* w, int status);
+
+ // This is called once a shutdown has finished. `status` may be 0 or,
+ // if negative, a libuv error code.
+ // By default, this is simply passed on to the previous listener
+ // (and raises an assertion if there is none).
+ virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
// This is called immediately before the stream is destroyed.
virtual void OnStreamDestroy() {}
@@ -174,9 +143,21 @@ class StreamListener {
};
+// An (incomplete) stream listener class that calls the `.oncomplete()`
+// method of the JS objects associated with the wrap objects.
+class ReportWritesToJSStreamListener : public StreamListener {
+ public:
+ void OnStreamAfterWrite(WriteWrap* w, int status) override;
+ void OnStreamAfterShutdown(ShutdownWrap* w, int status) override;
+
+ private:
+ void OnStreamAfterReqFinished(StreamReq* req_wrap, int status);
+};
+
+
// A default emitter that just pushes data chunks as Buffer instances to
// JS land via the handle’s .ondata method.
-class EmitToJSStreamListener : public StreamListener {
+class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
public:
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
};
@@ -188,20 +169,31 @@ class StreamResource {
public:
virtual ~StreamResource();
- virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
- virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
- virtual int DoWrite(WriteWrap* w,
- uv_buf_t* bufs,
- size_t count,
- uv_stream_t* send_handle) = 0;
+ // These need to be implemented on the readable side of this stream:
// Start reading from the underlying resource. This is called by the consumer
- // when more data is desired.
+ // when more data is desired. Use `EmitAlloc()` and `EmitData()` to
+ // pass data along to the consumer.
virtual int ReadStart() = 0;
// Stop reading from the underlying resource. This is called by the
// consumer when its buffers are full and no more data can be handled.
virtual int ReadStop() = 0;
+ // These need to be implemented on the writable side of this stream:
+ // All of these methods may return an error code synchronously.
+ // In that case, the finish callback should *not* be called.
+
+ // Perform a shutdown operation, and call req_wrap->Done() when finished.
+ virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
+ // Try to write as much data as possible synchronously, and modify
+ // `*bufs` and `*count` accordingly. This is a no-op by default.
+ virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
+ // Perform a write of data, and call req_wrap->Done() when finished.
+ virtual int DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) = 0;
+
// Optionally, this may provide an error message to be used for
// failing writes.
virtual const char* Error() const;
@@ -223,6 +215,8 @@ class StreamResource {
void EmitRead(ssize_t nread, const uv_buf_t& buf = uv_buf_init(nullptr, 0));
// Call the current listener's OnStreamAfterWrite() method.
void EmitAfterWrite(WriteWrap* w, int status);
+ // Call the current listener's OnStreamAfterShutdown() method.
+ void EmitAfterShutdown(ShutdownWrap* w, int status);
StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0;
@@ -251,21 +245,40 @@ class StreamBase : public StreamResource {
void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
- // These are called by the respective {Write,Shutdown}Wrap class.
- virtual void AfterShutdown(ShutdownWrap* req, int status);
- virtual void AfterWrite(WriteWrap* req, int status);
-
// This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s.
Environment* stream_env() const;
- protected:
- explicit StreamBase(Environment* env);
+ // Shut down the current stream. This request can use an existing
+ // ShutdownWrap object (that was created in JS), or a new one will be created.
+ int Shutdown(v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
+
+ // Write data to the current stream. This request can use an existing
+ // WriteWrap object (that was created in JS), or a new one will be created.
+ // This will first try to write synchronously using `DoTryWrite()`, then
+ // asynchronously using `DoWrite()`.
+ // If the return value indicates a synchronous completion, no callback will
+ // be invoked.
+ StreamWriteResult Write(
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle = nullptr,
+ v8::Local<v8::Object> req_wrap_obj = v8::Local<v8::Object>());
+
+ // These can be overridden by subclasses to get more specific wrap instances.
+ // For example, a subclass Foo could create a FooWriteWrap or FooShutdownWrap
+ // (inheriting from ShutdownWrap/WriteWrap) that has extra fields, like
+ // an associated libuv request.
+ virtual ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object);
+ virtual WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object);
// One of these must be implemented
virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject();
+ protected:
+ explicit StreamBase(Environment* env);
+
// JS Methods
int ReadStartJS(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStopJS(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -292,6 +305,43 @@ class StreamBase : public StreamResource {
private:
Environment* env_;
EmitToJSStreamListener default_listener_;
+
+ // These are called by the respective {Write,Shutdown}Wrap class.
+ void AfterShutdown(ShutdownWrap* req, int status);
+ void AfterWrite(WriteWrap* req, int status);
+
+ template <typename Wrap, typename EmitEvent>
+ void AfterRequest(Wrap* req_wrap, EmitEvent emit);
+
+ friend class WriteWrap;
+ friend class ShutdownWrap;
+};
+
+
+// These are helpers for creating `ShutdownWrap`/`WriteWrap` instances.
+// `OtherBase` must have a constructor that matches the `AsyncWrap`
+// constructors’s (Environment*, Local<Object>, AsyncWrap::Provider) signature
+// and be a subclass of `AsyncWrap`.
+template <typename OtherBase, bool kResetPersistentOnDestroy = true>
+class SimpleShutdownWrap : public ShutdownWrap, public OtherBase {
+ public:
+ SimpleShutdownWrap(StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj);
+ ~SimpleShutdownWrap();
+
+ AsyncWrap* GetAsyncWrap() override { return this; }
+ size_t self_size() const override { return sizeof(*this); }
+};
+
+template <typename OtherBase, bool kResetPersistentOnDestroy = true>
+class SimpleWriteWrap : public WriteWrap, public OtherBase {
+ public:
+ SimpleWriteWrap(StreamBase* stream,
+ v8::Local<v8::Object> req_wrap_obj);
+ ~SimpleWriteWrap();
+
+ AsyncWrap* GetAsyncWrap() override { return this; }
+ size_t self_size() const override { return sizeof(*this) + StorageSize(); }
};
} // namespace node
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index bc10cf80e8..e1df9edd39 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -61,19 +61,22 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
[](const FunctionCallbackInfo<Value>& args) {
CHECK(args.IsConstructCall());
ClearWrap(args.This());
+ args.This()->SetAlignedPointerInInternalField(
+ StreamReq::kStreamReqField, nullptr);
};
Local<FunctionTemplate> sw =
FunctionTemplate::New(env->isolate(), is_construct_call_callback);
- sw->InstanceTemplate()->SetInternalFieldCount(1);
+ sw->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
Local<String> wrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap");
sw->SetClassName(wrapString);
AsyncWrap::AddWrapMethods(env, sw);
target->Set(wrapString, sw->GetFunction());
+ env->set_shutdown_wrap_constructor_function(sw->GetFunction());
Local<FunctionTemplate> ww =
FunctionTemplate::New(env->isolate(), is_construct_call_callback);
- ww->InstanceTemplate()->SetInternalFieldCount(1);
+ ww->InstanceTemplate()->SetInternalFieldCount(StreamReq::kStreamReqField + 1);
Local<String> writeWrapString =
FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap");
ww->SetClassName(writeWrapString);
@@ -261,8 +264,20 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
}
+typedef SimpleShutdownWrap<ReqWrap<uv_shutdown_t>, false> LibuvShutdownWrap;
+typedef SimpleWriteWrap<ReqWrap<uv_write_t>, false> LibuvWriteWrap;
-int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
+ShutdownWrap* LibuvStreamWrap::CreateShutdownWrap(Local<Object> object) {
+ return new LibuvShutdownWrap(this, object);
+}
+
+WriteWrap* LibuvStreamWrap::CreateWriteWrap(Local<Object> object) {
+ return new LibuvWriteWrap(this, object);
+}
+
+
+int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap_) {
+ LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(req_wrap_);
int err;
err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown);
req_wrap->Dispatched();
@@ -271,7 +286,8 @@ int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) {
- ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
+ LibuvShutdownWrap* req_wrap = static_cast<LibuvShutdownWrap*>(
+ LibuvShutdownWrap::from_req(req));
CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate());
Context::Scope context_scope(req_wrap->env()->context());
@@ -319,10 +335,11 @@ int LibuvStreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
}
-int LibuvStreamWrap::DoWrite(WriteWrap* w,
- uv_buf_t* bufs,
- size_t count,
- uv_stream_t* send_handle) {
+int LibuvStreamWrap::DoWrite(WriteWrap* req_wrap,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) {
+ LibuvWriteWrap* w = static_cast<LibuvWriteWrap*>(req_wrap);
int r;
if (send_handle == nullptr) {
r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite);
@@ -349,7 +366,8 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w,
void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) {
- WriteWrap* req_wrap = WriteWrap::from_req(req);
+ LibuvWriteWrap* req_wrap = static_cast<LibuvWriteWrap*>(
+ LibuvWriteWrap::from_req(req));
CHECK_NE(req_wrap, nullptr);
HandleScope scope(req_wrap->env()->isolate());
Context::Scope context_scope(req_wrap->env()->context());
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index e5ad25b91e..a97e8ba10f 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -73,6 +73,9 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
return stream()->type == UV_TCP;
}
+ ShutdownWrap* CreateShutdownWrap(v8::Local<v8::Object> object) override;
+ WriteWrap* CreateWriteWrap(v8::Local<v8::Object> object) override;
+
protected:
LibuvStreamWrap(Environment* env,
v8::Local<v8::Object> object,
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index ee6e120ca0..f2a84b83f3 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -285,37 +285,29 @@ void TLSWrap::EncOut() {
for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]);
- int err = stream_->DoTryWrite(&bufs, &count);
- if (err != 0) {
- InvokeQueued(err);
- } else if (count == 0) {
- env()->SetImmediate([](Environment* env, void* data) {
- NODE_COUNT_NET_BYTES_SENT(write_size_);
- static_cast<TLSWrap*>(data)->OnStreamAfterWrite(nullptr, 0);
- }, this, object());
+ StreamWriteResult res = underlying_stream()->Write(bufs, count);
+ if (res.err != 0) {
+ InvokeQueued(res.err);
return;
}
- Local<Object> req_wrap_obj =
- env()->write_wrap_constructor_function()
- ->NewInstance(env()->context()).ToLocalChecked();
- WriteWrap* write_req = WriteWrap::New(env(),
- req_wrap_obj,
- static_cast<StreamBase*>(stream_));
+ NODE_COUNT_NET_BYTES_SENT(write_size_);
- err = stream_->DoWrite(write_req, buf, count, nullptr);
-
- // Ignore errors, this should be already handled in js
- if (err) {
- write_req->Dispose();
- InvokeQueued(err);
- } else {
- NODE_COUNT_NET_BYTES_SENT(write_size_);
+ if (!res.async) {
+ // Simulate asynchronous finishing, TLS cannot handle this at the moment.
+ env()->SetImmediate([](Environment* env, void* data) {
+ static_cast<TLSWrap*>(data)->OnStreamAfterWrite(nullptr, 0);
+ }, this, object());
}
}
void TLSWrap::OnStreamAfterWrite(WriteWrap* req_wrap, int status) {
+ // Report back to the previous listener as well. This is only needed for the
+ // "empty" writes that are passed through directly to the underlying stream.
+ if (req_wrap != nullptr)
+ previous_listener_->OnStreamAfterWrite(req_wrap, status);
+
if (ssl_ == nullptr)
status = UV_ECANCELED;
@@ -513,24 +505,24 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
bool TLSWrap::IsIPCPipe() {
- return static_cast<StreamBase*>(stream_)->IsIPCPipe();
+ return underlying_stream()->IsIPCPipe();
}
int TLSWrap::GetFD() {
- return static_cast<StreamBase*>(stream_)->GetFD();
+ return underlying_stream()->GetFD();
}
bool TLSWrap::IsAlive() {
return ssl_ != nullptr &&
stream_ != nullptr &&
- static_cast<StreamBase*>(stream_)->IsAlive();
+ underlying_stream()->IsAlive();
}
bool TLSWrap::IsClosing() {
- return static_cast<StreamBase*>(stream_)->IsClosing();
+ return underlying_stream()->IsClosing();
}
@@ -580,6 +572,17 @@ int TLSWrap::DoWrite(WriteWrap* w,
// However, if there is any data that should be written to the socket,
// the callback should not be invoked immediately
if (BIO_pending(enc_out_) == 0) {
+ // We destroy the current WriteWrap* object and create a new one that
+ // matches the underlying stream, rather than the TLSWrap itself.
+
+ // Note: We cannot simply use w->object() because of the "optimized"
+ // way in which we read persistent handles; the JS object itself might be
+ // destroyed by w->Dispose(), and the Local<Object> we have is not a
+ // "real" handle in the sense the V8 is aware of its existence.
+ Local<Object> req_wrap_obj =
+ w->GetAsyncWrap()->persistent().Get(env()->isolate());
+ w->Dispose();
+ w = underlying_stream()->CreateWriteWrap(req_wrap_obj);
return stream_->DoWrite(w, bufs, count, send_handle);
}
}
@@ -587,7 +590,6 @@ int TLSWrap::DoWrite(WriteWrap* w,
// Store the current write wrap
CHECK_EQ(current_write_, nullptr);
current_write_ = w;
- w->Dispatched();
// Write queued data
if (empty) {
@@ -677,6 +679,11 @@ void TLSWrap::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
}
+ShutdownWrap* TLSWrap::CreateShutdownWrap(Local<Object> req_wrap_object) {
+ return underlying_stream()->CreateShutdownWrap(req_wrap_object);
+}
+
+
int TLSWrap::DoShutdown(ShutdownWrap* req_wrap) {
crypto::MarkPopErrorOnReturn mark_pop_error_on_return;
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index a1f0b99e86..afd19c027e 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -65,6 +65,8 @@ class TLSWrap : public AsyncWrap,
int ReadStart() override;
int ReadStop() override;
+ ShutdownWrap* CreateShutdownWrap(
+ v8::Local<v8::Object> req_wrap_object) override;
int DoShutdown(ShutdownWrap* req_wrap) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
@@ -78,6 +80,10 @@ class TLSWrap : public AsyncWrap,
size_t self_size() const override { return sizeof(*this); }
protected:
+ inline StreamBase* underlying_stream() {
+ return static_cast<StreamBase*>(stream_);
+ }
+
static const int kClearOutChunkSize = 16384;
// Maximum number of bytes for hello parser