summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-12-04 14:19:06 +0000
committerGordon Sim <gsim@apache.org>2007-12-04 14:19:06 +0000
commit6ee7a743ee2a306b9e34f3dc471046b68b21680a (patch)
treea745136f3d3517a8dd89a555c1af1fbd86d9c0f1 /cpp/src
parentb4e4f652d251c66b2ca487ad8d347130c874173d (diff)
downloadqpid-python-6ee7a743ee2a306b9e34f3dc471046b68b21680a.tar.gz
Updates to topic test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@600962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/tests/topic_listener.cpp19
-rw-r--r--cpp/src/tests/topic_publisher.cpp61
2 files changed, 37 insertions, 43 deletions
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp
index 2456a85190..c78bc4d73b 100644
--- a/cpp/src/tests/topic_listener.cpp
+++ b/cpp/src/tests/topic_listener.cpp
@@ -99,7 +99,6 @@ int main(int argc, char** argv){
if(args.help)
cout << args << endl;
else {
- cout << "topic_listener: Started." << endl;
Connection connection(args.trace);
args.open(connection);
Session_0_10 session = connection.newSession();
@@ -110,7 +109,11 @@ int main(int argc, char** argv){
//declare exchange, queue and bind them:
session.queueDeclare(arg::queue="response");
std::string control = "control_" + session.getId().str();
- session.queueDeclare(arg::queue=control, arg::durable=args.durable);
+ if (args.durable) {
+ session.queueDeclare(arg::queue=control, arg::durable=true);
+ } else {
+ session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true);
+ }
session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control");
//set up listener
@@ -125,13 +128,14 @@ int main(int argc, char** argv){
}
mgr.subscribe(listener, control);
- cout << "topic_listener: Consuming." << endl;
+ cout << "topic_listener: listening..." << endl;
mgr.run();
- cout << "topic_listener: run returned, closing session" << endl;
+ if (args.durable) {
+ session.queueDelete(arg::queue=control);
+ }
session.close();
cout << "closing connection" << endl;
connection.close();
- cout << "topic_listener: normal exit" << endl;
}
return 0;
} catch (const std::exception& error) {
@@ -148,16 +152,19 @@ void Listener::received(Message& message){
start = now();
count = 0;
init = true;
+ cout << "Batch started." << endl;
}
FieldTable::ValuePtr type(message.getHeaders().get("TYPE"));
if(!!type && StringValue("TERMINATION_REQUEST") == *type){
shutdown();
}else if(!!type && StringValue("REPORT_REQUEST") == *type){
+ message.acknowledge();//acknowledge everything upto this point
+ cout <<"Batch ended, sending report." << endl;
//send a report:
report();
init = false;
- }else if (++count % 100 == 0){
+ }else if (++count % 1000 == 0){
cout <<"Received " << count << " messages." << endl;
}
}
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp
index f678b0eb21..80c9bf6607 100644
--- a/cpp/src/tests/topic_publisher.cpp
+++ b/cpp/src/tests/topic_publisher.cpp
@@ -55,20 +55,18 @@ using namespace std;
* message listener and can therfore be used to receive messages sent
* back by the subscribers.
*/
-class Publisher : public MessageListener{
+class Publisher {
Session_0_10& session;
+ SubscriptionManager mgr;
+ LocalQueue queue;
const string controlTopic;
const bool transactional;
const bool durable;
- Monitor monitor;
- int count;
- void waitForCompletion(int msgs);
string generateData(int size);
public:
Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable);
- virtual void received(Message& msg);
int64_t publish(int msgs, int listeners, int size);
void terminate();
};
@@ -118,11 +116,7 @@ int main(int argc, char** argv) {
//declare queue (relying on default binding):
session.queueDeclare(arg::queue="response");
- //set up listener
- SubscriptionManager mgr(session);
Publisher publisher(session, "topic_control", args.transactional, args.durable);
- mgr.subscribe(publisher, "response");
- mgr.start();
int batchSize(args.batches);
int64_t max(0);
@@ -141,7 +135,6 @@ int main(int argc, char** argv) {
<< " in " << msecs << "ms" << endl;
}
publisher.terminate();
- mgr.stop();
int64_t avg = sum / batchSize;
if(batchSize > 1){
cout << batchSize << " batches completed. avg=" << avg <<
@@ -158,19 +151,9 @@ int main(int argc, char** argv) {
}
Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) :
- session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {}
-
-void Publisher::received(Message& ){
- //count responses and when all are received end the current batch
- Monitor::ScopedLock l(monitor);
- if(--count == 0){
- monitor.notify();
- }
-}
-
-void Publisher::waitForCompletion(int msgs){
- count = msgs;
- monitor.wait();
+ session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d)
+{
+ mgr.subscribe(queue, "response");
}
int64_t Publisher::publish(int msgs, int listeners, int size){
@@ -179,22 +162,26 @@ int64_t Publisher::publish(int msgs, int listeners, int size){
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
AbsTime start = now();
- {
- Monitor::ScopedLock l(monitor);
- for(int i = 0; i < msgs; i++){
- session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
- }
- //send report request
- Message reportRequest("", controlTopic);
- reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
- session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
- if(transactional){
- session.txCommit();
- }
-
- waitForCompletion(listeners);
+
+ for(int i = 0; i < msgs; i++){
+ session.messageTransfer(arg::content=msg, arg::destination="amq.topic");
+ }
+ //send report request
+ Message reportRequest("", controlTopic);
+ reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST");
+ session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic");
+ if(transactional){
+ session.txCommit();
+ }
+ //wait for a response from each listener (TODO, could log these)
+ for (int i = 0; i < listeners; i++) {
+ Message report = queue.pop();
}
+ if(transactional){
+ session.txCommit();
+ }
+
AbsTime finish = now();
return Duration(start, finish);
}