summaryrefslogtreecommitdiff
path: root/librabbitmq/amqp_socket.c
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-04-14 17:29:03 +0100
committerAsk Solem <ask@celeryproject.org>2014-04-14 17:29:03 +0100
commitbe3000b4c84d7503f5ef4067de44ff16d060d158 (patch)
treefecacb0f149b067202c443b59aad3cc027a0ff1c /librabbitmq/amqp_socket.c
parentdcb8edaccd6e164d624edfab0f3120d96f707f0a (diff)
parentfe844e41ffad5691607982cbfe4054aacdcb81e0 (diff)
downloadrabbitmq-c-github-ask-be3000b4c84d7503f5ef4067de44ff16d060d158.tar.gz
Merge branch 'alanxz/master'
Conflicts: Makefile.am codegen
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r--librabbitmq/amqp_socket.c333
1 files changed, 306 insertions, 27 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 441192a..79a7696 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname,
#endif
}
+static int
+amqp_os_socket_setsockblock(int sock, int block)
+{
+
+#ifdef _WIN32
+ int nonblock = !block;
+ if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ } else {
+ return AMQP_STATUS_OK;
+ }
+#else
+ long arg;
+
+ if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ if (block) {
+ arg &= (~O_NONBLOCK);
+ } else {
+ arg |= O_NONBLOCK;
+ }
+
+ if (fcntl(sock, F_SETFL, arg) < 0) {
+ return AMQP_STATUS_SOCKET_ERROR;
+ }
+
+ return AMQP_STATUS_OK;
+#endif
+}
+
+
int
amqp_os_socket_error(void)
{
@@ -182,25 +215,32 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port)
{
assert(self);
assert(self->klass->open);
- return self->klass->open(self, host, port);
+ return self->klass->open(self, host, port, NULL);
}
int
-amqp_socket_close(amqp_socket_t *self)
+amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout)
{
- if (self) {
- assert(self->klass->close);
- return self->klass->close(self);
- }
- return AMQP_STATUS_OK;
+ assert(self);
+ assert(self->klass->open);
+ return self->klass->open(self, host, port, timeout);
}
int
-amqp_socket_error(amqp_socket_t *self)
+amqp_socket_close(amqp_socket_t *self)
{
assert(self);
- assert(self->klass->error);
- return self->klass->error(self);
+ assert(self->klass->close);
+ return self->klass->close(self);
+}
+
+void
+amqp_socket_delete(amqp_socket_t *self)
+{
+ if (self) {
+ assert(self->klass->delete);
+ self->klass->delete(self);
+ }
}
int
@@ -211,8 +251,16 @@ amqp_socket_get_sockfd(amqp_socket_t *self)
return self->klass->get_sockfd(self);
}
-int amqp_open_socket(char const *hostname,
- int portnumber)
+int
+amqp_open_socket(char const *hostname,
+ int portnumber)
+{
+ return amqp_open_socket_noblock(hostname, portnumber, NULL);
+}
+
+int amqp_open_socket_noblock(char const *hostname,
+ int portnumber,
+ struct timeval *timeout)
{
struct addrinfo hint;
struct addrinfo *address_list;
@@ -221,6 +269,15 @@ int amqp_open_socket(char const *hostname,
int sockfd = -1;
int last_error = AMQP_STATUS_OK;
int one = 1; /* for setsockopt */
+ int res;
+ int timer_error;
+ amqp_timer_t timer;
+
+ AMQP_INIT_TIMER(timer)
+
+ if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) {
+ return AMQP_STATUS_INVALID_PARAMETER;
+ }
last_error = amqp_os_socket_init();
if (AMQP_STATUS_OK != last_error) {
@@ -241,31 +298,147 @@ int amqp_open_socket(char const *hostname,
}
for (addr = address_list; addr; addr = addr->ai_next) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ sockfd = -1;
+ }
+
sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
+
if (-1 == sockfd) {
last_error = AMQP_STATUS_SOCKET_ERROR;
continue;
}
+
#ifdef SO_NOSIGPIPE
if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
}
#endif /* SO_NOSIGPIPE */
- if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))
- || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+
+ if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) {
last_error = AMQP_STATUS_SOCKET_ERROR;
- amqp_os_socket_close(sockfd);
continue;
+ }
+
+ if (timeout) {
+ /* Trying to connect with timeout, set socket to non-blocking mode */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ res = connect(sockfd, addr->ai_addr, addr->ai_addrlen);
+
+ if (0 == res) {
+ /* Connected immediately, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
+
+#ifdef _WIN32
+ if (WSAEWOULDBLOCK == amqp_os_socket_error()) {
+#else
+ if (EINPROGRESS == amqp_os_socket_error()) {
+#endif
+
+ while(1) {
+ fd_set write_fd;
+ fd_set except_fd;
+
+ FD_ZERO(&write_fd);
+ FD_SET(sockfd, &write_fd);
+
+ FD_ZERO(&except_fd);
+ FD_SET(sockfd, &except_fd);
+
+ timer_error = amqp_timer_update(&timer, timeout);
+
+ if (timer_error < 0) {
+ last_error = timer_error;
+ break;
+ }
+
+ /* Win32 requires except_fds to be passed to detect connection
+ * failure. Other platforms only need write_fds, passing except_fds
+ * seems to be harmless otherwise
+ */
+ res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv);
+
+ if (res > 0) {
+ int result;
+ socklen_t result_len = sizeof(result);
+
+ if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ if (result != 0) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+
+ /* socket is ready to be written to, set to blocking mode again */
+ if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ }
+
+ last_error = AMQP_STATUS_OK;
+ break;
+ } else if (0 == res) {
+ /* Timed out - return */
+ last_error = AMQP_STATUS_TIMEOUT;
+ break;
+ } else if (errno == EINTR) {
+ /* Try again */
+ continue;
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+ }
+ } /* end while(1) loop */
+
+ if (last_error == AMQP_STATUS_OK
+ || last_error == AMQP_STATUS_TIMEOUT
+ || last_error == AMQP_STATUS_TIMER_FAILURE) {
+ /* Exit for loop on timer errors or when connection established */
+ break;
+ }
+
+ } else {
+ /* Error connecting */
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ break;
+
+ }
+
} else {
- last_error = AMQP_STATUS_OK;
- break;
+ /* Connect in blocking mode */
+ if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) {
+ last_error = AMQP_STATUS_SOCKET_ERROR;
+ continue;
+ } else {
+ last_error = AMQP_STATUS_OK;
+ break;
+ }
}
}
freeaddrinfo(address_list);
if (last_error != AMQP_STATUS_OK) {
+ if (-1 != sockfd) {
+ amqp_os_socket_close(sockfd);
+ }
+
return last_error;
}
@@ -404,8 +577,9 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru
if (0 == current_timestamp) {
return AMQP_STATUS_TIMER_FAILURE;
}
- end_timestamp = start + timeout->tv_sec * AMQP_NS_PER_S +
- timeout->tv_usec * AMQP_NS_PER_US;
+ end_timestamp = start +
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
if (current_timestamp > end_timestamp) {
return AMQP_STATUS_TIMEOUT;
}
@@ -526,9 +700,6 @@ static int wait_frame_inner(amqp_connection_state_t state,
/* Complete frame was read. Return it. */
return AMQP_STATUS_OK;
}
-
- /* Incomplete or ignored frame. Keep processing input. */
- assert(res != 0);
}
beginrecv:
@@ -559,8 +730,8 @@ beginrecv:
if (timeout) {
if (0 == timeout_timestamp) {
timeout_timestamp = current_timestamp +
- timeout->tv_sec * AMQP_NS_PER_S +
- timeout->tv_usec * AMQP_NS_PER_US;
+ (uint64_t)timeout->tv_sec * AMQP_NS_PER_S +
+ (uint64_t)timeout->tv_usec * AMQP_NS_PER_US;
}
if (current_timestamp > timeout_timestamp) {
@@ -600,7 +771,6 @@ beginrecv:
if (AMQP_STATUS_TIMEOUT == res) {
if (next_timestamp == state->next_recv_heartbeat) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_HEARTBEAT_TIMEOUT;
} else if (next_timestamp == timeout_timestamp) {
return AMQP_STATUS_TIMEOUT;
@@ -616,6 +786,109 @@ beginrecv:
}
}
+static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link;
+ amqp_frame_t *frame_copy;
+
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel);
+
+ if (NULL == channel_pool) {
+ return NULL;
+ }
+
+ link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
+ frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
+
+ if (NULL == link || NULL == frame_copy) {
+ return NULL;
+ }
+
+ *frame_copy = *frame;
+ link->data = frame_copy;
+
+ return link;
+}
+
+int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link = amqp_create_link_for_frame(state, frame);
+ if (NULL == link) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ } else {
+ state->last_queued_frame->next = link;
+ }
+
+ link->next = NULL;
+ state->last_queued_frame = link;
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link = amqp_create_link_for_frame(state, frame);
+ if (NULL == link) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ state->last_queued_frame = link;
+ link->next = NULL;
+ } else {
+ link->next = state->first_queued_frame;
+ state->first_queued_frame = link;
+ }
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_frame_t *decoded_frame)
+{
+ amqp_frame_t *frame_ptr;
+ amqp_link_t *cur;
+ int res;
+
+ for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
+ frame_ptr = cur->data;
+
+ if (channel == frame_ptr->channel) {
+ state->first_queued_frame = cur->next;
+ if (NULL == state->first_queued_frame) {
+ state->last_queued_frame = NULL;
+ }
+
+ *decoded_frame = *frame_ptr;
+
+ return AMQP_STATUS_OK;
+ }
+ }
+
+ while (1) {
+ res = wait_frame_inner(state, decoded_frame, NULL);
+
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ if (channel == decoded_frame->channel) {
+ return AMQP_STATUS_OK;
+ } else {
+ res = amqp_queue_frame(state, decoded_frame);
+ if (res != AMQP_STATUS_OK) {
+ return res;
+ }
+ }
+ }
+}
+
int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
@@ -654,7 +927,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state,
|| frame.frame_type != AMQP_FRAME_METHOD
|| frame.payload.method.id != expected_method) {
amqp_socket_close(state->socket);
- state->socket = NULL;
return AMQP_STATUS_WRONG_METHOD;
}
*output = frame.payload.method;
@@ -859,6 +1131,13 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
goto error_res;
}
+ res = amqp_table_clone(&s->server_properties, &state->server_properties,
+ &state->properties_pool);
+
+ if (AMQP_STATUS_OK != res) {
+ goto error_res;
+ }
+
/* TODO: check that our chosen SASL mechanism is in the list of
acceptable mechanisms. Or even let the application choose from
the list! */