summaryrefslogtreecommitdiff
path: root/examples/amqp_listenq.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/amqp_listenq.c')
-rw-r--r--examples/amqp_listenq.c81
1 files changed, 14 insertions, 67 deletions
diff --git a/examples/amqp_listenq.c b/examples/amqp_listenq.c
index 54c1189..b2e8094 100644
--- a/examples/amqp_listenq.c
+++ b/examples/amqp_listenq.c
@@ -66,7 +66,7 @@ int main(int argc, char const *const *argv)
conn = amqp_new_connection();
- socket = amqp_tcp_socket_new();
+ socket = amqp_tcp_socket_new(conn);
if (!socket) {
die("creating TCP socket");
}
@@ -76,7 +76,6 @@ int main(int argc, char const *const *argv)
die("opening TCP socket");
}
- amqp_set_socket(conn, socket);
die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"),
"Logging in");
amqp_channel_open(conn, 1);
@@ -86,82 +85,30 @@ int main(int argc, char const *const *argv)
die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming");
{
- amqp_frame_t frame;
- int result;
-
- amqp_basic_deliver_t *d;
- amqp_basic_properties_t *p;
- size_t body_target;
- size_t body_received;
-
while (1) {
+ amqp_rpc_reply_t res;
+ amqp_envelope_t envelope;
+
amqp_maybe_release_buffers(conn);
- result = amqp_simple_wait_frame(conn, &frame);
- printf("Result %d\n", result);
- if (result < 0) {
- break;
- }
- printf("Frame type %d, channel %d\n", frame.frame_type, frame.channel);
- if (frame.frame_type != AMQP_FRAME_METHOD) {
- continue;
- }
+ res = amqp_consume_message(conn, &envelope, NULL, 0);
- printf("Method %s\n", amqp_method_name(frame.payload.method.id));
- if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) {
- continue;
+ if (AMQP_RESPONSE_NORMAL != res.reply_type) {
+ break;
}
- d = (amqp_basic_deliver_t *) frame.payload.method.decoded;
printf("Delivery %u, exchange %.*s routingkey %.*s\n",
- (unsigned) d->delivery_tag,
- (int) d->exchange.len, (char *) d->exchange.bytes,
- (int) d->routing_key.len, (char *) d->routing_key.bytes);
-
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
+ (unsigned) envelope.delivery_tag,
+ (int) envelope.exchange.len, (char *) envelope.exchange.bytes,
+ (int) envelope.routing_key.len, (char *) envelope.routing_key.bytes);
- if (frame.frame_type != AMQP_FRAME_HEADER) {
- fprintf(stderr, "Expected header!");
- abort();
- }
- p = (amqp_basic_properties_t *) frame.payload.properties.decoded;
- if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
+ if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) {
printf("Content-type: %.*s\n",
- (int) p->content_type.len, (char *) p->content_type.bytes);
- }
- printf("----\n");
-
- body_target = frame.payload.properties.body_size;
- body_received = 0;
-
- while (body_received < body_target) {
- result = amqp_simple_wait_frame(conn, &frame);
- if (result < 0) {
- break;
- }
-
- if (frame.frame_type != AMQP_FRAME_BODY) {
- fprintf(stderr, "Expected body!");
- abort();
- }
-
- body_received += frame.payload.body_fragment.len;
- assert(body_received <= body_target);
-
- amqp_dump(frame.payload.body_fragment.bytes,
- frame.payload.body_fragment.len);
- }
-
- if (body_received != body_target) {
- /* Can only happen when amqp_simple_wait_frame returns <= 0 */
- /* We break here to close the connection */
- break;
+ (int) envelope.message.properties.content_type.len,
+ (char *) envelope.message.properties.content_type.bytes);
}
- amqp_basic_ack(conn, 1, d->delivery_tag, 0);
+ amqp_destroy_envelope(&envelope);
}
}