diff options
-rw-r--r-- | Makefile.am | 3 | ||||
-rw-r--r-- | examples/amqp_consumer.c | 99 | ||||
-rw-r--r-- | examples/amqp_listen.c | 76 | ||||
-rw-r--r-- | examples/amqp_listenq.c | 78 | ||||
-rw-r--r-- | examples/amqps_consumer.c | 99 | ||||
-rw-r--r-- | examples/amqps_listen.c | 76 | ||||
-rw-r--r-- | examples/amqps_listenq.c | 78 | ||||
-rw-r--r-- | librabbitmq/CMakeLists.txt | 1 | ||||
-rw-r--r-- | librabbitmq/amqp.h | 100 | ||||
-rw-r--r-- | librabbitmq/amqp_consumer.c | 293 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.c | 74 | ||||
-rw-r--r-- | librabbitmq/amqp_socket.h | 7 |
12 files changed, 646 insertions, 338 deletions
diff --git a/Makefile.am b/Makefile.am index 989f5e9..1de9b0f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -22,7 +22,8 @@ librabbitmq_librabbitmq_la_SOURCES = \ librabbitmq/amqp_table.c \ librabbitmq/amqp_url.c \ librabbitmq/amqp_timer.h \ - librabbitmq/amqp_timer.c + librabbitmq/amqp_timer.c \ + librabbitmq/amqp_consumer.c if REGENERATE_AMQP_FRAMING librabbitmq_librabbitmq_la_SOURCES += librabbitmq/gen/amqp_framing.c diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c index 21a5b48..62da0ca 100644 --- a/examples/amqp_consumer.c +++ b/examples/amqp_consumer.c @@ -58,13 +58,13 @@ static void run(amqp_connection_state_t conn) uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; amqp_frame_t frame; - int result; - size_t body_received; - size_t body_target; uint64_t now; while (1) { + amqp_rpc_reply_t ret; + amqp_envelope_t envelope; + now = now_microseconds(); if (now > next_summary_time) { int countOverInterval = received - previous_received; @@ -78,45 +78,64 @@ static void run(amqp_connection_state_t conn) } amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } - - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; - } - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); + ret = amqp_consume_message(conn, &envelope, NULL, 0); + + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && + AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) { + if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) { + return; + } + + if (AMQP_FRAME_METHOD == frame.frame_type) { + switch (frame.payload.method.id) { + case AMQP_BASIC_ACK_METHOD: + /* if we've turned publisher confirms on, and we've published a message + * here is a message being confirmed + */ + + break; + case AMQP_BASIC_RETURN_METHOD: + /* if a published message couldn't be routed and the mandatory flag was set + * this is what would be returned. The message then needs to be read. + */ + { + amqp_message_t message; + ret = amqp_read_message(conn, frame.channel, &message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + return; + } + + amqp_destroy_message(&message); + } + + break; + + case AMQP_CHANNEL_CLOSE_METHOD: + /* a channel.close method happens when a channel exception occurs, this + * can happen by publishing to an exchange that doesn't exist for example + * + * In this case you would need to open another channel redeclare any queues + * that were declared auto-delete, and restart any consumers that were attached + * to the previous channel + */ + return; + + case AMQP_CONNECTION_CLOSE_METHOD: + /* a connection.close method happens when a connection exception occurs, + * this can happen by trying to use a channel that isn't open for example. + * + * In this case the whole connection must be restarted. + */ + return; + + default: + fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id); + return; + } + } } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); } received++; diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c index bf5b716..ca7d538 100644 --- a/examples/amqp_listen.c +++ b/examples/amqp_listen.c @@ -104,80 +104,30 @@ int main(int argc, char const *const *argv) die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { - amqp_frame_t frame; - int result; - - amqp_basic_deliver_t *d; - amqp_basic_properties_t *p; - size_t body_target; - size_t body_received; - while (1) { + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - printf("Result %d\n", result); - if (result < 0) { - break; - } - printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } + res = amqp_consume_message(conn, &envelope, NULL, 0); - printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; + if (AMQP_RESPONSE_NORMAL != res.reply_type) { + break; } - d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } + (unsigned) envelope.delivery_tag, + (int) envelope.exchange.len, (char *) envelope.exchange.bytes, + (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes); - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - p = (amqp_basic_properties_t *) frame.payload.properties.decoded; - if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); + (int) envelope.message.properties.content_type.len, + (char *) envelope.message.properties.content_type.bytes); } - printf("----\n"); - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } - - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); - } - - if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; - } + amqp_destroy_envelope(&envelope); } } diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c index e76cdb1..b2e8094 100644 --- a/examples/amqp_listenq.c +++ b/examples/amqp_listenq.c @@ -85,82 +85,30 @@ int main(int argc, char const *const *argv) die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { - amqp_frame_t frame; - int result; - - amqp_basic_deliver_t *d; - amqp_basic_properties_t *p; - size_t body_target; - size_t body_received; - while (1) { + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - printf("Result %d\n", result); - if (result < 0) { - break; - } - printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } + res = amqp_consume_message(conn, &envelope, NULL, 0); - printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; + if (AMQP_RESPONSE_NORMAL != res.reply_type) { + break; } - d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } + (unsigned) envelope.delivery_tag, + (int) envelope.exchange.len, (char *) envelope.exchange.bytes, + (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes); - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - p = (amqp_basic_properties_t *) frame.payload.properties.decoded; - if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); - } - printf("----\n"); - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } - - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); - - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); - } - - if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; + (int) envelope.message.properties.content_type.len, + (char *) envelope.message.properties.content_type.bytes); } - amqp_basic_ack(conn, 1, d->delivery_tag, 0); + amqp_destroy_envelope(&envelope); } } diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c index fff6677..d4cd294 100644 --- a/examples/amqps_consumer.c +++ b/examples/amqps_consumer.c @@ -60,13 +60,13 @@ static void run(amqp_connection_state_t conn) uint64_t next_summary_time = start_time + SUMMARY_EVERY_US; amqp_frame_t frame; - int result; - size_t body_received; - size_t body_target; uint64_t now; while (1) { + amqp_rpc_reply_t ret; + amqp_envelope_t envelope; + now = now_microseconds(); if (now > next_summary_time) { int countOverInterval = received - previous_received; @@ -80,45 +80,64 @@ static void run(amqp_connection_state_t conn) } amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } - - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; - } - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - return; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); + ret = amqp_consume_message(conn, &envelope, NULL, 0); + + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type && + AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) { + if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) { + return; + } + + if (AMQP_FRAME_METHOD == frame.frame_type) { + switch (frame.payload.method.id) { + case AMQP_BASIC_ACK_METHOD: + /* if we've turned publisher confirms on, and we've published a message + * here is a message being confirmed + */ + + break; + case AMQP_BASIC_RETURN_METHOD: + /* if a published message couldn't be routed and the mandatory flag was set + * this is what would be returned. The message then needs to be read. + */ + { + amqp_message_t message; + ret = amqp_read_message(conn, frame.channel, &message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + return; + } + + amqp_destroy_message(&message); + } + + break; + + case AMQP_CHANNEL_CLOSE_METHOD: + /* a channel.close method happens when a channel exception occurs, this + * can happen by publishing to an exchange that doesn't exist for example + * + * In this case you would need to open another channel redeclare any queues + * that were declared auto-delete, and restart any consumers that were attached + * to the previous channel + */ + return; + + case AMQP_CONNECTION_CLOSE_METHOD: + /* a connection.close method happens when a connection exception occurs, + * this can happen by trying to use a channel that isn't open for example. + * + * In this case the whole connection must be restarted. + */ + return; + + default: + fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id); + return; + } + } } - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); } received++; diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c index a5eb692..44bb88c 100644 --- a/examples/amqps_listen.c +++ b/examples/amqps_listen.c @@ -121,80 +121,30 @@ int main(int argc, char const *const *argv) die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { - amqp_frame_t frame; - int result; - - amqp_basic_deliver_t *d; - amqp_basic_properties_t *p; - size_t body_target; - size_t body_received; - while (1) { + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - printf("Result %d\n", result); - if (result < 0) { - break; - } - printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } + res = amqp_consume_message(conn, &envelope, NULL, 0); - printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; + if (AMQP_RESPONSE_NORMAL != res.reply_type) { + break; } - d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } + (unsigned) envelope.delivery_tag, + (int) envelope.exchange.len, (char *) envelope.exchange.bytes, + (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes); - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - p = (amqp_basic_properties_t *) frame.payload.properties.decoded; - if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); + (int) envelope.message.properties.content_type.len, + (char *) envelope.message.properties.content_type.bytes); } - printf("----\n"); - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } - - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); - } - - if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; - } + amqp_destroy_envelope(&envelope); } } diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c index 0210d88..6643500 100644 --- a/examples/amqps_listenq.c +++ b/examples/amqps_listenq.c @@ -102,82 +102,30 @@ int main(int argc, char const *const *argv) die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); { - amqp_frame_t frame; - int result; - - amqp_basic_deliver_t *d; - amqp_basic_properties_t *p; - size_t body_target; - size_t body_received; - while (1) { + amqp_rpc_reply_t res; + amqp_envelope_t envelope; + amqp_maybe_release_buffers(conn); - result = amqp_simple_wait_frame(conn, &frame); - printf("Result %d\n", result); - if (result < 0) { - break; - } - printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel); - if (frame.frame_type != AMQP_FRAME_METHOD) { - continue; - } + res = amqp_consume_message(conn, &envelope, NULL, 0); - printf("Method %s\n", amqp_method_name(frame.payload.method.id)); - if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) { - continue; + if (AMQP_RESPONSE_NORMAL != res.reply_type) { + break; } - d = (amqp_basic_deliver_t *) frame.payload.method.decoded; printf("Delivery %u, exchange %.*s routingkey %.*s\n", - (unsigned) d->delivery_tag, - (int) d->exchange.len, (char *) d->exchange.bytes, - (int) d->routing_key.len, (char *) d->routing_key.bytes); - - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } + (unsigned) envelope.delivery_tag, + (int) envelope.exchange.len, (char *) envelope.exchange.bytes, + (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes); - if (frame.frame_type != AMQP_FRAME_HEADER) { - fprintf(stderr, "Expected header!"); - abort(); - } - p = (amqp_basic_properties_t *) frame.payload.properties.decoded; - if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { printf("Content-type: %.*s\n", - (int) p->content_type.len, (char *) p->content_type.bytes); - } - printf("----\n"); - - body_target = frame.payload.properties.body_size; - body_received = 0; - - while (body_received < body_target) { - result = amqp_simple_wait_frame(conn, &frame); - if (result < 0) { - break; - } - - if (frame.frame_type != AMQP_FRAME_BODY) { - fprintf(stderr, "Expected body!"); - abort(); - } - - body_received += frame.payload.body_fragment.len; - assert(body_received <= body_target); - - amqp_dump(frame.payload.body_fragment.bytes, - frame.payload.body_fragment.len); - } - - if (body_received != body_target) { - /* Can only happen when amqp_simple_wait_frame returns <= 0 */ - /* We break here to close the connection */ - break; + (int) envelope.message.properties.content_type.len, + (char *) envelope.message.properties.content_type.bytes); } - amqp_basic_ack(conn, 1, d->delivery_tag, 0); + amqp_destroy_envelope(&envelope); } } diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt index 8ab8bff..613f138 100644 --- a/librabbitmq/CMakeLists.txt +++ b/librabbitmq/CMakeLists.txt @@ -121,6 +121,7 @@ set(RABBITMQ_SOURCES amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c amqp_table.c amqp_url.c amqp_socket.h amqp_tcp_socket.c amqp_tcp_socket.h amqp_timer.c amqp_timer.h + amqp_consumer.c ${AMQP_SSL_SRCS} ) diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h index 24547b3..5680753 100644 --- a/librabbitmq/amqp.h +++ b/librabbitmq/amqp.h @@ -349,6 +349,7 @@ typedef enum amqp_status_enum_ AMQP_STATUS_TIMEOUT = -0x000D, AMQP_STATUS_TIMER_FAILURE = -0x000E, AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F, + AMQP_STATUS_UNEXPECTED_STATE = -0x0010, AMQP_STATUS_TCP_ERROR = -0x0100, AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101, @@ -359,6 +360,12 @@ typedef enum amqp_status_enum_ AMQP_STATUS_SSL_CONNECTION_FAILED = -0x0203 } amqp_status_enum; +AMQP_END_DECLS + +#include <amqp_framing.h> + +AMQP_BEGIN_DECLS + AMQP_PUBLIC_FUNCTION char const * AMQP_CALL amqp_version(void); @@ -631,6 +638,98 @@ AMQP_PUBLIC_FUNCTION int AMQP_CALL amqp_table_clone(amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool); +typedef struct amqp_message_t_ { + amqp_basic_properties_t properties; + amqp_bytes_t body; + amqp_pool_t pool; +} amqp_message_t; + +/** + * Reads the next message on a channel + * + * Reads a complete message (header + body) on a specified channel. This + * function is intended to be used with amqp_basic_get() or when an + * AMQP_BASIC_DELIVERY_METHOD method is received. + * + * \param [in,out] state the connection object + * \param [in] channel the channel on which to read the message from + * \param [in,out] message a pointer to a amqp_message_t object. Caller should + * call amqp_message_destroy() when it is done using the + * fields in the message object. The caller is responsible for + * allocating/destroying the amqp_message_t object itself. + * \param [in] flags pass in 0. Currently unused. + * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL on success. + */ +AMQP_PUBLIC_FUNCTION +amqp_rpc_reply_t +AMQP_CALL amqp_read_message(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_message_t *message, int flags); + +/** + * Frees memory associated with a amqp_message_t allocated in amqp_read_message + * + * \param [in] message + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL amqp_destroy_message(amqp_message_t *message); + +typedef struct amqp_envelope_t_ { + amqp_channel_t channel; + amqp_bytes_t consumer_tag; + uint64_t delivery_tag; + amqp_boolean_t redelivered; + amqp_bytes_t exchange; + amqp_bytes_t routing_key; + amqp_message_t message; +} amqp_envelope_t; + +/** + * Wait for and consume a message + * + * Waits for a basic.deliver method on any channel, upon receipt of + * basic.deliver it reads that message, and returns. If any other method is + * received before basic.deliver, this function will return an amqp_rpc_reply_t + * with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and + * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME. The caller should then + * call amqp_simple_wait_frame() to read this frame and take appropriate action. + * + * This function should be used after starting a consumer with the + * amqp_basic_consume() funtion + * + * \param [in,out] state the connection object + * \param [in,out] envelope a pointer to a amqp_envelope_t object. Caller + * should call amqp_envelope_destroy() when it is done using + * the fields in the envelope object. The caller is responsible + * for allocating/destroying the amqp_envelope_t object itself. + * \param [in] timeout a timeout to wait for a message delivery. Passing in + * NULL will result in blocking behavior. + * \param [in] flags pass in 0. Currently unused. + * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL + * on success. If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and + * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME, a frame other + * than AMQP_BASIC_DELIVER_METHOD was received, the caller should call + * amqp_simple_wait_frame() to read this frame and take appropriate + * action. + */ +AMQP_PUBLIC_FUNCTION +amqp_rpc_reply_t +AMQP_CALL amqp_consume_message(amqp_connection_state_t state, + amqp_envelope_t *envelope, + struct timeval *timeout, int flags); + +/** + * Frees memory associated iwth a amqp_envelope_t allocated in amqp_consume_message + * + * \param [in] envelope + * \returns + */ +AMQP_PUBLIC_FUNCTION +void +AMQP_CALL amqp_destroy_envelope(amqp_envelope_t *envelope); + + struct amqp_connection_info { char *user; char *password; @@ -711,6 +810,5 @@ amqp_get_socket(amqp_connection_state_t state); AMQP_END_DECLS -#include <amqp_framing.h> #endif /* AMQP_H */ diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c new file mode 100644 index 0000000..25b676e --- /dev/null +++ b/librabbitmq/amqp_consumer.c @@ -0,0 +1,293 @@ +/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by Alan Antonuk are Copyright (c) 2013 + * Alan Antonuk. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ +#include "amqp.h" +#include "amqp_private.h" +#include "amqp_socket.h" + +#include <stdlib.h> +#include <string.h> + +static +int amqp_basic_properties_clone(amqp_basic_properties_t *original, + amqp_basic_properties_t *clone, + amqp_pool_t *pool) +{ + memset(clone, 0, sizeof(amqp_basic_properties_t)); + clone->_flags = original->_flags; + +#define CLONE_BYTES_POOL(original, clone, pool) \ + amqp_pool_alloc_bytes(pool, original.len, &clone); \ + if (NULL == clone.bytes) { \ + return AMQP_STATUS_NO_MEMORY; \ + } \ + memcpy(clone.bytes, original.bytes, clone.len); + + if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + CLONE_BYTES_POOL(original->content_type, clone->content_type, pool) + } + + if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) { + CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool) + } + + if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) { + int res = amqp_table_clone(&original->headers, &clone->headers, pool); + if (AMQP_STATUS_OK != res) { + return res; + } + } + + if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) { + clone->delivery_mode = original->delivery_mode; + } + + if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) { + clone->priority = original->priority; + } + + if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) { + CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool) + } + + if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) { + CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool) + } + + if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) { + CLONE_BYTES_POOL(original->expiration, clone->expiration, pool) + } + + if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) { + CLONE_BYTES_POOL(original->message_id, clone->message_id, pool) + } + + if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) { + clone->timestamp = original->timestamp; + } + + if (clone->_flags & AMQP_BASIC_TYPE_FLAG) { + CLONE_BYTES_POOL(original->type, clone->type, pool) + } + + if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) { + CLONE_BYTES_POOL(original->user_id, clone->user_id, pool) + } + + if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) { + CLONE_BYTES_POOL(original->app_id, clone->app_id, pool) + } + + if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) { + CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool) + } + + return AMQP_STATUS_OK; +#undef CLONE_BYTES_POOL +} + + +void amqp_destroy_message(amqp_message_t *message) +{ + empty_amqp_pool(&message->pool); + amqp_bytes_free(message->body); +} + +void amqp_destroy_envelope(amqp_envelope_t *envelope) +{ + amqp_destroy_message(&envelope->message); + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +} + + +amqp_rpc_reply_t +amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope, + struct timeval *timeout, AMQP_UNUSED int flags) +{ + int res; + amqp_frame_t frame; + amqp_basic_deliver_t *delivery_method; + amqp_rpc_reply_t ret; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(envelope, 0, sizeof(amqp_envelope_t)); + + res = amqp_simple_wait_frame_noblock(state, &frame, timeout); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out1; + } + + if (AMQP_FRAME_METHOD != frame.frame_type + || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) { + amqp_queue_frame(state, &frame); + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + goto error_out1; + } + + delivery_method = frame.payload.method.decoded; + + envelope->channel = frame.channel; + envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag); + envelope->delivery_tag = delivery_method->delivery_tag; + envelope->redelivered = delivery_method->redelivered; + envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange); + envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key); + + if (NULL == envelope->consumer_tag.bytes || + NULL == envelope->exchange.bytes || + NULL == envelope->routing_key.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out2; + } + + ret = amqp_read_message(state, envelope->channel, &envelope->message, 0); + if (AMQP_RESPONSE_NORMAL != ret.reply_type) { + goto error_out2; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(envelope->routing_key); + amqp_bytes_free(envelope->exchange); + amqp_bytes_free(envelope->consumer_tag); +error_out1: + return ret; +} + +amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_message_t *message, + AMQP_UNUSED int flags) +{ + amqp_frame_t frame; + amqp_rpc_reply_t ret; + + size_t body_read; + char *body_read_ptr; + int res; + + memset(&ret, 0, sizeof(amqp_rpc_reply_t)); + memset(message, 0, sizeof(amqp_message_t)); + + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + + goto error_out1; + } + + if (AMQP_FRAME_HEADER != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_UNEXPECTED_STATE; + + amqp_queue_frame(state, &frame); + } + goto error_out1; + } + + init_amqp_pool(&message->pool, 4096); + res = amqp_basic_properties_clone(frame.payload.properties.decoded, + &message->properties, &message->pool); + + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out3; + } + + message->body = amqp_bytes_malloc(frame.payload.properties.body_size); + if (NULL == message->body.bytes) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_NO_MEMORY; + goto error_out1; + } + + body_read = 0; + body_read_ptr = message->body.bytes; + + while (body_read < message->body.len) { + res = amqp_simple_wait_frame_on_channel(state, channel, &frame); + if (AMQP_STATUS_OK != res) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = res; + goto error_out2; + } + if (AMQP_FRAME_BODY != frame.frame_type) { + if (AMQP_FRAME_METHOD == frame.frame_type && + (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id || + AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) { + + ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION; + ret.reply = frame.payload.method; + } else { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + } + goto error_out2; + } + + if (body_read + frame.payload.body_fragment.len > message->body.len) { + ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION; + ret.library_error = AMQP_STATUS_BAD_AMQP_DATA; + goto error_out2; + } + + memcpy(body_read_ptr, frame.payload.body_fragment.bytes, frame.payload.body_fragment.len); + + body_read += frame.payload.body_fragment.len; + body_read_ptr += frame.payload.body_fragment.len; + } + + ret.reply_type = AMQP_RESPONSE_NORMAL; + return ret; + +error_out2: + amqp_bytes_free(message->body); +error_out3: + empty_amqp_pool(&message->pool); +error_out1: + return ret; +} diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c index be85ff5..ef9debd 100644 --- a/librabbitmq/amqp_socket.c +++ b/librabbitmq/amqp_socket.c @@ -782,6 +782,80 @@ beginrecv: } } +int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame) +{ + amqp_link_t *link; + amqp_frame_t *frame_copy; + + amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel); + + if (NULL == channel_pool) { + return AMQP_STATUS_NO_MEMORY; + } + + link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t)); + frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t)); + + if (NULL == link || NULL == frame_copy) { + return AMQP_STATUS_NO_MEMORY; + } + + *frame_copy = *frame; + link->data = frame_copy; + + if (NULL == state->first_queued_frame) { + state->first_queued_frame = link; + } else { + state->last_queued_frame->next = link; + } + + link->next = NULL; + state->last_queued_frame = link; + + return AMQP_STATUS_OK; +} + +int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_frame_t *decoded_frame) +{ + amqp_frame_t *frame_ptr; + amqp_link_t *cur; + int res; + + for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) { + frame_ptr = cur->data; + + if (channel == frame_ptr->channel) { + state->first_queued_frame = cur->next; + if (NULL == state->first_queued_frame) { + state->last_queued_frame = NULL; + } + + *decoded_frame = *frame_ptr; + + return AMQP_STATUS_OK; + } + } + + while (1) { + res = wait_frame_inner(state, decoded_frame, NULL); + + if (AMQP_STATUS_OK != res) { + return res; + } + + if (channel == decoded_frame->channel) { + return AMQP_STATUS_OK; + } else { + res = amqp_queue_frame(state, decoded_frame); + if (res != AMQP_STATUS_OK) { + return res; + } + } + } +} + int amqp_simple_wait_frame(amqp_connection_state_t state, amqp_frame_t *decoded_frame) { diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h index b0a1805..1fd20f8 100644 --- a/librabbitmq/amqp_socket.h +++ b/librabbitmq/amqp_socket.h @@ -181,6 +181,13 @@ amqp_socket_delete(amqp_socket_t *self); int amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout); +int +amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame); + +int +amqp_simple_wait_frame_on_channel(amqp_connection_state_t state, + amqp_channel_t channel, + amqp_frame_t *decoded_frame); AMQP_END_DECLS |