summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile.am12
-rw-r--r--examples/CMakeLists.txt6
-rw-r--r--examples/amqp_connect_timeout.c110
-rw-r--r--examples/amqps_connect_timeout.c132
-rw-r--r--librabbitmq/amqp.h20
-rw-r--r--librabbitmq/amqp_cyassl.c4
-rw-r--r--librabbitmq/amqp_gnutls.c4
-rw-r--r--librabbitmq/amqp_openssl.c4
-rw-r--r--librabbitmq/amqp_polarssl.c10
-rw-r--r--librabbitmq/amqp_socket.c185
-rw-r--r--librabbitmq/amqp_socket.h20
-rw-r--r--librabbitmq/amqp_tcp_socket.c4
-rw-r--r--librabbitmq/amqp_timer.c37
-rw-r--r--librabbitmq/amqp_timer.h25
14 files changed, 554 insertions, 19 deletions
diff --git a/Makefile.am b/Makefile.am
index da55823..989f5e9 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -154,6 +154,7 @@ endif
noinst_PROGRAMS = \
examples/amqp_bind \
examples/amqp_consumer \
+ examples/amqp_connect_timeout \
examples/amqp_exchange_declare \
examples/amqp_listen \
examples/amqp_listenq \
@@ -187,6 +188,11 @@ examples_amqp_consumer_LDADD = \
examples/libutils.la \
librabbitmq/librabbitmq.la
+examples_amqp_connect_timeout_SOURCES = examples/amqp_connect_timeout.c
+examples_amqp_connect_timeout_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
examples_amqp_unbind_SOURCES = examples/amqp_unbind.c
examples_amqp_unbind_LDADD = \
examples/libutils.la \
@@ -211,6 +217,7 @@ examples_amqp_rpc_sendstring_client_LDADD = \
if SSL
noinst_PROGRAMS += \
examples/amqps_bind \
+ examples/amqps_connect_timeout \
examples/amqps_consumer \
examples/amqps_exchange_declare \
examples/amqps_listen \
@@ -224,6 +231,11 @@ examples_amqps_bind_LDADD = \
examples/libutils.la \
librabbitmq/librabbitmq.la
+examples_amqps_connect_timeout_SOURCES = examples/amqps_connect_timeout.c
+examples_amqps_connect_timeout_LDADD = \
+ examples/libutils.la \
+ librabbitmq/librabbitmq.la
+
examples_amqps_consumer_SOURCES = examples/amqps_consumer.c
examples_amqps_consumer_LDADD = \
examples/libutils.la \
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index a184cf6..8dcdcf4 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -28,6 +28,9 @@ target_link_libraries(amqp_listen ${RMQ_LIBRARY_TARGET})
add_executable(amqp_producer amqp_producer.c ${COMMON_SRCS})
target_link_libraries(amqp_producer ${RMQ_LIBRARY_TARGET})
+add_executable(amqp_connect_timeout amqp_connect_timeout.c ${COMMON_SRCS})
+target_link_libraries(amqp_connect_timeout ${RMQ_LIBRARY_TARGET})
+
add_executable(amqp_consumer amqp_consumer.c ${COMMON_SRCS})
target_link_libraries(amqp_consumer ${RMQ_LIBRARY_TARGET})
@@ -41,6 +44,9 @@ add_executable(amqp_listenq amqp_listenq.c ${COMMON_SRCS})
target_link_libraries(amqp_listenq ${RMQ_LIBRARY_TARGET})
if (ENABLE_SSL_SUPPORT)
+add_executable(amqps_connect_timeout amqps_connect_timeout.c ${COMMON_SRCS})
+target_link_libraries(amqps_connect_timeout ${RMQ_LIBRARY_TARGET})
+
add_executable(amqps_sendstring amqps_sendstring.c ${COMMON_SRCS})
target_link_libraries(amqps_sendstring ${RMQ_LIBRARY_TARGET})
diff --git a/examples/amqp_connect_timeout.c b/examples/amqp_connect_timeout.c
new file mode 100644
index 0000000..c2bd5ec
--- /dev/null
+++ b/examples/amqp_connect_timeout.c
@@ -0,0 +1,110 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Bogdan Padalko are Copyright (c) 2013.
+ * Bogdan Padalko. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp.h>
+#include <amqp_tcp_socket.h>
+
+#include <assert.h>
+
+#ifdef _WIN32
+# ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+# endif
+# include <Winsock2.h>
+#else
+# include <sys/time.h>
+#endif
+
+#include "utils.h"
+
+int main(int argc, char const *const *argv)
+{
+ char const *hostname;
+ int port;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+ struct timeval *tv;
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: amqp_connect_timeout host port [timeout_sec [timeout_usec=0]]\n");
+ return 1;
+ }
+
+ if (argc > 3) {
+ tv = malloc(sizeof(struct timeval));
+
+ tv->tv_sec = atoi(argv[3]);
+
+ if (argc > 4 ) {
+ tv->tv_usec = atoi(argv[4]);
+ } else {
+ tv->tv_usec = 0;
+ }
+
+ } else {
+ tv = NULL;
+ }
+
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+
+ conn = amqp_new_connection();
+
+ socket = amqp_tcp_socket_new(conn);
+
+ if (!socket) {
+ die("creating TCP socket");
+ }
+
+ die_on_error(amqp_socket_open_noblock(socket, hostname, port, tv), "opening TCP socket");
+
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
+
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+
+ printf ("Done\n");
+ return 0;
+}
diff --git a/examples/amqps_connect_timeout.c b/examples/amqps_connect_timeout.c
new file mode 100644
index 0000000..89c00ac
--- /dev/null
+++ b/examples/amqps_connect_timeout.c
@@ -0,0 +1,132 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2012-2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Portions created by Mike Steinert are Copyright (c) 2012-2013
+ * Mike Steinert. All Rights Reserved.
+ *
+ * Portions created by Bogdan Padalko are Copyright (c) 2013.
+ * Bogdan Padalko. All Rights Reserved.
+ *
+ * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
+ * All Rights Reserved.
+ *
+ * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
+ * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <stdint.h>
+#include <amqp_ssl_socket.h>
+
+#include <assert.h>
+
+#ifdef _WIN32
+# ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+# endif
+# include <Winsock2.h>
+#else
+# include <sys/time.h>
+#endif
+
+#include "utils.h"
+
+
+int main(int argc, char const *const *argv)
+{
+ char const nofile[2] = "-";
+ char const *hostname;
+ int port, status;
+ amqp_socket_t *socket;
+ amqp_connection_state_t conn;
+ struct timeval *tv;
+
+ if (argc < 3) {
+ fprintf(stderr, "Usage: amqps_connect_timeout host port "
+ "[cacert.pem [key.pem cert.pem [timeout_sec [timeout_usec=0]]]]\n");
+ return 1;
+ }
+
+ hostname = argv[1];
+ port = atoi(argv[2]);
+
+ if (argc > 6) {
+ tv = malloc(sizeof(struct timeval));
+
+ tv->tv_sec = atoi(argv[6]);
+
+ if (argc > 7 ) {
+ tv->tv_usec = atoi(argv[7]);
+ } else {
+ tv->tv_usec = 0;
+ }
+
+ } else {
+ tv = NULL;
+ }
+
+ conn = amqp_new_connection();
+
+ socket = amqp_ssl_socket_new(conn);
+ if (!socket) {
+ die("creating SSL/TLS socket");
+ }
+
+ if (argc > 3 && strcmp(nofile, argv[3])) {
+ die_on_error(amqp_ssl_socket_set_cacert(socket, argv[3]), "setting CA certificate");
+ }
+
+ if (argc > 5) {
+ if (!strcmp(nofile, argv[5]) && !strcmp(nofile, argv[4])) {
+ status = 0;
+ } else if (!strcmp(nofile, argv[5]) || !strcmp(nofile, argv[4])) {
+ status = -1;
+ } else {
+ status = amqp_ssl_socket_set_key(socket, argv[5], argv[4]);
+ }
+
+ if (status) {
+ die("setting client key");
+ }
+ }
+
+ die_on_error(amqp_socket_open_noblock(socket, hostname, port, tv), "opening SSL/TLS connection");
+
+ die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
+ "Logging in");
+
+ die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection");
+ die_on_error(amqp_destroy_connection(conn), "Ending connection");
+
+ printf ("Done\n");
+ return 0;
+}
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index de6eda3..6adabe0 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -666,6 +666,26 @@ AMQP_CALL
amqp_socket_open(amqp_socket_t *self, const char *host, int port);
/**
+ * Open a socket connection.
+ *
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
+ *
+ * \param [in,out] self A socket object.
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout);
+
+/**
* Get the socket descriptor in use by a socket object.
*
* Retrieve the underlying socket descriptor. This function can be used to
diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c
index 89047d9..05ce12e 100644
--- a/librabbitmq/amqp_cyassl.c
+++ b/librabbitmq/amqp_cyassl.c
@@ -155,7 +155,7 @@ amqp_ssl_error_string(AMQP_UNUSED int err)
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
int status;
@@ -167,7 +167,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
return -1;
}
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
self->last_error = - self->sockfd;
return -1;
diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c
index 734643c..f18d427 100644
--- a/librabbitmq/amqp_gnutls.c
+++ b/librabbitmq/amqp_gnutls.c
@@ -119,7 +119,7 @@ amqp_ssl_socket_recv(void *base,
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
int status;
@@ -132,7 +132,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
return -1;
}
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
self->last_error = -self->sockfd;
return -1;
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index 2bd4fda..f70d377 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -232,7 +232,7 @@ error:
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
long result;
@@ -247,7 +247,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
}
SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY);
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
status = self->sockfd;
self->internal_error = amqp_os_socket_error();
diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c
index 770fdbe..bae3141 100644
--- a/librabbitmq/amqp_polarssl.c
+++ b/librabbitmq/amqp_polarssl.c
@@ -128,12 +128,20 @@ amqp_ssl_socket_recv(void *base,
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
int status;
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
self->last_error = 0;
+ if (timeout && (timeout->tv_sec != 0 || timeout->tv_usec != 0)) {
+ /* We don't support PolarSSL for now because it uses its own connect() wrapper
+ * It is not too hard to implement net_connect() with noblock support,
+ * but then we will have to maintain that piece of code and keep it synced with main PolarSSL code base
+ */
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
status = net_connect(&self->sockfd, host, port);
if (status) {
/* This isn't quite right. We should probably translate between
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index fa37201..d373a72 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname,
#endif
}
+static int
+amqp_os_socket_setsockblock(int sock, int block)
+{
+
+#ifdef _WIN32
+ int nonblock = !block;
+ if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ } else {
+ return AMQP_STATUS_OK;
+ }
+#else
+ long arg;
+
+ if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ if (block) {
+ arg &= (~O_NONBLOCK);
+ } else {
+ arg |= O_NONBLOCK;
+ }
+
+ if (fcntl(sock, F_SETFL, arg) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ return AMQP_STATUS_OK;
+#endif
+}
+
+
int
amqp_os_socket_error(void)
{
@@ -182,7 +215,15 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
{
assert(self);
assert(self->klass->open);
- return self->klass->open(self, host, port);
+ return self->klass->open(self, host, port, NULL);
+}
+
+int
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout)
+{
+ assert(self);
+ assert(self->klass->open);
+ return self->klass->open(self, host, port, timeout);
}
int
@@ -210,8 +251,9 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-int amqp_open_socket(char const *hostname,
- int portnumber)
+int amqp_open_socket_noblock(char const *hostname,
+ int portnumber,
+ struct timeval *timeout)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -220,6 +262,15 @@ int amqp_open_socket(char const *hostname,
int sockfd = -1;
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
+ int res;
+ int timer_error;
+ amqp_timer_t timer;
+
+ AMQP_INIT_TIMER(timer)
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -240,31 +291,147 @@ int amqp_open_socket(char const *hostname,
}
for (addr = address_list; addr; addr = addr->ai_next) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ sockfd = -1;
+ }
+
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+
if (-1 == sockfd) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
+
#ifdef SO_NOSIGPIPE
if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
}
#endif /* SO_NOSIGPIPE */
- if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+
+ if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
+ }
+
+ if (timeout) {
+ /* Trying to connect with timeout, set socket to non-blocking mode */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
+
+ if (0 == res) {
+ /* Connected immediately, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
+
+#ifdef _WIN32
+ if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
+#else
+ if (EINPROGRESS == amqp_os_socket_error()) {
+#endif
+
+ while(1) {
+ fd_set write_fd;
+ fd_set except_fd;
+
+ FD_ZERO(&write_fd);
+ FD_SET(sockfd, &write_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(sockfd, &except_fd);
+
+ timer_error = amqp_timer_update(&timer, timeout);
+
+ if (timer_error < 0) {
+ last_error = timer_error;
+ break;
+ }
+
+ /* Win32 requires except_fds to be passed to detect connection
+ * failure. Other platforms only need write_fds, passing except_fds
+ * seems to be harmless otherwise
+ */
+ res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+
+ if (res > 0) {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ if (result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ /* socket is ready to be written to, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ last_error = AMQP_STATUS_TIMEOUT;
+ break;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+ } /* end while(1) loop */
+
+ if (last_error == AMQP_STATUS_OK
+ || last_error == AMQP_STATUS_TIMEOUT
+ || last_error == AMQP_STATUS_TIMER_FAILURE) {
+ /* Exit for loop on timer errors or when connection established */
+ break;
+ }
+
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+
+ }
+
} else {
- last_error = AMQP_STATUS_OK;
- break;
+ /* Connect in blocking mode */
+ if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ } else {
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
}
}
freeaddrinfo(address_list);
if (last_error != AMQP_STATUS_OK) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ }
+
return last_error;
}
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index fed3947..b0a1805 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -46,7 +46,7 @@ amqp_os_socket_close(int sockfd);
typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int);
typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t);
typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
-typedef int (*amqp_socket_open_fn)(void *, const char *, int);
+typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *);
typedef int (*amqp_socket_close_fn)(void *);
typedef int (*amqp_socket_get_sockfd_fn)(void *);
typedef void (*amqp_socket_delete_fn)(void *);
@@ -164,6 +164,24 @@ amqp_socket_close(amqp_socket_t *self);
void
amqp_socket_delete(amqp_socket_t *self);
+/**
+ * Open a socket connection.
+ *
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
+ *
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode
+ *
+ * \return File descriptor upon success, non-zero negative error code otherwise.
+ */
+int
+amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout);
+
+
AMQP_END_DECLS
#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 6ab71ef..9a9fc69 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -220,10 +220,10 @@ start:
}
static int
-amqp_tcp_socket_open(void *base, const char *host, int port)
+amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
int err = self->sockfd;
self->sockfd = -1;
diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c
index 7066358..f89d43d 100644
--- a/librabbitmq/amqp_timer.c
+++ b/librabbitmq/amqp_timer.c
@@ -20,7 +20,9 @@
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
+#include "amqp.h"
#include "amqp_timer.h"
+#include <string.h>
#if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32))
# define AMQP_WIN_TIMER_API
@@ -96,3 +98,38 @@ amqp_get_monotonic_timestamp(void)
return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec);
}
#endif /* AMQP_POSIX_TIMER_API */
+
+int
+amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout)
+{
+ if (0 == timer->current_timestamp) {
+ timer->current_timestamp = amqp_get_monotonic_timestamp();
+
+ if (0 == timer->current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ timer->timeout_timestamp = timer->current_timestamp +
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+
+ } else {
+ timer->current_timestamp = amqp_get_monotonic_timestamp();
+
+ if (0 == timer->current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ }
+
+ if (timer->current_timestamp > timer->timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+
+ timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp;
+
+ memset(&timer->tv, 0, sizeof(struct timeval));
+ timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S;
+ timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+
+ return AMQP_STATUS_OK;
+}
diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h
index d1718af..946096d 100644
--- a/librabbitmq/amqp_timer.h
+++ b/librabbitmq/amqp_timer.h
@@ -25,12 +25,37 @@
#include <stdint.h>
+#ifdef _WIN32
+# ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+# endif
+# include <Winsock2.h>
+#else
+# include <sys/time.h>
+#endif
+
#define AMQP_NS_PER_S 1000000000
#define AMQP_NS_PER_US 1000
+#define AMQP_INIT_TIMER(structure) { \
+ structure.current_timestamp = 0; \
+ structure.timeout_timestamp = 0; \
+}
+
+typedef struct amqp_timer_t_ {
+ uint64_t current_timestamp;
+ uint64_t timeout_timestamp;
+ uint64_t ns_until_next_timeout;
+ struct timeval tv;
+} amqp_timer_t;
+
/* Gets a monotonic timestamp in ns */
uint64_t
amqp_get_monotonic_timestamp(void);
+/* Prepare timeout value and modify timer state based on timer state. */
+int
+amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout);
+
#endif /* AMQP_TIMER_H */