summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-02-27 19:34:47 +0000
committerAlan Conway <aconway@apache.org>2009-02-27 19:34:47 +0000
commit7c79adf16acfeb31cd2b90699c456698237a2e82 (patch)
tree743738de6f0d5448ebc8280d45976511de8c9ad4 /cpp/src
parent739e6e1c66341b6b8dbda6776ada8179765ed347 (diff)
downloadqpid-python-7c79adf16acfeb31cd2b90699c456698237a2e82.tar.gz
cluster: apply membership updates while in CATCHUP mode.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@748651 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp17
-rw-r--r--cpp/src/qpid/cluster/Cluster.h4
2 files changed, 11 insertions, 10 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 6b4cd0256c..312d1e90e3 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -234,9 +234,9 @@ void Cluster::deliveredEvent(const Event& e) {
// Check for deliver close here so we can erase the
// connection decoder safely in this thread.
if (frame.getMethod()->isA<ClusterConnectionDeliverCloseBody>())
- decoder.erase(e.getConnectionId());
+ decoder.erase(e.getConnectionId());
deliverFrameQueue.push(EventFrame(e, frame));
- }
+ }
}
else if (e.getType() == DATA)
decoder.decode(e, e.getData());
@@ -345,7 +345,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
broker.getLinks().setPassive(true);
}
}
- else if (state >= READY && memberChange) {
+ else if (state >= CATCHUP && memberChange) {
memberUpdate(l);
elders = ClusterMap::intersection(elders, map.getAlive());
if (elders.empty()) {
@@ -357,7 +357,7 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
bool Cluster::isLeader() const { return elders.empty(); }
-void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
+void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
@@ -382,7 +382,7 @@ void Cluster::brokerShutdown() {
void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
map.updateRequest(id, url);
- tryMakeOffer(id, l);
+ makeOffer(id, l);
}
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
@@ -406,7 +406,7 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
else { // Another offer was first.
setReady(l);
QPID_LOG(info, *this << " cancelled update offer to " << updatee);
- tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
+ makeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
else if (updatee == myId && url) {
@@ -446,7 +446,6 @@ void Cluster::updateInDone(const ClusterMap& m, uint64_t fid) {
}
void Cluster::checkUpdateIn(Lock& ) {
- if (state == LEFT) return;
if (state == UPDATEE && updatedMap) {
map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
@@ -467,7 +466,7 @@ void Cluster::updateOutDone(Lock& l) {
state = READY;
mcast.release();
deliverFrameQueue.start();
- tryMakeOffer(map.firstJoiner(), l); // Try another offer
+ makeOffer(map.firstJoiner(), l); // Try another offer
}
void Cluster::updateOutError(const std::exception& e) {
@@ -522,7 +521,7 @@ void Cluster::memberUpdate(Lock& l) {
size_t size = urls.size();
failoverExchange->setUrls(urls);
- if (size == 1 && lastSize > 1 && state >= READY) {
+ if (size == 1 && lastSize > 1 && state >= CATCHUP) {
QPID_LOG(info, *this << " last broker standing, update queue policies");
lastBroker = true;
broker.getQueues().updateQueueClusterState(true);
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index e3e259dac4..ea472a9ecf 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -119,7 +119,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
std::vector<Url> getUrls(Lock&) const;
// Make an offer if we can - called in deliver thread.
- void tryMakeOffer(const MemberId&, Lock&);
+ void makeOffer(const MemberId&, Lock&);
// Called in main thread in ~Broker.
void brokerShutdown();
@@ -133,6 +133,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void messageExpired(const MemberId&, uint64_t, Lock& l);
void shutdown(const MemberId&, Lock&);
+
+ // Handlers for pollable queues.
void deliveredEvent(const Event&);
void deliveredFrame(const EventFrame&);