summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorzaq178miami <pinepain@gmail.com>2013-06-23 19:36:10 +0300
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-06 22:28:45 -0700
commit6ad770dc62f76fa0625d277b521a120b549d9fc2 (patch)
tree964847f5a839c7ccbfba6c09bc9b5ca3102c89d4 /librabbitmq
parentb6a1dfec14e70fc6afe8ce9710231e552ba6bfb5 (diff)
downloadrabbitmq-c-6ad770dc62f76fa0625d277b521a120b549d9fc2.tar.gz
Add nonblocking connect support
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp.h20
-rw-r--r--librabbitmq/amqp_cyassl.c4
-rw-r--r--librabbitmq/amqp_gnutls.c4
-rw-r--r--librabbitmq/amqp_openssl.c4
-rw-r--r--librabbitmq/amqp_polarssl.c10
-rw-r--r--librabbitmq/amqp_socket.c185
-rw-r--r--librabbitmq/amqp_socket.h20
-rw-r--r--librabbitmq/amqp_tcp_socket.c4
-rw-r--r--librabbitmq/amqp_timer.c37
-rw-r--r--librabbitmq/amqp_timer.h25
10 files changed, 294 insertions, 19 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index de6eda3..6adabe0 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -666,6 +666,26 @@ AMQP_CALL
amqp_socket_open(amqp_socket_t *self, const char *host, int port);
/**
+ * Open a socket connection.
+ *
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
+ *
+ * \param [in,out] self A socket object.
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode
+ *
+ * \return Zero upon success, non-zero otherwise.
+ */
+AMQP_PUBLIC_FUNCTION
+int
+AMQP_CALL
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout);
+
+/**
* Get the socket descriptor in use by a socket object.
*
* Retrieve the underlying socket descriptor. This function can be used to
diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c
index 89047d9..05ce12e 100644
--- a/librabbitmq/amqp_cyassl.c
+++ b/librabbitmq/amqp_cyassl.c
@@ -155,7 +155,7 @@ amqp_ssl_error_string(AMQP_UNUSED int err)
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
int status;
@@ -167,7 +167,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
return -1;
}
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
self->last_error = - self->sockfd;
return -1;
diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c
index 734643c..f18d427 100644
--- a/librabbitmq/amqp_gnutls.c
+++ b/librabbitmq/amqp_gnutls.c
@@ -119,7 +119,7 @@ amqp_ssl_socket_recv(void *base,
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
int status;
@@ -132,7 +132,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
return -1;
}
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
self->last_error = -self->sockfd;
return -1;
diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c
index 2bd4fda..f70d377 100644
--- a/librabbitmq/amqp_openssl.c
+++ b/librabbitmq/amqp_openssl.c
@@ -232,7 +232,7 @@ error:
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
long result;
@@ -247,7 +247,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port)
}
SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY);
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
status = self->sockfd;
self->internal_error = amqp_os_socket_error();
diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c
index 770fdbe..bae3141 100644
--- a/librabbitmq/amqp_polarssl.c
+++ b/librabbitmq/amqp_polarssl.c
@@ -128,12 +128,20 @@ amqp_ssl_socket_recv(void *base,
}
static int
-amqp_ssl_socket_open(void *base, const char *host, int port)
+amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
int status;
struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base;
self->last_error = 0;
+ if (timeout && (timeout->tv_sec != 0 || timeout->tv_usec != 0)) {
+ /* We don't support PolarSSL for now because it uses its own connect() wrapper
+ * It is not too hard to implement net_connect() with noblock support,
+ * but then we will have to maintain that piece of code and keep it synced with main PolarSSL code base
+ */
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
+
status = net_connect(&self->sockfd, host, port);
if (status) {
/* This isn't quite right. We should probably translate between
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index fa37201..d373a72 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname,
#endif
}
+static int
+amqp_os_socket_setsockblock(int sock, int block)
+{
+
+#ifdef _WIN32
+ int nonblock = !block;
+ if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ } else {
+ return AMQP_STATUS_OK;
+ }
+#else
+ long arg;
+
+ if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ if (block) {
+ arg &= (~O_NONBLOCK);
+ } else {
+ arg |= O_NONBLOCK;
+ }
+
+ if (fcntl(sock, F_SETFL, arg) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ return AMQP_STATUS_OK;
+#endif
+}
+
+
int
amqp_os_socket_error(void)
{
@@ -182,7 +215,15 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
{
assert(self);
assert(self->klass->open);
- return self->klass->open(self, host, port);
+ return self->klass->open(self, host, port, NULL);
+}
+
+int
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout)
+{
+ assert(self);
+ assert(self->klass->open);
+ return self->klass->open(self, host, port, timeout);
}
int
@@ -210,8 +251,9 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-int amqp_open_socket(char const *hostname,
- int portnumber)
+int amqp_open_socket_noblock(char const *hostname,
+ int portnumber,
+ struct timeval *timeout)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -220,6 +262,15 @@ int amqp_open_socket(char const *hostname,
int sockfd = -1;
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
+ int res;
+ int timer_error;
+ amqp_timer_t timer;
+
+ AMQP_INIT_TIMER(timer)
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -240,31 +291,147 @@ int amqp_open_socket(char const *hostname,
}
for (addr = address_list; addr; addr = addr->ai_next) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ sockfd = -1;
+ }
+
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+
if (-1 == sockfd) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
+
#ifdef SO_NOSIGPIPE
if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
}
#endif /* SO_NOSIGPIPE */
- if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+
+ if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
+ }
+
+ if (timeout) {
+ /* Trying to connect with timeout, set socket to non-blocking mode */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
+
+ if (0 == res) {
+ /* Connected immediately, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
+
+#ifdef _WIN32
+ if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
+#else
+ if (EINPROGRESS == amqp_os_socket_error()) {
+#endif
+
+ while(1) {
+ fd_set write_fd;
+ fd_set except_fd;
+
+ FD_ZERO(&write_fd);
+ FD_SET(sockfd, &write_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(sockfd, &except_fd);
+
+ timer_error = amqp_timer_update(&timer, timeout);
+
+ if (timer_error < 0) {
+ last_error = timer_error;
+ break;
+ }
+
+ /* Win32 requires except_fds to be passed to detect connection
+ * failure. Other platforms only need write_fds, passing except_fds
+ * seems to be harmless otherwise
+ */
+ res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+
+ if (res > 0) {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ if (result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ /* socket is ready to be written to, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ last_error = AMQP_STATUS_TIMEOUT;
+ break;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+ } /* end while(1) loop */
+
+ if (last_error == AMQP_STATUS_OK
+ || last_error == AMQP_STATUS_TIMEOUT
+ || last_error == AMQP_STATUS_TIMER_FAILURE) {
+ /* Exit for loop on timer errors or when connection established */
+ break;
+ }
+
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+
+ }
+
} else {
- last_error = AMQP_STATUS_OK;
- break;
+ /* Connect in blocking mode */
+ if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ } else {
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
}
}
freeaddrinfo(address_list);
if (last_error != AMQP_STATUS_OK) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ }
+
return last_error;
}
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index fed3947..b0a1805 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -46,7 +46,7 @@ amqp_os_socket_close(int sockfd);
typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int);
typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t);
typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int);
-typedef int (*amqp_socket_open_fn)(void *, const char *, int);
+typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *);
typedef int (*amqp_socket_close_fn)(void *);
typedef int (*amqp_socket_get_sockfd_fn)(void *);
typedef void (*amqp_socket_delete_fn)(void *);
@@ -164,6 +164,24 @@ amqp_socket_close(amqp_socket_t *self);
void
amqp_socket_delete(amqp_socket_t *self);
+/**
+ * Open a socket connection.
+ *
+ * This function opens a socket connection returned from amqp_tcp_socket_new()
+ * or amqp_ssl_socket_new(). This function should be called after setting
+ * socket options and prior to assigning the socket to an AMQP connection with
+ * amqp_set_socket().
+ *
+ * \param [in] host Connect to this host.
+ * \param [in] port Connect on this remote port.
+ * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode
+ *
+ * \return File descriptor upon success, non-zero negative error code otherwise.
+ */
+int
+amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout);
+
+
AMQP_END_DECLS
#endif /* AMQP_SOCKET_H */
diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c
index 6ab71ef..9a9fc69 100644
--- a/librabbitmq/amqp_tcp_socket.c
+++ b/librabbitmq/amqp_tcp_socket.c
@@ -220,10 +220,10 @@ start:
}
static int
-amqp_tcp_socket_open(void *base, const char *host, int port)
+amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout)
{
struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base;
- self->sockfd = amqp_open_socket(host, port);
+ self->sockfd = amqp_open_socket_noblock(host, port, timeout);
if (0 > self->sockfd) {
int err = self->sockfd;
self->sockfd = -1;
diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c
index 7066358..f89d43d 100644
--- a/librabbitmq/amqp_timer.c
+++ b/librabbitmq/amqp_timer.c
@@ -20,7 +20,9 @@
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
+#include "amqp.h"
#include "amqp_timer.h"
+#include <string.h>
#if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32))
# define AMQP_WIN_TIMER_API
@@ -96,3 +98,38 @@ amqp_get_monotonic_timestamp(void)
return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec);
}
#endif /* AMQP_POSIX_TIMER_API */
+
+int
+amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout)
+{
+ if (0 == timer->current_timestamp) {
+ timer->current_timestamp = amqp_get_monotonic_timestamp();
+
+ if (0 == timer->current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+
+ timer->timeout_timestamp = timer->current_timestamp +
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
+
+ } else {
+ timer->current_timestamp = amqp_get_monotonic_timestamp();
+
+ if (0 == timer->current_timestamp) {
+ return AMQP_STATUS_TIMER_FAILURE;
+ }
+ }
+
+ if (timer->current_timestamp > timer->timeout_timestamp) {
+ return AMQP_STATUS_TIMEOUT;
+ }
+
+ timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp;
+
+ memset(&timer->tv, 0, sizeof(struct timeval));
+ timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S;
+ timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US;
+
+ return AMQP_STATUS_OK;
+}
diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h
index d1718af..946096d 100644
--- a/librabbitmq/amqp_timer.h
+++ b/librabbitmq/amqp_timer.h
@@ -25,12 +25,37 @@
#include <stdint.h>
+#ifdef _WIN32
+# ifndef WIN32_LEAN_AND_MEAN
+# define WIN32_LEAN_AND_MEAN
+# endif
+# include <Winsock2.h>
+#else
+# include <sys/time.h>
+#endif
+
#define AMQP_NS_PER_S 1000000000
#define AMQP_NS_PER_US 1000
+#define AMQP_INIT_TIMER(structure) { \
+ structure.current_timestamp = 0; \
+ structure.timeout_timestamp = 0; \
+}
+
+typedef struct amqp_timer_t_ {
+ uint64_t current_timestamp;
+ uint64_t timeout_timestamp;
+ uint64_t ns_until_next_timeout;
+ struct timeval tv;
+} amqp_timer_t;
+
/* Gets a monotonic timestamp in ns */
uint64_t
amqp_get_monotonic_timestamp(void);
+/* Prepare timeout value and modify timer state based on timer state. */
+int
+amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout);
+
#endif /* AMQP_TIMER_H */