From 306114207d6ff6c3ec6d63f5ab6b4ff9e1dd7d4e Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Tue, 27 Jan 2009 02:08:25 +0000 Subject: Cluster rename: dump -> update, newbie -> joiner git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@737971 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/cluster.mk | 4 +- cpp/src/qpid/broker/Exchange.cpp | 10 +- cpp/src/qpid/cluster/Cluster.cpp | 107 +++++---- cpp/src/qpid/cluster/Cluster.h | 44 ++-- cpp/src/qpid/cluster/ClusterMap.cpp | 36 +-- cpp/src/qpid/cluster/ClusterMap.h | 16 +- cpp/src/qpid/cluster/ClusterPlugin.cpp | 26 +-- cpp/src/qpid/cluster/Connection.cpp | 46 ++-- cpp/src/qpid/cluster/Connection.h | 8 +- cpp/src/qpid/cluster/ConnectionCodec.cpp | 1 - cpp/src/qpid/cluster/DumpClient.cpp | 369 ------------------------------- cpp/src/qpid/cluster/DumpClient.h | 101 --------- cpp/src/qpid/cluster/Multicaster.cpp | 5 +- cpp/src/qpid/cluster/UpdateClient.cpp | 369 +++++++++++++++++++++++++++++++ cpp/src/qpid/cluster/UpdateClient.h | 101 +++++++++ cpp/src/tests/cluster_test.cpp | 12 +- 16 files changed, 628 insertions(+), 627 deletions(-) delete mode 100644 cpp/src/qpid/cluster/DumpClient.cpp delete mode 100644 cpp/src/qpid/cluster/DumpClient.h create mode 100644 cpp/src/qpid/cluster/UpdateClient.cpp create mode 100644 cpp/src/qpid/cluster/UpdateClient.h (limited to 'cpp/src') diff --git a/cpp/src/cluster.mk b/cpp/src/cluster.mk index 31eed2aec6..3809c86090 100644 --- a/cpp/src/cluster.mk +++ b/cpp/src/cluster.mk @@ -52,8 +52,8 @@ cluster_la_SOURCES = \ qpid/cluster/Cpg.cpp \ qpid/cluster/Cpg.h \ qpid/cluster/Dispatchable.h \ - qpid/cluster/DumpClient.cpp \ - qpid/cluster/DumpClient.h \ + qpid/cluster/UpdateClient.cpp \ + qpid/cluster/UpdateClient.h \ qpid/cluster/Event.cpp \ qpid/cluster/Event.h \ qpid/cluster/EventFrame.h \ diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 53c49bf0ce..f8b9e4b183 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -98,6 +98,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) : } } +static const std::string QPID_MANAGEMENT("qpid.management"); + Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), @@ -111,9 +113,9 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange = new _qmf::Exchange (agent, this, parent, _name, durable); mgmtExchange->set_arguments(args); if (!durable) { - if (name == "") { + if (name.empty()) { agent->addObject (mgmtExchange, 0x1000000000000004LL); // Special default exchange ID - } else if (name == "qpid.management") { + } else if (name == QPID_MANAGEMENT) { agent->addObject (mgmtExchange, 0x1000000000000005LL); // Special management exchange ID } else { ManagementBroker* mb = dynamic_cast(agent); @@ -125,12 +127,12 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel sequence = _args.get(qpidMsgSequence); if (sequence) { - QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + QPID_LOG(debug, "Configured exchange " << _name << " with Msg sequencing"); args.setInt64(std::string(qpidSequenceCounter), sequenceNo); } ive = _args.get(qpidIVE); - if (ive) QPID_LOG(debug, "Configured exchange "+ _name +" with Initial Value"); + if (ive) QPID_LOG(debug, "Configured exchange " << _name << " with Initial Value"); } Exchange::~Exchange () diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index f8adb8ee98..0d082fc226 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -18,7 +18,7 @@ #include "Cluster.h" #include "Connection.h" -#include "DumpClient.h" +#include "UpdateClient.h" #include "FailoverExchange.h" #include "ClusterQueueHandler.h" @@ -29,10 +29,10 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterDumpRequestBody.h" +#include "qpid/framing/ClusterUpdateRequestBody.h" #include "qpid/framing/ClusterReadyBody.h" #include "qpid/framing/ClusterConfigChangeBody.h" -#include "qpid/framing/ClusterDumpOfferBody.h" +#include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" @@ -77,10 +77,10 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { Cluster::Lock& l; ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} - void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url, l); } + void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& addresses) { cluster.configChange(member, addresses, l); } - void dumpOffer(uint64_t dumpee, const Uuid& id) { cluster.dumpOffer(member, dumpee, id, l); } + void updateOffer(uint64_t updatee, const Uuid& id) { cluster.updateOffer(member, updatee, id, l); } void shutdown() { cluster.shutdown(member, l); } bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } @@ -124,7 +124,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, b } Cluster::~Cluster() { - if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. } void Cluster::insert(const boost::intrusive_ptr& c) { @@ -205,7 +205,6 @@ void Cluster::deliver( void Cluster::deliver(const Event& e, Lock&) { if (state == LEFT) return; - QPID_LOG(trace, *this << " PUSH: " << e); QPID_LATENCY_INIT(e); deliverEventQueue.push(e); } @@ -216,7 +215,7 @@ void Cluster::deliveredEvent(const Event& e) { Buffer buf(const_cast(e.getData()), e.getSize()); boost::intrusive_ptr connection; if (e.isConnection()) { - if (state == NEWBIE) { + if (state == JOINER) { QPID_LOG(trace, *this << " DROP: " << e); return; } @@ -236,11 +235,11 @@ void Cluster::deliveredEvent(const Event& e) { void Cluster::deliveredFrame(const EventFrame& e) { QPID_LATENCY_RECORD("delivered frame queue", e.frame); - QPID_LOG(trace, *this << " DLVR: " << e.frame); if (e.connection) { e.connection->deliveredFrame(e); } else { + QPID_LOG(trace, *this << " DLVR: " << e.frame); Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big? ClusterDispatcher dispatch(*this, e.member, l); if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled()) @@ -313,9 +312,9 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& memberUpdate(l); } else { // Joining established group. - state = NEWBIE; + state = JOINER; QPID_LOG(info, *this << " joining cluster: " << map); - mcast.mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), myId); + mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), myId); ClusterMap::Set members = map.getAlive(); members.erase(myId); myElders = members; @@ -336,10 +335,10 @@ void Cluster::configChange(const MemberId&, const std::string& addresses, Lock& void Cluster::tryMakeOffer(const MemberId& id, Lock& ) { - if (state == READY && map.isNewbie(id)) { + if (state == READY && map.isJoiner(id)) { state = OFFER; - QPID_LOG(info, *this << " send dump-offer to " << id); - mcast.mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), myId); + QPID_LOG(info, *this << " send update-offer to " << id); + mcast.mcastControl(ClusterUpdateOfferBody(ProtocolVersion(), id, clusterId), myId); } } @@ -359,8 +358,8 @@ void Cluster::brokerShutdown() { delete this; } -void Cluster::dumpRequest(const MemberId& id, const std::string& url, Lock& l) { - map.dumpRequest(id, url); +void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) { + map.updateRequest(id, url); tryMakeOffer(id, l); } @@ -376,81 +375,81 @@ void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { } } -void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, const Uuid& uuid, Lock& l) { +void Cluster::updateOffer(const MemberId& updater, uint64_t updateeInt, const Uuid& uuid, Lock& l) { if (state == LEFT) return; - MemberId dumpee(dumpeeInt); - boost::optional url = map.dumpOffer(dumper, dumpee); - if (dumper == myId) { + MemberId updatee(updateeInt); + boost::optional url = map.updateOffer(updater, updatee); + if (updater == myId) { assert(state == OFFER); if (url) { // My offer was first. - dumpStart(dumpee, *url, l); + updateStart(updatee, *url, l); } else { // Another offer was first. state = READY; mcast.release(); - QPID_LOG(info, *this << " cancelled dump offer to " << dumpee); - tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer. + QPID_LOG(info, *this << " cancelled update offer to " << updatee); + tryMakeOffer(map.firstJoiner(), l); // Maybe make another offer. } } - else if (dumpee == myId && url) { - assert(state == NEWBIE); + else if (updatee == myId && url) { + assert(state == JOINER); setClusterId(uuid); - state = DUMPEE; - QPID_LOG(info, *this << " receiving dump from " << dumper); + state = UPDATEE; + QPID_LOG(info, *this << " receiving update from " << updater); deliverEventQueue.stop(); - checkDumpIn(l); + checkUpdateIn(l); } } -void Cluster::dumpStart(const MemberId& dumpee, const Url& url, Lock&) { +void Cluster::updateStart(const MemberId& updatee, const Url& url, Lock&) { if (state == LEFT) return; assert(state == OFFER); - state = DUMPER; - QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url); + state = UPDATER; + QPID_LOG(info, *this << " stall for update to " << updatee << " at " << url); deliverEventQueue.stop(); - if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread. - dumpThread = Thread( - new DumpClient(myId, dumpee, url, broker, map, connections.values(), - boost::bind(&Cluster::dumpOutDone, this), - boost::bind(&Cluster::dumpOutError, this, _1))); + if (updateThread.id()) updateThread.join(); // Join the previous updatethread. + updateThread = Thread( + new UpdateClient(myId, updatee, url, broker, map, connections.values(), + boost::bind(&Cluster::updateOutDone, this), + boost::bind(&Cluster::updateOutError, this, _1))); } -// Called in dump thread. -void Cluster::dumpInDone(const ClusterMap& m) { +// Called in update thread. +void Cluster::updateInDone(const ClusterMap& m) { Lock l(lock); - dumpedMap = m; - checkDumpIn(l); + updatedMap = m; + checkUpdateIn(l); } -void Cluster::checkDumpIn(Lock& ) { +void Cluster::checkUpdateIn(Lock& ) { if (state == LEFT) return; - if (state == DUMPEE && dumpedMap) { - map = *dumpedMap; + if (state == UPDATEE && updatedMap) { + map = *updatedMap; mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId); state = CATCHUP; - QPID_LOG(info, *this << " received dump, starting catch-up"); + QPID_LOG(info, *this << " received update, starting catch-up"); deliverEventQueue.start(); } } -void Cluster::dumpOutDone() { +void Cluster::updateOutDone() { Monitor::ScopedLock l(lock); - dumpOutDone(l); + updateOutDone(l); } -void Cluster::dumpOutDone(Lock& l) { - assert(state == DUMPER); +void Cluster::updateOutDone(Lock& l) { + assert(state == UPDATER); state = READY; mcast.release(); - QPID_LOG(info, *this << " sent dump"); + QPID_LOG(info, *this << " sent update"); deliverEventQueue.start(); - tryMakeOffer(map.firstNewbie(), l); // Try another offer + tryMakeOffer(map.firstJoiner(), l); // Try another offer } -void Cluster::dumpOutError(const std::exception& e) { +void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending dump: " << e.what()); - dumpOutDone(l); + QPID_LOG(error, *this << " error sending update: " << e.what()); + updateOutDone(l); } void Cluster ::shutdown(const MemberId& id, Lock& l) { @@ -534,7 +533,7 @@ void Cluster::memberUpdate(Lock& l) { } std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { - static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", "READY", "OFFER", "DUMPER", "LEFT" }; + static const char* STATE[] = { "INIT", "JOINER", "UPDATEE", "CATCHUP", "READY", "OFFER", "UPDATER", "LEFT" }; return o << cluster.myId << "(" << STATE[cluster.state] << ")"; } diff --git a/cpp/src/qpid/cluster/Cluster.h b/cpp/src/qpid/cluster/Cluster.h index ef63c4c3fe..711383d4dd 100644 --- a/cpp/src/qpid/cluster/Cluster.h +++ b/cpp/src/qpid/cluster/Cluster.h @@ -59,7 +59,7 @@ class Connection; /** * Connection to the cluster * - * Threading notes: 3 thread categories: connection, deliver, dump. + * Threading notes: 3 thread categories: connection, deliver, update. * */ class Cluster : private Cpg::Handler, public management::Manageable { @@ -87,8 +87,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Leave the cluster - called in any thread. void leave(); - // Dump completed - called in dump thread - void dumpInDone(const ClusterMap&); + // Update completed - called in update thread + void updateInDone(const ClusterMap&); MemberId getId() const; broker::Broker& getBroker() const; @@ -124,8 +124,8 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Cluster controls implement XML methods from cluster.xml. // Called in deliver thread. // - void dumpRequest(const MemberId&, const std::string&, Lock&); - void dumpOffer(const MemberId& dumper, uint64_t dumpee, const framing::Uuid&, Lock&); + void updateRequest(const MemberId&, const std::string&, Lock&); + void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, Lock&); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); @@ -133,7 +133,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void deliveredFrame(const EventFrame&); // Helper, called in deliver thread. - void dumpStart(const MemberId& dumpee, const Url& url, Lock&); + void updateStart(const MemberId& updatee, const Url& url, Lock&); void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, @@ -163,12 +163,12 @@ class Cluster : private Cpg::Handler, public management::Manageable { void memberUpdate(Lock&); // Called in connection IO threads . - void checkDumpIn(Lock&); + void checkUpdateIn(Lock&); - // Called in DumpClient thread. - void dumpOutDone(); - void dumpOutError(const std::exception&); - void dumpOutDone(Lock&); + // Called in UpdateClient thread. + void updateOutDone(); + void updateOutError(const std::exception&); + void updateOutDone(Lock&); void setClusterId(const framing::Uuid&); @@ -201,23 +201,23 @@ class Cluster : private Cpg::Handler, public management::Manageable { // Local cluster state, cluster map enum { - INIT, ///< Initial state, no CPG messages received. - NEWBIE, ///< Sent dump request, waiting for dump offer. - DUMPEE, ///< Stalled receive queue at dump offer, waiting for dump to complete. - CATCHUP, ///< Dump complete, unstalled but has not yet seen own "ready" event. - READY, ///< Fully operational - OFFER, ///< Sent an offer, waiting for accept/reject. - DUMPER, ///< Offer accepted, sending a state dump. - LEFT ///< Final state, left the cluster. + INIT, ///< Initial state, no CPG messages received. + JOINER, ///< Sent update request, waiting for update offer. + UPDATEE, ///< Stalled receive queue at update offer, waiting for update to complete. + CATCHUP, ///< Update complete, unstalled but has not yet seen own "ready" event. + READY, ///< Fully operational + OFFER, ///< Sent an offer, waiting for accept/reject. + UPDATER, ///< Offer accepted, sending a state update. + LEFT ///< Final state, left the cluster. } state; ClusterMap map; size_t lastSize; bool lastBroker; uint64_t sequence; - // Dump related - sys::Thread dumpThread; - boost::optional dumpedMap; + // Update related + sys::Thread updateThread; + boost::optional updatedMap; friend std::ostream& operator<<(std::ostream&, const Cluster&); friend class ClusterDispatcher; diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index b00699c903..bcfade2b8c 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -61,21 +61,21 @@ ClusterMap::ClusterMap(const MemberId& id, const Url& url , bool isMember) { if (isMember) members[id] = url; else - newbies[id] = url; + joiners[id] = url; } -ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& membersFt) { - std::for_each(newbiesFt.begin(), newbiesFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive))); +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt) { + std::for_each(joinersFt.begin(), joinersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(joiners), boost::ref(alive))); std::for_each(membersFt.begin(), membersFt.end(), boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive))); } ClusterConnectionMembershipBody ClusterMap::asMethodBody() const { framing::ClusterConnectionMembershipBody b; - b.getNewbies().clear(); - std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1)); + b.getJoiners().clear(); + std::for_each(joiners.begin(), joiners.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getJoiners()), _1)); for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { - if (!isMember(*i) && !isNewbie(*i)) - b.getNewbies().setString(i->str(), std::string()); + if (!isMember(*i) && !isJoiner(*i)) + b.getJoiners().setString(i->str(), std::string()); } b.getMembers().clear(); std::for_each(members.begin(), members.end(), boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1)); @@ -91,7 +91,7 @@ bool ClusterMap::configChange( bool memberChange=false; for (a = left; a != left+nLeft; ++a) { memberChange = memberChange || members.erase(*a); - newbies.erase(*a); + joiners.erase(*a); } alive.clear(); std::copy(current, current+nCurrent, std::inserter(alive, alive.end())); @@ -103,8 +103,8 @@ Url ClusterMap::getUrl(const Map& map, const MemberId& id) { return i == map.end() ? Url() : i->second; } -MemberId ClusterMap::firstNewbie() const { - return newbies.empty() ? MemberId() : newbies.begin()->first; +MemberId ClusterMap::firstJoiner() const { + return joiners.empty() ? MemberId() : joiners.begin()->first; } std::vector ClusterMap::memberIds() const { @@ -139,16 +139,16 @@ std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { o << *i; if (m.isMember(*i)) o << "(member)"; - else if (m.isNewbie(*i)) o << "(newbie)"; + else if (m.isJoiner(*i)) o << "(joiner)"; else o << "(unknown)"; o << " "; } return o; } -bool ClusterMap::dumpRequest(const MemberId& id, const std::string& url) { +bool ClusterMap::updateRequest(const MemberId& id, const std::string& url) { if (isAlive(id)) { - newbies[id] = Url(url); + joiners[id] = Url(url); return true; } return false; @@ -170,16 +170,16 @@ bool ClusterMap::configChange(const std::string& addresses) { alive = update; for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) { memberChange = memberChange || members.erase(*i); - newbies.erase(*i); + joiners.erase(*i); } return memberChange; } -boost::optional ClusterMap::dumpOffer(const MemberId& from, const MemberId& to) { - Map::iterator i = newbies.find(to); - if (isAlive(from) && i != newbies.end()) { +boost::optional ClusterMap::updateOffer(const MemberId& from, const MemberId& to) { + Map::iterator i = joiners.find(to); + if (isAlive(from) && i != joiners.end()) { Url url= i->second; - newbies.erase(i); // No longer a potential dumpee. + joiners.erase(i); // No longer a potential updatee. return url; } return boost::optional(); diff --git a/cpp/src/qpid/cluster/ClusterMap.h b/cpp/src/qpid/cluster/ClusterMap.h index 1893d0e796..9756daf977 100644 --- a/cpp/src/qpid/cluster/ClusterMap.h +++ b/cpp/src/qpid/cluster/ClusterMap.h @@ -39,7 +39,7 @@ namespace qpid { namespace cluster { /** - * Map of established cluster members and newbies waiting for a brain dump. + * Map of established cluster members and joiners waiting for an update. */ class ClusterMap { public: @@ -60,15 +60,15 @@ class ClusterMap { bool configChange(const std::string& addresses); - bool isNewbie(const MemberId& id) const { return newbies.find(id) != newbies.end(); } + bool isJoiner(const MemberId& id) const { return joiners.find(id) != joiners.end(); } bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } bool isAlive(const MemberId& id) const { return alive.find(id) != alive.end(); } - Url getNewbieUrl(const MemberId& id) { return getUrl(newbies, id); } + Url getJoinerUrl(const MemberId& id) { return getUrl(joiners, id); } Url getMemberUrl(const MemberId& id) { return getUrl(members, id); } - /** First newbie in the cluster in ID order, target for offers */ - MemberId firstNewbie() const; + /** First joiner in the cluster in ID order, target for offers */ + MemberId firstJoiner() const; /** Convert map contents to a cluster control body. */ framing::ClusterConnectionMembershipBody asMethodBody() const; @@ -79,9 +79,9 @@ class ClusterMap { std::vector memberUrls() const; Set getAlive() const; - bool dumpRequest(const MemberId& id, const std::string& url); + bool updateRequest(const MemberId& id, const std::string& url); /** Return non-empty Url if accepted */ - boost::optional dumpOffer(const MemberId& from, const MemberId& to); + boost::optional updateOffer(const MemberId& from, const MemberId& to); /**@return true If this is a new member */ bool ready(const MemberId& id, const Url&); @@ -93,7 +93,7 @@ class ClusterMap { private: Url getUrl(const Map& map, const MemberId& id); - Map newbies, members; + Map joiners, members; Set alive; friend std::ostream& operator<<(std::ostream&, const Map&); diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 79c34d6873..2a3df8f465 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -21,7 +21,7 @@ #include "qpid/cluster/Cluster.h" #include "qpid/cluster/ConnectionCodec.h" -#include "qpid/cluster/DumpClient.h" +#include "qpid/cluster/UpdateClient.h" #include "qpid/broker/Broker.h" #include "qpid/Plugin.h" @@ -87,43 +87,43 @@ struct ClusterOptions : public Options { } }; -struct DumpClientIdAllocator : management::IdAllocator +struct UpdateClientIdAllocator : management::IdAllocator { qpid::sys::AtomicValue sequence; - DumpClientIdAllocator() : sequence(0x4000000000000000LL) {} + UpdateClientIdAllocator() : sequence(0x4000000000000000LL) {} uint64_t getIdFor(management::Manageable* m) { - if (isDumpQueue(m) || isDumpExchange(m) || isDumpSession(m) || isDumpBinding(m)) { + if (isUpdateQueue(m) || isUpdateExchange(m) || isUpdateSession(m) || isUpdateBinding(m)) { return ++sequence; } else { return 0; } } - bool isDumpQueue(management::Manageable* manageable) + bool isUpdateQueue(management::Manageable* manageable) { qpid::broker::Queue* queue = dynamic_cast(manageable); - return queue && queue->getName() == DumpClient::DUMP; + return queue && queue->getName() == UpdateClient::UPDATE; } - bool isDumpExchange(management::Manageable* manageable) + bool isUpdateExchange(management::Manageable* manageable) { qpid::broker::Exchange* exchange = dynamic_cast(manageable); - return exchange && exchange->getName() == DumpClient::DUMP; + return exchange && exchange->getName() == UpdateClient::UPDATE; } - bool isDumpSession(management::Manageable* manageable) + bool isUpdateSession(management::Manageable* manageable) { broker::SessionState* session = dynamic_cast(manageable); - return session && session->getId().getName() == DumpClient::DUMP; + return session && session->getId().getName() == UpdateClient::UPDATE; } - bool isDumpBinding(management::Manageable* manageable) + bool isUpdateBinding(management::Manageable* manageable) { broker::Exchange::Binding* binding = dynamic_cast(manageable); - return binding && binding->queue->getName() == DumpClient::DUMP; + return binding && binding->queue->getName() == UpdateClient::UPDATE; } }; @@ -155,7 +155,7 @@ struct ClusterPlugin : public Plugin { broker->getExchanges().registerExchange(cluster->getFailoverExchange()); ManagementBroker* mgmt = dynamic_cast(ManagementAgent::Singleton::getInstance()); if (mgmt) { - std::auto_ptr allocator(new DumpClientIdAllocator()); + std::auto_ptr allocator(new UpdateClientIdAllocator()); mgmt->setAllocator(allocator); } } diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp index 839a0e67b9..4b3e6da3fb 100644 --- a/cpp/src/qpid/cluster/Connection.cpp +++ b/cpp/src/qpid/cluster/Connection.cpp @@ -19,7 +19,7 @@ * */ #include "Connection.h" -#include "DumpClient.h" +#include "UpdateClient.h" #include "Cluster.h" #include "qpid/broker/SessionState.h" @@ -45,8 +45,8 @@ // TODO aconway 2008-11-03: // -// Disproportionate amount of code here is dedicated to receiving a -// brain-dump when joining a cluster and building initial +// Disproportionate amount of code here is dedicated to receiving an +// update when joining a cluster and building initial // state. Should be separated out into its own classes. // @@ -104,7 +104,7 @@ void Connection::received(framing::AMQFrame& f) { if (!framing::invoke(*this, *f.getBody()).wasHandled()) connection.received(f); } - else { // Shadow or dumped ex catch-up connection. + else { // Shadow or updated ex catch-up connection. if (f.getMethod() && f.getMethod()->isA()) { if (isShadow()) { QPID_LOG(debug, cluster << " inserting connection " << *this); @@ -155,7 +155,7 @@ void Connection::deliveredEvent(const Event& e, PollableFrameQueue& frameq) { // Delivered from cluster. void Connection::deliveredFrame(const EventFrame& f) { - QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame); + QPID_LOG(trace, cluster << " DLVR: " << *this << ": " << f.frame); assert(!catchUp); currentChannel = f.frame.getChannel(); if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol. @@ -174,8 +174,8 @@ void Connection::closed() { QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this); cluster.leave(); } - else if (isDumped()) { - QPID_LOG(debug, cluster << " closed dump connection " << *this); + else if (isUpdated()) { + QPID_LOG(debug, cluster << " closed update connection " << *this); connection.closed(); } else if (isLocal()) { @@ -268,7 +268,7 @@ void Connection::sessionState( received, unknownCompleted, receivedIncomplete); - QPID_LOG(debug, cluster << " received session state dump for " << sessionState().getId()); + QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId()); } void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { @@ -277,10 +277,10 @@ void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) { self = shadow; } -void Connection::membership(const FieldTable& newbies, const FieldTable& members) { - QPID_LOG(debug, cluster << " incoming dump complete on connection " << *this); - cluster.dumpInDone(ClusterMap(newbies, members)); - self.second = 0; // Mark this as completed dump connection. +void Connection::membership(const FieldTable& joiners, const FieldTable& members) { + QPID_LOG(debug, cluster << " incoming update complete on connection " << *this); + cluster.updateInDone(ClusterMap(joiners, members)); + self.second = 0; // Mark this as completed update connection. } bool Connection::isLocal() const { @@ -291,7 +291,7 @@ bool Connection::isShadow() const { return self.first != cluster.getId(); } -bool Connection::isDumped() const { +bool Connection::isUpdated() const { return self.first == cluster.getId() && self.second == 0; } @@ -302,9 +302,9 @@ shared_ptr Connection::findQueue(const std::string& qname) { return queue; } -broker::QueuedMessage Connection::getDumpMessage() { - broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get(); - if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue")); +broker::QueuedMessage Connection::getUpdateMessage() { + broker::QueuedMessage m = findQueue(UpdateClient::UPDATE)->get(); + if (!m.payload) throw Exception(QPID_MSG(cluster << " empty update queue")); return m; } @@ -323,12 +323,12 @@ void Connection::deliveryRecord(const string& qname, broker::QueuedMessage m; broker::Queue::shared_ptr queue = findQueue(qname); if (!ended) { // Has a message - if (acquired) // Message is on the dump queue - m = getDumpMessage(); + if (acquired) // Message is on the update queue + m = getUpdateMessage(); else // Message at original position in original queue m = queue->find(position); if (!m.payload) - throw Exception(QPID_MSG("deliveryRecord no dump message")); + throw Exception(QPID_MSG("deliveryRecord no update message")); } broker::DeliveryRecord dr(m, queue, tag, acquired, accepted, windowing, credit); @@ -349,7 +349,7 @@ std::ostream& operator<<(std::ostream& o, const Connection& c) { const char* type="unknown"; if (c.isLocal()) type = "local"; else if (c.isShadow()) type = "shadow"; - else if (c.isDumped()) type = "dumped"; + else if (c.isUpdated()) type = "updated"; return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") << ")"; } @@ -361,15 +361,15 @@ void Connection::txAccept(const framing::SequenceSet& acked) { } void Connection::txDequeue(const std::string& queue) { - txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload))); + txBuffer->enlist(make_shared_ptr(new broker::RecoveredDequeue(findQueue(queue), getUpdateMessage().payload))); } void Connection::txEnqueue(const std::string& queue) { - txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload))); + txBuffer->enlist(make_shared_ptr(new broker::RecoveredEnqueue(findQueue(queue), getUpdateMessage().payload))); } void Connection::txPublish(const framing::Array& queues, bool delivered) { - boost::shared_ptr txPub(new broker::TxPublish(getDumpMessage().payload)); + boost::shared_ptr txPub(new broker::TxPublish(getUpdateMessage().payload)); for (framing::Array::const_iterator i = queues.begin(); i != queues.end(); ++i) txPub->deliverTo(findQueue((*i)->get())); txPub->delivered = delivered; diff --git a/cpp/src/qpid/cluster/Connection.h b/cpp/src/qpid/cluster/Connection.h index 3b18e22d17..e22ff05c08 100644 --- a/cpp/src/qpid/cluster/Connection.h +++ b/cpp/src/qpid/cluster/Connection.h @@ -81,8 +81,8 @@ class Connection : /** True if the connection is in "catch-up" mode: building initial broker state. */ bool isCatchUp() const { return catchUp; } - /** True if the connection is a completed shared dump connection */ - bool isDumped() const; + /** True if the connection is a completed shared update connection */ + bool isUpdated() const; Cluster& getCluster() { return cluster; } @@ -108,7 +108,7 @@ class Connection : // ==== Used in catch-up mode to build initial state. // - // State dump methods. + // State update methods. void sessionState(const framing::SequenceNumber& replayStart, const framing::SequenceNumber& sendCommandPoint, const framing::SequenceSet& sentIncomplete, @@ -156,7 +156,7 @@ class Connection : boost::shared_ptr findQueue(const std::string& qname); broker::SessionState& sessionState(); broker::SemanticState& semanticState(); - broker::QueuedMessage getDumpMessage(); + broker::QueuedMessage getUpdateMessage(); static NoOpConnectionOutputHandler discardHandler; diff --git a/cpp/src/qpid/cluster/ConnectionCodec.cpp b/cpp/src/qpid/cluster/ConnectionCodec.cpp index 28d2750ff9..1334f97eec 100644 --- a/cpp/src/qpid/cluster/ConnectionCodec.cpp +++ b/cpp/src/qpid/cluster/ConnectionCodec.cpp @@ -66,7 +66,6 @@ ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& id, ConnectionCodec::~ConnectionCodec() {} size_t ConnectionCodec::decode(const char* buffer, size_t size) { - QPID_LOG(trace, "RECVB [" << localId << "]: " << size << " bytes"); return interceptor->decode(buffer, size); } diff --git a/cpp/src/qpid/cluster/DumpClient.cpp b/cpp/src/qpid/cluster/DumpClient.cpp deleted file mode 100644 index 00328eb310..0000000000 --- a/cpp/src/qpid/cluster/DumpClient.cpp +++ /dev/null @@ -1,369 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "DumpClient.h" -#include "Cluster.h" -#include "ClusterMap.h" -#include "Connection.h" -#include "qpid/client/SessionBase_0_10Access.h" -#include "qpid/client/ConnectionAccess.h" -#include "qpid/broker/Broker.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/QueueRegistry.h" -#include "qpid/broker/Message.h" -#include "qpid/broker/Exchange.h" -#include "qpid/broker/ExchangeRegistry.h" -#include "qpid/broker/SessionHandler.h" -#include "qpid/broker/SessionState.h" -#include "qpid/broker/TxOpVisitor.h" -#include "qpid/broker/DtxAck.h" -#include "qpid/broker/TxAccept.h" -#include "qpid/broker/TxPublish.h" -#include "qpid/broker/RecoveredDequeue.h" -#include "qpid/broker/RecoveredEnqueue.h" -#include "qpid/framing/MessageTransferBody.h" -#include "qpid/framing/ClusterConnectionMembershipBody.h" -#include "qpid/framing/ClusterConnectionShadowReadyBody.h" -#include "qpid/framing/ClusterConnectionSessionStateBody.h" -#include "qpid/framing/ClusterConnectionConsumerStateBody.h" -#include "qpid/framing/enum.h" -#include "qpid/framing/ProtocolVersion.h" -#include "qpid/framing/TypeCode.h" -#include "qpid/log/Statement.h" -#include "qpid/Url.h" -#include -#include - -namespace qpid { -namespace cluster { - -using broker::Broker; -using broker::Exchange; -using broker::Queue; -using broker::QueueBinding; -using broker::Message; -using namespace framing; -namespace arg=client::arg; -using client::SessionBase_0_10Access; - -struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { - ClusterConnectionProxy(client::Connection c) : - AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} - ClusterConnectionProxy(client::AsyncSession s) : - AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {} -}; - -// Create a connection with special version that marks it as a catch-up connection. -client::Connection catchUpConnection() { - client::Connection c; - client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10)); - return c; -} - -// Send a control body directly to the session. -void send(client::AsyncSession& s, const AMQBody& body) { - client::SessionBase_0_10Access sb(s); - sb.get()->send(body); -} - -// TODO aconway 2008-09-24: optimization: dump connections/sessions in parallel. - -DumpClient::DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url& url, - broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, - const boost::function& ok, - const boost::function& fail) - : dumperId(dumper), dumpeeId(dumpee), dumpeeUrl(url), dumperBroker(broker), map(m), connections(cons), - connection(catchUpConnection()), shadowConnection(catchUpConnection()), - done(ok), failed(fail) -{ - connection.open(url); - session = connection.newSession(DUMP); -} - -DumpClient::~DumpClient() {} - -// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges. -static const char DUMP_CHARS[] = "\000qpid-dump"; -const std::string DumpClient::DUMP(DUMP_CHARS, sizeof(DUMP_CHARS)); - -void DumpClient::dump() { - QPID_LOG(debug, dumperId << " dumping state to " << dumpeeId << " at " << dumpeeUrl); - Broker& b = dumperBroker; - b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); - - // Dump exchange is used to route messages to the proper queue without modifying routing key. - session.exchangeDeclare(arg::exchange=DUMP, arg::type="fanout", arg::autoDelete=true); - b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1)); - // Dump queue is used to transfer acquired messages that are no longer on their original queue. - session.queueDeclare(arg::queue=DUMP, arg::autoDelete=true); - session.sync(); - session.close(); - - std::for_each(connections.begin(), connections.end(), boost::bind(&DumpClient::dumpConnection, this, _1)); - AMQFrame frame(map.asMethodBody()); - client::ConnectionAccess::getImpl(connection)->handle(frame); - connection.close(); - QPID_LOG(debug, dumperId << " dumped state to " << dumpeeId << " at " << dumpeeUrl); -} - -void DumpClient::run() { - try { - dump(); - done(); - } catch (const std::exception& e) { - failed(e); - } - delete this; -} - -namespace { -template std::string encode(const T& t) { - std::string encoded; - encoded.resize(t.encodedSize()); - framing::Buffer buf(const_cast(encoded.data()), encoded.size()); - t.encode(buf); - return encoded; -} -} // namespace - -void DumpClient::dumpExchange(const boost::shared_ptr& ex) { - QPID_LOG(debug, dumperId << " dumping exchange " << ex->getName()); - ClusterConnectionProxy proxy(session); - proxy.exchange(encode(*ex)); -} - -/** Bind a queue to the dump exchange and dump messges to it - * setting the message possition as needed. - */ -class MessageDumper { - std::string queue; - bool haveLastPos; - framing::SequenceNumber lastPos; - client::AsyncSession session; - - public: - - MessageDumper(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { - session.exchangeBind(queue, DumpClient::DUMP); - } - - ~MessageDumper() { - session.exchangeUnbind(queue, DumpClient::DUMP); - } - - void dumpQueuedMessage(const broker::QueuedMessage& message) { - if (!haveLastPos || message.position - lastPos != 1) { - ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); - haveLastPos = true; - } - lastPos = message.position; - SessionBase_0_10Access sb(session); - framing::MessageTransferBody transfer( - framing::ProtocolVersion(), DumpClient::DUMP, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); - sb.get()->send(transfer, message.payload->getFrames()); - } - - void dumpMessage(const boost::intrusive_ptr& message) { - dumpQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); - } -}; - - -void DumpClient::dumpQueue(const boost::shared_ptr& q) { - QPID_LOG(debug, dumperId << " dumping queue " << q->getName()); - ClusterConnectionProxy proxy(session); - proxy.queue(encode(*q)); - MessageDumper dumper(q->getName(), session); - q->eachMessage(boost::bind(&MessageDumper::dumpQueuedMessage, &dumper, _1)); - q->eachBinding(boost::bind(&DumpClient::dumpBinding, this, q->getName(), _1)); -} - - -void DumpClient::dumpBinding(const std::string& queue, const QueueBinding& binding) { - session.exchangeBind(queue, binding.exchange, binding.key, binding.args); -} - -void DumpClient::dumpConnection(const boost::intrusive_ptr& dumpConnection) { - QPID_LOG(debug, dumperId << " dumping connection " << *dumpConnection); - shadowConnection = catchUpConnection(); - - broker::Connection& bc = dumpConnection->getBrokerConnection(); - // FIXME aconway 2008-10-20: What authentication info to use on reconnect? - shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); - bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1)); - ClusterConnectionProxy(shadowConnection).shadowReady( - dumpConnection->getId().getMember(), - reinterpret_cast(dumpConnection->getId().getPointer())); - shadowConnection.close(); - QPID_LOG(debug, dumperId << " dumped connection " << *dumpConnection); -} - -void DumpClient::dumpSession(broker::SessionHandler& sh) { - QPID_LOG(debug, dumperId << " dumping session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " - << sh.getSession()->getId()); - broker::SessionState* ss = sh.getSession(); - if (!ss) return; // no session. - - // Create a client session to dump session state. - boost::shared_ptr cimpl = client::ConnectionAccess::getImpl(shadowConnection); - boost::shared_ptr simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); - client::SessionBase_0_10Access(shadowSession).set(simpl); - AMQP_AllProxy::ClusterConnection proxy(simpl->out); - - // Re-create session state on remote connection. - - // Dump consumers. For reasons unknown, boost::bind does not work here with boost 1.33. - QPID_LOG(debug, dumperId << " dumping consumers."); - ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this)); - - QPID_LOG(debug, dumperId << " dumping unacknowledged messages."); - broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); - std::for_each(drs.begin(), drs.end(), boost::bind(&DumpClient::dumpUnacked, this, _1)); - - dumpTxState(ss->getSemanticState()); // Tx transaction state. - - // Adjust for command counter for message in progress, will be sent after state update. - boost::intrusive_ptr inProgress = ss->getMessageInProgress(); - SequenceNumber received = ss->receiverGetReceived().command; - if (inProgress) - --received; - - // Reset command-sequence state. - proxy.sessionState( - ss->senderGetReplayPoint().command, - ss->senderGetCommandPoint().command, - ss->senderGetIncomplete(), - std::max(received, ss->receiverGetExpected().command), - received, - ss->receiverGetUnknownComplete(), - ss->receiverGetIncomplete() - ); - - // Send frames for partial message in progress. - if (inProgress) { - inProgress->getFrames().map(simpl->out); - } - - // FIXME aconway 2008-09-23: update session replay list. - - QPID_LOG(debug, dumperId << " dumped session " << sh.getSession()->getId()); -} - -void DumpClient::dumpConsumer(const broker::SemanticState::ConsumerImpl* ci) { - QPID_LOG(debug, dumperId << " dumping consumer " << ci->getName() << " on " << shadowSession.getId()); - using namespace message; - shadowSession.messageSubscribe( - arg::queue = ci->getQueue()->getName(), - arg::destination = ci->getName(), - arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, - arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, - arg::exclusive = ci->isExclusive(), - arg::resumeId = ci->getResumeId(), - arg::resumeTtl = ci->getResumeTtl(), - arg::arguments = ci->getArguments() - ); - shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); - shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); - ClusterConnectionConsumerStateBody state( - ProtocolVersion(), - ci->getName(), - ci->isBlocked(), - ci->isNotifyEnabled() - ); - client::SessionBase_0_10Access(shadowSession).get()->send(state); - QPID_LOG(debug, dumperId << " dumped consumer " << ci->getName() << " on " << shadowSession.getId()); -} - -void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) { - if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { - // If the message is acquired then it is no longer on the - // dumpees queue, put it on the dump queue for dumpee to pick up. - // - MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage()); - } - ClusterConnectionProxy(shadowSession).deliveryRecord( - dr.getQueue()->getName(), - dr.getMessage().position, - dr.getTag(), - dr.getId(), - dr.isAcquired(), - dr.isAccepted(), - dr.isCancelled(), - dr.isComplete(), - dr.isEnded(), - dr.isWindowing(), - dr.getCredit() - ); -} - -class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper { - public: - TxOpDumper(DumpClient& dc, client::AsyncSession s) - : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s) {} - - void operator()(const broker::DtxAck& ) { - throw InternalErrorException("DTX transactions not currently supported by cluster."); - } - - void operator()(const broker::RecoveredDequeue& rdeq) { - dumpMessage(rdeq.getMessage()); - proxy.txEnqueue(rdeq.getQueue()->getName()); - } - - void operator()(const broker::RecoveredEnqueue& renq) { - dumpMessage(renq.getMessage()); - proxy.txEnqueue(renq.getQueue()->getName()); - } - - void operator()(const broker::TxAccept& txAccept) { - proxy.txAccept(txAccept.getAcked()); - } - - void operator()(const broker::TxPublish& txPub) { - dumpMessage(txPub.getMessage()); - typedef std::list QueueList; - const QueueList& qlist = txPub.getQueues(); - Array qarray(TYPE_CODE_STR8); - for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) - qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); - proxy.txPublish(qarray, txPub.delivered); - } - - private: - DumpClient& parent; - client::AsyncSession session; - ClusterConnectionProxy proxy; -}; - -void DumpClient::dumpTxState(broker::SemanticState& s) { - QPID_LOG(debug, dumperId << " dumping TX transaction state."); - ClusterConnectionProxy proxy(shadowSession); - proxy.accumulatedAck(s.getAccumulatedAck()); - broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); - if (txBuffer) { - proxy.txStart(); - TxOpDumper dumper(*this, shadowSession); - txBuffer->accept(dumper); - proxy.txEnd(); - } -} - -}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/DumpClient.h b/cpp/src/qpid/cluster/DumpClient.h deleted file mode 100644 index 23676e7646..0000000000 --- a/cpp/src/qpid/cluster/DumpClient.h +++ /dev/null @@ -1,101 +0,0 @@ -#ifndef QPID_CLUSTER_DUMPCLIENT_H -#define QPID_CLUSTER_DUMPCLIENT_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ClusterMap.h" -#include "qpid/client/Connection.h" -#include "qpid/client/AsyncSession.h" -#include "qpid/broker/SemanticState.h" -#include "qpid/sys/Runnable.h" -#include - - -namespace qpid { - -class Url; - -namespace broker { - -class Broker; -class Queue; -class Exchange; -class QueueBindings; -class QueueBinding; -class QueuedMessage; -class SessionHandler; -class DeliveryRecord; -class SessionState; -class SemanticState; - -} // namespace broker - -namespace cluster { - -class Cluster; -class Connection; -class ClusterMap; - -/** - * A client that dumps the contents of a local broker to a remote one using AMQP. - */ -class DumpClient : public sys::Runnable { - public: - static const std::string DUMP; // Name for special dump queue and exchange. - - DumpClient(const MemberId& dumper, const MemberId& dumpee, const Url&, - broker::Broker& donor, const ClusterMap& map, const std::vector >& , - const boost::function& done, - const boost::function& fail); - - ~DumpClient(); - void dump(); - void run(); // Will delete this when finished. - - void dumpUnacked(const broker::DeliveryRecord&); - - private: - void dumpQueue(const boost::shared_ptr&); - void dumpExchange(const boost::shared_ptr&); - void dumpMessage(const broker::QueuedMessage&); - void dumpMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s); - void dumpBinding(const std::string& queue, const broker::QueueBinding& binding); - void dumpConnection(const boost::intrusive_ptr& connection); - void dumpSession(broker::SessionHandler& s); - void dumpTxState(broker::SemanticState& s); - void dumpConsumer(const broker::SemanticState::ConsumerImpl*); - - MemberId dumperId; - MemberId dumpeeId; - Url dumpeeUrl; - broker::Broker& dumperBroker; - ClusterMap map; - std::vector > connections; - client::Connection connection, shadowConnection; - client::AsyncSession session, shadowSession; - boost::function done; - boost::function failed; -}; - -}} // namespace qpid::cluster - -#endif /*!QPID_CLUSTER_DUMPCLIENT_H*/ diff --git a/cpp/src/qpid/cluster/Multicaster.cpp b/cpp/src/qpid/cluster/Multicaster.cpp index 847088435c..446722745c 100644 --- a/cpp/src/qpid/cluster/Multicaster.cpp +++ b/cpp/src/qpid/cluster/Multicaster.cpp @@ -23,6 +23,7 @@ #include "Cpg.h" #include "qpid/log/Statement.h" #include "qpid/sys/LatencyMetric.h" +#include "qpid/framing/AMQBody.h" namespace qpid { namespace cluster { @@ -40,12 +41,14 @@ Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_, } void Multicaster::mcastControl(const framing::AMQBody& body, const ConnectionId& id) { + QPID_LOG(trace, "MCAST " << id << ": " << body); mcast(Event::control(body, id)); } void Multicaster::mcastBuffer(const char* data, size_t size, const ConnectionId& id) { Event e(DATA, id, size); memcpy(e.getData(), data, size); + QPID_LOG(trace, "MCAST " << e); mcast(e); } @@ -54,7 +57,6 @@ void Multicaster::mcast(const Event& e) { sys::Mutex::ScopedLock l(lock); if (e.getType() == DATA && e.isConnection() && holding) { holdingQueue.push_back(e); - QPID_LOG(trace, " MCAST held: " << e ); return; } } @@ -85,7 +87,6 @@ void Multicaster::sendMcast(PollableEventQueue::Queue& values) { } break; } - QPID_LOG(trace, " MCAST " << *i); ++i; } values.erase(values.begin(), i); // Erase sent events. diff --git a/cpp/src/qpid/cluster/UpdateClient.cpp b/cpp/src/qpid/cluster/UpdateClient.cpp new file mode 100644 index 0000000000..c58133f453 --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateClient.cpp @@ -0,0 +1,369 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "UpdateClient.h" +#include "Cluster.h" +#include "ClusterMap.h" +#include "Connection.h" +#include "qpid/client/SessionBase_0_10Access.h" +#include "qpid/client/ConnectionAccess.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/ExchangeRegistry.h" +#include "qpid/broker/SessionHandler.h" +#include "qpid/broker/SessionState.h" +#include "qpid/broker/TxOpVisitor.h" +#include "qpid/broker/DtxAck.h" +#include "qpid/broker/TxAccept.h" +#include "qpid/broker/TxPublish.h" +#include "qpid/broker/RecoveredDequeue.h" +#include "qpid/broker/RecoveredEnqueue.h" +#include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/ClusterConnectionMembershipBody.h" +#include "qpid/framing/ClusterConnectionShadowReadyBody.h" +#include "qpid/framing/ClusterConnectionSessionStateBody.h" +#include "qpid/framing/ClusterConnectionConsumerStateBody.h" +#include "qpid/framing/enum.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/framing/TypeCode.h" +#include "qpid/log/Statement.h" +#include "qpid/Url.h" +#include +#include + +namespace qpid { +namespace cluster { + +using broker::Broker; +using broker::Exchange; +using broker::Queue; +using broker::QueueBinding; +using broker::Message; +using namespace framing; +namespace arg=client::arg; +using client::SessionBase_0_10Access; + +struct ClusterConnectionProxy : public AMQP_AllProxy::ClusterConnection { + ClusterConnectionProxy(client::Connection c) : + AMQP_AllProxy::ClusterConnection(*client::ConnectionAccess::getImpl(c)) {} + ClusterConnectionProxy(client::AsyncSession s) : + AMQP_AllProxy::ClusterConnection(SessionBase_0_10Access(s).get()->out) {} +}; + +// Create a connection with special version that marks it as a catch-up connection. +client::Connection catchUpConnection() { + client::Connection c; + client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10)); + return c; +} + +// Send a control body directly to the session. +void send(client::AsyncSession& s, const AMQBody& body) { + client::SessionBase_0_10Access sb(s); + sb.get()->send(body); +} + +// TODO aconway 2008-09-24: optimization: update connections/sessions in parallel. + +UpdateClient::UpdateClient(const MemberId& updater, const MemberId& updatee, const Url& url, + broker::Broker& broker, const ClusterMap& m, const Cluster::Connections& cons, + const boost::function& ok, + const boost::function& fail) + : updaterId(updater), updateeId(updatee), updateeUrl(url), updaterBroker(broker), map(m), connections(cons), + connection(catchUpConnection()), shadowConnection(catchUpConnection()), + done(ok), failed(fail) +{ + connection.open(url); + session = connection.newSession("update_shared"); +} + +UpdateClient::~UpdateClient() {} + +// Illegal exchange/queue name for catch-up, avoid clashes with user queues/exchanges. +static const char UPDATE_CHARS[] = "\000qpid-update"; +const std::string UpdateClient::UPDATE(UPDATE_CHARS, sizeof(UPDATE_CHARS)); + +void UpdateClient::update() { + QPID_LOG(debug, updaterId << " updateing state to " << updateeId << " at " << updateeUrl); + Broker& b = updaterBroker; + b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, this, _1)); + + // Update exchange is used to route messages to the proper queue without modifying routing key. + session.exchangeDeclare(arg::exchange=UPDATE, arg::type="fanout", arg::autoDelete=true); + b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueue, this, _1)); + // Update queue is used to transfer acquired messages that are no longer on their original queue. + session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true); + session.sync(); + session.close(); + + std::for_each(connections.begin(), connections.end(), boost::bind(&UpdateClient::updateConnection, this, _1)); + AMQFrame frame(map.asMethodBody()); + client::ConnectionAccess::getImpl(connection)->handle(frame); + connection.close(); + QPID_LOG(debug, updaterId << " updated state to " << updateeId << " at " << updateeUrl); +} + +void UpdateClient::run() { + try { + update(); + done(); + } catch (const std::exception& e) { + failed(e); + } + delete this; +} + +namespace { +template std::string encode(const T& t) { + std::string encoded; + encoded.resize(t.encodedSize()); + framing::Buffer buf(const_cast(encoded.data()), encoded.size()); + t.encode(buf); + return encoded; +} +} // namespace + +void UpdateClient::updateExchange(const boost::shared_ptr& ex) { + QPID_LOG(debug, updaterId << " updateing exchange " << ex->getName()); + ClusterConnectionProxy proxy(session); + proxy.exchange(encode(*ex)); +} + +/** Bind a queue to the update exchange and update messges to it + * setting the message possition as needed. + */ +class MessageUpdater { + std::string queue; + bool haveLastPos; + framing::SequenceNumber lastPos; + client::AsyncSession session; + + public: + + MessageUpdater(const string& q, const client::AsyncSession s) : queue(q), haveLastPos(false), session(s) { + session.exchangeBind(queue, UpdateClient::UPDATE); + } + + ~MessageUpdater() { + session.exchangeUnbind(queue, UpdateClient::UPDATE); + } + + void updateQueuedMessage(const broker::QueuedMessage& message) { + if (!haveLastPos || message.position - lastPos != 1) { + ClusterConnectionProxy(session).queuePosition(queue, message.position.getValue()-1); + haveLastPos = true; + } + lastPos = message.position; + SessionBase_0_10Access sb(session); + framing::MessageTransferBody transfer( + framing::ProtocolVersion(), UpdateClient::UPDATE, message::ACCEPT_MODE_NONE, message::ACQUIRE_MODE_PRE_ACQUIRED); + sb.get()->send(transfer, message.payload->getFrames()); + } + + void updateMessage(const boost::intrusive_ptr& message) { + updateQueuedMessage(broker::QueuedMessage(0, message, haveLastPos? lastPos.getValue()+1 : 1)); + } +}; + + +void UpdateClient::updateQueue(const boost::shared_ptr& q) { + QPID_LOG(debug, updaterId << " updateing queue " << q->getName()); + ClusterConnectionProxy proxy(session); + proxy.queue(encode(*q)); + MessageUpdater updater(q->getName(), session); + q->eachMessage(boost::bind(&MessageUpdater::updateQueuedMessage, &updater, _1)); + q->eachBinding(boost::bind(&UpdateClient::updateBinding, this, q->getName(), _1)); +} + + +void UpdateClient::updateBinding(const std::string& queue, const QueueBinding& binding) { + session.exchangeBind(queue, binding.exchange, binding.key, binding.args); +} + +void UpdateClient::updateConnection(const boost::intrusive_ptr& updateConnection) { + QPID_LOG(debug, updaterId << " updateing connection " << *updateConnection); + shadowConnection = catchUpConnection(); + + broker::Connection& bc = updateConnection->getBrokerConnection(); + // FIXME aconway 2008-10-20: What authentication info to use on reconnect? + shadowConnection.open(updateeUrl, bc.getUserId(), ""/*password*/, "/"/*vhost*/, bc.getFrameMax()); + bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1)); + ClusterConnectionProxy(shadowConnection).shadowReady( + updateConnection->getId().getMember(), + reinterpret_cast(updateConnection->getId().getPointer())); + shadowConnection.close(); + QPID_LOG(debug, updaterId << " updated connection " << *updateConnection); +} + +void UpdateClient::updateSession(broker::SessionHandler& sh) { + QPID_LOG(debug, updaterId << " updateing session " << &sh.getConnection() << "[" << sh.getChannel() << "] = " + << sh.getSession()->getId()); + broker::SessionState* ss = sh.getSession(); + if (!ss) return; // no session. + + // Create a client session to update session state. + boost::shared_ptr cimpl = client::ConnectionAccess::getImpl(shadowConnection); + boost::shared_ptr simpl = cimpl->newSession(ss->getId().getName(), ss->getTimeout(), sh.getChannel()); + client::SessionBase_0_10Access(shadowSession).set(simpl); + AMQP_AllProxy::ClusterConnection proxy(simpl->out); + + // Re-create session state on remote connection. + + // Update consumers. For reasons unknown, boost::bind does not work here with boost 1.33. + QPID_LOG(debug, updaterId << " updateing consumers."); + ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this)); + + QPID_LOG(debug, updaterId << " updateing unacknowledged messages."); + broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked(); + std::for_each(drs.begin(), drs.end(), boost::bind(&UpdateClient::updateUnacked, this, _1)); + + updateTxState(ss->getSemanticState()); // Tx transaction state. + + // Adjust for command counter for message in progress, will be sent after state update. + boost::intrusive_ptr inProgress = ss->getMessageInProgress(); + SequenceNumber received = ss->receiverGetReceived().command; + if (inProgress) + --received; + + // Reset command-sequence state. + proxy.sessionState( + ss->senderGetReplayPoint().command, + ss->senderGetCommandPoint().command, + ss->senderGetIncomplete(), + std::max(received, ss->receiverGetExpected().command), + received, + ss->receiverGetUnknownComplete(), + ss->receiverGetIncomplete() + ); + + // Send frames for partial message in progress. + if (inProgress) { + inProgress->getFrames().map(simpl->out); + } + + // FIXME aconway 2008-09-23: update session replay list. + + QPID_LOG(debug, updaterId << " updated session " << sh.getSession()->getId()); +} + +void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* ci) { + QPID_LOG(debug, updaterId << " updateing consumer " << ci->getName() << " on " << shadowSession.getId()); + using namespace message; + shadowSession.messageSubscribe( + arg::queue = ci->getQueue()->getName(), + arg::destination = ci->getName(), + arg::acceptMode = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : ACCEPT_MODE_NONE, + arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : ACQUIRE_MODE_NOT_ACQUIRED, + arg::exclusive = ci->isExclusive(), + arg::resumeId = ci->getResumeId(), + arg::resumeTtl = ci->getResumeTtl(), + arg::arguments = ci->getArguments() + ); + shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? FLOW_MODE_WINDOW : FLOW_MODE_CREDIT); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, ci->getMsgCredit()); + shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, ci->getByteCredit()); + ClusterConnectionConsumerStateBody state( + ProtocolVersion(), + ci->getName(), + ci->isBlocked(), + ci->isNotifyEnabled() + ); + client::SessionBase_0_10Access(shadowSession).get()->send(state); + QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on " << shadowSession.getId()); +} + +void UpdateClient::updateUnacked(const broker::DeliveryRecord& dr) { + if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) { + // If the message is acquired then it is no longer on the + // updatees queue, put it on the update queue for updatee to pick up. + // + MessageUpdater(UPDATE, shadowSession).updateQueuedMessage(dr.getMessage()); + } + ClusterConnectionProxy(shadowSession).deliveryRecord( + dr.getQueue()->getName(), + dr.getMessage().position, + dr.getTag(), + dr.getId(), + dr.isAcquired(), + dr.isAccepted(), + dr.isCancelled(), + dr.isComplete(), + dr.isEnded(), + dr.isWindowing(), + dr.getCredit() + ); +} + +class TxOpUpdater : public broker::TxOpConstVisitor, public MessageUpdater { + public: + TxOpUpdater(UpdateClient& dc, client::AsyncSession s) + : MessageUpdater(UpdateClient::UPDATE, s), parent(dc), session(s), proxy(s) {} + + void operator()(const broker::DtxAck& ) { + throw InternalErrorException("DTX transactions not currently supported by cluster."); + } + + void operator()(const broker::RecoveredDequeue& rdeq) { + updateMessage(rdeq.getMessage()); + proxy.txEnqueue(rdeq.getQueue()->getName()); + } + + void operator()(const broker::RecoveredEnqueue& renq) { + updateMessage(renq.getMessage()); + proxy.txEnqueue(renq.getQueue()->getName()); + } + + void operator()(const broker::TxAccept& txAccept) { + proxy.txAccept(txAccept.getAcked()); + } + + void operator()(const broker::TxPublish& txPub) { + updateMessage(txPub.getMessage()); + typedef std::list QueueList; + const QueueList& qlist = txPub.getQueues(); + Array qarray(TYPE_CODE_STR8); + for (QueueList::const_iterator i = qlist.begin(); i != qlist.end(); ++i) + qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName()))); + proxy.txPublish(qarray, txPub.delivered); + } + + private: + UpdateClient& parent; + client::AsyncSession session; + ClusterConnectionProxy proxy; +}; + +void UpdateClient::updateTxState(broker::SemanticState& s) { + QPID_LOG(debug, updaterId << " updateing TX transaction state."); + ClusterConnectionProxy proxy(shadowSession); + proxy.accumulatedAck(s.getAccumulatedAck()); + broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer(); + if (txBuffer) { + proxy.txStart(); + TxOpUpdater updater(*this, shadowSession); + txBuffer->accept(updater); + proxy.txEnd(); + } +} + +}} // namespace qpid::cluster diff --git a/cpp/src/qpid/cluster/UpdateClient.h b/cpp/src/qpid/cluster/UpdateClient.h new file mode 100644 index 0000000000..93dca9f0c6 --- /dev/null +++ b/cpp/src/qpid/cluster/UpdateClient.h @@ -0,0 +1,101 @@ +#ifndef QPID_CLUSTER_UPDATECLIENT_H +#define QPID_CLUSTER_UPDATECLIENT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "ClusterMap.h" +#include "qpid/client/Connection.h" +#include "qpid/client/AsyncSession.h" +#include "qpid/broker/SemanticState.h" +#include "qpid/sys/Runnable.h" +#include + + +namespace qpid { + +class Url; + +namespace broker { + +class Broker; +class Queue; +class Exchange; +class QueueBindings; +class QueueBinding; +class QueuedMessage; +class SessionHandler; +class DeliveryRecord; +class SessionState; +class SemanticState; + +} // namespace broker + +namespace cluster { + +class Cluster; +class Connection; +class ClusterMap; + +/** + * A client that updates the contents of a local broker to a remote one using AMQP. + */ +class UpdateClient : public sys::Runnable { + public: + static const std::string UPDATE; // Name for special update queue and exchange. + + UpdateClient(const MemberId& updater, const MemberId& updatee, const Url&, + broker::Broker& donor, const ClusterMap& map, const std::vector >& , + const boost::function& done, + const boost::function& fail); + + ~UpdateClient(); + void update(); + void run(); // Will delete this when finished. + + void updateUnacked(const broker::DeliveryRecord&); + + private: + void updateQueue(const boost::shared_ptr&); + void updateExchange(const boost::shared_ptr&); + void updateMessage(const broker::QueuedMessage&); + void updateMessageTo(const broker::QueuedMessage&, const std::string& queue, client::Session s); + void updateBinding(const std::string& queue, const broker::QueueBinding& binding); + void updateConnection(const boost::intrusive_ptr& connection); + void updateSession(broker::SessionHandler& s); + void updateTxState(broker::SemanticState& s); + void updateConsumer(const broker::SemanticState::ConsumerImpl*); + + MemberId updaterId; + MemberId updateeId; + Url updateeUrl; + broker::Broker& updaterBroker; + ClusterMap map; + std::vector > connections; + client::Connection connection, shadowConnection; + client::AsyncSession session, shadowSession; + boost::function done; + boost::function failed; +}; + +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_UPDATECLIENT_H*/ diff --git a/cpp/src/tests/cluster_test.cpp b/cpp/src/tests/cluster_test.cpp index 6ca957f310..b7d28bf914 100644 --- a/cpp/src/tests/cluster_test.cpp +++ b/cpp/src/tests/cluster_test.cpp @@ -27,7 +27,7 @@ #include "qpid/client/FailoverListener.h" #include "qpid/cluster/Cluster.h" #include "qpid/cluster/Cpg.h" -#include "qpid/cluster/DumpClient.h" +#include "qpid/cluster/UpdateClient.h" #include "qpid/framing/AMQBody.h" #include "qpid/framing/Uuid.h" #include "qpid/framing/reply_exceptions.h" @@ -352,8 +352,8 @@ QPID_AUTO_TEST_CASE(testUnacked) { BOOST_CHECK_EQUAL(c1.subs.get("q2", TIME_SEC).getData(), "22"); } -QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) { - // Verify that we dump transaction state correctly to new members. +QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testUpdateTxState, 1) { + // Verify that we update transaction state correctly to new members. ClusterFixture cluster(1); Client c0(cluster[0], "c0"); @@ -386,8 +386,8 @@ QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testDumpTxState, 1) { BOOST_CHECK_EQUAL(m.getData(), "3"); } -QPID_AUTO_TEST_CASE(testDumpMessageBuilder) { - // Verify that we dump a partially recieved message to a new member. +QPID_AUTO_TEST_CASE(testUpdateMessageBuilder) { + // Verify that we update a partially recieved message to a new member. ClusterFixture cluster(1); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); @@ -452,7 +452,7 @@ QPID_AUTO_TEST_CASE(testConnectionKnownHosts) { BOOST_CHECK_EQUAL(kb0, kb2); } -QPID_AUTO_TEST_CASE(DumpConsumers) { +QPID_AUTO_TEST_CASE(UpdateConsumers) { ClusterFixture cluster(1, 1); Client c0(cluster[0], "c0"); -- cgit v1.2.1