diff options
author | Alan Conway <aconway@apache.org> | 2007-08-15 14:56:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2007-08-15 14:56:51 +0000 |
commit | 4ea2e527fedb69bd0ae93e79acefebf106b34318 (patch) | |
tree | 6310181a0d61b675ccd97491ba52c161883434a0 /cpp | |
parent | 8c9a768363eb8a5069920adc054eb89295584db1 (diff) | |
download | qpid-python-4ea2e527fedb69bd0ae93e79acefebf106b34318.tar.gz |
* perftest/topic_publisher.cpp, topic_listener.cpp:
Combined into a single preftest.cpp executable and moved to
src/tests.
New perftest:
- Supports all client-side options (--host, --port etc.)
- Can be run as producer (--listen), consumer (--publish) or both.
- --count specifies number of messages (default 500000 as before)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566211 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/perftest/Makefile.am | 11 | ||||
-rw-r--r-- | cpp/perftest/topic_listener.cpp | 121 | ||||
-rw-r--r-- | cpp/perftest/topic_publisher.cpp | 188 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 7 | ||||
-rw-r--r-- | cpp/src/tests/TestOptions.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/perftest.cpp | 298 |
6 files changed, 312 insertions, 320 deletions
diff --git a/cpp/perftest/Makefile.am b/cpp/perftest/Makefile.am deleted file mode 100644 index 41ece4452c..0000000000 --- a/cpp/perftest/Makefile.am +++ /dev/null @@ -1,11 +0,0 @@ -noinst_PROGRAMS=topic_listener topic_publisher -INCLUDES=-I$(top_srcdir)/src -I$(top_srcdir)/src/gen -I$(top_builddir)src/gen -lib_client = $(top_builddir)/src/libqpidclient.la - - -topic_listener_LDADD=$(lib_client) -topic_listener_SOURCES=topic_listener.cpp - -topic_publisher_LDADD=$(lib_client) -topic_publisher_SOURCES=topic_publisher.cpp - diff --git a/cpp/perftest/topic_listener.cpp b/cpp/perftest/topic_listener.cpp deleted file mode 100644 index e9d31f3a29..0000000000 --- a/cpp/perftest/topic_listener.cpp +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * Test hack code -- will upadte into a harness, work in progress - */ - -#include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/Connection.h" -#include "qpid/client/ClientExchange.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/ClientQueue.h" -#include "qpid/sys/Monitor.h" -#include <unistd.h> -#include "qpid/sys/Time.h" -#include <cstdlib> -#include <iostream> -#include <time.h> - -using namespace qpid::client; -using namespace qpid::sys; -using namespace std; - - -bool done = 0; - -class Listener : public MessageListener{ - string queueName; -public: - virtual void received(Message& msg); - ~Listener() { }; - Listener (string& _queueName): queueName(_queueName){}; -}; - - -int main() { - Connection connection; - Channel channel; - Message msg; - Message msg1; - cout << "Starting listener" << endl; - try { - connection.open("127.0.0.1", 5672, "guest", "guest", "/test"); - connection.openChannel(channel); - channel.start(); - - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); - - - while (!channel.get(msg, response, AUTO_ACK)) { - cout <<" waiting... " << endl; - sleep(1); - } - string queueName =msg.getData(); - string queueNameC =queueName+ "-1"; - - cout << "Using Queue:" << queueName << endl; - cout << "Reply Queue:" << queueNameC << endl; - // create consume queue - Queue consume(queueName); - channel.declareQueue(consume); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName); - - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - Listener listener(queueName); - channel.consume(consume, queueName, &listener); - cout << "Consuming" << endl; - - - while (!done) - sleep(1); - - // complete. - msg1.setData(queueName); - channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC); - - - channel.close(); - connection.close(); - return 0; - } catch(const std::exception& error) { - cout << "Unexpected exception: " << error.what() << endl; - } - connection.close(); - return 1; -} - -void Listener::received(Message& msg) { - if (msg.getData() == queueName) - { - cout << "Done:" << queueName <<endl; - done = 1; - } -} diff --git a/cpp/perftest/topic_publisher.cpp b/cpp/perftest/topic_publisher.cpp deleted file mode 100644 index 80815fb143..0000000000 --- a/cpp/perftest/topic_publisher.cpp +++ /dev/null @@ -1,188 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * Test hack code -- will upadte into a harness, work in progress - */ - -#include "qpid/QpidError.h" -#include "qpid/client/ClientChannel.h" -#include "qpid/client/Connection.h" -#include "qpid/client/ClientExchange.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/ClientQueue.h" -#include "qpid/sys/Monitor.h" -#include <unistd.h> -#include "qpid/sys/Time.h" -#include <cstdlib> -#include <iostream> -#include <iomanip> -#include <time.h> - -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - - -bool done = 0; - - -struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { - timespec r; - r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; - r.tv_sec = lhs.tv_sec - rhs.tv_sec; - if (r.tv_nsec < 0) { - r.tv_nsec += 1000000000; - r.tv_sec -= 1; - } - return r; -} - -std::ostream& operator<<(std::ostream& o, const struct timespec& ts) { - o << ts.tv_sec << "." << std::setw(9) << std::setfill('0') << std::right << ts.tv_nsec; - return o; -} - -double toDouble(const struct timespec& ts) { - return double(ts.tv_nsec)/1000000000 + ts.tv_sec; -} - -class Listener : public MessageListener{ - -void set_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - std::cout << "Error" << std::endl; - startTime = ts; - } - -void print_time() { - timespec ts; - if (::clock_gettime(CLOCK_REALTIME, &ts)) - std::cout << "Error" << std::endl; - std::cout << "Total Time:" << ts-startTime << std::endl; - double rate = messageCount*2/toDouble(ts-startTime); - std::cout << "returned Messages:" << messageCount << std::endl; - std::cout << "round trip Rate:" << rate << std::endl; - } - - struct timespec startTime; - int messageCount; - - public: - - - Listener(int mcount): messageCount(mcount) { - set_time(); - } - - virtual void received(Message& msg) { - print_time(); - std::cout << "Message: " << msg.getData() << std::endl; - done = 1; - } - - ~Listener() { }; - - - - -}; - - -int main() { - Connection connection; - Channel channel; - Message msg; - try { - connection.open("127.0.0.1", 5672, "guest", "guest", "/test"); - connection.openChannel(channel); - channel.start(); - - string queueControl = "control"; - Queue response(queueControl); - channel.declareQueue(response); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); - - string queueName ="queue01"; - string queueNameC =queueName+"-1"; - - // create publish queue - Queue publish(queueName); - channel.declareQueue(publish); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); - - // create completion queue - Queue completion(queueNameC); - channel.declareQueue(completion); - channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); - - // pass queue name - msg.setData(queueName); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); - - std::cout << "Setup return queue:"<< queueNameC << std::endl; - - int count = 500000; - Listener listener(count); - channel.consume(completion, queueNameC, &listener); - std::cout << "Setup consumer:"<< queueNameC << std::endl; - - - - - struct timespec startTime; - if (::clock_gettime(CLOCK_REALTIME, &startTime)) - std::cout << "Error" << std::endl; - - for (int i=0; i<count; i++) { - msg.setData("Message 0123456789 "); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); - } - - struct timespec endTime; - if (::clock_gettime(CLOCK_REALTIME, &endTime)) - std::cout << "Error" << std::endl; - std::cout << "publish Time:" << endTime-startTime << std::endl; - double rate = count/toDouble(endTime-startTime); - std::cout << "publish Messages:" << count << std::endl; - std::cout << "publish Rate:" << rate << std::endl; - - - msg.setData(queueName); // last message to queue. - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); - - std::cout << "wait:"<< queueNameC << std::endl; - - while (!done) - sleep(1); - - - channel.close(); - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} - - diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index dfc86302ae..67b173af33 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -60,6 +60,13 @@ Visitor_LDADD=-lboost_unit_test_framework include cluster.mk +# +# Other test programs +# +check_PROGRAMS+=perftest +perftest_SOURCES=perftest.cpp test_tools.h +perftest_LDADD=$(lib_client) + # NB: CppUnit test libraries below will be migrated to boost test programs. # diff --git a/cpp/src/tests/TestOptions.h b/cpp/src/tests/TestOptions.h index 5b3c0958f5..95ebb1e219 100644 --- a/cpp/src/tests/TestOptions.h +++ b/cpp/src/tests/TestOptions.h @@ -25,6 +25,7 @@ #include "qpid/log/Options.h" #include "qpid/Url.h" #include "qpid/log/Logger.h" +#include "qpid/client/Connection.h" #include <iostream> #include <exception> @@ -63,6 +64,12 @@ struct TestOptions : public qpid::Options trace = log.trace; qpid::log::Logger::instance().configure(log, argv[0]); } + + /** Open a connection usin option values */ + void open(qpid::client::Connection& connection) { + connection.open(host, port, username, password, virtualhost); + } + std::string host; uint16_t port; diff --git a/cpp/src/tests/perftest.cpp b/cpp/src/tests/perftest.cpp new file mode 100644 index 0000000000..7c0941bf68 --- /dev/null +++ b/cpp/src/tests/perftest.cpp @@ -0,0 +1,298 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "TestOptions.h" + +#include "qpid/client/ClientChannel.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/client/Connection.h" +#include "qpid/client/MessageListener.h" +#include "qpid/QpidError.h" +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Time.h" + +#include <iostream> +#include <cstdlib> +#include <iomanip> +#include <time.h> +#include <unistd.h> + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::sys; +using namespace std; + +struct Opts : public TestOptions { + + bool listen; + bool publish; + int count; + + Opts() : listen(false), publish(false), count(500000) { + addOptions() + ("listen", optValue(listen), "Consume messages.") + ("publish", optValue(publish), "Produce messages.") + ("count", optValue(count, "N"), "Messages to send/receive."); + } +}; + +Opts opts; + +struct ListenThread : public Runnable { Thread thread; void run(); }; +struct PublishThread : public Runnable { Thread thread; void run(); }; + +int main(int argc, char** argv) { + try { + opts.parse(argc, argv); + ListenThread listen; + PublishThread publish; + if (opts.listen) + listen.thread=Thread(listen); + if (opts.publish) + publish.thread=Thread(publish); + if (opts.listen) + listen.thread.join(); + if (opts.publish) + publish.thread.join(); + } + catch (const std::exception& e) { + cout << "Unexpected exception: " << e.what() << endl; + } +} + +// ================================================================ +// Publish client +// + +struct timespec operator-(const struct timespec& lhs, const struct timespec& rhs) { + timespec r; + r.tv_nsec = lhs.tv_nsec - rhs.tv_nsec; + r.tv_sec = lhs.tv_sec - rhs.tv_sec; + if (r.tv_nsec < 0) { + r.tv_nsec += 1000000000; + r.tv_sec -= 1; + } + return r; +} + +ostream& operator<<(ostream& o, const struct timespec& ts) { + o << ts.tv_sec << "." << setw(9) << setfill('0') << right << ts.tv_nsec; + return o; +} + +double toDouble(const struct timespec& ts) { + return double(ts.tv_nsec)/1000000000 + ts.tv_sec; +} + +class PublishListener : public MessageListener { + + void set_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + startTime = ts; + } + + void print_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + cout << "Total Time:" << ts-startTime << endl; + double rate = messageCount*2/toDouble(ts-startTime); + cout << "returned Messages:" << messageCount << endl; + cout << "round trip Rate:" << rate << endl; + } + + struct timespec startTime; + int messageCount; + bool done; + Monitor lock; + + public: + + PublishListener(int mcount): messageCount(mcount), done(false) { + set_time(); + } + + void received(Message& msg) { + print_time(); + QPID_LOG(info, "Publisher: received: " << msg.getData()); + Mutex::ScopedLock l(lock); + QPID_LOG(info, "Publisher: done."); + done = true; + lock.notify(); + } + + void wait() { + Mutex::ScopedLock l(lock); + while (!done) + lock.wait(); + } +}; + + +void PublishThread::run() { + Connection connection; + Channel channel; + Message msg; + opts.open(connection); + connection.openChannel(channel); + channel.start(); + + cout << "Started publisher." << endl; + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + + string queueName ="queue01"; + string queueNameC =queueName+"-1"; + + // create publish queue + Queue publish(queueName); + channel.declareQueue(publish); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + // pass queue name + msg.setData(queueName); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); + + QPID_LOG(info, "Publisher: setup return queue: "<< queueNameC); + + int count = opts.count; + PublishListener listener(count); + channel.consume(completion, queueNameC, &listener); + QPID_LOG(info, "Publisher setup consumer: "<< queueNameC); + + struct timespec startTime; + if (::clock_gettime(CLOCK_REALTIME, &startTime)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + + for (int i=0; i<count; i++) { + msg.setData("Message 0123456789 "); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + } + + struct timespec endTime; + if (::clock_gettime(CLOCK_REALTIME, &endTime)) + throw Exception(QPID_MSG("clock_gettime failed: " << strError(errno))); + + cout << "publish Time:" << endTime-startTime << endl; + double rate = count/toDouble(endTime-startTime); + cout << "publish Messages:" << count << endl; + cout << "publish Rate:" << rate << endl; + + msg.setData(queueName); // last message to queue. + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + + listener.wait(); + + channel.close(); + connection.close(); +} + + + +// ================================================================ +// Listen client +// + +class Listener : public MessageListener{ + string queueName; + Monitor lock; + bool done; + + public: + Listener (string& _queueName): queueName(_queueName), done(false) {}; + + void received(Message& msg) { + if (msg.getData() == queueName) + { + Mutex::ScopedLock l(lock); + QPID_LOG(info, "Listener: done. " << queueName); + done = true; + lock.notify(); + } + } + + void wait() { + Mutex::ScopedLock l(lock); + while (!done) + lock.wait(); + } +}; + +void ListenThread::run() { + Connection connection; + Channel channel; + Message msg; + Message msg1; + cout << "Started listener." << endl;; + opts.open(connection); + connection.openChannel(channel); + channel.start(); + + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + while (!channel.get(msg, response, AUTO_ACK)) { + QPID_LOG(info, "Listener: waiting for queue name."); + sleep(1); + } + string queueName =msg.getData(); + string queueNameC =queueName+ "-1"; + + QPID_LOG(info, "Listener: Using Queue:" << queueName); + QPID_LOG(info, "Listener: Reply Queue:" << queueNameC); + // create consume queue + Queue consume(queueName); + channel.declareQueue(consume); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName); + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + Listener listener(queueName); + channel.consume(consume, queueName, &listener); + QPID_LOG(info, "Listener: consuming..."); + + listener.wait(); + + QPID_LOG(info, "Listener: send final message."); + // complete. + msg1.setData(queueName); + channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC); + + channel.close(); + connection.close(); +} + + |