#include "stream_wrap.h"
#include "stream_base-inl.h"

#include "env-inl.h"
#include "env.h"
#include "handle_wrap.h"
#include "node_buffer.h"
#include "node_counters.h"
#include "pipe_wrap.h"
#include "req-wrap-inl.h"
#include "tcp_wrap.h"
#include "udp_wrap.h"
#include "util-inl.h"

#include <stdlib.h>  // abort()
#include <string.h>  // memcpy()
#include <limits.h>  // INT_MAX


namespace node {

using v8::Context;
using v8::EscapableHandleScope;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::Value;


// Module initializer: exposes the `ShutdownWrap` and `WriteWrap` request
// constructors on `target`. Each template reserves one internal field. The
// WriteWrap constructor function is additionally stored on the Environment.
void StreamWrap::Initialize(Local<Object> target,
                            Local<Value> unused,
                            Local<Context> context) {
  Environment* env = Environment::GetCurrent(context);

  Local<FunctionTemplate> sw =
      FunctionTemplate::New(env->isolate(), ShutdownWrap::NewShutdownWrap);
  sw->InstanceTemplate()->SetInternalFieldCount(1);
  sw->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap"));
  target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "ShutdownWrap"),
              sw->GetFunction());

  Local<FunctionTemplate> ww =
      FunctionTemplate::New(env->isolate(), WriteWrap::NewWriteWrap);
  ww->InstanceTemplate()->SetInternalFieldCount(1);
  ww->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"));
  target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "WriteWrap"),
              ww->GetFunction());
  env->set_write_wrap_constructor_function(ww->GetFunction());
}


// Wraps `stream`: the same pointer is handed to HandleWrap viewed as a
// uv_handle_t and kept in `stream_`. The write/alloc/read callbacks are
// registered with `this` as their context pointer.
StreamWrap::StreamWrap(Environment* env,
                       Local<Object> object,
                       uv_stream_t* stream,
                       AsyncWrap::ProviderType provider,
                       AsyncWrap* parent)
    : HandleWrap(env,
                 object,
                 reinterpret_cast<uv_handle_t*>(stream),
                 provider,
                 parent),
      StreamBase(env),
      stream_(stream) {
  set_after_write_cb({ OnAfterWriteImpl, this });
  set_alloc_cb({ OnAllocImpl, this });
  set_read_cb({ OnReadImpl, this });
}


// Installs the StreamWrap-specific prototype methods
// ("updateWriteQueueSize", "setBlocking") and then the StreamBase methods.
void StreamWrap::AddMethods(Environment* env,
                            v8::Local<v8::FunctionTemplate> target,
                            int flags) {
  env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
  env->SetProtoMethod(target, "setBlocking", SetBlocking);
  StreamBase::AddMethods<StreamWrap>(env, target, flags);
}


// Returns the descriptor reported by uv_fileno(). The result is -1 on
// Windows, when there is no stream, or when uv_fileno() does not write `fd`.
int StreamWrap::GetFD() {
  int fd = -1;
#if !defined(_WIN32)
  if (stream() != nullptr)
    uv_fileno(reinterpret_cast<uv_handle_t*>(stream()), &fd);
#endif
  return fd;
}


bool StreamWrap::IsAlive() {
  return HandleWrap::IsAlive(this);
}


bool StreamWrap::IsClosing() {
  return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
}


void* StreamWrap::Cast() {
  return reinterpret_cast<void*>(this);
}


AsyncWrap* StreamWrap::GetAsyncWrap() {
  return static_cast<AsyncWrap*>(this);
}


bool StreamWrap::IsIPCPipe() {
  return is_named_pipe_ipc();
}


// Copies the stream's current `write_queue_size` onto the JS object (under
// the key env()->write_queue_size_string()) and returns the same value.
uint32_t StreamWrap::UpdateWriteQueueSize() {
  HandleScope scope(env()->isolate());
  uint32_t write_queue_size = stream()->write_queue_size;
  object()->Set(env()->context(),
                env()->write_queue_size_string(),
                Integer::NewFromUnsigned(env()->isolate(),
                                         write_queue_size)).FromJust();
  return write_queue_size;
}


int StreamWrap::ReadStart() {
  return uv_read_start(stream(), OnAlloc, OnRead);
}


int StreamWrap::ReadStop() {
  return uv_read_stop(stream());
}


