summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/topic_listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/topic_listener.cpp')
-rw-r--r--qpid/cpp/src/tests/topic_listener.cpp105
1 files changed, 55 insertions, 50 deletions
diff --git a/qpid/cpp/src/tests/topic_listener.cpp b/qpid/cpp/src/tests/topic_listener.cpp
index 44070cd4c9..aa8c19df99 100644
--- a/qpid/cpp/src/tests/topic_listener.cpp
+++ b/qpid/cpp/src/tests/topic_listener.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
* This file provides one half of a test and example of a pub-sub
* style of interaction. See topic_publisher.cpp for the other half,
* in which the logic for publishing is defined.
- *
+ *
* This file contains the listener logic. A listener will subscribe to
* a logical 'topic'. It will count the number of messages it receives
* and the time elapsed between the first one and the last one. It
@@ -50,11 +50,14 @@ using namespace qpid::sys;
using namespace qpid::framing;
using namespace std;
+namespace qpid {
+namespace tests {
+
/**
* A message listener implementation in which the runtime logic is
* defined.
*/
-class Listener : public MessageListener{
+class Listener : public MessageListener{
Session session;
SubscriptionManager& mgr;
const string responseQueue;
@@ -62,7 +65,7 @@ class Listener : public MessageListener{
bool init;
int count;
AbsTime start;
-
+
void shutdown();
void report();
public:
@@ -91,6 +94,52 @@ struct Args : public qpid::TestOptions {
}
};
+Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
+ session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
+
+void Listener::received(Message& message){
+ if(!init){
+ start = now();
+ count = 0;
+ init = true;
+ cout << "Batch started." << endl;
+ }
+ string type = message.getHeaders().getAsString("TYPE");
+
+ if(string("TERMINATION_REQUEST") == type){
+ shutdown();
+ }else if(string("REPORT_REQUEST") == type){
+ subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
+ cout <<"Batch ended, sending report." << endl;
+ //send a report:
+ report();
+ init = false;
+ }else if (++count % 1000 == 0){
+ cout <<"Received " << count << " messages." << endl;
+ }
+}
+
+void Listener::shutdown(){
+ mgr.stop();
+}
+
+void Listener::report(){
+ AbsTime finish = now();
+ Duration time(start, finish);
+ stringstream reportstr;
+ reportstr << "Received " << count << " messages in "
+ << time/TIME_MSEC << " ms.";
+ Message msg(reportstr.str(), responseQueue);
+ msg.getHeaders().setString("TYPE", "REPORT");
+ session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1);
+ if(transactional){
+ sync(session).txCommit();
+ }
+}
+
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
/**
* The main routine creates a Listener instance and sets it up to
@@ -142,7 +191,7 @@ int main(int argc, char** argv){
if (args.transactional) {
session.txSelect();
}
-
+
cout << "topic_listener: listening..." << endl;
mgr.run();
if (args.durable) {
@@ -158,47 +207,3 @@ int main(int argc, char** argv){
}
return 1;
}
-
-Listener::Listener(const Session& s, SubscriptionManager& m, const string& _responseq, bool tx) :
- session(s), mgr(m), responseQueue(_responseq), transactional(tx), init(false), count(0){}
-
-void Listener::received(Message& message){
- if(!init){
- start = now();
- count = 0;
- init = true;
- cout << "Batch started." << endl;
- }
- string type = message.getHeaders().getAsString("TYPE");
-
- if(string("TERMINATION_REQUEST") == type){
- shutdown();
- }else if(string("REPORT_REQUEST") == type){
- subscription.accept(subscription.getUnaccepted()); // Accept everything upto this point
- cout <<"Batch ended, sending report." << endl;
- //send a report:
- report();
- init = false;
- }else if (++count % 1000 == 0){
- cout <<"Received " << count << " messages." << endl;
- }
-}
-
-void Listener::shutdown(){
- mgr.stop();
-}
-
-void Listener::report(){
- AbsTime finish = now();
- Duration time(start, finish);
- stringstream reportstr;
- reportstr << "Received " << count << " messages in "
- << time/TIME_MSEC << " ms.";
- Message msg(reportstr.str(), responseQueue);
- msg.getHeaders().setString("TYPE", "REPORT");
- session.messageTransfer(arg::destination="amq.direct", arg::content=msg, arg::acceptMode=1);
- if(transactional){
- sync(session).txCommit();
- }
-}
-