summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-01-17 21:17:42 +0000
committerGordon Sim <gsim@apache.org>2008-01-17 21:17:42 +0000
commit3ba4c0d85c42e4e81e7c0fab673a9267fc98b129 (patch)
tree0075c45084a24626d34859c66d0ac1191c8b7784 /cpp
parent1ee375b5ab824b52463a9e522e60c35ce577c413 (diff)
downloadqpid-python-3ba4c0d85c42e4e81e7c0fab673a9267fc98b129.tar.gz
Added a test to measure latency (still needs refining, still evolving)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@612986 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/tests/Makefile.am4
-rw-r--r--cpp/src/tests/latencytest.cpp372
2 files changed, 376 insertions, 0 deletions
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 5291891510..0dc4b47a84 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -55,6 +55,10 @@ check_PROGRAMS+=txtest
txtest_SOURCES=txtest.cpp TestOptions.h
txtest_LDADD=$(lib_client)
+check_PROGRAMS+=latencytest
+latencytest_SOURCES=latencytest.cpp TestOptions.h
+latencytest_LDADD=$(lib_client)
+
# NB: CppUnit test libraries below will be migrated to boost test programs.
#
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
new file mode 100644
index 0000000000..ddafe0f49b
--- /dev/null
+++ b/cpp/src/tests/latencytest.cpp
@@ -0,0 +1,372 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+#include <algorithm>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <vector>
+#include <unistd.h>
+
+#include "TestOptions.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Message.h"
+#include "qpid/client/Session_0_10.h"
+#include "qpid/client/SubscriptionManager.h"
+
+using namespace qpid;
+using namespace qpid::client;
+using namespace qpid::sys;
+using std::string;
+
+typedef std::vector<std::string> StringSet;
+
+struct Args : public qpid::TestOptions {
+ uint size;
+ uint count;
+ uint rate;
+ uint reportFrequency;
+ uint queues;
+ uint prefetch;
+ uint ack;
+ bool durable;
+ string base;
+
+ Args() : size(256), count(1000), rate(0), reportFrequency(1), queues(1),
+ prefetch(100), ack(0),
+ durable(false), base("latency-test")
+ {
+ addOptions()
+
+ ("size", optValue(size, "N"), "message size")
+ ("queues", optValue(queues, "N"), "number of queues")
+ ("count", optValue(count, "N"), "number of messages to send")
+ ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
+ ("report-frequency", optValue(reportFrequency, "N"),
+ "number of seconds to wait between reports (ignored unless rate specified)")
+ ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
+ ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)")
+ ("durable", optValue(durable, "yes|no"), "use durable messages")
+ ("queue-base-name", optValue(base, "<name>"), "base name for queues");
+ }
+};
+
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+Args opts;
+
+uint64_t current_time()
+{
+ Duration t(now());
+ return t;
+}
+
+struct Stats
+{
+ Mutex lock;
+ uint count;
+ double minLatency;
+ double maxLatency;
+ double totalLatency;
+
+ Stats();
+ void update(double l);
+ void print();
+ void reset();
+};
+
+class Client : public Runnable
+{
+protected:
+ Connection connection;
+ Session_0_10 session;
+ Thread thread;
+ string queue;
+
+public:
+ Client(const string& q);
+ virtual ~Client() {}
+
+ void start();
+ void join();
+ void run();
+ virtual void test() = 0;
+};
+
+class Receiver : public Client, public MessageListener
+{
+ SubscriptionManager mgr;
+ uint count;
+ Stats& stats;
+
+public:
+ Receiver(const string& queue, Stats& stats);
+ void test();
+ void received(Message& msg);
+ Stats getStats();
+};
+
+
+class Sender : public Client
+{
+ string generateData(uint size);
+ void sendByRate();
+ void sendByCount();
+public:
+ Sender(const string& queue);
+ void test();
+};
+
+
+class Test
+{
+ const string queue;
+ Stats stats;
+ Receiver receiver;
+ Sender sender;
+ AbsTime begin;
+
+public:
+ Test(const string& q) : queue(q), receiver(queue, stats), sender(queue), begin(now()) {}
+ void start();
+ void join();
+ void report();
+};
+
+
+Client::Client(const string& q) : queue(q)
+{
+ opts.open(connection);
+ session = connection.newSession();
+}
+
+void Client::start()
+{
+ thread = Thread(this);
+}
+
+void Client::join()
+{
+ thread.join();
+}
+
+void Client::run()
+{
+ try{
+ test();
+ session.close();
+ connection.close();
+ } catch(const std::exception& e) {
+ std::cout << "Error in receiver: " << e.what() << std::endl;
+ }
+}
+
+Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0), stats(s)
+{
+ session.queueDeclare(arg::queue=queue, arg::durable=opts.durable, arg::autoDelete=true);
+ uint msgCount = session.queueQuery(arg::queue=queue).get().getMessageCount();
+ if (msgCount) {
+ std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
+ session.queuePurge(arg::queue=queue);
+ }
+ if (opts.prefetch) {
+ mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
+ mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
+ } else {
+ mgr.setConfirmMode(false);
+ mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ }
+ mgr.subscribe(*this, queue);
+}
+
+void Receiver::test()
+{
+ mgr.run();
+ mgr.cancel(queue);
+}
+
+void Receiver::received(Message& msg)
+{
+ ++count;
+ uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
+ //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type
+ uint64_t receivedAt = current_time();
+
+ stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
+
+ if (!opts.rate && count >= opts.count) {
+ mgr.stop();
+ }
+}
+
+void Stats::update(double latency)
+{
+ Mutex::ScopedLock l(lock);
+ count++;
+ if (minLatency == 0 || minLatency > latency) minLatency = latency;
+ if (maxLatency == 0 || maxLatency < latency) maxLatency = latency;
+ totalLatency += latency;
+}
+
+Stats::Stats() : count(0), minLatency(0), maxLatency(0), totalLatency(0) {}
+
+void Stats::print()
+{
+ Mutex::ScopedLock l(lock);
+ std::cout << "Latency(ms): min=" << minLatency << ", max=" << maxLatency << ", avg=" << (totalLatency / count);
+}
+
+void Stats::reset()
+{
+ Mutex::ScopedLock l(lock);
+ count = 0;
+ totalLatency = maxLatency = minLatency = 0;
+}
+
+Sender::Sender(const string& q) : Client(q) {}
+
+void Sender::test()
+{
+ if (opts.rate) sendByRate();
+ else sendByCount();
+}
+
+void Sender::sendByCount()
+{
+ Message msg(generateData(opts.size), queue);
+ if (opts.durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
+
+ Completion c;
+ for (uint i = 0; i < opts.count; i++) {
+ uint64_t sentAt(current_time());
+ msg.getDeliveryProperties().setTimestamp(sentAt);
+ //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
+ c = session.messageTransfer(arg::content=msg);
+ }
+ c.sync();
+}
+
+void Sender::sendByRate()
+{
+ Message msg(generateData(opts.size), queue);
+ if (opts.durable) {
+ msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
+ }
+
+ //calculate metrics required for target rate
+ uint msgsPerMsec = opts.rate / 1000;
+
+ while (true) {
+ uint64_t start(current_time());
+ for (uint i = 0; i < msgsPerMsec; i++) {
+ uint64_t sentAt(current_time());
+ msg.getDeliveryProperties().setTimestamp(sentAt);
+ //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
+ session.messageTransfer(arg::content=msg);
+ }
+ uint64_t timeTaken = (current_time() - start) / TIME_USEC;
+ if (timeTaken < 1000) {
+ usleep(1000 - timeTaken);
+ } else if (timeTaken > 1000) {
+ double timeMsecs = (double) timeTaken / 1000;
+ std::cout << "Could not achieve desired rate. Sent " << msgsPerMsec << " in "
+ << (timeMsecs) << "ms (" << ((msgsPerMsec * 1000 * 1000) / timeTaken) << " msgs/s)" << std::endl;
+ }
+ }
+}
+
+string Sender::generateData(uint size)
+{
+ if (size < chars.length()) {
+ return chars.substr(0, size);
+ }
+ std::string data;
+ for (uint i = 0; i < (size / chars.length()); i++) {
+ data += chars;
+ }
+ data += chars.substr(0, size % chars.length());
+ return data;
+}
+
+
+void Test::start()
+{
+ receiver.start();
+ begin = AbsTime(now());
+ sender.start();
+}
+
+void Test::join()
+{
+ sender.join();
+ receiver.join();
+ AbsTime end = now();
+ Duration time(begin, end);
+ double msecs(time / TIME_MSEC);
+ std::cout << "Sent " << opts.count << " msgs through " << queue
+ << " in " << msecs << "ms (" << (opts.count * 1000 / msecs) << " msgs/s) ";
+ stats.print();
+ std::cout << std::endl;
+}
+
+void Test::report()
+{
+ stats.print();
+ std::cout << std::endl;
+ stats.reset();
+}
+
+int main(int argc, char** argv)
+{
+ try {
+ opts.parse(argc, argv);
+ boost::ptr_vector<Test> tests(opts.queues);
+ for (uint i = 0; i < opts.queues; i++) {
+ std::ostringstream out;
+ out << opts.base << "-" << (i+1);
+ tests.push_back(new Test(out.str()));
+ }
+ for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
+ i->start();
+ }
+ if (opts.rate) {
+ while (true) {
+ usleep(opts.reportFrequency * 1000 * 1000);
+ //print latency report:
+ for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
+ i->report();
+ }
+ }
+ } else {
+ for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
+ i->join();
+ }
+ }
+
+ return 0;
+ } catch(const std::exception& e) {
+ std::cout << e.what() << std::endl;
+ }
+ return 1;
+}