diff options
author | Alan Conway <aconway@apache.org> | 2007-08-15 14:56:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-15 14:56:51 +0000 |
commit | 4ea2e527fedb69bd0ae93e79acefebf106b34318 (patch) | |
tree | 6310181a0d61b675ccd97491ba52c161883434a0 /cpp/src | |
parent | 8c9a768363eb8a5069920adc054eb89295584db1 (diff) | |
download | qpid-python-4ea2e527fedb69bd0ae93e79acefebf106b34318.tar.gz |
* perftest/topic_publisher.cpp, topic_listener.cpp:
Combined into a single preftest.cpp executable and moved to
src/tests.
New perftest:
- Supports all client-side options (--host, --port etc.)
- Can be run as producer (--listen), consumer (--publish) or both.
- --count specifies number of messages (default 500000 as before)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/Makefile.am | 7 | ||||
-rw-r--r-- | cpp/src/tests/TestOptions.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 298 |
3 files changed, 312 insertions, 0 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index dfc86302ae..67b173af33 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -60,6 +60,13 @@ Visitor_LDADD=-lboost_unit_test_framework include cluster.mk +# +# Other test programs +# +check_PROGRAMS+=perftest +perftest_SOURCES=perftest.cpp test_tools.h +perftest_LDADD=$(lib_client) + # NB: CppUnit test libraries below will be migrated to boost test programs. # diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h index 5b3c0958f5..95ebb1e219 100644 --- a/cpp/src/tests/TestOptions.h +++ b/cpp/src/tests/TestOptions.h @@ -25,6 +25,7 @@ #include "qpid/log/Options.h" #include "qpid/Url.h" #include "qpid/log/Logger.h" +#include "qpid/client/Connection.h" #include <iostream> #include <exception> @@ -63,6 +64,12 @@ struct TestOptions : public qpid::Options trace = log.trace; qpid::log::Logger::instance().configure(log, argv[0]); } + + /** Open a connection usin option values */ + void open(qpid::client::Connection& connection) { + connection.open(host, port, username, password, virtualhost); + } + std::string host; uint16_t port; diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp new file mode 100644 index 0000000000..7c0941bf68 --- /dev/null +++ b/cpp/src/tests/perftest.cpp @@ -0,0 +1,298 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TestOptions.h" + +#include "qpid/client/ClientChannel.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "qpid/QpidError.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + +#include <iostream> +#include <cstdlib> +#include <iomanip> +#include <time.h> +#include <unistd.h> + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using namespace std; + +struct Opts : public TestOptions { + + bool listen; + bool publish; + int count; + + Opts() : listen(false), publish(false), count(500000) { + addOptions() + ("listen", optValue(listen), "Consume messages.") + ("publish", optValue(publish), "Produce messages.") + ("count", optValue(count, "N"), "Messages to send/receive."); + } +}; + +Opts opts; + +struct ListenThread : public Runnable { Thread thread; void run(); }; +struct PublishThread : public Runnable { Thread thread; void run(); }; + +int main(int argc, char** argv) { + try { + opts.parse(argc, argv); + ListenThread listen; + PublishThread publish; + if (opts.listen) + listen.thread=Thread(listen); + if (opts.publish) + publish.thread=Thread(publish); + if (opts.listen) + listen.thread.join(); + if (opts.publish) + publish.thread.join(); + } + catch (const std::exception& e) { + cout << "Unexpected exception: " << e.what() << endl; + } +} + +// ================================================================ +// Publish client +// + +struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { + timespec r; + r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; + r.tv_sec = lhs.tv_sec - rhs.tv_sec; + if (r.tv_nsec < 0) { + r.tv_nsec += 1000000000; + r.tv_sec -= 1; + } + return r; +} + +ostream& operator<<(ostream& o, const struct timespec& ts) { + o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec; + return o; +} + +double toDouble(const struct timespec& ts) { + return double(ts.tv_nsec)/1000000000 + ts.tv_sec; +} + +class PublishListener : public MessageListener { + + void set_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + startTime = ts; + } + + void print_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + cout << "Total Time:" << ts-startTime << endl; + double rate = messageCount*2/toDouble(ts-startTime); + cout << "returned Messages:" << messageCount << endl; + cout << "round trip Rate:" << rate << endl; + } + + struct timespec startTime; + int messageCount; + bool done; + Monitor lock; + + public: + + PublishListener(int mcount): messageCount(mcount), done(false) { + set_time(); + } + + void received(Message& msg) { + print_time(); + QPID_LOG(info, "Publisher: received: " << msg.getData()); + Mutex::ScopedLock l(lock); + QPID_LOG(info, "Publisher: done."); + done = true; + lock.notify(); + } + + void wait() { + Mutex::ScopedLock l(lock); + while (!done) + lock.wait(); + } +}; + + +void PublishThread::run() { + Connection connection; + Channel channel; + Message msg; + opts.open(connection); + connection.openChannel(channel); + channel.start(); + + cout << "Started publisher." << endl; + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + + string queueName ="queue01"; + string queueNameC =queueName+"-1"; + + // create publish queue + Queue publish(queueName); + channel.declareQueue(publish); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + // pass queue name + msg.setData(queueName); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); + + QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC); + + int count = opts.count; + PublishListener listener(count); + channel.consume(completion, queueNameC, &listener); + QPID_LOG(info, "Publisher setup consumer: "<< queueNameC); + + struct timespec startTime; + if (::clock_gettime(CLOCK_REALTIME, &startTime)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + + for (int i=0; i<count; i++) { + msg.setData("Message 0123456789 "); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + } + + struct timespec endTime; + if (::clock_gettime(CLOCK_REALTIME, &endTime)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + + cout << "publish Time:" << endTime-startTime << endl; + double rate = count/toDouble(endTime-startTime); + cout << "publish Messages:" << count << endl; + cout << "publish Rate:" << rate << endl; + + msg.setData(queueName); // last message to queue. + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + + listener.wait(); + + channel.close(); + connection.close(); +} + + + +// ================================================================ +// Listen client +// + +class Listener : public MessageListener{ + string queueName; + Monitor lock; + bool done; + + public: + Listener (string& _queueName): queueName(_queueName), done(false) {}; + + void received(Message& msg) { + if (msg.getData() == queueName) + { + Mutex::ScopedLock l(lock); + QPID_LOG(info, "Listener: done. " << queueName); + done = true; + lock.notify(); + } + } + + void wait() { + Mutex::ScopedLock l(lock); + while (!done) + lock.wait(); + } +}; + +void ListenThread::run() { + Connection connection; + Channel channel; + Message msg; + Message msg1; + cout << "Started listener." << endl;; + opts.open(connection); + connection.openChannel(channel); + channel.start(); + + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + while (!channel.get(msg, response, AUTO_ACK)) { + QPID_LOG(info, "Listener: waiting for queue name."); + sleep(1); + } + string queueName =msg.getData(); + string queueNameC =queueName+ "-1"; + + QPID_LOG(info, "Listener: Using Queue:" << queueName); + QPID_LOG(info, "Listener: Reply Queue:" << queueNameC); + // create consume queue + Queue consume(queueName); + channel.declareQueue(consume); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName); + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + Listener listener(queueName); + channel.consume(consume, queueName, &listener); + QPID_LOG(info, "Listener: consuming..."); + + listener.wait(); + + QPID_LOG(info, "Listener: send final message."); + // complete. + msg1.setData(queueName); + channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC); + + channel.close(); + connection.close(); +} + + |