summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-09 16:58:51 +0000
committerAlan Conway <aconway@apache.org>2009-12-09 16:58:51 +0000
commitc87bce67ac12ee37f8257efd02ab62fe19336b32 (patch)
treeb2dd6b7752ecb9efdfd3003fe83f0efbf982d223
parent84c5cd72cfa3eac53f889ae140c913fae4aa61c3 (diff)
downloadqpid-python-c87bce67ac12ee37f8257efd02ab62fe19336b32.tar.gz
QPID-2253 - Cluster node shuts down with inconsistent error.
Add a missing memberUpdate on the transition to CATCHUP mode. The inconsistent error was caused because the newly updated member did not have its membership updated and so was missing a failover update message that the existing members sent to a new client. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@888874 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp9
-rwxr-xr-xjava/testkit/bin/qpid-python-testkit8
-rw-r--r--python/qpid/brokertest.py9
3 files changed, 8 insertions, 18 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f9ad734d79..f877720350 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -600,6 +600,7 @@ void Cluster::setReady(Lock&) {
void Cluster::initMapCompleted(Lock& l) {
// Called on completion of the initial status map.
+ QPID_LOG(debug, *this << " initial status map complete. ");
if (state == INIT) {
// We have status for all members so we can make join descisions.
initMap.checkConsistent();
@@ -705,10 +706,7 @@ void Cluster::initialStatus(const MemberId& member, uint32_t version, bool activ
member,
ClusterInitialStatusBody(ProtocolVersion(), version, active, id, store, shutdownId)
);
- if (initMap.transitionToComplete()) {
- QPID_LOG(debug, *this << " initial status map complete. ");
- initMapCompleted(l);
- }
+ if (initMap.transitionToComplete()) initMapCompleted(l);
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -808,10 +806,11 @@ void Cluster::updateInRetracted() {
checkUpdateIn(l);
}
-void Cluster::checkUpdateIn(Lock&) {
+void Cluster::checkUpdateIn(Lock& l) {
if (state != UPDATEE) return; // Wait till we reach the stall point.
if (updatedMap) { // We're up to date
map = *updatedMap;
+ memberUpdate(l);
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), self);
state = CATCHUP;
discarding = false; // ok to set, we're stalled for update.
diff --git a/java/testkit/bin/qpid-python-testkit b/java/testkit/bin/qpid-python-testkit
index e1696ff722..2c1d015281 100755
--- a/java/testkit/bin/qpid-python-testkit
+++ b/java/testkit/bin/qpid-python-testkit
@@ -25,10 +25,6 @@
. ./setenv.sh
export PYTHONPATH=../:$PYTHONPATH
-
-if [ -d $OUTDIR ]; then
- rm -rf $OUTDIR
-fi
-
-$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit
+rm -rf $OUTDIR
+$PYTHON_DIR/qpid-python-test -DOUTDIR=$OUTDIR -m testkit "$@"
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index c3145c06ea..9fa79a220b 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -89,6 +89,7 @@ class Popen(popen2.Popen3):
self.stdout = ExceptionWrapper(self.fromchild, msg)
self.stderr = ExceptionWrapper(self.childerr, msg)
self.dump(self.cmd_str(), "cmd")
+ log.debug("Started process %s" % self.pname)
def dump(self, str, ext):
name = "%s.%s" % (self.pname, ext)
@@ -107,7 +108,7 @@ class Popen(popen2.Popen3):
try:
self.kill()
except:
- self.unexpected("Exit code %d" % self.wait())
+ self.unexpected("expected running, exit code %d" % self.wait())
else:
# Give the process some time to exit.
delay = 0.1
@@ -393,13 +394,11 @@ class NumberedSender(Thread):
self.condition.release()
def stop(self):
- log.debug("NumberedSender.stop")
self.condition.acquire()
self.stopped = True
self.condition.notify()
self.condition.release()
self.join()
- log.debug("NumberedSender.stop - joined")
if self.error: raise self.error
class NumberedReceiver(Thread):
@@ -437,18 +436,14 @@ class NumberedReceiver(Thread):
if self.sender:
self.sender.notify_received(self.received)
except Exception, e:
- log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02:
self.error = RethrownException(e, self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
- log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02:
self.lock.acquire()
- log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received))
self.stopat = count
self.lock.release()
self.join()
- log.debug("NumberedReceiver.stop - joined")
if self.error: raise self.error
class ErrorGenerator(StoppableThread):