diff options
author | Jani Hur <webmaster@jani-hur.net> | 2012-09-29 18:10:42 +0300 |
---|---|---|
committer | Alan Antonuk <aega@med.umich.edu> | 2012-10-05 10:31:26 -0400 |
commit | 2fd0ccf00717b68fae4cc0007bde26f6601fd9bb (patch) | |
tree | 2e396d8340eb9afe94a78edca0eaf56e5aa19e8a | |
parent | b7b379acd60776cedf211f262b9ee5ff55bee96a (diff) | |
download | rabbitmq-c-2fd0ccf00717b68fae4cc0007bde26f6601fd9bb.tar.gz |
New example: amqp_rpc_sendstring_client. The client sends a plain text message to a queue and expects to receive a reply to a private reply_to queue.
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | .hgignore | 1 | ||||
-rw-r--r-- | Makefile.am | 6 | ||||
-rw-r--r-- | examples/CMakeLists.txt | 3 | ||||
-rwxr-xr-x | examples/amqp_rpc_sendstring_client.c | 228 |
5 files changed, 239 insertions, 0 deletions
@@ -29,6 +29,7 @@ examples/amqp_exchange_declare examples/amqp_listen examples/amqp_listenq examples/amqp_producer +examples/amqp_rpc_sendstring_client examples/amqp_sendstring examples/amqp_unbind librabbitmq.pc @@ -22,6 +22,7 @@ ^tests/test_tables$ ^tests/test_parse_url$ ^examples/amqp_sendstring$ +^examples/amqp_rpc_sendstring_client$ ^examples/amqp_exchange_declare$ ^examples/amqp_listen$ ^examples/amqp_producer$ diff --git a/Makefile.am b/Makefile.am index eee1961..4f6095a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -106,6 +106,7 @@ noinst_PROGRAMS = \ examples/amqp_listen \ examples/amqp_listenq \ examples/amqp_producer \ + examples/amqp_rpc_sendstring_client \ examples/amqp_sendstring \ examples/amqp_unbind @@ -149,6 +150,11 @@ examples_amqp_listenq_LDADD = \ examples/libutils.la \ librabbitmq/librabbitmq.la +examples_amqp_rpc_sendstring_client_SOURCES = examples/amqp_rpc_sendstring_client.c +examples_amqp_rpc_sendstring_client_LDADD = \ + examples/libutils.la \ + librabbitmq/librabbitmq.la + if TOOLS noinst_LTLIBRARIES += tools/libcommon.la diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 3194e13..9aae0ae 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -15,6 +15,9 @@ set(COMMON_SRCS add_executable(amqp_sendstring amqp_sendstring.c ${COMMON_SRCS}) target_link_libraries(amqp_sendstring rabbitmq) +add_executable(amqp_rpc_sendstring_client amqp_rpc_sendstring_client.c ${COMMON_SRCS}) +target_link_libraries(amqp_rpc_sendstring_client rabbitmq) + add_executable(amqp_exchange_declare amqp_exchange_declare.c ${COMMON_SRCS}) target_link_libraries(amqp_exchange_declare rabbitmq) diff --git a/examples/amqp_rpc_sendstring_client.c b/examples/amqp_rpc_sendstring_client.c new file mode 100755 index 0000000..b2dcde6 --- /dev/null +++ b/examples/amqp_rpc_sendstring_client.c @@ -0,0 +1,228 @@ +/* + * ***** BEGIN LICENSE BLOCK ***** + * Version: MIT + * + * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc. + * All Rights Reserved. + * + * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010 + * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, copy, + * modify, merge, publish, distribute, sublicense, and/or sell copies + * of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * ***** END LICENSE BLOCK ***** + */ + +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include <stdint.h> +#include <amqp.h> +#include <amqp_framing.h> + +#include <assert.h> + +#include "utils.h" + +int main(int argc, char* argv[]) { + char const *hostname; + int port; + char const *exchange; + char const *routingkey; + char const *messagebody; + + int sockfd; + amqp_connection_state_t conn; + amqp_bytes_t reply_to_queue; + + if (argc < 6) { /* minimum number of mandatory arguments */ + fprintf(stderr, "usage:\namqp_rpc_sendstring_client host port exchange routingkey messagebody\n"); + return 1; + } + + hostname = argv[1]; + port = atoi(argv[2]); + exchange = argv[3]; + routingkey = argv[4]; + messagebody = argv[5]; + + /* + establish a channel that is used to connect RabbitMQ server + */ + + conn = amqp_new_connection(); + + die_on_error(sockfd = amqp_open_socket(hostname, port), "Opening socket"); + amqp_set_sockfd(conn, sockfd); + die_on_amqp_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in"); + amqp_channel_open(conn, 1); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Opening channel"); + + /* + create private reply_to queue + */ + + { + amqp_queue_declare_ok_t *r = amqp_queue_declare(conn, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Declaring queue"); + reply_to_queue = amqp_bytes_malloc_dup(r->queue); + if (reply_to_queue.bytes == NULL) { + fprintf(stderr, "Out of memory while copying queue name"); + return 1; + } + } + + /* + send the message + */ + + { + /* + set properties + */ + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | + AMQP_BASIC_DELIVERY_MODE_FLAG | + AMQP_BASIC_REPLY_TO_FLAG | + AMQP_BASIC_CORRELATION_ID_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = 2; /* persistent delivery mode */ + props.reply_to = amqp_bytes_malloc_dup(reply_to_queue); + if (props.reply_to.bytes == NULL) { + fprintf(stderr, "Out of memory while copying queue name"); + return 1; + } + props.correlation_id = amqp_cstring_bytes("1"); + + /* + publish + */ + die_on_error(amqp_basic_publish(conn, + 1, + amqp_cstring_bytes(exchange), + amqp_cstring_bytes(routingkey), + 0, + 0, + &props, + amqp_cstring_bytes(messagebody)), + "Publishing"); + + amqp_bytes_free(props.reply_to); + } + + /* + wait an answer + */ + + { + amqp_basic_consume(conn, 1, reply_to_queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table); + die_on_amqp_error(amqp_get_rpc_reply(conn), "Consuming"); + amqp_bytes_free(reply_to_queue); + + { + amqp_frame_t frame; + int result; + + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; + size_t body_target; + size_t body_received; + + while (1) { + amqp_maybe_release_buffers(conn); + result = amqp_simple_wait_frame(conn, &frame); + printf("Result: %d\n", result); + if (result < 0) + break; + + printf("Frame type: %d channel: %d\n", frame.frame_type, frame.channel); + if (frame.frame_type != AMQP_FRAME_METHOD) + continue; + + printf("Method: %s\n", amqp_method_name(frame.payload.method.id)); + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + printf("Delivery: %u exchange: %.*s routingkey: %.*s\n", + (unsigned) d->delivery_tag, + (int) d->exchange.len, (char *) d->exchange.bytes, + (int) d->routing_key.len, (char *) d->routing_key.bytes); + + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + break; + + if (frame.frame_type != AMQP_FRAME_HEADER) { + fprintf(stderr, "Expected header!"); + abort(); + } + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + printf("Content-type: %.*s\n", + (int) p->content_type.len, (char *) p->content_type.bytes); + } + printf("----\n"); + + body_target = frame.payload.properties.body_size; + body_received = 0; + + while (body_received < body_target) { + result = amqp_simple_wait_frame(conn, &frame); + if (result < 0) + break; + + if (frame.frame_type != AMQP_FRAME_BODY) { + fprintf(stderr, "Expected body!"); + abort(); + } + + body_received += frame.payload.body_fragment.len; + assert(body_received <= body_target); + + amqp_dump(frame.payload.body_fragment.bytes, + frame.payload.body_fragment.len); + } + + if (body_received != body_target) { + /* Can only happen when amqp_simple_wait_frame returns <= 0 */ + /* We break here to close the connection */ + break; + } + + /* everything was fine, we can quit now because we received the reply */ + break; + } + + } + } + + /* + closing + */ + + die_on_amqp_error(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "Closing channel"); + die_on_amqp_error(amqp_connection_close(conn, AMQP_REPLY_SUCCESS), "Closing connection"); + die_on_error(amqp_destroy_connection(conn), "Ending connection"); + + return 0; +} |