summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-06-22 19:28:42 +0000
committerAlan Conway <aconway@apache.org>2012-06-22 19:28:42 +0000
commitba8f39a8b818af7773bbdda6a021a4bb8aa4459c (patch)
treeddd798b5020a870c8097d5a88b6f45e75344c168
parent3637c9d02c28dad758662f7a3c863c57caa4f1f3 (diff)
downloadqpid-python-ba8f39a8b818af7773bbdda6a021a4bb8aa4459c.tar.gz
NO-JIRA: Simplify locking and remove member-update callback in HA code.
Get rid of the separate Membership lock and put HaBroker in control of membership changes. Removes a potential deadlock which could explain some observed failures in long-running failover tests. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1353001 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp52
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h9
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.cpp31
-rw-r--r--qpid/cpp/src/qpid/ha/Membership.h12
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp6
7 files changed, 60 insertions, 55 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 90c615aaf5..4b9cef05bc 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -83,6 +83,8 @@ void Backup::initialize(const Url& brokers) {
false, // durable
settings.mechanism, settings.username, settings.password,
false); // amq.failover
+
+ sys::Mutex::ScopedLock l(lock);
link = result.first;
link->setUrl(url);
replicator.reset(new BrokerReplicator(haBroker, link));
@@ -93,7 +95,6 @@ void Backup::initialize(const Url& brokers) {
Backup::~Backup() {
if (link) link->close();
if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
- replicator.reset();
}
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 8f09c5db8f..c3521162a1 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -546,7 +546,7 @@ void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
if (mine != primary)
throw Exception(QPID_MSG("Replicate default on backup (" << mine
<< ") does not match primary (" << primary << ")"));
- haBroker.getMembership().assign(values[MEMBERS].asList());
+ haBroker.setMembership(values[MEMBERS].asList());
} catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Invalid HA Broker response: " << e.what());
haBroker.shutdown();
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 4752d51190..395a3ce6fa 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -66,7 +66,7 @@ HaBroker::HaBroker(broker::Broker& b, const Settings& s)
brokerInfo(broker.getSystem()->getNodeName(),
// TODO aconway 2012-05-24: other transports?
broker.getPort(broker::Broker::TCP_TRANSPORT), systemId),
- membership(systemId, boost::bind(&HaBroker::membershipUpdate, this, _1)),
+ membership(systemId),
replicationTest(s.replicateDefault.get())
{
// Set up the management object.
@@ -271,17 +271,53 @@ void HaBroker::statusChanged(Mutex::ScopedLock& l) {
setLinkProperties(l);
}
-void HaBroker::membershipUpdate(const Variant::List& brokers) {
- // FIXME aconway 2012-06-12: nasty callback in callback, clean up.
- BrokerInfo info;
- if (getStatus() == CATCHUP && getMembership().get(systemId, info) && info.getStatus() == READY)
- setStatus(READY);
-
- // No lock, only calls thread-safe objects.
+void HaBroker::membershipUpdated(const Variant::List& brokers) {
+ // No lock, these are thread-safe.
mgmtObject->set_members(brokers);
broker.getManagementAgent()->raiseEvent(_qmf::EventMembersUpdate(brokers));
}
+void HaBroker::setMembership(const Variant::List& brokers) {
+ Mutex::ScopedLock l(lock);
+ membership.assign(brokers);
+ BrokerInfo info;
+ // Check if my own status has been updated to READY
+ if (getStatus() == CATCHUP &&
+ membership.get(systemId, info) && info.getStatus() == READY)
+ setStatus(READY, l);
+ membershipUpdated(brokers);
+}
+
+void HaBroker::resetMembership(const BrokerInfo& b) {
+ Variant::List members;
+ {
+ Mutex::ScopedLock l(lock);
+ membership.reset(b);
+ members = membership.asList();
+ }
+ membershipUpdated(members);
+}
+
+void HaBroker::addBroker(const BrokerInfo& b) {
+ Variant::List members;
+ {
+ Mutex::ScopedLock l(lock);
+ membership.add(b);
+ members = membership.asList();
+ }
+ membershipUpdated(members);
+}
+
+void HaBroker::removeBroker(const Uuid& id) {
+ Variant::List members;
+ {
+ Mutex::ScopedLock l(lock);
+ membership.remove(id);
+ members = membership.asList();
+ }
+ membershipUpdated(members);
+}
+
void HaBroker::setLinkProperties(Mutex::ScopedLock&) {
framing::FieldTable linkProperties = broker.getLinkClientProperties();
if (isBackup(status)) {
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 7ba1599c09..28fe3c755e 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -87,8 +87,11 @@ class HaBroker : public management::Manageable
boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
const BrokerInfo& getBrokerInfo() const { return brokerInfo; }
- Membership& getMembership() { return membership; }
- void membershipUpdate(const types::Variant::List&);
+
+ void setMembership(const types::Variant::List&); // Set membership from list.
+ void resetMembership(const BrokerInfo& b); // Reset to contain just one member.
+ void addBroker(const BrokerInfo& b); // Add a broker to the membership.
+ void removeBroker(const types::Uuid& id); // Remove a broker from membership.
private:
void setClientUrl(const Url&, sys::Mutex::ScopedLock&);
@@ -105,6 +108,8 @@ class HaBroker : public management::Manageable
std::vector<Url> getKnownBrokers() const;
+ void membershipUpdated(const types::Variant::List&);
+
std::string logPrefix;
broker::Broker& broker;
types::Uuid systemId;
diff --git a/qpid/cpp/src/qpid/ha/Membership.cpp b/qpid/cpp/src/qpid/ha/Membership.cpp
index 34c1ccb657..b85c4d4164 100644
--- a/qpid/cpp/src/qpid/ha/Membership.cpp
+++ b/qpid/cpp/src/qpid/ha/Membership.cpp
@@ -28,70 +28,42 @@ namespace ha {
void Membership::reset(const BrokerInfo& b) {
- sys::Mutex::ScopedLock l(lock);
brokers.clear();
brokers[b.getSystemId()] = b;
- update(l);
}
void Membership::add(const BrokerInfo& b) {
- sys::Mutex::ScopedLock l(lock);
brokers[b.getSystemId()] = b;
- update(l);
}
void Membership::remove(const types::Uuid& id) {
- sys::Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i != brokers.end()) {
brokers.erase(i);
- update(l);
- }
+ }
}
bool Membership::contains(const types::Uuid& id) {
- sys::Mutex::ScopedLock l(lock);
return brokers.find(id) != brokers.end();
}
void Membership::assign(const types::Variant::List& list) {
- sys::Mutex::ScopedLock l(lock);
brokers.clear();
for (types::Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
BrokerInfo b(i->asMap());
brokers[b.getSystemId()] = b;
}
- update(l);
}
types::Variant::List Membership::asList() const {
- sys::Mutex::ScopedLock l(lock);
- return asList(l);
-}
-
-types::Variant::List Membership::asList(sys::Mutex::ScopedLock&) const {
types::Variant::List list;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
list.push_back(i->second.asMap());
return list;
}
-void Membership::update(sys::Mutex::ScopedLock& l) {
- if (updateCallback) {
- types::Variant::List list = asList(l);
- sys::Mutex::ScopedUnlock u(lock);
- updateCallback(list);
- }
- QPID_LOG(debug, " HA: Membership update: " << brokers);
-}
-
BrokerInfo::Set Membership::otherBackups() const {
- sys::Mutex::ScopedLock l(lock);
- return otherBackups(l);
-}
-
-BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const {
BrokerInfo::Set result;
for (BrokerInfo::Map::const_iterator i = brokers.begin(); i != brokers.end(); ++i)
if (isBackup(i->second.getStatus()) && i->second.getSystemId() != self)
@@ -100,7 +72,6 @@ BrokerInfo::Set Membership::otherBackups(sys::Mutex::ScopedLock&) const {
}
bool Membership::get(const types::Uuid& id, BrokerInfo& result) {
- sys::Mutex::ScopedLock l(lock);
BrokerInfo::Map::iterator i = brokers.find(id);
if (i == brokers.end()) return false;
result = i->second;
diff --git a/qpid/cpp/src/qpid/ha/Membership.h b/qpid/cpp/src/qpid/ha/Membership.h
index 623d77970c..41245f0e06 100644
--- a/qpid/cpp/src/qpid/ha/Membership.h
+++ b/qpid/cpp/src/qpid/ha/Membership.h
@@ -36,14 +36,12 @@ namespace ha {
/**
* Keep track of the brokers in the membership.
- * THREAD SAFE: updated in arbitrary connection threads.
+ * THREAD UNSAFE: caller must serialize
*/
class Membership
{
public:
- typedef boost::function<void (const types::Variant::List&) > UpdateCallback;
- Membership(const types::Uuid& self_, UpdateCallback updateFn)
- : self(self_), updateCallback(updateFn) {}
+ Membership(const types::Uuid& self_) : self(self_) {}
void reset(const BrokerInfo& b); ///< Reset to contain just one member.
void add(const BrokerInfo& b);
@@ -58,14 +56,8 @@ class Membership
bool get(const types::Uuid& id, BrokerInfo& result);
private:
- BrokerInfo::Set otherBackups(sys::Mutex::ScopedLock&) const;
- types::Variant::List asList(sys::Mutex::ScopedLock&) const;
- void update(sys::Mutex::ScopedLock&);
-
- mutable sys::Mutex lock;
types::Uuid self;
BrokerInfo::Map brokers;
- UpdateCallback updateCallback;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 1fa51b6f68..b315142e70 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -114,7 +114,7 @@ void Primary::checkReady(BackupMap::iterator i, Mutex::ScopedLock& l) {
if (i != backups.end() && i->second->isReady()) {
BrokerInfo info = i->second->getBrokerInfo();
info.setStatus(READY);
- haBroker.getMembership().add(info);
+ haBroker.addBroker(info);
initialBackups.erase(i->second);
checkReady(l);
}
@@ -160,7 +160,7 @@ void Primary::opened(broker::Connection& connection) {
else {
QPID_LOG(debug, logPrefix << "Known backup connected: " << info);
}
- haBroker.getMembership().add(info);
+ haBroker.addBroker(info);
}
}
@@ -168,7 +168,7 @@ void Primary::closed(broker::Connection& connection) {
Mutex::ScopedLock l(lock);
BrokerInfo info;
if (ha::ConnectionObserver::getBrokerInfo(connection, info)) {
- haBroker.getMembership().remove(info.getSystemId());
+ haBroker.removeBroker(info.getSystemId());
QPID_LOG(debug, logPrefix << "Backup disconnected: " << info);
}
// NOTE: we do not modify backups here, we only add to the known backups set