diff options
Diffstat (limited to 'deps/uv/src/uv-unix.c')
-rw-r--r-- | deps/uv/src/uv-unix.c | 557 |
1 files changed, 547 insertions, 10 deletions
diff --git a/deps/uv/src/uv-unix.c b/deps/uv/src/uv-unix.c index 7ed15ca711..3044363b29 100644 --- a/deps/uv/src/uv-unix.c +++ b/deps/uv/src/uv-unix.c @@ -103,6 +103,28 @@ static void uv__stream_connect(uv_stream_t*); static void uv__stream_io(EV_P_ ev_io* watcher, int revents); static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents); +static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w); +static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w); +static void uv__udp_run_completed(uv_udp_t* handle); +static void uv__udp_run_pending(uv_udp_t* handle); +static void uv__udp_destroy(uv_udp_t* handle); +static void uv__udp_recvmsg(uv_udp_t* handle); +static void uv__udp_sendmsg(uv_udp_t* handle); +static void uv__udp_io(EV_P_ ev_io* w, int events); +static int uv__udp_bind(uv_udp_t* handle, + int domain, + struct sockaddr* addr, + socklen_t len, + unsigned flags); +static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain); +static int uv__udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr* addr, + socklen_t addrlen, + uv_udp_send_cb send_cb); + #ifndef __GNUC__ #define __attribute__(a) #endif @@ -173,6 +195,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { case ECONNRESET: return UV_ECONNRESET; case EFAULT: return UV_EFAULT; case EMFILE: return UV_EMFILE; + case EMSGSIZE: return UV_EMSGSIZE; case EINVAL: return UV_EINVAL; case ECONNREFUSED: return UV_ECONNREFUSED; case EADDRINUSE: return UV_EADDRINUSE; @@ -201,6 +224,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) { void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { + uv_udp_t* udp; uv_async_t* async; uv_timer_t* timer; uv_stream_t* stream; @@ -226,6 +250,17 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { uv__close(stream->accepted_fd); stream->accepted_fd = -1; } + + assert(!ev_is_active(&stream->read_watcher)); + assert(!ev_is_active(&stream->write_watcher)); + break; + + case UV_UDP: + udp = (uv_udp_t*)handle; + uv__udp_watcher_stop(udp, &udp->read_watcher); + uv__udp_watcher_stop(udp, &udp->write_watcher); + uv__close(udp->fd); + udp->fd = -1; break; case UV_PREPARE: @@ -302,6 +337,489 @@ static void uv__handle_init(uv_handle_t* handle, uv_handle_type type) { } +static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) { + int flags; + + assert(w == &handle->read_watcher + || w == &handle->write_watcher); + + flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE); + + w->data = handle; + ev_set_cb(w, uv__udp_io); + ev_io_set(w, handle->fd, flags); + ev_io_start(EV_DEFAULT_UC_ w); +} + + +static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) { + int flags; + + assert(w == &handle->read_watcher + || w == &handle->write_watcher); + + flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE); + + ev_io_stop(EV_DEFAULT_UC_ w); + ev_io_set(w, -1, flags); + ev_set_cb(w, NULL); + w->data = (void*)0xDEADBABE; +} + + +static void uv__udp_destroy(uv_udp_t* handle) { + uv_udp_send_t* req; + ngx_queue_t* q; + + uv__udp_run_completed(handle); + + while (!ngx_queue_empty(&handle->write_queue)) { + q = ngx_queue_head(&handle->write_queue); + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_udp_send_t, queue); + if (req->send_cb) { + /* FIXME proper error code like UV_EABORTED */ + uv_err_new_artificial((uv_handle_t*)handle, UV_EINTR); + req->send_cb(req, -1); + } + } + + /* Now tear down the handle. */ + handle->flags = 0; + handle->recv_cb = NULL; + handle->alloc_cb = NULL; + /* but _do not_ touch close_cb */ + + if (handle->fd != -1) { + uv__close(handle->fd); + handle->fd = -1; + } + + uv__udp_watcher_stop(handle, &handle->read_watcher); + uv__udp_watcher_stop(handle, &handle->write_watcher); +} + + +static void uv__udp_run_pending(uv_udp_t* handle) { + uv_udp_send_t* req; + ngx_queue_t* q; + struct msghdr h; + ssize_t size; + + while (!ngx_queue_empty(&handle->write_queue)) { + q = ngx_queue_head(&handle->write_queue); + assert(q != NULL); + + req = ngx_queue_data(q, uv_udp_send_t, queue); + assert(req != NULL); + + memset(&h, 0, sizeof h); + h.msg_name = &req->addr; + h.msg_namelen = req->addrlen; + h.msg_iov = (struct iovec*)req->bufs; + h.msg_iovlen = req->bufcnt; + + do { + size = sendmsg(handle->fd, &h, 0); + } + while (size == -1 && errno == EINTR); + + /* TODO try to write once or twice more in the + * hope that the socket becomes readable again? + */ + if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + break; + + req->status = (size == -1 ? -errno : size); + +#ifndef NDEBUG + /* Sanity check. */ + if (size != -1) { + ssize_t nbytes; + int i; + + for (nbytes = i = 0; i < req->bufcnt; i++) + nbytes += req->bufs[i].len; + + assert(size == nbytes); + } +#endif + + /* Sending a datagram is an atomic operation: either all data + * is written or nothing is (and EMSGSIZE is raised). That is + * why we don't handle partial writes. Just pop the request + * off the write queue and onto the completed queue, done. + */ + ngx_queue_remove(&req->queue); + ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue); + } +} + + +static void uv__udp_run_completed(uv_udp_t* handle) { + uv_udp_send_t* req; + ngx_queue_t* q; + + while (!ngx_queue_empty(&handle->write_completed_queue)) { + q = ngx_queue_head(&handle->write_completed_queue); + assert(q != NULL); + + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_udp_send_t, queue); + assert(req != NULL); + + if (req->bufs != req->bufsml) + free(req->bufs); + + if (req->send_cb == NULL) + continue; + + /* req->status >= 0 == bytes written + * req->status < 0 == errno + */ + if (req->status >= 0) { + req->send_cb(req, 0); + } + else { + uv_err_new((uv_handle_t*)handle, -req->status); + req->send_cb(req, -1); + } + } +} + + +static void uv__udp_recvmsg(uv_udp_t* handle) { + struct sockaddr_storage peer; + struct msghdr h; + ssize_t nread; + uv_buf_t buf; + int flags; + + assert(handle->recv_cb != NULL); + assert(handle->alloc_cb != NULL); + + do { + /* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */ + buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024); + assert(buf.len > 0); + assert(buf.base != NULL); + + memset(&h, 0, sizeof h); + h.msg_name = &peer; + h.msg_namelen = sizeof peer; + h.msg_iov = (struct iovec*)&buf; + h.msg_iovlen = 1; + + do { + nread = recvmsg(handle->fd, &h, 0); + } + while (nread == -1 && errno == EINTR); + + if (nread == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + uv_err_new((uv_handle_t*)handle, EAGAIN); + handle->recv_cb(handle, 0, buf, NULL, 0); + } + else { + uv_err_new((uv_handle_t*)handle, errno); + handle->recv_cb(handle, -1, buf, NULL, 0); + } + } + else { + flags = 0; + + if (h.msg_flags & MSG_TRUNC) + flags |= UV_UDP_PARTIAL; + + handle->recv_cb(handle, + nread, + buf, + (struct sockaddr*)&peer, + flags); + } + } + /* recv_cb callback may decide to pause or close the handle */ + while (nread != -1 + && handle->fd != -1 + && handle->recv_cb != NULL); +} + + +static void uv__udp_sendmsg(uv_udp_t* handle) { + assert(!ngx_queue_empty(&handle->write_queue) + || !ngx_queue_empty(&handle->write_completed_queue)); + + /* Write out pending data first. */ + uv__udp_run_pending(handle); + + /* Drain 'request completed' queue. */ + uv__udp_run_completed(handle); + + if (!ngx_queue_empty(&handle->write_completed_queue)) { + /* Schedule completion callbacks. */ + ev_feed_event(EV_DEFAULT_ &handle->write_watcher, EV_WRITE); + } + else if (ngx_queue_empty(&handle->write_queue)) { + /* Pending queue and completion queue empty, stop watcher. */ + uv__udp_watcher_stop(handle, &handle->write_watcher); + } +} + + +static void uv__udp_io(EV_P_ ev_io* w, int events) { + uv_udp_t* handle; + + handle = w->data; + assert(handle != NULL); + assert(handle->type == UV_UDP); + assert(handle->fd >= 0); + assert(!(events & ~(EV_READ|EV_WRITE))); + + if (events & EV_READ) + uv__udp_recvmsg(handle); + + if (events & EV_WRITE) + uv__udp_sendmsg(handle); +} + + +static int uv__udp_bind(uv_udp_t* handle, + int domain, + struct sockaddr* addr, + socklen_t len, + unsigned flags) { + int saved_errno; + int status; + int yes; + int fd; + + saved_errno = errno; + status = -1; + + /* Check for bad flags. */ + if (flags & ~UV_UDP_IPV6ONLY) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + /* Cannot set IPv6-only mode on non-IPv6 socket. */ + if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + /* Check for already active socket. */ + if (handle->fd != -1) { + uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY); + goto out; + } + + if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } + + if (flags & UV_UDP_IPV6ONLY) { +#ifdef IPV6_V6ONLY + yes = 1; + if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } +#else + uv_err_new((uv_handle_t*)handle, ENOTSUP); + goto out; +#endif + } + + if (bind(fd, addr, len) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } + + handle->fd = fd; + status = 0; + +out: + if (status) + uv__close(fd); + + errno = saved_errno; + return status; +} + + +static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) { + struct sockaddr_storage taddr; + socklen_t addrlen; + + assert(domain == AF_INET || domain == AF_INET6); + + if (handle->fd != -1) + return 0; + + switch (domain) { + case AF_INET: + { + struct sockaddr_in* addr = (void*)&taddr; + memset(addr, 0, sizeof *addr); + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = INADDR_ANY; + addrlen = sizeof *addr; + break; + } + case AF_INET6: + { + struct sockaddr_in6* addr = (void*)&taddr; + memset(addr, 0, sizeof *addr); + addr->sin6_family = AF_INET6; + addr->sin6_addr = in6addr_any; + addrlen = sizeof *addr; + break; + } + default: + assert(0 && "unsupported address family"); + abort(); + } + + return uv__udp_bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0); +} + + +static int uv__udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr* addr, + socklen_t addrlen, + uv_udp_send_cb send_cb) { + if (uv__udp_maybe_deferred_bind(handle, addr->sa_family)) + return -1; + + /* Don't use uv__req_init(), it zeroes the data field. */ + uv_counters()->req_init++; + + memcpy(&req->addr, addr, addrlen); + req->addrlen = addrlen; + req->send_cb = send_cb; + req->handle = handle; + req->bufcnt = bufcnt; + req->type = UV_UDP_SEND; + + if (bufcnt <= UV_REQ_BUFSML_SIZE) { + req->bufs = req->bufsml; + } + else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) { + uv_err_new((uv_handle_t*)handle, ENOMEM); + return -1; + } + memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0])); + + ngx_queue_insert_tail(&handle->write_queue, &req->queue); + uv__udp_watcher_start(handle, &handle->write_watcher); + + return 0; +} + + +int uv_udp_init(uv_udp_t* handle) { + memset(handle, 0, sizeof *handle); + + uv__handle_init((uv_handle_t*)handle, UV_UDP); + uv_counters()->udp_init++; + + handle->fd = -1; + ngx_queue_init(&handle->write_queue); + ngx_queue_init(&handle->write_completed_queue); + + return 0; +} + + +int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) { + return uv__udp_bind(handle, + AF_INET, + (struct sockaddr*)&addr, + sizeof addr, + flags); +} + + +int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) { + return uv__udp_bind(handle, + AF_INET6, + (struct sockaddr*)&addr, + sizeof addr, + flags); +} + + +int uv_udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr_in addr, + uv_udp_send_cb send_cb) { + return uv__udp_send(req, + handle, + bufs, + bufcnt, + (struct sockaddr*)&addr, + sizeof addr, + send_cb); +} + + +int uv_udp_send6(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr_in6 addr, + uv_udp_send_cb send_cb) { + return uv__udp_send(req, + handle, + bufs, + bufcnt, + (struct sockaddr*)&addr, + sizeof addr, + send_cb); +} + + +int uv_udp_recv_start(uv_udp_t* handle, + uv_alloc_cb alloc_cb, + uv_udp_recv_cb recv_cb) { + if (alloc_cb == NULL || recv_cb == NULL) { + uv_err_new_artificial((uv_handle_t*)handle, UV_EINVAL); + return -1; + } + + if (ev_is_active(&handle->read_watcher)) { + uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY); + return -1; + } + + if (uv__udp_maybe_deferred_bind(handle, AF_INET)) + return -1; + + handle->alloc_cb = alloc_cb; + handle->recv_cb = recv_cb; + uv__udp_watcher_start(handle, &handle->read_watcher); + + return 0; +} + + +int uv_udp_recv_stop(uv_udp_t* handle) { + uv__udp_watcher_stop(handle, &handle->read_watcher); + handle->alloc_cb = NULL; + handle->recv_cb = NULL; + return 0; +} + + int uv_tcp_init(uv_tcp_t* tcp) { uv__handle_init((uv_handle_t*)tcp, UV_TCP); uv_counters()->tcp_init++; @@ -329,8 +847,10 @@ int uv_tcp_init(uv_tcp_t* tcp) { } -static int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, - int addrsize) { +static int uv__tcp_bind(uv_tcp_t* tcp, + int domain, + struct sockaddr* addr, + int addrsize) { int saved_errno; int status; @@ -376,8 +896,10 @@ int uv_tcp_bind(uv_tcp_t* tcp, struct sockaddr_in addr) { return -1; } - return uv__bind(tcp, AF_INET, (struct sockaddr*)&addr, - sizeof(struct sockaddr_in)); + return uv__tcp_bind(tcp, + AF_INET, + (struct sockaddr*)&addr, + sizeof(struct sockaddr_in)); } @@ -387,8 +909,10 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) { return -1; } - return uv__bind(tcp, AF_INET6, (struct sockaddr*)&addr, - sizeof(struct sockaddr_in6)); + return uv__tcp_bind(tcp, + AF_INET6, + (struct sockaddr*)&addr, + sizeof(struct sockaddr_in6)); } @@ -587,6 +1111,13 @@ void uv__finish_close(uv_handle_t* handle) { assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher)); break; + case UV_UDP: + assert(!ev_is_active(&((uv_udp_t*)handle)->read_watcher)); + assert(!ev_is_active(&((uv_udp_t*)handle)->write_watcher)); + assert(((uv_udp_t*)handle)->fd == -1); + uv__udp_destroy((uv_udp_t*)handle); + break; + case UV_PROCESS: assert(!ev_is_active(&((uv_process_t*)handle)->child_watcher)); break; @@ -820,21 +1351,22 @@ static void uv__read(uv_stream_t* stream) { */ while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) { assert(stream->alloc_cb); - buf = stream->alloc_cb(stream, 64 * 1024); + buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); assert(buf.len > 0); assert(buf.base); + assert(stream->fd >= 0); do { nread = read(stream->fd, buf.base, buf.len); } - while (nread == -1 && errno == EINTR); + while (nread < 0 && errno == EINTR); if (nread < 0) { /* Error */ if (errno == EAGAIN) { /* Wait for the next one. */ - if (((uv_handle_t*)stream)->flags & UV_READING) { + if (stream->flags & UV_READING) { ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher); } uv_err_new((uv_handle_t*)stream, EAGAIN); @@ -1112,7 +1644,7 @@ out: } -int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen) { +int uv_getsockname(uv_handle_t* handle, struct sockaddr* name, int* namelen) { socklen_t socklen; int saved_errno; @@ -1251,6 +1783,11 @@ int64_t uv_now() { int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE); + if (stream->flags & UV_CLOSING) { + uv_err_new((uv_handle_t*)stream, EINVAL); + return -1; + } + /* The UV_READING flag is irrelevant of the state of the tcp - it just * expresses the desired state of the user. */ |