diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2015-06-01 22:43:51 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2015-06-01 23:50:27 -0700 |
commit | 3f0d7454a3c31464fa771879d9922880e86b4a17 (patch) | |
tree | 7dd8b496e7b3d6539d87945b53d5b1ffcda2aacd | |
parent | be2e6dd499067cc4bf6b5a28242f494a2dfef299 (diff) | |
download | rabbitmq-c-3f0d7454a3c31464fa771879d9922880e86b4a17.tar.gz |
Lib: add AMQP_SF_POLL{IN,OUT} flags for amqp_poll
Get rid of amqp_poll_{read,write} and flatten the call-tree to amqp_poll with an
appropriate flag. Done in preparation for adding a select()-based
implementation of the amqp_poll() function.
-rw-r--r-- | librabbitmq/amqp_openssl.c | 8 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 31 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 12 |
3 files changed, 25 insertions, 26 deletions
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index d666638..bb28ddb 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -298,10 +298,10 @@ start_connect: self->internal_error = SSL_get_error(self->ssl, status); switch (self->internal_error) { case SSL_ERROR_WANT_READ: - status = amqp_poll_read(self->sockfd, deadline); + status = amqp_poll(self->sockfd, AMQP_SF_POLLIN, deadline); break; case SSL_ERROR_WANT_WRITE: - status = amqp_poll_write(self->sockfd, deadline); + status = amqp_poll(self->sockfd, AMQP_SF_POLLOUT, deadline); break; default: status = AMQP_STATUS_SSL_CONNECTION_FAILED; @@ -362,10 +362,10 @@ start_shutdown: self->internal_error = SSL_get_error(self->ssl, res); switch (self->internal_error) { case SSL_ERROR_WANT_READ: - res = amqp_poll_read(self->sockfd, amqp_time_infinite()); + res = amqp_poll(self->sockfd, AMQP_SF_POLLIN, amqp_time_infinite()); break; case SSL_ERROR_WANT_WRITE: - res = amqp_poll_write(self->sockfd, amqp_time_infinite()); + res = amqp_poll(self->sockfd, AMQP_SF_POLLOUT, amqp_time_infinite()); break; } if (AMQP_STATUS_OK == res) { diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 964782e..6267d23 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -257,17 +257,24 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -static int amqp_poll(int fd, short event, amqp_time_t deadline) { +int amqp_poll(int fd, int event, amqp_time_t deadline) { struct pollfd pfd; int res; int timeout_ms; /* Function should only ever be called with one of these two */ - assert(event == POLLIN || event == POLLOUT); + assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT); start_poll: pfd.fd = fd; - pfd.events = event; + switch (event) { + case AMQP_SF_POLLIN: + pfd.events = POLLIN; + break; + case AMQP_SF_POLLOUT: + pfd.events = POLLOUT; + break; + } timeout_ms = amqp_time_ms_until(deadline); if (-1 > timeout_ms) { @@ -294,14 +301,6 @@ start_poll: return AMQP_STATUS_OK; } -int amqp_poll_read(int fd, amqp_time_t deadline) { - return amqp_poll(fd, POLLIN, deadline); -} - -int amqp_poll_write(int fd, amqp_time_t deadline) { - return amqp_poll(fd, POLLOUT, deadline); -} - static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, amqp_time_t deadline) { int fd = amqp_get_sockfd(state); @@ -310,10 +309,10 @@ static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, } switch (res) { case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, deadline); + res = amqp_poll(fd, AMQP_SF_POLLIN, deadline); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, deadline); + res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline); break; } return res; @@ -442,7 +441,7 @@ int amqp_open_socket_inner(char const *hostname, #else if (EINPROGRESS == amqp_os_socket_error()) { #endif - last_error = amqp_poll_write(sockfd, deadline); + last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline); if (AMQP_STATUS_OK == last_error) { int result; socklen_t result_len = sizeof(result); @@ -668,10 +667,10 @@ start_recv: default: return (int)res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: - res = amqp_poll_read(fd, timeout); + res = amqp_poll(fd, AMQP_SF_POLLIN, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: - res = amqp_poll_write(fd, timeout); + res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout); break; } if (AMQP_STATUS_OK == res) { diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index 45c2f1b..86183ec 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -39,7 +39,9 @@ AMQP_BEGIN_DECLS typedef enum { AMQP_SF_NONE = 0, - AMQP_SF_MORE = 1 + AMQP_SF_MORE = 1, + AMQP_SF_POLLIN = 2, + AMQP_SF_POLLOUT = 4 } amqp_socket_flag_enum; int @@ -163,11 +165,9 @@ amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *t int amqp_open_socket_inner(char const *hostname, int portnumber, amqp_time_t deadline); -/* Wait up to deadline for fd to become readable */ -int amqp_poll_read(int fd, amqp_time_t deadline); - -/* Wait up to deadline for fd to become writeable */ -int amqp_poll_write(int fd, amqp_time_t deadline); +/* Wait up to dealline for fd to become readable or writeable depending on + * event (AMQP_SF_POLLIN, AMQP_SF_POLLOUT) */ +int amqp_poll(int fd, int event, amqp_time_t deadline); int amqp_send_method_inner(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, |