summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2017-07-25 01:18:22 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2017-07-25 08:23:10 -0700
commitb252de9c855435aab3162ba7eea50af587501a11 (patch)
tree2a49ab53e7236e6bd368b5a634d8e4f3b6c22a38
parentb9eabbae2e312734b034d7588f03901ca76d158e (diff)
downloadrabbitmq-c-conn-try.tar.gz
Lib: simplify socket connection logicconn-try
Refactor amqp_socket_open_inner so that the socket connection logic is in it's own OS-specific socket_connect function, and the host resolution and retry logic calls this. This makes the socket connection logic easier to understand and reduces the number of win32 #ifdefs. This also fixes an issue where multiple hostnames are not tried if connect doesn't fail immediately. Fixes #430
-rw-r--r--librabbitmq/amqp_socket.c317
1 files changed, 154 insertions, 163 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index f4e536a..661c029 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -103,84 +103,6 @@ amqp_os_socket_init(void)
#endif
}
-static int
-amqp_os_socket_socket(int domain, int type, int protocol)
-{
-#ifdef _WIN32
- /*
- This cast is to squash warnings on Win64, see:
- http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
- */
- return (int)socket(domain, type, protocol);
-#else
- int flags;
-
- int s = socket(domain, type, protocol);
- if (s < 0) {
- return s;
- }
-
- /* Always enable CLOEXEC on the socket */
- flags = fcntl(s, F_GETFD);
- if (flags == -1
- || fcntl(s, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
- int e = errno;
- close(s);
- errno = e;
- return -1;
- }
-
- return s;
-
-#endif
-}
-
-static int
-amqp_os_socket_setsockopt(int sock, int level, int optname,
- const void *optval, size_t optlen)
-{
-#ifdef _WIN32
- /* the winsock setsockopt function has its 4th argument as a
- const char * */
- return setsockopt(sock, level, optname, (const char *)optval, (int)optlen);
-#else
- return setsockopt(sock, level, optname, optval, optlen);
-#endif
-}
-
-static int
-amqp_os_socket_setsockblock(int sock, int block)
-{
-
-#ifdef _WIN32
- u_long nonblock = !block;
- if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
- return AMQP_STATUS_SOCKET_ERROR;
- } else {
- return AMQP_STATUS_OK;
- }
-#else
- long arg;
-
- if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
- return AMQP_STATUS_SOCKET_ERROR;
- }
-
- if (block) {
- arg &= (~O_NONBLOCK);
- } else {
- arg |= O_NONBLOCK;
- }
-
- if (fcntl(sock, F_SETFL, arg) < 0) {
- return AMQP_STATUS_SOCKET_ERROR;
- }
-
- return AMQP_STATUS_OK;
-#endif
-}
-
-
int
amqp_os_socket_error(void)
{
@@ -421,6 +343,155 @@ int amqp_open_socket_noblock(char const *hostname,
return amqp_open_socket_inner(hostname, portnumber, deadline);
}
+#ifdef _WIN32
+static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
+ int one = 1;
+ SOCKET sockfd;
+ int last_error;
+
+ /*
+ * This cast is to squash warnings on Win64, see:
+ * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64
+ */
+
+ sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+ if (INVALID_SOCKET == sockfd) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ /* Set the socket to be non-blocking */
+ if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ /* Disable nagle */
+ if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY,
+ (const char *)&one, sizeof(one))) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ /* Enable TCP keepalives */
+ if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE,
+ (const char *)&one, sizeof(one))) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) {
+ return (int)sockfd;
+ }
+
+ if (WSAEWOULDBLOCK != WSAGetLastError()) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ last_error =
+ amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline);
+ if (AMQP_STATUS_OK != last_error) {
+ goto err;
+ }
+
+ {
+ int result;
+ int result_len = sizeof(result);
+
+ if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR,
+ (char *)&result, &result_len) ||
+ result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+ }
+
+ return (int)sockfd;
+
+err:
+ closesocket(sockfd);
+ return last_error;
+}
+#else
+static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) {
+ int one = 1;
+ int sockfd;
+ int flags;
+ int last_error;
+
+ sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+ if (-1 == sockfd) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ /* Enable CLOEXEC on socket */
+ flags = fcntl(sockfd, F_GETFD);
+ if (flags == -1
+ || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ /* Set the socket as non-blocking */
+ flags = fcntl(sockfd, F_GETFL);
+ if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+#ifdef SO_NOSIGPIPE
+ /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */
+ if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+#endif /* SO_NOSIGPIPE */
+
+ /* Disable nagle */
+ if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ /* Enable TCP keepalives */
+ if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+ return sockfd;
+ }
+
+ if (EINPROGRESS != errno) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+
+ last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline);
+ if (AMQP_STATUS_OK != last_error) {
+ goto err;
+ }
+
+ {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) ||
+ result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ goto err;
+ }
+ }
+
+ return sockfd;
+
+err:
+ close(sockfd);
+ return last_error;
+}
+#endif
+
int amqp_open_socket_inner(char const *hostname,
int portnumber,
amqp_time_t deadline) {
@@ -430,8 +501,6 @@ int amqp_open_socket_inner(char const *hostname,
char portnumber_string[33];
int sockfd = -1;
int last_error;
- int one = 1; /* for setsockopt */
- int res;
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -446,104 +515,26 @@ int amqp_open_socket_inner(char const *hostname,
(void)sprintf(portnumber_string, "%d", portnumber);
last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list);
-
if (0 != last_error) {
return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED;
}
for (addr = address_list; addr; addr = addr->ai_next) {
- if (-1 != sockfd) {
- amqp_os_socket_close(sockfd);
- }
-
- sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
-
- if (-1 == sockfd) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
-
-#ifdef SO_NOSIGPIPE
- if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
-#endif /* SO_NOSIGPIPE */
-
- if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
-
- if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
-
- if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one,
- sizeof(one))) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
+ sockfd = connect_socket(addr, deadline);
-#ifdef _WIN32
- res = connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen);
-#else
- res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
-#endif
-
- if (0 == res) {
+ if (sockfd > 0) {
last_error = AMQP_STATUS_OK;
break;
- }
-
-#ifdef _WIN32
- if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
- int event = AMQP_SF_POLLOUT | AMQP_SF_POLLERR;
-#else
- if (EINPROGRESS == amqp_os_socket_error()) {
- int event = AMQP_SF_POLLOUT;
-#endif
- last_error = amqp_poll(sockfd, event, deadline);
- if (AMQP_STATUS_OK == last_error) {
- int result;
- socklen_t result_len = sizeof(result);
-
-#ifdef _WIN32
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&result,
- (int *)&result_len) < 0
-#else
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0
-#endif
- || result != 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- } else {
- last_error = AMQP_STATUS_OK;
- }
- }
-
- if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT ||
- last_error == AMQP_STATUS_TIMER_FAILURE) {
- /* Exit for loop on timer errors or when connection established */
- break;
- }
-
- } else {
- /* Error connecting */
- last_error = AMQP_STATUS_SOCKET_ERROR;
+ } else if (sockfd == AMQP_STATUS_TIMEOUT) {
+ last_error = sockfd;
break;
}
}
freeaddrinfo(address_list);
- if (last_error != AMQP_STATUS_OK) {
- if (-1 != sockfd) {
- amqp_os_socket_close(sockfd);
- }
-
+ if (last_error != AMQP_STATUS_OK || sockfd == -1) {
return last_error;
}
-
return sockfd;
}