diff options
author | Alan Conway <aconway@apache.org> | 2007-11-09 02:38:33 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-11-09 02:38:33 +0000 |
commit | efb5b0171cd6049db8fdd2aa3f9546c65be19852 (patch) | |
tree | cb22b69282fbcd204f5dad2a2795c21d58a08f9e /cpp/examples/pub-sub | |
parent | 976252c508b4feebef3b0811f640cd9bf0cfeeaa (diff) | |
download | qpid-python-efb5b0171cd6049db8fdd2aa3f9546c65be19852.tar.gz |
QPID-676: Jonathan Robie's C++ examples.
Made the following alterations for recent C++ API changes:
- use arg:: namespace for Session keyword arguments.
- removed trailing _ on session method names.
cpp/examples/Makefile.am calls make in each example directory with
flags to build examples from headers/libraries SVN checkout.
Examples themselves have a plain Makefile (not automake) which will
work as is if qpid is installed in standard places.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593402 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/examples/pub-sub')
-rw-r--r-- | cpp/examples/pub-sub/Makefile | 17 | ||||
-rw-r--r-- | cpp/examples/pub-sub/topic_config_queues.cpp | 128 | ||||
-rw-r--r-- | cpp/examples/pub-sub/topic_listener.cpp | 169 | ||||
-rw-r--r-- | cpp/examples/pub-sub/topic_publisher.cpp | 123 |
4 files changed, 437 insertions, 0 deletions
diff --git a/cpp/examples/pub-sub/Makefile b/cpp/examples/pub-sub/Makefile new file mode 100644 index 0000000000..ea08031da5 --- /dev/null +++ b/cpp/examples/pub-sub/Makefile @@ -0,0 +1,17 @@ +CXX=g++ +CXXFLAGS= + +PROGRAMS=topic_config_queues topic_listener topic_publisher +all: $(PROGRAMS) + +topic_config_queues: topic_config_queues.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +topic_listener: topic_listener.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +topic_publisher: topic_publisher.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +clean: + rm -f $(PROGRAMS) diff --git a/cpp/examples/pub-sub/topic_config_queues.cpp b/cpp/examples/pub-sub/topic_config_queues.cpp new file mode 100644 index 0000000000..8c05241f3c --- /dev/null +++ b/cpp/examples/pub-sub/topic_config_queues.cpp @@ -0,0 +1,128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_config_queues.cpp + * + * This program is one of three programs designed to be used + * together. These programs use the topic exchange. + * + * topic_config_queues.cpp (this program): + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * topic_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * topic_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main() { + Connection connection; + Message msg; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + + /* A consumer application reads from the queue, and needs no + * knowledge of the exchanges used to route messages to the + * queue, or of the routing keys. + * + * A publisher application writes to the exchange, providing a + * routing key, It needs no knowledge of the queues or bindings + * used to route messages to consumers. + */ + + + /* Create queues on the broker. */ + + session.queueDeclare(arg::queue="news_queue"); + session.queueDeclare(arg::queue="weather_queue"); + session.queueDeclare(arg::queue="usa_queue"); + session.queueDeclare(arg::queue="europe_queue"); + + /* Bind these queues using routing keys, so messages will be + delivered to the right queues. */ + + session.queueBind(arg::exchange="amq.topic", arg::queue="news_queue", arg::routingKey="#.news"); + session.queueBind(arg::exchange="amq.topic", arg::queue="weather_queue", arg::routingKey="#.weather"); + session.queueBind(arg::exchange="amq.topic", arg::queue="usa_queue", arg::routingKey="usa.#"); + session.queueBind(arg::exchange="amq.topic", arg::queue="europe_queue", arg::routingKey="europe.#"); + + + /* + * We use a separate 'control' routing key for control + * messages. All such messages are routed to each queue. In + * this demo, we use a message with the content "That's all, + * Folks!" to signal that no more messages will be sent, and + * users of the queue can stop listening for messages. + * + * Because wildcard matching can result in more than one match for + * a given message, it can place more messages on the queues than + * were originally received. + * + * We do not use wildcard matching for control messages. We + * want to make sure that each such message is received once + * and only once. + */ + + + session.queueBind(arg::exchange="amq.topic", arg::queue="news_queue", arg::routingKey="control"); + session.queueBind(arg::exchange="amq.topic", arg::queue="weather_queue", arg::routingKey="control"); + session.queueBind(arg::exchange="amq.topic", arg::queue="usa_queue", arg::routingKey="control"); + session.queueBind(arg::exchange="amq.topic", arg::queue="europe_queue", arg::routingKey="control"); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/cpp/examples/pub-sub/topic_listener.cpp b/cpp/examples/pub-sub/topic_listener.cpp new file mode 100644 index 0000000000..323c93dd0b --- /dev/null +++ b/cpp/examples/pub-sub/topic_listener.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_listener.cpp: + * + * This program is one of three programs designed to be used + * together. These programs use the topic exchange. + * + * topic_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * topic_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * topic_listener.cpp (this program): + * + * Reads from a queue on the broker using a message listener. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/Queue.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> +#include <set> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener { + private: + Session& session; + SubscriptionManager subscriptions; + public: + Listener(Session& session); + virtual void prepareQueue(std::string queue, std::string routing_key); + virtual void received(Message& message); + virtual void listen(); + ~Listener() { }; +}; + + +/* + * Listener::Listener + * + * Subscribe to the queue, route it to a client destination for the + * listener. (The destination name merely identifies the destination + * in the listener, you can use any name as long as you use the same + * name for the listener). + */ + +Listener::Listener(Session& session) : + session(session), + subscriptions(session) +{ +} + + +void Listener::prepareQueue(std::string queue, std::string routing_key) { + + /* Create a unique queue name for this consumer by concatenating + * the queue name parameter with the Session ID. + */ + + queue += session.getId().str(); + std::cout << "Declaring queue: " << queue << std::endl; + + /* Declare an exclusive queue on the broker + */ + + session.queueDeclare(arg::queue=queue, arg::exclusive=true); + + /* Route messages to the new queue if they match the routing key. + * + * Also route any messages to with the "control" routing key to + * this queue so we know when it's time to stop. A publisher sends + * a message with the content "That's all, Folks!", using the + * "control" routing key, when it is finished. + */ + + session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey=routing_key); + session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey="control"); + + /* + * subscribe to the queue using the subscription manager. + */ + + std::cout << "Subscribing to queue " << queue << std::endl; + subscriptions.subscribe(*this, queue); +} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << " from " << message.getDestination() << std::endl; + + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +void Listener::listen() { + subscriptions.run(); +} + +int main() { + Connection connection; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Create a listener for the session + + Listener listener(session); + + // Subscribe to messages on the queues we are interested in + + listener.prepareQueue("usa", "usa.#"); + listener.prepareQueue("europe", "europe.#"); + listener.prepareQueue("news", "#.news"); + listener.prepareQueue("weather", "#.weather"); + + std::cout << "Listening for messages ..." << std::endl; + + // Give up control and receive messages + listener.listen(); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/pub-sub/topic_publisher.cpp b/cpp/examples/pub-sub/topic_publisher.cpp new file mode 100644 index 0000000000..52c2827e58 --- /dev/null +++ b/cpp/examples/pub-sub/topic_publisher.cpp @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * topic_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs use the topic exchange. + * + * topic_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * topic_publisher.cpp (this program): + * + * Publishes to a broker, specifying a routing key. + * + * topic_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +void publish_messages(Session& session, string routing_key) +{ + Message message; + + // Set the routing key once, we'll use the same routing key for all + // messages. + + message.getDeliveryProperties().setRoutingKey(routing_key); + for (int i=0; i<5; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + session.messageTransfer(arg::content=message, arg::destination="amq.topic"); + } + +} + +/* + * no_more_messages() + * + * Send a message to indicate that no more messages are coming. + * Use the 'control' routing key (see comments in topic_config_queues.cpp). + * + */ + +void no_more_messages(Session& session) +{ + Message message; + + message.getDeliveryProperties().setRoutingKey("control"); + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message, arg::destination="amq.topic"); +} + +int main() { + Connection connection; + Message message; + try { + connection.open("127.0.0.1", 5672 ); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + publish_messages(session, "usa.news"); + publish_messages(session, "usa.weather"); + publish_messages(session, "europe.news"); + publish_messages(session, "europe.weather"); + + no_more_messages(session); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + |