diff options
author | Ask Solem <ask@celeryproject.org> | 2014-04-14 17:29:03 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2014-04-14 17:29:03 +0100 |
commit | be3000b4c84d7503f5ef4067de44ff16d060d158 (patch) | |
tree | fecacb0f149b067202c443b59aad3cc027a0ff1c /librabbitmq/amqp_consumer.c | |
parent | dcb8edaccd6e164d624edfab0f3120d96f707f0a (diff) | |
parent | fe844e41ffad5691607982cbfe4054aacdcb81e0 (diff) | |
download | rabbitmq-c-github-ask-be3000b4c84d7503f5ef4067de44ff16d060d158.tar.gz |
Merge branch 'alanxz/master'
Conflicts:
Makefile.am
codegen
Diffstat (limited to 'librabbitmq/amqp_consumer.c')
-rw-r--r-- | librabbitmq/amqp_consumer.c | 301 |
1 files changed, 301 insertions, 0 deletions
diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c new file mode 100644 index 0000000..6c6c1c9 --- /dev/null +++ b/librabbitmq/amqp_consumer.c @@ -0,0 +1,301 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2013 + * Alan Antonuk. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ +#include "amqp.h" +#include "amqp_private.h" +#include "amqp_socket.h" + +#include <stdlib.h> +#include <string.h> + +static +int amqp_basic_properties_clone(amqp_basic_properties_t *original, + amqp_basic_properties_t *clone, + amqp_pool_t *pool) +{ + memset(clone, 0, sizeof(amqp_basic_properties_t)); + clone->_flags = original->_flags; + +#define CLONE_BYTES_POOL(original, clone, pool) \ + if (0 == original.len) { \ + clone = amqp_empty_bytes; \ + } else { \ + amqp_pool_alloc_bytes(pool, original.len, &clone); \ + if (NULL == clone.bytes) { \ + return AMQP_STATUS_NO_MEMORY; \ + } \ + memcpy(clone.bytes, original.bytes, clone.len); \ + } + + if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + CLONE_BYTES_POOL(original->content_type, clone->content_type, pool) + } + + if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) { + CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool) + } + + if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) { + int res = amqp_table_clone(&original->headers, &clone->headers, pool); + if (AMQP_STATUS_OK != res) { + return res; + } + } + + if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) { + clone->delivery_mode = original->delivery_mode; + } + + if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) { + clone->priority = original->priority; + } + + if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) { + CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool) + } + + if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) { + CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool) + } + + if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) { + CLONE_BYTES_POOL(original->expiration, clone->expiration, pool) + } + + if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) { + CLONE_BYTES_POOL(original->message_id, clone->message_id, pool) + } + + if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) { + clone->timestamp = original->timestamp; + } + + if (clone->_flags & AMQP_BASIC_TYPE_FLAG) { + CLONE_BYTES_POOL(original->type, clone->type, pool) + } + + if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) { + CLONE_BYTES_POOL(original->user_id, clone->user_id, pool) + } + + if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) { + CLONE_BYTES_POOL(original->app_id, clone->app_id, pool) + } + + if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) { + CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool) + } + + return AMQP_STATUS_OK; +#undef CLONE_BYTES_POOL +} + + +void amqp_destroy_message(amqp_message_t *message) +{ + empty_amqp_pool(&message->pool); + amqp_bytes_free(message->body); +} + +void amqp_destroy_envelope(amqp_envelope_t *envelope) +{ + amqp_destroy_message(&envelope->message); + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +} + + +amqp_rpc_reply_t +amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, + struct timeval *timeout, AMQP_UNUSED int flags) +{ + int res; + amqp_frame_t frame; + amqp_basic_deliver_t *delivery_method; + amqp_rpc_reply_t ret; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(envelope, 0, sizeof(amqp_envelope_t)); + + res = amqp_simple_wait_frame_noblock(state, &frame, timeout); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out1; + } + + if (AMQP_FRAME_METHOD != frame.frame_type + || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) { + amqp_put_back_frame(state, &frame); + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + goto error_out1; + } + + delivery_method = frame.payload.method.decoded; + + envelope->channel = frame.channel; + envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag); + envelope->delivery_tag = delivery_method->delivery_tag; + envelope->redelivered = delivery_method->redelivered; + envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange); + envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key); + + if (NULL == envelope->consumer_tag.bytes || + NULL == envelope->exchange.bytes || + NULL == envelope->routing_key.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out2; + } + + ret = amqp_read_message(state, envelope->channel, &envelope->message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + goto error_out2; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +error_out1: + return ret; +} + +amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_message_t *message, + AMQP_UNUSED int flags) +{ + amqp_frame_t frame; + amqp_rpc_reply_t ret; + + size_t body_read; + char *body_read_ptr; + int res; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(message, 0, sizeof(amqp_message_t)); + + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + + goto error_out1; + } + + if (AMQP_FRAME_HEADER != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + + amqp_put_back_frame(state, &frame); + } + goto error_out1; + } + + init_amqp_pool(&message->pool, 4096); + res = amqp_basic_properties_clone(frame.payload.properties.decoded, + &message->properties, &message->pool); + + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out3; + } + + if (0 == frame.payload.properties.body_size) { + message->body = amqp_empty_bytes; + } else { + message->body = amqp_bytes_malloc(frame.payload.properties.body_size); + if (NULL == message->body.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out1; + } + } + + body_read = 0; + body_read_ptr = message->body.bytes; + + while (body_read < message->body.len) { + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out2; + } + if (AMQP_FRAME_BODY != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + } + goto error_out2; + } + + if (body_read + frame.payload.body_fragment.len > message->body.len) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + goto error_out2; + } + + memcpy(body_read_ptr, frame.payload.body_fragment.bytes, frame.payload.body_fragment.len); + + body_read += frame.payload.body_fragment.len; + body_read_ptr += frame.payload.body_fragment.len; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(message->body); +error_out3: + empty_amqp_pool(&message->pool); +error_out1: + return ret; +} |