diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 185 |
1 files changed, 176 insertions, 9 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index fa37201..d373a72 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname, #endif } +static int +amqp_os_socket_setsockblock(int sock, int block) +{ + +#ifdef _WIN32 + int nonblock = !block; + if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) { + return AMQP_STATUS_SOCKET_ERROR; + } else { + return AMQP_STATUS_OK; + } +#else + long arg; + + if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + if (block) { + arg &= (~O_NONBLOCK); + } else { + arg |= O_NONBLOCK; + } + + if (fcntl(sock, F_SETFL, arg) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + return AMQP_STATUS_OK; +#endif +} + + int amqp_os_socket_error(void) { @@ -182,7 +215,15 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) { assert(self); assert(self->klass->open); - return self->klass->open(self, host, port); + return self->klass->open(self, host, port, NULL); +} + +int +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout) +{ + assert(self); + assert(self->klass->open); + return self->klass->open(self, host, port, timeout); } int @@ -210,8 +251,9 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -int amqp_open_socket(char const *hostname, - int portnumber) +int amqp_open_socket_noblock(char const *hostname, + int portnumber, + struct timeval *timeout) { struct addrinfo hint; struct addrinfo *address_list; @@ -220,6 +262,15 @@ int amqp_open_socket(char const *hostname, int sockfd = -1; int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ + int res; + int timer_error; + amqp_timer_t timer; + + AMQP_INIT_TIMER(timer) + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -240,31 +291,147 @@ int amqp_open_socket(char const *hostname, } for (addr = address_list; addr; addr = addr->ai_next) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + sockfd = -1; + } + sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (-1 == sockfd) { last_error = AMQP_STATUS_SOCKET_ERROR; continue; } + #ifdef SO_NOSIGPIPE if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; } #endif /* SO_NOSIGPIPE */ - if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + + if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; + } + + if (timeout) { + /* Trying to connect with timeout, set socket to non-blocking mode */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); + + if (0 == res) { + /* Connected immediately, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } + +#ifdef _WIN32 + if (WSAEWOULDBLOCK == amqp_os_socket_error()) { +#else + if (EINPROGRESS == amqp_os_socket_error()) { +#endif + + while(1) { + fd_set write_fd; + fd_set except_fd; + + FD_ZERO(&write_fd); + FD_SET(sockfd, &write_fd); + + FD_ZERO(&except_fd); + FD_SET(sockfd, &except_fd); + + timer_error = amqp_timer_update(&timer, timeout); + + if (timer_error < 0) { + last_error = timer_error; + break; + } + + /* Win32 requires except_fds to be passed to detect connection + * failure. Other platforms only need write_fds, passing except_fds + * seems to be harmless otherwise + */ + res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + + if (res > 0) { + int result; + socklen_t result_len = sizeof(result); + + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + if (result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + /* socket is ready to be written to, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } else if (0 == res) { + /* Timed out - return */ + last_error = AMQP_STATUS_TIMEOUT; + break; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + } /* end while(1) loop */ + + if (last_error == AMQP_STATUS_OK + || last_error == AMQP_STATUS_TIMEOUT + || last_error == AMQP_STATUS_TIMER_FAILURE) { + /* Exit for loop on timer errors or when connection established */ + break; + } + + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + + } + } else { - last_error = AMQP_STATUS_OK; - break; + /* Connect in blocking mode */ + if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } else { + last_error = AMQP_STATUS_OK; + break; + } } } freeaddrinfo(address_list); if (last_error != AMQP_STATUS_OK) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + } + return last_error; } |