Diffstat (limited to 'Utilities/cmlibuv/src/unix/stream.c')
-rw-r--r--  Utilities/cmlibuv/src/unix/stream.c  219
1 file changed, 102 insertions, 117 deletions
diff --git a/Utilities/cmlibuv/src/unix/stream.c b/Utilities/cmlibuv/src/unix/stream.c
index 3b6da8d464..dd0e0f7f58 100644
--- a/Utilities/cmlibuv/src/unix/stream.c
+++ b/Utilities/cmlibuv/src/unix/stream.c
@@ -164,7 +164,7 @@ static void uv__stream_osx_select(void* arg) {
else
max_fd = s->int_fd;
- while (1) {
+ for (;;) {
/* Terminate on semaphore */
if (uv_sem_trywait(&s->close_sem) == 0)
break;
@@ -195,7 +195,7 @@ static void uv__stream_osx_select(void* arg) {
/* Empty socketpair's buffer in case of interruption */
if (FD_ISSET(s->int_fd, s->sread))
- while (1) {
+ for (;;) {
r = read(s->int_fd, buf, sizeof(buf));
if (r == sizeof(buf))
@@ -799,33 +799,21 @@ static int uv__handle_fd(uv_handle_t* handle) {
}
}
-static void uv__write(uv_stream_t* stream) {
+static int uv__try_write(uv_stream_t* stream,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle) {
struct iovec* iov;
- QUEUE* q;
- uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
- int err;
-
-start:
-
- assert(uv__stream_fd(stream) >= 0);
-
- if (QUEUE_EMPTY(&stream->write_queue))
- return;
-
- q = QUEUE_HEAD(&stream->write_queue);
- req = QUEUE_DATA(q, uv_write_t, queue);
- assert(req->handle == stream);
/*
* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
- assert(sizeof(uv_buf_t) == sizeof(struct iovec));
- iov = (struct iovec*) &(req->bufs[req->write_index]);
- iovcnt = req->nbufs - req->write_index;
+ iov = (struct iovec*) bufs;
+ iovcnt = nbufs;
iovmax = uv__getiovmax();
@@ -837,8 +825,7 @@ start:
* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/
-
- if (req->send_handle) {
+ if (send_handle != NULL) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
@@ -847,12 +834,10 @@ start:
struct cmsghdr alias;
} scratch;
- if (uv__is_closing(req->send_handle)) {
- err = UV_EBADF;
- goto error;
- }
+ if (uv__is_closing(send_handle))
+ return UV_EBADF;
- fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
+ fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
memset(&scratch, 0, sizeof(scratch));
@@ -882,44 +867,68 @@ start:
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
-
- /* Ensure the handle isn't sent again in case this is a partial write. */
- if (n >= 0)
- req->send_handle = NULL;
} else {
do
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}
- if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
- err = UV__ERR(errno);
- goto error;
- }
+ if (n >= 0)
+ return n;
- if (n >= 0 && uv__write_req_update(stream, req, n)) {
- uv__write_req_finish(req);
- return; /* TODO(bnoordhuis) Start trying to write the next request. */
- }
+ if (IS_TRANSIENT_WRITE_ERROR(errno, send_handle))
+ return UV_EAGAIN;
- /* If this is a blocking stream, try again. */
- if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
- goto start;
+ return UV__ERR(errno);
+}
- /* We're not done. */
- uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
+static void uv__write(uv_stream_t* stream) {
+ QUEUE* q;
+ uv_write_t* req;
+ ssize_t n;
- /* Notify select() thread about state change */
- uv__stream_osx_interrupt_select(stream);
+ assert(uv__stream_fd(stream) >= 0);
+
+ for (;;) {
+ if (QUEUE_EMPTY(&stream->write_queue))
+ return;
+
+ q = QUEUE_HEAD(&stream->write_queue);
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ assert(req->handle == stream);
+
+ n = uv__try_write(stream,
+ &(req->bufs[req->write_index]),
+ req->nbufs - req->write_index,
+ req->send_handle);
+
+ /* Ensure the handle isn't sent again in case this is a partial write. */
+ if (n >= 0) {
+ req->send_handle = NULL;
+ if (uv__write_req_update(stream, req, n)) {
+ uv__write_req_finish(req);
+ return; /* TODO(bnoordhuis) Start trying to write the next request. */
+ }
+ } else if (n != UV_EAGAIN)
+ break;
+
+ /* If this is a blocking stream, try again. */
+ if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
+ continue;
+
+ /* We're not done. */
+ uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
- return;
+ /* Notify select() thread about state change */
+ uv__stream_osx_interrupt_select(stream);
+
+ return;
+ }
-error:
- req->error = err;
+ req->error = n;
+ /* XXX(jwn): this must call uv__stream_flush_write_queue(stream, n) here, since we won't generate any more events */
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
- if (!uv__io_active(&stream->io_watcher, POLLIN))
- uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
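
The refactored write path now has a single, simple contract: a non-negative return value is the number of bytes actually written, UV_EAGAIN means the descriptor would block, and any other negative value is a translated errno. The public uv_try_write() ends up on this same path, so a caller-side sketch looks roughly like the following (try_flush, data and len are illustrative names, not part of this change):

/* Minimal sketch: drain a caller-owned buffer through the non-blocking
 * write path. Assumes `stream` is already connected and has no queued
 * writes; otherwise uv_try_write() reports UV_EAGAIN. */
#include <stddef.h>
#include <uv.h>

static int try_flush(uv_stream_t* stream, const char* data, size_t len) {
  while (len > 0) {
    uv_buf_t buf = uv_buf_init((char*) data, (unsigned int) len);
    int n = uv_try_write(stream, &buf, 1);

    if (n == UV_EAGAIN)
      return 0;          /* nothing written now; queue the rest with uv_write() */
    if (n < 0)
      return n;          /* hard error, e.g. UV_EPIPE after shutdown */

    data += n;           /* partial write: advance and retry */
    len -= (size_t) n;   /* uv_try_write() never returns 0; it reports UV_EAGAIN */
  }
  return 1;              /* everything was written synchronously */
}

Reporting UV_EAGAIN instead of 0 is what lets callers tell "nothing could be written right now" apart from a short write.
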
@@ -1001,9 +1010,9 @@ uv_handle_type uv__handle_type(int fd) {
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
stream->flags |= UV_HANDLE_READ_EOF;
stream->flags &= ~UV_HANDLE_READING;
+ stream->flags &= ~UV_HANDLE_READABLE;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
- if (!uv__io_active(&stream->io_watcher, POLLOUT))
- uv__handle_stop(stream);
+ uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb(stream, UV_EOF, buf);
}
@@ -1188,12 +1197,12 @@ static void uv__read(uv_stream_t* stream) {
#endif
} else {
/* Error. User should call uv_close(). */
+ stream->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
stream->read_cb(stream, UV__ERR(errno), &buf);
if (stream->flags & UV_HANDLE_READING) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
- if (!uv__io_active(&stream->io_watcher, POLLOUT))
- uv__handle_stop(stream);
+ uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
}
}
@@ -1276,6 +1285,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
req->cb = cb;
stream->shutdown_req = req;
stream->flags |= UV_HANDLE_SHUTTING;
+ stream->flags &= ~UV_HANDLE_WRITABLE;
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
@@ -1390,14 +1400,9 @@ static void uv__stream_connect(uv_stream_t* stream) {
}
-int uv_write2(uv_write_t* req,
- uv_stream_t* stream,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
- int empty_queue;
-
+static int uv__check_before_write(uv_stream_t* stream,
+ unsigned int nbufs,
+ uv_stream_t* send_handle) {
assert(nbufs > 0);
assert((stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
@@ -1410,7 +1415,7 @@ int uv_write2(uv_write_t* req,
if (!(stream->flags & UV_HANDLE_WRITABLE))
return UV_EPIPE;
- if (send_handle) {
+ if (send_handle != NULL) {
if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
return UV_EINVAL;
@@ -1430,6 +1435,22 @@ int uv_write2(uv_write_t* req,
#endif
}
+ return 0;
+}
+
+int uv_write2(uv_write_t* req,
+ uv_stream_t* stream,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ int empty_queue;
+ int err;
+
+ err = uv__check_before_write(stream, nbufs, send_handle);
+ if (err < 0)
+ return err;
+
/* It's legal for write_queue_size > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up write_queue_size later, see also uv__write_req_finish().
@@ -1498,72 +1519,37 @@ int uv_write(uv_write_t* req,
}
-void uv_try_write_cb(uv_write_t* req, int status) {
- /* Should not be called */
- abort();
-}
-
-
int uv_try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs) {
- int r;
- int has_pollout;
- size_t written;
- size_t req_size;
- uv_write_t req;
+ return uv_try_write2(stream, bufs, nbufs, NULL);
+}
+
+
+int uv_try_write2(uv_stream_t* stream,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle) {
+ int err;
/* Connecting or already writing some data */
if (stream->connect_req != NULL || stream->write_queue_size != 0)
return UV_EAGAIN;
- has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
+ err = uv__check_before_write(stream, nbufs, NULL);
+ if (err < 0)
+ return err;
- r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
- if (r != 0)
- return r;
-
- /* Remove not written bytes from write queue size */
- written = uv__count_bufs(bufs, nbufs);
- if (req.bufs != NULL)
- req_size = uv__write_req_size(&req);
- else
- req_size = 0;
- written -= req_size;
- stream->write_queue_size -= req_size;
-
- /* Unqueue request, regardless of immediateness */
- QUEUE_REMOVE(&req.queue);
- uv__req_unregister(stream->loop, &req);
- if (req.bufs != req.bufsml)
- uv__free(req.bufs);
- req.bufs = NULL;
-
- /* Do not poll for writable, if we wasn't before calling this */
- if (!has_pollout) {
- uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
- uv__stream_osx_interrupt_select(stream);
- }
-
- if (written == 0 && req_size != 0)
- return req.error < 0 ? req.error : UV_EAGAIN;
- else
- return written;
+ return uv__try_write(stream, bufs, nbufs, send_handle);
}
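
uv_try_write2() is the non-blocking counterpart of uv_write2(): it attempts the write immediately, optionally passing a handle over an IPC pipe, and never queues a request. A hedged usage sketch, handing an accepted TCP connection to a peer process (send_tcp_handle, ipc_pipe and client are hypothetical names; the pipe is assumed to have been initialized for IPC with uv_pipe_init(loop, &pipe, 1)):

/* Minimal sketch: send a TCP handle across an IPC pipe without queuing a
 * write request. uv__check_before_write() requires at least one buffer,
 * so a one-byte payload accompanies the handle. */
#include <uv.h>

static int send_tcp_handle(uv_pipe_t* ipc_pipe, uv_tcp_t* client) {
  uv_buf_t buf = uv_buf_init("x", 1);
  int n;

  n = uv_try_write2((uv_stream_t*) ipc_pipe, &buf, 1, (uv_stream_t*) client);
  if (n == UV_EAGAIN)
    return 0;   /* pipe busy or not writable right now */
  return n;     /* bytes written on success, negative libuv error otherwise */
}

Because nothing is queued, the caller keeps ownership of the buffers and must retry, or fall back to uv_write2(), after UV_EAGAIN.
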
-int uv_read_start(uv_stream_t* stream,
- uv_alloc_cb alloc_cb,
- uv_read_cb read_cb) {
+int uv__read_start(uv_stream_t* stream,
+ uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
- if (stream->flags & UV_HANDLE_CLOSING)
- return UV_EINVAL;
-
- if (!(stream->flags & UV_HANDLE_READABLE))
- return UV_ENOTCONN;
-
/* The UV_HANDLE_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/
@@ -1593,8 +1579,7 @@ int uv_read_stop(uv_stream_t* stream) {
stream->flags &= ~UV_HANDLE_READING;
uv__io_stop(stream->loop, &stream->io_watcher, POLLIN);
- if (!uv__io_active(&stream->io_watcher, POLLOUT))
- uv__handle_stop(stream);
+ uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
stream->read_cb = NULL;