summaryrefslogtreecommitdiff
path: root/librabbitmq
diff options
context:
space:
mode:
authorOleg Blednov <blake-r@linkfeed.ru>2013-10-31 18:16:04 +0400
committerAlan Antonuk <alan.antonuk@gmail.com>2013-11-05 22:53:59 -0800
commitce64e57df8c9f7c53dc98a265513e15960d1841a (patch)
tree117f7e5024163da0ca3ece3ca4a23715e8bae19c /librabbitmq
parent715901d6755eddcd3a91b7b80d4699e1cb3414d7 (diff)
downloadrabbitmq-c-github-ask-ce64e57df8c9f7c53dc98a265513e15960d1841a.tar.gz
Right unexpected frames requeue in amqp_consumer.c
Diffstat (limited to 'librabbitmq')
-rw-r--r--librabbitmq/amqp_consumer.c4
-rw-r--r--librabbitmq/amqp_socket.c35
-rw-r--r--librabbitmq/amqp_socket.h3
3 files changed, 37 insertions, 5 deletions
diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c
index 6fb5b96..6c6c1c9 100644
--- a/librabbitmq/amqp_consumer.c
+++ b/librabbitmq/amqp_consumer.c
@@ -153,7 +153,7 @@ amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope,
if (AMQP_FRAME_METHOD != frame.frame_type
|| AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
- amqp_queue_frame(state, &frame);
+ amqp_put_back_frame(state, &frame);
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
goto error_out1;
@@ -227,7 +227,7 @@ amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state,
ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
- amqp_queue_frame(state, &frame);
+ amqp_put_back_frame(state, &frame);
}
goto error_out1;
}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index 061e739..79a7696 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -786,7 +786,7 @@ beginrecv:
}
}
-int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+static amqp_link_t * amqp_create_link_for_frame(amqp_connection_state_t state, amqp_frame_t *frame)
{
amqp_link_t *link;
amqp_frame_t *frame_copy;
@@ -794,19 +794,29 @@ int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel);
if (NULL == channel_pool) {
- return AMQP_STATUS_NO_MEMORY;
+ return NULL;
}
link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
if (NULL == link || NULL == frame_copy) {
- return AMQP_STATUS_NO_MEMORY;
+ return NULL;
}
*frame_copy = *frame;
link->data = frame_copy;
+ return link;
+}
+
+int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link = amqp_create_link_for_frame(state, frame);
+ if (NULL == link) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
if (NULL == state->first_queued_frame) {
state->first_queued_frame = link;
} else {
@@ -819,6 +829,25 @@ int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
return AMQP_STATUS_OK;
}
+int amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link = amqp_create_link_for_frame(state, frame);
+ if (NULL == link) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ state->last_queued_frame = link;
+ link->next = NULL;
+ } else {
+ link->next = state->first_queued_frame;
+ state->first_queued_frame = link;
+ }
+
+ return AMQP_STATUS_OK;
+}
+
int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_frame_t *decoded_frame)
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index b7af785..f8b6f51 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -183,6 +183,9 @@ int
amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);
int
+amqp_put_back_frame(amqp_connection_state_t state, amqp_frame_t *frame);
+
+int
amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
amqp_channel_t channel,
amqp_frame_t *decoded_frame);