summaryrefslogtreecommitdiff
path: root/cpp/src/tests
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
committerAlan Conway <aconway@apache.org>2008-08-21 18:04:18 +0000
commit2b97a69197fb986c209339c48ed98bb45203e107 (patch)
tree8bd157cc9d19757b6d9c00c5ab2c353ca336f8bf /cpp/src/tests
parentc6c237e2450250a6ef18c5af93e2a733aba10932 (diff)
downloadqpid-python-2b97a69197fb986c209339c48ed98bb45203e107.tar.gz
Pre-buffering output strategy for cluster.
Additional hooks in broker code, should not affect standalone broker. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@687813 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--cpp/src/tests/cluster_test.cpp2
-rw-r--r--cpp/src/tests/consume.cpp11
-rw-r--r--cpp/src/tests/latencytest.cpp15
-rwxr-xr-xcpp/src/tests/start_cluster2
4 files changed, 18 insertions, 12 deletions
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 49c5264990..8f3927732d 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -78,7 +78,7 @@ ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
::sleep(1);
--retry;
}
- BOOST_CHECK_EQUAL(n, getGlobalCluster()->size());
+ BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
}
void ClusterFixture::add() {
diff --git a/cpp/src/tests/consume.cpp b/cpp/src/tests/consume.cpp
index fecf6bb315..c20a738755 100644
--- a/cpp/src/tests/consume.cpp
+++ b/cpp/src/tests/consume.cpp
@@ -42,13 +42,15 @@ struct Args : public qpid::TestOptions {
uint count;
uint ack;
string queue;
-
+ bool declare;
+
Args() : count(0), ack(1)
{
addOptions()
("count", optValue(count, "N"), "number of messages to publish")
("ack-frequency", optValue(ack, "N"), "ack every N messages (0 means use no-ack mode)")
- ("queue", optValue(queue, "<queue name>"), "queue to consume from");
+ ("queue", optValue(queue, "<queue name>"), "queue to consume from")
+ ("declare", optValue(declare), "declare the queue");
}
};
@@ -67,7 +69,8 @@ struct Client
void consume()
{
-
+ if (opts.declare)
+ session.queueDeclare(opts.queue);
SubscriptionManager subs(session);
LocalQueue lq(AckPolicy(opts.ack));
subs.setAcceptMode(opts.ack > 0 ? 0 : 1);
@@ -77,7 +80,7 @@ struct Client
Message msg;
for (size_t i = 0; i < opts.count; ++i) {
msg=lq.pop();
- std::cout << "Received: " << msg.getMessageProperties().getCorrelationId() << std::endl;
+ QPID_LOG(info, "Received: " << msg.getMessageProperties().getCorrelationId());
}
if (opts.ack != 0)
subs.getAckPolicy().ackOutstanding(session); // Cumulative ack for final batch.
diff --git a/cpp/src/tests/latencytest.cpp b/cpp/src/tests/latencytest.cpp
index e1a6f156a5..6c3fdd23bd 100644
--- a/cpp/src/tests/latencytest.cpp
+++ b/cpp/src/tests/latencytest.cpp
@@ -44,6 +44,7 @@ struct Args : public qpid::TestOptions {
uint size;
uint count;
uint rate;
+ bool sync;
uint reportFrequency;
uint timeLimit;
uint queues;
@@ -65,6 +66,7 @@ struct Args : public qpid::TestOptions {
("queues", optValue(queues, "N"), "number of queues")
("count", optValue(count, "N"), "number of messages to send")
("rate", optValue(rate, "N"), "target message rate (causes count to be ignored)")
+ ("sync", optValue(sync), "send messages synchronously")
("report-frequency", optValue(reportFrequency, "N"),
"number of milliseconds to wait between reports (ignored unless rate specified)")
("time-limit", optValue(timeLimit, "N"),
@@ -143,6 +145,7 @@ class Sender : public Client
void sendByRate();
void sendByCount();
Receiver& receiver;
+ const string data;
public:
Sender(const string& queue, Receiver& receiver);
void test();
@@ -285,7 +288,7 @@ void Stats::reset()
totalLatency = maxLatency = minLatency = 0;
}
-Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver) {}
+Sender::Sender(const string& q, Receiver& receiver) : Client(q), receiver(receiver), data(generateData(opts.size)) {}
void Sender::test()
{
@@ -295,7 +298,7 @@ void Sender::test()
void Sender::sendByCount()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -303,15 +306,15 @@ void Sender::sendByCount()
for (uint i = 0; i < opts.count; i++) {
uint64_t sentAt(current_time());
msg.getDeliveryProperties().setTimestamp(sentAt);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
+ if (opts.sync) session.sync();
}
session.sync();
}
void Sender::sendByRate()
{
- Message msg(generateData(opts.size), queue);
+ Message msg(data, queue);
if (opts.durable) {
msg.getDeliveryProperties().setDeliveryMode(framing::PERSISTENT);
}
@@ -324,9 +327,9 @@ void Sender::sendByRate()
while (true) {
uint64_t start_msg(current_time());
msg.getDeliveryProperties().setTimestamp(start_msg);
- //msg.getHeaders().setTimestamp("sent-at", sentAt);//TODO add support for uint64_t to field tables
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
-
+ if (opts.sync) session.sync();
+
uint64_t now = current_time();
if (timeLimit != 0 && (now - start) > timeLimit) {
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 55f989a3e9..6d254190df 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -16,7 +16,7 @@ OPTS="-d --load-module ../.libs/libqpidcluster.so --cluster-name=$CLUSTER --no-
if test "$SIZE" = "one"; then # Special case of singleton cluster, use default port.
../qpidd -q
- with_ais_group ../qpidd $OPTS || exit 1
+ with_ais_group ../qpidd $OPTS --log-output=cluster.log || exit 1
else
for (( i=0; i<SIZE; ++i )); do
PORT=`with_ais_group ../qpidd -p0 --log-output=cluster$i.log $OPTS` || exit 1