summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-09-29 21:53:52 +0000
committerAlan Conway <aconway@apache.org>2011-09-29 21:53:52 +0000
commit8115eff7fcd46f4b69327caef0eb985a6d209cc6 (patch)
tree4d975fde471c207067129002563caf0fc82a58ef
parent6c9ffc8cb38805e7c8a802e7bd1f1b2f05910e28 (diff)
downloadqpid-python-8115eff7fcd46f4b69327caef0eb985a6d209cc6.tar.gz
QPID-2920: Minor improvements to cluster tests and logging.
Changed default --consume-lock to 10000. git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177443 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp1
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp11
-rw-r--r--qpid/cpp/src/qpid/cluster/exp/Settings.cpp2
-rwxr-xr-xqpid/cpp/src/tests/cluster2_tests.py9
-rwxr-xr-xqpid/cpp/src/tests/qpid-cluster-benchmark9
-rwxr-xr-xqpid/cpp/src/tests/qpid-cpp-benchmark24
6 files changed, 40 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
index 170baa488e..b73a3747c0 100644
--- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
@@ -61,6 +61,7 @@ void MessageHandler::enqueue(const std::string& q, const std::string& message) {
boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed");
// FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers.
+ // FIXME aconway 2011-09-28: don't re-decode my own messages
boost::intrusive_ptr<broker::Message> msg = new broker::Message();
framing::Buffer buf(const_cast<char*>(&message[0]), message.size());
msg->decodeHeader(buf);
diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
index 248fb05dc6..c3ff94b897 100644
--- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
@@ -1,3 +1,4 @@
+
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -56,6 +57,8 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
void QueueContext::replicaState(
QueueOwnership before, QueueOwnership after, bool selfDelivered)
{
+ // No lock, this function does not touch any member variables.
+
// Invariants for ownership:
// UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
// SOLE_OWNER <=> timer stopped, queue started
@@ -64,7 +67,9 @@ void QueueContext::replicaState(
// Interested in state changes and my own events which lead to
// ownership.
if ((before != after || selfDelivered) && isOwner(after)) {
- sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, "cluster start consumers on " << queue.getName() << ", timer "
+ << (after==SHARED_OWNER? "start" : "stop"));
+ sys::Mutex::ScopedLock l(lock); // FIXME aconway 2011-09-29: REMOVE
queue.startConsumers();
if (after == SHARED_OWNER) timer.start();
else timer.stop();
@@ -90,6 +95,7 @@ void QueueContext::cancel(size_t n) {
consumers = n;
// When consuming threads are stopped, this->stopped will be called.
if (n == 0) {
+ QPID_LOG(trace, "cluster stop consumers and timer on " << queue.getName());
timer.stop();
queue.stopConsumers();
}
@@ -98,6 +104,7 @@ void QueueContext::cancel(size_t n) {
// Called in timer thread.
void QueueContext::timeout() {
// When all threads have stopped, queue will call stopped()
+ QPID_LOG(trace, "cluster timeout, stopping consumers on " << queue.getName());
queue.stopConsumers();
}
@@ -105,6 +112,8 @@ void QueueContext::timeout() {
// Called when no threads are dispatching from the queue.
void QueueContext::stopped() {
sys::Mutex::ScopedLock l(lock);
+ QPID_LOG(trace, "cluster timeout, stopped consumers on " << queue.getName()
+ << (consumers == 0 ? " unsubscribed" : " resubscribe"));
if (consumers == 0)
mcast.mcast(framing::ClusterQueueUnsubscribeBody(
framing::ProtocolVersion(), queue.getName()));
diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
index 59c8c4274c..6c572849be 100644
--- a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
+++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp
@@ -25,7 +25,7 @@ namespace qpid {
namespace cluster {
Settings::Settings() : // Default settings
- consumeLockMicros(100000)
+ consumeLockMicros(10000)
{}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py
index 81bc71d22f..3c4af6d83b 100755
--- a/qpid/cpp/src/tests/cluster2_tests.py
+++ b/qpid/cpp/src/tests/cluster2_tests.py
@@ -150,6 +150,7 @@ class Cluster2Tests(BrokerTest):
self.session = session
self.receiver = session.receiver("q")
self.messages = []
+ self.error = None
Thread.__init__(self)
def run(self):
@@ -158,6 +159,7 @@ class Cluster2Tests(BrokerTest):
self.messages.append(self.receiver.fetch(1))
self.session.acknowledge()
except Empty: pass
+ except Exception,e: self.error = e
cluster = self.cluster(3, cluster2=True)
connections = [ b.connect() for b in cluster]
@@ -173,8 +175,11 @@ class Cluster2Tests(BrokerTest):
while time.time() < t:
sender.send(str(n))
n += 1
- for r in receivers: r.join();
- for r in receivers: len(r.messages) > n/6 # Fairness test.
+ for r in receivers:
+ r.join();
+ if (r.error): self.fail("Receiver failed: %s" % r.error)
+ for r in receivers:
+ len(r.messages) > n/6 # Fairness test.
messages = [int(m.content) for r in receivers for m in r.messages ]
messages.sort()
self.assertEqual(range(n), messages)
diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark
index 29c8ef54dd..7a7542d419 100755
--- a/qpid/cpp/src/tests/qpid-cluster-benchmark
+++ b/qpid/cpp/src/tests/qpid-cluster-benchmark
@@ -23,7 +23,7 @@
# Default options
MESSAGES="-m 10000"
FLOW="--flow-control 100" # Flow control limit on queue depth for latency.
-REPEAT="--repeat 10"
+REPEAT="--repeat 5"
QUEUES="-q 4"
SENDERS="-s 3"
RECEIVERS="-r 3"
@@ -43,9 +43,10 @@ while getopts "m:f:n:b:q:s:r:c:" opt; do
*) echo "Unknown option"; exit 1;;
esac
done
-
+BROKER=$(echo $BROKERS | sed s/,.*//)
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
-run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
-run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS
+run_test "Multiple active brokers:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
+run_test "Single active broker :" qpid-cpp-benchmark $REPEAT $BROKER --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS
+run_test "Latency under low load:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS
diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark
index bd83403f7e..1bc8770915 100755
--- a/qpid/cpp/src/tests/qpid-cpp-benchmark
+++ b/qpid/cpp/src/tests/qpid-cpp-benchmark
@@ -177,9 +177,9 @@ def recreate_queues(queues, brokers):
c.close()
def print_header(timestamp):
- if timestamp: latency_header="\tl-min\tl-max\tl-avg"
+ if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp"
else: latency_header=""
- print "send-tp\t\trecv-tp%s"%latency_header
+ print "send-tp\trecv-tp%s"%latency_header
def parse(parser, lines): # Parse sender/receiver output
return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
@@ -190,25 +190,29 @@ def parse_senders(senders):
def parse_receivers(receivers):
return parse([int,float,float,float],[first_line(p) for p in receivers if p])
-def print_data(send_stats, recv_stats):
+def print_data(send_stats, recv_stats, total_tp):
for send,recv in map(None, send_stats, recv_stats):
line=""
if send: line += "%d"%send[0]
if recv:
- line += "\t\t%d"%recv[0]
+ line += "\t%d"%recv[0]
if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+ if total_tp is not None:
+ line += "\t%d"%total_tp
+ total_tp = None
print line
-def print_summary(send_stats, recv_stats):
+def print_summary(send_stats, recv_stats, total_tp):
def avg(s): sum(s) / len(s)
send_tp = sum([l[0] for l in send_stats])
recv_tp = sum([l[0] for l in recv_stats])
- summary = "%d\t\t%d"%(send_tp, recv_tp)
+ summary = "%d\t%d"%(send_tp, recv_tp)
if recv_stats and len(recv_stats[0]) == 4:
l_min = sum(l[1] for l in recv_stats)/len(recv_stats)
l_max = sum(l[2] for l in recv_stats)/len(recv_stats)
l_avg = sum(l[3] for l in recv_stats)/len(recv_stats)
summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg)
+ summary += "\t%d"%total_tp
print summary
@@ -269,13 +273,17 @@ def main():
brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.receivers)]
ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+ start = time.time()
senders = [start_send(q, opts,brokers.next(), client_hosts.next())
for q in queues for j in xrange(opts.senders)]
if opts.report_header and i == 0: print_header(opts.timestamp)
+ for p in senders + receivers: p.wait()
+ total_sent = opts.queues * opts.senders * opts.messages
+ total_tp = total_sent / (time.time()-start)
send_stats=parse_senders(senders)
recv_stats=parse_receivers(receivers)
- if opts.summarize: print_summary(send_stats, recv_stats)
- else: print_data(send_stats, recv_stats)
+ if opts.summarize: print_summary(send_stats, recv_stats, total_tp)
+ else: print_data(send_stats, recv_stats, total_tp)
finally: clients.kill() # No strays
if __name__ == "__main__": main()