diff options
Diffstat (limited to 'cpp/src/tests/latencytest.cpp')
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 223 |
1 files changed, 133 insertions, 90 deletions
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index e1a6f156a5..a205ef6c7c 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,53 +21,64 @@ #include <algorithm> +#include <limits> #include <iostream> #include <memory> #include <sstream> #include <vector> -#include <unistd.h> #include "TestOptions.h" +#include "qpid/sys/Thread.h" #include "qpid/client/Connection.h" #include "qpid/client/Message.h" #include "qpid/client/AsyncSession.h" #include "qpid/client/SubscriptionManager.h" +#include "qpid/sys/Time.h" using namespace qpid; using namespace qpid::client; using namespace qpid::sys; using std::string; +namespace qpid { +namespace tests { + typedef std::vector<std::string> StringSet; struct Args : public qpid::TestOptions { uint size; uint count; uint rate; + bool sync; uint reportFrequency; uint timeLimit; - uint queues; + uint concurrentConnections; uint prefetch; uint ack; bool cumulative; bool csv; bool durable; string base; + bool singleConnect; - Args() : size(256), count(1000), rate(0), reportFrequency(100), - timeLimit(0), queues(1), + Args() : size(256), count(1000), rate(0), reportFrequency(1000), + timeLimit(0), concurrentConnections(1), prefetch(100), ack(0), - durable(false), base("latency-test") + durable(false), base("latency-test"), singleConnect(false) + { - addOptions() + addOptions() ("size", optValue(size, "N"), "message size") - ("queues", optValue(queues, "N"), "number of queues") + ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setups, will create another publisher,\ + subcriber, queue, and connections") + ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.") ("count", optValue(count, "N"), "number of messages to send") ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") - ("report-frequency", optValue(reportFrequency, "N"), + ("sync", optValue(sync), "send messages synchronously") + ("report-frequency", optValue(reportFrequency, "N"), "number of milliseconds to wait between reports (ignored unless rate specified)") - ("time-limit", optValue(timeLimit, "N"), + ("time-limit", optValue(timeLimit, "N"), "test duration, in seconds") ("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)") ("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)") @@ -82,6 +93,7 @@ const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); Args opts; double c_min, c_avg, c_max; +Connection globalConnection; uint64_t current_time() { @@ -89,7 +101,7 @@ uint64_t current_time() return t; } -struct Stats +struct Stats { Mutex lock; uint count; @@ -106,14 +118,15 @@ struct Stats class Client : public Runnable { protected: - Connection connection; + Connection* connection; + Connection localConnection; AsyncSession session; Thread thread; string queue; public: Client(const string& q); - virtual ~Client() {} + virtual ~Client(); void start(); void join(); @@ -122,7 +135,7 @@ public: }; class Receiver : public Client, public MessageListener -{ +{ SubscriptionManager mgr; uint count; Stats& stats; @@ -143,6 +156,8 @@ class Sender : public Client void sendByRate(); void sendByCount(); Receiver& receiver; + const string data; + public: Sender(const string& queue, Receiver& receiver); void test(); @@ -156,7 +171,7 @@ class Test Receiver receiver; Sender sender; AbsTime begin; - + public: Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {} void start(); @@ -167,8 +182,14 @@ public: Client::Client(const string& q) : queue(q) { - opts.open(connection); - session = connection.newSession(); + if (opts.singleConnect){ + connection = &globalConnection; + if (!globalConnection.isOpen()) opts.open(globalConnection); + }else{ + connection = &localConnection; + opts.open(localConnection); + } + session = connection->newSession(); } void Client::start() @@ -185,8 +206,16 @@ void Client::run() { try{ test(); + } catch(const std::exception& e) { + std::cout << "Error in receiver: " << e.what() << std::endl; + } +} + +Client::~Client() +{ + try{ session.close(); - connection.close(); + connection->close(); } catch(const std::exception& e) { std::cout << "Error in receiver: " << e.what() << std::endl; } @@ -199,15 +228,17 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0 if (msgCount) { std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl; session.queuePurge(arg::queue=queue); + session.sync(); } + SubscriptionSettings settings; if (opts.prefetch) { - mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2))); - mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true); + settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2)); + settings.flowControl = FlowControl::messageWindow(opts.prefetch); } else { - mgr.setAcceptMode(1/*not-required*/); - mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false); + settings.acceptMode = ACCEPT_MODE_NONE; + settings.flowControl = FlowControl::unlimited(); } - mgr.subscribe(*this, queue); + mgr.subscribe(*this, queue, settings); } void Receiver::test() @@ -219,11 +250,9 @@ void Receiver::test() void Receiver::received(Message& msg) { ++count; - uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); - //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type uint64_t receivedAt = current_time(); + uint64_t sentAt = msg.getDeliveryProperties().getTimestamp(); - //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl; stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC); if (!opts.rate && count >= opts.count) { @@ -235,57 +264,70 @@ void Stats::update(double latency) { Mutex::ScopedLock l(lock); count++; - if (minLatency == 0 || minLatency > latency) minLatency = latency; - if (maxLatency == 0 || maxLatency < latency) maxLatency = latency; + minLatency = std::min(minLatency, latency); + maxLatency = std::max(maxLatency, latency); totalLatency += latency; } -Stats::Stats() : count(0), minLatency(0), maxLatency(0), totalLatency(0) {} +Stats::Stats() : count(0), minLatency(std::numeric_limits<double>::max()), maxLatency(0), totalLatency(0) {} void Stats::print() { static bool already_have_stats = false; uint value; - double aux_avg = (totalLatency / count); if (opts.rate) value = opts.rate; else value = opts.count; Mutex::ScopedLock l(lock); + double aux_avg = (totalLatency / count); if (!opts.cumulative) { if (!opts.csv) { - std::cout << "Latency(ms): min=" << minLatency << ", max=" << - maxLatency << ", avg=" << aux_avg; + if (count) { + std::cout << "Latency(ms): min=" << minLatency << ", max=" << + maxLatency << ", avg=" << aux_avg; + } else { + std::cout << "Stalled: no samples for interval"; + } } else { - std::cout << value << "," << minLatency << "," << maxLatency << + if (count) { + std::cout << value << "," << minLatency << "," << maxLatency << "," << aux_avg; + } else { + std::cout << value << "," << minLatency << "," << maxLatency << + ", Stalled"; + } } } else { - if (already_have_stats) { - c_avg = (c_min + aux_avg) / 2; - if (c_min > minLatency) c_min = minLatency; - if (c_max < maxLatency) c_max = maxLatency; + if (count) { + if (already_have_stats) { + c_avg = (c_min + aux_avg) / 2; + if (c_min > minLatency) c_min = minLatency; + if (c_max < maxLatency) c_max = maxLatency; + } else { + c_avg = aux_avg; + c_min = minLatency; + c_max = maxLatency; + already_have_stats = true; + } + std::cout << value << "," << c_min << "," << c_max << + "," << c_avg; } else { - c_avg = aux_avg; - c_min = minLatency; - c_max = maxLatency; - already_have_stats = true; + std::cout << "Stalled: no samples for interval"; } - std::cout << value << "," << c_min << "," << c_max << - "," << c_avg; } - } void Stats::reset() { Mutex::ScopedLock l(lock); count = 0; - totalLatency = maxLatency = minLatency = 0; + totalLatency = maxLatency = 0; + minLatency = std::numeric_limits<double>::max(); } -Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {} +Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {} void Sender::test() { @@ -295,7 +337,7 @@ void Sender::test() void Sender::sendByCount() { - Message msg(generateData(opts.size), queue); + Message msg(data, queue); if (opts.durable) { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } @@ -303,45 +345,38 @@ void Sender::sendByCount() for (uint i = 0; i < opts.count; i++) { uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); - //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); + if (opts.sync) session.sync(); } session.sync(); } void Sender::sendByRate() { - Message msg(generateData(opts.size), queue); + Message msg(data, queue); if (opts.durable) { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } - - //calculate interval (in micro secs) between messages to achieve desired rate - uint64_t interval = (1000*1000)/opts.rate; - uint64_t timeLimit(opts.timeLimit * TIME_SEC); - uint64_t start(current_time()); - + uint64_t interval = TIME_SEC/opts.rate; + int64_t timeLimit = opts.timeLimit * TIME_SEC; + uint64_t sent = 0, missedRate = 0; + AbsTime start = now(); while (true) { - uint64_t start_msg(current_time()); - msg.getDeliveryProperties().setTimestamp(start_msg); - //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables + AbsTime sentAt=now(); + msg.getDeliveryProperties().setTimestamp(Duration(sentAt)); async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - - uint64_t now = current_time(); - - if (timeLimit != 0 && (now - start) > timeLimit) { - session.sync(); - receiver.stop(); - break; - } - - uint64_t timeTaken = (now - start_msg) / TIME_USEC; - if (timeTaken < interval) { - usleep(interval - timeTaken); - } else if (timeTaken > interval && - !opts.csv && !opts.cumulative) { // Don't be so verbose in this case, we're piping the results to another program - std::cout << "Could not achieve desired rate! (Took " << timeTaken - << " microsecs to send message, aiming for " << interval << " microsecs)" << std::endl; + if (opts.sync) session.sync(); + ++sent; + AbsTime waitTill(start, sent*interval); + Duration delay(sentAt, waitTill); + if (delay < 0) + ++missedRate; + else + sys::usleep(delay / TIME_USEC); + if (timeLimit != 0 && Duration(start, now()) > timeLimit) { + session.sync(); + receiver.stop(); + break; } } } @@ -350,7 +385,7 @@ string Sender::generateData(uint size) { if (size < chars.length()) { return chars.substr(0, size); - } + } std::string data; for (uint i = 0; i < (size / chars.length()); i++) { data += chars; @@ -360,43 +395,51 @@ string Sender::generateData(uint size) } -void Test::start() -{ - receiver.start(); +void Test::start() +{ + receiver.start(); begin = AbsTime(now()); - sender.start(); + sender.start(); } -void Test::join() -{ - sender.join(); - receiver.join(); +void Test::join() +{ + sender.join(); + receiver.join(); AbsTime end = now(); Duration time(begin, end); double msecs(time / TIME_MSEC); if (!opts.csv) { - std::cout << "Sent " << receiver.getCount() << " msgs through " << queue + std::cout << "Sent " << receiver.getCount() << " msgs through " << queue << " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) "; } stats.print(); std::cout << std::endl; } -void Test::report() -{ +void Test::report() +{ stats.print(); std::cout << std::endl; stats.reset(); } +}} // namespace qpid::tests + +using namespace qpid::tests; + int main(int argc, char** argv) { try { opts.parse(argc, argv); if (opts.cumulative) opts.csv = true; - boost::ptr_vector<Test> tests(opts.queues); - for (uint i = 0; i < opts.queues; i++) { + + Connection localConnection; + AsyncSession session; + + boost::ptr_vector<Test> tests(opts.concurrentConnections); + for (uint i = 0; i < opts.concurrentConnections; i++) { std::ostringstream out; out << opts.base << "-" << (i+1); tests.push_back(new Test(out.str())); @@ -406,7 +449,7 @@ int main(int argc, char** argv) } if (opts.rate && !opts.timeLimit) { while (true) { - usleep(opts.reportFrequency * 1000); + qpid::sys::usleep(opts.reportFrequency * 1000); //print latency report: for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) { i->report(); |