summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-21 16:57:32 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-06-21 16:57:32 -0700
commit4a2d899cd3ae3ef8bb9305eddd88c95d3dfc0463 (patch)
tree5d40072884197ddcd5cb3e71ecb4455f30a7881b
parent837a0b540595f8d0fab3214b0126ef436712aa98 (diff)
downloadrabbitmq-c-4a2d899cd3ae3ef8bb9305eddd88c95d3dfc0463.tar.gz
Channel-based memory management
Assign a decoding pool on a per-channel basis. This allows memory to be released on a per-channel basis which is helpful for clients handling multiple channels
-rw-r--r--librabbitmq/amqp.h4
-rw-r--r--librabbitmq/amqp_connection.c111
-rw-r--r--librabbitmq/amqp_mem.c43
-rw-r--r--librabbitmq/amqp_private.h19
-rw-r--r--librabbitmq/amqp_socket.c29
5 files changed, 164 insertions, 42 deletions
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 3c77a5d..de2afbd 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -461,6 +461,10 @@ void
AMQP_CALL amqp_maybe_release_buffers(amqp_connection_state_t state);
AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel);
+
+AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_send_frame(amqp_connection_state_t state, amqp_frame_t const *frame);
diff --git a/librabbitmq/amqp_connection.c b/librabbitmq/amqp_connection.c
index 8d7f7b5..ae3edf4 100644
--- a/librabbitmq/amqp_connection.c
+++ b/librabbitmq/amqp_connection.c
@@ -71,18 +71,13 @@ amqp_connection_state_t amqp_new_connection(void)
return NULL;
}
- init_amqp_pool(&state->frame_pool, INITIAL_FRAME_POOL_PAGE_SIZE);
- init_amqp_pool(&state->decoding_pool, INITIAL_DECODING_POOL_PAGE_SIZE);
-
res = amqp_tune_connection(state, 0, INITIAL_FRAME_POOL_PAGE_SIZE, 0);
if (0 != res) {
goto out_nomem;
}
- state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool, state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL) {
- goto out_nomem;
- }
+ state->inbound_buffer.bytes = state->header_buffer;
+ state->inbound_buffer.len = sizeof(state->header_buffer);
state->state = CONNECTION_STATE_INITIAL;
/* the server protocol version response is 8 bytes, which conveniently
@@ -99,8 +94,6 @@ amqp_connection_state_t amqp_new_connection(void)
out_nomem:
free(state->sock_inbound_buffer.bytes);
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
free(state);
return NULL;
}
@@ -140,10 +133,6 @@ int amqp_tune_connection(amqp_connection_state_t state,
state->frame_max = frame_max;
state->heartbeat = heartbeat;
- empty_amqp_pool(&state->frame_pool);
- init_amqp_pool(&state->frame_pool, frame_max);
-
- state->inbound_buffer.len = frame_max;
state->outbound_buffer.len = frame_max;
newbuf = realloc(state->outbound_buffer.bytes, frame_max);
if (newbuf == NULL) {
@@ -163,8 +152,17 @@ int amqp_destroy_connection(amqp_connection_state_t state)
{
int status = AMQP_STATUS_OK;
if (state) {
- empty_amqp_pool(&state->frame_pool);
- empty_amqp_pool(&state->decoding_pool);
+ int i;
+ for (i = 0; i < POOL_TABLE_SIZE; ++i) {
+ amqp_pool_table_entry_t *entry = state->pool_table[i];
+ while (NULL != entry) {
+ amqp_pool_table_entry_t *todelete = entry;
+ empty_amqp_pool(&entry->pool);
+ entry = entry->next;
+ free(todelete);
+ }
+ }
+
free(state->outbound_buffer.bytes);
free(state->sock_inbound_buffer.bytes);
status = amqp_socket_close(state->socket);
@@ -175,7 +173,8 @@ int amqp_destroy_connection(amqp_connection_state_t state)
static void return_to_idle(amqp_connection_state_t state)
{
- state->inbound_buffer.bytes = NULL;
+ state->inbound_buffer.len = sizeof(state->header_buffer);
+ state->inbound_buffer.bytes = state->header_buffer;
state->inbound_offset = 0;
state->target_size = HEADER_SIZE;
state->state = CONNECTION_STATE_IDLE;
@@ -215,16 +214,6 @@ int amqp_handle_input(amqp_connection_state_t state,
}
if (state->state == CONNECTION_STATE_IDLE) {
- state->inbound_buffer.bytes = amqp_pool_alloc(&state->frame_pool,
- state->inbound_buffer.len);
- if (state->inbound_buffer.bytes == NULL)
- /* state->inbound_buffer.len is always nonzero, because it
- corresponds to frame_max, which is not permitted to be less
- than AMQP_FRAME_MIN_SIZE (currently 4096 bytes). */
- {
- return AMQP_STATUS_NO_MEMORY;
- }
-
state->state = CONNECTION_STATE_HEADER;
}
@@ -261,10 +250,27 @@ int amqp_handle_input(amqp_connection_state_t state,
/* it's not a protocol header; fall through to process it as a
regular frame header */
- case CONNECTION_STATE_HEADER:
+ case CONNECTION_STATE_HEADER: {
+ amqp_channel_t channel;
+ amqp_pool_t *channel_pool;
/* frame length is 3 bytes in */
+ channel = amqp_d16(raw_frame, 1);
+
+ channel_pool = amqp_get_or_create_channel_pool(state, channel);
+ if (NULL == channel_pool) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
state->target_size
= amqp_d32(raw_frame, 3) + HEADER_SIZE + FOOTER_SIZE;
+
+ amqp_pool_alloc_bytes(channel_pool, state->target_size, &state->inbound_buffer);
+ if (NULL == state->inbound_buffer.bytes) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+ memcpy(state->inbound_buffer.bytes, state->header_buffer, HEADER_SIZE);
+ raw_frame = state->inbound_buffer.bytes;
+
state->state = CONNECTION_STATE_BODY;
bytes_consumed += consume_data(state, &received_data);
@@ -275,11 +281,13 @@ int amqp_handle_input(amqp_connection_state_t state,
return bytes_consumed;
}
+ }
/* fall through to process body */
case CONNECTION_STATE_BODY: {
amqp_bytes_t encoded;
int res;
+ amqp_pool_t *channel_pool;
/* Check frame end marker (footer) */
if (amqp_d8(raw_frame, state->target_size - 1) != AMQP_FRAME_END) {
@@ -289,6 +297,11 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->frame_type = amqp_d8(raw_frame, 0);
decoded_frame->channel = amqp_d16(raw_frame, 1);
+ channel_pool = amqp_get_or_create_channel_pool(state, decoded_frame->channel);
+ if (NULL == channel_pool) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
switch (decoded_frame->frame_type) {
case AMQP_FRAME_METHOD:
decoded_frame->payload.method.id = amqp_d32(raw_frame, HEADER_SIZE);
@@ -296,7 +309,7 @@ int amqp_handle_input(amqp_connection_state_t state,
encoded.len = state->target_size - HEADER_SIZE - 4 - FOOTER_SIZE;
res = amqp_decode_method(decoded_frame->payload.method.id,
- &state->decoding_pool, encoded,
+ channel_pool, encoded,
&decoded_frame->payload.method.decoded);
if (res < 0) {
return res;
@@ -315,7 +328,7 @@ int amqp_handle_input(amqp_connection_state_t state,
decoded_frame->payload.properties.raw = encoded;
res = amqp_decode_properties(decoded_frame->payload.properties.class_id,
- &state->decoding_pool, encoded,
+ channel_pool, encoded,
&decoded_frame->payload.properties.decoded);
if (res < 0) {
return res;
@@ -351,19 +364,21 @@ int amqp_handle_input(amqp_connection_state_t state,
amqp_boolean_t amqp_release_buffers_ok(amqp_connection_state_t state)
{
- return (state->state == CONNECTION_STATE_IDLE) && (state->first_queued_frame == NULL);
+ return (state->state == CONNECTION_STATE_IDLE);
}
void amqp_release_buffers(amqp_connection_state_t state)
{
+ int i;
ENFORCE_STATE(state, CONNECTION_STATE_IDLE);
- if (state->first_queued_frame) {
- amqp_abort("Programming error: attempt to amqp_release_buffers while waiting events enqueued");
- }
+ for (i = 0; i < POOL_TABLE_SIZE; ++i) {
+ amqp_pool_table_entry_t *entry = state->pool_table[i];
- recycle_amqp_pool(&state->frame_pool);
- recycle_amqp_pool(&state->decoding_pool);
+ for ( ;NULL != entry; entry = entry->next) {
+ amqp_maybe_release_buffers_on_channel(state, entry->channel);
+ }
+ }
}
void amqp_maybe_release_buffers(amqp_connection_state_t state)
@@ -373,6 +388,32 @@ void amqp_maybe_release_buffers(amqp_connection_state_t state)
}
}
+void amqp_maybe_release_buffers_on_channel(amqp_connection_state_t state, amqp_channel_t channel)
+{
+ amqp_link_t *queued_link;
+ amqp_pool_t *pool;
+ if (CONNECTION_STATE_IDLE != state->state) {
+ return;
+ }
+
+ queued_link = state->first_queued_frame;
+
+ while (NULL != queued_link) {
+ amqp_frame_t *frame = queued_link->data;
+ if (channel == frame->channel) {
+ return;
+ }
+
+ queued_link = queued_link->next;
+ }
+
+ pool = amqp_get_channel_pool(state, channel);
+
+ if (pool != NULL) {
+ recycle_amqp_pool(pool);
+ }
+}
+
int amqp_send_frame(amqp_connection_state_t state,
const amqp_frame_t *frame)
{
diff --git a/librabbitmq/amqp_mem.c b/librabbitmq/amqp_mem.c
index b0844e5..88b1e9f 100644
--- a/librabbitmq/amqp_mem.c
+++ b/librabbitmq/amqp_mem.c
@@ -202,3 +202,46 @@ void amqp_bytes_free(amqp_bytes_t bytes)
{
free(bytes.bytes);
}
+
+amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t state, amqp_channel_t channel)
+{
+ amqp_pool_table_entry_t *entry;
+ size_t index = channel % POOL_TABLE_SIZE;
+
+ entry = state->pool_table[index];
+
+ for ( ; NULL != entry; entry = entry->next) {
+ if (channel == entry->channel) {
+ return &entry->pool;
+ }
+ }
+
+ entry = malloc(sizeof(amqp_pool_table_entry_t));
+ if (NULL == entry) {
+ return NULL;
+ }
+
+ entry->channel = channel;
+ entry->next = state->pool_table[index];
+ state->pool_table[index] = entry;
+
+ init_amqp_pool(&entry->pool, state->frame_max);
+
+ return &entry->pool;
+}
+
+amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel)
+{
+ amqp_pool_table_entry_t *entry;
+ size_t index = channel % POOL_TABLE_SIZE;
+
+ entry = state->pool_table[index];
+
+ for ( ; NULL != entry; entry = entry->next) {
+ if (channel == entry->channel) {
+ return &entry->pool;
+ }
+ }
+
+ return NULL;
+}
diff --git a/librabbitmq/amqp_private.h b/librabbitmq/amqp_private.h
index e152c8c..baf96e1 100644
--- a/librabbitmq/amqp_private.h
+++ b/librabbitmq/amqp_private.h
@@ -119,15 +119,27 @@ typedef struct amqp_link_t_ {
void *data;
} amqp_link_t;
+#define POOL_TABLE_SIZE 16
+
+typedef struct amqp_pool_table_entry_t_ {
+ struct amqp_pool_table_entry_t_ *next;
+ amqp_pool_t pool;
+ amqp_channel_t channel;
+} amqp_pool_table_entry_t;
+
struct amqp_connection_state_t_ {
- amqp_pool_t frame_pool;
- amqp_pool_t decoding_pool;
+ amqp_pool_table_entry_t *pool_table[POOL_TABLE_SIZE];
amqp_connection_state_enum state;
int channel_max;
int frame_max;
int heartbeat;
+
+ /* buffer for holding frame headers. Allows us to delay allocating
+ * the raw frame buffer until the type, channel, and size are all known
+ */
+ char header_buffer[HEADER_SIZE + 1];
amqp_bytes_t inbound_buffer;
size_t inbound_offset;
@@ -147,6 +159,9 @@ struct amqp_connection_state_t_ {
amqp_rpc_reply_t most_recent_api_result;
};
+amqp_pool_t *amqp_get_or_create_channel_pool(amqp_connection_state_t connection, amqp_channel_t channel);
+amqp_pool_t *amqp_get_channel_pool(amqp_connection_state_t state, amqp_channel_t channel);
+
static inline void *amqp_offset(void *data, size_t offset)
{
return (char *)data + offset;
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 5232330..0f15d07 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -575,8 +575,19 @@ retry:
&& (frame.payload.method.id == AMQP_CONNECTION_CLOSE_METHOD))
)
)) {
- amqp_frame_t *frame_copy = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_frame_t));
- amqp_link_t *link = amqp_pool_alloc(&state->decoding_pool, sizeof(amqp_link_t));
+ amqp_pool_t *channel_pool;
+ amqp_frame_t *frame_copy;
+ amqp_link_t *link;
+
+ channel_pool = amqp_get_or_create_channel_pool(state, frame.channel);
+ if (NULL == channel_pool) {
+ result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ result.library_error = AMQP_STATUS_NO_MEMORY;
+ return result;
+ }
+
+ frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
+ link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
if (frame_copy == NULL || link == NULL) {
result.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
@@ -699,9 +710,17 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
amqp_table_entry_t default_properties[2];
amqp_table_t default_table;
amqp_connection_start_ok_t s;
- amqp_bytes_t response_bytes = sasl_response(&state->decoding_pool,
- sasl_method, vl);
+ amqp_pool_t *channel_pool;
+ amqp_bytes_t response_bytes;
+
+ channel_pool = amqp_get_or_create_channel_pool(state, 0);
+ if (NULL == channel_pool) {
+ res = AMQP_STATUS_NO_MEMORY;
+ goto error_res;
+ }
+ response_bytes = sasl_response(channel_pool,
+ sasl_method, vl);
if (response_bytes.bytes == NULL) {
res = AMQP_STATUS_NO_MEMORY;
goto error_res;
@@ -735,7 +754,7 @@ static amqp_rpc_reply_t amqp_login_inner(amqp_connection_state_t state,
int i;
amqp_table_entry_t *current_entry;
- s.client_properties.entries = amqp_pool_alloc(&state->decoding_pool,
+ s.client_properties.entries = amqp_pool_alloc(channel_pool,
sizeof(amqp_table_entry_t) * (default_table.num_entries + client_properties->num_entries));
if (NULL == s.client_properties.entries) {
res = AMQP_STATUS_NO_MEMORY;