diff options
Diffstat (limited to 'qpid/cpp/src/tests/topic_listener.cpp')
-rw-r--r-- | qpid/cpp/src/tests/topic_listener.cpp | 105 |
1 files changed, 55 insertions, 50 deletions
diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp index 44070cd4c9..aa8c19df99 100644 --- a/qpid/cpp/src/tests/topic_listener.cpp +++ b/qpid/cpp/src/tests/topic_listener.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,7 @@ * This file provides one half of a test and example of a pub-sub * style of interaction. See topic_publisher.cpp for the other half, * in which the logic for publishing is defined. - * + * * This file contains the listener logic. A listener will subscribe to * a logical 'topic'. It will count the number of messages it receives * and the time elapsed between the first one and the last one. It @@ -50,11 +50,14 @@ using namespace qpid::sys; using namespace qpid::framing; using namespace std; +namespace qpid { +namespace tests { + /** * A message listener implementation in which the runtime logic is * defined. */ -class Listener : public MessageListener{ +class Listener : public MessageListener{ Session session; SubscriptionManager& mgr; const string responseQueue; @@ -62,7 +65,7 @@ class Listener : public MessageListener{ bool init; int count; AbsTime start; - + void shutdown(); void report(); public: @@ -91,6 +94,52 @@ struct Args : public qpid::TestOptions { } }; +Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : + session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} + +void Listener::received(Message& message){ + if(!init){ + start = now(); + count = 0; + init = true; + cout << "Batch started." << endl; + } + string type = message.getHeaders().getAsString("TYPE"); + + if(string("TERMINATION_REQUEST") == type){ + shutdown(); + }else if(string("REPORT_REQUEST") == type){ + subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point + cout <<"Batch ended, sending report." << endl; + //send a report: + report(); + init = false; + }else if (++count % 1000 == 0){ + cout <<"Received " << count << " messages." << endl; + } +} + +void Listener::shutdown(){ + mgr.stop(); +} + +void Listener::report(){ + AbsTime finish = now(); + Duration time(start, finish); + stringstream reportstr; + reportstr << "Received " << count << " messages in " + << time/TIME_MSEC << " ms."; + Message msg(reportstr.str(), responseQueue); + msg.getHeaders().setString("TYPE", "REPORT"); + session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1); + if(transactional){ + sync(session).txCommit(); + } +} + +}} // namespace qpid::tests + +using namespace qpid::tests; /** * The main routine creates a Listener instance and sets it up to @@ -142,7 +191,7 @@ int main(int argc, char** argv){ if (args.transactional) { session.txSelect(); } - + cout << "topic_listener: listening..." << endl; mgr.run(); if (args.durable) { @@ -158,47 +207,3 @@ int main(int argc, char** argv){ } return 1; } - -Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) : - session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){} - -void Listener::received(Message& message){ - if(!init){ - start = now(); - count = 0; - init = true; - cout << "Batch started." << endl; - } - string type = message.getHeaders().getAsString("TYPE"); - - if(string("TERMINATION_REQUEST") == type){ - shutdown(); - }else if(string("REPORT_REQUEST") == type){ - subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point - cout <<"Batch ended, sending report." << endl; - //send a report: - report(); - init = false; - }else if (++count % 1000 == 0){ - cout <<"Received " << count << " messages." << endl; - } -} - -void Listener::shutdown(){ - mgr.stop(); -} - -void Listener::report(){ - AbsTime finish = now(); - Duration time(start, finish); - stringstream reportstr; - reportstr << "Received " << count << " messages in " - << time/TIME_MSEC << " ms."; - Message msg(reportstr.str(), responseQueue); - msg.getHeaders().setString("TYPE", "REPORT"); - session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1); - if(transactional){ - sync(session).txCommit(); - } -} - |