diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 333 |
1 files changed, 306 insertions, 27 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 441192a..79a7696 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -133,6 +133,39 @@ amqp_os_socket_setsockopt(int sock, int level, int optname, #endif } +static int +amqp_os_socket_setsockblock(int sock, int block) +{ + +#ifdef _WIN32 + int nonblock = !block; + if (NO_ERROR != ioctlsocket(sock, FIONBIO, &nonblock)) { + return AMQP_STATUS_SOCKET_ERROR; + } else { + return AMQP_STATUS_OK; + } +#else + long arg; + + if ((arg = fcntl(sock, F_GETFL, NULL)) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + if (block) { + arg &= (~O_NONBLOCK); + } else { + arg |= O_NONBLOCK; + } + + if (fcntl(sock, F_SETFL, arg) < 0) { + return AMQP_STATUS_SOCKET_ERROR; + } + + return AMQP_STATUS_OK; +#endif +} + + int amqp_os_socket_error(void) { @@ -182,25 +215,32 @@ amqp_socket_open(amqp_socket_t *self, const char *host, int port) { assert(self); assert(self->klass->open); - return self->klass->open(self, host, port); + return self->klass->open(self, host, port, NULL); } int -amqp_socket_close(amqp_socket_t *self) +amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, struct timeval *timeout) { - if (self) { - assert(self->klass->close); - return self->klass->close(self); - } - return AMQP_STATUS_OK; + assert(self); + assert(self->klass->open); + return self->klass->open(self, host, port, timeout); } int -amqp_socket_error(amqp_socket_t *self) +amqp_socket_close(amqp_socket_t *self) { assert(self); - assert(self->klass->error); - return self->klass->error(self); + assert(self->klass->close); + return self->klass->close(self); +} + +void +amqp_socket_delete(amqp_socket_t *self) +{ + if (self) { + assert(self->klass->delete); + self->klass->delete(self); + } } int @@ -211,8 +251,16 @@ amqp_socket_get_sockfd(amqp_socket_t *self) return self->klass->get_sockfd(self); } -int amqp_open_socket(char const *hostname, - int portnumber) +int +amqp_open_socket(char const *hostname, + int portnumber) +{ + return amqp_open_socket_noblock(hostname, portnumber, NULL); +} + +int amqp_open_socket_noblock(char const *hostname, + int portnumber, + struct timeval *timeout) { struct addrinfo hint; struct addrinfo *address_list; @@ -221,6 +269,15 @@ int amqp_open_socket(char const *hostname, int sockfd = -1; int last_error = AMQP_STATUS_OK; int one = 1; /* for setsockopt */ + int res; + int timer_error; + amqp_timer_t timer; + + AMQP_INIT_TIMER(timer) + + if (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0)) { + return AMQP_STATUS_INVALID_PARAMETER; + } last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { @@ -241,31 +298,147 @@ int amqp_open_socket(char const *hostname, } for (addr = address_list; addr; addr = addr->ai_next) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + sockfd = -1; + } + sockfd = amqp_os_socket_socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (-1 == sockfd) { last_error = AMQP_STATUS_SOCKET_ERROR; continue; } + #ifdef SO_NOSIGPIPE if (0 != amqp_os_socket_setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; } #endif /* SO_NOSIGPIPE */ - if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one)) - || 0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + + if (0 != amqp_os_socket_setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; - amqp_os_socket_close(sockfd); continue; + } + + if (timeout) { + /* Trying to connect with timeout, set socket to non-blocking mode */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 0)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + res = connect(sockfd, addr->ai_addr, addr->ai_addrlen); + + if (0 == res) { + /* Connected immediately, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } + +#ifdef _WIN32 + if (WSAEWOULDBLOCK == amqp_os_socket_error()) { +#else + if (EINPROGRESS == amqp_os_socket_error()) { +#endif + + while(1) { + fd_set write_fd; + fd_set except_fd; + + FD_ZERO(&write_fd); + FD_SET(sockfd, &write_fd); + + FD_ZERO(&except_fd); + FD_SET(sockfd, &except_fd); + + timer_error = amqp_timer_update(&timer, timeout); + + if (timer_error < 0) { + last_error = timer_error; + break; + } + + /* Win32 requires except_fds to be passed to detect connection + * failure. Other platforms only need write_fds, passing except_fds + * seems to be harmless otherwise + */ + res = select(sockfd+1, NULL, &write_fd, &except_fd, &timer.tv); + + if (res > 0) { + int result; + socklen_t result_len = sizeof(result); + + if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) < 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + if (result != 0) { + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + + /* socket is ready to be written to, set to blocking mode again */ + if (AMQP_STATUS_OK != amqp_os_socket_setsockblock(sockfd, 1)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } + + last_error = AMQP_STATUS_OK; + break; + } else if (0 == res) { + /* Timed out - return */ + last_error = AMQP_STATUS_TIMEOUT; + break; + } else if (errno == EINTR) { + /* Try again */ + continue; + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + } + } /* end while(1) loop */ + + if (last_error == AMQP_STATUS_OK + || last_error == AMQP_STATUS_TIMEOUT + || last_error == AMQP_STATUS_TIMER_FAILURE) { + /* Exit for loop on timer errors or when connection established */ + break; + } + + } else { + /* Error connecting */ + last_error = AMQP_STATUS_SOCKET_ERROR; + break; + + } + } else { - last_error = AMQP_STATUS_OK; - break; + /* Connect in blocking mode */ + if (0 != connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { + last_error = AMQP_STATUS_SOCKET_ERROR; + continue; + } else { + last_error = AMQP_STATUS_OK; + break; + } } } freeaddrinfo(address_list); if (last_error != AMQP_STATUS_OK) { + if (-1 != sockfd) { + amqp_os_socket_close(sockfd); + } + return last_error; } @@ -404,8 +577,9 @@ static int recv_with_timeout(amqp_connection_state_t state, uint64_t start, stru if (0 == current_timestamp) { return AMQP_STATUS_TIMER_FAILURE; } - end_timestamp = start + timeout->tv_sec * AMQP_NS_PER_S + - timeout->tv_usec * AMQP_NS_PER_US; + end_timestamp = start + + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; if (current_timestamp > end_timestamp) { return AMQP_STATUS_TIMEOUT; } @@ -526,9 +700,6 @@ static int wait_frame_inner(amqp_connection_state_t state, /* Complete frame was read. Return it. */ return AMQP_STATUS_OK; } - - /* Incomplete or ignored frame. Keep processing input. */ - assert(res != 0); } beginrecv: @@ -559,8 +730,8 @@ beginrecv: if (timeout) { if (0 == timeout_timestamp) { timeout_timestamp = current_timestamp + - timeout->tv_sec * AMQP_NS_PER_S + - timeout->tv_usec * AMQP_NS_PER_US; + (uint64_t)timeout->tv_sec * AMQP_NS_PER_S + + (uint64_t)timeout->tv_usec * AMQP_NS_PER_US; } if (current_timestamp > timeout_timestamp) { @@ -600,7 +771,6 @@ beginrecv: if (AMQP_STATUS_TIMEOUT == res) { if (next_timestamp == state->next_recv_heartbeat) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (next_timestamp == timeout_timestamp) { return AMQP_STATUS_TIMEOUT; @@ -616,6 +786,109 @@ beginrecv: } } +static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link; + amqp_frame_t *frame_copy; + + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel); + + if (NULL == channel_pool) { + return NULL; + } + + link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); + frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); + + if (NULL == link || NULL == frame_copy) { + return NULL; + } + + *frame_copy = *frame; + link->data = frame_copy; + + return link; +} + +int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link = amqp_create_link_for_frame(state, frame); + if (NULL == link) { + return AMQP_STATUS_NO_MEMORY; + } + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + } else { + state->last_queued_frame->next = link; + } + + link->next = NULL; + state->last_queued_frame = link; + + return AMQP_STATUS_OK; +} + +int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link = amqp_create_link_for_frame(state, frame); + if (NULL == link) { + return AMQP_STATUS_NO_MEMORY; + } + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + state->last_queued_frame = link; + link->next = NULL; + } else { + link->next = state->first_queued_frame; + state->first_queued_frame = link; + } + + return AMQP_STATUS_OK; +} + +int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_frame_t *decoded_frame) +{ + amqp_frame_t *frame_ptr; + amqp_link_t *cur; + int res; + + for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { + frame_ptr = cur->data; + + if (channel == frame_ptr->channel) { + state->first_queued_frame = cur->next; + if (NULL == state->first_queued_frame) { + state->last_queued_frame = NULL; + } + + *decoded_frame = *frame_ptr; + + return AMQP_STATUS_OK; + } + } + + while (1) { + res = wait_frame_inner(state, decoded_frame, NULL); + + if (AMQP_STATUS_OK != res) { + return res; + } + + if (channel == decoded_frame->channel) { + return AMQP_STATUS_OK; + } else { + res = amqp_queue_frame(state, decoded_frame); + if (res != AMQP_STATUS_OK) { + return res; + } + } + } +} + int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { @@ -654,7 +927,6 @@ int amqp_simple_wait_method(amqp_connection_state_t state, || frame.frame_type != AMQP_FRAME_METHOD || frame.payload.method.id != expected_method) { amqp_socket_close(state->socket); - state->socket = NULL; return AMQP_STATUS_WRONG_METHOD; } *output = frame.payload.method; @@ -859,6 +1131,13 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, goto error_res; } + res = amqp_table_clone(&s->server_properties, &state->server_properties, + &state->properties_pool); + + if (AMQP_STATUS_OK != res) { + goto error_res; + } + /* TODO: check that our chosen SASL mechanism is in the list of acceptable mechanisms. Or even let the application choose from the list! */ |