diff options
author | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-17 21:02:23 +0000 |
---|---|---|
committer | Carl C. Trieloff <cctrieloff@apache.org> | 2007-07-17 21:02:23 +0000 |
commit | f4553ccc65c43e9dab7c5c3a5f85ac3efdc1bf78 (patch) | |
tree | d5bba434a6f8cae4009198ab45b96026d8e0e4e8 /cpp | |
parent | 1d1f5c021b620f01c6bcb5fbded0eb1b50e445ab (diff) | |
download | qpid-python-f4553ccc65c43e9dab7c5c3a5f85ac3efdc1bf78.tar.gz |
basic perf test, work in progress
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@557055 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/configure.ac | 1 | ||||
-rw-r--r-- | cpp/perftest/Makefile.am | 11 | ||||
-rw-r--r-- | cpp/perftest/topic_listener.cpp | 121 | ||||
-rw-r--r-- | cpp/perftest/topic_publisher.cpp | 175 |
4 files changed, 308 insertions, 0 deletions
diff --git a/cpp/configure.ac b/cpp/configure.ac index 253a8f7ad7..63ec993a56 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -184,6 +184,7 @@ AC_CONFIG_FILES([ qpidc.spec Makefile examples/Makefile + perftest/Makefile src/Makefile src/tests/Makefile docs/man/Makefile diff --git a/cpp/perftest/Makefile.am b/cpp/perftest/Makefile.am new file mode 100644 index 0000000000..41ece4452c --- /dev/null +++ b/cpp/perftest/Makefile.am @@ -0,0 +1,11 @@ +noinst_PROGRAMS=topic_listener topic_publisher +INCLUDES=-I$(top_srcdir)/src -I$(top_srcdir)/src/gen -I$(top_builddir)src/gen +lib_client = $(top_builddir)/src/libqpidclient.la + + +topic_listener_LDADD=$(lib_client) +topic_listener_SOURCES=topic_listener.cpp + +topic_publisher_LDADD=$(lib_client) +topic_publisher_SOURCES=topic_publisher.cpp + diff --git a/cpp/perftest/topic_listener.cpp b/cpp/perftest/topic_listener.cpp new file mode 100644 index 0000000000..e9d31f3a29 --- /dev/null +++ b/cpp/perftest/topic_listener.cpp @@ -0,0 +1,121 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Test hack code -- will upadte into a harness, work in progress + */ + +#include "qpid/QpidError.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/sys/Monitor.h" +#include <unistd.h> +#include "qpid/sys/Time.h" +#include <cstdlib> +#include <iostream> +#include <time.h> + +using namespace qpid::client; +using namespace qpid::sys; +using namespace std; + + +bool done = 0; + +class Listener : public MessageListener{ + string queueName; +public: + virtual void received(Message& msg); + ~Listener() { }; + Listener (string& _queueName): queueName(_queueName){}; +}; + + +int main() { + Connection connection; + Channel channel; + Message msg; + Message msg1; + cout << "Starting listener" << endl; + try { + connection.open("127.0.0.1", 5672, "guest", "guest", "/test"); + connection.openChannel(channel); + channel.start(); + + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + + + while (!channel.get(msg, response, AUTO_ACK)) { + cout <<" waiting... " << endl; + sleep(1); + } + string queueName =msg.getData(); + string queueNameC =queueName+ "-1"; + + cout << "Using Queue:" << queueName << endl; + cout << "Reply Queue:" << queueNameC << endl; + // create consume queue + Queue consume(queueName); + channel.declareQueue(consume); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, consume, queueName); + + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + Listener listener(queueName); + channel.consume(consume, queueName, &listener); + cout << "Consuming" << endl; + + + while (!done) + sleep(1); + + // complete. + msg1.setData(queueName); + channel.publish(msg1, Exchange::STANDARD_TOPIC_EXCHANGE, queueNameC); + + + channel.close(); + connection.close(); + return 0; + } catch(const std::exception& error) { + cout << "Unexpected exception: " << error.what() << endl; + } + connection.close(); + return 1; +} + +void Listener::received(Message& msg) { + if (msg.getData() == queueName) + { + cout << "Done:" << queueName <<endl; + done = 1; + } +} diff --git a/cpp/perftest/topic_publisher.cpp b/cpp/perftest/topic_publisher.cpp new file mode 100644 index 0000000000..53b2c20775 --- /dev/null +++ b/cpp/perftest/topic_publisher.cpp @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * Test hack code -- will upadte into a harness, work in progress + */ + +#include "qpid/QpidError.h" +#include "qpid/client/ClientChannel.h" +#include "qpid/client/Connection.h" +#include "qpid/client/ClientExchange.h" +#include "qpid/client/MessageListener.h" +#include "qpid/client/ClientQueue.h" +#include "qpid/sys/Monitor.h" +#include <unistd.h> +#include "qpid/sys/Time.h" +#include <cstdlib> +#include <iostream> +#include <time.h> + +using namespace qpid::client; +using namespace qpid::sys; +using std::string; + + +bool done = 0; + + +class Listener : public MessageListener{ + + +void set_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + std::cout << "Error" << std::endl; + _ts_sec = ts.tv_sec; + _ts_nsec = ts.tv_nsec; + } + +void print_time() { + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + std::cout << "Error" << std::endl; + std::cout << "Total Time:" << ts.tv_sec-_ts_sec <<"." <<ts.tv_nsec - _ts_nsec << std::endl; + float rate = messageCount/(ts.tv_sec-_ts_sec); + std::cout << "returned Messages:" << messageCount << std::endl; + std::cout << "round trip Rate:" << rate << std::endl; + } + + time_t _ts_sec; ///< Timestamp of journal initilailization + u_int32_t _ts_nsec; ///< Timestamp of journal initilailization + int messageCount; + + public: + + + Listener(int mcount): messageCount(mcount) { + set_time(); + } + + virtual void received(Message& msg) { + print_time(); + std::cout << "Message: " << msg.getData() << std::endl; + done = 1; + } + + ~Listener() { }; + + + + +}; + + +int main() { + Connection connection; + Channel channel; + Message msg; + try { + connection.open("127.0.0.1", 5672, "guest", "guest", "/test"); + connection.openChannel(channel); + channel.start(); + + string queueControl = "control"; + Queue response(queueControl); + channel.declareQueue(response); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, response, queueControl); + + string queueName ="queue01"; + string queueNameC =queueName+"-1"; + + // create publish queue + Queue publish(queueName); + channel.declareQueue(publish); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, publish, queueName); + + // create completion queue + Queue completion(queueNameC); + channel.declareQueue(completion); + channel.bind(Exchange::STANDARD_TOPIC_EXCHANGE, completion, queueNameC); + + // pass queue name + msg.setData(queueName); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueControl); + + std::cout << "Setup return queue:"<< queueNameC << std::endl; + + int count = 100000; + Listener listener(count); + channel.consume(completion, queueNameC, &listener); + std::cout << "Setup consumer:"<< queueNameC << std::endl; + + + + + time_t _ts_sec; ///< Timestamp of journal initilailization + u_int32_t _ts_nsec; ///< Timestamp of journal initilailization + timespec ts; + if (::clock_gettime(CLOCK_REALTIME, &ts)) + std::cout << "Error" << std::endl; + _ts_sec = ts.tv_sec; + _ts_nsec = ts.tv_nsec; + + + + for (int i=0; i<count; i++) { + msg.setData("Message 0123456789 "); + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + } + + if (::clock_gettime(CLOCK_REALTIME, &ts)) + std::cout << "Error" << std::endl; + std::cout << "publish Time:" << ts.tv_sec-_ts_sec <<"." <<ts.tv_nsec - _ts_nsec << std::endl; + float rate = count/(ts.tv_sec-_ts_sec); + std::cout << "publish Messages:" << count << std::endl; + std::cout << "publish Rate:" << rate << std::endl; + + + msg.setData(queueName); // last message to queue. + channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, queueName); + + std::cout << "wait:"<< queueNameC << std::endl; + + while (!done) + sleep(1); + + + channel.close(); + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + |