summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-06-01 22:43:51 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-06-01 23:50:27 -0700
commit3f0d7454a3c31464fa771879d9922880e86b4a17 (patch)
tree7dd8b496e7b3d6539d87945b53d5b1ffcda2aacd
parentbe2e6dd499067cc4bf6b5a28242f494a2dfef299 (diff)
downloadrabbitmq-c-3f0d7454a3c31464fa771879d9922880e86b4a17.tar.gz
Lib: add AMQP_SF_POLL{IN,OUT} flags for amqp_poll
Get rid of amqp_poll_{read,write} and flatten the call-tree to amqp_poll with an appropriate flag. Done in preparation for adding a select()-based implementation of the amqp_poll() function.
-rw-r--r--librabbitmq/amqp_openssl.c8
-rw-r--r--librabbitmq/amqp_socket.c31
-rw-r--r--librabbitmq/amqp_socket.h12
3 files changed, 25 insertions, 26 deletions
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index d666638..bb28ddb 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -298,10 +298,10 @@ start_connect:
self->internal_error = SSL_get_error(self->ssl, status);
switch (self->internal_error) {
case SSL_ERROR_WANT_READ:
- status = amqp_poll_read(self->sockfd, deadline);
+ status = amqp_poll(self->sockfd, AMQP_SF_POLLIN, deadline);
break;
case SSL_ERROR_WANT_WRITE:
- status = amqp_poll_write(self->sockfd, deadline);
+ status = amqp_poll(self->sockfd, AMQP_SF_POLLOUT, deadline);
break;
default:
status = AMQP_STATUS_SSL_CONNECTION_FAILED;
@@ -362,10 +362,10 @@ start_shutdown:
self->internal_error = SSL_get_error(self->ssl, res);
switch (self->internal_error) {
case SSL_ERROR_WANT_READ:
- res = amqp_poll_read(self->sockfd, amqp_time_infinite());
+ res = amqp_poll(self->sockfd, AMQP_SF_POLLIN, amqp_time_infinite());
break;
case SSL_ERROR_WANT_WRITE:
- res = amqp_poll_write(self->sockfd, amqp_time_infinite());
+ res = amqp_poll(self->sockfd, AMQP_SF_POLLOUT, amqp_time_infinite());
break;
}
if (AMQP_STATUS_OK == res) {
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 964782e..6267d23 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -257,17 +257,24 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-static int amqp_poll(int fd, short event, amqp_time_t deadline) {
+int amqp_poll(int fd, int event, amqp_time_t deadline) {
struct pollfd pfd;
int res;
int timeout_ms;
/* Function should only ever be called with one of these two */
- assert(event == POLLIN || event == POLLOUT);
+ assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT);
start_poll:
pfd.fd = fd;
- pfd.events = event;
+ switch (event) {
+ case AMQP_SF_POLLIN:
+ pfd.events = POLLIN;
+ break;
+ case AMQP_SF_POLLOUT:
+ pfd.events = POLLOUT;
+ break;
+ }
timeout_ms = amqp_time_ms_until(deadline);
if (-1 > timeout_ms) {
@@ -294,14 +301,6 @@ start_poll:
return AMQP_STATUS_OK;
}
-int amqp_poll_read(int fd, amqp_time_t deadline) {
- return amqp_poll(fd, POLLIN, deadline);
-}
-
-int amqp_poll_write(int fd, amqp_time_t deadline) {
- return amqp_poll(fd, POLLOUT, deadline);
-}
-
static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
amqp_time_t deadline) {
int fd = amqp_get_sockfd(state);
@@ -310,10 +309,10 @@ static ssize_t do_poll(amqp_connection_state_t state, ssize_t res,
}
switch (res) {
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, deadline);
+ res = amqp_poll(fd, AMQP_SF_POLLIN, deadline);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, deadline);
+ res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline);
break;
}
return res;
@@ -442,7 +441,7 @@ int amqp_open_socket_inner(char const *hostname,
#else
if (EINPROGRESS == amqp_os_socket_error()) {
#endif
- last_error = amqp_poll_write(sockfd, deadline);
+ last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline);
if (AMQP_STATUS_OK == last_error) {
int result;
socklen_t result_len = sizeof(result);
@@ -668,10 +667,10 @@ start_recv:
default:
return (int)res;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD:
- res = amqp_poll_read(fd, timeout);
+ res = amqp_poll(fd, AMQP_SF_POLLIN, timeout);
break;
case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE:
- res = amqp_poll_write(fd, timeout);
+ res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout);
break;
}
if (AMQP_STATUS_OK == res) {
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index 45c2f1b..86183ec 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -39,7 +39,9 @@ AMQP_BEGIN_DECLS
typedef enum {
AMQP_SF_NONE = 0,
- AMQP_SF_MORE = 1
+ AMQP_SF_MORE = 1,
+ AMQP_SF_POLLIN = 2,
+ AMQP_SF_POLLOUT = 4
} amqp_socket_flag_enum;
int
@@ -163,11 +165,9 @@ amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *t
int amqp_open_socket_inner(char const *hostname, int portnumber,
amqp_time_t deadline);
-/* Wait up to deadline for fd to become readable */
-int amqp_poll_read(int fd, amqp_time_t deadline);
-
-/* Wait up to deadline for fd to become writeable */
-int amqp_poll_write(int fd, amqp_time_t deadline);
+/* Wait up to dealline for fd to become readable or writeable depending on
+ * event (AMQP_SF_POLLIN, AMQP_SF_POLLOUT) */
+int amqp_poll(int fd, int event, amqp_time_t deadline);
int amqp_send_method_inner(amqp_connection_state_t state,
amqp_channel_t channel, amqp_method_number_t id,