summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-26 14:23:16 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-08 15:13:32 -0700
commit33ebeede97a81c2e82ac0c3a6d88d4db0695bf29 (patch)
tree1151d9a060cb9afdf6ac25b66ddbdd03e46f8674
parent08af83a97a99c08aaa3878724a07829e0bb955da (diff)
downloadrabbitmq-c-33ebeede97a81c2e82ac0c3a6d88d4db0695bf29.tar.gz
Add a high level API for consuming messages
-rw-r--r--Makefile.am3
-rw-r--r--examples/amqp_consumer.c99
-rw-r--r--examples/amqp_listen.c76
-rw-r--r--examples/amqp_listenq.c78
-rw-r--r--examples/amqps_consumer.c99
-rw-r--r--examples/amqps_listen.c76
-rw-r--r--examples/amqps_listenq.c78
-rw-r--r--librabbitmq/CMakeLists.txt1
-rw-r--r--librabbitmq/amqp.h100
-rw-r--r--librabbitmq/amqp_consumer.c293
-rw-r--r--librabbitmq/amqp_socket.c74
-rw-r--r--librabbitmq/amqp_socket.h7
12 files changed, 646 insertions, 338 deletions
diff --git a/Makefile.am b/Makefile.am
index 989f5e9..1de9b0f 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -22,7 +22,8 @@ librabbitmq_librabbitmq_la_SOURCES = \
librabbitmq/amqp_table.c \
librabbitmq/amqp_url.c \
librabbitmq/amqp_timer.h \
- librabbitmq/amqp_timer.c
+ librabbitmq/amqp_timer.c \
+ librabbitmq/amqp_consumer.c
if REGENERATE_AMQP_FRAMING
librabbitmq_librabbitmq_la_SOURCES += librabbitmq/gen/amqp_framing.c
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 21a5b48..62da0ca 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -58,13 +58,13 @@ static void run(amqp_connection_state_t conn)
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
- int result;
- size_t body_received;
- size_t body_target;
uint64_t now;
while (1) {
+ amqp_rpc_reply_t ret;
+ amqp_envelope_t envelope;
+
now = now_microseconds();
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
@@ -78,45 +78,64 @@ static void run(amqp_connection_state_t conn)
}
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
-
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
- }
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ ret = amqp_consume_message(conn, &envelope, NULL, 0);
+
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
+ AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
+ if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
+ return;
+ }
+
+ if (AMQP_FRAME_METHOD == frame.frame_type) {
+ switch (frame.payload.method.id) {
+ case AMQP_BASIC_ACK_METHOD:
+ /* if we've turned publisher confirms on, and we've published a message
+ * here is a message being confirmed
+ */
+
+ break;
+ case AMQP_BASIC_RETURN_METHOD:
+ /* if a published message couldn't be routed and the mandatory flag was set
+ * this is what would be returned. The message then needs to be read.
+ */
+ {
+ amqp_message_t message;
+ ret = amqp_read_message(conn, frame.channel, &message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ return;
+ }
+
+ amqp_destroy_message(&message);
+ }
+
+ break;
+
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ /* a channel.close method happens when a channel exception occurs, this
+ * can happen by publishing to an exchange that doesn't exist for example
+ *
+ * In this case you would need to open another channel redeclare any queues
+ * that were declared auto-delete, and restart any consumers that were attached
+ * to the previous channel
+ */
+ return;
+
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ /* a connection.close method happens when a connection exception occurs,
+ * this can happen by trying to use a channel that isn't open for example.
+ *
+ * In this case the whole connection must be restarted.
+ */
+ return;
+
+ default:
+ fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
+ return;
+ }
+ }
}
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
}
received++;
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index bf5b716..ca7d538 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -104,80 +104,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
- }
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index e76cdb1..b2e8094 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -85,82 +85,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
- }
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
-
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- amqp_basic_ack(conn, 1, d->delivery_tag, 0);
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
index fff6677..d4cd294 100644
--- a/examples/amqps_consumer.c
+++ b/examples/amqps_consumer.c
@@ -60,13 +60,13 @@ static void run(amqp_connection_state_t conn)
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
- int result;
- size_t body_received;
- size_t body_target;
uint64_t now;
while (1) {
+ amqp_rpc_reply_t ret;
+ amqp_envelope_t envelope;
+
now = now_microseconds();
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
@@ -80,45 +80,64 @@ static void run(amqp_connection_state_t conn)
}
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
-
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
- }
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ ret = amqp_consume_message(conn, &envelope, NULL, 0);
+
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
+ AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
+ if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
+ return;
+ }
+
+ if (AMQP_FRAME_METHOD == frame.frame_type) {
+ switch (frame.payload.method.id) {
+ case AMQP_BASIC_ACK_METHOD:
+ /* if we've turned publisher confirms on, and we've published a message
+ * here is a message being confirmed
+ */
+
+ break;
+ case AMQP_BASIC_RETURN_METHOD:
+ /* if a published message couldn't be routed and the mandatory flag was set
+ * this is what would be returned. The message then needs to be read.
+ */
+ {
+ amqp_message_t message;
+ ret = amqp_read_message(conn, frame.channel, &message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ return;
+ }
+
+ amqp_destroy_message(&message);
+ }
+
+ break;
+
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ /* a channel.close method happens when a channel exception occurs, this
+ * can happen by publishing to an exchange that doesn't exist for example
+ *
+ * In this case you would need to open another channel redeclare any queues
+ * that were declared auto-delete, and restart any consumers that were attached
+ * to the previous channel
+ */
+ return;
+
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ /* a connection.close method happens when a connection exception occurs,
+ * this can happen by trying to use a channel that isn't open for example.
+ *
+ * In this case the whole connection must be restarted.
+ */
+ return;
+
+ default:
+ fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
+ return;
+ }
+ }
}
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
}
received++;
diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c
index a5eb692..44bb88c 100644
--- a/examples/amqps_listen.c
+++ b/examples/amqps_listen.c
@@ -121,80 +121,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
- }
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c
index 0210d88..6643500 100644
--- a/examples/amqps_listenq.c
+++ b/examples/amqps_listenq.c
@@ -102,82 +102,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
- }
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
-
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- amqp_basic_ack(conn, 1, d->delivery_tag, 0);
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/librabbitmq/CMakeLists.txt b/librabbitmq/CMakeLists.txt
index 8ab8bff..613f138 100644
--- a/librabbitmq/CMakeLists.txt
+++ b/librabbitmq/CMakeLists.txt
@@ -121,6 +121,7 @@ set(RABBITMQ_SOURCES
amqp_api.c amqp.h amqp_connection.c amqp_mem.c amqp_private.h amqp_socket.c
amqp_table.c amqp_url.c amqp_socket.h amqp_tcp_socket.c amqp_tcp_socket.h
amqp_timer.c amqp_timer.h
+ amqp_consumer.c
${AMQP_SSL_SRCS}
)
diff --git a/librabbitmq/amqp.h b/librabbitmq/amqp.h
index 24547b3..5680753 100644
--- a/librabbitmq/amqp.h
+++ b/librabbitmq/amqp.h
@@ -349,6 +349,7 @@ typedef enum amqp_status_enum_
AMQP_STATUS_TIMEOUT = -0x000D,
AMQP_STATUS_TIMER_FAILURE = -0x000E,
AMQP_STATUS_HEARTBEAT_TIMEOUT = -0x000F,
+ AMQP_STATUS_UNEXPECTED_STATE = -0x0010,
AMQP_STATUS_TCP_ERROR = -0x0100,
AMQP_STATUS_TCP_SOCKETLIB_INIT_ERROR = -0x0101,
@@ -359,6 +360,12 @@ typedef enum amqp_status_enum_
AMQP_STATUS_SSL_CONNECTION_FAILED = -0x0203
} amqp_status_enum;
+AMQP_END_DECLS
+
+#include <amqp_framing.h>
+
+AMQP_BEGIN_DECLS
+
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_version(void);
@@ -631,6 +638,98 @@ AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_table_clone(amqp_table_t *original, amqp_table_t *clone, amqp_pool_t *pool);
+typedef struct amqp_message_t_ {
+ amqp_basic_properties_t properties;
+ amqp_bytes_t body;
+ amqp_pool_t pool;
+} amqp_message_t;
+
+/**
+ * Reads the next message on a channel
+ *
+ * Reads a complete message (header + body) on a specified channel. This
+ * function is intended to be used with amqp_basic_get() or when an
+ * AMQP_BASIC_DELIVERY_METHOD method is received.
+ *
+ * \param [in,out] state the connection object
+ * \param [in] channel the channel on which to read the message from
+ * \param [in,out] message a pointer to a amqp_message_t object. Caller should
+ * call amqp_message_destroy() when it is done using the
+ * fields in the message object. The caller is responsible for
+ * allocating/destroying the amqp_message_t object itself.
+ * \param [in] flags pass in 0. Currently unused.
+ * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL on success.
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_rpc_reply_t
+AMQP_CALL amqp_read_message(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_message_t *message, int flags);
+
+/**
+ * Frees memory associated with a amqp_message_t allocated in amqp_read_message
+ *
+ * \param [in] message
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL amqp_destroy_message(amqp_message_t *message);
+
+typedef struct amqp_envelope_t_ {
+ amqp_channel_t channel;
+ amqp_bytes_t consumer_tag;
+ uint64_t delivery_tag;
+ amqp_boolean_t redelivered;
+ amqp_bytes_t exchange;
+ amqp_bytes_t routing_key;
+ amqp_message_t message;
+} amqp_envelope_t;
+
+/**
+ * Wait for and consume a message
+ *
+ * Waits for a basic.deliver method on any channel, upon receipt of
+ * basic.deliver it reads that message, and returns. If any other method is
+ * received before basic.deliver, this function will return an amqp_rpc_reply_t
+ * with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
+ * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME. The caller should then
+ * call amqp_simple_wait_frame() to read this frame and take appropriate action.
+ *
+ * This function should be used after starting a consumer with the
+ * amqp_basic_consume() funtion
+ *
+ * \param [in,out] state the connection object
+ * \param [in,out] envelope a pointer to a amqp_envelope_t object. Caller
+ * should call amqp_envelope_destroy() when it is done using
+ * the fields in the envelope object. The caller is responsible
+ * for allocating/destroying the amqp_envelope_t object itself.
+ * \param [in] timeout a timeout to wait for a message delivery. Passing in
+ * NULL will result in blocking behavior.
+ * \param [in] flags pass in 0. Currently unused.
+ * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL
+ * on success. If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
+ * ret.library_error == AMQP_STATUS_UNEXPECTED_FRAME, a frame other
+ * than AMQP_BASIC_DELIVER_METHOD was received, the caller should call
+ * amqp_simple_wait_frame() to read this frame and take appropriate
+ * action.
+ */
+AMQP_PUBLIC_FUNCTION
+amqp_rpc_reply_t
+AMQP_CALL amqp_consume_message(amqp_connection_state_t state,
+ amqp_envelope_t *envelope,
+ struct timeval *timeout, int flags);
+
+/**
+ * Frees memory associated iwth a amqp_envelope_t allocated in amqp_consume_message
+ *
+ * \param [in] envelope
+ * \returns
+ */
+AMQP_PUBLIC_FUNCTION
+void
+AMQP_CALL amqp_destroy_envelope(amqp_envelope_t *envelope);
+
+
struct amqp_connection_info {
char *user;
char *password;
@@ -711,6 +810,5 @@ amqp_get_socket(amqp_connection_state_t state);
AMQP_END_DECLS
-#include <amqp_framing.h>
#endif /* AMQP_H */
diff --git a/librabbitmq/amqp_consumer.c b/librabbitmq/amqp_consumer.c
new file mode 100644
index 0000000..25b676e
--- /dev/null
+++ b/librabbitmq/amqp_consumer.c
@@ -0,0 +1,293 @@
+/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
+/*
+ * ***** BEGIN LICENSE BLOCK *****
+ * Version: MIT
+ *
+ * Portions created by Alan Antonuk are Copyright (c) 2013
+ * Alan Antonuk. All Rights Reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use, copy,
+ * modify, merge, publish, distribute, sublicense, and/or sell copies
+ * of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * ***** END LICENSE BLOCK *****
+ */
+#include "amqp.h"
+#include "amqp_private.h"
+#include "amqp_socket.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+static
+int amqp_basic_properties_clone(amqp_basic_properties_t *original,
+ amqp_basic_properties_t *clone,
+ amqp_pool_t *pool)
+{
+ memset(clone, 0, sizeof(amqp_basic_properties_t));
+ clone->_flags = original->_flags;
+
+#define CLONE_BYTES_POOL(original, clone, pool) \
+ amqp_pool_alloc_bytes(pool, original.len, &clone); \
+ if (NULL == clone.bytes) { \
+ return AMQP_STATUS_NO_MEMORY; \
+ } \
+ memcpy(clone.bytes, original.bytes, clone.len);
+
+ if (clone->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ CLONE_BYTES_POOL(original->content_type, clone->content_type, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_CONTENT_ENCODING_FLAG) {
+ CLONE_BYTES_POOL(original->content_encoding, clone->content_encoding, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_HEADERS_FLAG) {
+ int res = amqp_table_clone(&original->headers, &clone->headers, pool);
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+ }
+
+ if (clone->_flags & AMQP_BASIC_DELIVERY_MODE_FLAG) {
+ clone->delivery_mode = original->delivery_mode;
+ }
+
+ if (clone->_flags & AMQP_BASIC_PRIORITY_FLAG) {
+ clone->priority = original->priority;
+ }
+
+ if (clone->_flags & AMQP_BASIC_CORRELATION_ID_FLAG) {
+ CLONE_BYTES_POOL(original->correlation_id, clone->correlation_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_REPLY_TO_FLAG) {
+ CLONE_BYTES_POOL(original->reply_to, clone->reply_to, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_EXPIRATION_FLAG) {
+ CLONE_BYTES_POOL(original->expiration, clone->expiration, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_MESSAGE_ID_FLAG) {
+ CLONE_BYTES_POOL(original->message_id, clone->message_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_TIMESTAMP_FLAG) {
+ clone->timestamp = original->timestamp;
+ }
+
+ if (clone->_flags & AMQP_BASIC_TYPE_FLAG) {
+ CLONE_BYTES_POOL(original->type, clone->type, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_USER_ID_FLAG) {
+ CLONE_BYTES_POOL(original->user_id, clone->user_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_APP_ID_FLAG) {
+ CLONE_BYTES_POOL(original->app_id, clone->app_id, pool)
+ }
+
+ if (clone->_flags & AMQP_BASIC_CLUSTER_ID_FLAG) {
+ CLONE_BYTES_POOL(original->cluster_id, clone->cluster_id, pool)
+ }
+
+ return AMQP_STATUS_OK;
+#undef CLONE_BYTES_POOL
+}
+
+
+void amqp_destroy_message(amqp_message_t *message)
+{
+ empty_amqp_pool(&message->pool);
+ amqp_bytes_free(message->body);
+}
+
+void amqp_destroy_envelope(amqp_envelope_t *envelope)
+{
+ amqp_destroy_message(&envelope->message);
+ amqp_bytes_free(envelope->routing_key);
+ amqp_bytes_free(envelope->exchange);
+ amqp_bytes_free(envelope->consumer_tag);
+}
+
+
+amqp_rpc_reply_t
+amqp_consume_message(amqp_connection_state_t state, amqp_envelope_t *envelope,
+ struct timeval *timeout, AMQP_UNUSED int flags)
+{
+ int res;
+ amqp_frame_t frame;
+ amqp_basic_deliver_t *delivery_method;
+ amqp_rpc_reply_t ret;
+
+ memset(&ret, 0, sizeof(amqp_rpc_reply_t));
+ memset(envelope, 0, sizeof(amqp_envelope_t));
+
+ res = amqp_simple_wait_frame_noblock(state, &frame, timeout);
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+ goto error_out1;
+ }
+
+ if (AMQP_FRAME_METHOD != frame.frame_type
+ || AMQP_BASIC_DELIVER_METHOD != frame.payload.method.id) {
+ amqp_queue_frame(state, &frame);
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
+ goto error_out1;
+ }
+
+ delivery_method = frame.payload.method.decoded;
+
+ envelope->channel = frame.channel;
+ envelope->consumer_tag = amqp_bytes_malloc_dup(delivery_method->consumer_tag);
+ envelope->delivery_tag = delivery_method->delivery_tag;
+ envelope->redelivered = delivery_method->redelivered;
+ envelope->exchange = amqp_bytes_malloc_dup(delivery_method->exchange);
+ envelope->routing_key = amqp_bytes_malloc_dup(delivery_method->routing_key);
+
+ if (NULL == envelope->consumer_tag.bytes ||
+ NULL == envelope->exchange.bytes ||
+ NULL == envelope->routing_key.bytes) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_NO_MEMORY;
+ goto error_out2;
+ }
+
+ ret = amqp_read_message(state, envelope->channel, &envelope->message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ goto error_out2;
+ }
+
+ ret.reply_type = AMQP_RESPONSE_NORMAL;
+ return ret;
+
+error_out2:
+ amqp_bytes_free(envelope->routing_key);
+ amqp_bytes_free(envelope->exchange);
+ amqp_bytes_free(envelope->consumer_tag);
+error_out1:
+ return ret;
+}
+
+amqp_rpc_reply_t amqp_read_message(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_message_t *message,
+ AMQP_UNUSED int flags)
+{
+ amqp_frame_t frame;
+ amqp_rpc_reply_t ret;
+
+ size_t body_read;
+ char *body_read_ptr;
+ int res;
+
+ memset(&ret, 0, sizeof(amqp_rpc_reply_t));
+ memset(message, 0, sizeof(amqp_message_t));
+
+ res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+
+ goto error_out1;
+ }
+
+ if (AMQP_FRAME_HEADER != frame.frame_type) {
+ if (AMQP_FRAME_METHOD == frame.frame_type &&
+ (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
+ AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
+
+ ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
+ ret.reply = frame.payload.method;
+
+ } else {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_UNEXPECTED_STATE;
+
+ amqp_queue_frame(state, &frame);
+ }
+ goto error_out1;
+ }
+
+ init_amqp_pool(&message->pool, 4096);
+ res = amqp_basic_properties_clone(frame.payload.properties.decoded,
+ &message->properties, &message->pool);
+
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+ goto error_out3;
+ }
+
+ message->body = amqp_bytes_malloc(frame.payload.properties.body_size);
+ if (NULL == message->body.bytes) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_NO_MEMORY;
+ goto error_out1;
+ }
+
+ body_read = 0;
+ body_read_ptr = message->body.bytes;
+
+ while (body_read < message->body.len) {
+ res = amqp_simple_wait_frame_on_channel(state, channel, &frame);
+ if (AMQP_STATUS_OK != res) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = res;
+ goto error_out2;
+ }
+ if (AMQP_FRAME_BODY != frame.frame_type) {
+ if (AMQP_FRAME_METHOD == frame.frame_type &&
+ (AMQP_CHANNEL_CLOSE_METHOD == frame.payload.method.id ||
+ AMQP_CONNECTION_CLOSE_METHOD == frame.payload.method.id)) {
+
+ ret.reply_type = AMQP_RESPONSE_SERVER_EXCEPTION;
+ ret.reply = frame.payload.method;
+ } else {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
+ }
+ goto error_out2;
+ }
+
+ if (body_read + frame.payload.body_fragment.len > message->body.len) {
+ ret.reply_type = AMQP_RESPONSE_LIBRARY_EXCEPTION;
+ ret.library_error = AMQP_STATUS_BAD_AMQP_DATA;
+ goto error_out2;
+ }
+
+ memcpy(body_read_ptr, frame.payload.body_fragment.bytes, frame.payload.body_fragment.len);
+
+ body_read += frame.payload.body_fragment.len;
+ body_read_ptr += frame.payload.body_fragment.len;
+ }
+
+ ret.reply_type = AMQP_RESPONSE_NORMAL;
+ return ret;
+
+error_out2:
+ amqp_bytes_free(message->body);
+error_out3:
+ empty_amqp_pool(&message->pool);
+error_out1:
+ return ret;
+}
diff --git a/librabbitmq/amqp_socket.c b/librabbitmq/amqp_socket.c
index be85ff5..ef9debd 100644
--- a/librabbitmq/amqp_socket.c
+++ b/librabbitmq/amqp_socket.c
@@ -782,6 +782,80 @@ beginrecv:
}
}
+int amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame)
+{
+ amqp_link_t *link;
+ amqp_frame_t *frame_copy;
+
+ amqp_pool_t *channel_pool = amqp_get_or_create_channel_pool(state, frame->channel);
+
+ if (NULL == channel_pool) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ link = amqp_pool_alloc(channel_pool, sizeof(amqp_link_t));
+ frame_copy = amqp_pool_alloc(channel_pool, sizeof(amqp_frame_t));
+
+ if (NULL == link || NULL == frame_copy) {
+ return AMQP_STATUS_NO_MEMORY;
+ }
+
+ *frame_copy = *frame;
+ link->data = frame_copy;
+
+ if (NULL == state->first_queued_frame) {
+ state->first_queued_frame = link;
+ } else {
+ state->last_queued_frame->next = link;
+ }
+
+ link->next = NULL;
+ state->last_queued_frame = link;
+
+ return AMQP_STATUS_OK;
+}
+
+int amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_frame_t *decoded_frame)
+{
+ amqp_frame_t *frame_ptr;
+ amqp_link_t *cur;
+ int res;
+
+ for (cur = state->first_queued_frame; NULL != cur; cur = cur->next) {
+ frame_ptr = cur->data;
+
+ if (channel == frame_ptr->channel) {
+ state->first_queued_frame = cur->next;
+ if (NULL == state->first_queued_frame) {
+ state->last_queued_frame = NULL;
+ }
+
+ *decoded_frame = *frame_ptr;
+
+ return AMQP_STATUS_OK;
+ }
+ }
+
+ while (1) {
+ res = wait_frame_inner(state, decoded_frame, NULL);
+
+ if (AMQP_STATUS_OK != res) {
+ return res;
+ }
+
+ if (channel == decoded_frame->channel) {
+ return AMQP_STATUS_OK;
+ } else {
+ res = amqp_queue_frame(state, decoded_frame);
+ if (res != AMQP_STATUS_OK) {
+ return res;
+ }
+ }
+ }
+}
+
int amqp_simple_wait_frame(amqp_connection_state_t state,
amqp_frame_t *decoded_frame)
{
diff --git a/librabbitmq/amqp_socket.h b/librabbitmq/amqp_socket.h
index b0a1805..1fd20f8 100644
--- a/librabbitmq/amqp_socket.h
+++ b/librabbitmq/amqp_socket.h
@@ -181,6 +181,13 @@ amqp_socket_delete(amqp_socket_t *self);
int
amqp_open_socket_noblock(char const *hostname, int portnumber, struct timeval *timeout);
+int
+amqp_queue_frame(amqp_connection_state_t state, amqp_frame_t *frame);
+
+int
+amqp_simple_wait_frame_on_channel(amqp_connection_state_t state,
+ amqp_channel_t channel,
+ amqp_frame_t *decoded_frame);
AMQP_END_DECLS