diff options
Diffstat (limited to 'src/stream_base-inl.h')
-rw-r--r-- | src/stream_base-inl.h | 85 |
1 files changed, 83 insertions, 2 deletions
diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index cdcff67cc5..287978a870 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -25,6 +25,87 @@ using v8::Value; using AsyncHooks = Environment::AsyncHooks; + +inline StreamListener::~StreamListener() { + if (stream_ != nullptr) + stream_->RemoveStreamListener(this); +} + +inline void StreamListener::PassReadErrorToPreviousListener(ssize_t nread) { + CHECK_NE(previous_listener_, nullptr); + previous_listener_->OnStreamRead(nread, + uv_buf_init(nullptr, 0), + UV_UNKNOWN_HANDLE); +} + + +inline StreamResource::~StreamResource() { + while (listener_ != nullptr) { + listener_->OnStreamDestroy(); + RemoveStreamListener(listener_); + } +} + +inline void StreamResource::PushStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + CHECK_EQ(listener->stream_, nullptr); + + listener->previous_listener_ = listener_; + listener->stream_ = this; + + listener_ = listener; +} + +inline void StreamResource::RemoveStreamListener(StreamListener* listener) { + CHECK_NE(listener, nullptr); + + StreamListener* previous; + StreamListener* current; + + // Remove from the linked list. + for (current = listener_, previous = nullptr; + /* No loop condition because we want a crash if listener is not found */ + ; previous = current, current = current->previous_listener_) { + CHECK_NE(current, nullptr); + if (current == listener) { + if (previous != nullptr) + previous->previous_listener_ = current->previous_listener_; + else + listener_ = listener->previous_listener_; + break; + } + } + + listener->stream_ = nullptr; + listener->previous_listener_ = nullptr; +} + + +inline uv_buf_t StreamResource::EmitAlloc(size_t suggested_size) { + return listener_->OnStreamAlloc(suggested_size); +} + +inline void StreamResource::EmitRead(ssize_t nread, + const uv_buf_t& buf, + uv_handle_type pending) { + if (nread > 0) + bytes_read_ += static_cast<uint64_t>(nread); + listener_->OnStreamRead(nread, buf, pending); +} + +inline void StreamResource::EmitAfterWrite(WriteWrap* w, int status) { + listener_->OnStreamAfterWrite(w, status); +} + + +inline StreamBase::StreamBase(Environment* env) : env_(env) { + PushStreamListener(&default_listener_); +} + +inline Environment* StreamBase::stream_env() const { + return env_; +} + template <class Base> void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t, @@ -70,8 +151,8 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate>(), attributes); - env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStart>); - env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStop>); + env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>); + env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>); if ((flags & kFlagNoShutdown) == 0) env->SetProtoMethod(t, "shutdown", JSMethod<Base, &StreamBase::Shutdown>); if ((flags & kFlagHasWritev) != 0) |