diff options
Diffstat (limited to 'cpp/examples')
-rw-r--r-- | cpp/examples/examples/direct/listener.cpp | 1 | ||||
-rw-r--r-- | cpp/examples/examples/fanout/listener.cpp | 1 | ||||
-rw-r--r-- | cpp/examples/examples/pub-sub/topic_listener.cpp | 1 | ||||
-rw-r--r-- | cpp/examples/examples/request-response/client.cpp | 101 | ||||
-rw-r--r-- | cpp/examples/examples/request-response/server.cpp | 108 | ||||
-rw-r--r-- | cpp/examples/examples/xml-exchange/listener.cpp | 1 |
6 files changed, 77 insertions, 136 deletions
diff --git a/cpp/examples/examples/direct/listener.cpp b/cpp/examples/examples/direct/listener.cpp index fc2fa96ead..b1aa9754bc 100644 --- a/cpp/examples/examples/direct/listener.cpp +++ b/cpp/examples/examples/direct/listener.cpp @@ -24,7 +24,6 @@ * the broker using a message listener. */ -#include <qpid/client/Dispatcher.h> #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp index d9bf9789dc..294dfc7855 100644 --- a/cpp/examples/examples/fanout/listener.cpp +++ b/cpp/examples/examples/fanout/listener.cpp @@ -24,7 +24,6 @@ * the broker using a message listener. */ -#include <qpid/client/Dispatcher.h> #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp index 4d854e57ff..9996abab19 100644 --- a/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -44,7 +44,6 @@ #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> -#include <qpid/client/Queue.h> #include <qpid/client/SubscriptionManager.h> #include <unistd.h> diff --git a/cpp/examples/examples/request-response/client.cpp b/cpp/examples/examples/request-response/client.cpp index 79bc88c6ae..0ee0e78c92 100644 --- a/cpp/examples/examples/request-response/client.cpp +++ b/cpp/examples/examples/request-response/client.cpp @@ -39,7 +39,7 @@ #include <qpid/client/Connection.h> -#include <qpid/client/Dispatcher.h> +#include <qpid/client/SubscriptionManager.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> @@ -54,53 +54,25 @@ using namespace qpid::client; using namespace qpid::framing; class Listener : public MessageListener{ -private: - Session session; - std::string destination_name; - Dispatcher dispatcher; - int counter; -public: - Listener(Session& session, string destination_name): - session(session), - destination_name(destination_name), - dispatcher(session), - counter(0) - {}; - - virtual void listen(); - virtual void wait(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + int counter; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating response queue listener for: " <<destination_name << std::endl; - - session.messageSubscribe(arg::queue=destination_name, arg::destination=destination_name); - - session.messageFlow(arg::destination=destination_name, arg::unit=MESSAGE_CREDIT, arg::value=1); - session.messageFlow(arg::destination=destination_name, arg::unit=BYTE_CREDIT, arg::value=UNLIMITED_CREDIT); - - - dispatcher.listen(destination_name, this); -} - - -void Listener::wait() { - std::cout << "Waiting for all responses to arrive ..." << std::endl; - dispatcher.run(); -} - +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs), counter(0) +{} void Listener::received(Message& message) { - std::cout << "Response: " << message.getData() << std::endl; + std::cout << "Response: " << message.getData() << std::endl; - ++ counter; - if (counter > 3) { - std::cout << "Shutting down listener for " << destination_name << std::endl; - dispatcher.stop(); - } + ++ counter; + if (counter > 3) { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } } @@ -116,7 +88,7 @@ int main(int argc, char** argv) { connection.open(host, port); Session session = connection.newSession(); - //--------- Main body of program -------------------------------------------- + //--------- Main body of program -------------------------------------------- // Create a response queue so the server can send us responses // to our requests. Use the client's session ID as the name @@ -130,45 +102,40 @@ int main(int argc, char** argv) { session.queueDeclare(arg::queue=response_queue.str()); session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str()); - // Create a listener for the response queue and start listening. - - Listener listener(session, response_queue.str()); - listener.listen(); - - - // The routing key for the request queue is simply - // "request", and all clients use the same routing key. - // // Each client sends the name of their own response queue so // the service knows where to route messages. request.getDeliveryProperties().setRoutingKey("request"); request.getMessageProperties().setReplyTo(ReplyTo("amq.direct", response_queue.str())); + // Create a listener for the response queue and listen for response messages. + std::cout << "Activating response queue listener for: " << response_queue.str() << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, response_queue.str()); + // Now send some requests ... string s[] = { - "Twas brillig, and the slithy toves", - "Did gire and gymble in the wabe.", - "All mimsy were the borogroves,", - "And the mome raths outgrabe." + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." }; for (int i=0; i<4; i++) { - request.setData(s[i]); - // Asynchronous transfer sends messages as quickly as - // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); - std::cout << "Request: " << s[i] << std::endl; + request.setData(s[i]); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); + std::cout << "Request: " << s[i] << std::endl; } - // And wait for any outstanding responses to arrive - - listener.wait(); - + std::cout << "Waiting for all responses to arrive ..." << std::endl; + subscriptions.run(); - //----------------------------------------------------------------------------- + //----------------------------------------------------------------------------- connection.close(); return 0; diff --git a/cpp/examples/examples/request-response/server.cpp b/cpp/examples/examples/request-response/server.cpp index 83144c715d..df189cfdd8 100644 --- a/cpp/examples/examples/request-response/server.cpp +++ b/cpp/examples/examples/request-response/server.cpp @@ -39,8 +39,9 @@ #include <qpid/client/Connection.h> -#include <qpid/client/Dispatcher.h> +#include <qpid/client/SubscriptionManager.h> #include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> @@ -59,102 +60,79 @@ using std::stringstream; using std::string; class Listener : public MessageListener{ -private: - std::string destination_name; - Dispatcher dispatcher; - Session session; -public: - Listener(Session& session, string destination_name): - destination_name(destination_name), - dispatcher(session), - session(session) - {}; - - virtual void listen(); - virtual void received(Message& message); - virtual void wait(); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + AsyncSession asyncSession; + public: + Listener(SubscriptionManager& subscriptions, Session& session); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating request queue listener for: " <<destination_name << std::endl; - - session.messageSubscribe(arg::queue=destination_name, arg::destination=destination_name); - - session.messageFlow(arg::destination=destination_name, arg::unit=MESSAGE_CREDIT, arg::value=1); - session.messageFlow(arg::destination=destination_name, arg::unit=BYTE_CREDIT, arg::value=UNLIMITED_CREDIT); - - dispatcher.listen(destination_name, this); -} - - -void Listener::wait() { - std::cout << "Waiting for requests" << std::endl; - dispatcher.run(); -} - +Listener::Listener(SubscriptionManager& subs, Session& session) + : subscriptions(subs), asyncSession(session) +{} void Listener::received(Message& request) { + Message response; - Message response; - - // Get routing key for response from the request's replyTo property + // Get routing key for response from the request's replyTo property + string routingKey; - string routingKey; + if (request.getMessageProperties().hasReplyTo()) { + routingKey = request.getMessageProperties().getReplyTo().getRoutingKey(); + } else { + std::cout << "Error: " << "No routing key for request (" << request.getData() << ")" << std::endl; + return; + } - if (request.getMessageProperties().hasReplyTo()) { - routingKey = request.getMessageProperties().getReplyTo().getRoutingKey(); - } else { - std::cout << "Error: " << "No routing key for request (" << request.getData() << ")" << std::endl; - return; - } + std::cout << "Request: " << request.getData() << " (" <<routingKey << ")" << std::endl; - std::cout << "Request: " << request.getData() << " (" <<routingKey << ")" << std::endl; + // Transform message content to upper case + std::string s = request.getData(); + std::transform (s.begin(), s.end(), s.begin(), toupper); + response.setData(s); - // Transform message content to upper case - std::string s = request.getData(); - std::transform (s.begin(), s.end(), s.begin(), toupper); - response.setData(s); + // Send it back to the user + response.getDeliveryProperties().setRoutingKey(routingKey); - // Send it back to the user - response.getDeliveryProperties().setRoutingKey(routingKey); - - // Asynchronous transfer sends messages as quickly as - // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=response, arg::destination="amq.direct"); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + asyncSession.messageTransfer(arg::content=response, arg::destination="amq.direct"); } int main(int argc, char** argv) { const char* host = argc>1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; - Connection connection; + Connection connection; Message message; try { connection.open(host, port); Session session = connection.newSession(); - //--------- Main body of program -------------------------------------------- + //--------- Main body of program -------------------------------------------- + // Create a request queue for clients to use when making // requests. - string request_queue = "request"; // Use the name of the request queue as the routing key - session.queueDeclare(arg::queue=request_queue); session.exchangeBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::bindingKey=request_queue); - // Create a listener for the request queue and start listening. - - Listener listener(session, request_queue); - listener.listen(); - listener.wait(); + // Create a listener and subscribe it to the request_queue + std::cout << "Activating request queue listener for: " << request_queue << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions, session); + subscriptions.subscribe(listener, request_queue); + // Deliver messages until the subscription is cancelled + // by Listener::received() + std::cout << "Waiting for requests" << std::endl; + subscriptions.run(); - //----------------------------------------------------------------------------- + //----------------------------------------------------------------------------- connection.close(); return 0; diff --git a/cpp/examples/examples/xml-exchange/listener.cpp b/cpp/examples/examples/xml-exchange/listener.cpp index 559cfaf8c9..f4305b5c5a 100644 --- a/cpp/examples/examples/xml-exchange/listener.cpp +++ b/cpp/examples/examples/xml-exchange/listener.cpp @@ -24,7 +24,6 @@ * the broker using a message listener. */ -#include <qpid/client/Dispatcher.h> #include <qpid/client/Connection.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> |