From 6ad770dc62f76fa0625d277b521a120b549d9fc2 Mon Sep 17 00:00:00 2001 From: zaq178miami Date: Sun, 23 Jun 2013 19:36:10 +0300 Subject: Add nonblocking connect support --- Makefile.am | 12 +++ examples/CMakeLists.txt | 6 ++ examples/amqp_connect_timeout.c | 110 +++++++++++++++++++++++ examples/amqps_connect_timeout.c | 132 ++++++++++++++++++++++++++++ librabbitmq/amqp.h | 20 +++++ librabbitmq/amqp_cyassl.c | 4 +- librabbitmq/amqp_gnutls.c | 4 +- librabbitmq/amqp_openssl.c | 4 +- librabbitmq/amqp_polarssl.c | 10 ++- librabbitmq/amqp_socket.c | 185 +++++++++++++++++++++++++++++++++++++-- librabbitmq/amqp_socket.h | 20 ++++- librabbitmq/amqp_tcp_socket.c | 4 +- librabbitmq/amqp_timer.c | 37 ++++++++ librabbitmq/amqp_timer.h | 25 ++++++ 14 files changed, 554 insertions(+), 19 deletions(-) create mode 100644 examples/amqp_connect_timeout.c create mode 100644 examples/amqps_connect_timeout.c diff --git a/Makefile.am b/Makefile.am index da55823..989f5e9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -154,6 +154,7 @@ endif noinst_PROGRAMS = \ examples/amqp_bind \ examples/amqp_consumer \ + examples/amqp_connect_timeout \ examples/amqp_exchange_declare \ examples/amqp_listen \ examples/amqp_listenq \ @@ -187,6 +188,11 @@ examples_amqp_consumer_LDADD = \ examples/libutils.la \ librabbitmq/librabbitmq.la +examples_amqp_connect_timeout_SOURCES = examples/amqp_connect_timeout.c +examples_amqp_connect_timeout_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + examples_amqp_unbind_SOURCES = examples/amqp_unbind.c examples_amqp_unbind_LDADD = \ examples/libutils.la \ @@ -211,6 +217,7 @@ examples_amqp_rpc_sendstring_client_LDADD = \ if SSL noinst_PROGRAMS += \ examples/amqps_bind \ + examples/amqps_connect_timeout \ examples/amqps_consumer \ examples/amqps_exchange_declare \ examples/amqps_listen \ @@ -224,6 +231,11 @@ examples_amqps_bind_LDADD = \ examples/libutils.la \ librabbitmq/librabbitmq.la +examples_amqps_connect_timeout_SOURCES = examples/amqps_connect_timeout.c +examples_amqps_connect_timeout_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + examples_amqps_consumer_SOURCES = examples/amqps_consumer.c examples_amqps_consumer_LDADD = \ examples/libutils.la \ diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index a184cf6..8dcdcf4 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -28,6 +28,9 @@ target_link_libraries(amqp_listen ${RMQ_LIBRARY_TARGET}) add_executable(amqp_producer amqp_producer.c ${COMMON_SRCS}) target_link_libraries(amqp_producer ${RMQ_LIBRARY_TARGET}) +add_executable(amqp_connect_timeout amqp_connect_timeout.c ${COMMON_SRCS}) +target_link_libraries(amqp_connect_timeout ${RMQ_LIBRARY_TARGET}) + add_executable(amqp_consumer amqp_consumer.c ${COMMON_SRCS}) target_link_libraries(amqp_consumer ${RMQ_LIBRARY_TARGET}) @@ -41,6 +44,9 @@ add_executable(amqp_listenq amqp_listenq.c ${COMMON_SRCS}) target_link_libraries(amqp_listenq ${RMQ_LIBRARY_TARGET}) if (ENABLE_SSL_SUPPORT) +add_executable(amqps_connect_timeout amqps_connect_timeout.c ${COMMON_SRCS}) +target_link_libraries(amqps_connect_timeout ${RMQ_LIBRARY_TARGET}) + add_executable(amqps_sendstring amqps_sendstring.c ${COMMON_SRCS}) target_link_libraries(amqps_sendstring ${RMQ_LIBRARY_TARGET}) diff --git a/examples/amqp_connect_timeout.c b/examples/amqp_connect_timeout.c new file mode 100644 index 0000000..c2bd5ec --- /dev/null +++ b/examples/amqp_connect_timeout.c @@ -0,0 +1,110 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Bogdan Padalko are Copyright (c) 2013. + * Bogdan Padalko. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include +#include + +#include + +#ifdef _WIN32 +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include +#else +# include +#endif + +#include "utils.h" + +int main(int argc, char const *const *argv) +{ + char const *hostname; + int port; + amqp_socket_t *socket; + amqp_connection_state_t conn; + struct timeval *tv; + + if (argc < 3) { + fprintf(stderr, "Usage: amqp_connect_timeout host port [timeout_sec [timeout_usec=0]]\n"); + return 1; + } + + if (argc > 3) { + tv = malloc(sizeof(struct timeval)); + + tv->tv_sec = atoi(argv[3]); + + if (argc > 4 ) { + tv->tv_usec = atoi(argv[4]); + } else { + tv->tv_usec = 0; + } + + } else { + tv = NULL; + } + + + hostname = argv[1]; + port = atoi(argv[2]); + + conn = amqp_new_connection(); + + socket = amqp_tcp_socket_new(conn); + + if (!socket) { + die("creating TCP socket"); + } + + die_on_error(amqp_socket_open_noblock(socket, hostname, port, tv), "opening TCP socket"); + + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in"); + + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + printf ("Done\n"); + return 0; +} diff --git a/examples/amqps_connect_timeout.c b/examples/amqps_connect_timeout.c new file mode 100644 index 0000000..89c00ac --- /dev/null +++ b/examples/amqps_connect_timeout.c @@ -0,0 +1,132 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2012-2013 + * Alan Antonuk. All Rights Reserved. + * + * Portions created by Mike Steinert are Copyright (c) 2012-2013 + * Mike Steinert. All Rights Reserved. + * + * Portions created by Bogdan Padalko are Copyright (c) 2013. + * Bogdan Padalko. All Rights Reserved. + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include +#include +#include + +#include +#include + +#include + +#ifdef _WIN32 +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include +#else +# include +#endif + +#include "utils.h" + + +int main(int argc, char const *const *argv) +{ + char const nofile[2] = "-"; + char const *hostname; + int port, status; + amqp_socket_t *socket; + amqp_connection_state_t conn; + struct timeval *tv; + + if (argc < 3) { + fprintf(stderr, "Usage: amqps_connect_timeout host port " + "[cacert.pem [key.pem cert.pem [timeout_sec [timeout_usec=0]]]]\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + + if (argc > 6) { + tv = malloc(sizeof(struct timeval)); + + tv->tv_sec = atoi(argv[6]); + + if (argc > 7 ) { + tv->tv_usec = atoi(argv[7]); + } else { + tv->tv_usec = 0; + } + + } else { + tv = NULL; + } + + conn = amqp_new_connection(); + + socket = amqp_ssl_socket_new(conn); + if (!socket) { + die("creating SSL/TLS socket"); + } + + if (argc > 3 && strcmp(nofile, argv[3])) { + die_on_error(amqp_ssl_socket_set_cacert(socket, argv[3]), "setting CA certificate"); + } + + if (argc > 5) { + if (!strcmp(nofile, argv[5]) && !strcmp(nofile, argv[4])) { + status = 0; + } else if (!strcmp(nofile, argv[5]) || !strcmp(nofile, argv[4])) { + status = -1; + } else { + status = amqp_ssl_socket_set_key(socket, argv[5], argv[4]); + } + + if (status) { + die("setting client key"); + } + } + + die_on_error(amqp_socket_open_noblock(socket, hostname, port, tv), "opening SSL/TLS connection"); + + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + printf ("Done\n"); + return 0; +} diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index de6eda3..6adabe0 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -665,6 +665,26 @@ int AMQP_CALL amqp_socket_open(amqp_socket_t *self, const char *host, int port); +/** + * Open a socket connection. + * + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). + * + * \param [in,out] self A socket object. + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode + * + * \return Zero upon success, non-zero otherwise. + */ +AMQP_PUBLIC_FUNCTION +int +AMQP_CALL +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout); + /** * Get the socket descriptor in use by a socket object. * diff --git a/librabbitmq/amqp_cyassl.c b/librabbitmq/amqp_cyassl.c index 89047d9..05ce12e 100644 --- a/librabbitmq/amqp_cyassl.c +++ b/librabbitmq/amqp_cyassl.c @@ -155,7 +155,7 @@ amqp_ssl_error_string(AMQP_UNUSED int err) } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; int status; @@ -167,7 +167,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) return -1; } - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { self->last_error = - self->sockfd; return -1; diff --git a/librabbitmq/amqp_gnutls.c b/librabbitmq/amqp_gnutls.c index 734643c..f18d427 100644 --- a/librabbitmq/amqp_gnutls.c +++ b/librabbitmq/amqp_gnutls.c @@ -119,7 +119,7 @@ amqp_ssl_socket_recv(void *base, } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; int status; @@ -132,7 +132,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) return -1; } - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { self->last_error = -self->sockfd; return -1; diff --git a/librabbitmq/amqp_openssl.c b/librabbitmq/amqp_openssl.c index 2bd4fda..f70d377 100644 --- a/librabbitmq/amqp_openssl.c +++ b/librabbitmq/amqp_openssl.c @@ -232,7 +232,7 @@ error: } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; long result; @@ -247,7 +247,7 @@ amqp_ssl_socket_open(void *base, const char *host, int port) } SSL_set_mode(self->ssl, SSL_MODE_AUTO_RETRY); - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { status = self->sockfd; self->internal_error = amqp_os_socket_error(); diff --git a/librabbitmq/amqp_polarssl.c b/librabbitmq/amqp_polarssl.c index 770fdbe..bae3141 100644 --- a/librabbitmq/amqp_polarssl.c +++ b/librabbitmq/amqp_polarssl.c @@ -128,12 +128,20 @@ amqp_ssl_socket_recv(void *base, } static int -amqp_ssl_socket_open(void *base, const char *host, int port) +amqp_ssl_socket_open(void *base, const char *host, int port, struct timeval *timeout) { int status; struct amqp_ssl_socket_t *self = (struct amqp_ssl_socket_t *)base; self->last_error = 0; + if (timeout && (timeout->tv_sec != 0 || timeout->tv_usec != 0)) { + /* We don't support PolarSSL for now because it uses its own connect() wrapper + * It is not too hard to implement net_connect() with noblock support, + * but then we will have to maintain that piece of code and keep it synced with main PolarSSL code base + */ + return AMQP_STATUS_INVALID_PARAMETER; + } + status = net_connect(&self->sockfd, host, port); if (status) { /* This isn't quite right. We should probably translate between diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index fa37201..d373a72 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname, #endif } +static int +amqp_os_socket_setsockblock(int sock, int block) +{ + +#ifdef _WIN32 + int nonblock = !block; + if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) { + return AMQP_STATUS_SOCKET_ERROR; + } else { + return AMQP_STATUS_OK; + } +#else + long arg; + + if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + if (block) { + arg &= (~O_NONBLOCK); + } else { + arg |= O_NONBLOCK; + } + + if (fcntl(sock, F_SETFL, arg) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + return AMQP_STATUS_OK; +#endif +} + + int amqp_os_socket_error(void) { @@ -182,7 +215,15 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) { assert(self); assert(self->klass->open); - return self->klass->open(self, host, port); + return self->klass->open(self, host, port, NULL); +} + +int +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout) +{ + assert(self); + assert(self->klass->open); + return self->klass->open(self, host, port, timeout); } int @@ -210,8 +251,9 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -int amqp_open_socket(char const *hostname, - int portnumber) +int amqp_open_socket_noblock(char const *hostname, + int portnumber, + struct timeval *timeout) { struct addrinfo hint; struct addrinfo *address_list; @@ -220,6 +262,15 @@ int amqp_open_socket(char const *hostname, int sockfd = -1; int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ + int res; + int timer_error; + amqp_timer_t timer; + + AMQP_INIT_TIMER(timer) + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -240,31 +291,147 @@ int amqp_open_socket(char const *hostname, } for (addr = address_list; addr; addr = addr->ai_next) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + sockfd = -1; + } + sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (-1 == sockfd) { last_error = AMQP_STATUS_SOCKET_ERROR; continue; } + #ifdef SO_NOSIGPIPE if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; } #endif /* SO_NOSIGPIPE */ - if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + + if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; + } + + if (timeout) { + /* Trying to connect with timeout, set socket to non-blocking mode */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); + + if (0 == res) { + /* Connected immediately, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } + +#ifdef _WIN32 + if (WSAEWOULDBLOCK == amqp_os_socket_error()) { +#else + if (EINPROGRESS == amqp_os_socket_error()) { +#endif + + while(1) { + fd_set write_fd; + fd_set except_fd; + + FD_ZERO(&write_fd); + FD_SET(sockfd, &write_fd); + + FD_ZERO(&except_fd); + FD_SET(sockfd, &except_fd); + + timer_error = amqp_timer_update(&timer, timeout); + + if (timer_error < 0) { + last_error = timer_error; + break; + } + + /* Win32 requires except_fds to be passed to detect connection + * failure. Other platforms only need write_fds, passing except_fds + * seems to be harmless otherwise + */ + res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + + if (res > 0) { + int result; + socklen_t result_len = sizeof(result); + + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + if (result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + /* socket is ready to be written to, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } else if (0 == res) { + /* Timed out - return */ + last_error = AMQP_STATUS_TIMEOUT; + break; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + } /* end while(1) loop */ + + if (last_error == AMQP_STATUS_OK + || last_error == AMQP_STATUS_TIMEOUT + || last_error == AMQP_STATUS_TIMER_FAILURE) { + /* Exit for loop on timer errors or when connection established */ + break; + } + + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + + } + } else { - last_error = AMQP_STATUS_OK; - break; + /* Connect in blocking mode */ + if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } else { + last_error = AMQP_STATUS_OK; + break; + } } } freeaddrinfo(address_list); if (last_error != AMQP_STATUS_OK) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + } + return last_error; } diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index fed3947..b0a1805 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -46,7 +46,7 @@ amqp_os_socket_close(int sockfd); typedef ssize_t (*amqp_socket_writev_fn)(void *, struct iovec *, int); typedef ssize_t (*amqp_socket_send_fn)(void *, const void *, size_t); typedef ssize_t (*amqp_socket_recv_fn)(void *, void *, size_t, int); -typedef int (*amqp_socket_open_fn)(void *, const char *, int); +typedef int (*amqp_socket_open_fn)(void *, const char *, int, struct timeval *); typedef int (*amqp_socket_close_fn)(void *); typedef int (*amqp_socket_get_sockfd_fn)(void *); typedef void (*amqp_socket_delete_fn)(void *); @@ -164,6 +164,24 @@ amqp_socket_close(amqp_socket_t *self); void amqp_socket_delete(amqp_socket_t *self); +/** + * Open a socket connection. + * + * This function opens a socket connection returned from amqp_tcp_socket_new() + * or amqp_ssl_socket_new(). This function should be called after setting + * socket options and prior to assigning the socket to an AMQP connection with + * amqp_set_socket(). + * + * \param [in] host Connect to this host. + * \param [in] port Connect on this remote port. + * \param [in] timeout Max allowed time to spent on opening. If NULL - run in blocking mode + * + * \return File descriptor upon success, non-zero negative error code otherwise. + */ +int +amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout); + + AMQP_END_DECLS #endif /* AMQP_SOCKET_H */ diff --git a/librabbitmq/amqp_tcp_socket.c b/librabbitmq/amqp_tcp_socket.c index 6ab71ef..9a9fc69 100644 --- a/librabbitmq/amqp_tcp_socket.c +++ b/librabbitmq/amqp_tcp_socket.c @@ -220,10 +220,10 @@ start: } static int -amqp_tcp_socket_open(void *base, const char *host, int port) +amqp_tcp_socket_open(void *base, const char *host, int port, struct timeval *timeout) { struct amqp_tcp_socket_t *self = (struct amqp_tcp_socket_t *)base; - self->sockfd = amqp_open_socket(host, port); + self->sockfd = amqp_open_socket_noblock(host, port, timeout); if (0 > self->sockfd) { int err = self->sockfd; self->sockfd = -1; diff --git a/librabbitmq/amqp_timer.c b/librabbitmq/amqp_timer.c index 7066358..f89d43d 100644 --- a/librabbitmq/amqp_timer.c +++ b/librabbitmq/amqp_timer.c @@ -20,7 +20,9 @@ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * DEALINGS IN THE SOFTWARE. */ +#include "amqp.h" #include "amqp_timer.h" +#include #if (defined(_WIN32) || defined(__WIN32__) || defined(WIN32)) # define AMQP_WIN_TIMER_API @@ -96,3 +98,38 @@ amqp_get_monotonic_timestamp(void) return ((uint64_t)tp.tv_sec * AMQP_NS_PER_S + (uint64_t)tp.tv_nsec); } #endif /* AMQP_POSIX_TIMER_API */ + +int +amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout) +{ + if (0 == timer->current_timestamp) { + timer->current_timestamp = amqp_get_monotonic_timestamp(); + + if (0 == timer->current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + + timer->timeout_timestamp = timer->current_timestamp + + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; + + } else { + timer->current_timestamp = amqp_get_monotonic_timestamp(); + + if (0 == timer->current_timestamp) { + return AMQP_STATUS_TIMER_FAILURE; + } + } + + if (timer->current_timestamp > timer->timeout_timestamp) { + return AMQP_STATUS_TIMEOUT; + } + + timer->ns_until_next_timeout = timer->timeout_timestamp - timer->current_timestamp; + + memset(&timer->tv, 0, sizeof(struct timeval)); + timer->tv.tv_sec = timer->ns_until_next_timeout / AMQP_NS_PER_S; + timer->tv.tv_usec = (timer->ns_until_next_timeout % AMQP_NS_PER_S) / AMQP_NS_PER_US; + + return AMQP_STATUS_OK; +} diff --git a/librabbitmq/amqp_timer.h b/librabbitmq/amqp_timer.h index d1718af..946096d 100644 --- a/librabbitmq/amqp_timer.h +++ b/librabbitmq/amqp_timer.h @@ -25,12 +25,37 @@ #include +#ifdef _WIN32 +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include +#else +# include +#endif + #define AMQP_NS_PER_S 1000000000 #define AMQP_NS_PER_US 1000 +#define AMQP_INIT_TIMER(structure) { \ + structure.current_timestamp = 0; \ + structure.timeout_timestamp = 0; \ +} + +typedef struct amqp_timer_t_ { + uint64_t current_timestamp; + uint64_t timeout_timestamp; + uint64_t ns_until_next_timeout; + struct timeval tv; +} amqp_timer_t; + /* Gets a monotonic timestamp in ns */ uint64_t amqp_get_monotonic_timestamp(void); +/* Prepare timeout value and modify timer state based on timer state. */ +int +amqp_timer_update(amqp_timer_t *timer, struct timeval *timeout); + #endif /* AMQP_TIMER_H */ -- cgit v1.2.1