summaryrefslogtreecommitdiff
path: root/deps/uv/src/uv-unix.c
diff options
context:
space:
mode:
Diffstat (limited to 'deps/uv/src/uv-unix.c')
-rw-r--r--deps/uv/src/uv-unix.c557
1 files changed, 547 insertions, 10 deletions
diff --git a/deps/uv/src/uv-unix.c b/deps/uv/src/uv-unix.c
index 7ed15ca711..3044363b29 100644
--- a/deps/uv/src/uv-unix.c
+++ b/deps/uv/src/uv-unix.c
@@ -103,6 +103,28 @@ static void uv__stream_connect(uv_stream_t*);
static void uv__stream_io(EV_P_ ev_io* watcher, int revents);
static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
+static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w);
+static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w);
+static void uv__udp_run_completed(uv_udp_t* handle);
+static void uv__udp_run_pending(uv_udp_t* handle);
+static void uv__udp_destroy(uv_udp_t* handle);
+static void uv__udp_recvmsg(uv_udp_t* handle);
+static void uv__udp_sendmsg(uv_udp_t* handle);
+static void uv__udp_io(EV_P_ ev_io* w, int events);
+static int uv__udp_bind(uv_udp_t* handle,
+ int domain,
+ struct sockaddr* addr,
+ socklen_t len,
+ unsigned flags);
+static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
+static int uv__udp_send(uv_udp_send_t* req,
+ uv_udp_t* handle,
+ uv_buf_t bufs[],
+ int bufcnt,
+ struct sockaddr* addr,
+ socklen_t addrlen,
+ uv_udp_send_cb send_cb);
+
#ifndef __GNUC__
#define __attribute__(a)
#endif
@@ -173,6 +195,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
case ECONNRESET: return UV_ECONNRESET;
case EFAULT: return UV_EFAULT;
case EMFILE: return UV_EMFILE;
+ case EMSGSIZE: return UV_EMSGSIZE;
case EINVAL: return UV_EINVAL;
case ECONNREFUSED: return UV_ECONNREFUSED;
case EADDRINUSE: return UV_EADDRINUSE;
@@ -201,6 +224,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) {
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
+ uv_udp_t* udp;
uv_async_t* async;
uv_timer_t* timer;
uv_stream_t* stream;
@@ -226,6 +250,17 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
uv__close(stream->accepted_fd);
stream->accepted_fd = -1;
}
+
+ assert(!ev_is_active(&stream->read_watcher));
+ assert(!ev_is_active(&stream->write_watcher));
+ break;
+
+ case UV_UDP:
+ udp = (uv_udp_t*)handle;
+ uv__udp_watcher_stop(udp, &udp->read_watcher);
+ uv__udp_watcher_stop(udp, &udp->write_watcher);
+ uv__close(udp->fd);
+ udp->fd = -1;
break;
case UV_PREPARE:
@@ -302,6 +337,489 @@ static void uv__handle_init(uv_handle_t* handle, uv_handle_type type) {
}
+static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) {
+ int flags;
+
+ assert(w == &handle->read_watcher
+ || w == &handle->write_watcher);
+
+ flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
+
+ w->data = handle;
+ ev_set_cb(w, uv__udp_io);
+ ev_io_set(w, handle->fd, flags);
+ ev_io_start(EV_DEFAULT_UC_ w);
+}
+
+
+static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) {
+ int flags;
+
+ assert(w == &handle->read_watcher
+ || w == &handle->write_watcher);
+
+ flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
+
+ ev_io_stop(EV_DEFAULT_UC_ w);
+ ev_io_set(w, -1, flags);
+ ev_set_cb(w, NULL);
+ w->data = (void*)0xDEADBABE;
+}
+
+
+static void uv__udp_destroy(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ ngx_queue_t* q;
+
+ uv__udp_run_completed(handle);
+
+ while (!ngx_queue_empty(&handle->write_queue)) {
+ q = ngx_queue_head(&handle->write_queue);
+ ngx_queue_remove(q);
+
+ req = ngx_queue_data(q, uv_udp_send_t, queue);
+ if (req->send_cb) {
+ /* FIXME proper error code like UV_EABORTED */
+ uv_err_new_artificial((uv_handle_t*)handle, UV_EINTR);
+ req->send_cb(req, -1);
+ }
+ }
+
+ /* Now tear down the handle. */
+ handle->flags = 0;
+ handle->recv_cb = NULL;
+ handle->alloc_cb = NULL;
+ /* but _do not_ touch close_cb */
+
+ if (handle->fd != -1) {
+ uv__close(handle->fd);
+ handle->fd = -1;
+ }
+
+ uv__udp_watcher_stop(handle, &handle->read_watcher);
+ uv__udp_watcher_stop(handle, &handle->write_watcher);
+}
+
+
+static void uv__udp_run_pending(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ ngx_queue_t* q;
+ struct msghdr h;
+ ssize_t size;
+
+ while (!ngx_queue_empty(&handle->write_queue)) {
+ q = ngx_queue_head(&handle->write_queue);
+ assert(q != NULL);
+
+ req = ngx_queue_data(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ memset(&h, 0, sizeof h);
+ h.msg_name = &req->addr;
+ h.msg_namelen = req->addrlen;
+ h.msg_iov = (struct iovec*)req->bufs;
+ h.msg_iovlen = req->bufcnt;
+
+ do {
+ size = sendmsg(handle->fd, &h, 0);
+ }
+ while (size == -1 && errno == EINTR);
+
+ /* TODO try to write once or twice more in the
+ * hope that the socket becomes readable again?
+ */
+ if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
+ break;
+
+ req->status = (size == -1 ? -errno : size);
+
+#ifndef NDEBUG
+ /* Sanity check. */
+ if (size != -1) {
+ ssize_t nbytes;
+ int i;
+
+ for (nbytes = i = 0; i < req->bufcnt; i++)
+ nbytes += req->bufs[i].len;
+
+ assert(size == nbytes);
+ }
+#endif
+
+ /* Sending a datagram is an atomic operation: either all data
+ * is written or nothing is (and EMSGSIZE is raised). That is
+ * why we don't handle partial writes. Just pop the request
+ * off the write queue and onto the completed queue, done.
+ */
+ ngx_queue_remove(&req->queue);
+ ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
+ }
+}
+
+
+static void uv__udp_run_completed(uv_udp_t* handle) {
+ uv_udp_send_t* req;
+ ngx_queue_t* q;
+
+ while (!ngx_queue_empty(&handle->write_completed_queue)) {
+ q = ngx_queue_head(&handle->write_completed_queue);
+ assert(q != NULL);
+
+ ngx_queue_remove(q);
+
+ req = ngx_queue_data(q, uv_udp_send_t, queue);
+ assert(req != NULL);
+
+ if (req->bufs != req->bufsml)
+ free(req->bufs);
+
+ if (req->send_cb == NULL)
+ continue;
+
+ /* req->status >= 0 == bytes written
+ * req->status < 0 == errno
+ */
+ if (req->status >= 0) {
+ req->send_cb(req, 0);
+ }
+ else {
+ uv_err_new((uv_handle_t*)handle, -req->status);
+ req->send_cb(req, -1);
+ }
+ }
+}
+
+
+static void uv__udp_recvmsg(uv_udp_t* handle) {
+ struct sockaddr_storage peer;
+ struct msghdr h;
+ ssize_t nread;
+ uv_buf_t buf;
+ int flags;
+
+ assert(handle->recv_cb != NULL);
+ assert(handle->alloc_cb != NULL);
+
+ do {
+ /* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */
+ buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
+ assert(buf.len > 0);
+ assert(buf.base != NULL);
+
+ memset(&h, 0, sizeof h);
+ h.msg_name = &peer;
+ h.msg_namelen = sizeof peer;
+ h.msg_iov = (struct iovec*)&buf;
+ h.msg_iovlen = 1;
+
+ do {
+ nread = recvmsg(handle->fd, &h, 0);
+ }
+ while (nread == -1 && errno == EINTR);
+
+ if (nread == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ uv_err_new((uv_handle_t*)handle, EAGAIN);
+ handle->recv_cb(handle, 0, buf, NULL, 0);
+ }
+ else {
+ uv_err_new((uv_handle_t*)handle, errno);
+ handle->recv_cb(handle, -1, buf, NULL, 0);
+ }
+ }
+ else {
+ flags = 0;
+
+ if (h.msg_flags & MSG_TRUNC)
+ flags |= UV_UDP_PARTIAL;
+
+ handle->recv_cb(handle,
+ nread,
+ buf,
+ (struct sockaddr*)&peer,
+ flags);
+ }
+ }
+ /* recv_cb callback may decide to pause or close the handle */
+ while (nread != -1
+ && handle->fd != -1
+ && handle->recv_cb != NULL);
+}
+
+
+static void uv__udp_sendmsg(uv_udp_t* handle) {
+ assert(!ngx_queue_empty(&handle->write_queue)
+ || !ngx_queue_empty(&handle->write_completed_queue));
+
+ /* Write out pending data first. */
+ uv__udp_run_pending(handle);
+
+ /* Drain 'request completed' queue. */
+ uv__udp_run_completed(handle);
+
+ if (!ngx_queue_empty(&handle->write_completed_queue)) {
+ /* Schedule completion callbacks. */
+ ev_feed_event(EV_DEFAULT_ &handle->write_watcher, EV_WRITE);
+ }
+ else if (ngx_queue_empty(&handle->write_queue)) {
+ /* Pending queue and completion queue empty, stop watcher. */
+ uv__udp_watcher_stop(handle, &handle->write_watcher);
+ }
+}
+
+
+static void uv__udp_io(EV_P_ ev_io* w, int events) {
+ uv_udp_t* handle;
+
+ handle = w->data;
+ assert(handle != NULL);
+ assert(handle->type == UV_UDP);
+ assert(handle->fd >= 0);
+ assert(!(events & ~(EV_READ|EV_WRITE)));
+
+ if (events & EV_READ)
+ uv__udp_recvmsg(handle);
+
+ if (events & EV_WRITE)
+ uv__udp_sendmsg(handle);
+}
+
+
+static int uv__udp_bind(uv_udp_t* handle,
+ int domain,
+ struct sockaddr* addr,
+ socklen_t len,
+ unsigned flags) {
+ int saved_errno;
+ int status;
+ int yes;
+ int fd;
+
+ saved_errno = errno;
+ status = -1;
+
+ /* Check for bad flags. */
+ if (flags & ~UV_UDP_IPV6ONLY) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ /* Cannot set IPv6-only mode on non-IPv6 socket. */
+ if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
+ uv_err_new((uv_handle_t*)handle, EINVAL);
+ goto out;
+ }
+
+ /* Check for already active socket. */
+ if (handle->fd != -1) {
+ uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY);
+ goto out;
+ }
+
+ if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+
+ if (flags & UV_UDP_IPV6ONLY) {
+#ifdef IPV6_V6ONLY
+ yes = 1;
+ if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+#else
+ uv_err_new((uv_handle_t*)handle, ENOTSUP);
+ goto out;
+#endif
+ }
+
+ if (bind(fd, addr, len) == -1) {
+ uv_err_new((uv_handle_t*)handle, errno);
+ goto out;
+ }
+
+ handle->fd = fd;
+ status = 0;
+
+out:
+ if (status)
+ uv__close(fd);
+
+ errno = saved_errno;
+ return status;
+}
+
+
+static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
+ struct sockaddr_storage taddr;
+ socklen_t addrlen;
+
+ assert(domain == AF_INET || domain == AF_INET6);
+
+ if (handle->fd != -1)
+ return 0;
+
+ switch (domain) {
+ case AF_INET:
+ {
+ struct sockaddr_in* addr = (void*)&taddr;
+ memset(addr, 0, sizeof *addr);
+ addr->sin_family = AF_INET;
+ addr->sin_addr.s_addr = INADDR_ANY;
+ addrlen = sizeof *addr;
+ break;
+ }
+ case AF_INET6:
+ {
+ struct sockaddr_in6* addr = (void*)&taddr;
+ memset(addr, 0, sizeof *addr);
+ addr->sin6_family = AF_INET6;
+ addr->sin6_addr = in6addr_any;
+ addrlen = sizeof *addr;
+ break;
+ }
+ default:
+ assert(0 && "unsupported address family");
+ abort();
+ }
+
+ return uv__udp_bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
+}
+
+
+static int uv__udp_send(uv_udp_send_t* req,
+ uv_udp_t* handle,
+ uv_buf_t bufs[],
+ int bufcnt,
+ struct sockaddr* addr,
+ socklen_t addrlen,
+ uv_udp_send_cb send_cb) {
+ if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
+ return -1;
+
+ /* Don't use uv__req_init(), it zeroes the data field. */
+ uv_counters()->req_init++;
+
+ memcpy(&req->addr, addr, addrlen);
+ req->addrlen = addrlen;
+ req->send_cb = send_cb;
+ req->handle = handle;
+ req->bufcnt = bufcnt;
+ req->type = UV_UDP_SEND;
+
+ if (bufcnt <= UV_REQ_BUFSML_SIZE) {
+ req->bufs = req->bufsml;
+ }
+ else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
+ uv_err_new((uv_handle_t*)handle, ENOMEM);
+ return -1;
+ }
+ memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
+
+ ngx_queue_insert_tail(&handle->write_queue, &req->queue);
+ uv__udp_watcher_start(handle, &handle->write_watcher);
+
+ return 0;
+}
+
+
+int uv_udp_init(uv_udp_t* handle) {
+ memset(handle, 0, sizeof *handle);
+
+ uv__handle_init((uv_handle_t*)handle, UV_UDP);
+ uv_counters()->udp_init++;
+
+ handle->fd = -1;
+ ngx_queue_init(&handle->write_queue);
+ ngx_queue_init(&handle->write_completed_queue);
+
+ return 0;
+}
+
+
+int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
+ return uv__udp_bind(handle,
+ AF_INET,
+ (struct sockaddr*)&addr,
+ sizeof addr,
+ flags);
+}
+
+
+int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
+ return uv__udp_bind(handle,
+ AF_INET6,
+ (struct sockaddr*)&addr,
+ sizeof addr,
+ flags);
+}
+
+
+int uv_udp_send(uv_udp_send_t* req,
+ uv_udp_t* handle,
+ uv_buf_t bufs[],
+ int bufcnt,
+ struct sockaddr_in addr,
+ uv_udp_send_cb send_cb) {
+ return uv__udp_send(req,
+ handle,
+ bufs,
+ bufcnt,
+ (struct sockaddr*)&addr,
+ sizeof addr,
+ send_cb);
+}
+
+
+int uv_udp_send6(uv_udp_send_t* req,
+ uv_udp_t* handle,
+ uv_buf_t bufs[],
+ int bufcnt,
+ struct sockaddr_in6 addr,
+ uv_udp_send_cb send_cb) {
+ return uv__udp_send(req,
+ handle,
+ bufs,
+ bufcnt,
+ (struct sockaddr*)&addr,
+ sizeof addr,
+ send_cb);
+}
+
+
+int uv_udp_recv_start(uv_udp_t* handle,
+ uv_alloc_cb alloc_cb,
+ uv_udp_recv_cb recv_cb) {
+ if (alloc_cb == NULL || recv_cb == NULL) {
+ uv_err_new_artificial((uv_handle_t*)handle, UV_EINVAL);
+ return -1;
+ }
+
+ if (ev_is_active(&handle->read_watcher)) {
+ uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY);
+ return -1;
+ }
+
+ if (uv__udp_maybe_deferred_bind(handle, AF_INET))
+ return -1;
+
+ handle->alloc_cb = alloc_cb;
+ handle->recv_cb = recv_cb;
+ uv__udp_watcher_start(handle, &handle->read_watcher);
+
+ return 0;
+}
+
+
+int uv_udp_recv_stop(uv_udp_t* handle) {
+ uv__udp_watcher_stop(handle, &handle->read_watcher);
+ handle->alloc_cb = NULL;
+ handle->recv_cb = NULL;
+ return 0;
+}
+
+
int uv_tcp_init(uv_tcp_t* tcp) {
uv__handle_init((uv_handle_t*)tcp, UV_TCP);
uv_counters()->tcp_init++;
@@ -329,8 +847,10 @@ int uv_tcp_init(uv_tcp_t* tcp) {
}
-static int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr,
- int addrsize) {
+static int uv__tcp_bind(uv_tcp_t* tcp,
+ int domain,
+ struct sockaddr* addr,
+ int addrsize) {
int saved_errno;
int status;
@@ -376,8 +896,10 @@ int uv_tcp_bind(uv_tcp_t* tcp, struct sockaddr_in addr) {
return -1;
}
- return uv__bind(tcp, AF_INET, (struct sockaddr*)&addr,
- sizeof(struct sockaddr_in));
+ return uv__tcp_bind(tcp,
+ AF_INET,
+ (struct sockaddr*)&addr,
+ sizeof(struct sockaddr_in));
}
@@ -387,8 +909,10 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) {
return -1;
}
- return uv__bind(tcp, AF_INET6, (struct sockaddr*)&addr,
- sizeof(struct sockaddr_in6));
+ return uv__tcp_bind(tcp,
+ AF_INET6,
+ (struct sockaddr*)&addr,
+ sizeof(struct sockaddr_in6));
}
@@ -587,6 +1111,13 @@ void uv__finish_close(uv_handle_t* handle) {
assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher));
break;
+ case UV_UDP:
+ assert(!ev_is_active(&((uv_udp_t*)handle)->read_watcher));
+ assert(!ev_is_active(&((uv_udp_t*)handle)->write_watcher));
+ assert(((uv_udp_t*)handle)->fd == -1);
+ uv__udp_destroy((uv_udp_t*)handle);
+ break;
+
case UV_PROCESS:
assert(!ev_is_active(&((uv_process_t*)handle)->child_watcher));
break;
@@ -820,21 +1351,22 @@ static void uv__read(uv_stream_t* stream) {
*/
while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) {
assert(stream->alloc_cb);
- buf = stream->alloc_cb(stream, 64 * 1024);
+ buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
assert(buf.len > 0);
assert(buf.base);
+ assert(stream->fd >= 0);
do {
nread = read(stream->fd, buf.base, buf.len);
}
- while (nread == -1 && errno == EINTR);
+ while (nread < 0 && errno == EINTR);
if (nread < 0) {
/* Error */
if (errno == EAGAIN) {
/* Wait for the next one. */
- if (((uv_handle_t*)stream)->flags & UV_READING) {
+ if (stream->flags & UV_READING) {
ev_io_start(EV_DEFAULT_UC_ &stream->read_watcher);
}
uv_err_new((uv_handle_t*)stream, EAGAIN);
@@ -1112,7 +1644,7 @@ out:
}
-int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen) {
+int uv_getsockname(uv_handle_t* handle, struct sockaddr* name, int* namelen) {
socklen_t socklen;
int saved_errno;
@@ -1251,6 +1783,11 @@ int64_t uv_now() {
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
+ if (stream->flags & UV_CLOSING) {
+ uv_err_new((uv_handle_t*)stream, EINVAL);
+ return -1;
+ }
+
/* The UV_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/