diff options
author | Alan Conway <aconway@apache.org> | 2012-09-14 18:48:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-09-14 18:48:09 +0000 |
commit | 928d9ace1dde955a4a673d79edfe71a2bd35b90d (patch) | |
tree | df07128839da20e607a37c4d359c2875cc6634ee /cpp/src | |
parent | 4a7350e3cdb6702c9128f1bb38347cea874d6eb7 (diff) | |
download | qpid-python-928d9ace1dde955a4a673d79edfe71a2bd35b90d.tar.gz |
QPID-4223: HA Completion isn't sent when queue that has acquired but unacknowledged messages is deleted
- Extended ha_tests.py test_failover_send_receive to kill a backup as well as the primary
- QueueRegistry::destroy was not calling observer.
- Primary removes disconnected brokers from backups and expectedBackups
- Primary calls checkReady in all cases where a broker is removed from expectedBackups
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1384882 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/ha/HaBroker.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Primary.cpp | 21 | ||||
-rw-r--r-- | cpp/src/qpid/ha/Primary.h | 7 | ||||
-rw-r--r-- | cpp/src/tests/brokertest.py | 4 | ||||
-rwxr-xr-x | cpp/src/tests/ha_tests.py | 52 |
5 files changed, 61 insertions, 32 deletions
diff --git a/cpp/src/qpid/ha/HaBroker.cpp b/cpp/src/qpid/ha/HaBroker.cpp index 88627307f2..de9880b6db 100644 --- a/cpp/src/qpid/ha/HaBroker.cpp +++ b/cpp/src/qpid/ha/HaBroker.cpp @@ -342,9 +342,12 @@ void HaBroker::addBroker(const BrokerInfo& b) { void HaBroker::removeBroker(const Uuid& id) { Mutex::ScopedLock l(lock); - membership.remove(id); - QPID_LOG(debug, logPrefix << "Membership remove: " << id); - membershipUpdated(l); + BrokerInfo info; + if (membership.get(id, info)) { + membership.remove(id); + QPID_LOG(debug, logPrefix << "Membership remove: " << info); + membershipUpdated(l); + } } void HaBroker::setLinkProperties(Mutex::ScopedLock&) { diff --git a/cpp/src/qpid/ha/Primary.cpp b/cpp/src/qpid/ha/Primary.cpp index f36d409b3f..453b0b1ab8 100644 --- a/cpp/src/qpid/ha/Primary.cpp +++ b/cpp/src/qpid/ha/Primary.cpp @@ -225,19 +225,20 @@ void Primary::opened(broker::Connection& connection) { } void Primary::closed(broker::Connection& connection) { - // NOTE: It is possible for a backup connection to be rejected while we are - // a backup, but closed() is called after we have become primary. - // - // For this reason we do not remove from the backups map here, the backups - // map holds all the backups we know about whether connected or not. - // - Mutex::ScopedLock l(lock); BrokerInfo info; if (ha::ConnectionObserver::getBrokerInfo(connection, info)) { - QPID_LOG(debug, logPrefix << "Backup disconnected: " << info); - haBroker.removeBroker(info.getSystemId()); + Mutex::ScopedLock l(lock); BackupMap::iterator i = backups.find(info.getSystemId()); - if (i != backups.end()) i->second->setConnected(false); + // NOTE: It is possible for a backup connection to be rejected while we + // are a backup, but closed() is called after we have become primary. + // Checking isConnected() lets us ignore such spurious closes. 
+ if (i != backups.end() && i->second->isConnected()) { + QPID_LOG(info, logPrefix << "Backup disconnected: " << info); + haBroker.removeBroker(info.getSystemId()); + expectedBackups.erase(i->second); + backups.erase(i); + checkReady(l); + } } } diff --git a/cpp/src/qpid/ha/Primary.h b/cpp/src/qpid/ha/Primary.h index 26883f4416..22b231ed72 100644 --- a/cpp/src/qpid/ha/Primary.h +++ b/cpp/src/qpid/ha/Primary.h @@ -95,13 +95,12 @@ class Primary bool active; /** * Set of expected backups that must be ready before we declare ourselves - * active + * active. These are backups that were known before the primary crashed. As + * new primary we expect them to re-connect. */ BackupSet expectedBackups; /** - * Map of all the remote backups we know about: any expected backups plus - * all actual backups that have connected. We do not remove entries when a - * backup disconnects. @see Primary::closed() + * Map of all the expected backups plus all connected backups. */ BackupMap backups; boost::shared_ptr<broker::ConnectionObserver> connectionObserver; diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index aea4460e5a..dd09e8aa27 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -565,7 +565,7 @@ class NumberedSender(Thread): def __init__(self, broker, max_depth=None, queue="test-queue", connection_options=Cluster.CONNECTION_OPTIONS, - failover_updates=True, url=None): + failover_updates=True, url=None, args=[]): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. 
@@ -576,7 +576,7 @@ class NumberedSender(Thread): "--address", "%s;{create:always}"%queue, "--connection-options", "{%s}"%(connection_options), "--content-stdin" - ] + ] + args if failover_updates: cmd += ["--failover-updates"] self.sender = broker.test.popen( cmd, expect=EXPECT_RUNNING, stdin=PIPE) diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index cf5beb467c..92442b465a 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -18,7 +18,7 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random import traceback from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from qpid.datatypes import uuid4 @@ -114,7 +114,8 @@ class HaBroker(Broker): self._status = self.ha_status() return self._status == status; except ConnectionError: return False - assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status) + assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%( + self, status, self._status) # FIXME aconway 2012-05-01: do direct python call to qpid-config code. def qpid_config(self, args): @@ -887,29 +888,54 @@ class LongTests(BrokerTest): # Kill and restart brokers in a cycle: endtime = time.time() + self.duration() i = 0 + primary = 0 try: while time.time() < endtime or i < 3: # At least 3 iterations + # Precondition: All 3 brokers running, + # primary = index of promoted primary + # one or two backups are running, for s in senders: s.sender.assert_running() for r in receivers: r.receiver.assert_running() - checkpoint = [ r.received for r in receivers ] - # Don't kill primary till it is active and the next - # backup is ready, otherwise we can lose messages. 
- brokers[i%3].wait_status("active") - brokers[(i+1)%3].wait_status("ready") - brokers.bounce(i%3) + checkpoint = [ r.received+100 for r in receivers ] + dead = None + victim = random.randint(0,2) + if victim == primary: + # Don't kill primary till it is active and the next + # backup is ready, otherwise we can lose messages. + brokers[victim].wait_status("active") + next = (victim+1)%3 + brokers[next].wait_status("ready") + brokers.bounce(victim) # Next one is promoted + primary = next + else: + brokers.kill(victim, False) + dead = victim + + # At this point the primary is running with 1 or 2 backups + # Make sure we are not stalled + map(wait_passed, receivers, checkpoint) + # Run another checkpoint to ensure things work in this configuration + checkpoint = [ r.received+100 for r in receivers ] + map(wait_passed, receivers, checkpoint) + + if dead is not None: + brokers.restart(dead) # Restart backup + brokers[dead].ready(client_properties={"qpid.ha-admin":1}) + dead = None i += 1 - map(wait_passed, receivers, checkpoint) # Wait for all receivers except: traceback.print_exc() raise finally: for s in senders: s.stop() for r in receivers: r.stop() - dead = [] + unexpected_dead = [] for i in xrange(3): - if not brokers[i].is_running(): dead.append(i) - brokers.kill(i, False) - if dead: raise Exception("Brokers not running: %s"%dead) + if not brokers[i].is_running() and i != dead: + unexpected_dead.append(i) + if brokers[i].is_running(): brokers.kill(i, False) + if unexpected_dead: + raise Exception("Brokers not running: %s"%unexpected_dead) class RecoveryTests(BrokerTest): """Tests for recovery after a failure.""" |