diff options
Diffstat (limited to 'librabbitmq/amqp_socket.c')
-rw-r--r-- | librabbitmq/amqp_socket.c | 185 |
1 files changed, 92 insertions, 93 deletions
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 13f6376..f23b42b 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -59,8 +59,6 @@ #include "amqp_framing.h" #include "amqp_private.h" -#include "socket.h" - int amqp_open_socket(char const *hostname, int portnumber) @@ -98,37 +96,28 @@ int amqp_open_socket(char const *hostname, return sockfd; } -static char *header() { - static char header[8]; - header[0] = 'A'; - header[1] = 'M'; - header[2] = 'Q'; - header[3] = 'P'; - header[4] = 0; - header[5] = AMQP_PROTOCOL_VERSION_MAJOR; - header[6] = AMQP_PROTOCOL_VERSION_MINOR; - header[7] = AMQP_PROTOCOL_VERSION_REVISION; - return header; -} - int amqp_send_header(amqp_connection_state_t state) { - return send(state->sockfd, header(), 8, 0); -} - -int amqp_send_header_to(amqp_connection_state_t state, - amqp_output_fn_t fn, - void *context) -{ - return fn(context, header(), 8); + static const uint8_t header[8] = { 'A', 'M', 'Q', 'P', 0, + AMQP_PROTOCOL_VERSION_MAJOR, + AMQP_PROTOCOL_VERSION_MINOR, + AMQP_PROTOCOL_VERSION_REVISION }; + return send(state->sockfd, (void *)header, 8, 0); } static amqp_bytes_t sasl_method_name(amqp_sasl_method_enum method) { + amqp_bytes_t res; + switch (method) { - case AMQP_SASL_METHOD_PLAIN: return (amqp_bytes_t) {.len = 5, .bytes = "PLAIN"}; - default: - amqp_assert(0, "Invalid SASL method: %d", (int) method); + case AMQP_SASL_METHOD_PLAIN: + res.bytes = "PLAIN"; + res.len = 5; + break; + + default: + amqp_abort("Invalid SASL method: %d", (int) method); } - abort(); /* unreachable */ + + return res; } static amqp_bytes_t sasl_response(amqp_pool_t *pool, @@ -143,20 +132,23 @@ static amqp_bytes_t sasl_response(amqp_pool_t *pool, size_t username_len = strlen(username); char *password = va_arg(args, char *); size_t password_len = strlen(password); + char *response_buf; + amqp_pool_alloc_bytes(pool, strlen(username) + strlen(password) + 2, &response); - if (response.bytes == NULL) { + if (response.bytes == NULL) /* We never request a zero-length block, because of the +2 above, so a NULL here really is ENOMEM. */ return response; - } - *BUF_AT(response, 0) = 0; - memcpy(((char *) response.bytes) + 1, username, username_len); - *BUF_AT(response, username_len + 1) = 0; - memcpy(((char *) response.bytes) + username_len + 2, password, password_len); + + response_buf = response.bytes; + response_buf[0] = 0; + memcpy(response_buf + 1, username, username_len); + response_buf[username_len + 1] = 0; + memcpy(response_buf + username_len + 2, password, password_len); break; } default: - amqp_assert(0, "Invalid SASL method: %d", (int) method); + amqp_abort("Invalid SASL method: %d", (int) method); } return response; @@ -178,33 +170,37 @@ static int wait_frame_inner(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { while (1) { - int result; + int res; while (amqp_data_in_buffer(state)) { amqp_bytes_t buffer; buffer.len = state->sock_inbound_limit - state->sock_inbound_offset; buffer.bytes = ((char *) state->sock_inbound_buffer.bytes) + state->sock_inbound_offset; - AMQP_CHECK_RESULT((result = amqp_handle_input(state, buffer, decoded_frame))); - state->sock_inbound_offset += result; + + res = amqp_handle_input(state, buffer, decoded_frame); + if (res < 0) + return res; + + state->sock_inbound_offset += res; if (decoded_frame->frame_type != 0) /* Complete frame was read. Return it. */ return 0; /* Incomplete or ignored frame. Keep processing input. */ - assert(result != 0); - } + assert(res != 0); + } - result = recv(state->sockfd, state->sock_inbound_buffer.bytes, + res = recv(state->sockfd, state->sock_inbound_buffer.bytes, state->sock_inbound_buffer.len, 0); - if (result <= 0) { - if (result == 0) + if (res <= 0) { + if (res == 0) return -ERROR_CONNECTION_CLOSED; else return -amqp_socket_error(); } - state->sock_inbound_limit = result; + state->sock_inbound_limit = res; state->sock_inbound_offset = 0; } } @@ -234,22 +230,22 @@ int amqp_simple_wait_method(amqp_connection_state_t state, int res = amqp_simple_wait_frame(state, &frame); if (res < 0) return res; - - amqp_assert(frame.channel == expected_channel, - "Expected 0x%08X method frame on channel %d, got frame on channel %d", - expected_method, - expected_channel, - frame.channel); - amqp_assert(frame.frame_type == AMQP_FRAME_METHOD, - "Expected 0x%08X method frame on channel %d, got frame type %d", - expected_method, - expected_channel, - frame.frame_type); - amqp_assert(frame.payload.method.id == expected_method, - "Expected method ID 0x%08X on channel %d, got ID 0x%08X", - expected_method, - expected_channel, - frame.payload.method.id); + + if (frame.channel != expected_channel) + amqp_abort("Expected 0x%08X method frame on channel %d, got frame on channel %d", + expected_method, + expected_channel, + frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) + amqp_abort("Expected 0x%08X method frame on channel %d, got frame type %d", + expected_method, + expected_channel, + frame.frame_type); + if (frame.payload.method.id != expected_method) + amqp_abort("Expected method ID 0x%08X on channel %d, got ID 0x%08X", + expected_method, + expected_channel, + frame.payload.method.id); *output = frame.payload.method; return 0; } @@ -388,19 +384,23 @@ static int amqp_login_inner(amqp_connection_state_t state, } { - amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, sasl_method, vl); amqp_connection_start_ok_t s; - if (response_bytes.bytes == NULL) { + amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, + sasl_method, vl); + + if (response_bytes.bytes == NULL) return -ERROR_NO_MEMORY; - } - s = - (amqp_connection_start_ok_t) { - .client_properties = {.num_entries = 0, .entries = NULL}, - .mechanism = sasl_method_name(sasl_method), - .response = response_bytes, - .locale = {.len = 5, .bytes = "en_US"} - }; - AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s)); + + s.client_properties.num_entries = 0; + s.client_properties.entries = NULL; + s.mechanism = sasl_method_name(sasl_method); + s.response = response_bytes; + s.locale.bytes = "en_US"; + s.locale.len = 5; + + res = amqp_send_method(state, 0, AMQP_CONNECTION_START_OK_METHOD, &s); + if (res < 0) + return res; } amqp_release_buffers(state); @@ -409,7 +409,7 @@ static int amqp_login_inner(amqp_connection_state_t state, &method); if (res < 0) return res; - + { amqp_connection_tune_t *s = (amqp_connection_tune_t *) method.decoded; server_channel_max = s->channel_max; @@ -417,28 +417,28 @@ static int amqp_login_inner(amqp_connection_state_t state, server_heartbeat = s->heartbeat; } - if (server_channel_max != 0 && server_channel_max < channel_max) { + if (server_channel_max != 0 && server_channel_max < channel_max) channel_max = server_channel_max; - } - if (server_frame_max != 0 && server_frame_max < frame_max) { + if (server_frame_max != 0 && server_frame_max < frame_max) frame_max = server_frame_max; - } - if (server_heartbeat != 0 && server_heartbeat < heartbeat) { + if (server_heartbeat != 0 && server_heartbeat < heartbeat) heartbeat = server_heartbeat; - } - AMQP_CHECK_RESULT(amqp_tune_connection(state, channel_max, frame_max, heartbeat)); + res = amqp_tune_connection(state, channel_max, frame_max, heartbeat); + if (res < 0) + return res; { - amqp_connection_tune_ok_t s = - (amqp_connection_tune_ok_t) { - .channel_max = channel_max, - .frame_max = frame_max, - .heartbeat = heartbeat - }; - AMQP_CHECK_RESULT(amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s)); + amqp_connection_tune_ok_t s; + s.frame_max = frame_max; + s.channel_max = channel_max; + s.heartbeat = heartbeat; + + res = amqp_send_method(state, 0, AMQP_CONNECTION_TUNE_OK_METHOD, &s); + if (res < 0) + return res; } amqp_release_buffers(state); @@ -470,21 +470,20 @@ amqp_rpc_reply_t amqp_login(amqp_connection_state_t state, } { - amqp_connection_open_t s = - (amqp_connection_open_t) { - .virtual_host = amqp_cstring_bytes(vhost), - .capabilities = {.len = 0, .bytes = NULL}, - .insist = 1 - }; amqp_method_number_t replies[] = { AMQP_CONNECTION_OPEN_OK_METHOD, 0 }; + amqp_connection_open_t s; + s.virtual_host = amqp_cstring_bytes(vhost); + s.capabilities.len = 0; + s.capabilities.bytes = NULL; + s.insist = 1; + result = amqp_simple_rpc(state, 0, AMQP_CONNECTION_OPEN_METHOD, (amqp_method_number_t *) &replies, &s); - if (result.reply_type != AMQP_RESPONSE_NORMAL) { + if (result.reply_type != AMQP_RESPONSE_NORMAL) return result; - } } amqp_maybe_release_buffers(state); |