diff options
Diffstat (limited to 'cpp/src/tests/topic_publisher.cpp')
-rw-r--r-- | cpp/src/tests/topic_publisher.cpp | 138 |
1 files changed, 72 insertions, 66 deletions
diff --git a/cpp/src/tests/topic_publisher.cpp b/cpp/src/tests/topic_publisher.cpp index f37ad2dc0e..3381132b1a 100644 --- a/cpp/src/tests/topic_publisher.cpp +++ b/cpp/src/tests/topic_publisher.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,7 +23,7 @@ * This file provides one half of a test and example of a pub-sub * style of interaction. See topic_listener.cpp for the other half, in * which the logic for subscribers is defined. - * + * * This file contains the publisher logic. The publisher will send a * number of messages to the exchange with the appropriate routing key * for the logical 'topic'. Once it has done this it will then send a @@ -40,7 +40,6 @@ #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" #include "qpid/sys/Monitor.h" -#include <unistd.h> #include "qpid/sys/Time.h" #include <cstdlib> #include <iostream> @@ -50,19 +49,22 @@ using namespace qpid::client; using namespace qpid::sys; using namespace std; +namespace qpid { +namespace tests { + /** * The publishing logic is defined in this class. It implements * message listener and can therfore be used to receive messages sent * back by the subscribers. */ -class Publisher { +class Publisher { AsyncSession session; SubscriptionManager mgr; LocalQueue queue; const string controlTopic; const bool transactional; const bool durable; - + string generateData(int size); public: @@ -100,6 +102,64 @@ struct Args : public TestOptions { } }; +Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) : + session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) +{ + mgr.subscribe(queue, "response"); +} + +int64_t Publisher::publish(int msgs, int listeners, int size){ + Message msg(generateData(size), controlTopic); + if (durable) { + msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); + } + AbsTime start = now(); + + for(int i = 0; i < msgs; i++){ + session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); + } + //send report request + Message reportRequest("", controlTopic); + reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); + session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); + if(transactional){ + sync(session).txCommit(); + } + //wait for a response from each listener (TODO, could log these) + for (int i = 0; i < listeners; i++) { + Message report = queue.pop(); + } + + if(transactional){ + sync(session).txCommit(); + } + + AbsTime finish = now(); + return Duration(start, finish); +} + +string Publisher::generateData(int size){ + string data; + for(int i = 0; i < size; i++){ + data += ('A' + (i / 26)); + } + return data; +} + +void Publisher::terminate(){ + //send termination request + Message terminationRequest("", controlTopic); + terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); + session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); + if(transactional){ + session.txCommit(); + } +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + int main(int argc, char** argv) { try{ Args args; @@ -121,11 +181,11 @@ int main(int argc, char** argv) { Message m = statusQ.get(); if( m.getData().find("topic_listener: ", 0) == 0 ) { cout << "Listener " << (i+1) << " of " << args.subscribers - << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16) - << ")" << endl; + << " is ready (pid " << m.getData().substr(16, m.getData().length() - 16) + << ")" << endl; } else { throw Exception(QPID_MSG("Unexpected message received on status queue: " << m.getData())); - } + } } } @@ -142,7 +202,7 @@ int main(int argc, char** argv) { int64_t min(0); int64_t sum(0); for(int i = 0; i < batchSize; i++){ - if(i > 0 && args.delay) sleep(args.delay); + if(i > 0 && args.delay) qpid::sys::sleep(args.delay); int64_t msecs = publisher.publish(args.messages, args.subscribers, @@ -151,12 +211,12 @@ int main(int argc, char** argv) { if(!min || msecs < min) min = msecs; sum += msecs; cout << "Completed " << (i+1) << " of " << batchSize - << " in " << msecs << "ms" << endl; + << " in " << msecs << "ms" << endl; } publisher.terminate(); int64_t avg = sum / batchSize; if(batchSize > 1){ - cout << batchSize << " batches completed. avg=" << avg << + cout << batchSize << " batches completed. avg=" << avg << ", max=" << max << ", min=" << min << endl; } session.close(); @@ -168,57 +228,3 @@ int main(int argc, char** argv) { } return 1; } - -Publisher::Publisher(const AsyncSession& _session, const string& _controlTopic, bool tx, bool d) : - session(_session), mgr(session), controlTopic(_controlTopic), transactional(tx), durable(d) -{ - mgr.subscribe(queue, "response"); -} - -int64_t Publisher::publish(int msgs, int listeners, int size){ - Message msg(generateData(size), controlTopic); - if (durable) { - msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); - } - AbsTime start = now(); - - for(int i = 0; i < msgs; i++){ - session.messageTransfer(arg::content=msg, arg::destination="amq.topic", arg::acceptMode=1); - } - //send report request - Message reportRequest("", controlTopic); - reportRequest.getHeaders().setString("TYPE", "REPORT_REQUEST"); - session.messageTransfer(arg::content=reportRequest, arg::destination="amq.topic", arg::acceptMode=1); - if(transactional){ - sync(session).txCommit(); - } - //wait for a response from each listener (TODO, could log these) - for (int i = 0; i < listeners; i++) { - Message report = queue.pop(); - } - - if(transactional){ - sync(session).txCommit(); - } - - AbsTime finish = now(); - return Duration(start, finish); -} - -string Publisher::generateData(int size){ - string data; - for(int i = 0; i < size; i++){ - data += ('A' + (i / 26)); - } - return data; -} - -void Publisher::terminate(){ - //send termination request - Message terminationRequest("", controlTopic); - terminationRequest.getHeaders().setString("TYPE", "TERMINATION_REQUEST"); - session.messageTransfer(arg::content=terminationRequest, arg::destination="amq.topic", arg::acceptMode=1); - if(transactional){ - session.txCommit(); - } -} |