diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2017-07-25 01:18:22 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2017-07-25 08:23:10 -0700 |
commit | b252de9c855435aab3162ba7eea50af587501a11 (patch) | |
tree | 2a49ab53e7236e6bd368b5a634d8e4f3b6c22a38 | |
parent | b9eabbae2e312734b034d7588f03901ca76d158e (diff) | |
download | rabbitmq-c-b252de9c855435aab3162ba7eea50af587501a11.tar.gz |
Lib: simplify socket connection logicconn-try
Refactor amqp_socket_open_inner so that the socket connection logic is
in it's own OS-specific socket_connect function, and the host resolution
and retry logic calls this. This makes the socket connection logic
easier to understand and reduces the number of win32 #ifdefs.
This also fixes an issue where multiple hostnames are not tried if
connect doesn't fail immediately.
Fixes #430
-rw-r--r-- | librabbitmq/amqp_socket.c | 317 |
1 files changed, 154 insertions, 163 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index f4e536a..661c029 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -103,84 +103,6 @@ amqp_os_socket_init(void) #endif } -static int -amqp_os_socket_socket(int domain, int type, int protocol) -{ -#ifdef _WIN32 - /* - This cast is to squash warnings on Win64, see: - http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 - */ - return (int)socket(domain, type, protocol); -#else - int flags; - - int s = socket(domain, type, protocol); - if (s < 0) { - return s; - } - - /* Always enable CLOEXEC on the socket */ - flags = fcntl(s, F_GETFD); - if (flags == -1 - || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { - int e = errno; - close(s); - errno = e; - return -1; - } - - return s; - -#endif -} - -static int -amqp_os_socket_setsockopt(int sock, int level, int optname, - const void *optval, size_t optlen) -{ -#ifdef _WIN32 - /* the winsock setsockopt function has its 4th argument as a - const char * */ - return setsockopt(sock, level, optname, (const char *)optval, (int)optlen); -#else - return setsockopt(sock, level, optname, optval, optlen); -#endif -} - -static int -amqp_os_socket_setsockblock(int sock, int block) -{ - -#ifdef _WIN32 - u_long nonblock = !block; - if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) { - return AMQP_STATUS_SOCKET_ERROR; - } else { - return AMQP_STATUS_OK; - } -#else - long arg; - - if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) { - return AMQP_STATUS_SOCKET_ERROR; - } - - if (block) { - arg &= (~O_NONBLOCK); - } else { - arg |= O_NONBLOCK; - } - - if (fcntl(sock, F_SETFL, arg) < 0) { - return AMQP_STATUS_SOCKET_ERROR; - } - - return AMQP_STATUS_OK; -#endif -} - - int amqp_os_socket_error(void) { @@ -421,6 +343,155 @@ int amqp_open_socket_noblock(char const *hostname, return amqp_open_socket_inner(hostname, portnumber, deadline); } +#ifdef _WIN32 +static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) { + int one = 1; + SOCKET sockfd; + int last_error; + + /* + * This cast is to squash warnings on Win64, see: + * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 + */ + + sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (INVALID_SOCKET == sockfd) { + return AMQP_STATUS_SOCKET_ERROR; + } + + /* Set the socket to be non-blocking */ + if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + /* Disable nagle */ + if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, + (const char *)&one, sizeof(one))) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + /* Enable TCP keepalives */ + if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, + (const char *)&one, sizeof(one))) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) { + return (int)sockfd; + } + + if (WSAEWOULDBLOCK != WSAGetLastError()) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + last_error = + amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline); + if (AMQP_STATUS_OK != last_error) { + goto err; + } + + { + int result; + int result_len = sizeof(result); + + if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, + (char *)&result, &result_len) || + result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + } + + return (int)sockfd; + +err: + closesocket(sockfd); + return last_error; +} +#else +static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) { + int one = 1; + int sockfd; + int flags; + int last_error; + + sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (-1 == sockfd) { + return AMQP_STATUS_SOCKET_ERROR; + } + + /* Enable CLOEXEC on socket */ + flags = fcntl(sockfd, F_GETFD); + if (flags == -1 + || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + /* Set the socket as non-blocking */ + flags = fcntl(sockfd, F_GETFL); + if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + +#ifdef SO_NOSIGPIPE + /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */ + if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } +#endif /* SO_NOSIGPIPE */ + + /* Disable nagle */ + if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + /* Enable TCP keepalives */ + if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + return sockfd; + } + + if (EINPROGRESS != errno) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + + last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline); + if (AMQP_STATUS_OK != last_error) { + goto err; + } + + { + int result; + socklen_t result_len = sizeof(result); + + if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) || + result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + goto err; + } + } + + return sockfd; + +err: + close(sockfd); + return last_error; +} +#endif + int amqp_open_socket_inner(char const *hostname, int portnumber, amqp_time_t deadline) { @@ -430,8 +501,6 @@ int amqp_open_socket_inner(char const *hostname, char portnumber_string[33]; int sockfd = -1; int last_error; - int one = 1; /* for setsockopt */ - int res; last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -446,104 +515,26 @@ int amqp_open_socket_inner(char const *hostname, (void)sprintf(portnumber_string, "%d", portnumber); last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list); - if (0 != last_error) { return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED; } for (addr = address_list; addr; addr = addr->ai_next) { - if (-1 != sockfd) { - amqp_os_socket_close(sockfd); - } - - sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); - - if (-1 == sockfd) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } - -#ifdef SO_NOSIGPIPE - if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } -#endif /* SO_NOSIGPIPE */ - - if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } - - if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } - - if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, - sizeof(one))) { - last_error = AMQP_STATUS_SOCKET_ERROR; - continue; - } + sockfd = connect_socket(addr, deadline); -#ifdef _WIN32 - res = connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen); -#else - res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); -#endif - - if (0 == res) { + if (sockfd > 0) { last_error = AMQP_STATUS_OK; break; - } - -#ifdef _WIN32 - if (WSAEWOULDBLOCK == amqp_os_socket_error()) { - int event = AMQP_SF_POLLOUT | AMQP_SF_POLLERR; -#else - if (EINPROGRESS == amqp_os_socket_error()) { - int event = AMQP_SF_POLLOUT; -#endif - last_error = amqp_poll(sockfd, event, deadline); - if (AMQP_STATUS_OK == last_error) { - int result; - socklen_t result_len = sizeof(result); - -#ifdef _WIN32 - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&result, - (int *)&result_len) < 0 -#else - if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0 -#endif - || result != 0) { - last_error = AMQP_STATUS_SOCKET_ERROR; - } else { - last_error = AMQP_STATUS_OK; - } - } - - if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT || - last_error == AMQP_STATUS_TIMER_FAILURE) { - /* Exit for loop on timer errors or when connection established */ - break; - } - - } else { - /* Error connecting */ - last_error = AMQP_STATUS_SOCKET_ERROR; + } else if (sockfd == AMQP_STATUS_TIMEOUT) { + last_error = sockfd; break; } } freeaddrinfo(address_list); - if (last_error != AMQP_STATUS_OK) { - if (-1 != sockfd) { - amqp_os_socket_close(sockfd); - } - + if (last_error != AMQP_STATUS_OK || sockfd == -1) { return last_error; } - return sockfd; } |