diff options
author | Alan Conway <aconway@apache.org> | 2008-08-05 16:24:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-08-05 16:24:25 +0000 |
commit | 6bed4cf29dda54560af44d16143fa4b24b65245b (patch) | |
tree | c5752b2219a78e786dbbc6f532ce5d4bcbd5c858 /cpp | |
parent | bb667cf14d21dc8a873636a9cf5e3017f4aa5503 (diff) | |
download | qpid-python-6bed4cf29dda54560af44d16143fa4b24b65245b.tar.gz |
Fix sporadic shutdown hang in clustered broker.
Add start|stop_cluster test scripts
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@682774 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 38 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 2 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 2 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 23 | ||||
-rwxr-xr-x | cpp/src/tests/stop_cluster | 13 |
6 files changed, 55 insertions, 25 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index d876f6d253..a713a6c345 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -292,11 +292,11 @@ void Broker::shutdown() { // call any function that is not async-signal safe. // Any unsafe shutdown actions should be done in the destructor. poller->shutdown(); - finalize(); // Finalize any plugins. } Broker::~Broker() { shutdown(); + finalize(); // Finalize any plugins. delete store; if (config.auth) { #if HAVE_SASL diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 6623d1cde0..d18fd452e4 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -56,8 +56,8 @@ ostream& operator <<(ostream& out, const Cluster::MemberMap& members) { } Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : - cpg(*this), broker(&b), + cpg(*this), name(name_), url(url_), self(cpg.self()) @@ -75,10 +75,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : } } -Cluster::~Cluster() { - cpg.shutdown(); - dispatcher.join(); -} +Cluster::~Cluster() {} // local connection initializes plugins void Cluster::initialize(broker::Connection& c) { @@ -88,16 +85,16 @@ void Cluster::initialize(broker::Connection& c) { } void Cluster::leave() { - if (!broker.get()) return; // Already left - QPID_LOG(info, QPID_MSG("Leaving cluster " << *this)); - // Must not be called in the dispatch thread. - assert(Thread::current().id() != dispatcher.id()); + Mutex::ScopedLock l(lock); + if (!broker) return; // Already left. + assert(Thread::current().id() != dispatcher.id()); // Must not be called in the dispatch thread. + QPID_LOG(debug, "Leaving cluster " << *this); cpg.leave(name); - // Wait till final config-change is delivered and broker is released. - { - Mutex::ScopedLock l(lock); - while(broker.get()) lock.wait(); - } + // The dispatch thread sets broker=0 when the final config-change + // is delivered. + while(broker) lock.wait(); + cpg.shutdown(); + dispatcher.join(); } template <class T> void decodePtr(Buffer& buf, T*& ptr) { @@ -115,7 +112,6 @@ void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) { // FIXME aconway 2008-07-03: More efficient buffer management. // Cache coded form of decoded frames for re-encoding? Buffer buf(buffer); - assert(frame.size() + 64 < sizeof(buffer)); frame.encode(buf); encodePtr(buf, connection); iovec iov = { buffer, buf.getPosition() }; @@ -145,6 +141,7 @@ ConnectionInterceptor* Cluster::getShadowConnection(const Cpg::Id& member, void* if (i == shadowConnectionMap.end()) { // A new shadow connection. std::ostringstream os; os << name << ":" << member << ":" << remotePtr; + assert(broker); broker::Connection* c = new broker::Connection(&shadowOut, *broker, os.str()); ShadowConnectionMap::value_type value(id, new ConnectionInterceptor(*c, *this, id)); i = shadowConnectionMap.insert(value).first; @@ -169,8 +166,8 @@ void Cluster::deliver( decodePtr(buf, connection); QPID_LOG(trace, "DLVR [" << from << " " << connection << "] " << frame); - if (!broker.get()) { - QPID_LOG(warning, "Ignoring late DLVR, already left the cluster."); + if (!broker) { + QPID_LOG(warning, "Unexpected DLVR, already left the cluster."); return; } @@ -232,11 +229,8 @@ void Cluster::configChange( QPID_LOG(debug, "Cluster members: " << nCurrent << " ("<< nLeft << " left, " << nJoined << " joined):" << members); assert(members.size() == size_t(nCurrent)); - if (members.find(self) == members.end()) { - QPID_LOG(debug, "Left cluster " << *this); - broker = 0; // Release broker reference. - } - + if (members.find(self) == members.end()) + broker = 0; // We have left the group, this is the final config change. lock.notifyAll(); // Threads waiting for membership changes. } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index 7147b1ac05..7ce0354a80 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -122,8 +122,8 @@ class Cluster : private sys::Runnable, private Cpg::Handler, public RefCounted ConnectionInterceptor* getShadowConnection(const Cpg::Id&, void*); mutable sys::Monitor lock; // Protect access to members. + broker::Broker* broker; Cpg cpg; - boost::intrusive_ptr<broker::Broker> broker; Cpg::Name name; Url url; MemberMap members; diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index 9190eee4e5..8b83e927ec 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -12,7 +12,7 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la # ais_check checks pre-requisites for cluster tests and runs them if ok. TESTS+=ais_check -EXTRA_DIST+=ais_check +EXTRA_DIST+=ais_check start_cluster stop_cluster check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster new file mode 100755 index 0000000000..e399d213dc --- /dev/null +++ b/cpp/src/tests/start_cluster @@ -0,0 +1,23 @@ +#!/bin/sh +# Start a cluster of brokers on local host, put the list of ports for cluster members in cluster.ports +# +echo $1 | grep '^[0-9][0-9]*$' > /dev/null || { echo "Usage: $0 cluster-size [options]"; exit 1; } + +# Execute command with the ais group set. +with_ais_group() { + id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; } + echo $* | newgrp ais +} + +test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } +rm -f cluster*.log +SIZE=$1; shift +CLUSTER=`pwd` # Cluster name=pwd, avoid clashes. + +for (( i=0; i<SIZE; ++i )); do + OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name=$CLUSTER --no-data-dir --auth=no $*" + PORT=`with_ais_group ../qpidd $OPTS` || exit 1 + echo $PORT >> cluster.ports +done + + diff --git a/cpp/src/tests/stop_cluster b/cpp/src/tests/stop_cluster new file mode 100755 index 0000000000..9bd05092de --- /dev/null +++ b/cpp/src/tests/stop_cluster @@ -0,0 +1,13 @@ +#!/bin/sh +# Stop brokers on ports listed in cluster.ports + +PORTS=`cat cluster.ports` +for PORT in $PORTS ; do + ../qpidd -qp $PORT || ERROR="$ERROR $PORT" +done +rm -f cluster.ports + +if [ -n "$ERROR" ]; then + echo "Errors stopping brokers on ports: $ERROR" + exit 1 +fi |