diff options
Diffstat (limited to 'cpp/examples/examples/request-response/client.cpp')
-rw-r--r-- | cpp/examples/examples/request-response/client.cpp | 101 |
1 files changed, 34 insertions, 67 deletions
diff --git a/cpp/examples/examples/request-response/client.cpp b/cpp/examples/examples/request-response/client.cpp index 79bc88c6ae..0ee0e78c92 100644 --- a/cpp/examples/examples/request-response/client.cpp +++ b/cpp/examples/examples/request-response/client.cpp @@ -39,7 +39,7 @@ #include <qpid/client/Connection.h> -#include <qpid/client/Dispatcher.h> +#include <qpid/client/SubscriptionManager.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> @@ -54,53 +54,25 @@ using namespace qpid::client; using namespace qpid::framing; class Listener : public MessageListener{ -private: - Session session; - std::string destination_name; - Dispatcher dispatcher; - int counter; -public: - Listener(Session& session, string destination_name): - session(session), - destination_name(destination_name), - dispatcher(session), - counter(0) - {}; - - virtual void listen(); - virtual void wait(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + int counter; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating response queue listener for: " <<destination_name << std::endl; - - session.messageSubscribe(arg::queue=destination_name, arg::destination=destination_name); - - session.messageFlow(arg::destination=destination_name, arg::unit=MESSAGE_CREDIT, arg::value=1); - session.messageFlow(arg::destination=destination_name, arg::unit=BYTE_CREDIT, arg::value=UNLIMITED_CREDIT); - - - dispatcher.listen(destination_name, this); -} - - -void Listener::wait() { - std::cout << "Waiting for all responses to arrive ..." << std::endl; - dispatcher.run(); -} - +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs), counter(0) +{} void Listener::received(Message& message) { - std::cout << "Response: " << message.getData() << std::endl; + std::cout << "Response: " << message.getData() << std::endl; - ++ counter; - if (counter > 3) { - std::cout << "Shutting down listener for " << destination_name << std::endl; - dispatcher.stop(); - } + ++ counter; + if (counter > 3) { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } } @@ -116,7 +88,7 @@ int main(int argc, char** argv) { connection.open(host, port); Session session = connection.newSession(); - //--------- Main body of program -------------------------------------------- + //--------- Main body of program -------------------------------------------- // Create a response queue so the server can send us responses // to our requests. Use the client's session ID as the name @@ -130,45 +102,40 @@ int main(int argc, char** argv) { session.queueDeclare(arg::queue=response_queue.str()); session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str()); - // Create a listener for the response queue and start listening. - - Listener listener(session, response_queue.str()); - listener.listen(); - - - // The routing key for the request queue is simply - // "request", and all clients use the same routing key. - // // Each client sends the name of their own response queue so // the service knows where to route messages. request.getDeliveryProperties().setRoutingKey("request"); request.getMessageProperties().setReplyTo(ReplyTo("amq.direct", response_queue.str())); + // Create a listener for the response queue and listen for response messages. + std::cout << "Activating response queue listener for: " << response_queue.str() << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, response_queue.str()); + // Now send some requests ... string s[] = { - "Twas brillig, and the slithy toves", - "Did gire and gymble in the wabe.", - "All mimsy were the borogroves,", - "And the mome raths outgrabe." + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." }; for (int i=0; i<4; i++) { - request.setData(s[i]); - // Asynchronous transfer sends messages as quickly as - // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); - std::cout << "Request: " << s[i] << std::endl; + request.setData(s[i]); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); + std::cout << "Request: " << s[i] << std::endl; } - // And wait for any outstanding responses to arrive - - listener.wait(); - + std::cout << "Waiting for all responses to arrive ..." << std::endl; + subscriptions.run(); - //----------------------------------------------------------------------------- + //----------------------------------------------------------------------------- connection.close(); return 0; |