summaryrefslogtreecommitdiff
path: root/cpp/src/tests/latencytest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/latencytest.cpp')
-rw-r--r--cpp/src/tests/latencytest.cpp223
1 files changed, 133 insertions, 90 deletions
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index e1a6f156a5..a205ef6c7c 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,53 +21,64 @@
#include <algorithm>
+#include <limits>
#include <iostream>
#include <memory>
#include <sstream>
#include <vector>
-#include <unistd.h>
#include "TestOptions.h"
+#include "qpid/sys/Thread.h"
#include "qpid/client/Connection.h"
#include "qpid/client/Message.h"
#include "qpid/client/AsyncSession.h"
#include "qpid/client/SubscriptionManager.h"
+#include "qpid/sys/Time.h"
using namespace qpid;
using namespace qpid::client;
using namespace qpid::sys;
using std::string;
+namespace qpid {
+namespace tests {
+
typedef std::vector<std::string> StringSet;
struct Args : public qpid::TestOptions {
uint size;
uint count;
uint rate;
+ bool sync;
uint reportFrequency;
uint timeLimit;
- uint queues;
+ uint concurrentConnections;
uint prefetch;
uint ack;
bool cumulative;
bool csv;
bool durable;
string base;
+ bool singleConnect;
- Args() : size(256), count(1000), rate(0), reportFrequency(100),
- timeLimit(0), queues(1),
+ Args() : size(256), count(1000), rate(0), reportFrequency(1000),
+ timeLimit(0), concurrentConnections(1),
prefetch(100), ack(0),
- durable(false), base("latency-test")
+ durable(false), base("latency-test"), singleConnect(false)
+
{
- addOptions()
+ addOptions()
("size", optValue(size, "N"), "message size")
- ("queues", optValue(queues, "N"), "number of queues")
+ ("concurrentTests", optValue(concurrentConnections, "N"), "number of concurrent test setups, will create another publisher,\
+ subcriber, queue, and connections")
+ ("single-connection", optValue(singleConnect, "yes|no"), "Use one connection for multiple sessions.")
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
- ("report-frequency", optValue(reportFrequency, "N"),
+ ("sync", optValue(sync), "send messages synchronously")
+ ("report-frequency", optValue(reportFrequency, "N"),
"number of milliseconds to wait between reports (ignored unless rate specified)")
- ("time-limit", optValue(timeLimit, "N"),
+ ("time-limit", optValue(timeLimit, "N"),
"test duration, in seconds")
("prefetch", optValue(prefetch, "N"), "prefetch count (0 implies no flow control, and no acking)")
("ack", optValue(ack, "N"), "Ack frequency in messages (defaults to half the prefetch value)")
@@ -82,6 +93,7 @@ const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
Args opts;
double c_min, c_avg, c_max;
+Connection globalConnection;
uint64_t current_time()
{
@@ -89,7 +101,7 @@ uint64_t current_time()
return t;
}
-struct Stats
+struct Stats
{
Mutex lock;
uint count;
@@ -106,14 +118,15 @@ struct Stats
class Client : public Runnable
{
protected:
- Connection connection;
+ Connection* connection;
+ Connection localConnection;
AsyncSession session;
Thread thread;
string queue;
public:
Client(const string& q);
- virtual ~Client() {}
+ virtual ~Client();
void start();
void join();
@@ -122,7 +135,7 @@ public:
};
class Receiver : public Client, public MessageListener
-{
+{
SubscriptionManager mgr;
uint count;
Stats& stats;
@@ -143,6 +156,8 @@ class Sender : public Client
void sendByRate();
void sendByCount();
Receiver& receiver;
+ const string data;
+
public:
Sender(const string& queue, Receiver& receiver);
void test();
@@ -156,7 +171,7 @@ class Test
Receiver receiver;
Sender sender;
AbsTime begin;
-
+
public:
Test(const string& q) : queue(q), receiver(queue, stats), sender(queue, receiver), begin(now()) {}
void start();
@@ -167,8 +182,14 @@ public:
Client::Client(const string& q) : queue(q)
{
- opts.open(connection);
- session = connection.newSession();
+ if (opts.singleConnect){
+ connection = &globalConnection;
+ if (!globalConnection.isOpen()) opts.open(globalConnection);
+ }else{
+ connection = &localConnection;
+ opts.open(localConnection);
+ }
+ session = connection->newSession();
}
void Client::start()
@@ -185,8 +206,16 @@ void Client::run()
{
try{
test();
+ } catch(const std::exception& e) {
+ std::cout << "Error in receiver: " << e.what() << std::endl;
+ }
+}
+
+Client::~Client()
+{
+ try{
session.close();
- connection.close();
+ connection->close();
} catch(const std::exception& e) {
std::cout << "Error in receiver: " << e.what() << std::endl;
}
@@ -199,15 +228,17 @@ Receiver::Receiver(const string& q, Stats& s) : Client(q), mgr(session), count(0
if (msgCount) {
std::cout << "Warning: found " << msgCount << " msgs on " << queue << ". Purging..." << std::endl;
session.queuePurge(arg::queue=queue);
+ session.sync();
}
+ SubscriptionSettings settings;
if (opts.prefetch) {
- mgr.setAckPolicy(AckPolicy(opts.ack ? opts.ack : (opts.prefetch / 2)));
- mgr.setFlowControl(opts.prefetch, SubscriptionManager::UNLIMITED, true);
+ settings.autoAck = (opts.ack ? opts.ack : (opts.prefetch / 2));
+ settings.flowControl = FlowControl::messageWindow(opts.prefetch);
} else {
- mgr.setAcceptMode(1/*not-required*/);
- mgr.setFlowControl(SubscriptionManager::UNLIMITED, SubscriptionManager::UNLIMITED, false);
+ settings.acceptMode = ACCEPT_MODE_NONE;
+ settings.flowControl = FlowControl::unlimited();
}
- mgr.subscribe(*this, queue);
+ mgr.subscribe(*this, queue, settings);
}
void Receiver::test()
@@ -219,11 +250,9 @@ void Receiver::test()
void Receiver::received(Message& msg)
{
++count;
- uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
- //uint64_t sentAt = msg.getHeaders().getTimestamp("sent-at");// TODO: add support for uint64_t as a field table type
uint64_t receivedAt = current_time();
+ uint64_t sentAt = msg.getDeliveryProperties().getTimestamp();
- //std::cerr << "Latency: " << (receivedAt - sentAt) << std::endl;
stats.update(((double) (receivedAt - sentAt)) / TIME_MSEC);
if (!opts.rate && count >= opts.count) {
@@ -235,57 +264,70 @@ void Stats::update(double latency)
{
Mutex::ScopedLock l(lock);
count++;
- if (minLatency == 0 || minLatency > latency) minLatency = latency;
- if (maxLatency == 0 || maxLatency < latency) maxLatency = latency;
+ minLatency = std::min(minLatency, latency);
+ maxLatency = std::max(maxLatency, latency);
totalLatency += latency;
}
-Stats::Stats() : count(0), minLatency(0), maxLatency(0), totalLatency(0) {}
+Stats::Stats() : count(0), minLatency(std::numeric_limits<double>::max()), maxLatency(0), totalLatency(0) {}
void Stats::print()
{
static bool already_have_stats = false;
uint value;
- double aux_avg = (totalLatency / count);
if (opts.rate)
value = opts.rate;
else
value = opts.count;
Mutex::ScopedLock l(lock);
+ double aux_avg = (totalLatency / count);
if (!opts.cumulative) {
if (!opts.csv) {
- std::cout << "Latency(ms): min=" << minLatency << ", max=" <<
- maxLatency << ", avg=" << aux_avg;
+ if (count) {
+ std::cout << "Latency(ms): min=" << minLatency << ", max=" <<
+ maxLatency << ", avg=" << aux_avg;
+ } else {
+ std::cout << "Stalled: no samples for interval";
+ }
} else {
- std::cout << value << "," << minLatency << "," << maxLatency <<
+ if (count) {
+ std::cout << value << "," << minLatency << "," << maxLatency <<
"," << aux_avg;
+ } else {
+ std::cout << value << "," << minLatency << "," << maxLatency <<
+ ", Stalled";
+ }
}
} else {
- if (already_have_stats) {
- c_avg = (c_min + aux_avg) / 2;
- if (c_min > minLatency) c_min = minLatency;
- if (c_max < maxLatency) c_max = maxLatency;
+ if (count) {
+ if (already_have_stats) {
+ c_avg = (c_min + aux_avg) / 2;
+ if (c_min > minLatency) c_min = minLatency;
+ if (c_max < maxLatency) c_max = maxLatency;
+ } else {
+ c_avg = aux_avg;
+ c_min = minLatency;
+ c_max = maxLatency;
+ already_have_stats = true;
+ }
+ std::cout << value << "," << c_min << "," << c_max <<
+ "," << c_avg;
} else {
- c_avg = aux_avg;
- c_min = minLatency;
- c_max = maxLatency;
- already_have_stats = true;
+ std::cout << "Stalled: no samples for interval";
}
- std::cout << value << "," << c_min << "," << c_max <<
- "," << c_avg;
}
-
}
void Stats::reset()
{
Mutex::ScopedLock l(lock);
count = 0;
- totalLatency = maxLatency = minLatency = 0;
+ totalLatency = maxLatency = 0;
+ minLatency = std::numeric_limits<double>::max();
}
-Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {}
+Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {}
void Sender::test()
{
@@ -295,7 +337,7 @@ void Sender::test()
void Sender::sendByCount()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -303,45 +345,38 @@ void Sender::sendByCount()
for (uint i = 0; i < opts.count; i++) {
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+ if (opts.sync) session.sync();
}
session.sync();
}
void Sender::sendByRate()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
-
- //calculate interval (in micro secs) between messages to achieve desired rate
- uint64_t interval = (1000*1000)/opts.rate;
- uint64_t timeLimit(opts.timeLimit * TIME_SEC);
- uint64_t start(current_time());
-
+ uint64_t interval = TIME_SEC/opts.rate;
+ int64_t timeLimit = opts.timeLimit * TIME_SEC;
+ uint64_t sent = 0, missedRate = 0;
+ AbsTime start = now();
while (true) {
- uint64_t start_msg(current_time());
- msg.getDeliveryProperties().setTimestamp(start_msg);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
+ AbsTime sentAt=now();
+ msg.getDeliveryProperties().setTimestamp(Duration(sentAt));
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
-
- uint64_t now = current_time();
-
- if (timeLimit != 0 && (now - start) > timeLimit) {
- session.sync();
- receiver.stop();
- break;
- }
-
- uint64_t timeTaken = (now - start_msg) / TIME_USEC;
- if (timeTaken < interval) {
- usleep(interval - timeTaken);
- } else if (timeTaken > interval &&
- !opts.csv && !opts.cumulative) { // Don't be so verbose in this case, we're piping the results to another program
- std::cout << "Could not achieve desired rate! (Took " << timeTaken
- << " microsecs to send message, aiming for " << interval << " microsecs)" << std::endl;
+ if (opts.sync) session.sync();
+ ++sent;
+ AbsTime waitTill(start, sent*interval);
+ Duration delay(sentAt, waitTill);
+ if (delay < 0)
+ ++missedRate;
+ else
+ sys::usleep(delay / TIME_USEC);
+ if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
+ session.sync();
+ receiver.stop();
+ break;
}
}
}
@@ -350,7 +385,7 @@ string Sender::generateData(uint size)
{
if (size < chars.length()) {
return chars.substr(0, size);
- }
+ }
std::string data;
for (uint i = 0; i < (size / chars.length()); i++) {
data += chars;
@@ -360,43 +395,51 @@ string Sender::generateData(uint size)
}
-void Test::start()
-{
- receiver.start();
+void Test::start()
+{
+ receiver.start();
begin = AbsTime(now());
- sender.start();
+ sender.start();
}
-void Test::join()
-{
- sender.join();
- receiver.join();
+void Test::join()
+{
+ sender.join();
+ receiver.join();
AbsTime end = now();
Duration time(begin, end);
double msecs(time / TIME_MSEC);
if (!opts.csv) {
- std::cout << "Sent " << receiver.getCount() << " msgs through " << queue
+ std::cout << "Sent " << receiver.getCount() << " msgs through " << queue
<< " in " << msecs << "ms (" << (receiver.getCount() * 1000 / msecs) << " msgs/s) ";
}
stats.print();
std::cout << std::endl;
}
-void Test::report()
-{
+void Test::report()
+{
stats.print();
std::cout << std::endl;
stats.reset();
}
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
int main(int argc, char** argv)
{
try {
opts.parse(argc, argv);
if (opts.cumulative)
opts.csv = true;
- boost::ptr_vector<Test> tests(opts.queues);
- for (uint i = 0; i < opts.queues; i++) {
+
+ Connection localConnection;
+ AsyncSession session;
+
+ boost::ptr_vector<Test> tests(opts.concurrentConnections);
+ for (uint i = 0; i < opts.concurrentConnections; i++) {
std::ostringstream out;
out << opts.base << "-" << (i+1);
tests.push_back(new Test(out.str()));
@@ -406,7 +449,7 @@ int main(int argc, char** argv)
}
if (opts.rate && !opts.timeLimit) {
while (true) {
- usleep(opts.reportFrequency * 1000);
+ qpid::sys::usleep(opts.reportFrequency * 1000);
//print latency report:
for (boost::ptr_vector<Test>::iterator i = tests.begin(); i != tests.end(); i++) {
i->report();