diff options
author | Alan Conway <aconway@apache.org> | 2009-12-09 16:58:51 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-12-09 16:58:51 +0000 |
commit | c87bce67ac12ee37f8257efd02ab62fe19336b32 (patch) | |
tree | b2dd6b7752ecb9efdfd3003fe83f0efbf982d223 | |
parent | 84c5cd72cfa3eac53f889ae140c913fae4aa61c3 (diff) | |
download | qpid-python-c87bce67ac12ee37f8257efd02ab62fe19336b32.tar.gz |
QPID-2253 - Cluster node shuts down with inconsistent error.
Add a missing memberUpdate on the transition to CATCHUP mode.
The inconsistent error occurred because the newly updated member
did not have its membership updated and so was missing a failover
update message that the existing members sent to a new client.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@888874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 9 | ||||
-rwxr-xr-x | java/testkit/bin/qpid-python-testkit | 8 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 9 |
3 files changed, 8 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f9ad734d79..f877720350 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -600,6 +600,7 @@ void Cluster::setReady(Lock&) { void Cluster::initMapCompleted(Lock& l) { // Called on completion of the initial status map. + QPID_LOG(debug, *this << " initial status map complete. "); if (state == INIT) { // We have status for all members so we can make join descisions. initMap.checkConsistent(); @@ -705,10 +706,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ member, ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId) ); - if (initMap.transitionToComplete()) { - QPID_LOG(debug, *this << " initial status map complete. "); - initMapCompleted(l); - } + if (initMap.transitionToComplete()) initMapCompleted(l); } void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { @@ -808,10 +806,11 @@ void Cluster::updateInRetracted() { checkUpdateIn(l); } -void Cluster::checkUpdateIn(Lock&) { +void Cluster::checkUpdateIn(Lock& l) { if (state != UPDATEE) return; // Wait till we reach the stall point. if (updatedMap) { // We're up to date map = *updatedMap; + memberUpdate(l); mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self); state = CATCHUP; discarding = false; // ok to set, we're stalled for update. diff --git a/java/testkit/bin/qpid-python-testkit b/java/testkit/bin/qpid-python-testkit index e1696ff722..2c1d015281 100755 --- a/java/testkit/bin/qpid-python-testkit +++ b/java/testkit/bin/qpid-python-testkit @@ -25,10 +25,6 @@ . 
./setenv.sh export PYTHONPATH=../:$PYTHONPATH - -if [ -d $OUTDIR ]; then - rm -rf $OUTDIR -fi - -$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit +rm -rf $OUTDIR +$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@" diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index c3145c06ea..9fa79a220b 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -89,6 +89,7 @@ class Popen(popen2.Popen3): self.stdout = ExceptionWrapper(self.fromchild, msg) self.stderr = ExceptionWrapper(self.childerr, msg) self.dump(self.cmd_str(), "cmd") + log.debug("Started process %s" % self.pname) def dump(self, str, ext): name = "%s.%s" % (self.pname, ext) @@ -107,7 +108,7 @@ class Popen(popen2.Popen3): try: self.kill() except: - self.unexpected("Exit code %d" % self.wait()) + self.unexpected("expected running, exit code %d" % self.wait()) else: # Give the process some time to exit. delay = 0.1 @@ -393,13 +394,11 @@ class NumberedSender(Thread): self.condition.release() def stop(self): - log.debug("NumberedSender.stop") self.condition.acquire() self.stopped = True self.condition.notify() self.condition.release() self.join() - log.debug("NumberedSender.stop - joined") if self.error: raise self.error class NumberedReceiver(Thread): @@ -437,18 +436,14 @@ class NumberedReceiver(Thread): if self.sender: self.sender.notify_received(self.received) except Exception, e: - log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02: self.error = RethrownException(e, self.receiver.pname) def stop(self, count): """Returns when received >= count""" - log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02: self.lock.acquire() - log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received)) self.stopat = count self.lock.release() self.join() - log.debug("NumberedReceiver.stop - joined") if self.error: raise self.error class ErrorGenerator(StoppableThread): |