From ce64e57df8c9f7c53dc98a265513e15960d1841a Mon Sep 17 00:00:00 2001 From: Oleg Blednov Date: Thu, 31 Oct 2013 18:16:04 +0400 Subject: Right unexpected frames requeue in amqp_consumer.c --- librabbitmq/amqp_consumer.c | 4 ++-- librabbitmq/amqp_socket.c | 35 ++++++++++++++++++++++++++++++++--- librabbitmq/amqp_socket.h | 3 +++ 3 files changed, 37 insertions(+), 5 deletions(-) (limited to 'librabbitmq') diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c index 6fb5b96..6c6c1c9 100644 --- a/librabbitmq/amqp_consumer.c +++ b/librabbitmq/amqp_consumer.c @@ -153,7 +153,7 @@ amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, if (AMQP_FRAME_METHOD != frame.frame_type || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) { - amqp_queue_frame(state, &frame); + amqp_put_back_frame(state, &frame); ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; goto error_out1; @@ -227,7 +227,7 @@ amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; - amqp_queue_frame(state, &frame); + amqp_put_back_frame(state, &frame); } goto error_out1; } diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index 061e739..79a7696 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -786,7 +786,7 @@ beginrecv: } } -int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) +static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame) { amqp_link_t *link; amqp_frame_t *frame_copy; @@ -794,19 +794,29 @@ int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel); if (NULL == channel_pool) { - return AMQP_STATUS_NO_MEMORY; + return NULL; } link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); if (NULL == link || NULL == frame_copy) { - return AMQP_STATUS_NO_MEMORY; + return NULL; } *frame_copy = *frame; link->data = frame_copy; + return link; +} + +int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link = amqp_create_link_for_frame(state, frame); + if (NULL == link) { + return AMQP_STATUS_NO_MEMORY; + } + if (NULL == state->first_queued_frame) { state->first_queued_frame = link; } else { @@ -819,6 +829,25 @@ int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) return AMQP_STATUS_OK; } +int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link = amqp_create_link_for_frame(state, frame); + if (NULL == link) { + return AMQP_STATUS_NO_MEMORY; + } + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + state->last_queued_frame = link; + link->next = NULL; + } else { + link->next = state->first_queued_frame; + state->first_queued_frame = link; + } + + return AMQP_STATUS_OK; +} + int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, amqp_channel_t channel, amqp_frame_t *decoded_frame) diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index b7af785..f8b6f51 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -182,6 +182,9 @@ amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *t int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); +int +amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame); + int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, amqp_channel_t channel, -- cgit v1.2.1