From 33ebeede97a81c2e82ac0c3a6d88d4db0695bf29 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Wed, 26 Jun 2013 14:23:16 -0700 Subject: Add a high level API for consuming messages --- librabbitmq/amqp_consumer.c | 293 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 293 insertions(+) create mode 100644 librabbitmq/amqp_consumer.c (limited to 'librabbitmq/amqp_consumer.c') diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c new file mode 100644 index 0000000..25b676e --- /dev/null +++ b/librabbitmq/amqp_consumer.c @@ -0,0 +1,293 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2013 + * Alan Antonuk. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ +#include "amqp.h" +#include "amqp_private.h" +#include "amqp_socket.h" + +#include +#include + +static +int amqp_basic_properties_clone(amqp_basic_properties_t *original, + amqp_basic_properties_t *clone, + amqp_pool_t *pool) +{ + memset(clone, 0, sizeof(amqp_basic_properties_t)); + clone->_flags = original->_flags; + +#define CLONE_BYTES_POOL(original, clone, pool) \ + amqp_pool_alloc_bytes(pool, original.len, &clone); \ + if (NULL == clone.bytes) { \ + return AMQP_STATUS_NO_MEMORY; \ + } \ + memcpy(clone.bytes, original.bytes, clone.len); + + if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + CLONE_BYTES_POOL(original->content_type, clone->content_type, pool) + } + + if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) { + CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool) + } + + if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) { + int res = amqp_table_clone(&original->headers, &clone->headers, pool); + if (AMQP_STATUS_OK != res) { + return res; + } + } + + if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) { + clone->delivery_mode = original->delivery_mode; + } + + if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) { + clone->priority = original->priority; + } + + if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) { + CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool) + } + + if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) { + CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool) + } + + if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) { + CLONE_BYTES_POOL(original->expiration, clone->expiration, pool) + } + + if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) { + CLONE_BYTES_POOL(original->message_id, clone->message_id, pool) + } + + if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) { + clone->timestamp = original->timestamp; + } + + if (clone->_flags & AMQP_BASIC_TYPE_FLAG) { + CLONE_BYTES_POOL(original->type, clone->type, pool) + } + + if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) { + CLONE_BYTES_POOL(original->user_id, clone->user_id, pool) + } + + if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) { + CLONE_BYTES_POOL(original->app_id, clone->app_id, pool) + } + + if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) { + CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool) + } + + return AMQP_STATUS_OK; +#undef CLONE_BYTES_POOL +} + + +void amqp_destroy_message(amqp_message_t *message) +{ + empty_amqp_pool(&message->pool); + amqp_bytes_free(message->body); +} + +void amqp_destroy_envelope(amqp_envelope_t *envelope) +{ + amqp_destroy_message(&envelope->message); + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +} + + +amqp_rpc_reply_t +amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, + struct timeval *timeout, AMQP_UNUSED int flags) +{ + int res; + amqp_frame_t frame; + amqp_basic_deliver_t *delivery_method; + amqp_rpc_reply_t ret; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(envelope, 0, sizeof(amqp_envelope_t)); + + res = amqp_simple_wait_frame_noblock(state, &frame, timeout); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out1; + } + + if (AMQP_FRAME_METHOD != frame.frame_type + || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) { + amqp_queue_frame(state, &frame); + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + goto error_out1; + } + + delivery_method = frame.payload.method.decoded; + + envelope->channel = frame.channel; + envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag); + envelope->delivery_tag = delivery_method->delivery_tag; + envelope->redelivered = delivery_method->redelivered; + envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange); + envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key); + + if (NULL == envelope->consumer_tag.bytes || + NULL == envelope->exchange.bytes || + NULL == envelope->routing_key.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out2; + } + + ret = amqp_read_message(state, envelope->channel, &envelope->message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + goto error_out2; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +error_out1: + return ret; +} + +amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_message_t *message, + AMQP_UNUSED int flags) +{ + amqp_frame_t frame; + amqp_rpc_reply_t ret; + + size_t body_read; + char *body_read_ptr; + int res; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(message, 0, sizeof(amqp_message_t)); + + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + + goto error_out1; + } + + if (AMQP_FRAME_HEADER != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + + amqp_queue_frame(state, &frame); + } + goto error_out1; + } + + init_amqp_pool(&message->pool, 4096); + res = amqp_basic_properties_clone(frame.payload.properties.decoded, + &message->properties, &message->pool); + + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out3; + } + + message->body = amqp_bytes_malloc(frame.payload.properties.body_size); + if (NULL == message->body.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out1; + } + + body_read = 0; + body_read_ptr = message->body.bytes; + + while (body_read < message->body.len) { + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out2; + } + if (AMQP_FRAME_BODY != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + } + goto error_out2; + } + + if (body_read + frame.payload.body_fragment.len > message->body.len) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + goto error_out2; + } + + memcpy(body_read_ptr, frame.payload.body_fragment.bytes, frame.payload.body_fragment.len); + + body_read += frame.payload.body_fragment.len; + body_read_ptr += frame.payload.body_fragment.len; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(message->body); +error_out3: + empty_amqp_pool(&message->pool); +error_out1: + return ret; +} -- cgit v1.2.1 From 157788ef441a95e09cdf19b8988445f749e3316d Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Wed, 10 Jul 2013 10:37:20 -0700 Subject: FIX: basic_properties_clone handle 0-len strings Properly handle 0-length strings in amqp_basic_properties_clone() --- librabbitmq/amqp_consumer.c | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'librabbitmq/amqp_consumer.c') diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c index 25b676e..29da9ed 100644 --- a/librabbitmq/amqp_consumer.c +++ b/librabbitmq/amqp_consumer.c @@ -42,12 +42,16 @@ int amqp_basic_properties_clone(amqp_basic_properties_t *original, memset(clone, 0, sizeof(amqp_basic_properties_t)); clone->_flags = original->_flags; -#define CLONE_BYTES_POOL(original, clone, pool) \ - amqp_pool_alloc_bytes(pool, original.len, &clone); \ - if (NULL == clone.bytes) { \ - return AMQP_STATUS_NO_MEMORY; \ - } \ - memcpy(clone.bytes, original.bytes, clone.len); +#define CLONE_BYTES_POOL(original, clone, pool) \ + if (0 == original.len) { \ + clone = amqp_empty_bytes; \ + } else { \ + amqp_pool_alloc_bytes(pool, original.len, &clone); \ + if (NULL == clone.bytes) { \ + return AMQP_STATUS_NO_MEMORY; \ + } \ + memcpy(clone.bytes, original.bytes, clone.len); \ + } if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { CLONE_BYTES_POOL(original->content_type, clone->content_type, pool) -- cgit v1.2.1 From 4eaf771fa5f7807c38276b26aaa410bfd136d382 Mon Sep 17 00:00:00 2001 From: Alan Antonuk Date: Wed, 10 Jul 2013 10:44:41 -0700 Subject: FIX: handle 0-len msg body in amqp_read_message --- librabbitmq/amqp_consumer.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'librabbitmq/amqp_consumer.c') diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c index 29da9ed..6fb5b96 100644 --- a/librabbitmq/amqp_consumer.c +++ b/librabbitmq/amqp_consumer.c @@ -242,11 +242,15 @@ amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, goto error_out3; } - message->body = amqp_bytes_malloc(frame.payload.properties.body_size); - if (NULL == message->body.bytes) { - ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; - ret.library_error = AMQP_STATUS_NO_MEMORY; - goto error_out1; + if (0 == frame.payload.properties.body_size) { + message->body = amqp_empty_bytes; + } else { + message->body = amqp_bytes_malloc(frame.payload.properties.body_size); + if (NULL == message->body.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out1; + } } body_read = 0; -- cgit v1.2.1 From ce64e57df8c9f7c53dc98a265513e15960d1841a Mon Sep 17 00:00:00 2001 From: Oleg Blednov Date: Thu, 31 Oct 2013 18:16:04 +0400 Subject: Right unexpected frames requeue in amqp_consumer.c --- librabbitmq/amqp_consumer.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'librabbitmq/amqp_consumer.c') diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c index 6fb5b96..6c6c1c9 100644 --- a/librabbitmq/amqp_consumer.c +++ b/librabbitmq/amqp_consumer.c @@ -153,7 +153,7 @@ amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, if (AMQP_FRAME_METHOD != frame.frame_type || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) { - amqp_queue_frame(state, &frame); + amqp_put_back_frame(state, &frame); ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; goto error_out1; @@ -227,7 +227,7 @@ amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; - amqp_queue_frame(state, &frame); + amqp_put_back_frame(state, &frame); } goto error_out1; } -- cgit v1.2.1