summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2015-04-06 22:41:34 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2015-04-19 22:09:13 -0700
commita240c6930627ffd0bb61628b3a4f399f9062d275 (patch)
tree12b57fe3414960d85e34bca2ef67a87e94882812
parent0ce505da95f40040ae4138601d907b4dd34eab47 (diff)
downloadrabbitmq-c-a240c6930627ffd0bb61628b3a4f399f9062d275.tar.gz
Simplify amqp_open_socket_nonblock.
-rw-r--r--librabbitmq/amqp_socket.c145
1 files changed, 65 insertions, 80 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index f581fc9..dbd6dac 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -71,6 +71,7 @@
#ifdef _WIN32
# define poll(fdarray, nfds, timeout) WSAPoll(fdarray, nfds, timeout)
+# define EINPROGRESS WSAEWOULDBLOCK
#endif
static int
@@ -279,7 +280,7 @@ int amqp_open_socket_noblock(char const *hostname,
int timer_error;
amqp_timer_t timer;
- AMQP_INIT_TIMER(timer)
+ AMQP_INIT_TIMER(timer);
if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0 ||
INT_MAX < ((uint64_t)timeout->tv_sec * AMQP_MS_PER_S +
@@ -329,114 +330,98 @@ int amqp_open_socket_noblock(char const *hostname,
continue;
}
- if (timeout) {
- /* Trying to connect with timeout, set socket to non-blocking mode */
- if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
-
- res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
- if (0 == res) {
- /* Connected immediately, set to blocking mode again */
- if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
+ res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
- last_error = AMQP_STATUS_OK;
- break;
+ if (0 == res) {
+ /* Connected immediately, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
}
-#ifdef _WIN32
- if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
-#else
- if (EINPROGRESS == amqp_os_socket_error()) {
-#endif
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
- while(1) {
- struct pollfd pfd;
- int timeout_ms;
+ if (EINPROGRESS == amqp_os_socket_error()) {
+ while (1) {
+ struct pollfd pfd;
+ int timeout_ms;
pfd.fd = sockfd;
pfd.events = POLLOUT;
pfd.revents = 0;
+ if (timeout) {
timer_error = amqp_timer_update(&timer, timeout);
-
if (timer_error < 0) {
last_error = timer_error;
break;
}
timeout_ms = timer.tv.tv_sec * AMQP_MS_PER_S +
- timer.tv.tv_usec / AMQP_US_PER_MS;
- /* Win32 requires except_fds to be passed to detect connection
- * failure. Other platforms only need write_fds, passing except_fds
- * seems to be harmless otherwise
- */
- res = poll(&pfd, 1, timeout_ms);
-
- if (res > 0) {
- int result;
- socklen_t result_len = sizeof(result);
-
- if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
- }
-
- if (result != 0) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- break;
- }
-
- /* socket is ready to be written to, set to blocking mode again */
- if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- }
-
- last_error = AMQP_STATUS_OK;
- break;
- } else if (0 == res) {
- /* Timed out - return */
- last_error = AMQP_STATUS_TIMEOUT;
+ timer.tv.tv_usec / AMQP_US_PER_MS;
+ } else {
+ timeout_ms = -1;
+ }
+ /* Win32 requires except_fds to be passed to detect connection
+ * failure. Other platforms only need write_fds, passing except_fds
+ * seems to be harmless otherwise
+ */
+ res = poll(&pfd, 1, timeout_ms);
+
+ if (res > 0) {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) <
+ 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
break;
- } else if (errno == EINTR) {
- /* Try again */
- continue;
- } else {
- /* Error connecting */
+ }
+
+ if (result != 0) {
last_error = AMQP_STATUS_SOCKET_ERROR;
break;
}
- } /* end while(1) loop */
- if (last_error == AMQP_STATUS_OK
- || last_error == AMQP_STATUS_TIMEOUT
- || last_error == AMQP_STATUS_TIMER_FAILURE) {
- /* Exit for loop on timer errors or when connection established */
+ /* socket is ready to be written to, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ last_error = AMQP_STATUS_TIMEOUT;
+ break;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
break;
}
+ } /* end while(1) loop */
- } else {
- /* Error connecting */
- last_error = AMQP_STATUS_SOCKET_ERROR;
+ if (last_error == AMQP_STATUS_OK || last_error == AMQP_STATUS_TIMEOUT ||
+ last_error == AMQP_STATUS_TIMER_FAILURE) {
+ /* Exit for loop on timer errors or when connection established */
break;
-
}
} else {
- /* Connect in blocking mode */
- if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
- last_error = AMQP_STATUS_SOCKET_ERROR;
- continue;
- } else {
- last_error = AMQP_STATUS_OK;
- break;
- }
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
}
}