diff options
author | Alan Conway <aconway@apache.org> | 2011-09-29 21:53:52 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-09-29 21:53:52 +0000 |
commit | 8115eff7fcd46f4b69327caef0eb985a6d209cc6 (patch) | |
tree | 4d975fde471c207067129002563caf0fc82a58ef | |
parent | 6c9ffc8cb38805e7c8a802e7bd1f1b2f05910e28 (diff) | |
download | qpid-python-8115eff7fcd46f4b69327caef0eb985a6d209cc6.tar.gz |
QPID-2920: Minor improvements to cluster tests and logging.
Changed default --consume-lock to 10000.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-2920-active@1177443 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/exp/Settings.cpp | 2 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/cluster2_tests.py | 9 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cluster-benchmark | 9 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/qpid-cpp-benchmark | 24 |
6 files changed, 40 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp index 170baa488e..b73a3747c0 100644 --- a/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp @@ -61,6 +61,7 @@ void MessageHandler::enqueue(const std::string& q, const std::string& message) { boost::shared_ptr<Queue> queue = findQueue(q, "Cluster enqueue failed"); // FIXME aconway 2010-10-28: decode message by frame in bounded-size buffers. + // FIXME aconway 2011-09-28: don't re-decode my own messages boost::intrusive_ptr<broker::Message> msg = new broker::Message(); framing::Buffer buf(const_cast<char*>(&message[0]), message.size()); msg->decodeHeader(buf); diff --git a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp index 248fb05dc6..c3ff94b897 100644 --- a/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp @@ -1,3 +1,4 @@ + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -56,6 +57,8 @@ bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; } void QueueContext::replicaState( QueueOwnership before, QueueOwnership after, bool selfDelivered) { + // No lock, this function does not touch any member variables. + // Invariants for ownership: // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped // SOLE_OWNER <=> timer stopped, queue started @@ -64,7 +67,9 @@ void QueueContext::replicaState( // Interested in state changes and my own events which lead to // ownership. if ((before != after || selfDelivered) && isOwner(after)) { - sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, "cluster start consumers on " << queue.getName() << ", timer " + << (after==SHARED_OWNER? "start" : "stop")); + sys::Mutex::ScopedLock l(lock); // FIXME aconway 2011-09-29: REMOVE queue.startConsumers(); if (after == SHARED_OWNER) timer.start(); else timer.stop(); @@ -90,6 +95,7 @@ void QueueContext::cancel(size_t n) { consumers = n; // When consuming threads are stopped, this->stopped will be called. if (n == 0) { + QPID_LOG(trace, "cluster stop consumers and timer on " << queue.getName()); timer.stop(); queue.stopConsumers(); } @@ -98,6 +104,7 @@ void QueueContext::cancel(size_t n) { // Called in timer thread. void QueueContext::timeout() { // When all threads have stopped, queue will call stopped() + QPID_LOG(trace, "cluster timeout, stopping consumers on " << queue.getName()); queue.stopConsumers(); } @@ -105,6 +112,8 @@ void QueueContext::timeout() { // Called when no threads are dispatching from the queue. void QueueContext::stopped() { sys::Mutex::ScopedLock l(lock); + QPID_LOG(trace, "cluster timeout, stopped consumers on " << queue.getName() + << (consumers == 0 ? " unsubscribed" : " resubscribe")); if (consumers == 0) mcast.mcast(framing::ClusterQueueUnsubscribeBody( framing::ProtocolVersion(), queue.getName())); diff --git a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp index 59c8c4274c..6c572849be 100644 --- a/qpid/cpp/src/qpid/cluster/exp/Settings.cpp +++ b/qpid/cpp/src/qpid/cluster/exp/Settings.cpp @@ -25,7 +25,7 @@ namespace qpid { namespace cluster { Settings::Settings() : // Default settings - consumeLockMicros(100000) + consumeLockMicros(10000) {} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/tests/cluster2_tests.py b/qpid/cpp/src/tests/cluster2_tests.py index 81bc71d22f..3c4af6d83b 100755 --- a/qpid/cpp/src/tests/cluster2_tests.py +++ b/qpid/cpp/src/tests/cluster2_tests.py @@ -150,6 +150,7 @@ class Cluster2Tests(BrokerTest): self.session = session self.receiver = session.receiver("q") self.messages = [] + self.error = None Thread.__init__(self) def run(self): @@ -158,6 +159,7 @@ class Cluster2Tests(BrokerTest): self.messages.append(self.receiver.fetch(1)) self.session.acknowledge() except Empty: pass + except Exception,e: self.error = e cluster = self.cluster(3, cluster2=True) connections = [ b.connect() for b in cluster] @@ -173,8 +175,11 @@ class Cluster2Tests(BrokerTest): while time.time() < t: sender.send(str(n)) n += 1 - for r in receivers: r.join(); - for r in receivers: len(r.messages) > n/6 # Fairness test. + for r in receivers: + r.join(); + if (r.error): self.fail("Receiver failed: %s" % r.error) + for r in receivers: + len(r.messages) > n/6 # Fairness test. messages = [int(m.content) for r in receivers for m in r.messages ] messages.sort() self.assertEqual(range(n), messages) diff --git a/qpid/cpp/src/tests/qpid-cluster-benchmark b/qpid/cpp/src/tests/qpid-cluster-benchmark index 29c8ef54dd..7a7542d419 100755 --- a/qpid/cpp/src/tests/qpid-cluster-benchmark +++ b/qpid/cpp/src/tests/qpid-cluster-benchmark @@ -23,7 +23,7 @@ # Default options MESSAGES="-m 10000" FLOW="--flow-control 100" # Flow control limit on queue depth for latency. -REPEAT="--repeat 10" +REPEAT="--repeat 5" QUEUES="-q 4" SENDERS="-s 3" RECEIVERS="-r 3" @@ -43,9 +43,10 @@ while getopts "m:f:n:b:q:s:r:c:" opt; do *) echo "Unknown option"; exit 1;; esac done - +BROKER=$(echo $BROKERS | sed s/,.*//) run_test() { echo $*; shift; "$@"; echo; echo; echo; } -run_test "Throughput:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS -run_test "Latency:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS +run_test "Multiple active brokers:" qpid-cpp-benchmark $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS +run_test "Single active broker :" qpid-cpp-benchmark $REPEAT $BROKER --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS +run_test "Latency under low load:" qpid-cpp-benchmark $REPEAT $BROKERS --connection-options "{tcp-nodelay:true}" $MESSAGES $FLOW $CLIENT_HOSTS diff --git a/qpid/cpp/src/tests/qpid-cpp-benchmark b/qpid/cpp/src/tests/qpid-cpp-benchmark index bd83403f7e..1bc8770915 100755 --- a/qpid/cpp/src/tests/qpid-cpp-benchmark +++ b/qpid/cpp/src/tests/qpid-cpp-benchmark @@ -177,9 +177,9 @@ def recreate_queues(queues, brokers): c.close() def print_header(timestamp): - if timestamp: latency_header="\tl-min\tl-max\tl-avg" + if timestamp: latency_header="\tl-min\tl-max\tl-avg\ttotal-tp" else: latency_header="" - print "send-tp\t\trecv-tp%s"%latency_header + print "send-tp\trecv-tp%s"%latency_header def parse(parser, lines): # Parse sender/receiver output return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines] @@ -190,25 +190,29 @@ def parse_senders(senders): def parse_receivers(receivers): return parse([int,float,float,float],[first_line(p) for p in receivers if p]) -def print_data(send_stats, recv_stats): +def print_data(send_stats, recv_stats, total_tp): for send,recv in map(None, send_stats, recv_stats): line="" if send: line += "%d"%send[0] if recv: - line += "\t\t%d"%recv[0] + line += "\t%d"%recv[0] if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]) + if total_tp is not None: + line += "\t%d"%total_tp + total_tp = None print line -def print_summary(send_stats, recv_stats): +def print_summary(send_stats, recv_stats, total_tp): def avg(s): sum(s) / len(s) send_tp = sum([l[0] for l in send_stats]) recv_tp = sum([l[0] for l in recv_stats]) - summary = "%d\t\t%d"%(send_tp, recv_tp) + summary = "%d\t%d"%(send_tp, recv_tp) if recv_stats and len(recv_stats[0]) == 4: l_min = sum(l[1] for l in recv_stats)/len(recv_stats) l_max = sum(l[2] for l in recv_stats)/len(recv_stats) l_avg = sum(l[3] for l in recv_stats)/len(recv_stats) summary += "\t%.2f\t%.2f\t%.2f"%(l_min, l_max, l_avg) + summary += "\t%d"%total_tp print summary @@ -269,13 +273,17 @@ def main(): brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.receivers)] ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready. + start = time.time() senders = [start_send(q, opts,brokers.next(), client_hosts.next()) for q in queues for j in xrange(opts.senders)] if opts.report_header and i == 0: print_header(opts.timestamp) + for p in senders + receivers: p.wait() + total_sent = opts.queues * opts.senders * opts.messages + total_tp = total_sent / (time.time()-start) send_stats=parse_senders(senders) recv_stats=parse_receivers(receivers) - if opts.summarize: print_summary(send_stats, recv_stats) - else: print_data(send_stats, recv_stats) + if opts.summarize: print_summary(send_stats, recv_stats, total_tp) + else: print_data(send_stats, recv_stats, total_tp) finally: clients.kill() # No strays if __name__ == "__main__": main() |