diff options
author | Anna Henningsen <anna@addaleax.net> | 2018-01-08 01:14:06 +0100 |
---|---|---|
committer | Ruben Bridgewater <ruben@bridgewater.de> | 2018-02-01 10:53:26 +0100 |
commit | 7c4b09b24bbe7d6a8cbad256f47b30a101a909ea (patch) | |
tree | 1aef41b1fd1cc0aad300b178e0a19e6da29615c8 /src/stream_base-inl.h | |
parent | 1b6cb947611de5865641d1a6780ee6930a4e1d69 (diff) | |
download | node-new-7c4b09b24bbe7d6a8cbad256f47b30a101a909ea.tar.gz |
src: refactor stream callbacks and ownership
Instead of setting individual callbacks on streams and tracking
stream ownership through a boolean `consume_` flag, always have
one specific listener object in charge of a stream, and call
methods on that object rather than generic C-style callbacks.
PR-URL: https://github.com/nodejs/node/pull/18334
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'src/stream_base-inl.h')
-rw-r--r-- | src/stream_base-inl.h | 85 |
1 files changed, 83 insertions, 2 deletions
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index cdcff67cc5..287978a870 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -25,6 +25,87 @@ using v8::Value; using AsyncHooks = Environment::AsyncHooks; + +inline StreamListener::~StreamListener() { + if (stream_ != nullptr) + stream_->RemoveStreamListener(this); +} + +inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, + uv_buf_init(nullptr, 0), + UV_UNKNOWN_HANDLE); +} + + +inline StreamResource::~StreamResource() { + while (listener_ != nullptr) { + listener_->OnStreamDestroy(); + RemoveStreamListener(listener_); + } +} + +inline void StreamResource::PushStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + CHECK_EQ(listener->stream_, nullptr); + + listener->previous_listener_ = listener_; + listener->stream_ = this; + + listener_ = listener; +} + +inline void StreamResource::RemoveStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + + StreamListener* previous; + StreamListener* current; + + // Remove from the linked list. + for (current = listener_, previous = nullptr; + /* No loop condition because we want a crash if listener is not found */ + ; previous = current, current = current->previous_listener_) { + CHECK_NE(current, nullptr); + if (current == listener) { + if (previous != nullptr) + previous->previous_listener_ = current->previous_listener_; + else + listener_ = listener->previous_listener_; + break; + } + } + + listener->stream_ = nullptr; + listener->previous_listener_ = nullptr; +} + + +inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { + return listener_->OnStreamAlloc(suggested_size); +} + +inline void StreamResource::EmitRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + if (nread > 0) + bytes_read_ += static_cast<uint64_t>(nread); + listener_->OnStreamRead(nread, buf, pending); +} + +inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { + listener_->OnStreamAfterWrite(w, status); +} + + +inline StreamBase::StreamBase(Environment* env) : env_(env) { + PushStreamListener(&default_listener_); +} + +inline Environment* StreamBase::stream_env() const { + return env_; +} + template <class Base> void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t, @@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate>(), attributes); - env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>); - env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>); + env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>); + env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>); if ((flags & kFlagNoShutdown) == 0) env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>); if ((flags & kFlagHasWritev) != 0) |