summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-13 23:45:31 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-04-19 22:09:33 -0700
commitc745aca12166b086d58dc9c8ededa4896529e2d1 (patch)
tree15209b36e94ca1c0d2dc2c6fff72d4c5d6512d5c
parent8ed47ad13d174c3051e04aef79a520ebd7e70756 (diff)
downloadrabbitmq-c-c745aca12166b086d58dc9c8ededa4896529e2d1.tar.gz
Refactor read/write poll function usage.
-rw-r--r--librabbitmq/amqp_socket.c160
-rw-r--r--librabbitmq/amqp_socket.h6
2 files changed, 78 insertions, 88 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index e9aa2a7..4b996f3 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -258,14 +258,17 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-static int poll_for_write(int fd, uint64_t start, struct timeval *timeout) {
+static int amqp_poll(int fd, short event, uint64_t start, struct timeval *timeout) {
struct pollfd pfd;
int res;
int timeout_ms;
+ /* Function should only ever be called with one of these two */
+ assert(event == POLLIN || event == POLLOUT);
+
start_poll:
pfd.fd = fd;
- pfd.events = POLLOUT;
+ pfd.events = event;
if (timeout) {
timeout_ms =
@@ -277,7 +280,10 @@ start_poll:
res = poll(&pfd, 1, timeout_ms);
if (0 < res) {
- return AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE;
+ /* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or
+ * equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall
+ * doesn't need to be made. */
+ return AMQP_STATUS_OK;
} else if (0 == res) {
return AMQP_STATUS_TIMEOUT;
} else {
@@ -305,17 +311,27 @@ start_poll:
return AMQP_STATUS_SOCKET_ERROR;
}
}
+ return AMQP_STATUS_OK;
+}
+
+int amqp_poll_read(int fd, uint64_t start, struct timeval *timeout) {
+ return amqp_poll(fd, POLLIN, start, timeout);
+}
+
+int amqp_poll_write(int fd, uint64_t start, struct timeval* timeout) {
+ return amqp_poll(fd, POLLOUT, start, timeout);
}
ssize_t amqp_try_writev(amqp_connection_state_t state, struct iovec *iov,
int iovcnt) {
int i;
+ int fd;
ssize_t res;
struct iovec *iov_left = iov;
int iovcnt_left = iovcnt;
/* TODO(alanxz) this should probably be a parameter */
struct timeval *timeout = NULL;
- uint64_t start;
+ uint64_t start = 0;
ssize_t len_left;
if (timeout) {
start = amqp_get_monotonic_timestamp();
@@ -350,19 +366,23 @@ start_send:
}
}
goto start_send;
- } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) {
- return res;
}
- {
- int fd;
- fd = amqp_get_sockfd(state);
- if (-1 == fd) {
- return AMQP_STATUS_SOCKET_CLOSED;
- }
- res = poll_for_write(fd, start, timeout);
- if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) {
- goto start_send;
- }
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
+ switch (res) {
+ default:
+ return res;
+ case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
+ res = amqp_poll_read(fd, start, timeout);
+ break;
+ case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
+ res = amqp_poll_write(fd, start, timeout);
+ break;
+ }
+ if (AMQP_STATUS_OK == res) {
+ goto start_send;
}
return res;
}
@@ -370,12 +390,13 @@ start_send:
ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf,
size_t len) {
ssize_t res;
+ int fd;
void* buf_left = (void*)buf;
/* Assume that len is going to be larger than ssize_t can hold. */
ssize_t len_left = (size_t)len;
/* TODO(alanxz) this should probably be a parameter */
struct timeval *timeout = NULL;
- uint64_t start;
+ uint64_t start = 0;
if (timeout) {
start = amqp_get_monotonic_timestamp();
if (0 == start) {
@@ -393,19 +414,23 @@ start_send:
return AMQP_STATUS_OK;
}
goto start_send;
- } else if (res != AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE) {
- return res;
}
- {
- int fd;
- fd = amqp_get_sockfd(state);
- if (-1 == fd) {
- return AMQP_STATUS_SOCKET_CLOSED;
- }
- res = poll_for_write(fd, start, timeout);
- if (AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE == res) {
- goto start_send;
- }
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_SOCKET_CLOSED;
+ }
+ switch (res) {
+ default:
+ return res;
+ case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
+ res = amqp_poll_read(fd, start, timeout);
+ break;
+ case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
+ res = amqp_poll_write(fd, start, timeout);
+ break;
+ }
+ if (AMQP_STATUS_OK == res) {
+ goto start_send;
}
return res;
}
@@ -734,6 +759,7 @@ static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decode
static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, struct timeval *timeout)
{
int res;
+ int fd;
if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
@@ -746,66 +772,24 @@ start_recv:
state->sock_inbound_buffer.len, 0);
if (res < 0) {
- if (AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD == res) {
- int fd;
-
- fd = amqp_get_sockfd(state);
- if (-1 == fd) {
- return AMQP_STATUS_CONNECTION_CLOSED;
- }
-
- while (1) {
- struct pollfd pfd;
- int timeout_ms;
-
- pfd.fd = fd;
- pfd.events = POLLIN;
- pfd.revents = 0;
-
- if (timeout) {
- timeout_ms = timeout->tv_sec * AMQP_MS_PER_S +
- timeout->tv_usec / AMQP_US_PER_MS;
- } else {
- timeout_ms = -1;
- }
-
- res = poll(&pfd, 1, timeout_ms);
-
- if (0 < res) {
- goto start_recv;
- } else if (0 == res) {
- return AMQP_STATUS_TIMEOUT;
- } else if (-1 == res) {
- switch (amqp_os_socket_error()) {
- case EINTR:
- if (timeout) {
- uint64_t end_timestamp;
- uint64_t time_left;
- uint64_t current_timestamp = amqp_get_monotonic_timestamp();
- if (0 == current_timestamp) {
- return AMQP_STATUS_TIMER_FAILURE;
- }
- end_timestamp = start +
- (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
- (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
- if (current_timestamp > end_timestamp) {
- return AMQP_STATUS_TIMEOUT;
- }
-
- time_left = end_timestamp - current_timestamp;
-
- timeout->tv_sec = time_left / AMQP_NS_PER_S;
- timeout->tv_usec = (time_left % AMQP_NS_PER_S) / AMQP_NS_PER_US;
- }
- continue;
- default:
- return AMQP_STATUS_SOCKET_ERROR;
- }
- }
- }
- } else {
- return res;
+ fd = amqp_get_sockfd(state);
+ if (-1 == fd) {
+ return AMQP_STATUS_CONNECTION_CLOSED;
}
+ switch (res) {
+ default:
+ return res;
+ case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
+ res = amqp_poll_read(fd, start, timeout);
+ break;
+ case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
+ res = amqp_poll_write(fd, start, timeout);
+ break;
+ }
+ if (AMQP_STATUS_OK == res) {
+ goto start_recv;
+ }
+ return res;
}
state->sock_inbound_limit = res;
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index a1c4040..41617a0 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -186,6 +186,12 @@ int
amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout);
int
+amqp_poll_read(int fd, uint64_t start, struct timeval *timeout);
+
+int
+amqp_poll_write(int fd, uint64_t start, struct timeval *timeout);
+
+int
amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);
int