diff options
author | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-21 18:04:18 +0000 |
commit | 2b97a69197fb986c209339c48ed98bb45203e107 (patch) | |
tree | 8bd157cc9d19757b6d9c00c5ab2c353ca336f8bf /cpp/src/tests | |
parent | c6c237e2450250a6ef18c5af93e2a733aba10932 (diff) | |
download | qpid-python-2b97a69197fb986c209339c48ed98bb45203e107.tar.gz |
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 2 | ||||
-rw-r--r-- | cpp/src/tests/consume.cpp | 11 | ||||
-rw-r--r-- | cpp/src/tests/latencytest.cpp | 15 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 2 |
4 files changed, 18 insertions, 12 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 49c5264990..8f3927732d 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -78,7 +78,7 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) { ::sleep(1); --retry; } - BOOST_CHECK_EQUAL(n, getGlobalCluster()->size()); + BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size()); } void ClusterFixture::add() { diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp index fecf6bb315..c20a738755 100644 --- a/cpp/src/tests/consume.cpp +++ b/cpp/src/tests/consume.cpp @@ -42,13 +42,15 @@ struct Args : public qpid::TestOptions { uint count; uint ack; string queue; - + bool declare; + Args() : count(0), ack(1) { addOptions() ("count", optValue(count, "N"), "number of messages to publish") ("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)") - ("queue", optValue(queue, "<queue name>"), "queue to consume from"); + ("queue", optValue(queue, "<queue name>"), "queue to consume from") + ("declare", optValue(declare), "declare the queue"); } }; @@ -67,7 +69,8 @@ struct Client void consume() { - + if (opts.declare) + session.queueDeclare(opts.queue); SubscriptionManager subs(session); LocalQueue lq(AckPolicy(opts.ack)); subs.setAcceptMode(opts.ack > 0 ? 0 : 1); @@ -77,7 +80,7 @@ struct Client Message msg; for (size_t i = 0; i < opts.count; ++i) { msg=lq.pop(); - std::cout << "Received: " << msg.getMessageProperties().getCorrelationId() << std::endl; + QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId()); } if (opts.ack != 0) subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch. diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp index e1a6f156a5..6c3fdd23bd 100644 --- a/cpp/src/tests/latencytest.cpp +++ b/cpp/src/tests/latencytest.cpp @@ -44,6 +44,7 @@ struct Args : public qpid::TestOptions { uint size; uint count; uint rate; + bool sync; uint reportFrequency; uint timeLimit; uint queues; @@ -65,6 +66,7 @@ struct Args : public qpid::TestOptions { ("queues", optValue(queues, "N"), "number of queues") ("count", optValue(count, "N"), "number of messages to send") ("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)") + ("sync", optValue(sync), "send messages synchronously") ("report-frequency", optValue(reportFrequency, "N"), "number of milliseconds to wait between reports (ignored unless rate specified)") ("time-limit", optValue(timeLimit, "N"), @@ -143,6 +145,7 @@ class Sender : public Client void sendByRate(); void sendByCount(); Receiver& receiver; + const string data; public: Sender(const string& queue, Receiver& receiver); void test(); @@ -285,7 +288,7 @@ void Stats::reset() totalLatency = maxLatency = minLatency = 0; } -Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {} +Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {} void Sender::test() { @@ -295,7 +298,7 @@ void Sender::test() void Sender::sendByCount() { - Message msg(generateData(opts.size), queue); + Message msg(data, queue); if (opts.durable) { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } @@ -303,15 +306,15 @@ void Sender::sendByCount() for (uint i = 0; i < opts.count; i++) { uint64_t sentAt(current_time()); msg.getDeliveryProperties().setTimestamp(sentAt); - //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); + if (opts.sync) session.sync(); } session.sync(); } void Sender::sendByRate() { - Message msg(generateData(opts.size), queue); + Message msg(data, queue); if (opts.durable) { msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT); } @@ -324,9 +327,9 @@ void Sender::sendByRate() while (true) { uint64_t start_msg(current_time()); msg.getDeliveryProperties().setTimestamp(start_msg); - //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); - + if (opts.sync) session.sync(); + uint64_t now = current_time(); if (timeLimit != 0 && (now - start) > timeLimit) { diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index 55f989a3e9..6d254190df 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -16,7 +16,7 @@ OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no- if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port. ../qpidd -q - with_ais_group ../qpidd $OPTS || exit 1 + with_ais_group ../qpidd $OPTS --log-output=cluster.log || exit 1 else for (( i=0; i<SIZE; ++i )); do PORT=`with_ais_group ../qpidd -p0 --log-output=cluster$i.log $OPTS` || exit 1 |