summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp118
1 files changed, 71 insertions, 47 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index d6312e7b93..9756ad0a62 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -162,13 +162,14 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
- void initialStatus(bool active, bool persistent, const framing::FieldTable& props) {
- cluster.initialStatus(member, active, persistent, props);
+ void initialStatus(bool active, bool persistent, const Uuid& clusterId,
+ uint32_t version, const std::string& url) {
+ cluster.initialStatus(member, active, persistent, clusterId, version, url, l);
}
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& current) { cluster.configChange(member, current, l); }
- void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) {
- cluster.updateOffer(member, updatee, id, version, l);
+ void updateOffer(uint64_t updatee) {
+ cluster.updateOffer(member, updatee, l);
}
void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); }
void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); }
@@ -190,6 +191,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
name(settings.name),
myUrl(settings.url.empty() ? Url() : Url(settings.url)),
self(cpg.self()),
+ clusterId(true),
expiryPolicy(new ExpiryPolicy(mcast, self, broker.getTimer())),
mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
@@ -206,6 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
+ initMap(self),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -265,8 +268,8 @@ void Cluster::addLocalConnection(const boost::intrusive_ptr<Connection>& c) {
// Called in connection thread to insert an updated shadow connection.
void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) {
QPID_LOG(info, *this << " new shadow connection " << c->getId());
- // Safe to use connections here because we're pre-catchup, either
- // discarding or stalled, so deliveredFrame is not processing any
+ // Safe to use connections here because we're pre-catchup, stalled
+ // and discarding, so deliveredFrame is not processing any
// connection events.
assert(discarding);
pair<ConnectionMap::iterator, bool> ib
@@ -522,7 +525,8 @@ void Cluster::configChange (
const cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- if (state == INIT) { // First config change.
+ if (state == INIT) {
+ // FIXME aconway 2009-11-16: persistent restart
// Recover only if we are first in cluster.
broker.setRecovery(nCurrent == 1);
initialized = true;
@@ -545,39 +549,55 @@ void Cluster::setReady(Lock&) {
broker.getQueueEvents().enable();
}
-void Cluster::configChange(const MemberId&, const std::string& current, Lock& l) {
- bool memberChange = map.configChange(current);
- QPID_LOG(debug, *this << " applied config change: " << map);
+void Cluster::initMapCompleted(Lock& l) {
+ if (state == INIT) {
+ elders = initMap.getElders();
+ if (!elders.empty()) { // I'm not the elder, I don't handle links & replication.
+ broker.getLinks().setPassive(true);
+ broker.getQueueEvents().disable();
+ }
+ setClusterId(initMap.getClusterId(), l);
+ if (initMap.isUpdateNeeded()) { // Joining established cluster.
+ state = JOINER;
+ mcast.mcastControl(
+ ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
+ }
+ else { // I can go ready.
+ QPID_LOG(notice, *this << " ready.");
+ discarding = false;
+ setReady(l);
+ map = ClusterMap(initMap.getMemberUrls());
+ memberUpdate(l);
+ }
+ }
+}
+
+void Cluster::configChange(const MemberId&, const std::string& configStr, Lock& l) {
if (state == LEFT) return;
-
- if (!map.isAlive(self)) { // Final config change.
+
+ MemberSet config = decodeMemberSet(configStr);
+ elders = intersection(elders, config);
+ if (elders.empty() && INIT < state && state < CATCHUP) {
+ QPID_LOG(critical, "Cannot update, all potential updaters left the cluster.");
leave(l);
return;
}
+ bool memberChange = map.configChange(config);
- if (state == INIT) { // First configChange
- if (map.aliveCount() == 1) {
- setClusterId(true, l);
- discarding = false;
- setReady(l);
- map = ClusterMap(self, myUrl, true);
- memberUpdate(l);
- QPID_LOG(notice, *this << " first in cluster");
- }
- else { // Joining established group.
- state = JOINER;
- mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
- elders = map.getAlive();
- elders.erase(self);
- broker.getLinks().setPassive(true);
- broker.getQueueEvents().disable();
- }
- }
- else if (state >= CATCHUP && memberChange) {
+ // Update initital status for new members joining.
+ initMap.configChange(config);
+ if (initMap.isResendNeeded()) {
+ mcast.mcastControl(
+ // FIXME aconway 2009-11-17: persistent restart, set persistence bit.
+ ClusterInitialStatusBody(ProtocolVersion(), state > INIT, false, clusterId,
+ CLUSTER_VERSION, myUrl.str()), self);
+ }
+ if (initMap.transitionToComplete()) initMapCompleted(l);
+
+ if (state >= CATCHUP && memberChange) {
memberUpdate(l);
- elders = ClusterMap::intersection(elders, map.getAlive());
if (elders.empty()) {
- //assume we are oldest, reactive links if necessary
+ // We are the oldest, reactive links if necessary
broker.getLinks().setPassive(false);
}
}
@@ -587,8 +607,7 @@ void Cluster::makeOffer(const MemberId& id, Lock& ) {
if (state == READY && map.isJoiner(id)) {
state = OFFER;
QPID_LOG(info, *this << " send update-offer to " << id);
- mcast.mcastControl(
- ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId, CLUSTER_VERSION), self);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id), self);
}
}
@@ -610,10 +629,23 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l)
makeOffer(id, l);
}
-void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/,
- const framing::FieldTable&) {
- // FIXME aconway 2009-11-12: fill in.
+void Cluster::initialStatus(const MemberId& member, bool active, bool persistent,
+ const framing::Uuid& id, uint32_t version,
+ const std::string& url, Lock& l)
+{
+ if (version != CLUSTER_VERSION) {
+ QPID_LOG(critical, *this << " incompatible cluster versions " <<
+ version << " != " << CLUSTER_VERSION);
+ leave(l);
+ return;
+ }
+ initMap.received(
+ member,
+ ClusterInitialStatusBody(ProtocolVersion(), active, persistent, id, version, url)
+ );
+ if (initMap.transitionToComplete()) initMapCompleted(l);
}
+
void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
if (map.ready(id, Url(url)))
memberUpdate(l);
@@ -623,17 +655,10 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
-void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid,
- uint32_t version, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, Lock& l) {
// NOTE: deliverEventQueue has been stopped at the update offer by
// deliveredEvent in case an update is required.
if (state == LEFT) return;
- if (version != CLUSTER_VERSION) {
- QPID_LOG(critical, *this << " incompatible cluster versions " <<
- version << " != " << CLUSTER_VERSION);
- leave(l);
- return;
- }
MemberId updatee(updateeInt);
boost::optional<Url> url = map.updateOffer(updater, updatee);
if (updater == self) {
@@ -649,7 +674,6 @@ void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uu
}
else if (updatee == self && url) {
assert(state == JOINER);
- setClusterId(uuid, l);
state = UPDATEE;
QPID_LOG(notice, *this << " receiving update from " << updater);
checkUpdateIn(l);