diff options
author | Alan Conway <aconway@apache.org> | 2012-07-26 20:10:29 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2012-07-26 20:10:29 +0000 |
commit | f5a4940ed0d4b54edac1bb768ec7ac0595118d0c (patch) | |
tree | 34ea7305d9633b19ada88222bb9ec4c19a4405fd | |
parent | 828e28e9e58028b04ddaa97bb57c9d5b5c7a4f73 (diff) | |
download | qpid-python-f5a4940ed0d4b54edac1bb768ec7ac0595118d0c.tar.gz |
QPID-4159: HA missing messages in failover test.
Fix test_failover_send_receive showing missing messages. With this fix,
ran with -DDURATION=2 overnight with no failures.
- Primary, RemoteBackup: Only report "ready" once per remote backup.
- HaBroker: Put membership updates under mutex.
- ReplicatingSubscription: Check for backup missing messages at the front.
- ha_tests.py: Added assertion to test_priority_ring, verify primary queue as expected.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1366180 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.cpp | 39 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/HaBroker.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/RemoteBackup.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 11 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 6 |
7 files changed, 40 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 4cddbcd657..7b5d70bab4 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -290,8 +290,8 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) { setLinkProperties(l); } -void HaBroker::membershipUpdated(const Variant::List& brokers) { - // No lock, these are thread-safe. +void HaBroker::membershipUpdated(Mutex::ScopedLock&) { + Variant::List brokers = membership.asList(); mgmtObject->set_members(brokers); broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers)); } @@ -304,37 +304,28 @@ void HaBroker::setMembership(const Variant::List& brokers) { // Update my status to what the primary thinks. if (membership.get(systemId, info) && status != info.getStatus()) setStatus(info.getStatus(), l); - membershipUpdated(brokers); + membershipUpdated(l); } void HaBroker::resetMembership(const BrokerInfo& b) { - Variant::List members; - { - Mutex::ScopedLock l(lock); - membership.reset(b); - members = membership.asList(); - } - membershipUpdated(members); + Mutex::ScopedLock l(lock); + membership.reset(b); + QPID_LOG(debug, logPrefix << "Membership reset to: " << membership); + membershipUpdated(l); } void HaBroker::addBroker(const BrokerInfo& b) { - Variant::List members; - { - Mutex::ScopedLock l(lock); - membership.add(b); - members = membership.asList(); - } - membershipUpdated(members); + Mutex::ScopedLock l(lock); + membership.add(b); + QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership); + membershipUpdated(l); } void HaBroker::removeBroker(const Uuid& id) { - Variant::List members; - { - Mutex::ScopedLock l(lock); - membership.remove(id); - members = membership.asList(); - } - membershipUpdated(members); + Mutex::ScopedLock l(lock); + membership.remove(id); + QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership); + membershipUpdated(l); } void HaBroker::setLinkProperties(Mutex::ScopedLock&) { diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 7e47cadf7e..0ffc152097 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -111,7 +111,7 @@ class HaBroker : public management::Manageable std::vector<Url> getKnownBrokers() const; - void membershipUpdated(const types::Variant::List&); + void membershipUpdated(sys::Mutex::ScopedLock&); std::string logPrefix; broker::Broker& broker; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index fecb7a1205..951320d7fb 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -127,7 +127,7 @@ void Primary::checkReady(Mutex::ScopedLock&) { } void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) { - if (i != backups.end() && i->second->isReady()) { + if (i != backups.end() && i->second->reportReady()) { BrokerInfo info = i->second->getBrokerInfo(); info.setStatus(READY); QPID_LOG(info, "Expected backup is ready: " << info); diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp index 95b9f748b4..a5693fd14e 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp @@ -33,7 +33,7 @@ using boost::bind; RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) : logPrefix("Primary remote backup "+info.getLogId()+": "), - brokerInfo(info), replicationTest(rt), connected(con) + brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false) {} void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards) @@ -109,4 +109,12 @@ void RemoteBackup::queueDestroy(const QueuePtr& q) { } } +bool RemoteBackup::reportReady() { + if (!reportedReady && isReady()) { + reportedReady = true; + return true; + } + return false; +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h index 99e5a24494..8ee776e90b 100644 --- a/qpid/cpp/src/qpid/ha/RemoteBackup.h +++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h @@ -83,6 +83,9 @@ class RemoteBackup /**@return true when all initial queues for this backup are ready. */ bool isReady(); + /**@return true if isReady() and this is the first call to reportReady */ + bool reportReady(); + /**Cancel all queue guards, called if we are timed out. */ void cancel(); @@ -100,6 +103,7 @@ class RemoteBackup GuardMap guards; QueueSet initialQueues; bool connected; + bool reportedReady; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 0c4e61ba6d..08d8877e77 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -184,16 +184,17 @@ ReplicatingSubscription::ReplicatingSubscription( if (!guard) guard.reset(new QueueGuard(*queue, info)); guard->attach(*this); - QueueRange backup(arguments); // The remote backup state. - QueueRange primary(guard->getRange()); // The local state at the time the guard was set. + QueueRange backup(arguments); // Remote backup range. + QueueRange primary(guard->getRange()); // Unguarded range when the guard was set. backupPosition = backup.back; // Sync backup and primary queues, don't send messages already on the backup - if (backup.back < primary.front || backup.front > primary.back - || primary.empty() || backup.empty()) + if (backup.front > primary.front || // Missing messages at front + backup.back < primary.front || // No overlap + primary.empty() || backup.empty()) // Empty { - // No overlap - erase backup and start from the beginning + // No useful overlap - erase backup and start from the beginning if (!backup.empty()) dequeued(backup.front, backup.back); position = primary.front-1; } diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index a841423121..cf2ab3508f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -588,8 +588,10 @@ class ReplicationTests(BrokerTest): # correct result, the uncommented one is for the actualy buggy # result. See https://issues.apache.org/jira/browse/QPID-3866 # - # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) - backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority) + # expect = sorted(priorities,reverse=True)[0:5] + expect = [9,9,9,9,2] + primary.assert_browse("q", expect, transform=lambda m: m.priority) + backup.assert_browse_backup("q", expect, transform=lambda m: m.priority) def test_backup_acquired(self): """Verify that acquired messages are backed up, for all queue types.""" |