diff options
author | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-21 16:57:32 -0700 |
---|---|---|
committer | Alan Antonuk <alan.antonuk@gmail.com> | 2013-06-21 16:57:32 -0700 |
commit | 4a2d899cd3ae3ef8bb9305eddd88c95d3dfc0463 (patch) | |
tree | 5d40072884197ddcd5cb3e71ecb4455f30a7881b | |
parent | 837a0b540595f8d0fab3214b0126ef436712aa98 (diff) | |
download | rabbitmq-c-4a2d899cd3ae3ef8bb9305eddd88c95d3dfc0463.tar.gz |
Channel-based memory management
Assign a decoding pool on a per-channel basis. This allows memory to be
released on a per-channel basis which is helpful for clients handling
multiple channels
-rw-r--r-- | librabbitmq/amqp.h | 4 | ||||
-rw-r--r-- | librabbitmq/amqp_connection.c | 111 | ||||
-rw-r--r-- | librabbitmq/amqp_mem.c | 43 | ||||
-rw-r--r-- | librabbitmq/amqp_private.h | 19 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 29 |
5 files changed, 164 insertions, 42 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 3c77a5d..de2afbd 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -461,6 +461,10 @@ void AMQP_CALL amqp_maybe_release_buffers(amqp_connection_state_t state); AMQP_PUBLIC_FUNCTION +void +AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel); + +AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame); diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c index 8d7f7b5..ae3edf4 100644 --- a/librabbitmq/amqp_connection.c +++ b/librabbitmq/amqp_connection.c @@ -71,18 +71,13 @@ amqp_connection_state_t amqp_new_connection(void) return NULL; } - init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE); - init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE); - res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0); if (0 != res) { goto out_nomem; } - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) { - goto out_nomem; - } + state->inbound_buffer.bytes = state->header_buffer; + state->inbound_buffer.len = sizeof(state->header_buffer); state->state = CONNECTION_STATE_INITIAL; /* the server protocol version response is 8 bytes, which conveniently @@ -99,8 +94,6 @@ amqp_connection_state_t amqp_new_connection(void) out_nomem: free(state->sock_inbound_buffer.bytes); - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); free(state); return NULL; } @@ -140,10 +133,6 @@ int amqp_tune_connection(amqp_connection_state_t state, state->frame_max = frame_max; state->heartbeat = heartbeat; - empty_amqp_pool(&state->frame_pool); - init_amqp_pool(&state->frame_pool, frame_max); - - state->inbound_buffer.len = frame_max; state->outbound_buffer.len = frame_max; newbuf = realloc(state->outbound_buffer.bytes, frame_max); if (newbuf == NULL) { @@ -163,8 +152,17 @@ int amqp_destroy_connection(amqp_connection_state_t state) { int status = AMQP_STATUS_OK; if (state) { - empty_amqp_pool(&state->frame_pool); - empty_amqp_pool(&state->decoding_pool); + int i; + for (i = 0; i < POOL_TABLE_SIZE; ++i) { + amqp_pool_table_entry_t *entry = state->pool_table[i]; + while (NULL != entry) { + amqp_pool_table_entry_t *todelete = entry; + empty_amqp_pool(&entry->pool); + entry = entry->next; + free(todelete); + } + } + free(state->outbound_buffer.bytes); free(state->sock_inbound_buffer.bytes); status = amqp_socket_close(state->socket); @@ -175,7 +173,8 @@ int amqp_destroy_connection(amqp_connection_state_t state) static void return_to_idle(amqp_connection_state_t state) { - state->inbound_buffer.bytes = NULL; + state->inbound_buffer.len = sizeof(state->header_buffer); + state->inbound_buffer.bytes = state->header_buffer; state->inbound_offset = 0; state->target_size = HEADER_SIZE; state->state = CONNECTION_STATE_IDLE; @@ -215,16 +214,6 @@ int amqp_handle_input(amqp_connection_state_t state, } if (state->state == CONNECTION_STATE_IDLE) { - state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, - state->inbound_buffer.len); - if (state->inbound_buffer.bytes == NULL) - /* state->inbound_buffer.len is always nonzero, because it - corresponds to frame_max, which is not permitted to be less - than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */ - { - return AMQP_STATUS_NO_MEMORY; - } - state->state = CONNECTION_STATE_HEADER; } @@ -261,10 +250,27 @@ int amqp_handle_input(amqp_connection_state_t state, /* it's not a protocol header; fall through to process it as a regular frame header */ - case CONNECTION_STATE_HEADER: + case CONNECTION_STATE_HEADER: { + amqp_channel_t channel; + amqp_pool_t *channel_pool; /* frame length is 3 bytes in */ + channel = amqp_d16(raw_frame, 1); + + channel_pool = amqp_get_or_create_channel_pool(state, channel); + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; + } + state->target_size = amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE; + + amqp_pool_alloc_bytes(channel_pool, state->target_size, &state->inbound_buffer); + if (NULL == state->inbound_buffer.bytes) { + return AMQP_STATUS_NO_MEMORY; + } + memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE); + raw_frame = state->inbound_buffer.bytes; + state->state = CONNECTION_STATE_BODY; bytes_consumed += consume_data(state, &received_data); @@ -275,11 +281,13 @@ int amqp_handle_input(amqp_connection_state_t state, return bytes_consumed; } + } /* fall through to process body */ case CONNECTION_STATE_BODY: { amqp_bytes_t encoded; int res; + amqp_pool_t *channel_pool; /* Check frame end marker (footer) */ if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) { @@ -289,6 +297,11 @@ int amqp_handle_input(amqp_connection_state_t state, decoded_frame->frame_type = amqp_d8(raw_frame, 0); decoded_frame->channel = amqp_d16(raw_frame, 1); + channel_pool = amqp_get_or_create_channel_pool(state, decoded_frame->channel); + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; + } + switch (decoded_frame->frame_type) { case AMQP_FRAME_METHOD: decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE); @@ -296,7 +309,7 @@ int amqp_handle_input(amqp_connection_state_t state, encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE; res = amqp_decode_method(decoded_frame->payload.method.id, - &state->decoding_pool, encoded, + channel_pool, encoded, &decoded_frame->payload.method.decoded); if (res < 0) { return res; @@ -315,7 +328,7 @@ int amqp_handle_input(amqp_connection_state_t state, decoded_frame->payload.properties.raw = encoded; res = amqp_decode_properties(decoded_frame->payload.properties.class_id, - &state->decoding_pool, encoded, + channel_pool, encoded, &decoded_frame->payload.properties.decoded); if (res < 0) { return res; @@ -351,19 +364,21 @@ int amqp_handle_input(amqp_connection_state_t state, amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state) { - return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL); + return (state->state == CONNECTION_STATE_IDLE); } void amqp_release_buffers(amqp_connection_state_t state) { + int i; ENFORCE_STATE(state, CONNECTION_STATE_IDLE); - if (state->first_queued_frame) { - amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued"); - } + for (i = 0; i < POOL_TABLE_SIZE; ++i) { + amqp_pool_table_entry_t *entry = state->pool_table[i]; - recycle_amqp_pool(&state->frame_pool); - recycle_amqp_pool(&state->decoding_pool); + for ( ;NULL != entry; entry = entry->next) { + amqp_maybe_release_buffers_on_channel(state, entry->channel); + } + } } void amqp_maybe_release_buffers(amqp_connection_state_t state) @@ -373,6 +388,32 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state) } } +void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel) +{ + amqp_link_t *queued_link; + amqp_pool_t *pool; + if (CONNECTION_STATE_IDLE != state->state) { + return; + } + + queued_link = state->first_queued_frame; + + while (NULL != queued_link) { + amqp_frame_t *frame = queued_link->data; + if (channel == frame->channel) { + return; + } + + queued_link = queued_link->next; + } + + pool = amqp_get_channel_pool(state, channel); + + if (pool != NULL) { + recycle_amqp_pool(pool); + } +} + int amqp_send_frame(amqp_connection_state_t state, const amqp_frame_t *frame) { diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c index b0844e5..88b1e9f 100644 --- a/librabbitmq/amqp_mem.c +++ b/librabbitmq/amqp_mem.c @@ -202,3 +202,46 @@ void amqp_bytes_free(amqp_bytes_t bytes) { free(bytes.bytes); } + +amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t state, amqp_channel_t channel) +{ + amqp_pool_table_entry_t *entry; + size_t index = channel % POOL_TABLE_SIZE; + + entry = state->pool_table[index]; + + for ( ; NULL != entry; entry = entry->next) { + if (channel == entry->channel) { + return &entry->pool; + } + } + + entry = malloc(sizeof(amqp_pool_table_entry_t)); + if (NULL == entry) { + return NULL; + } + + entry->channel = channel; + entry->next = state->pool_table[index]; + state->pool_table[index] = entry; + + init_amqp_pool(&entry->pool, state->frame_max); + + return &entry->pool; +} + +amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel) +{ + amqp_pool_table_entry_t *entry; + size_t index = channel % POOL_TABLE_SIZE; + + entry = state->pool_table[index]; + + for ( ; NULL != entry; entry = entry->next) { + if (channel == entry->channel) { + return &entry->pool; + } + } + + return NULL; +} diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h index e152c8c..baf96e1 100644 --- a/librabbitmq/amqp_private.h +++ b/librabbitmq/amqp_private.h @@ -119,15 +119,27 @@ typedef struct amqp_link_t_ { void *data; } amqp_link_t; +#define POOL_TABLE_SIZE 16 + +typedef struct amqp_pool_table_entry_t_ { + struct amqp_pool_table_entry_t_ *next; + amqp_pool_t pool; + amqp_channel_t channel; +} amqp_pool_table_entry_t; + struct amqp_connection_state_t_ { - amqp_pool_t frame_pool; - amqp_pool_t decoding_pool; + amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE]; amqp_connection_state_enum state; int channel_max; int frame_max; int heartbeat; + + /* buffer for holding frame headers. Allows us to delay allocating + * the raw frame buffer until the type, channel, and size are all known + */ + char header_buffer[HEADER_SIZE + 1]; amqp_bytes_t inbound_buffer; size_t inbound_offset; @@ -147,6 +159,9 @@ struct amqp_connection_state_t_ { amqp_rpc_reply_t most_recent_api_result; }; +amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel); +amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel); + static inline void *amqp_offset(void *data, size_t offset) { return (char *)data + offset; diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 5232330..0f15d07 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -575,8 +575,19 @@ retry: && (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD)) ) )) { - amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t)); - amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t)); + amqp_pool_t *channel_pool; + amqp_frame_t *frame_copy; + amqp_link_t *link; + + channel_pool = amqp_get_or_create_channel_pool(state, frame.channel); + if (NULL == channel_pool) { + result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + result.library_error = AMQP_STATUS_NO_MEMORY; + return result; + } + + frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); + link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); if (frame_copy == NULL || link == NULL) { result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; @@ -699,9 +710,17 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, amqp_table_entry_t default_properties[2]; amqp_table_t default_table; amqp_connection_start_ok_t s; - amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool, - sasl_method, vl); + amqp_pool_t *channel_pool; + amqp_bytes_t response_bytes; + + channel_pool = amqp_get_or_create_channel_pool(state, 0); + if (NULL == channel_pool) { + res = AMQP_STATUS_NO_MEMORY; + goto error_res; + } + response_bytes = sasl_response(channel_pool, + sasl_method, vl); if (response_bytes.bytes == NULL) { res = AMQP_STATUS_NO_MEMORY; goto error_res; @@ -735,7 +754,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state, int i; amqp_table_entry_t *current_entry; - s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool, + s.client_properties.entries = amqp_pool_alloc(channel_pool, sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries)); if (NULL == s.client_properties.entries) { res = AMQP_STATUS_NO_MEMORY; |