// libuv allocation callback passed to uv_read_start(). Recovers the wrap
// from `handle->data` and forwards to the StreamBase allocation hook.
void StreamWrap::OnAlloc(uv_handle_t* handle,
                         size_t suggested_size,
                         uv_buf_t* buf) {
  StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
  HandleScope scope(wrap->env()->isolate());
  Context::Scope context_scope(wrap->env()->context());

  CHECK_EQ(wrap->stream(), reinterpret_cast<uv_stream_t*>(handle));

  return static_cast<StreamBase*>(wrap)->OnAlloc(suggested_size, buf);
}


// Allocation hook registered in the constructor: allocates `size` bytes
// with node::Malloc(). `ctx` is not used.
void StreamWrap::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
  buf->base = node::Malloc(size);
  buf->len = size;
}


// Instantiates a WrapType object and accepts the handle pending on `parent`
// into it. Returns an empty handle if instantiation or unwrapping fails;
// aborts the process if uv_accept() reports an error.
template <class WrapType, class UVType>
static Local<Object> AcceptHandle(Environment* env, StreamWrap* parent) {
  EscapableHandleScope scope(env->isolate());
  Local<Object> wrap_obj;
  UVType* handle;

  wrap_obj = WrapType::Instantiate(env, parent);
  if (wrap_obj.IsEmpty())
    return Local<Object>();

  WrapType* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap, wrap_obj, Local<Object>());
  handle = wrap->UVHandle();

  if (uv_accept(parent->stream(), reinterpret_cast<uv_stream_t*>(handle)))
    ABORT();

  return scope.Escape(wrap_obj);
}


// Read hook registered in the constructor (`ctx` is the StreamWrap).
//   nread < 0  : frees the buffer and emits the error code with no data.
//   nread == 0 : frees the buffer and emits nothing.
//   nread > 0  : shrinks the buffer to `nread` bytes, accepts a pending
//                handle if `pending` names one, and emits a Buffer built
//                from the shrunk allocation.
void StreamWrap::OnReadImpl(ssize_t nread,
                            const uv_buf_t* buf,
                            uv_handle_type pending,
                            void* ctx) {
  StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
  Environment* env = wrap->env();
  HandleScope handle_scope(env->isolate());
  Context::Scope context_scope(env->context());

  Local<Object> pending_obj;

  if (nread < 0)  {
    if (buf->base != nullptr)
      free(buf->base);
    wrap->EmitData(nread, Local<Object>(), pending_obj);
    return;
  }

  if (nread == 0) {
    if (buf->base != nullptr)
      free(buf->base);
    return;
  }

  CHECK_LE(static_cast<size_t>(nread), buf->len);
  char* base = node::Realloc(buf->base, nread);

  if (pending == UV_TCP) {
    pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, wrap);
  } else if (pending == UV_NAMED_PIPE) {
    pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, wrap);
  } else if (pending == UV_UDP) {
    pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, wrap);
  } else {
    CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
  }

  Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
  wrap->EmitData(nread, obj, pending_obj);
}


// libuv read callback passed to uv_read_start(). Determines the type of a
// pending handle (IPC pipes only), updates the receive counters and
// forwards to the StreamBase read hook.
void StreamWrap::OnRead(uv_stream_t* handle,
                        ssize_t nread,
                        const uv_buf_t* buf) {
  StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
  HandleScope scope(wrap->env()->isolate());
  Context::Scope context_scope(wrap->env()->context());
  uv_handle_type type = UV_UNKNOWN_HANDLE;

  if (wrap->is_named_pipe_ipc() &&
      uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
    type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
  }

  // We should not be getting this callback if someone has already called
  // uv_close() on the handle.
  CHECK_EQ(wrap->persistent().IsEmpty(), false);

  if (nread > 0) {
    if (wrap->is_tcp()) {
      NODE_COUNT_NET_BYTES_RECV(nread);
    } else if (wrap->is_named_pipe()) {
      NODE_COUNT_PIPE_BYTES_RECV(nread);
    }
  }

  static_cast<StreamBase*>(wrap)->OnRead(nread, buf, type);
}


// JS binding for "updateWriteQueueSize": refreshes the queue size on the
// holder object and returns it.
void StreamWrap::UpdateWriteQueueSize(
    const FunctionCallbackInfo<Value>& args) {
  StreamWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

  uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
  args.GetReturnValue().Set(write_queue_size);
}


