diff options
author | Alan Conway <aconway@apache.org> | 2009-01-27 02:08:25 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-01-27 02:08:25 +0000 |
commit | 306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e (patch) | |
tree | 04c1f8f85b0cf469c7c7e526f436e09cd12e350a /cpp | |
parent | 57acf95c94d52b15b2ad6e6038bf3390d9063282 (diff) | |
download | qpid-python-306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e.tar.gz |
Cluster rename: dump -> update, newbie -> joiner
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737971 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/cluster.mk | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 10 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 107 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.h | 44 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 36 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.h | 16 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.cpp | 46 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Connection.h | 8 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ConnectionCodec.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/Multicaster.cpp | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.cpp (renamed from cpp/src/qpid/cluster/DumpClient.cpp) | 152 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/UpdateClient.h (renamed from cpp/src/qpid/cluster/DumpClient.h) | 48 | ||||
-rw-r--r-- | cpp/src/tests/cluster_test.cpp | 12 | ||||
-rw-r--r-- | cpp/xml/cluster.xml | 26 |
15 files changed, 271 insertions, 270 deletions
diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 31eed2aec6..3809c86090 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -52,8 +52,8 @@ cluster_la_SOURCES = \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ - qpid/cluster/DumpClient.cpp \ - qpid/cluster/DumpClient.h \ + qpid/cluster/UpdateClient.cpp \ + qpid/cluster/UpdateClient.h \ qpid/cluster/Event.cpp \ qpid/cluster/Event.h \ qpid/cluster/EventFrame.h \ diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 53c49bf0ce..f8b9e4b183 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -98,6 +98,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) : } } +static const std::string QPID_MANAGEMENT("qpid.management"); + Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), @@ -111,9 +113,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); mgmtExchange->set_arguments(args); if (!durable) { - if (name == "") { + if (name.empty()) { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID - } else if (name == "qpid.management") { + } else if (name == QPID_MANAGEMENT) { agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID } else { ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent); @@ -125,12 +127,12 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel sequence = _args.get(qpidMsgSequence); if (sequence) { - QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + QPID_LOG(debug, "Configured exchange " << _name << " with Msg sequencing"); args.setInt64(std::string(qpidSequenceCounter), sequenceNo); } ive = _args.get(qpidIVE); - if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); + if (ive) QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); } Exchange::~Exchange () diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f8adb8ee98..0d082fc226 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -18,7 +18,7 @@ #include "Cluster.h" #include "Connection.h" -#include "DumpClient.h" +#include "UpdateClient.h" #include "FailoverExchange.h" #include "ClusterQueueHandler.h" @@ -29,10 +29,10 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterDumpRequestBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterDumpOfferBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" @@ -77,10 +77,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { Cluster::Lock& l; ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} - void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } + void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } - void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); } + void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -124,7 +124,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b } Cluster::~Cluster() { - if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { @@ -205,7 +205,6 @@ void Cluster::deliver( void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; - QPID_LOG(trace, *this << " PUSH: " << e); QPID_LATENCY_INIT(e); deliverEventQueue.push(e); } @@ -216,7 +215,7 @@ void Cluster::deliveredEvent(const Event& e) { Buffer buf(const_cast<char*>(e.getData()), e.getSize()); boost::intrusive_ptr<Connection> connection; if (e.isConnection()) { - if (state == NEWBIE) { + if (state == JOINER) { QPID_LOG(trace, *this << " DROP: " << e); return; } @@ -236,11 +235,11 @@ void Cluster::deliveredEvent(const Event& e) { void Cluster::deliveredFrame(const EventFrame& e) { QPID_LATENCY_RECORD("delivered frame queue", e.frame); - QPID_LOG(trace, *this << " DLVR: " << e.frame); if (e.connection) { e.connection->deliveredFrame(e); } else { + QPID_LOG(trace, *this << " DLVR: " << e.frame); Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? ClusterDispatcher dispatch(*this, e.member, l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) @@ -313,9 +312,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& memberUpdate(l); } else { // Joining established group. - state = NEWBIE; + state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); ClusterMap::Set members = map.getAlive(); members.erase(myId); myElders = members; @@ -336,10 +335,10 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { - if (state == READY && map.isNewbie(id)) { + if (state == READY && map.isJoiner(id)) { state = OFFER; - QPID_LOG(info, *this << " send dump-offer to " << id); - mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId); + QPID_LOG(info, *this << " send update-offer to " << id); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); } } @@ -359,8 +358,8 @@ void Cluster::brokerShutdown() { delete this; } -void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { - map.dumpRequest(id, url); +void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { + map.updateRequest(id, url); tryMakeOffer(id, l); } @@ -376,81 +375,81 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; - MemberId dumpee(dumpeeInt); - boost::optional<Url> url = map.dumpOffer(dumper, dumpee); - if (dumper == myId) { + MemberId updatee(updateeInt); + boost::optional<Url> url = map.updateOffer(updater, updatee); + if (updater == myId) { assert(state == OFFER); if (url) { // My offer was first. - dumpStart(dumpee, *url, l); + updateStart(updatee, *url, l); } else { // Another offer was first. state = READY; mcast.release(); - QPID_LOG(info, *this << " cancelled dump offer to " << dumpee); - tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. + QPID_LOG(info, *this << " cancelled update offer to " << updatee); + tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (dumpee == myId && url) { - assert(state == NEWBIE); + else if (updatee == myId && url) { + assert(state == JOINER); setClusterId(uuid); - state = DUMPEE; - QPID_LOG(info, *this << " receiving dump from " << dumper); + state = UPDATEE; + QPID_LOG(info, *this << " receiving update from " << updater); deliverEventQueue.stop(); - checkDumpIn(l); + checkUpdateIn(l); } } -void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { if (state == LEFT) return; assert(state == OFFER); - state = DUMPER; - QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url); + state = UPDATER; + QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); deliverEventQueue.stop(); - if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. - dumpThread = Thread( - new DumpClient(myId, dumpee, url, broker, map, connections.values(), - boost::bind(&Cluster::dumpOutDone, this), - boost::bind(&Cluster::dumpOutError, this, _1))); + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + updateThread = Thread( + new UpdateClient(myId, updatee, url, broker, map, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } -// Called in dump thread. -void Cluster::dumpInDone(const ClusterMap& m) { +// Called in update thread. +void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); - dumpedMap = m; - checkDumpIn(l); + updatedMap = m; + checkUpdateIn(l); } -void Cluster::checkDumpIn(Lock& ) { +void Cluster::checkUpdateIn(Lock& ) { if (state == LEFT) return; - if (state == DUMPEE && dumpedMap) { - map = *dumpedMap; + if (state == UPDATEE && updatedMap) { + map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; - QPID_LOG(info, *this << " received dump, starting catch-up"); + QPID_LOG(info, *this << " received update, starting catch-up"); deliverEventQueue.start(); } } -void Cluster::dumpOutDone() { +void Cluster::updateOutDone() { Monitor::ScopedLock l(lock); - dumpOutDone(l); + updateOutDone(l); } -void Cluster::dumpOutDone(Lock& l) { - assert(state == DUMPER); +void Cluster::updateOutDone(Lock& l) { + assert(state == UPDATER); state = READY; mcast.release(); - QPID_LOG(info, *this << " sent dump"); + QPID_LOG(info, *this << " sent update"); deliverEventQueue.start(); - tryMakeOffer(map.firstNewbie(), l); // Try another offer + tryMakeOffer(map.firstJoiner(), l); // Try another offer } -void Cluster::dumpOutError(const std::exception& e) { +void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending dump: " << e.what()); - dumpOutDone(l); + QPID_LOG(error, *this << " error sending update: " << e.what()); + updateOutDone(l); } void Cluster ::shutdown(const MemberId& id, Lock& l) { @@ -534,7 +533,7 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" }; + static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; return o << cluster.myId << "(" << STATE[cluster.state] << ")"; } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index ef63c4c3fe..711383d4dd 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -59,7 +59,7 @@ class Connection; /** * Connection to the cluster * - * Threading notes: 3 thread categories: connection, deliver, dump. + * Threading notes: 3 thread categories: connection, deliver, update. * */ class Cluster : private Cpg::Handler, public management::Manageable { @@ -87,8 +87,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Leave the cluster - called in any thread. void leave(); - // Dump completed - called in dump thread - void dumpInDone(const ClusterMap&); + // Update completed - called in update thread + void updateInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; @@ -124,8 +124,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Cluster controls implement XML methods from cluster.xml. // Called in deliver thread. // - void dumpRequest(const MemberId&, const std::string&, Lock&); - void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&); + void updateRequest(const MemberId&, const std::string&, Lock&); + void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); @@ -133,7 +133,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliveredFrame(const EventFrame&); // Helper, called in deliver thread. - void dumpStart(const MemberId& dumpee, const Url& url, Lock&); + void updateStart(const MemberId& updatee, const Url& url, Lock&); void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, @@ -163,12 +163,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { void memberUpdate(Lock&); // Called in connection IO threads . - void checkDumpIn(Lock&); + void checkUpdateIn(Lock&); - // Called in DumpClient thread. - void dumpOutDone(); - void dumpOutError(const std::exception&); - void dumpOutDone(Lock&); + // Called in UpdateClient thread. + void updateOutDone(); + void updateOutError(const std::exception&); + void updateOutDone(Lock&); void setClusterId(const framing::Uuid&); @@ -201,23 +201,23 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Local cluster state, cluster map enum { - INIT, ///< Initial state, no CPG messages received. - NEWBIE, ///< Sent dump request, waiting for dump offer. - DUMPEE, ///< Stalled receive queue at dump offer, waiting for dump to complete. - CATCHUP, ///< Dump complete, unstalled but has not yet seen own "ready" event. - READY, ///< Fully operational - OFFER, ///< Sent an offer, waiting for accept/reject. - DUMPER, ///< Offer accepted, sending a state dump. - LEFT ///< Final state, left the cluster. + INIT, ///< Initial state, no CPG messages received. + JOINER, ///< Sent update request, waiting for update offer. + UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. + CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. + READY, ///< Fully operational + OFFER, ///< Sent an offer, waiting for accept/reject. + UPDATER, ///< Offer accepted, sending a state update. + LEFT ///< Final state, left the cluster. } state; ClusterMap map; size_t lastSize; bool lastBroker; uint64_t sequence; - // Dump related - sys::Thread dumpThread; - boost::optional<ClusterMap> dumpedMap; + // Update related + sys::Thread updateThread; + boost::optional<ClusterMap> updatedMap; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index b00699c903..bcfade2b8c 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -61,21 +61,21 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { if (isMember) members[id] = url; else - newbies[id] = url; + joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) { - std::for_each(newbiesFt.begin(), newbiesFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive))); +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) { + std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { framing::ClusterConnectionMembershipBody b; - b.getNewbies().clear(); - std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1)); + b.getJoiners().clear(); + std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1)); for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { - if (!isMember(*i) && !isNewbie(*i)) - b.getNewbies().setString(i->str(), std::string()); + if (!isMember(*i) && !isJoiner(*i)) + b.getJoiners().setString(i->str(), std::string()); } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); @@ -91,7 +91,7 @@ bool ClusterMap::configChange( bool memberChange=false; for (a = left; a != left+nLeft; ++a) { memberChange = memberChange || members.erase(*a); - newbies.erase(*a); + joiners.erase(*a); } alive.clear(); std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); @@ -103,8 +103,8 @@ Url ClusterMap::getUrl(const Map& map, const MemberId& id) { return i == map.end() ? Url() : i->second; } -MemberId ClusterMap::firstNewbie() const { - return newbies.empty() ? MemberId() : newbies.begin()->first; +MemberId ClusterMap::firstJoiner() const { + return joiners.empty() ? MemberId() : joiners.begin()->first; } std::vector<string> ClusterMap::memberIds() const { @@ -139,16 +139,16 @@ std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { o << *i; if (m.isMember(*i)) o << "(member)"; - else if (m.isNewbie(*i)) o << "(newbie)"; + else if (m.isJoiner(*i)) o << "(joiner)"; else o << "(unknown)"; o << " "; } return o; } -bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { +bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) { if (isAlive(id)) { - newbies[id] = Url(url); + joiners[id] = Url(url); return true; } return false; @@ -170,16 +170,16 @@ bool ClusterMap::configChange(const std::string& addresses) { alive = update; for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) { memberChange = memberChange || members.erase(*i); - newbies.erase(*i); + joiners.erase(*i); } return memberChange; } -boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { - Map::iterator i = newbies.find(to); - if (isAlive(from) && i != newbies.end()) { +boost::optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) { + Map::iterator i = joiners.find(to); + if (isAlive(from) && i != joiners.end()) { Url url= i->second; - newbies.erase(i); // No longer a potential dumpee. + joiners.erase(i); // No longer a potential updatee. return url; } return boost::optional<Url>(); diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 1893d0e796..9756daf977 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -39,7 +39,7 @@ namespace qpid { namespace cluster { /** - * Map of established cluster members and newbies waiting for a brain dump. + * Map of established cluster members and joiners waiting for an update. */ class ClusterMap { public: @@ -60,15 +60,15 @@ class ClusterMap { bool configChange(const std::string& addresses); - bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); } + bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); } - Url getNewbieUrl(const MemberId& id) { return getUrl(newbies, id); } + Url getJoinerUrl(const MemberId& id) { return getUrl(joiners, id); } Url getMemberUrl(const MemberId& id) { return getUrl(members, id); } - /** First newbie in the cluster in ID order, target for offers */ - MemberId firstNewbie() const; + /** First joiner in the cluster in ID order, target for offers */ + MemberId firstJoiner() const; /** Convert map contents to a cluster control body. */ framing::ClusterConnectionMembershipBody asMethodBody() const; @@ -79,9 +79,9 @@ class ClusterMap { std::vector<Url> memberUrls() const; Set getAlive() const; - bool dumpRequest(const MemberId& id, const std::string& url); + bool updateRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ - boost::optional<Url> dumpOffer(const MemberId& from, const MemberId& to); + boost::optional<Url> updateOffer(const MemberId& from, const MemberId& to); /**@return true If this is a new member */ bool ready(const MemberId& id, const Url&); @@ -93,7 +93,7 @@ class ClusterMap { private: Url getUrl(const Map& map, const MemberId& id); - Map newbies, members; + Map joiners, members; Set alive; friend std::ostream& operator<<(std::ostream&, const Map&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 79c34d6873..2a3df8f465 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,7 +21,7 @@ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" -#include "qpid/cluster/DumpClient.h" +#include "qpid/cluster/UpdateClient.h" #include "qpid/broker/Broker.h" #include "qpid/Plugin.h" @@ -87,43 +87,43 @@ struct ClusterOptions : public Options { } }; -struct DumpClientIdAllocator : management::IdAllocator +struct UpdateClientIdAllocator : management::IdAllocator { qpid::sys::AtomicValue<uint64_t> sequence; - DumpClientIdAllocator() : sequence(0x4000000000000000LL) {} + UpdateClientIdAllocator() : sequence(0x4000000000000000LL) {} uint64_t getIdFor(management::Manageable* m) { - if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) { + if (isUpdateQueue(m) || isUpdateExchange(m) || isUpdateSession(m) || isUpdateBinding(m)) { return ++sequence; } else { return 0; } } - bool isDumpQueue(management::Manageable* manageable) + bool isUpdateQueue(management::Manageable* manageable) { qpid::broker::Queue* queue = dynamic_cast<qpid::broker::Queue*>(manageable); - return queue && queue->getName() == DumpClient::DUMP; + return queue && queue->getName() == UpdateClient::UPDATE; } - bool isDumpExchange(management::Manageable* manageable) + bool isUpdateExchange(management::Manageable* manageable) { qpid::broker::Exchange* exchange = dynamic_cast<qpid::broker::Exchange*>(manageable); - return exchange && exchange->getName() == DumpClient::DUMP; + return exchange && exchange->getName() == UpdateClient::UPDATE; } - bool isDumpSession(management::Manageable* manageable) + bool isUpdateSession(management::Manageable* manageable) { broker::SessionState* session = dynamic_cast<broker::SessionState*>(manageable); - return session && session->getId().getName() == DumpClient::DUMP; + return session && session->getId().getName() == UpdateClient::UPDATE; } - bool isDumpBinding(management::Manageable* manageable) + bool isUpdateBinding(management::Manageable* manageable) { broker::Exchange::Binding* binding = dynamic_cast<broker::Exchange::Binding*>(manageable); - return binding && binding->queue->getName() == DumpClient::DUMP; + return binding && binding->queue->getName() == UpdateClient::UPDATE; } }; @@ -155,7 +155,7 @@ struct ClusterPlugin : public Plugin { broker->getExchanges().registerExchange(cluster->getFailoverExchange()); ManagementBroker* mgmt = dynamic_cast<ManagementBroker*>(ManagementAgent::Singleton::getInstance()); if (mgmt) { - std::auto_ptr<IdAllocator> allocator(new DumpClientIdAllocator()); + std::auto_ptr<IdAllocator> allocator(new UpdateClientIdAllocator()); mgmt->setAllocator(allocator); } } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 839a0e67b9..4b3e6da3fb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -19,7 +19,7 @@ * */ #include "Connection.h" -#include "DumpClient.h" +#include "UpdateClient.h" #include "Cluster.h" #include "qpid/broker/SessionState.h" @@ -45,8 +45,8 @@ // TODO aconway 2008-11-03: // -// Disproportionate amount of code here is dedicated to receiving a -// brain-dump when joining a cluster and building initial +// Disproportionate amount of code here is dedicated to receiving an +// update when joining a cluster and building initial // state. Should be separated out into its own classes. // @@ -104,7 +104,7 @@ void Connection::received(framing::AMQFrame& f) { if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or dumped ex catch-up connection. + else { // Shadow or updated ex catch-up connection. if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) { if (isShadow()) { QPID_LOG(debug, cluster << " inserting connection " << *this); @@ -155,7 +155,7 @@ void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { // Delivered from cluster. void Connection::deliveredFrame(const EventFrame& f) { - QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame); + QPID_LOG(trace, cluster << " DLVR: " << *this << ": " << f.frame); assert(!catchUp); currentChannel = f.frame.getChannel(); if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. @@ -174,8 +174,8 @@ void Connection::closed() { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isDumped()) { - QPID_LOG(debug, cluster << " closed dump connection " << *this); + else if (isUpdated()) { + QPID_LOG(debug, cluster << " closed update connection " << *this); connection.closed(); } else if (isLocal()) { @@ -268,7 +268,7 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId()); + QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { @@ -277,10 +277,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::membership(const FieldTable& newbies, const FieldTable& members) { - QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this); - cluster.dumpInDone(ClusterMap(newbies, members)); - self.second = 0; // Mark this as completed dump connection. +void Connection::membership(const FieldTable& joiners, const FieldTable& members) { + QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); + cluster.updateInDone(ClusterMap(joiners, members)); + self.second = 0; // Mark this as completed update connection. } bool Connection::isLocal() const { @@ -291,7 +291,7 @@ bool Connection::isShadow() const { return self.first != cluster.getId(); } -bool Connection::isDumped() const { +bool Connection::isUpdated() const { return self.first == cluster.getId() && self.second == 0; } @@ -302,9 +302,9 @@ shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) { return queue; } -broker::QueuedMessage Connection::getDumpMessage() { - broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get(); - if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); +broker::QueuedMessage Connection::getUpdateMessage() { + broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -323,12 +323,12 @@ void Connection::deliveryRecord(const string& qname, broker::QueuedMessage m; broker::Queue::shared_ptr queue = findQueue(qname); if (!ended) { // Has a message - if (acquired) // Message is on the dump queue - m = getDumpMessage(); + if (acquired) // Message is on the update queue + m = getUpdateMessage(); else // Message at original position in original queue m = queue->find(position); if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no dump message")); + throw Exception(QPID_MSG("deliveryRecord no update message")); } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -349,7 +349,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; else if (c.isShadow()) type = "shadow"; - else if (c.isDumped()) type = "dumped"; + else if (c.isUpdated()) type = "updated"; return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } @@ -361,15 +361,15 @@ void Connection::txAccept(const framing::SequenceSet& acked) { } void Connection::txDequeue(const std::string& queue) { - txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload))); + txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload))); } void Connection::txEnqueue(const std::string& queue) { - txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload))); + txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getDumpMessage().payload)); + boost::shared_ptr<broker::TxPublish> txPub(new broker::TxPublish(getUpdateMessage().payload)); for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get<std::string>())); txPub->delivered = delivered; diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 3b18e22d17..e22ff05c08 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -81,8 +81,8 @@ class Connection : /** True if the connection is in "catch-up" mode: building initial broker state. */ bool isCatchUp() const { return catchUp; } - /** True if the connection is a completed shared dump connection */ - bool isDumped() const; + /** True if the connection is a completed shared update connection */ + bool isUpdated() const; Cluster& getCluster() { return cluster; } @@ -108,7 +108,7 @@ class Connection : // ==== Used in catch-up mode to build initial state. // - // State dump methods. + // State update methods. void sessionState(const framing::SequenceNumber& replayStart, const framing::SequenceNumber& sendCommandPoint, const framing::SequenceSet& sentIncomplete, @@ -156,7 +156,7 @@ class Connection : boost::shared_ptr<broker::Queue> findQueue(const std::string& qname); broker::SessionState& sessionState(); broker::SemanticState& semanticState(); - broker::QueuedMessage getDumpMessage(); + broker::QueuedMessage getUpdateMessage(); static NoOpConnectionOutputHandler discardHandler; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 28d2750ff9..1334f97eec 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -66,7 +66,6 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, ConnectionCodec::~ConnectionCodec() {} size_t ConnectionCodec::decode(const char* buffer, size_t size) { - QPID_LOG(trace, "RECVB [" << localId << "]: " << size << " bytes"); return interceptor->decode(buffer, size); } diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 847088435c..446722745c 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/framing/AMQBody.h" namespace qpid { namespace cluster { @@ -40,12 +41,14 @@ Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_, } void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { + QPID_LOG(trace, "MCAST " << id << ": " << body); mcast(Event::control(body, id)); } void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { Event e(DATA, id, size); memcpy(e.getData(), data, size); + QPID_LOG(trace, "MCAST " << e); mcast(e); } @@ -54,7 +57,6 @@ void Multicaster::mcast(const Event& e) { sys::Mutex::ScopedLock l(lock); if (e.getType() == DATA && e.isConnection() && holding) { holdingQueue.push_back(e); - QPID_LOG(trace, " MCAST held: " << e ); return; } } @@ -85,7 +87,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { } break; } - QPID_LOG(trace, " MCAST " << *i); ++i; } values.erase(values.begin(), i); // Erase sent events. diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp index 00328eb310..c58133f453 100644 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include "DumpClient.h" +#include "UpdateClient.h" #include "Cluster.h" #include "ClusterMap.h" #include "Connection.h" @@ -83,49 +83,49 @@ void send(client::AsyncSession& s, const AMQBody& body) { sb.get()->send(body); } -// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel. +// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. -DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url, +UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, const boost::function<void()>& ok, const boost::function<void(const std::exception&)>& fail) - : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons), + : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), connection(catchUpConnection()), shadowConnection(catchUpConnection()), done(ok), failed(fail) { connection.open(url); - session = connection.newSession(DUMP); + session = connection.newSession("update_shared"); } -DumpClient::~DumpClient() {} +UpdateClient::~UpdateClient() {} // Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -static const char DUMP_CHARS[] = "\000qpid-dump"; -const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS)); - -void DumpClient::dump() { - QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl); - Broker& b = dumperBroker; - b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); - - // Dump exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true); - b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); - // Dump queue is used to transfer acquired messages that are no longer on their original queue. - session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true); +static const char UPDATE_CHARS[] = "\000qpid-update"; +const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); + +void UpdateClient::update() { + QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl); + Broker& b = updaterBroker; + b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); + + // Update exchange is used to route messages to the proper queue without modifying routing key. + session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); + // Update queue is used to transfer acquired messages that are no longer on their original queue. + session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); session.sync(); session.close(); - std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1)); + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); AMQFrame frame(map.asMethodBody()); client::ConnectionAccess::getImpl(connection)->handle(frame); connection.close(); - QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl); + QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); } -void DumpClient::run() { +void UpdateClient::run() { try { - dump(); + update(); done(); } catch (const std::exception& e) { failed(e); @@ -143,16 +143,16 @@ template <class T> std::string encode(const T& t) { } } // namespace -void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { - QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName()); +void UpdateClient::updateExchange(const boost::shared_ptr<Exchange>& ex) { + QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName()); ClusterConnectionProxy proxy(session); proxy.exchange(encode(*ex)); } -/** Bind a queue to the dump exchange and dump messges to it +/** Bind a queue to the update exchange and update messges to it * setting the message possition as needed. */ -class MessageDumper { +class MessageUpdater { std::string queue; bool haveLastPos; framing::SequenceNumber lastPos; @@ -160,15 +160,15 @@ class MessageDumper { public: - MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { - session.exchangeBind(queue, DumpClient::DUMP); + MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { + session.exchangeBind(queue, UpdateClient::UPDATE); } - ~MessageDumper() { - session.exchangeUnbind(queue, DumpClient::DUMP); + ~MessageUpdater() { + session.exchangeUnbind(queue, UpdateClient::UPDATE); } - void dumpQueuedMessage(const broker::QueuedMessage& message) { + void updateQueuedMessage(const broker::QueuedMessage& message) { if (!haveLastPos || message.position - lastPos != 1) { ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); haveLastPos = true; @@ -176,52 +176,52 @@ class MessageDumper { lastPos = message.position; SessionBase_0_10Access sb(session); framing::MessageTransferBody transfer( - framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); sb.get()->send(transfer, message.payload->getFrames()); } - void dumpMessage(const boost::intrusive_ptr<broker::Message>& message) { - dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); + void updateMessage(const boost::intrusive_ptr<broker::Message>& message) { + updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); } }; -void DumpClient::dumpQueue(const boost::shared_ptr<Queue>& q) { - QPID_LOG(debug, dumperId << " dumping queue " << q->getName()); +void UpdateClient::updateQueue(const boost::shared_ptr<Queue>& q) { + QPID_LOG(debug, updaterId << " updateing queue " << q->getName()); ClusterConnectionProxy proxy(session); proxy.queue(encode(*q)); - MessageDumper dumper(q->getName(), session); - q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1)); - q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); + MessageUpdater updater(q->getName(), session); + q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); + q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1)); } -void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) { +void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) { session.exchangeBind(queue, binding.exchange, binding.key, binding.args); } -void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& dumpConnection) { - QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); +void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& updateConnection) { + QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection); shadowConnection = catchUpConnection(); - broker::Connection& bc = dumpConnection->getBrokerConnection(); + broker::Connection& bc = updateConnection->getBrokerConnection(); // FIXME aconway 2008-10-20: What authentication info to use on reconnect? - shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); - bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); + shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); + bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); ClusterConnectionProxy(shadowConnection).shadowReady( - dumpConnection->getId().getMember(), - reinterpret_cast<uint64_t>(dumpConnection->getId().getPointer())); + updateConnection->getId().getMember(), + reinterpret_cast<uint64_t>(updateConnection->getId().getPointer())); shadowConnection.close(); - QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection); + QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); } -void DumpClient::dumpSession(broker::SessionHandler& sh) { - QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " +void UpdateClient::updateSession(broker::SessionHandler& sh) { + QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " << sh.getSession()->getId()); broker::SessionState* ss = sh.getSession(); if (!ss) return; // no session. - // Create a client session to dump session state. + // Create a client session to update session state. boost::shared_ptr<client::ConnectionImpl> cimpl = client::ConnectionAccess::getImpl(shadowConnection); boost::shared_ptr<client::SessionImpl> simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); client::SessionBase_0_10Access(shadowSession).set(simpl); @@ -229,15 +229,15 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // Re-create session state on remote connection. - // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33. - QPID_LOG(debug, dumperId << " dumping consumers."); - ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); + // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. + QPID_LOG(debug, updaterId << " updateing consumers."); + ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); - QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); + QPID_LOG(debug, updaterId << " updateing unacknowledged messages."); broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); - std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1)); + std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); - dumpTxState(ss->getSemanticState()); // Tx transaction state. + updateTxState(ss->getSemanticState()); // Tx transaction state. // Adjust for command counter for message in progress, will be sent after state update. boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress(); @@ -263,11 +263,11 @@ void DumpClient::dumpSession(broker::SessionHandler& sh) { // FIXME aconway 2008-09-23: update session replay list. - QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); + QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); } -void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId()); +void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { + QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId()); using namespace message; shadowSession.messageSubscribe( arg::queue = ci->getQueue()->getName(), @@ -289,15 +289,15 @@ void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { ci->isNotifyEnabled() ); client::SessionBase_0_10Access(shadowSession).get()->send(state); - QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); + QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); } -void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { // If the message is acquired then it is no longer on the - // dumpees queue, put it on the dump queue for dumpee to pick up. + // updatees queue, put it on the update queue for updatee to pick up. // - MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); + MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage()); } ClusterConnectionProxy(shadowSession).deliveryRecord( dr.getQueue()->getName(), @@ -314,22 +314,22 @@ void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { ); } -class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { +class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { public: - TxOpDumper(DumpClient& dc, client::AsyncSession s) - : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {} + TxOpUpdater(UpdateClient& dc, client::AsyncSession s) + : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {} void operator()(const broker::DtxAck& ) { throw InternalErrorException("DTX transactions not currently supported by cluster."); } void operator()(const broker::RecoveredDequeue& rdeq) { - dumpMessage(rdeq.getMessage()); + updateMessage(rdeq.getMessage()); proxy.txEnqueue(rdeq.getQueue()->getName()); } void operator()(const broker::RecoveredEnqueue& renq) { - dumpMessage(renq.getMessage()); + updateMessage(renq.getMessage()); proxy.txEnqueue(renq.getQueue()->getName()); } @@ -338,7 +338,7 @@ class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { } void operator()(const broker::TxPublish& txPub) { - dumpMessage(txPub.getMessage()); + updateMessage(txPub.getMessage()); typedef std::list<Queue::shared_ptr> QueueList; const QueueList& qlist = txPub.getQueues(); Array qarray(TYPE_CODE_STR8); @@ -348,20 +348,20 @@ class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { } private: - DumpClient& parent; + UpdateClient& parent; client::AsyncSession session; ClusterConnectionProxy proxy; }; -void DumpClient::dumpTxState(broker::SemanticState& s) { - QPID_LOG(debug, dumperId << " dumping TX transaction state."); +void UpdateClient::updateTxState(broker::SemanticState& s) { + QPID_LOG(debug, updaterId << " updateing TX transaction state."); ClusterConnectionProxy proxy(shadowSession); proxy.accumulatedAck(s.getAccumulatedAck()); broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); if (txBuffer) { proxy.txStart(); - TxOpDumper dumper(*this, shadowSession); - txBuffer->accept(dumper); + TxOpUpdater updater(*this, shadowSession); + txBuffer->accept(updater); proxy.txEnd(); } } diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/UpdateClient.h index 23676e7646..93dca9f0c6 100644 --- a/cpp/src/qpid/cluster/DumpClient.h +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -1,5 +1,5 @@ -#ifndef QPID_CLUSTER_DUMPCLIENT_H -#define QPID_CLUSTER_DUMPCLIENT_H +#ifndef QPID_CLUSTER_UPDATECLIENT_H +#define QPID_CLUSTER_UPDATECLIENT_H /* * @@ -56,38 +56,38 @@ class Connection; class ClusterMap; /** - * A client that dumps the contents of a local broker to a remote one using AMQP. + * A client that updates the contents of a local broker to a remote one using AMQP. */ -class DumpClient : public sys::Runnable { +class UpdateClient : public sys::Runnable { public: - static const std::string DUMP; // Name for special dump queue and exchange. + static const std::string UPDATE; // Name for special update queue and exchange. - DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&, + UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, broker::Broker& donor, const ClusterMap& map, const std::vector<boost::intrusive_ptr<Connection> >& , const boost::function<void()>& done, const boost::function<void(const std::exception&)>& fail); - ~DumpClient(); - void dump(); + ~UpdateClient(); + void update(); void run(); // Will delete this when finished. - void dumpUnacked(const broker::DeliveryRecord&); + void updateUnacked(const broker::DeliveryRecord&); private: - void dumpQueue(const boost::shared_ptr<broker::Queue>&); - void dumpExchange(const boost::shared_ptr<broker::Exchange>&); - void dumpMessage(const broker::QueuedMessage&); - void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s); - void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); - void dumpConnection(const boost::intrusive_ptr<Connection>& connection); - void dumpSession(broker::SessionHandler& s); - void dumpTxState(broker::SemanticState& s); - void dumpConsumer(const broker::SemanticState::ConsumerImpl*); - - MemberId dumperId; - MemberId dumpeeId; - Url dumpeeUrl; - broker::Broker& dumperBroker; + void updateQueue(const boost::shared_ptr<broker::Queue>&); + void updateExchange(const boost::shared_ptr<broker::Exchange>&); + void updateMessage(const broker::QueuedMessage&); + void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s); + void updateBinding(const std::string& queue, const broker::QueueBinding& binding); + void updateConnection(const boost::intrusive_ptr<Connection>& connection); + void updateSession(broker::SessionHandler& s); + void updateTxState(broker::SemanticState& s); + void updateConsumer(const broker::SemanticState::ConsumerImpl*); + + MemberId updaterId; + MemberId updateeId; + Url updateeUrl; + broker::Broker& updaterBroker; ClusterMap map; std::vector<boost::intrusive_ptr<Connection> > connections; client::Connection connection, shadowConnection; @@ -98,4 +98,4 @@ class DumpClient : public sys::Runnable { }} // namespace qpid::cluster -#endif /*!QPID_CLUSTER_DUMPCLIENT_H*/ +#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 6ca957f310..b7d28bf914 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -27,7 +27,7 @@ #include "qpid/client/FailoverListener.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" -#include "qpid/cluster/DumpClient.h" +#include "qpid/cluster/UpdateClient.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/reply_exceptions.h" @@ -352,8 +352,8 @@ QPID_AUTO_TEST_CASE(testUnacked) { BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22"); } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) { - // Verify that we dump transaction state correctly to new members. +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { + // Verify that we update transaction state correctly to new members. ClusterFixture cluster(1); Client c0(cluster[0], "c0"); @@ -386,8 +386,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) { BOOST_CHECK_EQUAL(m.getData(), "3"); } -QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { - // Verify that we dump a partially recieved message to a new member. +QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { + // Verify that we update a partially recieved message to a new member. ClusterFixture cluster(1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); @@ -452,7 +452,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { BOOST_CHECK_EQUAL(kb0, kb2); } -QPID_AUTO_TEST_CASE(DumpConsumers) { +QPID_AUTO_TEST_CASE(UpdateConsumers) { ClusterFixture cluster(1, 1); Client c0(cluster[0], "c0"); diff --git a/cpp/xml/cluster.xml b/cpp/xml/cluster.xml index 19d9f7ea56..e6cacb0223 100644 --- a/cpp/xml/cluster.xml +++ b/cpp/xml/cluster.xml @@ -27,12 +27,12 @@ <class name = "cluster" code = "0x80" label="Qpid clustering extensions."> <doc>Qpid extension class to allow clustered brokers to communicate.</doc> - <control name="dump-request" code="0x1" label="URL for a member."> + <control name="update-request" code="0x1" label="URL for a member."> <field name="url" type="str16"/> </control> - <control name = "dump-offer" code="0x2" label="Member offering to be dumper for dumpee."> - <field name="dumpee" type="uint64"/> + <control name = "update-offer" code="0x2" label="Member offering to be updater for updatee."> + <field name="updatee" type="uint64"/> <field name="cluster-id" type="uuid"/> </control> @@ -60,13 +60,13 @@ Min <control name="ready" code="0x10" label="New member is ready."> <field name="bytes" type="uint32"/> </control> - <!-- Brain-dump controls. Sent to a new broker in joining mode. - A connection is dumped as followed: + <!-- Update controls. Sent to a new broker in joining mode. + A connection is updateed as followed: - open as a normal connection. - attach sessions, create consumers, set flow with normal AMQP cokmmands. - send /reset additional session state with controls below. - - send shadow-ready to mark end of shadow dump. - - send dump-complete when entire dump is complete. + - send shadow-ready to mark end of shadow update. + - send update-complete when entire update is complete. --> <!-- Consumer state that cannot be set by standard AMQP controls. --> @@ -103,8 +103,8 @@ Min <control name="ready" code="0x10" label="New member is ready."> <control name="tx-end" code="0x17"/> <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control> - <!-- Complete a session state dump. --> - <control name="session-state" code="0x1F" label="Set session state during a brain dump."> + <!-- Complete a session state update. --> + <control name="session-state" code="0x1F" label="Set session state during a brain update."> <!-- Target session deduced from channel number. --> <field name="replay-start" type="sequence-no"/> <!-- Replay frames will start from this point.--> <field name="command-point" type="sequence-no"/> <!-- Id of next command sent --> @@ -116,15 +116,15 @@ Min <control name="ready" code="0x10" label="New member is ready."> <field name="received-incomplete" type="sequence-set"/> <!-- Received and incomplete --> </control> - <!-- Complete a shadow connection dump. --> - <control name="shadow-ready" code="0x20" label="End of shadow connection dump."> + <!-- Complete a shadow connection update. --> + <control name="shadow-ready" code="0x20" label="End of shadow connection update."> <field name="member-id" type="uint64"/> <field name="connection-id" type="uint64"/> </control> - <!-- Complete a cluster state dump. --> + <!-- Complete a cluster state update. --> <control name="membership" code="0x21" label="Cluster membership details."> - <field name="newbies" type="map"/> <!-- member-id -> URL --> + <field name="joiners" type="map"/> <!-- member-id -> URL --> <field name="members" type="map"/> <!-- member-id -> state --> </control> |