diff options
Diffstat (limited to 'cpp/examples')
-rw-r--r-- | cpp/examples/direct/declare_queues.cpp | 3 | ||||
-rw-r--r-- | cpp/examples/direct/direct_producer.cpp | 7 | ||||
-rw-r--r-- | cpp/examples/fanout/fanout_producer.cpp | 5 | ||||
-rw-r--r-- | cpp/examples/fanout/listener.cpp | 3 | ||||
-rw-r--r-- | cpp/examples/pub-sub/topic_listener.cpp | 19 | ||||
-rw-r--r-- | cpp/examples/pub-sub/topic_publisher.cpp | 19 |
6 files changed, 31 insertions, 25 deletions
diff --git a/cpp/examples/direct/declare_queues.cpp b/cpp/examples/direct/declare_queues.cpp index 0cdb472665..3289efb872 100644 --- a/cpp/examples/direct/declare_queues.cpp +++ b/cpp/examples/direct/declare_queues.cpp @@ -56,6 +56,7 @@ using std::string; int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string exchange(argc>3 ? argv[3] : "amq.direct"); Connection connection; try { @@ -69,7 +70,7 @@ int main(int argc, char** argv) { // routing key is "routing_key" to this newly created queue. session.queueDeclare(arg::queue="message_queue"); - session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key"); + session.exchangeBind(arg::exchange=exchange, arg::queue="message_queue", arg::bindingKey="routing_key"); //----------------------------------------------------------------------------- diff --git a/cpp/examples/direct/direct_producer.cpp b/cpp/examples/direct/direct_producer.cpp index baa8d9092b..9ea3c812a6 100644 --- a/cpp/examples/direct/direct_producer.cpp +++ b/cpp/examples/direct/direct_producer.cpp @@ -65,6 +65,7 @@ int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; int count = argc>3 ? atoi(argv[3]) : 10; + string exchange(argc>4 ? argv[4] : "amq.direct"); Connection connection; Message message; try { @@ -89,14 +90,14 @@ int main(int argc, char** argv) { message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - // async(session).messageTransfer(arg::content=message, arg::destination="amq.direct"); - session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + // async(session).messageTransfer(arg::content=message, arg::destination=exchange); + session.messageTransfer(arg::content=message, arg::destination=exchange); } // And send a final message to indicate termination. message.setData("That's all, folks!"); - session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + session.messageTransfer(arg::content=message, arg::destination=exchange); //----------------------------------------------------------------------------- diff --git a/cpp/examples/fanout/fanout_producer.cpp b/cpp/examples/fanout/fanout_producer.cpp index a1ca407847..bb253d7027 100644 --- a/cpp/examples/fanout/fanout_producer.cpp +++ b/cpp/examples/fanout/fanout_producer.cpp @@ -64,6 +64,7 @@ using std::string; int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string exchange = argc>3 ? argv[3] : "amq.fanout"; Connection connection; Message message; try { @@ -86,13 +87,13 @@ int main(int argc, char** argv) { message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout"); + async(session).messageTransfer(arg::content=message, arg::destination=exchange); } // And send a final message to indicate termination. message.setData("That's all, folks!"); - session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); + session.messageTransfer(arg::content=message, arg::destination=exchange); //----------------------------------------------------------------------------- diff --git a/cpp/examples/fanout/listener.cpp b/cpp/examples/fanout/listener.cpp index b29c82d3d9..2938125f4b 100644 --- a/cpp/examples/fanout/listener.cpp +++ b/cpp/examples/fanout/listener.cpp @@ -60,6 +60,7 @@ void Listener::received(Message& message) { int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + string exchange = argc>3 ? argv[3] : "amq.fanout"; Connection connection; Message msg; try { @@ -82,7 +83,7 @@ int main(int argc, char** argv) { session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, arg::autoDelete=true); - session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key"); + session.exchangeBind(arg::exchange=exchange, arg::queue=myQueue, arg::bindingKey="my-key"); // Create a listener and subscribe it to my queue. SubscriptionManager subscriptions(session); diff --git a/cpp/examples/pub-sub/topic_listener.cpp b/cpp/examples/pub-sub/topic_listener.cpp index 9996abab19..af70cc2672 100644 --- a/cpp/examples/pub-sub/topic_listener.cpp +++ b/cpp/examples/pub-sub/topic_listener.cpp @@ -61,7 +61,7 @@ class Listener : public MessageListener { SubscriptionManager subscriptions; public: Listener(Session& session); - virtual void prepareQueue(std::string queue, std::string routing_key); + virtual void prepareQueue(std::string queue, std::string exchange, std::string routing_key); virtual void received(Message& message); virtual void listen(); ~Listener() { }; @@ -84,7 +84,7 @@ Listener::Listener(Session& session) : } -void Listener::prepareQueue(std::string queue, std::string routing_key) { +void Listener::prepareQueue(std::string queue, std::string exchange, std::string routing_key) { /* Create a unique queue name for this consumer by concatenating * the queue name parameter with the Session ID. @@ -106,8 +106,8 @@ void Listener::prepareQueue(std::string queue, std::string routing_key) { * "control" routing key, when it is finished. */ - session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey=routing_key); - session.exchangeBind(arg::exchange="amq.topic", arg::queue=queue, arg::bindingKey="control"); + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=routing_key); + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey="control"); /* * subscribe to the queue using the subscription manager. @@ -134,6 +134,7 @@ void Listener::listen() { int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + std::string exchange = argc>3 ? argv[3] : "amq.topic"; Connection connection; try { connection.open(host, port); @@ -147,12 +148,12 @@ int main(int argc, char** argv) { // Subscribe to messages on the queues we are interested in - listener.prepareQueue("usa", "usa.#"); - listener.prepareQueue("europe", "europe.#"); - listener.prepareQueue("news", "#.news"); - listener.prepareQueue("weather", "#.weather"); + listener.prepareQueue("usa", exchange, "usa.#"); + listener.prepareQueue("europe", exchange, "europe.#"); + listener.prepareQueue("news", exchange, "#.news"); + listener.prepareQueue("weather", exchange, "#.weather"); - std::cout << "Listening for messages ..." << std::endl; + std::cout << "Listening for messages ..." << std::endl; // Give up control and receive messages listener.listen(); diff --git a/cpp/examples/pub-sub/topic_publisher.cpp b/cpp/examples/pub-sub/topic_publisher.cpp index ab485fec8f..d11e807259 100644 --- a/cpp/examples/pub-sub/topic_publisher.cpp +++ b/cpp/examples/pub-sub/topic_publisher.cpp @@ -60,7 +60,7 @@ using namespace qpid::framing; using std::stringstream; using std::string; -void publish_messages(Session& session, string routing_key) +void publish_messages(Session& session, string exchange, string routing_key) { Message message; @@ -75,7 +75,7 @@ void publish_messages(Session& session, string routing_key) message.setData(message_data.str()); // Asynchronous transfer sends messages as quickly as // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); + async(session).messageTransfer(arg::content=message, arg::destination=exchange); } } @@ -88,18 +88,19 @@ void publish_messages(Session& session, string routing_key) * */ -void no_more_messages(Session& session) +void no_more_messages(Session& session, string exchange) { Message message; message.getDeliveryProperties().setRoutingKey("control"); message.setData("That's all, folks!"); - session.messageTransfer(arg::content=message, arg::destination="amq.topic"); + session.messageTransfer(arg::content=message, arg::destination=exchange); } int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; + std::string exchange = argc>3 ? argv[3] : "amq.topic"; Connection connection; Message message; try { @@ -108,12 +109,12 @@ int main(int argc, char** argv) { //--------- Main body of program -------------------------------------------- - publish_messages(session, "usa.news"); - publish_messages(session, "usa.weather"); - publish_messages(session, "europe.news"); - publish_messages(session, "europe.weather"); + publish_messages(session, exchange, "usa.news"); + publish_messages(session, exchange, "usa.weather"); + publish_messages(session, exchange, "europe.news"); + publish_messages(session, exchange, "europe.weather"); - no_more_messages(session); + no_more_messages(session, exchange); //----------------------------------------------------------------------------- |