summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-01-27 02:08:25 +0000
committerAlan Conway <aconway@apache.org>2009-01-27 02:08:25 +0000
commit306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e (patch)
tree04c1f8f85b0cf469c7c7e526f436e09cd12e350a /cpp
parent57acf95c94d52b15b2ad6e6038bf3390d9063282 (diff)
downloadqpid-python-306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e.tar.gz
Cluster rename: dump -> update, newbie -> joiner
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/cluster.mk4
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp10
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp107
-rw-r--r--cpp/src/qpid/cluster/Cluster.h44
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp36
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.h16
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp26
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp46
-rw-r--r--cpp/src/qpid/cluster/Connection.h8
-rw-r--r--cpp/src/qpid/cluster/ConnectionCodec.cpp1
-rw-r--r--cpp/src/qpid/cluster/Multicaster.cpp5
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.cpp (renamed from cpp/src/qpid/cluster/DumpClient.cpp)152
-rw-r--r--cpp/src/qpid/cluster/UpdateClient.h (renamed from cpp/src/qpid/cluster/DumpClient.h)48
-rw-r--r--cpp/src/tests/cluster_test.cpp12
-rw-r--r--cpp/xml/cluster.xml26
15 files changed, 271 insertions, 270 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk
index 31eed2aec6..3809c86090 100644
--- a/cpp/src/cluster.mk
+++ b/cpp/src/cluster.mk
@@ -52,8 +52,8 @@ cluster_la_SOURCES = \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
- qpid/cluster/DumpClient.cpp \
- qpid/cluster/DumpClient.h \
+ qpid/cluster/UpdateClient.cpp \
+ qpid/cluster/UpdateClient.h \
qpid/cluster/Event.cpp \
qpid/cluster/Event.h \
qpid/cluster/EventFrame.h \
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 53c49bf0ce..f8b9e4b183 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -98,6 +98,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
}
}
+static const std::string QPID_MANAGEMENT("qpid.management");
+
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
: name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
@@ -111,9 +113,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable);
mgmtExchange->set_arguments(args);
if (!durable) {
- if (name == "") {
+ if (name.empty()) {
agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID
- } else if (name == "qpid.management") {
+ } else if (name == QPID_MANAGEMENT) {
agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID
} else {
ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
@@ -125,12 +127,12 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
sequence = _args.get(qpidMsgSequence);
if (sequence) {
- QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+ QPID_LOG(debug, "Configured exchange " << _name << " with Msg sequencing");
args.setInt64(std::string(qpidSequenceCounter), sequenceNo);
}
ive = _args.get(qpidIVE);
- if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value");
+ if (ive) QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value");
}
Exchange::~Exchange ()
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index f8adb8ee98..0d082fc226 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -18,7 +18,7 @@
#include "Cluster.h"
#include "Connection.h"
-#include "DumpClient.h"
+#include "UpdateClient.h"
#include "FailoverExchange.h"
#include "ClusterQueueHandler.h"
@@ -29,10 +29,10 @@
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
-#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterUpdateRequestBody.h"
#include "qpid/framing/ClusterReadyBody.h"
#include "qpid/framing/ClusterConfigChangeBody.h"
-#include "qpid/framing/ClusterDumpOfferBody.h"
+#include "qpid/framing/ClusterUpdateOfferBody.h"
#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
@@ -77,10 +77,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
Cluster::Lock& l;
ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {}
- void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); }
+ void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); }
void ready(const std::string& url) { cluster.ready(member, url, l); }
void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); }
- void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); }
+ void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); }
void shutdown() { cluster.shutdown(member, l); }
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
@@ -124,7 +124,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b
}
Cluster::~Cluster() {
- if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
}
void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
@@ -205,7 +205,6 @@ void Cluster::deliver(
void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
- QPID_LOG(trace, *this << " PUSH: " << e);
QPID_LATENCY_INIT(e);
deliverEventQueue.push(e);
}
@@ -216,7 +215,7 @@ void Cluster::deliveredEvent(const Event& e) {
Buffer buf(const_cast<char*>(e.getData()), e.getSize());
boost::intrusive_ptr<Connection> connection;
if (e.isConnection()) {
- if (state == NEWBIE) {
+ if (state == JOINER) {
QPID_LOG(trace, *this << " DROP: " << e);
return;
}
@@ -236,11 +235,11 @@ void Cluster::deliveredEvent(const Event& e) {
void Cluster::deliveredFrame(const EventFrame& e) {
QPID_LATENCY_RECORD("delivered frame queue", e.frame);
- QPID_LOG(trace, *this << " DLVR: " << e.frame);
if (e.connection) {
e.connection->deliveredFrame(e);
}
else {
+ QPID_LOG(trace, *this << " DLVR: " << e.frame);
Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
ClusterDispatcher dispatch(*this, e.member, l);
if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
@@ -313,9 +312,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
memberUpdate(l);
}
else { // Joining established group.
- state = NEWBIE;
+ state = JOINER;
QPID_LOG(info, *this << " joining cluster: " << map);
- mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId);
+ mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId);
ClusterMap::Set members = map.getAlive();
members.erase(myId);
myElders = members;
@@ -336,10 +335,10 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock&
void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
- if (state == READY && map.isNewbie(id)) {
+ if (state == READY && map.isJoiner(id)) {
state = OFFER;
- QPID_LOG(info, *this << " send dump-offer to " << id);
- mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId);
+ QPID_LOG(info, *this << " send update-offer to " << id);
+ mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId);
}
}
@@ -359,8 +358,8 @@ void Cluster::brokerShutdown() {
delete this;
}
-void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) {
- map.dumpRequest(id, url);
+void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) {
+ map.updateRequest(id, url);
tryMakeOffer(id, l);
}
@@ -376,81 +375,81 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
}
}
-void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) {
+void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) {
if (state == LEFT) return;
- MemberId dumpee(dumpeeInt);
- boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
- if (dumper == myId) {
+ MemberId updatee(updateeInt);
+ boost::optional<Url> url = map.updateOffer(updater, updatee);
+ if (updater == myId) {
assert(state == OFFER);
if (url) { // My offer was first.
- dumpStart(dumpee, *url, l);
+ updateStart(updatee, *url, l);
}
else { // Another offer was first.
state = READY;
mcast.release();
- QPID_LOG(info, *this << " cancelled dump offer to " << dumpee);
- tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
+ QPID_LOG(info, *this << " cancelled update offer to " << updatee);
+ tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer.
}
}
- else if (dumpee == myId && url) {
- assert(state == NEWBIE);
+ else if (updatee == myId && url) {
+ assert(state == JOINER);
setClusterId(uuid);
- state = DUMPEE;
- QPID_LOG(info, *this << " receiving dump from " << dumper);
+ state = UPDATEE;
+ QPID_LOG(info, *this << " receiving update from " << updater);
deliverEventQueue.stop();
- checkDumpIn(l);
+ checkUpdateIn(l);
}
}
-void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) {
+void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) {
if (state == LEFT) return;
assert(state == OFFER);
- state = DUMPER;
- QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
+ state = UPDATER;
+ QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url);
deliverEventQueue.stop();
- if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
- dumpThread = Thread(
- new DumpClient(myId, dumpee, url, broker, map, connections.values(),
- boost::bind(&Cluster::dumpOutDone, this),
- boost::bind(&Cluster::dumpOutError, this, _1)));
+ if (updateThread.id()) updateThread.join(); // Join the previous updatethread.
+ updateThread = Thread(
+ new UpdateClient(myId, updatee, url, broker, map, connections.values(),
+ boost::bind(&Cluster::updateOutDone, this),
+ boost::bind(&Cluster::updateOutError, this, _1)));
}
-// Called in dump thread.
-void Cluster::dumpInDone(const ClusterMap& m) {
+// Called in update thread.
+void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
- dumpedMap = m;
- checkDumpIn(l);
+ updatedMap = m;
+ checkUpdateIn(l);
}
-void Cluster::checkDumpIn(Lock& ) {
+void Cluster::checkUpdateIn(Lock& ) {
if (state == LEFT) return;
- if (state == DUMPEE && dumpedMap) {
- map = *dumpedMap;
+ if (state == UPDATEE && updatedMap) {
+ map = *updatedMap;
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
- QPID_LOG(info, *this << " received dump, starting catch-up");
+ QPID_LOG(info, *this << " received update, starting catch-up");
deliverEventQueue.start();
}
}
-void Cluster::dumpOutDone() {
+void Cluster::updateOutDone() {
Monitor::ScopedLock l(lock);
- dumpOutDone(l);
+ updateOutDone(l);
}
-void Cluster::dumpOutDone(Lock& l) {
- assert(state == DUMPER);
+void Cluster::updateOutDone(Lock& l) {
+ assert(state == UPDATER);
state = READY;
mcast.release();
- QPID_LOG(info, *this << " sent dump");
+ QPID_LOG(info, *this << " sent update");
deliverEventQueue.start();
- tryMakeOffer(map.firstNewbie(), l); // Try another offer
+ tryMakeOffer(map.firstJoiner(), l); // Try another offer
}
-void Cluster::dumpOutError(const std::exception& e) {
+void Cluster::updateOutError(const std::exception& e) {
Monitor::ScopedLock l(lock);
- QPID_LOG(error, *this << " error sending dump: " << e.what());
- dumpOutDone(l);
+ QPID_LOG(error, *this << " error sending update: " << e.what());
+ updateOutDone(l);
}
void Cluster ::shutdown(const MemberId& id, Lock& l) {
@@ -534,7 +533,7 @@ void Cluster::memberUpdate(Lock& l) {
}
std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
- static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" };
+ static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" };
return o << cluster.myId << "(" << STATE[cluster.state] << ")";
}
diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h
index ef63c4c3fe..711383d4dd 100644
--- a/cpp/src/qpid/cluster/Cluster.h
+++ b/cpp/src/qpid/cluster/Cluster.h
@@ -59,7 +59,7 @@ class Connection;
/**
* Connection to the cluster
*
- * Threading notes: 3 thread categories: connection, deliver, dump.
+ * Threading notes: 3 thread categories: connection, deliver, update.
*
*/
class Cluster : private Cpg::Handler, public management::Manageable {
@@ -87,8 +87,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Leave the cluster - called in any thread.
void leave();
- // Dump completed - called in dump thread
- void dumpInDone(const ClusterMap&);
+ // Update completed - called in update thread
+ void updateInDone(const ClusterMap&);
MemberId getId() const;
broker::Broker& getBroker() const;
@@ -124,8 +124,8 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Cluster controls implement XML methods from cluster.xml.
// Called in deliver thread.
//
- void dumpRequest(const MemberId&, const std::string&, Lock&);
- void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&);
+ void updateRequest(const MemberId&, const std::string&, Lock&);
+ void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&);
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
@@ -133,7 +133,7 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void deliveredFrame(const EventFrame&);
// Helper, called in deliver thread.
- void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
+ void updateStart(const MemberId& updatee, const Url& url, Lock&);
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
@@ -163,12 +163,12 @@ class Cluster : private Cpg::Handler, public management::Manageable {
void memberUpdate(Lock&);
// Called in connection IO threads .
- void checkDumpIn(Lock&);
+ void checkUpdateIn(Lock&);
- // Called in DumpClient thread.
- void dumpOutDone();
- void dumpOutError(const std::exception&);
- void dumpOutDone(Lock&);
+ // Called in UpdateClient thread.
+ void updateOutDone();
+ void updateOutError(const std::exception&);
+ void updateOutDone(Lock&);
void setClusterId(const framing::Uuid&);
@@ -201,23 +201,23 @@ class Cluster : private Cpg::Handler, public management::Manageable {
// Local cluster state, cluster map
enum {
- INIT, ///< Initial state, no CPG messages received.
- NEWBIE, ///< Sent dump request, waiting for dump offer.
- DUMPEE, ///< Stalled receive queue at dump offer, waiting for dump to complete.
- CATCHUP, ///< Dump complete, unstalled but has not yet seen own "ready" event.
- READY, ///< Fully operational
- OFFER, ///< Sent an offer, waiting for accept/reject.
- DUMPER, ///< Offer accepted, sending a state dump.
- LEFT ///< Final state, left the cluster.
+ INIT, ///< Initial state, no CPG messages received.
+ JOINER, ///< Sent update request, waiting for update offer.
+ UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete.
+ CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event.
+ READY, ///< Fully operational
+ OFFER, ///< Sent an offer, waiting for accept/reject.
+ UPDATER, ///< Offer accepted, sending a state update.
+ LEFT ///< Final state, left the cluster.
} state;
ClusterMap map;
size_t lastSize;
bool lastBroker;
uint64_t sequence;
- // Dump related
- sys::Thread dumpThread;
- boost::optional<ClusterMap> dumpedMap;
+ // Update related
+ sys::Thread updateThread;
+ boost::optional<ClusterMap> updatedMap;
friend std::ostream& operator<<(std::ostream&, const Cluster&);
friend class ClusterDispatcher;
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index b00699c903..bcfade2b8c 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -61,21 +61,21 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) {
if (isMember)
members[id] = url;
else
- newbies[id] = url;
+ joiners[id] = url;
}
-ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) {
- std::for_each(newbiesFt.begin(), newbiesFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive)));
+ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) {
+ std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive)));
std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
}
ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
framing::ClusterConnectionMembershipBody b;
- b.getNewbies().clear();
- std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1));
+ b.getJoiners().clear();
+ std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1));
for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
- if (!isMember(*i) && !isNewbie(*i))
- b.getNewbies().setString(i->str(), std::string());
+ if (!isMember(*i) && !isJoiner(*i))
+ b.getJoiners().setString(i->str(), std::string());
}
b.getMembers().clear();
std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
@@ -91,7 +91,7 @@ bool ClusterMap::configChange(
bool memberChange=false;
for (a = left; a != left+nLeft; ++a) {
memberChange = memberChange || members.erase(*a);
- newbies.erase(*a);
+ joiners.erase(*a);
}
alive.clear();
std::copy(current, current+nCurrent, std::inserter(alive, alive.end()));
@@ -103,8 +103,8 @@ Url ClusterMap::getUrl(const Map& map, const MemberId& id) {
return i == map.end() ? Url() : i->second;
}
-MemberId ClusterMap::firstNewbie() const {
- return newbies.empty() ? MemberId() : newbies.begin()->first;
+MemberId ClusterMap::firstJoiner() const {
+ return joiners.empty() ? MemberId() : joiners.begin()->first;
}
std::vector<string> ClusterMap::memberIds() const {
@@ -139,16 +139,16 @@ std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) {
o << *i;
if (m.isMember(*i)) o << "(member)";
- else if (m.isNewbie(*i)) o << "(newbie)";
+ else if (m.isJoiner(*i)) o << "(joiner)";
else o << "(unknown)";
o << " ";
}
return o;
}
-bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) {
+bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) {
if (isAlive(id)) {
- newbies[id] = Url(url);
+ joiners[id] = Url(url);
return true;
}
return false;
@@ -170,16 +170,16 @@ bool ClusterMap::configChange(const std::string& addresses) {
alive = update;
for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) {
memberChange = memberChange || members.erase(*i);
- newbies.erase(*i);
+ joiners.erase(*i);
}
return memberChange;
}
-boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) {
- Map::iterator i = newbies.find(to);
- if (isAlive(from) && i != newbies.end()) {
+boost::optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) {
+ Map::iterator i = joiners.find(to);
+ if (isAlive(from) && i != joiners.end()) {
Url url= i->second;
- newbies.erase(i); // No longer a potential dumpee.
+ joiners.erase(i); // No longer a potential updatee.
return url;
}
return boost::optional<Url>();
diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h
index 1893d0e796..9756daf977 100644
--- a/cpp/src/qpid/cluster/ClusterMap.h
+++ b/cpp/src/qpid/cluster/ClusterMap.h
@@ -39,7 +39,7 @@ namespace qpid {
namespace cluster {
/**
- * Map of established cluster members and newbies waiting for a brain dump.
+ * Map of established cluster members and joiners waiting for an update.
*/
class ClusterMap {
public:
@@ -60,15 +60,15 @@ class ClusterMap {
bool configChange(const std::string& addresses);
- bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); }
+ bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); }
bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); }
- Url getNewbieUrl(const MemberId& id) { return getUrl(newbies, id); }
+ Url getJoinerUrl(const MemberId& id) { return getUrl(joiners, id); }
Url getMemberUrl(const MemberId& id) { return getUrl(members, id); }
- /** First newbie in the cluster in ID order, target for offers */
- MemberId firstNewbie() const;
+ /** First joiner in the cluster in ID order, target for offers */
+ MemberId firstJoiner() const;
/** Convert map contents to a cluster control body. */
framing::ClusterConnectionMembershipBody asMethodBody() const;
@@ -79,9 +79,9 @@ class ClusterMap {
std::vector<Url> memberUrls() const;
Set getAlive() const;
- bool dumpRequest(const MemberId& id, const std::string& url);
+ bool updateRequest(const MemberId& id, const std::string& url);
/** Return non-empty Url if accepted */
- boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to);
+ boost::optional<Url> updateOffer(const MemberId& from, const MemberId& to);
/**@return true If this is a new member */
bool ready(const MemberId& id, const Url&);
@@ -93,7 +93,7 @@ class ClusterMap {
private:
Url getUrl(const Map& map, const MemberId& id);
- Map newbies, members;
+ Map joiners, members;
Set alive;
friend std::ostream& operator<<(std::ostream&, const Map&);
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 79c34d6873..2a3df8f465 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -21,7 +21,7 @@
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/ConnectionCodec.h"
-#include "qpid/cluster/DumpClient.h"
+#include "qpid/cluster/UpdateClient.h"
#include "qpid/broker/Broker.h"
#include "qpid/Plugin.h"
@@ -87,43 +87,43 @@ struct ClusterOptions : public Options {
}
};
-struct DumpClientIdAllocator : management::IdAllocator
+struct UpdateClientIdAllocator : management::IdAllocator
{
qpid::sys::AtomicValue<uint64_t> sequence;
- DumpClientIdAllocator() : sequence(0x4000000000000000LL) {}
+ UpdateClientIdAllocator() : sequence(0x4000000000000000LL) {}
uint64_t getIdFor(management::Manageable* m)
{
- if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) {
+ if (isUpdateQueue(m) || isUpdateExchange(m) || isUpdateSession(m) || isUpdateBinding(m)) {
return ++sequence;
} else {
return 0;
}
}
- bool isDumpQueue(management::Manageable* manageable)
+ bool isUpdateQueue(management::Manageable* manageable)
{
qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable);
- return queue && queue->getName() == DumpClient::DUMP;
+ return queue && queue->getName() == UpdateClient::UPDATE;
}
- bool isDumpExchange(management::Manageable* manageable)
+ bool isUpdateExchange(management::Manageable* manageable)
{
qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable);
- return exchange && exchange->getName() == DumpClient::DUMP;
+ return exchange && exchange->getName() == UpdateClient::UPDATE;
}
- bool isDumpSession(management::Manageable* manageable)
+ bool isUpdateSession(management::Manageable* manageable)
{
broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable);
- return session && session->getId().getName() == DumpClient::DUMP;
+ return session && session->getId().getName() == UpdateClient::UPDATE;
}
- bool isDumpBinding(management::Manageable* manageable)
+ bool isUpdateBinding(management::Manageable* manageable)
{
broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable);
- return binding && binding->queue->getName() == DumpClient::DUMP;
+ return binding && binding->queue->getName() == UpdateClient::UPDATE;
}
};
@@ -155,7 +155,7 @@ struct ClusterPlugin : public Plugin {
broker->getExchanges().registerExchange(cluster->getFailoverExchange());
ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance());
if (mgmt) {
- std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator());
+ std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator());
mgmt->setAllocator(allocator);
}
}
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index 839a0e67b9..4b3e6da3fb 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -19,7 +19,7 @@
*
*/
#include "Connection.h"
-#include "DumpClient.h"
+#include "UpdateClient.h"
#include "Cluster.h"
#include "qpid/broker/SessionState.h"
@@ -45,8 +45,8 @@
// TODO aconway 2008-11-03:
//
-// Disproportionate amount of code here is dedicated to receiving a
-// brain-dump when joining a cluster and building initial
+// Disproportionate amount of code here is dedicated to receiving an
+// update when joining a cluster and building initial
// state. Should be separated out into its own classes.
//
@@ -104,7 +104,7 @@ void Connection::received(framing::AMQFrame& f) {
if (!framing::invoke(*this, *f.getBody()).wasHandled())
connection.received(f);
}
- else { // Shadow or dumped ex catch-up connection.
+ else { // Shadow or updated ex catch-up connection.
if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
if (isShadow()) {
QPID_LOG(debug, cluster << " inserting connection " << *this);
@@ -155,7 +155,7 @@ void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) {
// Delivered from cluster.
void Connection::deliveredFrame(const EventFrame& f) {
- QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame);
+ QPID_LOG(trace, cluster << " DLVR: " << *this << ": " << f.frame);
assert(!catchUp);
currentChannel = f.frame.getChannel();
if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
@@ -174,8 +174,8 @@ void Connection::closed() {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
cluster.leave();
}
- else if (isDumped()) {
- QPID_LOG(debug, cluster << " closed dump connection " << *this);
+ else if (isUpdated()) {
+ QPID_LOG(debug, cluster << " closed update connection " << *this);
connection.closed();
}
else if (isLocal()) {
@@ -268,7 +268,7 @@ void Connection::sessionState(
received,
unknownCompleted,
receivedIncomplete);
- QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId());
+ QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
@@ -277,10 +277,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
self = shadow;
}
-void Connection::membership(const FieldTable& newbies, const FieldTable& members) {
- QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this);
- cluster.dumpInDone(ClusterMap(newbies, members));
- self.second = 0; // Mark this as completed dump connection.
+void Connection::membership(const FieldTable& joiners, const FieldTable& members) {
+ QPID_LOG(debug, cluster << " incoming update complete on connection " << *this);
+ cluster.updateInDone(ClusterMap(joiners, members));
+ self.second = 0; // Mark this as completed update connection.
}
bool Connection::isLocal() const {
@@ -291,7 +291,7 @@ bool Connection::isShadow() const {
return self.first != cluster.getId();
}
-bool Connection::isDumped() const {
+bool Connection::isUpdated() const {
return self.first == cluster.getId() && self.second == 0;
}
@@ -302,9 +302,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
return queue;
}
-broker::QueuedMessage Connection::getDumpMessage() {
- broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
- if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
+broker::QueuedMessage Connection::getUpdateMessage() {
+ broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get();
+ if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue"));
return m;
}
@@ -323,12 +323,12 @@ void Connection::deliveryRecord(const string& qname,
broker::QueuedMessage m;
broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) // Message is on the dump queue
- m = getDumpMessage();
+ if (acquired) // Message is on the update queue
+ m = getUpdateMessage();
else // Message at original position in original queue
m = queue->find(position);
if (!m.payload)
- throw Exception(QPID_MSG("deliveryRecord no dump message"));
+ throw Exception(QPID_MSG("deliveryRecord no update message"));
}
broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit);
@@ -349,7 +349,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) {
const char* type="unknown";
if (c.isLocal()) type = "local";
else if (c.isShadow()) type = "shadow";
- else if (c.isDumped()) type = "dumped";
+ else if (c.isUpdated()) type = "updated";
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")";
}
@@ -361,15 +361,15 @@ void Connection::txAccept(const framing::SequenceSet& acked) {
}
void Connection::txDequeue(const std::string& queue) {
- txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload)));
}
void Connection::txEnqueue(const std::string& queue) {
- txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+ txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload)));
}
void Connection::txPublish(const framing::Array& queues, bool delivered) {
- boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload));
+ boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload));
for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i)
txPub->deliverTo(findQueue((*i)->get<std::string>()));
txPub->delivered = delivered;
diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h
index 3b18e22d17..e22ff05c08 100644
--- a/cpp/src/qpid/cluster/Connection.h
+++ b/cpp/src/qpid/cluster/Connection.h
@@ -81,8 +81,8 @@ class Connection :
/** True if the connection is in "catch-up" mode: building initial broker state. */
bool isCatchUp() const { return catchUp; }
- /** True if the connection is a completed shared dump connection */
- bool isDumped() const;
+ /** True if the connection is a completed shared update connection */
+ bool isUpdated() const;
Cluster& getCluster() { return cluster; }
@@ -108,7 +108,7 @@ class Connection :
// ==== Used in catch-up mode to build initial state.
//
- // State dump methods.
+ // State update methods.
void sessionState(const framing::SequenceNumber& replayStart,
const framing::SequenceNumber& sendCommandPoint,
const framing::SequenceSet& sentIncomplete,
@@ -156,7 +156,7 @@ class Connection :
boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
- broker::QueuedMessage getDumpMessage();
+ broker::QueuedMessage getUpdateMessage();
static NoOpConnectionOutputHandler discardHandler;
diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp
index 28d2750ff9..1334f97eec 100644
--- a/cpp/src/qpid/cluster/ConnectionCodec.cpp
+++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp
@@ -66,7 +66,6 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id,
ConnectionCodec::~ConnectionCodec() {}
size_t ConnectionCodec::decode(const char* buffer, size_t size) {
- QPID_LOG(trace, "RECVB [" << localId << "]: " << size << " bytes");
return interceptor->decode(buffer, size);
}
diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp
index 847088435c..446722745c 100644
--- a/cpp/src/qpid/cluster/Multicaster.cpp
+++ b/cpp/src/qpid/cluster/Multicaster.cpp
@@ -23,6 +23,7 @@
#include "Cpg.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/LatencyMetric.h"
+#include "qpid/framing/AMQBody.h"
namespace qpid {
namespace cluster {
@@ -40,12 +41,14 @@ Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
}
void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) {
+ QPID_LOG(trace, "MCAST " << id << ": " << body);
mcast(Event::control(body, id));
}
void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) {
Event e(DATA, id, size);
memcpy(e.getData(), data, size);
+ QPID_LOG(trace, "MCAST " << e);
mcast(e);
}
@@ -54,7 +57,6 @@ void Multicaster::mcast(const Event& e) {
sys::Mutex::ScopedLock l(lock);
if (e.getType() == DATA && e.isConnection() && holding) {
holdingQueue.push_back(e);
- QPID_LOG(trace, " MCAST held: " << e );
return;
}
}
@@ -85,7 +87,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
}
break;
}
- QPID_LOG(trace, " MCAST " << *i);
++i;
}
values.erase(values.begin(), i); // Erase sent events.
diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp
index 00328eb310..c58133f453 100644
--- a/cpp/src/qpid/cluster/DumpClient.cpp
+++ b/cpp/src/qpid/cluster/UpdateClient.cpp
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "DumpClient.h"
+#include "UpdateClient.h"
#include "Cluster.h"
#include "ClusterMap.h"
#include "Connection.h"
@@ -83,49 +83,49 @@ void send(client::AsyncSession& s, const AMQBody& body) {
sb.get()->send(body);
}
-// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel.
+// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel.
-DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url,
+UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url,
broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons,
const boost::function<void()>& ok,
const boost::function<void(const std::exception&)>& fail)
- : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons),
+ : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons),
connection(catchUpConnection()), shadowConnection(catchUpConnection()),
done(ok), failed(fail)
{
connection.open(url);
- session = connection.newSession(DUMP);
+ session = connection.newSession("update_shared");
}
-DumpClient::~DumpClient() {}
+UpdateClient::~UpdateClient() {}
// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges.
-static const char DUMP_CHARS[] = "\000qpid-dump";
-const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS));
-
-void DumpClient::dump() {
- QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl);
- Broker& b = dumperBroker;
- b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
-
- // Dump exchange is used to route messages to the proper queue without modifying routing key.
- session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true);
- b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
- // Dump queue is used to transfer acquired messages that are no longer on their original queue.
- session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true);
+static const char UPDATE_CHARS[] = "\000qpid-update";
+const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS));
+
+void UpdateClient::update() {
+ QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl);
+ Broker& b = updaterBroker;
+ b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1));
+
+ // Update exchange is used to route messages to the proper queue without modifying routing key.
+ session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true);
+ b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1));
+ // Update queue is used to transfer acquired messages that are no longer on their original queue.
+ session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
session.sync();
session.close();
- std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1));
+ std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1));
AMQFrame frame(map.asMethodBody());
client::ConnectionAccess::getImpl(connection)->handle(frame);
connection.close();
- QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl);
+ QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl);
}
-void DumpClient::run() {
+void UpdateClient::run() {
try {
- dump();
+ update();
done();
} catch (const std::exception& e) {
failed(e);
@@ -143,16 +143,16 @@ template <class T> std::string encode(const T& t) {
}
} // namespace
-void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) {
- QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName());
+void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) {
+ QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName());
ClusterConnectionProxy proxy(session);
proxy.exchange(encode(*ex));
}
-/** Bind a queue to the dump exchange and dump messges to it
+/** Bind a queue to the update exchange and update messges to it
* setting the message possition as needed.
*/
-class MessageDumper {
+class MessageUpdater {
std::string queue;
bool haveLastPos;
framing::SequenceNumber lastPos;
@@ -160,15 +160,15 @@ class MessageDumper {
public:
- MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
- session.exchangeBind(queue, DumpClient::DUMP);
+ MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) {
+ session.exchangeBind(queue, UpdateClient::UPDATE);
}
- ~MessageDumper() {
- session.exchangeUnbind(queue, DumpClient::DUMP);
+ ~MessageUpdater() {
+ session.exchangeUnbind(queue, UpdateClient::UPDATE);
}
- void dumpQueuedMessage(const broker::QueuedMessage& message) {
+ void updateQueuedMessage(const broker::QueuedMessage& message) {
if (!haveLastPos || message.position - lastPos != 1) {
ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1);
haveLastPos = true;
@@ -176,52 +176,52 @@ class MessageDumper {
lastPos = message.position;
SessionBase_0_10Access sb(session);
framing::MessageTransferBody transfer(
- framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
+ framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED);
sb.get()->send(transfer, message.payload->getFrames());
}
- void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) {
- dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
+ void updateMessage(const boost::intrusive_ptr<broker::Message>& message) {
+ updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1));
}
};
-void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) {
- QPID_LOG(debug, dumperId << " dumping queue " << q->getName());
+void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) {
+ QPID_LOG(debug, updaterId << " updateing queue " << q->getName());
ClusterConnectionProxy proxy(session);
proxy.queue(encode(*q));
- MessageDumper dumper(q->getName(), session);
- q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1));
- q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1));
+ MessageUpdater updater(q->getName(), session);
+ q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1));
+ q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1));
}
-void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) {
+void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) {
session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
}
-void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) {
- QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection);
+void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) {
+ QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection);
shadowConnection = catchUpConnection();
- broker::Connection& bc = dumpConnection->getBrokerConnection();
+ broker::Connection& bc = updateConnection->getBrokerConnection();
// FIXME aconway 2008-10-20: What authentication info to use on reconnect?
- shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
- bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
+ shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax());
+ bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
- dumpConnection->getId().getMember(),
- reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer()));
+ updateConnection->getId().getMember(),
+ reinterpret_cast<uint64_t>(updateConnection->getId().getPointer()));
shadowConnection.close();
- QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection);
+ QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
}
-void DumpClient::dumpSession(broker::SessionHandler& sh) {
- QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
+void UpdateClient::updateSession(broker::SessionHandler& sh) {
+ QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection() << "[" << sh.getChannel() << "] = "
<< sh.getSession()->getId());
broker::SessionState* ss = sh.getSession();
if (!ss) return; // no session.
- // Create a client session to dump session state.
+ // Create a client session to update session state.
boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection);
boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel());
client::SessionBase_0_10Access(shadowSession).set(simpl);
@@ -229,15 +229,15 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
// Re-create session state on remote connection.
- // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
- QPID_LOG(debug, dumperId << " dumping consumers.");
- ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
+ // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33.
+ QPID_LOG(debug, updaterId << " updateing consumers.");
+ ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
- QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
+ QPID_LOG(debug, updaterId << " updateing unacknowledged messages.");
broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
- std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1));
+ std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1));
- dumpTxState(ss->getSemanticState()); // Tx transaction state.
+ updateTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust for command counter for message in progress, will be sent after state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -263,11 +263,11 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) {
// FIXME aconway 2008-09-23: update session replay list.
- QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId());
+ QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId());
}
-void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
- QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId());
+void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) {
+ QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId());
using namespace message;
shadowSession.messageSubscribe(
arg::queue = ci->getQueue()->getName(),
@@ -289,15 +289,15 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) {
ci->isNotifyEnabled()
);
client::SessionBase_0_10Access(shadowSession).get()->send(state);
- QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId());
+ QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId());
}
-void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
+void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) {
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
- // dumpees queue, put it on the dump queue for dumpee to pick up.
+ // updatees queue, put it on the update queue for updatee to pick up.
//
- MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
+ MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage());
}
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
@@ -314,22 +314,22 @@ void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
);
}
-class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
+class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater {
public:
- TxOpDumper(DumpClient& dc, client::AsyncSession s)
- : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {}
+ TxOpUpdater(UpdateClient& dc, client::AsyncSession s)
+ : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {}
void operator()(const broker::DtxAck& ) {
throw InternalErrorException("DTX transactions not currently supported by cluster.");
}
void operator()(const broker::RecoveredDequeue& rdeq) {
- dumpMessage(rdeq.getMessage());
+ updateMessage(rdeq.getMessage());
proxy.txEnqueue(rdeq.getQueue()->getName());
}
void operator()(const broker::RecoveredEnqueue& renq) {
- dumpMessage(renq.getMessage());
+ updateMessage(renq.getMessage());
proxy.txEnqueue(renq.getQueue()->getName());
}
@@ -338,7 +338,7 @@ class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
}
void operator()(const broker::TxPublish& txPub) {
- dumpMessage(txPub.getMessage());
+ updateMessage(txPub.getMessage());
typedef std::list<Queue::shared_ptr> QueueList;
const QueueList& qlist = txPub.getQueues();
Array qarray(TYPE_CODE_STR8);
@@ -348,20 +348,20 @@ class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
}
private:
- DumpClient& parent;
+ UpdateClient& parent;
client::AsyncSession session;
ClusterConnectionProxy proxy;
};
-void DumpClient::dumpTxState(broker::SemanticState& s) {
- QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+void UpdateClient::updateTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, updaterId << " updateing TX transaction state.");
ClusterConnectionProxy proxy(shadowSession);
proxy.accumulatedAck(s.getAccumulatedAck());
broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
if (txBuffer) {
proxy.txStart();
- TxOpDumper dumper(*this, shadowSession);
- txBuffer->accept(dumper);
+ TxOpUpdater updater(*this, shadowSession);
+ txBuffer->accept(updater);
proxy.txEnd();
}
}
diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/UpdateClient.h
index 23676e7646..93dca9f0c6 100644
--- a/cpp/src/qpid/cluster/DumpClient.h
+++ b/cpp/src/qpid/cluster/UpdateClient.h
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_DUMPCLIENT_H
-#define QPID_CLUSTER_DUMPCLIENT_H
+#ifndef QPID_CLUSTER_UPDATECLIENT_H
+#define QPID_CLUSTER_UPDATECLIENT_H
/*
*
@@ -56,38 +56,38 @@ class Connection;
class ClusterMap;
/**
- * A client that dumps the contents of a local broker to a remote one using AMQP.
+ * A client that updates the contents of a local broker to a remote one using AMQP.
*/
-class DumpClient : public sys::Runnable {
+class UpdateClient : public sys::Runnable {
public:
- static const std::string DUMP; // Name for special dump queue and exchange.
+ static const std::string UPDATE; // Name for special update queue and exchange.
- DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&,
+ UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&,
broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& ,
const boost::function<void()>& done,
const boost::function<void(const std::exception&)>& fail);
- ~DumpClient();
- void dump();
+ ~UpdateClient();
+ void update();
void run(); // Will delete this when finished.
- void dumpUnacked(const broker::DeliveryRecord&);
+ void updateUnacked(const broker::DeliveryRecord&);
private:
- void dumpQueue(const boost::shared_ptr<broker::Queue>&);
- void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
- void dumpMessage(const broker::QueuedMessage&);
- void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
- void dumpBinding(const std::string& queue, const broker::QueueBinding& binding);
- void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
- void dumpSession(broker::SessionHandler& s);
- void dumpTxState(broker::SemanticState& s);
- void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
-
- MemberId dumperId;
- MemberId dumpeeId;
- Url dumpeeUrl;
- broker::Broker& dumperBroker;
+ void updateQueue(const boost::shared_ptr<broker::Queue>&);
+ void updateExchange(const boost::shared_ptr<broker::Exchange>&);
+ void updateMessage(const broker::QueuedMessage&);
+ void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s);
+ void updateBinding(const std::string& queue, const broker::QueueBinding& binding);
+ void updateConnection(const boost::intrusive_ptr<Connection>& connection);
+ void updateSession(broker::SessionHandler& s);
+ void updateTxState(broker::SemanticState& s);
+ void updateConsumer(const broker::SemanticState::ConsumerImpl*);
+
+ MemberId updaterId;
+ MemberId updateeId;
+ Url updateeUrl;
+ broker::Broker& updaterBroker;
ClusterMap map;
std::vector<boost::intrusive_ptr<Connection> > connections;
client::Connection connection, shadowConnection;
@@ -98,4 +98,4 @@ class DumpClient : public sys::Runnable {
}} // namespace qpid::cluster
-#endif /*!QPID_CLUSTER_DUMPCLIENT_H*/
+#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/
diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp
index 6ca957f310..b7d28bf914 100644
--- a/cpp/src/tests/cluster_test.cpp
+++ b/cpp/src/tests/cluster_test.cpp
@@ -27,7 +27,7 @@
#include "qpid/client/FailoverListener.h"
#include "qpid/cluster/Cluster.h"
#include "qpid/cluster/Cpg.h"
-#include "qpid/cluster/DumpClient.h"
+#include "qpid/cluster/UpdateClient.h"
#include "qpid/framing/AMQBody.h"
#include "qpid/framing/Uuid.h"
#include "qpid/framing/reply_exceptions.h"
@@ -352,8 +352,8 @@ QPID_AUTO_TEST_CASE(testUnacked) {
BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22");
}
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
- // Verify that we dump transaction state correctly to new members.
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) {
+ // Verify that we update transaction state correctly to new members.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
@@ -386,8 +386,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) {
BOOST_CHECK_EQUAL(m.getData(), "3");
}
-QPID_AUTO_TEST_CASE(testDumpMessageBuilder) {
- // Verify that we dump a partially recieved message to a new member.
+QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) {
+ // Verify that we update a partially recieved message to a new member.
ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
@@ -452,7 +452,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
BOOST_CHECK_EQUAL(kb0, kb2);
}
-QPID_AUTO_TEST_CASE(DumpConsumers) {
+QPID_AUTO_TEST_CASE(UpdateConsumers) {
ClusterFixture cluster(1, 1);
Client c0(cluster[0], "c0");
diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml
index 19d9f7ea56..e6cacb0223 100644
--- a/cpp/xml/cluster.xml
+++ b/cpp/xml/cluster.xml
@@ -27,12 +27,12 @@
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <control name="dump-request" code="0x1" label="URL for a member.">
+ <control name="update-request" code="0x1" label="URL for a member.">
<field name="url" type="str16"/>
</control>
- <control name = "dump-offer" code="0x2" label="Member offering to be dumper for dumpee.">
- <field name="dumpee" type="uint64"/>
+ <control name = "update-offer" code="0x2" label="Member offering to be updater for updatee.">
+ <field name="updatee" type="uint64"/>
<field name="cluster-id" type="uuid"/>
</control>
@@ -60,13 +60,13 @@ Min <control name="ready" code="0x10" label="New member is ready.">
<field name="bytes" type="uint32"/>
</control>
- <!-- Brain-dump controls. Sent to a new broker in joining mode.
- A connection is dumped as followed:
+ <!-- Update controls. Sent to a new broker in joining mode.
+ A connection is updateed as followed:
- open as a normal connection.
- attach sessions, create consumers, set flow with normal AMQP cokmmands.
- send /reset additional session state with controls below.
- - send shadow-ready to mark end of shadow dump.
- - send dump-complete when entire dump is complete.
+ - send shadow-ready to mark end of shadow update.
+ - send update-complete when entire update is complete.
-->
<!-- Consumer state that cannot be set by standard AMQP controls. -->
@@ -103,8 +103,8 @@ Min <control name="ready" code="0x10" label="New member is ready.">
<control name="tx-end" code="0x17"/>
<control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
- <!-- Complete a session state dump. -->
- <control name="session-state" code="0x1F" label="Set session state during a brain dump.">
+ <!-- Complete a session state update. -->
+ <control name="session-state" code="0x1F" label="Set session state during a brain update.">
<!-- Target session deduced from channel number. -->
<field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.-->
<field name="command-point" type="sequence-no"/> <!-- Id of next command sent -->
@@ -116,15 +116,15 @@ Min <control name="ready" code="0x10" label="New member is ready.">
<field name="received-incomplete" type="sequence-set"/> <!-- Received and incomplete -->
</control>
- <!-- Complete a shadow connection dump. -->
- <control name="shadow-ready" code="0x20" label="End of shadow connection dump.">
+ <!-- Complete a shadow connection update. -->
+ <control name="shadow-ready" code="0x20" label="End of shadow connection update.">
<field name="member-id" type="uint64"/>
<field name="connection-id" type="uint64"/>
</control>
- <!-- Complete a cluster state dump. -->
+ <!-- Complete a cluster state update. -->
<control name="membership" code="0x21" label="Cluster membership details.">
- <field name="newbies" type="map"/> <!-- member-id -> URL -->
+ <field name="joiners" type="map"/> <!-- member-id -> URL -->
<field name="members" type="map"/> <!-- member-id -> state -->
</control>