summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/Cluster.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/Cluster.cpp')
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp107
1 files changed, 53 insertions, 54 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f8adb8ee98..0d082fc226 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -18,7 +18,7 @@
#include "Cluster.h"
#include "Connection.h"
-#include "DumpClient.h"
+#include "UpdateClient.h"
#include "FailoverExchange.h"
#include "ClusterQueueHandler.h"
@@ -29,10 +29,10 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterDumpOfferBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -77,10 +77,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
Cluster::Lock& l;
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
- void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); }
+ void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
- void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
+ void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -124,7 +124,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
}
Cluster::~Cluster() {
- if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
@@ -205,7 +205,6 @@ void Cluster::deliver(
void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
- QPID_LOG(trace, *this << " PUSH: " << e);
QPID_LATENCY_INIT(e);
deliverEventQueue.push(e);
}
@@ -216,7 +215,7 @@ void Cluster::deliveredEvent(const Event& e) {
Buffer buf(const_cast<char*>(e.getData()), e.getSize());
boost::intrusive_ptr<Connection> connection;
if (e.isConnection()) {
- if (state == NEWBIE) {
+ if (state == JOINER) {
QPID_LOG(trace, *this << " DROP: " << e);
return;
}
@@ -236,11 +235,11 @@ void Cluster::deliveredEvent(const Event& e) {
void Cluster::deliveredFrame(const EventFrame& e) {
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- QPID_LOG(trace, *this << " DLVR: " << e.frame);
if (e.connection) {
e.connection->deliveredFrame(e);
}
else {
+ QPID_LOG(trace, *this << " DLVR: " << e.frame);
Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
ClusterDispatcher dispatch(*this, e.member, l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
@@ -313,9 +312,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
memberUpdate(l);
}
else { // Joining established group.
- state = NEWBIE;
+ state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
ClusterMap::Set members = map.getAlive();
members.erase(myId);
myElders = members;
@@ -336,10 +335,10 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
- if (state == READY && map.isNewbie(id)) {
+ if (state == READY && map.isJoiner(id)) {
state = OFFER;
- QPID_LOG(info, *this << " send dump-offer to " << id);
- mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId);
+ QPID_LOG(info, *this << " send update-offer to " << id);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
}
}
@@ -359,8 +358,8 @@ void Cluster::brokerShutdown() {
delete this;
}
-void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
- map.dumpRequest(id, url);
+void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
+ map.updateRequest(id, url);
tryMakeOffer(id, l);
}
@@ -376,81 +375,81 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
-void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
if (state == LEFT) return;
- MemberId dumpee(dumpeeInt);
- boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
- if (dumper == myId) {
+ MemberId updatee(updateeInt);
+ boost::optional<Url> url = map.updateOffer(updater, updatee);
+ if (updater == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- dumpStart(dumpee, *url, l);
+ updateStart(updatee, *url, l);
}
else { // Another offer was first.
state = READY;
mcast.release();
- QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
- tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
+ QPID_LOG(info, *this << " cancelled update offer to " << updatee);
+ tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
- else if (dumpee == myId && url) {
- assert(state == NEWBIE);
+ else if (updatee == myId && url) {
+ assert(state == JOINER);
setClusterId(uuid);
- state = DUMPEE;
- QPID_LOG(info, *this << " receiving dump from " << dumper);
+ state = UPDATEE;
+ QPID_LOG(info, *this << " receiving update from " << updater);
deliverEventQueue.stop();
- checkDumpIn(l);
+ checkUpdateIn(l);
}
}
-void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
if (state == LEFT) return;
assert(state == OFFER);
- state = DUMPER;
- QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
+ state = UPDATER;
+ QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
deliverEventQueue.stop();
- if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
- dumpThread = Thread(
- new DumpClient(myId, dumpee, url, broker, map, connections.values(),
- boost::bind(&Cluster::dumpOutDone, this),
- boost::bind(&Cluster::dumpOutError, this, _1)));
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ updateThread = Thread(
+ new UpdateClient(myId, updatee, url, broker, map, connections.values(),
+ boost::bind(&Cluster::updateOutDone, this),
+ boost::bind(&Cluster::updateOutError, this, _1)));
}
-// Called in dump thread.
-void Cluster::dumpInDone(const ClusterMap& m) {
+// Called in update thread.
+void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
- dumpedMap = m;
- checkDumpIn(l);
+ updatedMap = m;
+ checkUpdateIn(l);
}
-void Cluster::checkDumpIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& ) {
if (state == LEFT) return;
- if (state == DUMPEE && dumpedMap) {
- map = *dumpedMap;
+ if (state == UPDATEE && updatedMap) {
+ map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
- QPID_LOG(info, *this << " received dump, starting catch-up");
+ QPID_LOG(info, *this << " received update, starting catch-up");
deliverEventQueue.start();
}
}
-void Cluster::dumpOutDone() {
+void Cluster::updateOutDone() {
Monitor::ScopedLock l(lock);
- dumpOutDone(l);
+ updateOutDone(l);
}
-void Cluster::dumpOutDone(Lock& l) {
- assert(state == DUMPER);
+void Cluster::updateOutDone(Lock& l) {
+ assert(state == UPDATER);
state = READY;
mcast.release();
- QPID_LOG(info, *this << " sent dump");
+ QPID_LOG(info, *this << " sent update");
deliverEventQueue.start();
- tryMakeOffer(map.firstNewbie(), l); // Try another offer
+ tryMakeOffer(map.firstJoiner(), l); // Try another offer
}
-void Cluster::dumpOutError(const std::exception& e) {
+void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending dump: " << e.what());
- dumpOutDone(l);
+ QPID_LOG(error, *this << " error sending update: " << e.what());
+ updateOutDone(l);
}
void Cluster ::shutdown(const MemberId& id, Lock& l) {
@@ -534,7 +533,7 @@ void Cluster::memberUpdate(Lock& l) {
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" };
+ static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
return o << cluster.myId << "(" << STATE[cluster.state] << ")";
}