diff options
author | Gordon Sim <gsim@apache.org> | 2007-12-04 14:19:06 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-12-04 14:19:06 +0000 |
commit | 6ee7a743ee2a306b9e34f3dc471046b68b21680a (patch) | |
tree | a745136f3d3517a8dd89a555c1af1fbd86d9c0f1 /cpp/src | |
parent | b4e4f652d251c66b2ca487ad8d347130c874173d (diff) | |
download | qpid-python-6ee7a743ee2a306b9e34f3dc471046b68b21680a.tar.gz |
Updates to topic test
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@600962 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/topic_listener.cpp | 19 | ||||
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 61 |
2 files changed, 37 insertions, 43 deletions
diff --git a/cpp/src/tests/topic_listener.cpp b/cpp/src/tests/topic_listener.cpp index 2456a85190..c78bc4d73b 100644 --- a/cpp/src/tests/topic_listener.cpp +++ b/cpp/src/tests/topic_listener.cpp @@ -99,7 +99,6 @@ int main(int argc, char** argv){ if(args.help) cout << args << endl; else { - cout << "topic_listener: Started." << endl; Connection connection(args.trace); args.open(connection); Session_0_10 session = connection.newSession(); @@ -110,7 +109,11 @@ int main(int argc, char** argv){ //declare exchange, queue and bind them: session.queueDeclare(arg::queue="response"); std::string control = "control_" + session.getId().str(); - session.queueDeclare(arg::queue=control, arg::durable=args.durable); + if (args.durable) { + session.queueDeclare(arg::queue=control, arg::durable=true); + } else { + session.queueDeclare(arg::queue=control, arg::exclusive=true, arg::autoDelete=true); + } session.queueBind(arg::exchange="amq.topic", arg::queue=control, arg::routingKey="topic_control"); //set up listener @@ -125,13 +128,14 @@ int main(int argc, char** argv){ } mgr.subscribe(listener, control); - cout << "topic_listener: Consuming." << endl; + cout << "topic_listener: listening..." << endl; mgr.run(); - cout << "topic_listener: run returned, closing session" << endl; + if (args.durable) { + session.queueDelete(arg::queue=control); + } session.close(); cout << "closing connection" << endl; connection.close(); - cout << "topic_listener: normal exit" << endl; } return 0; } catch (const std::exception& error) { @@ -148,16 +152,19 @@ void Listener::received(Message& message){ start = now(); count = 0; init = true; + cout << "Batch started." << endl; } FieldTable::ValuePtr type(message.getHeaders().get("TYPE")); if(!!type && StringValue("TERMINATION_REQUEST") == *type){ shutdown(); }else if(!!type && StringValue("REPORT_REQUEST") == *type){ + message.acknowledge();//acknowledge everything upto this point + cout <<"Batch ended, sending report." << endl; //send a report: report(); init = false; - }else if (++count % 100 == 0){ + }else if (++count % 1000 == 0){ cout <<"Received " << count << " messages." << endl; } } diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index f678b0eb21..80c9bf6607 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -55,20 +55,18 @@ using namespace std; * message listener and can therfore be used to receive messages sent * back by the subscribers. */ -class Publisher : public MessageListener{ +class Publisher { Session_0_10& session; + SubscriptionManager mgr; + LocalQueue queue; const string controlTopic; const bool transactional; const bool durable; - Monitor monitor; - int count; - void waitForCompletion(int msgs); string generateData(int size); public: Publisher(Session_0_10& session, const string& controlTopic, bool tx, bool durable); - virtual void received(Message& msg); int64_t publish(int msgs, int listeners, int size); void terminate(); }; @@ -118,11 +116,7 @@ int main(int argc, char** argv) { //declare queue (relying on default binding): session.queueDeclare(arg::queue="response"); - //set up listener - SubscriptionManager mgr(session); Publisher publisher(session, "topic_control", args.transactional, args.durable); - mgr.subscribe(publisher, "response"); - mgr.start(); int batchSize(args.batches); int64_t max(0); @@ -141,7 +135,6 @@ int main(int argc, char** argv) { << " in " << msecs << "ms" << endl; } publisher.terminate(); - mgr.stop(); int64_t avg = sum / batchSize; if(batchSize > 1){ cout << batchSize << " batches completed. avg=" << avg << @@ -158,19 +151,9 @@ int main(int argc, char** argv) { } Publisher::Publisher(Session_0_10& _session, const string& _controlTopic, bool tx, bool d) : - session(_session), controlTopic(_controlTopic), transactional(tx), durable(d), count(0) {} - -void Publisher::received(Message& ){ - //count responses and when all are received end the current batch - Monitor::ScopedLock l(monitor); - if(--count == 0){ - monitor.notify(); - } -} - -void Publisher::waitForCompletion(int msgs){ - count = msgs; - monitor.wait(); + session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) +{ + mgr.subscribe(queue, "response"); } int64_t Publisher::publish(int msgs, int listeners, int size){ @@ -179,22 +162,26 @@ int64_t Publisher::publish(int msgs, int listeners, int size){ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } AbsTime start = now(); - { - Monitor::ScopedLock l(monitor); - for(int i = 0; i < msgs; i++){ - session.messageTransfer(arg::content=msg, arg::destination="amq.topic"); - } - //send report request - Message reportRequest("", controlTopic); - reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic"); - if(transactional){ - session.txCommit(); - } - - waitForCompletion(listeners); + + for(int i = 0; i < msgs; i++){ + session.messageTransfer(arg::content=msg, arg::destination="amq.topic"); + } + //send report request + Message reportRequest("", controlTopic); + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic"); + if(transactional){ + session.txCommit(); + } + //wait for a response from each listener (TODO, could log these) + for (int i = 0; i < listeners; i++) { + Message report = queue.pop(); } + if(transactional){ + session.txCommit(); + } + AbsTime finish = now(); return Duration(start, finish); } |