summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Alan Conway <aconway@apache.org> 2012-07-26 20:10:29 +0000
committer Alan Conway <aconway@apache.org> 2012-07-26 20:10:29 +0000
commit f5a4940ed0d4b54edac1bb768ec7ac0595118d0c (patch)
tree 34ea7305d9633b19ada88222bb9ec4c19a4405fd
parent 828e28e9e58028b04ddaa97bb57c9d5b5c7a4f73 (diff)
download qpid-python-f5a4940ed0d4b54edac1bb768ec7ac0595118d0c.tar.gz
QPID-4159: HA missing messages in failover test.
Fix test_failover_send_receive showing missing messages. With this fix, the test ran with -DDURATION=2 overnight with no failures.

- Primary, RemoteBackup: Only report "ready" once per remote backup.
- HaBroker: Put membership updates under mutex.
- ReplicatingSubscription: Check for backup missing messages at the front.
- ha_tests.py: Added assertion to test_priority_ring, verify primary queue as expected.

git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/0.18@1366180 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- qpid/cpp/src/qpid/ha/HaBroker.cpp 39
-rw-r--r-- qpid/cpp/src/qpid/ha/HaBroker.h 2
-rw-r--r-- qpid/cpp/src/qpid/ha/Primary.cpp 2
-rw-r--r-- qpid/cpp/src/qpid/ha/RemoteBackup.cpp 10
-rw-r--r-- qpid/cpp/src/qpid/ha/RemoteBackup.h 4
-rw-r--r-- qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 11
-rwxr-xr-x qpid/cpp/src/tests/ha_tests.py 6
7 files changed, 40 insertions, 34 deletions
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 4cddbcd657..7b5d70bab4 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -290,8 +290,8 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) {
setLinkProperties(l);
}
-void HaBroker::membershipUpdated(const Variant::List& brokers) {
- // No lock, these are thread-safe.
+void HaBroker::membershipUpdated(Mutex::ScopedLock&) {
+ Variant::List brokers = membership.asList();
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
}
@@ -304,37 +304,28 @@ void HaBroker::setMembership(const Variant::List& brokers) {
// Update my status to what the primary thinks.
if (membership.get(systemId, info) && status != info.getStatus())
setStatus(info.getStatus(), l);
- membershipUpdated(brokers);
+ membershipUpdated(l);
}
void HaBroker::resetMembership(const BrokerInfo& b) {
- Variant::List members;
- {
- Mutex::ScopedLock l(lock);
- membership.reset(b);
- members = membership.asList();
- }
- membershipUpdated(members);
+ Mutex::ScopedLock l(lock);
+ membership.reset(b);
+ QPID_LOG(debug, logPrefix << "Membership reset to: " << membership);
+ membershipUpdated(l);
}
void HaBroker::addBroker(const BrokerInfo& b) {
- Variant::List members;
- {
- Mutex::ScopedLock l(lock);
- membership.add(b);
- members = membership.asList();
- }
- membershipUpdated(members);
+ Mutex::ScopedLock l(lock);
+ membership.add(b);
+ QPID_LOG(debug, logPrefix << "Membership add: " << b << " now: " << membership);
+ membershipUpdated(l);
}
void HaBroker::removeBroker(const Uuid& id) {
- Variant::List members;
- {
- Mutex::ScopedLock l(lock);
- membership.remove(id);
- members = membership.asList();
- }
- membershipUpdated(members);
+ Mutex::ScopedLock l(lock);
+ membership.remove(id);
+ QPID_LOG(debug, logPrefix << "Membership remove: " << id << " now: " << membership);
+ membershipUpdated(l);
}
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 7e47cadf7e..0ffc152097 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -111,7 +111,7 @@ class HaBroker : public management::Manageable
std::vector<Url> getKnownBrokers() const;
- void membershipUpdated(const types::Variant::List&);
+ void membershipUpdated(sys::Mutex::ScopedLock&);
std::string logPrefix;
broker::Broker& broker;
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index fecb7a1205..951320d7fb 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -127,7 +127,7 @@ void Primary::checkReady(Mutex::ScopedLock&) {
}
void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
- if (i != backups.end() && i->second->isReady()) {
+ if (i != backups.end() && i->second->reportReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
QPID_LOG(info, "Expected backup is ready: " << info);
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
index 95b9f748b4..a5693fd14e 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
@@ -33,7 +33,7 @@ using boost::bind;
RemoteBackup::RemoteBackup(const BrokerInfo& info, ReplicationTest rt, bool con) :
logPrefix("Primary remote backup "+info.getLogId()+": "),
- brokerInfo(info), replicationTest(rt), connected(con)
+ brokerInfo(info), replicationTest(rt), connected(con), reportedReady(false)
{}
void RemoteBackup::setInitialQueues(broker::QueueRegistry& queues, bool createGuards)
@@ -109,4 +109,12 @@ void RemoteBackup::queueDestroy(const QueuePtr& q) {
}
}
+bool RemoteBackup::reportReady() {
+ if (!reportedReady && isReady()) {
+ reportedReady = true;
+ return true;
+ }
+ return false;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/RemoteBackup.h b/qpid/cpp/src/qpid/ha/RemoteBackup.h
index 99e5a24494..8ee776e90b 100644
--- a/qpid/cpp/src/qpid/ha/RemoteBackup.h
+++ b/qpid/cpp/src/qpid/ha/RemoteBackup.h
@@ -83,6 +83,9 @@ class RemoteBackup
/**@return true when all initial queues for this backup are ready. */
bool isReady();
+ /**@return true if isReady() and this is the first call to reportReady */
+ bool reportReady();
+
/**Cancel all queue guards, called if we are timed out. */
void cancel();
@@ -100,6 +103,7 @@ class RemoteBackup
GuardMap guards;
QueueSet initialQueues;
bool connected;
+ bool reportedReady;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 0c4e61ba6d..08d8877e77 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -184,16 +184,17 @@ ReplicatingSubscription::ReplicatingSubscription(
if (!guard) guard.reset(new QueueGuard(*queue, info));
guard->attach(*this);
- QueueRange backup(arguments); // The remote backup state.
- QueueRange primary(guard->getRange()); // The local state at the time the guard was set.
+ QueueRange backup(arguments); // Remote backup range.
+ QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
backupPosition = backup.back;
// Sync backup and primary queues, don't send messages already on the backup
- if (backup.back < primary.front || backup.front > primary.back
- || primary.empty() || backup.empty())
+ if (backup.front > primary.front || // Missing messages at front
+ backup.back < primary.front || // No overlap
+ primary.empty() || backup.empty()) // Empty
{
- // No overlap - erase backup and start from the beginning
+ // No useful overlap - erase backup and start from the beginning
if (!backup.empty()) dequeued(backup.front, backup.back);
position = primary.front-1;
}
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index a841423121..cf2ab3508f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -588,8 +588,10 @@ class ReplicationTests(BrokerTest):
# correct result, the uncommented one is for the actualy buggy
# result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # expect = sorted(priorities,reverse=True)[0:5]
+ expect = [9,9,9,9,2]
+ primary.assert_browse("q", expect, transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
def test_backup_acquired(self):
"""Verify that acquired messages are backed up, for all queue types."""