summaryrefslogtreecommitdiff
path: root/cpp/examples/examples/request-response/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/examples/examples/request-response/client.cpp')
-rw-r--r--cpp/examples/examples/request-response/client.cpp101
1 files changed, 34 insertions, 67 deletions
diff --git a/cpp/examples/examples/request-response/client.cpp b/cpp/examples/examples/request-response/client.cpp
index 79bc88c6ae..0ee0e78c92 100644
--- a/cpp/examples/examples/request-response/client.cpp
+++ b/cpp/examples/examples/request-response/client.cpp
@@ -39,7 +39,7 @@
#include <qpid/client/Connection.h>
-#include <qpid/client/Dispatcher.h>
+#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
@@ -54,53 +54,25 @@ using namespace qpid::client;
using namespace qpid::framing;
class Listener : public MessageListener{
-private:
- Session session;
- std::string destination_name;
- Dispatcher dispatcher;
- int counter;
-public:
- Listener(Session& session, string destination_name):
- session(session),
- destination_name(destination_name),
- dispatcher(session),
- counter(0)
- {};
-
- virtual void listen();
- virtual void wait();
- virtual void received(Message& message);
- ~Listener() { };
+ private:
+ SubscriptionManager& subscriptions;
+ int counter;
+ public:
+ Listener(SubscriptionManager& subscriptions);
+ virtual void received(Message& message);
};
-
-void Listener::listen() {
- std::cout << "Activating response queue listener for: " <<destination_name << std::endl;
-
- session.messageSubscribe(arg::queue=destination_name, arg::destination=destination_name);
-
- session.messageFlow(arg::destination=destination_name, arg::unit=MESSAGE_CREDIT, arg::value=1);
- session.messageFlow(arg::destination=destination_name, arg::unit=BYTE_CREDIT, arg::value=UNLIMITED_CREDIT);
-
-
- dispatcher.listen(destination_name, this);
-}
-
-
-void Listener::wait() {
- std::cout << "Waiting for all responses to arrive ..." << std::endl;
- dispatcher.run();
-}
-
+Listener::Listener(SubscriptionManager& subs) : subscriptions(subs), counter(0)
+{}
void Listener::received(Message& message) {
- std::cout << "Response: " << message.getData() << std::endl;
+ std::cout << "Response: " << message.getData() << std::endl;
- ++ counter;
- if (counter > 3) {
- std::cout << "Shutting down listener for " << destination_name << std::endl;
- dispatcher.stop();
- }
+ ++ counter;
+ if (counter > 3) {
+ std::cout << "Shutting down listener for " << message.getDestination() << std::endl;
+ subscriptions.cancel(message.getDestination());
+ }
}
@@ -116,7 +88,7 @@ int main(int argc, char** argv) {
connection.open(host, port);
Session session = connection.newSession();
- //--------- Main body of program --------------------------------------------
+ //--------- Main body of program --------------------------------------------
// Create a response queue so the server can send us responses
// to our requests. Use the client's session ID as the name
@@ -130,45 +102,40 @@ int main(int argc, char** argv) {
session.queueDeclare(arg::queue=response_queue.str());
session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str());
- // Create a listener for the response queue and start listening.
-
- Listener listener(session, response_queue.str());
- listener.listen();
-
-
- // The routing key for the request queue is simply
- // "request", and all clients use the same routing key.
- //
// Each client sends the name of their own response queue so
// the service knows where to route messages.
request.getDeliveryProperties().setRoutingKey("request");
request.getMessageProperties().setReplyTo(ReplyTo("amq.direct", response_queue.str()));
+ // Create a listener for the response queue and listen for response messages.
+ std::cout << "Activating response queue listener for: " << response_queue.str() << std::endl;
+ SubscriptionManager subscriptions(session);
+ Listener listener(subscriptions);
+ subscriptions.subscribe(listener, response_queue.str());
+
// Now send some requests ...
string s[] = {
- "Twas brillig, and the slithy toves",
- "Did gire and gymble in the wabe.",
- "All mimsy were the borogroves,",
- "And the mome raths outgrabe."
+ "Twas brillig, and the slithy toves",
+ "Did gire and gymble in the wabe.",
+ "All mimsy were the borogroves,",
+ "And the mome raths outgrabe."
};
for (int i=0; i<4; i++) {
- request.setData(s[i]);
- // Asynchronous transfer sends messages as quickly as
- // possible without waiting for confirmation.
- async(session).messageTransfer(arg::content=request, arg::destination="amq.direct");
- std::cout << "Request: " << s[i] << std::endl;
+ request.setData(s[i]);
+ // Asynchronous transfer sends messages as quickly as
+ // possible without waiting for confirmation.
+ async(session).messageTransfer(arg::content=request, arg::destination="amq.direct");
+ std::cout << "Request: " << s[i] << std::endl;
}
- // And wait for any outstanding responses to arrive
-
- listener.wait();
-
+ std::cout << "Waiting for all responses to arrive ..." << std::endl;
+ subscriptions.run();
- //-----------------------------------------------------------------------------
+ //-----------------------------------------------------------------------------
connection.close();
return 0;