summaryrefslogtreecommitdiff
path: root/src/stream_base-inl.h
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-01-08 01:14:06 +0100
committerRuben Bridgewater <ruben@bridgewater.de>2018-02-01 10:53:26 +0100
commit7c4b09b24bbe7d6a8cbad256f47b30a101a909ea (patch)
tree1aef41b1fd1cc0aad300b178e0a19e6da29615c8 /src/stream_base-inl.h
parent1b6cb947611de5865641d1a6780ee6930a4e1d69 (diff)
downloadnode-new-7c4b09b24bbe7d6a8cbad256f47b30a101a909ea.tar.gz
src: refactor stream callbacks and ownership
Instead of setting individual callbacks on streams and tracking stream ownership through a boolean `consume_` flag, always have one specific listener object in charge of a stream, and call methods on that object rather than generic C-style callbacks. PR-URL: https://github.com/nodejs/node/pull/18334 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src/stream_base-inl.h')
-rw-r--r--src/stream_base-inl.h85
1 files changed, 83 insertions, 2 deletions
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h
index cdcff67cc5..287978a870 100644
--- a/src/stream_base-inl.h
+++ b/src/stream_base-inl.h
@@ -25,6 +25,87 @@ using v8::Value;
using AsyncHooks = Environment::AsyncHooks;
+
+inline StreamListener::~StreamListener() {
+ if (stream_ != nullptr)
+ stream_->RemoveStreamListener(this);
+}
+
+inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) {
+ CHECK_NE(previous_listener_, nullptr);
+ previous_listener_->OnStreamRead(nread,
+ uv_buf_init(nullptr, 0),
+ UV_UNKNOWN_HANDLE);
+}
+
+
+inline StreamResource::~StreamResource() {
+ while (listener_ != nullptr) {
+ listener_->OnStreamDestroy();
+ RemoveStreamListener(listener_);
+ }
+}
+
+inline void StreamResource::PushStreamListener(StreamListener* listener) {
+ CHECK_NE(listener, nullptr);
+ CHECK_EQ(listener->stream_, nullptr);
+
+ listener->previous_listener_ = listener_;
+ listener->stream_ = this;
+
+ listener_ = listener;
+}
+
+inline void StreamResource::RemoveStreamListener(StreamListener* listener) {
+ CHECK_NE(listener, nullptr);
+
+ StreamListener* previous;
+ StreamListener* current;
+
+ // Remove from the linked list.
+ for (current = listener_, previous = nullptr;
+ /* No loop condition because we want a crash if listener is not found */
+ ; previous = current, current = current->previous_listener_) {
+ CHECK_NE(current, nullptr);
+ if (current == listener) {
+ if (previous != nullptr)
+ previous->previous_listener_ = current->previous_listener_;
+ else
+ listener_ = listener->previous_listener_;
+ break;
+ }
+ }
+
+ listener->stream_ = nullptr;
+ listener->previous_listener_ = nullptr;
+}
+
+
+inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) {
+ return listener_->OnStreamAlloc(suggested_size);
+}
+
+inline void StreamResource::EmitRead(ssize_t nread,
+ const uv_buf_t& buf,
+ uv_handle_type pending) {
+ if (nread > 0)
+ bytes_read_ += static_cast<uint64_t>(nread);
+ listener_->OnStreamRead(nread, buf, pending);
+}
+
+inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) {
+ listener_->OnStreamAfterWrite(w, status);
+}
+
+
+inline StreamBase::StreamBase(Environment* env) : env_(env) {
+ PushStreamListener(&default_listener_);
+}
+
+inline Environment* StreamBase::stream_env() const {
+ return env_;
+}
+
template <class Base>
void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate> t,
@@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate>(),
attributes);
- env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>);
- env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>);
+ env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
+ env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
if ((flags & kFlagNoShutdown) == 0)
env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>);
if ((flags & kFlagHasWritev) != 0)