summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c185
1 files changed, 176 insertions, 9 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index fa37201..d373a72 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname,
#endif
}
+static int
+amqp_os_socket_setsockblock(int sock, int block)
+{
+
+#ifdef _WIN32
+ int nonblock = !block;
+ if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ } else {
+ return AMQP_STATUS_OK;
+ }
+#else
+ long arg;
+
+ if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ if (block) {
+ arg &= (~O_NONBLOCK);
+ } else {
+ arg |= O_NONBLOCK;
+ }
+
+ if (fcntl(sock, F_SETFL, arg) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ return AMQP_STATUS_OK;
+#endif
+}
+
+
int
amqp_os_socket_error(void)
{
@@ -182,7 +215,15 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
{
assert(self);
assert(self->klass->open);
- return self->klass->open(self, host, port);
+ return self->klass->open(self, host, port, NULL);
+}
+
+int
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout)
+{
+ assert(self);
+ assert(self->klass->open);
+ return self->klass->open(self, host, port, timeout);
}
int
@@ -210,8 +251,9 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-int amqp_open_socket(char const *hostname,
- int portnumber)
+int amqp_open_socket_noblock(char const *hostname,
+ int portnumber,
+ struct timeval *timeout)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -220,6 +262,15 @@ int amqp_open_socket(char const *hostname,
int sockfd = -1;
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
+ int res;
+ int timer_error;
+ amqp_timer_t timer;
+
+ AMQP_INIT_TIMER(timer)
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -240,31 +291,147 @@ int amqp_open_socket(char const *hostname,
}
for (addr = address_list; addr; addr = addr->ai_next) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ sockfd = -1;
+ }
+
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+
if (-1 == sockfd) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
+
#ifdef SO_NOSIGPIPE
if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
}
#endif /* SO_NOSIGPIPE */
- if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+
+ if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
+ }
+
+ if (timeout) {
+ /* Trying to connect with timeout, set socket to non-blocking mode */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
+
+ if (0 == res) {
+ /* Connected immediately, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
+
+#ifdef _WIN32
+ if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
+#else
+ if (EINPROGRESS == amqp_os_socket_error()) {
+#endif
+
+ while(1) {
+ fd_set write_fd;
+ fd_set except_fd;
+
+ FD_ZERO(&write_fd);
+ FD_SET(sockfd, &write_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(sockfd, &except_fd);
+
+ timer_error = amqp_timer_update(&timer, timeout);
+
+ if (timer_error < 0) {
+ last_error = timer_error;
+ break;
+ }
+
+ /* Win32 requires except_fds to be passed to detect connection
+ * failure. Other platforms only need write_fds, passing except_fds
+ * seems to be harmless otherwise
+ */
+ res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+
+ if (res > 0) {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ if (result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ /* socket is ready to be written to, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ last_error = AMQP_STATUS_TIMEOUT;
+ break;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+ } /* end while(1) loop */
+
+ if (last_error == AMQP_STATUS_OK
+ || last_error == AMQP_STATUS_TIMEOUT
+ || last_error == AMQP_STATUS_TIMER_FAILURE) {
+ /* Exit for loop on timer errors or when connection established */
+ break;
+ }
+
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+
+ }
+
} else {
- last_error = AMQP_STATUS_OK;
- break;
+ /* Connect in blocking mode */
+ if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ } else {
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
}
}
freeaddrinfo(address_list);
if (last_error != AMQP_STATUS_OK) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ }
+
return last_error;
}