// Copyright 2007 - 2021, Alan Antonuk and the rabbitmq-c contributors. // SPDX-License-Identifier: mit #ifdef HAVE_CONFIG_H #include "config.h" #endif #ifdef _MSC_VER #define _CRT_SECURE_NO_WARNINGS #endif #include "amqp_private.h" #include "amqp_socket.h" #include "amqp_table.h" #include "amqp_time.h" #include #include #include #include #include #include #include #include #if ((defined(_WIN32)) || (defined(__MINGW32__)) || (defined(__MINGW64__))) #ifndef WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN #endif #include #include #else #include /* On older BSD types.h must come before net includes */ #include #include #ifdef HAVE_SELECT #include #endif #include #include #include #include #ifdef HAVE_POLL #include #endif #include #endif static int amqp_id_in_reply_list(amqp_method_number_t expected, amqp_method_number_t *list); static int amqp_os_socket_init(void) { #ifdef _WIN32 static int called_wsastartup = 0; if (!called_wsastartup) { WSADATA data; int res = WSAStartup(0x0202, &data); if (res) { return AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR; } called_wsastartup = 1; } return AMQP_STATUS_OK; #else return AMQP_STATUS_OK; #endif } int amqp_os_socket_error(void) { #ifdef _WIN32 return WSAGetLastError(); #else return errno; #endif } int amqp_os_socket_close(int sockfd) { #ifdef _WIN32 return closesocket(sockfd); #else return close(sockfd); #endif } ssize_t amqp_socket_send(amqp_socket_t *self, const void *buf, size_t len, int flags) { assert(self); assert(self->klass->send); return self->klass->send(self, buf, len, flags); } ssize_t amqp_socket_recv(amqp_socket_t *self, void *buf, size_t len, int flags) { assert(self); assert(self->klass->recv); return self->klass->recv(self, buf, len, flags); } int amqp_socket_open(amqp_socket_t *self, const char *host, int port) { assert(self); assert(self->klass->open); return self->klass->open(self, host, port, NULL); } int amqp_socket_open_noblock(amqp_socket_t *self, const char *host, int port, const struct timeval *timeout) { assert(self); assert(self->klass->open); return self->klass->open(self, host, port, timeout); } int amqp_socket_close(amqp_socket_t *self, amqp_socket_close_enum force) { assert(self); assert(self->klass->close); return self->klass->close(self, force); } void amqp_socket_delete(amqp_socket_t *self) { if (self) { assert(self->klass->delete); self->klass->delete (self); } } int amqp_socket_get_sockfd(amqp_socket_t *self) { assert(self); assert(self->klass->get_sockfd); return self->klass->get_sockfd(self); } int amqp_poll(int fd, int event, amqp_time_t deadline) { #ifdef HAVE_POLL struct pollfd pfd; int res; int timeout_ms; /* Function should only ever be called with one of these two */ assert(event == AMQP_SF_POLLIN || event == AMQP_SF_POLLOUT); start_poll: pfd.fd = fd; switch (event) { case AMQP_SF_POLLIN: pfd.events = POLLIN; break; case AMQP_SF_POLLOUT: pfd.events = POLLOUT; break; } timeout_ms = amqp_time_ms_until(deadline); if (-1 > timeout_ms) { return timeout_ms; } res = poll(&pfd, 1, timeout_ms); if (0 < res) { /* TODO: optimize this a bit by returning the AMQP_STATUS_SOCKET_ERROR or * equivalent when pdf.revent is POLLHUP or POLLERR, so an extra syscall * doesn't need to be made. */ return AMQP_STATUS_OK; } else if (0 == res) { return AMQP_STATUS_TIMEOUT; } else { switch (amqp_os_socket_error()) { case EINTR: goto start_poll; default: return AMQP_STATUS_SOCKET_ERROR; } } return AMQP_STATUS_OK; #elif defined(HAVE_SELECT) fd_set fds; fd_set exceptfds; fd_set *exceptfdsp; int res; struct timeval tv; struct timeval *tvp; assert((0 != (event & AMQP_SF_POLLIN)) || (0 != (event & AMQP_SF_POLLOUT))); #ifndef _WIN32 /* On Win32 connect() failure is indicated through the exceptfds, it does not * make any sense to allow POLLERR on any other platform or condition */ assert(0 == (event & AMQP_SF_POLLERR)); #endif start_select: FD_ZERO(&fds); FD_SET(fd, &fds); if (event & AMQP_SF_POLLERR) { FD_ZERO(&exceptfds); FD_SET(fd, &exceptfds); exceptfdsp = &exceptfds; } else { exceptfdsp = NULL; } res = amqp_time_tv_until(deadline, &tv, &tvp); if (res != AMQP_STATUS_OK) { return res; } if (event & AMQP_SF_POLLIN) { res = select(fd + 1, &fds, NULL, exceptfdsp, tvp); } else if (event & AMQP_SF_POLLOUT) { res = select(fd + 1, NULL, &fds, exceptfdsp, tvp); } if (0 < res) { return AMQP_STATUS_OK; } else if (0 == res) { return AMQP_STATUS_TIMEOUT; } else { switch (amqp_os_socket_error()) { case EINTR: goto start_select; default: return AMQP_STATUS_SOCKET_ERROR; } } #else #error "poll() or select() is needed to compile rabbitmq-c" #endif } static ssize_t do_poll(amqp_connection_state_t state, ssize_t res, amqp_time_t deadline) { int fd = amqp_get_sockfd(state); if (-1 == fd) { return AMQP_STATUS_SOCKET_CLOSED; } switch (res) { case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: res = amqp_poll(fd, AMQP_SF_POLLIN, deadline); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: res = amqp_poll(fd, AMQP_SF_POLLOUT, deadline); break; } return res; } ssize_t amqp_try_send(amqp_connection_state_t state, const void *buf, size_t len, amqp_time_t deadline, int flags) { ssize_t res; void *buf_left = (void *)buf; /* Assume that len is not going to be larger than ssize_t can hold. */ ssize_t len_left = (size_t)len; start_send: res = amqp_socket_send(state->socket, buf_left, len_left, flags); if (res > 0) { len_left -= res; buf_left = (char *)buf_left + res; if (0 == len_left) { return (ssize_t)len; } goto start_send; } res = do_poll(state, res, deadline); if (AMQP_STATUS_OK == res) { goto start_send; } if (AMQP_STATUS_TIMEOUT == res) { return (ssize_t)len - len_left; } return res; } int amqp_open_socket(char const *hostname, int portnumber) { return amqp_open_socket_inner(hostname, portnumber, amqp_time_infinite()); } int amqp_open_socket_noblock(char const *hostname, int portnumber, const struct timeval *timeout) { amqp_time_t deadline; int res = amqp_time_from_now(&deadline, timeout); if (AMQP_STATUS_OK != res) { return res; } return amqp_open_socket_inner(hostname, portnumber, deadline); } #ifdef _WIN32 static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) { int one = 1; SOCKET sockfd; int last_error; /* * This cast is to squash warnings on Win64, see: * http://stackoverflow.com/questions/1953639/is-it-safe-to-cast-socket-to-int-under-win64 */ sockfd = (int)socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (INVALID_SOCKET == sockfd) { return AMQP_STATUS_SOCKET_ERROR; } /* Set the socket to be non-blocking */ if (SOCKET_ERROR == ioctlsocket(sockfd, FIONBIO, &one)) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } /* Disable nagle */ if (SOCKET_ERROR == setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (const char *)&one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } /* Enable TCP keepalives */ if (SOCKET_ERROR == setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (const char *)&one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } if (SOCKET_ERROR != connect(sockfd, addr->ai_addr, (int)addr->ai_addrlen)) { return (int)sockfd; } if (WSAEWOULDBLOCK != WSAGetLastError()) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } last_error = amqp_poll((int)sockfd, AMQP_SF_POLLOUT | AMQP_SF_POLLERR, deadline); if (AMQP_STATUS_OK != last_error) { goto err; } { int result; int result_len = sizeof(result); if (SOCKET_ERROR == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, (char *)&result, &result_len) || result != 0) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } } return (int)sockfd; err: closesocket(sockfd); return last_error; } #else static int connect_socket(struct addrinfo *addr, amqp_time_t deadline) { int one = 1; int sockfd; int flags; int last_error; sockfd = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); if (-1 == sockfd) { return AMQP_STATUS_SOCKET_ERROR; } /* Enable CLOEXEC on socket */ flags = fcntl(sockfd, F_GETFD); if (flags == -1 || fcntl(sockfd, F_SETFD, (long)(flags | FD_CLOEXEC)) == -1) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } /* Set the socket as non-blocking */ flags = fcntl(sockfd, F_GETFL); if (flags == -1 || fcntl(sockfd, F_SETFL, (long)(flags | O_NONBLOCK)) == -1) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } #ifdef SO_NOSIGPIPE /* Turn off SIGPIPE on platforms that support it, BSD, MacOSX */ if (0 != setsockopt(sockfd, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } #endif /* SO_NOSIGPIPE */ /* Disable nagle */ if (0 != setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } /* Enable TCP keepalives */ if (0 != setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, &one, sizeof(one))) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } if (0 == connect(sockfd, addr->ai_addr, addr->ai_addrlen)) { return sockfd; } if (EINPROGRESS != errno) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } last_error = amqp_poll(sockfd, AMQP_SF_POLLOUT, deadline); if (AMQP_STATUS_OK != last_error) { goto err; } { int result; socklen_t result_len = sizeof(result); if (-1 == getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &result, &result_len) || result != 0) { last_error = AMQP_STATUS_SOCKET_ERROR; goto err; } } return sockfd; err: close(sockfd); return last_error; } #endif int amqp_open_socket_inner(char const *hostname, int portnumber, amqp_time_t deadline) { struct addrinfo hint; struct addrinfo *address_list; struct addrinfo *addr; char portnumber_string[33]; int sockfd = -1; int last_error; last_error = amqp_os_socket_init(); if (AMQP_STATUS_OK != last_error) { return last_error; } memset(&hint, 0, sizeof(hint)); hint.ai_family = PF_UNSPEC; /* PF_INET or PF_INET6 */ hint.ai_socktype = SOCK_STREAM; hint.ai_protocol = IPPROTO_TCP; (void)sprintf(portnumber_string, "%d", portnumber); last_error = getaddrinfo(hostname, portnumber_string, &hint, &address_list); if (0 != last_error) { return AMQP_STATUS_HOSTNAME_RESOLUTION_FAILED; } for (addr = address_list; addr; addr = addr->ai_next) { sockfd = connect_socket(addr, deadline); if (sockfd >= 0) { last_error = AMQP_STATUS_OK; break; } else if (sockfd == AMQP_STATUS_TIMEOUT) { last_error = sockfd; break; } } freeaddrinfo(address_list); if (last_error != AMQP_STATUS_OK || sockfd == -1) { return last_error; } return sockfd; } static int send_header_inner(amqp_connection_state_t state, amqp_time_t deadline) { ssize_t res; static const uint8_t header[8] = {'A', 'M', 'Q', 'P', 0, AMQP_PROTOCOL_VERSION_MAJOR, AMQP_PROTOCOL_VERSION_MINOR, AMQP_PROTOCOL_VERSION_REVISION}; res = amqp_try_send(state, header, sizeof(header), deadline, AMQP_SF_NONE); if (sizeof(header) == res) { return AMQP_STATUS_OK; } return (int)res; } int amqp_send_header(amqp_connection_state_t state) { return send_header_inner(state, amqp_time_infinite()); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { amqp_bytes_t res; switch (method) { case AMQP_SASL_METHOD_PLAIN: res = amqp_cstring_bytes("PLAIN"); break; case AMQP_SASL_METHOD_EXTERNAL: res = amqp_cstring_bytes("EXTERNAL"); break; default: amqp_abort("Invalid SASL method: %d", (int)method); } return res; } static int bytes_equal(amqp_bytes_t l, amqp_bytes_t r) { if (l.len == r.len) { if (l.bytes && r.bytes) { if (0 == memcmp(l.bytes, r.bytes, l.len)) { return 1; } } } return 0; } int sasl_mechanism_in_list(amqp_bytes_t mechanisms, amqp_sasl_method_enum method) { amqp_bytes_t mechanism; amqp_bytes_t supported_mechanism; uint8_t *start; uint8_t *end; uint8_t *current; assert(NULL != mechanisms.bytes); mechanism = sasl_method_name(method); start = (uint8_t *)mechanisms.bytes; current = start; end = start + mechanisms.len; for (; current != end; start = current + 1) { /* HACK: SASL states that we should be parsing this string as a UTF-8 * string, which we're plainly not doing here. At this point its not worth * dragging an entire UTF-8 parser for this one case, and this should work * most of the time */ current = memchr(start, ' ', end - start); if (NULL == current) { current = end; } supported_mechanism.bytes = start; supported_mechanism.len = current - start; if (bytes_equal(mechanism, supported_mechanism)) { return 1; } } return 0; } static amqp_bytes_t sasl_response(amqp_pool_t *pool, amqp_sasl_method_enum method, va_list args) { amqp_bytes_t response; switch (method) { case AMQP_SASL_METHOD_PLAIN: { char *username = va_arg(args, char *); size_t username_len = strlen(username); char *password = va_arg(args, char *); size_t password_len = strlen(password); char *response_buf; amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); if (response.bytes == NULL) /* We never request a zero-length block, because of the +2 above, so a NULL here really is ENOMEM. */ { return response; } response_buf = response.bytes; response_buf[0] = 0; memcpy(response_buf + 1, username, username_len); response_buf[username_len + 1] = 0; memcpy(response_buf + username_len + 2, password, password_len); break; } case AMQP_SASL_METHOD_EXTERNAL: { char *identity = va_arg(args, char *); size_t identity_len = strlen(identity); amqp_pool_alloc_bytes(pool, identity_len, &response); if (response.bytes == NULL) { return response; } memcpy(response.bytes, identity, identity_len); break; } default: amqp_abort("Invalid SASL method: %d", (int)method); } return response; } amqp_boolean_t amqp_frames_enqueued(amqp_connection_state_t state) { return (state->first_queued_frame != NULL); } /* * Check to see if we have data in our buffer. If this returns 1, we * will avoid an immediate blocking read in amqp_simple_wait_frame. */ amqp_boolean_t amqp_data_in_buffer(amqp_connection_state_t state) { return (state->sock_inbound_offset < state->sock_inbound_limit); } static int consume_one_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { int res; amqp_bytes_t buffer; buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; buffer.bytes = ((char *)state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; res = amqp_handle_input(state, buffer, decoded_frame); if (res < 0) { return res; } state->sock_inbound_offset += res; return AMQP_STATUS_OK; } static int recv_with_timeout(amqp_connection_state_t state, amqp_time_t timeout) { ssize_t res; int fd; start_recv: res = amqp_socket_recv(state->socket, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len, 0); if (res < 0) { fd = amqp_get_sockfd(state); if (-1 == fd) { return AMQP_STATUS_CONNECTION_CLOSED; } switch (res) { default: return (int)res; case AMQP_PRIVATE_STATUS_SOCKET_NEEDREAD: res = amqp_poll(fd, AMQP_SF_POLLIN, timeout); break; case AMQP_PRIVATE_STATUS_SOCKET_NEEDWRITE: res = amqp_poll(fd, AMQP_SF_POLLOUT, timeout); break; } if (AMQP_STATUS_OK == res) { goto start_recv; } return (int)res; } state->sock_inbound_limit = res; state->sock_inbound_offset = 0; res = amqp_time_s_from_now(&state->next_recv_heartbeat, amqp_heartbeat_recv(state)); if (AMQP_STATUS_OK != res) { return (int)res; } return AMQP_STATUS_OK; } int amqp_try_recv(amqp_connection_state_t state) { amqp_time_t timeout; int res; while (amqp_data_in_buffer(state)) { amqp_frame_t frame; res = consume_one_frame(state, &frame); if (AMQP_STATUS_OK != res) { return res; } if (frame.frame_type != 0) { amqp_pool_t *channel_pool; amqp_frame_t *frame_copy; amqp_link_t *link; channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); if (NULL == channel_pool) { return AMQP_STATUS_NO_MEMORY; } frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); if (frame_copy == NULL || link == NULL) { return AMQP_STATUS_NO_MEMORY; } *frame_copy = frame; link->next = NULL; link->data = frame_copy; if (state->last_queued_frame == NULL) { state->first_queued_frame = link; } else { state->last_queued_frame->next = link; } state->last_queued_frame = link; } } res = amqp_time_s_from_now(&timeout, 0); if (AMQP_STATUS_OK != res) { return res; } return recv_with_timeout(state, timeout); } static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame, amqp_time_t timeout_deadline) { amqp_time_t deadline; int res; for (;;) { while (amqp_data_in_buffer(state)) { res = consume_one_frame(state, decoded_frame); if (AMQP_STATUS_OK != res) { return res; } if (AMQP_FRAME_HEARTBEAT == decoded_frame->frame_type) { amqp_maybe_release_buffers_on_channel(state, 0); continue; } if (decoded_frame->frame_type != 0) { /* Complete frame was read. Return it. */ return AMQP_STATUS_OK; } } beginrecv: res = amqp_time_has_past(state->next_send_heartbeat); if (AMQP_STATUS_TIMER_FAILURE == res) { return res; } else if (AMQP_STATUS_TIMEOUT == res) { amqp_frame_t heartbeat; heartbeat.channel = 0; heartbeat.frame_type = AMQP_FRAME_HEARTBEAT; res = amqp_send_frame(state, &heartbeat); if (AMQP_STATUS_OK != res) { return res; } } deadline = amqp_time_first(timeout_deadline, amqp_time_first(state->next_recv_heartbeat, state->next_send_heartbeat)); /* TODO this needs to wait for a _frame_ and not anything written from the * socket */ res = recv_with_timeout(state, deadline); if (AMQP_STATUS_TIMEOUT == res) { if (amqp_time_equal(deadline, state->next_recv_heartbeat)) { amqp_socket_close(state->socket, AMQP_SC_FORCE); return AMQP_STATUS_HEARTBEAT_TIMEOUT; } else if (amqp_time_equal(deadline, timeout_deadline)) { return AMQP_STATUS_TIMEOUT; } else if (amqp_time_equal(deadline, state->next_send_heartbeat)) { /* send heartbeat happens before we do recv_with_timeout */ goto beginrecv; } else { amqp_abort("Internal error: unable to determine timeout reason"); } } else if (AMQP_STATUS_OK != res) { return res; } } } static amqp_link_t *amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame) { amqp_link_t *link; amqp_frame_t *frame_copy; amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel); if (NULL == channel_pool) { return NULL; } link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); if (NULL == link || NULL == frame_copy) { return NULL; } *frame_copy = *frame; link->data = frame_copy; return link; } int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) { amqp_link_t *link = amqp_create_link_for_frame(state, frame); if (NULL == link) { return AMQP_STATUS_NO_MEMORY; } if (NULL == state->first_queued_frame) { state->first_queued_frame = link; } else { state->last_queued_frame->next = link; } link->next = NULL; state->last_queued_frame = link; return AMQP_STATUS_OK; } int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) { amqp_link_t *link = amqp_create_link_for_frame(state, frame); if (NULL == link) { return AMQP_STATUS_NO_MEMORY; } if (NULL == state->first_queued_frame) { state->first_queued_frame = link; state->last_queued_frame = link; link->next = NULL; } else { link->next = state->first_queued_frame; state->first_queued_frame = link; } return AMQP_STATUS_OK; } int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, amqp_channel_t channel, amqp_frame_t *decoded_frame) { amqp_frame_t *frame_ptr; amqp_link_t *cur; int res; for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { frame_ptr = cur->data; if (channel == frame_ptr->channel) { state->first_queued_frame = cur->next; if (NULL == state->first_queued_frame) { state->last_queued_frame = NULL; } *decoded_frame = *frame_ptr; return AMQP_STATUS_OK; } } for (;;) { res = wait_frame_inner(state, decoded_frame, amqp_time_infinite()); if (AMQP_STATUS_OK != res) { return res; } if (channel == decoded_frame->channel) { return AMQP_STATUS_OK; } else { res = amqp_queue_frame(state, decoded_frame); if (res != AMQP_STATUS_OK) { return res; } } } } int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { return amqp_simple_wait_frame_noblock(state, decoded_frame, NULL); } int amqp_simple_wait_frame_noblock(amqp_connection_state_t state, amqp_frame_t *decoded_frame, const struct timeval *timeout) { amqp_time_t deadline; int res = amqp_time_from_now(&deadline, timeout); if (AMQP_STATUS_OK != res) { return res; } if (state->first_queued_frame != NULL) { amqp_frame_t *f = (amqp_frame_t *)state->first_queued_frame->data; state->first_queued_frame = state->first_queued_frame->next; if (state->first_queued_frame == NULL) { state->last_queued_frame = NULL; } *decoded_frame = *f; return AMQP_STATUS_OK; } else { return wait_frame_inner(state, decoded_frame, deadline); } } static int amqp_simple_wait_method_list(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t *expected_methods, amqp_time_t deadline, amqp_method_t *output) { amqp_frame_t frame; struct timeval tv; struct timeval *tvp; int res = amqp_time_tv_until(deadline, &tv, &tvp); if (res != AMQP_STATUS_OK) { return res; } res = amqp_simple_wait_frame_noblock(state, &frame, tvp); if (AMQP_STATUS_OK != res) { return res; } if (AMQP_FRAME_METHOD != frame.frame_type || expected_channel != frame.channel || !amqp_id_in_reply_list(frame.payload.method.id, expected_methods)) { return AMQP_STATUS_WRONG_METHOD; } *output = frame.payload.method; return AMQP_STATUS_OK; } static int simple_wait_method_inner(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t expected_method, amqp_time_t deadline, amqp_method_t *output) { amqp_method_number_t expected_methods[2]; expected_methods[0] = expected_method; expected_methods[1] = 0; return amqp_simple_wait_method_list(state, expected_channel, expected_methods, deadline, output); } int amqp_simple_wait_method(amqp_connection_state_t state, amqp_channel_t expected_channel, amqp_method_number_t expected_method, amqp_method_t *output) { return simple_wait_method_inner(state, expected_channel, expected_method, amqp_time_infinite(), output); } int amqp_send_method(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, void *decoded) { return amqp_send_method_inner(state, channel, id, decoded, AMQP_SF_NONE, amqp_time_infinite()); } int amqp_send_method_inner(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t id, void *decoded, int flags, amqp_time_t deadline) { amqp_frame_t frame; frame.frame_type = AMQP_FRAME_METHOD; frame.channel = channel; frame.payload.method.id = id; frame.payload.method.decoded = decoded; return amqp_send_frame_inner(state, &frame, flags, deadline); } static int amqp_id_in_reply_list(amqp_method_number_t expected, amqp_method_number_t *list) { while (*list != 0) { if (*list == expected) { return 1; } list++; } return 0; } static amqp_rpc_reply_t simple_rpc_inner( amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, void *decoded_request_method, amqp_time_t deadline) { int status; amqp_rpc_reply_t result; memset(&result, 0, sizeof(result)); status = amqp_send_method(state, channel, request_id, decoded_request_method); if (status < 0) { return amqp_rpc_reply_error(status); } { amqp_frame_t frame; retry: status = wait_frame_inner(state, &frame, deadline); if (status != AMQP_STATUS_OK) { if (status == AMQP_STATUS_TIMEOUT) { amqp_socket_close(state->socket, AMQP_SC_FORCE); } return amqp_rpc_reply_error(status); } /* * We store the frame for later processing unless it's something * that directly affects us here, namely a method frame that is * either * - on the channel we want, and of the expected type, or * - on the channel we want, and a channel.close frame, or * - on channel zero, and a connection.close frame. */ if (!((frame.frame_type == AMQP_FRAME_METHOD) && (((frame.channel == channel) && (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids) || (frame.payload.method.id == AMQP_CHANNEL_CLOSE_METHOD))) || ((frame.channel == 0) && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))))) { amqp_pool_t *channel_pool; amqp_frame_t *frame_copy; amqp_link_t *link; channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); if (NULL == channel_pool) { return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY); } frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); if (frame_copy == NULL || link == NULL) { return amqp_rpc_reply_error(AMQP_STATUS_NO_MEMORY); } *frame_copy = frame; link->next = NULL; link->data = frame_copy; if (state->last_queued_frame == NULL) { state->first_queued_frame = link; } else { state->last_queued_frame->next = link; } state->last_queued_frame = link; goto retry; } result.reply_type = (amqp_id_in_reply_list(frame.payload.method.id, expected_reply_ids)) ? AMQP_RESPONSE_NORMAL : AMQP_RESPONSE_SERVER_EXCEPTION; result.reply = frame.payload.method; return result; } } amqp_rpc_reply_t amqp_simple_rpc(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, amqp_method_number_t *expected_reply_ids, void *decoded_request_method) { amqp_time_t deadline; int res; res = amqp_time_from_now(&deadline, state->rpc_timeout); if (res != AMQP_STATUS_OK) { return amqp_rpc_reply_error(res); } return simple_rpc_inner(state, channel, request_id, expected_reply_ids, decoded_request_method, deadline); } void *amqp_simple_rpc_decoded(amqp_connection_state_t state, amqp_channel_t channel, amqp_method_number_t request_id, amqp_method_number_t reply_id, void *decoded_request_method) { amqp_time_t deadline; int res; amqp_method_number_t replies[2]; res = amqp_time_from_now(&deadline, state->rpc_timeout); if (res != AMQP_STATUS_OK) { state->most_recent_api_result = amqp_rpc_reply_error(res); return NULL; } replies[0] = reply_id; replies[1] = 0; state->most_recent_api_result = simple_rpc_inner( state, channel, request_id, replies, decoded_request_method, deadline); if (state->most_recent_api_result.reply_type == AMQP_RESPONSE_NORMAL) { return state->most_recent_api_result.reply.decoded; } else { return NULL; } } amqp_rpc_reply_t amqp_get_rpc_reply(amqp_connection_state_t state) { return state->most_recent_api_result; } /* * Merge base and add tables. If the two tables contain an entry with the same * key, the entry from the add table takes precedence. For entries that are both * tables with the same key, the table is recursively merged. */ int amqp_merge_capabilities(const amqp_table_t *base, const amqp_table_t *add, amqp_table_t *result, amqp_pool_t *pool) { int i; int res; amqp_pool_t temp_pool; amqp_table_t temp_result; assert(base != NULL); assert(result != NULL); assert(pool != NULL); if (NULL == add) { return amqp_table_clone(base, result, pool); } init_amqp_pool(&temp_pool, 4096); temp_result.num_entries = 0; temp_result.entries = amqp_pool_alloc(&temp_pool, sizeof(amqp_table_entry_t) * (base->num_entries + add->num_entries)); if (NULL == temp_result.entries) { res = AMQP_STATUS_NO_MEMORY; goto error_out; } for (i = 0; i < base->num_entries; ++i) { temp_result.entries[temp_result.num_entries] = base->entries[i]; temp_result.num_entries++; } for (i = 0; i < add->num_entries; ++i) { amqp_table_entry_t *e = amqp_table_get_entry_by_key(&temp_result, add->entries[i].key); if (NULL != e) { if (AMQP_FIELD_KIND_TABLE == add->entries[i].value.kind && AMQP_FIELD_KIND_TABLE == e->value.kind) { amqp_table_entry_t *be = amqp_table_get_entry_by_key(base, add->entries[i].key); res = amqp_merge_capabilities(&be->value.value.table, &add->entries[i].value.value.table, &e->value.value.table, &temp_pool); if (AMQP_STATUS_OK != res) { goto error_out; } } else { e->value = add->entries[i].value; } } else { temp_result.entries[temp_result.num_entries] = add->entries[i]; temp_result.num_entries++; } } res = amqp_table_clone(&temp_result, result, pool); error_out: empty_amqp_pool(&temp_pool); return res; } static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, const amqp_table_t *client_properties, const struct timeval *timeout, amqp_sasl_method_enum sasl_method, va_list vl) { int res; amqp_method_t method; uint16_t client_channel_max; uint32_t client_frame_max; uint16_t client_heartbeat; uint16_t server_channel_max; uint32_t server_frame_max; uint16_t server_heartbeat; amqp_rpc_reply_t result; amqp_time_t deadline; if (channel_max < 0 || channel_max > UINT16_MAX) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); } client_channel_max = (uint16_t)channel_max; if (frame_max < 0) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); } client_frame_max = (uint32_t)frame_max; if (heartbeat < 0 || heartbeat > UINT16_MAX) { return amqp_rpc_reply_error(AMQP_STATUS_INVALID_PARAMETER); } client_heartbeat = (uint16_t)heartbeat; res = amqp_time_from_now(&deadline, timeout); if (AMQP_STATUS_OK != res) { goto error_res; } res = send_header_inner(state, deadline); if (AMQP_STATUS_OK != res) { goto error_res; } res = simple_wait_method_inner(state, 0, AMQP_CONNECTION_START_METHOD, deadline, &method); if (AMQP_STATUS_OK != res) { goto error_res; } { amqp_connection_start_t *s = (amqp_connection_start_t *)method.decoded; if ((s->version_major != AMQP_PROTOCOL_VERSION_MAJOR) || (s->version_minor != AMQP_PROTOCOL_VERSION_MINOR)) { res = AMQP_STATUS_INCOMPATIBLE_AMQP_VERSION; goto error_res; } res = amqp_table_clone(&s->server_properties, &state->server_properties, &state->properties_pool); if (AMQP_STATUS_OK != res) { goto error_res; } /* TODO: check that our chosen SASL mechanism is in the list of acceptable mechanisms. Or even let the application choose from the list! */ if (!sasl_mechanism_in_list(s->mechanisms, sasl_method)) { res = AMQP_STATUS_BROKER_UNSUPPORTED_SASL_METHOD; goto error_res; } } { amqp_table_entry_t default_properties[6]; amqp_table_t default_table; amqp_table_entry_t client_capabilities[2]; amqp_table_t client_capabilities_table; amqp_connection_start_ok_t s; amqp_pool_t *channel_pool; amqp_bytes_t response_bytes; channel_pool = amqp_get_or_create_channel_pool(state, 0); if (NULL == channel_pool) { res = AMQP_STATUS_NO_MEMORY; goto error_res; } response_bytes = sasl_response(channel_pool, sasl_method, vl); if (response_bytes.bytes == NULL) { res = AMQP_STATUS_NO_MEMORY; goto error_res; } client_capabilities[0] = amqp_table_construct_bool_entry("authentication_failure_close", 1); client_capabilities[1] = amqp_table_construct_bool_entry("exchange_exchange_bindings", 1); client_capabilities_table.entries = client_capabilities; client_capabilities_table.num_entries = sizeof(client_capabilities) / sizeof(amqp_table_entry_t); default_properties[0] = amqp_table_construct_utf8_entry("product", "rabbitmq-c"); default_properties[1] = amqp_table_construct_utf8_entry("version", AMQP_VERSION_STRING); default_properties[2] = amqp_table_construct_utf8_entry("platform", AMQ_PLATFORM); default_properties[3] = amqp_table_construct_utf8_entry("copyright", AMQ_COPYRIGHT); default_properties[4] = amqp_table_construct_utf8_entry( "information", "See https://github.com/alanxz/rabbitmq-c"); default_properties[5] = amqp_table_construct_table_entry( "capabilities", &client_capabilities_table); default_table.entries = default_properties; default_table.num_entries = sizeof(default_properties) / sizeof(amqp_table_entry_t); res = amqp_merge_capabilities(&default_table, client_properties, &state->client_properties, channel_pool); if (AMQP_STATUS_OK != res) { goto error_res; } s.client_properties = state->client_properties; s.mechanism = sasl_method_name(sasl_method); s.response = response_bytes; s.locale = amqp_cstring_bytes("en_US"); res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s, AMQP_SF_NONE, deadline); if (res < 0) { goto error_res; } } amqp_release_buffers(state); { amqp_method_number_t expected[] = {AMQP_CONNECTION_TUNE_METHOD, AMQP_CONNECTION_CLOSE_METHOD, 0}; res = amqp_simple_wait_method_list(state, 0, expected, deadline, &method); if (AMQP_STATUS_OK != res) { goto error_res; } } if (AMQP_CONNECTION_CLOSE_METHOD == method.id) { result.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; result.reply = method; result.library_error = 0; goto out; } { amqp_connection_tune_t *s = (amqp_connection_tune_t *)method.decoded; server_channel_max = s->channel_max; server_frame_max = s->frame_max; server_heartbeat = s->heartbeat; } if (server_channel_max != 0 && (server_channel_max < client_channel_max || client_channel_max == 0)) { client_channel_max = server_channel_max; } else if (server_channel_max == 0 && client_channel_max == 0) { client_channel_max = UINT16_MAX; } if (server_frame_max != 0 && server_frame_max < client_frame_max) { client_frame_max = server_frame_max; } if (server_heartbeat != 0 && server_heartbeat < client_heartbeat) { client_heartbeat = server_heartbeat; } res = amqp_tune_connection(state, client_channel_max, client_frame_max, client_heartbeat); if (res < 0) { goto error_res; } { amqp_connection_tune_ok_t s; s.frame_max = client_frame_max; s.channel_max = client_channel_max; s.heartbeat = client_heartbeat; res = amqp_send_method_inner(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s, AMQP_SF_NONE, deadline); if (res < 0) { goto error_res; } } amqp_release_buffers(state); { amqp_method_number_t replies[] = {AMQP_CONNECTION_OPEN_OK_METHOD, 0}; amqp_connection_open_t s; s.virtual_host = amqp_cstring_bytes(vhost); s.capabilities = amqp_empty_bytes; s.insist = 1; result = simple_rpc_inner(state, 0, AMQP_CONNECTION_OPEN_METHOD, replies, &s, deadline); if (result.reply_type != AMQP_RESPONSE_NORMAL) { goto out; } } result.reply_type = AMQP_RESPONSE_NORMAL; result.reply.id = 0; result.reply.decoded = NULL; result.library_error = 0; amqp_maybe_release_buffers(state); out: return result; error_res: amqp_socket_close(state->socket, AMQP_SC_FORCE); result = amqp_rpc_reply_error(res); goto out; } amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, amqp_sasl_method_enum sasl_method, ...) { va_list vl; amqp_rpc_reply_t ret; va_start(vl, sasl_method); ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, &amqp_empty_table, state->handshake_timeout, sasl_method, vl); va_end(vl); return ret; } amqp_rpc_reply_t amqp_login_with_properties( amqp_connection_state_t state, char const *vhost, int channel_max, int frame_max, int heartbeat, const amqp_table_t *client_properties, amqp_sasl_method_enum sasl_method, ...) { va_list vl; amqp_rpc_reply_t ret; va_start(vl, sasl_method); ret = amqp_login_inner(state, vhost, channel_max, frame_max, heartbeat, client_properties, state->handshake_timeout, sasl_method, vl); va_end(vl); return ret; }