From a240c6930627ffd0bb61628b3a4f399f9062d275 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Mon, 6 Apr 2015 22:41:34 -0700 Subject: Simplify amqp_open_socket_nonblock. --- librabbitmq/amqp_socket.c | 145 +++++++++++++++++++++------------------------- 1 file changed, 65 insertions(+), 80 deletions(-) diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index f581fc9..dbd6dac 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -71,6 +71,7 @@ #ifdef _WIN32 # define poll(fdarray, nfds, timeout) WSAPoll(fdarray, nfds, timeout) +# define EINPROGRESS WSAEWOULDBLOCK #endif static int @@ -279,7 +280,7 @@ int amqp_open_socket_noblock(char const *hostname, int timer_error; amqp_timer_t timer; - AMQP_INIT_TIMER(timer) + AMQP_INIT_TIMER(timer); if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 || INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S + @@ -329,114 +330,98 @@ int amqp_open_socket_noblock(char const *hostname, continue; } - if (timeout) { - /* Trying to connect with timeout, set socket to non-blocking mode */ - if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } - - res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } - if (0 == res) { - /* Connected immediately, set to blocking mode again */ - if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } + res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); - last_error = AMQP_STATUS_OK; - break; + if (0 == res) { + /* Connected immediately, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; } -#ifdef _WIN32 - if (WSAEWOULDBLOCK == amqp_os_socket_error()) { -#else - if (EINPROGRESS == amqp_os_socket_error()) { -#endif + last_error = AMQP_STATUS_OK; + break; + } - while(1) { - struct pollfd pfd; - int timeout_ms; + if (EINPROGRESS == amqp_os_socket_error()) { + while (1) { + struct pollfd pfd; + int timeout_ms; pfd.fd = sockfd; pfd.events = POLLOUT; pfd.revents = 0; + if (timeout) { timer_error = amqp_timer_update(&timer, timeout); - if (timer_error < 0) { last_error = timer_error; break; } timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S + - timer.tv.tv_usec / AMQP_US_PER_MS; - /* Win32 requires except_fds to be passed to detect connection - * failure. Other platforms only need write_fds, passing except_fds - * seems to be harmless otherwise - */ - res = poll(&pfd, 1, timeout_ms); - - if (res > 0) { - int result; - socklen_t result_len = sizeof(result); - - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - break; - } - - if (result != 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - break; - } - - /* socket is ready to be written to, set to blocking mode again */ - if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } - - last_error = AMQP_STATUS_OK; - break; - } else if (0 == res) { - /* Timed out - return */ - last_error = AMQP_STATUS_TIMEOUT; + timer.tv.tv_usec / AMQP_US_PER_MS; + } else { + timeout_ms = -1; + } + /* Win32 requires except_fds to be passed to detect connection + * failure. Other platforms only need write_fds, passing except_fds + * seems to be harmless otherwise + */ + res = poll(&pfd, 1, timeout_ms); + + if (res > 0) { + int result; + socklen_t result_len = sizeof(result); + + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < + 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; break; - } else if (errno == EINTR) { - /* Try again */ - continue; - } else { - /* Error connecting */ + } + + if (result != 0) { last_error = AMQP_STATUS_SOCKET_ERROR; break; } - } /* end while(1) loop */ - if (last_error == AMQP_STATUS_OK - || last_error == AMQP_STATUS_TIMEOUT - || last_error == AMQP_STATUS_TIMER_FAILURE) { - /* Exit for loop on timer errors or when connection established */ + /* socket is ready to be written to, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } else if (0 == res) { + /* Timed out - return */ + last_error = AMQP_STATUS_TIMEOUT; + break; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; break; } + } /* end while(1) loop */ - } else { - /* Error connecting */ - last_error = AMQP_STATUS_SOCKET_ERROR; + if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT || + last_error == AMQP_STATUS_TIMER_FAILURE) { + /* Exit for loop on timer errors or when connection established */ break; - } } else { - /* Connect in blocking mode */ - if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } else { - last_error = AMQP_STATUS_OK; - break; - } + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; } } -- cgit v1.2.1