diff options
Diffstat (limited to 'librabbitmq/amqp_connection.c')
-rw-r--r-- | librabbitmq/amqp_connection.c | 312 |
1 files changed, 144 insertions, 168 deletions
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 529c4ef..034b2e9 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -38,11 +38,11 @@ #endif #ifdef _MSC_VER -# define _CRT_SECURE_NO_WARNINGS +#define _CRT_SECURE_NO_WARNINGS #endif -#include "amqp_tcp_socket.h" #include "amqp_private.h" +#include "amqp_tcp_socket.h" #include "amqp_time.h" #include <errno.h> #include <stdint.h> @@ -73,11 +73,10 @@ _wanted_state, _check_state->state); \ } -amqp_connection_state_t amqp_new_connection(void) -{ +amqp_connection_state_t amqp_new_connection(void) { int res; - amqp_connection_state_t state = - (amqp_connection_state_t) calloc(1, sizeof(struct amqp_connection_state_t_)); + amqp_connection_state_t state = (amqp_connection_state_t)calloc( + 1, sizeof(struct amqp_connection_state_t_)); if (state == NULL) { return NULL; @@ -97,7 +96,8 @@ amqp_connection_state_t amqp_new_connection(void) state->target_size = 8; state->sock_inbound_buffer.len = AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE; - state->sock_inbound_buffer.bytes = malloc(AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE); + state->sock_inbound_buffer.bytes = + malloc(AMQP_INITIAL_INBOUND_SOCK_BUFFER_SIZE); if (state->sock_inbound_buffer.bytes == NULL) { goto out_nomem; } @@ -117,14 +117,11 @@ out_nomem: return NULL; } -int amqp_get_sockfd(amqp_connection_state_t state) -{ +int amqp_get_sockfd(amqp_connection_state_t state) { return state->socket ? amqp_socket_get_sockfd(state->socket) : -1; } -void amqp_set_sockfd(amqp_connection_state_t state, - int sockfd) -{ +void amqp_set_sockfd(amqp_connection_state_t state, int sockfd) { amqp_socket_t *socket = amqp_tcp_socket_new(state); if (!socket) { amqp_abort("%s", strerror(errno)); @@ -132,23 +129,17 @@ void amqp_set_sockfd(amqp_connection_state_t state, amqp_tcp_socket_set_sockfd(socket, sockfd); } -void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) -{ +void amqp_set_socket(amqp_connection_state_t state, amqp_socket_t *socket) { amqp_socket_delete(state->socket); state->socket = socket; } -amqp_socket_t * -amqp_get_socket(amqp_connection_state_t state) -{ +amqp_socket_t *amqp_get_socket(amqp_connection_state_t state) { return state->socket; } -int amqp_tune_connection(amqp_connection_state_t state, - int channel_max, - int frame_max, - int heartbeat) -{ +int amqp_tune_connection(amqp_connection_state_t state, int channel_max, + int frame_max, int heartbeat) { void *newbuf; int res; @@ -183,23 +174,19 @@ int amqp_tune_connection(amqp_connection_state_t state, return AMQP_STATUS_OK; } -int amqp_get_channel_max(amqp_connection_state_t state) -{ +int amqp_get_channel_max(amqp_connection_state_t state) { return state->channel_max; } -int amqp_get_frame_max(amqp_connection_state_t state) -{ +int amqp_get_frame_max(amqp_connection_state_t state) { return state->frame_max; } -int amqp_get_heartbeat(amqp_connection_state_t state) -{ +int amqp_get_heartbeat(amqp_connection_state_t state) { return state->heartbeat; } -int amqp_destroy_connection(amqp_connection_state_t state) -{ +int amqp_destroy_connection(amqp_connection_state_t state) { int status = AMQP_STATUS_OK; if (state) { int i; @@ -222,8 +209,7 @@ int amqp_destroy_connection(amqp_connection_state_t state) return status; } -static void return_to_idle(amqp_connection_state_t state) -{ +static void return_to_idle(amqp_connection_state_t state) { state->inbound_buffer.len = sizeof(state->header_buffer); state->inbound_buffer.bytes = state->header_buffer; state->inbound_offset = 0; @@ -232,8 +218,7 @@ static void return_to_idle(amqp_connection_state_t state) } static size_t consume_data(amqp_connection_state_t state, - amqp_bytes_t *received_data) -{ + amqp_bytes_t *received_data) { /* how much data is available and will fit? */ size_t bytes_consumed = state->target_size - state->inbound_offset; if (received_data->len < bytes_consumed) { @@ -249,10 +234,8 @@ static size_t consume_data(amqp_connection_state_t state, return bytes_consumed; } -int amqp_handle_input(amqp_connection_state_t state, - amqp_bytes_t received_data, - amqp_frame_t *decoded_frame) -{ +int amqp_handle_input(amqp_connection_state_t state, amqp_bytes_t received_data, + amqp_frame_t *decoded_frame) { size_t bytes_consumed; void *raw_frame; @@ -279,174 +262,172 @@ int amqp_handle_input(amqp_connection_state_t state, raw_frame = state->inbound_buffer.bytes; switch (state->state) { - case CONNECTION_STATE_INITIAL: - /* check for a protocol header from the server */ - if (memcmp(raw_frame, "AMQP", 4) == 0) { - decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; - decoded_frame->channel = 0; - - decoded_frame->payload.protocol_header.transport_high - = amqp_d8(amqp_offset(raw_frame, 4)); - decoded_frame->payload.protocol_header.transport_low - = amqp_d8(amqp_offset(raw_frame, 5)); - decoded_frame->payload.protocol_header.protocol_version_major - = amqp_d8(amqp_offset(raw_frame, 6)); - decoded_frame->payload.protocol_header.protocol_version_minor - = amqp_d8(amqp_offset(raw_frame, 7)); - - return_to_idle(state); - return (int)bytes_consumed; - } + case CONNECTION_STATE_INITIAL: + /* check for a protocol header from the server */ + if (memcmp(raw_frame, "AMQP", 4) == 0) { + decoded_frame->frame_type = AMQP_PSEUDOFRAME_PROTOCOL_HEADER; + decoded_frame->channel = 0; + + decoded_frame->payload.protocol_header.transport_high = + amqp_d8(amqp_offset(raw_frame, 4)); + decoded_frame->payload.protocol_header.transport_low = + amqp_d8(amqp_offset(raw_frame, 5)); + decoded_frame->payload.protocol_header.protocol_version_major = + amqp_d8(amqp_offset(raw_frame, 6)); + decoded_frame->payload.protocol_header.protocol_version_minor = + amqp_d8(amqp_offset(raw_frame, 7)); + + return_to_idle(state); + return (int)bytes_consumed; + } /* it's not a protocol header; fall through to process it as a regular frame header */ - case CONNECTION_STATE_HEADER: { - amqp_channel_t channel; - amqp_pool_t *channel_pool; - /* frame length is 3 bytes in */ - channel = amqp_d16(amqp_offset(raw_frame, 1)); + case CONNECTION_STATE_HEADER: { + amqp_channel_t channel; + amqp_pool_t *channel_pool; + /* frame length is 3 bytes in */ + channel = amqp_d16(amqp_offset(raw_frame, 1)); - state->target_size - = amqp_d32(amqp_offset(raw_frame, 3)) + HEADER_SIZE + FOOTER_SIZE; + state->target_size = + amqp_d32(amqp_offset(raw_frame, 3)) + HEADER_SIZE + FOOTER_SIZE; - if ((size_t)state->frame_max < state->target_size) { - return AMQP_STATUS_BAD_AMQP_DATA; - } + if ((size_t)state->frame_max < state->target_size) { + return AMQP_STATUS_BAD_AMQP_DATA; + } - channel_pool = amqp_get_or_create_channel_pool(state, channel); - if (NULL == channel_pool) { - return AMQP_STATUS_NO_MEMORY; - } + channel_pool = amqp_get_or_create_channel_pool(state, channel); + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; + } - amqp_pool_alloc_bytes(channel_pool, state->target_size, &state->inbound_buffer); - if (NULL == state->inbound_buffer.bytes) { - return AMQP_STATUS_NO_MEMORY; - } - memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE); - raw_frame = state->inbound_buffer.bytes; + amqp_pool_alloc_bytes(channel_pool, state->target_size, + &state->inbound_buffer); + if (NULL == state->inbound_buffer.bytes) { + return AMQP_STATUS_NO_MEMORY; + } + memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE); + raw_frame = state->inbound_buffer.bytes; - state->state = CONNECTION_STATE_BODY; + state->state = CONNECTION_STATE_BODY; - bytes_consumed += consume_data(state, &received_data); + bytes_consumed += consume_data(state, &received_data); - /* do we have target_size data yet? if not, return with the - expectation that more will arrive */ - if (state->inbound_offset < state->target_size) { - return (int)bytes_consumed; + /* do we have target_size data yet? if not, return with the + expectation that more will arrive */ + if (state->inbound_offset < state->target_size) { + return (int)bytes_consumed; + } } - - } /* fall through to process body */ - case CONNECTION_STATE_BODY: { - amqp_bytes_t encoded; - int res; - amqp_pool_t *channel_pool; - - /* Check frame end marker (footer) */ - if (amqp_d8(amqp_offset(raw_frame, state->target_size - 1)) != - AMQP_FRAME_END) { - return AMQP_STATUS_BAD_AMQP_DATA; - } + case CONNECTION_STATE_BODY: { + amqp_bytes_t encoded; + int res; + amqp_pool_t *channel_pool; - decoded_frame->frame_type = amqp_d8(amqp_offset(raw_frame, 0)); - decoded_frame->channel = amqp_d16(amqp_offset(raw_frame, 1)); - - channel_pool = amqp_get_or_create_channel_pool(state, decoded_frame->channel); - if (NULL == channel_pool) { - return AMQP_STATUS_NO_MEMORY; - } + /* Check frame end marker (footer) */ + if (amqp_d8(amqp_offset(raw_frame, state->target_size - 1)) != + AMQP_FRAME_END) { + return AMQP_STATUS_BAD_AMQP_DATA; + } - switch (decoded_frame->frame_type) { - case AMQP_FRAME_METHOD: - decoded_frame->payload.method.id = - amqp_d32(amqp_offset(raw_frame, HEADER_SIZE)); - encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4); - encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; + decoded_frame->frame_type = amqp_d8(amqp_offset(raw_frame, 0)); + decoded_frame->channel = amqp_d16(amqp_offset(raw_frame, 1)); - res = amqp_decode_method(decoded_frame->payload.method.id, - channel_pool, encoded, - &decoded_frame->payload.method.decoded); - if (res < 0) { - return res; + channel_pool = + amqp_get_or_create_channel_pool(state, decoded_frame->channel); + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; } - break; + switch (decoded_frame->frame_type) { + case AMQP_FRAME_METHOD: + decoded_frame->payload.method.id = + amqp_d32(amqp_offset(raw_frame, HEADER_SIZE)); + encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 4); + encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; - case AMQP_FRAME_HEADER: - decoded_frame->payload.properties.class_id - = amqp_d16(amqp_offset(raw_frame, HEADER_SIZE)); - /* unused 2-byte weight field goes here */ - decoded_frame->payload.properties.body_size - = amqp_d64(amqp_offset(raw_frame, HEADER_SIZE + 4)); - encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); - encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; - decoded_frame->payload.properties.raw = encoded; - - res = amqp_decode_properties(decoded_frame->payload.properties.class_id, + res = amqp_decode_method(decoded_frame->payload.method.id, channel_pool, encoded, - &decoded_frame->payload.properties.decoded); - if (res < 0) { - return res; + &decoded_frame->payload.method.decoded); + if (res < 0) { + return res; + } + + break; + + case AMQP_FRAME_HEADER: + decoded_frame->payload.properties.class_id = + amqp_d16(amqp_offset(raw_frame, HEADER_SIZE)); + /* unused 2-byte weight field goes here */ + decoded_frame->payload.properties.body_size = + amqp_d64(amqp_offset(raw_frame, HEADER_SIZE + 4)); + encoded.bytes = amqp_offset(raw_frame, HEADER_SIZE + 12); + encoded.len = state->target_size - HEADER_SIZE - 12 - FOOTER_SIZE; + decoded_frame->payload.properties.raw = encoded; + + res = amqp_decode_properties( + decoded_frame->payload.properties.class_id, channel_pool, encoded, + &decoded_frame->payload.properties.decoded); + if (res < 0) { + return res; + } + + break; + + case AMQP_FRAME_BODY: + decoded_frame->payload.body_fragment.len = + state->target_size - HEADER_SIZE - FOOTER_SIZE; + decoded_frame->payload.body_fragment.bytes = + amqp_offset(raw_frame, HEADER_SIZE); + break; + + case AMQP_FRAME_HEARTBEAT: + break; + + default: + /* Ignore the frame */ + decoded_frame->frame_type = 0; + break; } - break; - - case AMQP_FRAME_BODY: - decoded_frame->payload.body_fragment.len - = state->target_size - HEADER_SIZE - FOOTER_SIZE; - decoded_frame->payload.body_fragment.bytes - = amqp_offset(raw_frame, HEADER_SIZE); - break; - - case AMQP_FRAME_HEARTBEAT: - break; - - default: - /* Ignore the frame */ - decoded_frame->frame_type = 0; - break; + return_to_idle(state); + return (int)bytes_consumed; } - return_to_idle(state); - return (int)bytes_consumed; - } - - default: - amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", - state->state); + default: + amqp_abort("Internal error: invalid amqp_connection_state_t->state %d", + state->state); } } -amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) -{ +amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { return (state->state == CONNECTION_STATE_IDLE); } -void amqp_release_buffers(amqp_connection_state_t state) -{ +void amqp_release_buffers(amqp_connection_state_t state) { int i; ENFORCE_STATE(state, CONNECTION_STATE_IDLE); for (i = 0; i < POOL_TABLE_SIZE; ++i) { amqp_pool_table_entry_t *entry = state->pool_table[i]; - for ( ;NULL != entry; entry = entry->next) { + for (; NULL != entry; entry = entry->next) { amqp_maybe_release_buffers_on_channel(state, entry->channel); } } } -void amqp_maybe_release_buffers(amqp_connection_state_t state) -{ +void amqp_maybe_release_buffers(amqp_connection_state_t state) { if (amqp_release_buffers_ok(state)) { amqp_release_buffers(state); } } -void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel) -{ +void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, + amqp_channel_t channel) { amqp_link_t *queued_link; amqp_pool_t *pool; if (CONNECTION_STATE_IDLE != state->state) { @@ -547,8 +528,7 @@ static int amqp_frame_to_bytes(const amqp_frame_t *frame, amqp_bytes_t buffer, return AMQP_STATUS_OK; } -int amqp_send_frame(amqp_connection_state_t state, - const amqp_frame_t *frame) { +int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) { return amqp_send_frame_inner(state, frame, AMQP_SF_NONE, amqp_time_infinite()); } @@ -596,7 +576,7 @@ start_send: return res; } - encoded.bytes = (uint8_t*)encoded.bytes + sent; + encoded.bytes = (uint8_t *)encoded.bytes + sent; encoded.len -= sent; goto start_send; } @@ -606,14 +586,10 @@ start_send: return res; } -amqp_table_t * -amqp_get_server_properties(amqp_connection_state_t state) -{ +amqp_table_t *amqp_get_server_properties(amqp_connection_state_t state) { return &state->server_properties; } -amqp_table_t * -amqp_get_client_properties(amqp_connection_state_t state) -{ +amqp_table_t *amqp_get_client_properties(amqp_connection_state_t state) { return &state->client_properties; } |