diff options
author | zaq178miami <pinepain@gmail.com> | 2013-06-23 19:36:10 +0300 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-07-06 22:28:45 -0700 |
commit | 6ad770dc62f76fa0625d277b521a120b549d9fc2 (patch) | |
tree | 964847f5a839c7ccbfba6c09bc9b5ca3102c89d4 /librabbitmq | |
parent | b6a1dfec14e70fc6afe8ce9710231e552ba6bfb5 (diff) | |
download | rabbitmq-c-6ad770dc62f76fa0625d277b521a120b549d9fc2.tar.gz |
Add nonblocking connect support
Diffstat (limited to 'librabbitmq')
-rw-r--r-- | librabbitmq/amqp.h | 20 | ||||
-rw-r--r-- | librabbitmq/amqp_cyassl.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_gnutls.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_openssl.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_polarssl.c | 10 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 185 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 20 | ||||
-rw-r--r-- | librabbitmq/amqp_tcp_socket.c | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_timer.c | 37 | ||||
-rw-r--r-- | librabbitmq/amqp_timer.h | 25 |
10 files changed, 294 insertions, 19 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index de6eda3..6adabe0 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -666,6 +666,26 @@ AMQP_CALL amqp_socket_open(amqp_socket_t *self, const char *host, int port); /** + * Open a socket connection. + * + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). + * + * \param [in,out] self A socket object. + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode + * + * \return Zero upon success, non-zero otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout); + +/** * Get the socket descriptor in use by a socket object. * * Retrieve the underlying socket descriptor. This function can be used to diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c index 89047d9..05ce12e 100644 --- a/librabbitmq/amqp_cyassl.c +++ b/librabbitmq/amqp_cyassl.c @@ -155,7 +155,7 @@ amqp_ssl_error_string(AMQP_UNUSED int err) } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; int status; @@ -167,7 +167,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) return -1; } - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { self->last_error = - self->sockfd; return -1; diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c index 734643c..f18d427 100644 --- a/librabbitmq/amqp_gnutls.c +++ b/librabbitmq/amqp_gnutls.c @@ -119,7 +119,7 @@ amqp_ssl_socket_recv(void *base, } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; int status; @@ -132,7 +132,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) return -1; } - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { self->last_error = -self->sockfd; return -1; diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index 2bd4fda..f70d377 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -232,7 +232,7 @@ error: } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; long result; @@ -247,7 +247,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) } SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY); - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { status = self->sockfd; self->internal_error = amqp_os_socket_error(); diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c index 770fdbe..bae3141 100644 --- a/librabbitmq/amqp_polarssl.c +++ b/librabbitmq/amqp_polarssl.c @@ -128,12 +128,20 @@ amqp_ssl_socket_recv(void *base, } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { int status; struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; self->last_error = 0; + if (timeout && (timeout->tv_sec != 0 || timeout->tv_usec != 0)) { + /* We don't support PolarSSL for now because it uses its own connect() wrapper + * It is not too hard to implement net_connect() with noblock support, + * but then we will have to maintain that piece of code and keep it synced with main PolarSSL code base + */ + return AMQP_STATUS_INVALID_PARAMETER; + } + status = net_connect(&self->sockfd, host, port); if (status) { /* This isn't quite right. We should probably translate between diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index fa37201..d373a72 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname, #endif } +static int +amqp_os_socket_setsockblock(int sock, int block) +{ + +#ifdef _WIN32 + int nonblock = !block; + if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) { + return AMQP_STATUS_SOCKET_ERROR; + } else { + return AMQP_STATUS_OK; + } +#else + long arg; + + if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + if (block) { + arg &= (~O_NONBLOCK); + } else { + arg |= O_NONBLOCK; + } + + if (fcntl(sock, F_SETFL, arg) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + return AMQP_STATUS_OK; +#endif +} + + int amqp_os_socket_error(void) { @@ -182,7 +215,15 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) { assert(self); assert(self->klass->open); - return self->klass->open(self, host, port); + return self->klass->open(self, host, port, NULL); +} + +int +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout) +{ + assert(self); + assert(self->klass->open); + return self->klass->open(self, host, port, timeout); } int @@ -210,8 +251,9 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -int amqp_open_socket(char const *hostname, - int portnumber) +int amqp_open_socket_noblock(char const *hostname, + int portnumber, + struct timeval *timeout) { struct addrinfo hint; struct addrinfo *address_list; @@ -220,6 +262,15 @@ int amqp_open_socket(char const *hostname, int sockfd = -1; int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ + int res; + int timer_error; + amqp_timer_t timer; + + AMQP_INIT_TIMER(timer) + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -240,31 +291,147 @@ int amqp_open_socket(char const *hostname, } for (addr = address_list; addr; addr = addr->ai_next) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + sockfd = -1; + } + sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (-1 == sockfd) { last_error = AMQP_STATUS_SOCKET_ERROR; continue; } + #ifdef SO_NOSIGPIPE if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; } #endif /* SO_NOSIGPIPE */ - if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + + if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; + } + + if (timeout) { + /* Trying to connect with timeout, set socket to non-blocking mode */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); + + if (0 == res) { + /* Connected immediately, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } + +#ifdef _WIN32 + if (WSAEWOULDBLOCK == amqp_os_socket_error()) { +#else + if (EINPROGRESS == amqp_os_socket_error()) { +#endif + + while(1) { + fd_set write_fd; + fd_set except_fd; + + FD_ZERO(&write_fd); + FD_SET(sockfd, &write_fd); + + FD_ZERO(&except_fd); + FD_SET(sockfd, &except_fd); + + timer_error = amqp_timer_update(&timer, timeout); + + if (timer_error < 0) { + last_error = timer_error; + break; + } + + /* Win32 requires except_fds to be passed to detect connection + * failure. Other platforms only need write_fds, passing except_fds + * seems to be harmless otherwise + */ + res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + + if (res > 0) { + int result; + socklen_t result_len = sizeof(result); + + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + if (result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + /* socket is ready to be written to, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } else if (0 == res) { + /* Timed out - return */ + last_error = AMQP_STATUS_TIMEOUT; + break; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + } /* end while(1) loop */ + + if (last_error == AMQP_STATUS_OK + || last_error == AMQP_STATUS_TIMEOUT + || last_error == AMQP_STATUS_TIMER_FAILURE) { + /* Exit for loop on timer errors or when connection established */ + break; + } + + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + + } + } else { - last_error = AMQP_STATUS_OK; - break; + /* Connect in blocking mode */ + if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } else { + last_error = AMQP_STATUS_OK; + break; + } } } freeaddrinfo(address_list); if (last_error != AMQP_STATUS_OK) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + } + return last_error; } diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index fed3947..b0a1805 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -46,7 +46,7 @@ amqp_os_socket_close(int sockfd); typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int); typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t); typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); -typedef int (*amqp_socket_open_fn)(void *, const char *, int); +typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *); typedef int (*amqp_socket_close_fn)(void *); typedef int (*amqp_socket_get_sockfd_fn)(void *); typedef void (*amqp_socket_delete_fn)(void *); @@ -164,6 +164,24 @@ amqp_socket_close(amqp_socket_t *self); void amqp_socket_delete(amqp_socket_t *self); +/** + * Open a socket connection. + * + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). + * + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode + * + * \return File descriptor upon success, non-zero negative error code otherwise. + */ +int +amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout); + + AMQP_END_DECLS #endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 6ab71ef..9a9fc69 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -220,10 +220,10 @@ start: } static int -amqp_tcp_socket_open(void *base, const char *host, int port) +amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { int err = self->sockfd; self->sockfd = -1; diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c index 7066358..f89d43d 100644 --- a/librabbitmq/amqp_timer.c +++ b/librabbitmq/amqp_timer.c @@ -20,7 +20,9 @@ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ +#include "amqp.h" #include "amqp_timer.h" +#include <string.h> #if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32)) # define AMQP_WIN_TIMER_API @@ -96,3 +98,38 @@ amqp_get_monotonic_timestamp(void) return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec); } #endif /* AMQP_POSIX_TIMER_API */ + +int +amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout) +{ + if (0 == timer->current_timestamp) { + timer->current_timestamp = amqp_get_monotonic_timestamp(); + + if (0 == timer->current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + timer->timeout_timestamp = timer->current_timestamp + + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + + } else { + timer->current_timestamp = amqp_get_monotonic_timestamp(); + + if (0 == timer->current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + } + + if (timer->current_timestamp > timer->timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + + timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp; + + memset(&timer->tv, 0, sizeof(struct timeval)); + timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S; + timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; + + return AMQP_STATUS_OK; +} diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h index d1718af..946096d 100644 --- a/librabbitmq/amqp_timer.h +++ b/librabbitmq/amqp_timer.h @@ -25,12 +25,37 @@ #include <stdint.h> +#ifdef _WIN32 +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include <Winsock2.h> +#else +# include <sys/time.h> +#endif + #define AMQP_NS_PER_S 1000000000 #define AMQP_NS_PER_US 1000 +#define AMQP_INIT_TIMER(structure) { \ + structure.current_timestamp = 0; \ + structure.timeout_timestamp = 0; \ +} + +typedef struct amqp_timer_t_ { + uint64_t current_timestamp; + uint64_t timeout_timestamp; + uint64_t ns_until_next_timeout; + struct timeval tv; +} amqp_timer_t; + /* Gets a monotonic timestamp in ns */ uint64_t amqp_get_monotonic_timestamp(void); +/* Prepare timeout value and modify timer state based on timer state. */ +int +amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout); + #endif /* AMQP_TIMER_H */ |