summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorAlan Antonuk <alan.antonuk@gmail.com>2013-06-26 14:23:16 -0700
committerAlan Antonuk <alan.antonuk@gmail.com>2013-07-08 15:13:32 -0700
commit33ebeede97a81c2e82ac0c3a6d88d4db0695bf29 (patch)
tree1151d9a060cb9afdf6ac25b66ddbdd03e46f8674 /examples
parent08af83a97a99c08aaa3878724a07829e0bb955da (diff)
downloadrabbitmq-c-33ebeede97a81c2e82ac0c3a6d88d4db0695bf29.tar.gz
Add a high level API for consuming messages
Diffstat (limited to 'examples')
-rw-r--r--examples/amqp_consumer.c99
-rw-r--r--examples/amqp_listen.c76
-rw-r--r--examples/amqp_listenq.c78
-rw-r--r--examples/amqps_consumer.c99
-rw-r--r--examples/amqps_listen.c76
-rw-r--r--examples/amqps_listenq.c78
6 files changed, 170 insertions, 336 deletions
diff --git a/examples/amqp_consumer.c b/examples/amqp_consumer.c
index 21a5b48..62da0ca 100644
--- a/examples/amqp_consumer.c
+++ b/examples/amqp_consumer.c
@@ -58,13 +58,13 @@ static void run(amqp_connection_state_t conn)
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
- int result;
- size_t body_received;
- size_t body_target;
uint64_t now;
while (1) {
+ amqp_rpc_reply_t ret;
+ amqp_envelope_t envelope;
+
now = now_microseconds();
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
@@ -78,45 +78,64 @@ static void run(amqp_connection_state_t conn)
}
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
-
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
- }
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ ret = amqp_consume_message(conn, &envelope, NULL, 0);
+
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
+ AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
+ if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
+ return;
+ }
+
+ if (AMQP_FRAME_METHOD == frame.frame_type) {
+ switch (frame.payload.method.id) {
+ case AMQP_BASIC_ACK_METHOD:
+ /* if we've turned publisher confirms on, and we've published a message
+ * here is a message being confirmed
+ */
+
+ break;
+ case AMQP_BASIC_RETURN_METHOD:
+ /* if a published message couldn't be routed and the mandatory flag was set
+ * this is what would be returned. The message then needs to be read.
+ */
+ {
+ amqp_message_t message;
+ ret = amqp_read_message(conn, frame.channel, &message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ return;
+ }
+
+ amqp_destroy_message(&message);
+ }
+
+ break;
+
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ /* a channel.close method happens when a channel exception occurs, this
+ * can happen by publishing to an exchange that doesn't exist for example
+ *
+ * In this case you would need to open another channel redeclare any queues
+ * that were declared auto-delete, and restart any consumers that were attached
+ * to the previous channel
+ */
+ return;
+
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ /* a connection.close method happens when a connection exception occurs,
+ * this can happen by trying to use a channel that isn't open for example.
+ *
+ * In this case the whole connection must be restarted.
+ */
+ return;
+
+ default:
+ fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
+ return;
+ }
+ }
}
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
}
received++;
diff --git a/examples/amqp_listen.c b/examples/amqp_listen.c
index bf5b716..ca7d538 100644
--- a/examples/amqp_listen.c
+++ b/examples/amqp_listen.c
@@ -104,80 +104,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
- }
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index e76cdb1..b2e8094 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -85,82 +85,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
- }
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
-
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- amqp_basic_ack(conn, 1, d->delivery_tag, 0);
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqps_consumer.c b/examples/amqps_consumer.c
index fff6677..d4cd294 100644
--- a/examples/amqps_consumer.c
+++ b/examples/amqps_consumer.c
@@ -60,13 +60,13 @@ static void run(amqp_connection_state_t conn)
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
amqp_frame_t frame;
- int result;
- size_t body_received;
- size_t body_target;
uint64_t now;
while (1) {
+ amqp_rpc_reply_t ret;
+ amqp_envelope_t envelope;
+
now = now_microseconds();
if (now > next_summary_time) {
int countOverInterval = received - previous_received;
@@ -80,45 +80,64 @@ static void run(amqp_connection_state_t conn)
}
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
-
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
- }
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- return;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
+ ret = amqp_consume_message(conn, &envelope, NULL, 0);
+
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ if (AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
+ AMQP_STATUS_UNEXPECTED_STATE == ret.library_error) {
+ if (AMQP_STATUS_OK != amqp_simple_wait_frame(conn, &frame)) {
+ return;
+ }
+
+ if (AMQP_FRAME_METHOD == frame.frame_type) {
+ switch (frame.payload.method.id) {
+ case AMQP_BASIC_ACK_METHOD:
+ /* if we've turned publisher confirms on, and we've published a message
+ * here is a message being confirmed
+ */
+
+ break;
+ case AMQP_BASIC_RETURN_METHOD:
+ /* if a published message couldn't be routed and the mandatory flag was set
+ * this is what would be returned. The message then needs to be read.
+ */
+ {
+ amqp_message_t message;
+ ret = amqp_read_message(conn, frame.channel, &message, 0);
+ if (AMQP_RESPONSE_NORMAL != ret.reply_type) {
+ return;
+ }
+
+ amqp_destroy_message(&message);
+ }
+
+ break;
+
+ case AMQP_CHANNEL_CLOSE_METHOD:
+ /* a channel.close method happens when a channel exception occurs, this
+ * can happen by publishing to an exchange that doesn't exist for example
+ *
+ * In this case you would need to open another channel redeclare any queues
+ * that were declared auto-delete, and restart any consumers that were attached
+ * to the previous channel
+ */
+ return;
+
+ case AMQP_CONNECTION_CLOSE_METHOD:
+ /* a connection.close method happens when a connection exception occurs,
+ * this can happen by trying to use a channel that isn't open for example.
+ *
+ * In this case the whole connection must be restarted.
+ */
+ return;
+
+ default:
+ fprintf(stderr ,"An unexpected method was received %d\n", frame.payload.method.id);
+ return;
+ }
+ }
}
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
}
received++;
diff --git a/examples/amqps_listen.c b/examples/amqps_listen.c
index a5eb692..44bb88c 100644
--- a/examples/amqps_listen.c
+++ b/examples/amqps_listen.c
@@ -121,80 +121,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
- }
+ amqp_destroy_envelope(&envelope);
}
}
diff --git a/examples/amqps_listenq.c b/examples/amqps_listenq.c
index 0210d88..6643500 100644
--- a/examples/amqps_listenq.c
+++ b/examples/amqps_listenq.c
@@ -102,82 +102,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
- }
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
-
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- amqp_basic_ack(conn, 1, d->delivery_tag, 0);
+ amqp_destroy_envelope(&envelope);
}
}