// JS binding for "setBlocking": requires at least one argument. Returns
// UV_EINVAL if the handle is no longer alive, otherwise the result of
// uv_stream_set_blocking(). Blocking is enabled only when args[0] is `true`.
void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
  StreamWrap* wrap;
  ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());

  CHECK_GT(args.Length(), 0);
  if (!wrap->IsAlive())
    return args.GetReturnValue().Set(UV_EINVAL);

  bool enable = args[0]->IsTrue();
  args.GetReturnValue().Set(uv_stream_set_blocking(wrap->stream(), enable));
}


// Starts a shutdown request on the stream. The request is marked as
// dispatched regardless of the uv_shutdown() result, which is returned.
int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
  int err;
  err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown);
  req_wrap->Dispatched();
  return err;
}


// libuv completion callback for uv_shutdown(): completes the request wrap
// with `status`.
void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
  ShutdownWrap* req_wrap = ShutdownWrap::from_req(req);
  CHECK_NE(req_wrap, nullptr);
  HandleScope scope(req_wrap->env()->isolate());
  Context::Scope context_scope(req_wrap->env()->context());
  req_wrap->Done(status);
}


// NOTE: Call to this function could change both `buf`'s and `count`'s
// values, shifting their base and decrementing their length. This is
// required in order to skip the data that was successfully written via
// uv_try_write().
//
// Returns 0 when nothing could be written synchronously (UV_ENOSYS or
// UV_EAGAIN, `bufs`/`count` untouched) or after a successful (possibly
// partial) write; returns the negative libuv error code otherwise.
int StreamWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
  int err;
  size_t written;
  uv_buf_t* vbufs = *bufs;
  size_t vcount = *count;

  err = uv_try_write(stream(), vbufs, vcount);
  if (err == UV_ENOSYS || err == UV_EAGAIN)
    return 0;
  if (err < 0)
    return err;

  // Slice off the buffers: skip all written buffers and slice the one that
  // was partially written.
  written = err;
  for (; vcount > 0; vbufs++, vcount--) {
    // Slice
    if (vbufs[0].len > written) {
      vbufs[0].base += written;
      vbufs[0].len -= written;
      written = 0;
      break;

    // Discard
    } else {
      written -= vbufs[0].len;
    }
  }

  *bufs = vbufs;
  *count = vcount;

  return 0;
}


// Queues an asynchronous write of `count` buffers. When `send_handle` is
// non-null it is passed along via uv_write2(). On success the byte total is
// added to the TCP or pipe send counter. The request is marked dispatched
// and the write queue size refreshed regardless of the result.
int StreamWrap::DoWrite(WriteWrap* w,
                        uv_buf_t* bufs,
                        size_t count,
                        uv_stream_t* send_handle) {
  int r;
  if (send_handle == nullptr) {
    r = uv_write(w->req(), stream(), bufs, count, AfterWrite);
  } else {
    r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite);
  }

  if (!r) {
    size_t bytes = 0;
    for (size_t i = 0; i < count; i++)
      bytes += bufs[i].len;
    if (stream()->type == UV_TCP) {
      NODE_COUNT_NET_BYTES_SENT(bytes);
    } else if (stream()->type == UV_NAMED_PIPE) {
      NODE_COUNT_PIPE_BYTES_SENT(bytes);
    }
  }

  w->Dispatched();
  UpdateWriteQueueSize();

  return r;
}


// libuv completion callback for uv_write()/uv_write2(): completes the
// request wrap with `status`.
void StreamWrap::AfterWrite(uv_write_t* req, int status) {
  WriteWrap* req_wrap = WriteWrap::from_req(req);
  CHECK_NE(req_wrap, nullptr);
  HandleScope scope(req_wrap->env()->isolate());
  Context::Scope context_scope(req_wrap->env()->context());
  req_wrap->Done(status);
}


// After-write hook registered in the constructor (`ctx` is the StreamWrap):
// refreshes the write queue size. `w` is not used.
void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
  StreamWrap* wrap = static_cast<StreamWrap*>(ctx);
  wrap->UpdateWriteQueueSize();
}

}  // namespace node

NODE_MODULE_CONTEXT_AWARE_BUILTIN(stream_wrap, node::StreamWrap::Initialize)