diff options
author | Alan Conway <aconway@apache.org> | 2008-09-15 19:39:22 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-15 19:39:22 +0000 |
commit | fa79886b733eb0d17782e435c9eebebcc73ae608 (patch) | |
tree | 3c31521ce353b16ba3709963353b3c4e8885a695 /qpid/cpp | |
parent | e4b313f1120a520d353808ff48cc0d37d9e3728d (diff) | |
download | qpid-python-fa79886b733eb0d17782e435c9eebebcc73ae608.tar.gz |
Cluster member stalling, cluster map updates and unit tests.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@695593 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 231 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 40 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 101 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.h | 43 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Connection.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/DumpClient.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/DumpClient.h | 11 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClusterMapTest.cpp | 74 | ||||
-rw-r--r-- | qpid/cpp/src/tests/DumpClientTest.cpp | 122 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 90 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 12 |
14 files changed, 476 insertions, 281 deletions
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 9db2a61a82..c441686def 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -25,7 +25,10 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" -#include "qpid/framing/ClusterUrlNoticeBody.h" +#include "qpid/framing/ClusterDumpRequestBody.h" +#include "qpid/framing/ClusterReadyBody.h" +#include "qpid/framing/ClusterDumpErrorBody.h" +#include "qpid/framing/ClusterMapBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -50,19 +53,14 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { Cluster& cluster; MemberId member; ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {} - void urlNotice(const std::string& u) { cluster.urlNotice(member, u); } - void ready() { cluster.ready(member); } - - void members(const framing::FieldTable& , const framing::FieldTable& , const framing::FieldTable& ) { - assert(0); // Not passed to cluster, used to start a brain dump over TCP. - } - bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } - virtual void map(const FieldTable& ,const FieldTable& ,const FieldTable& ) { - // FIXME aconway 2008-09-12: TODO + void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); } + void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); } + void ready(const std::string& u) { cluster.ready(member, u); } + virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { + cluster.mapInit(members, dumpees, dumps); } - }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -80,17 +78,14 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), state(DISCARD) { - QPID_LOG(notice, "Cluster member " << self << " joining cluster " << name.str()); + QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); - cpg.join(name); - - connectionEventQueue.start(poller); cpgDispatchHandle.startWatch(poller); + cpg.join(name); + } -Cluster::~Cluster() { - QPID_LOG(debug, "~Cluster()"); -} +Cluster::~Cluster() {} void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { Mutex::ScopedLock l(lock); @@ -102,60 +97,47 @@ void Cluster::erase(ConnectionId id) { connections.erase(id); } -// FIXME aconway 2008-09-10: leave is currently not called, -// It should be called if we are shut down by a cluster admin command. +// FIXME aconway 2008-09-10: call leave from cluster admin command. // Any other type of exit is caught in disconnect(). // void Cluster::leave() { - QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << name.str()); + QPID_LOG(notice, self << " leaving cluster " << name.str()); cpg.leave(name); - // Cluster will shut down in configChange when the cluster knows we've left. -} - -template <class T> void decodePtr(Buffer& buf, T*& ptr) { - uint64_t value = buf.getLongLong(); - ptr = reinterpret_cast<T*>(value); -} - -template <class T> void encodePtr(Buffer& buf, T* ptr) { - uint64_t value = reinterpret_cast<uint64_t>(ptr); - buf.putLongLong(value); + // Defer shut down to the final configChange when the group knows we've left. } -void Cluster::mcastFrame(const AMQFrame& frame, const ConnectionId& connection) { - QPID_LOG(trace, "MCAST [" << connection << "] " << frame); - Event e(CONTROL, connection, frame.size()); +void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) { + QPID_LOG(trace, "MCAST [" << self << "]: " << body); + AMQFrame f(body); + Event e(CONTROL, ConnectionId(self, cptr), f.size()); Buffer buf(e); - frame.encode(buf); + f.encode(buf); mcastEvent(e); } void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection) { - QPID_LOG(trace, "MCAST [" << connection << "] " << size << "bytes of data"); Event e(DATA, connection, size); memcpy(e.getData(), data, size); mcastEvent(e); } void Cluster::mcastEvent(const Event& e) { - QPID_LOG(trace, "Multicasting: " << e); e.mcast(name, cpg); } size_t Cluster::size() const { Mutex::ScopedLock l(lock); - return urls.size(); + return map.memberCount(); } std::vector<Url> Cluster::getUrls() const { Mutex::ScopedLock l(lock); - std::vector<Url> result(urls.size()); - std::transform(urls.begin(), urls.end(), result.begin(), - boost::bind(&UrlMap::value_type::second, _1)); - return result; -} + return map.memberUrls(); +} +// FIXME aconway 2008-09-15: volatile for locked/unlocked functions. boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) { + Mutex::ScopedLock l(lock); if (id.getMember() == self) return boost::intrusive_ptr<Connection>(id.getConnectionPtr()); ConnectionMap::iterator i = connections.find(id); @@ -180,17 +162,19 @@ void Cluster::deliver( try { MemberId from(nodeid, pid); Event e = Event::delivered(from, msg, msg_len); - QPID_LOG(trace, "Cluster deliver: " << e); - // Process cluster controls immediately if (e.getConnectionId().getConnectionPtr() == 0) { // Cluster control Buffer buf(e); AMQFrame frame; - while (frame.decode(buf)) + while (frame.decode(buf)) { + QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody()); if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame)) - throw Exception("Invalid cluster control"); + throw Exception(QPID_MSG("Invalid cluster control")); + } } - else { // Process connection controls & data via the connectionEventQueue. + else { + // Process connection controls & data via the connectionEventQueue + // unless we are in the DISCARD state, in which case ignore. if (state != DISCARD) { e.setConnection(getConnection(e.getConnectionId())); connectionEventQueue.push(e); @@ -227,15 +211,15 @@ ostream& operator<<(ostream& o, const AddrList& a) { for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) { const char* reasonString; switch (p->reason) { - case CPG_REASON_JOIN: reasonString = " joined "; break; - case CPG_REASON_LEAVE: reasonString = " left ";break; - case CPG_REASON_NODEDOWN: reasonString = " node-down ";break; - case CPG_REASON_NODEUP: reasonString = " node-up ";break; - case CPG_REASON_PROCDOWN: reasonString = " process-down ";break; + case CPG_REASON_JOIN: reasonString = " joined"; break; + case CPG_REASON_LEAVE: reasonString = " left";break; + case CPG_REASON_NODEDOWN: reasonString = " node-down";break; + case CPG_REASON_NODEUP: reasonString = " node-up";break; + case CPG_REASON_PROCDOWN: reasonString = " process-down";break; default: reasonString = " "; } qpid::cluster::MemberId member(*p); - o << member << reasonString; + o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : ""); } return o; } @@ -247,23 +231,28 @@ void Cluster::configChange( cpg_address *left, int nLeft, cpg_address *joined, int nJoined) { - QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, nCurrent) << ".\n Changes: " - << AddrList(joined, nJoined) << AddrList(left, nLeft)); - - if (nJoined) // Notfiy new members of my URL. - mcastFrame( - AMQFrame(in_place<ClusterUrlNoticeBody>(ProtocolVersion(), url.str())), - ConnectionId(self,0)); - + // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. + QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent)); + QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft)); if (find(left, left+nLeft, self) != left+nLeft) { // We have left the group, this is the final config change. - QPID_LOG(notice, "Cluster member " << self << " left cluster " << name.str()); - broker.shutdown(); + QPID_LOG(notice, self << " left cluster " << name.str()); + broker.shutdown(); } Mutex::ScopedLock l(lock); - for (int i = 0; i < nLeft; ++i) urls.erase(left[i]); - // Add new members when their URL notice arraives. - lock.notifyAll(); // Threads waiting for membership changes. + if (state == DISCARD) { + if (nCurrent == 1 && *current == self) { + QPID_LOG(notice, self << " first in cluster."); + map.ready(self, url); + ready(); // First in cluster. + } + else if (find(joined, joined+nJoined, self) != joined+nJoined) { + QPID_LOG(notice, self << " requesting state dump."); // Just joined + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); + } + } + for (int i = 0; i < nLeft; ++i) + map.leave(left[i]); } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -275,24 +264,59 @@ void Cluster::disconnect(sys::DispatchHandle& ) { // FIXME aconway 2008-09-11: this should be logged as critical, // when we provide admin option to shut down cluster and let // members leave cleanly. - QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster " << name.str()); + QPID_LOG(notice, self << " disconnected from cluster " << name.str()); broker.shutdown(); } -void Cluster::urlNotice(const MemberId& m, const string& url) { - //FIXME aconway 2008-09-12: Rdo join logic using ClusterMap. Implement xml map function also. - //FIXME aconway 2008-09-11: Note multiple meanings of my own notice - - //from DISCARD->STALL and from STALL->READY via map. +// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place. +// Only one at a time to simplify things? +void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { + Mutex::ScopedLock l(lock); + Url url(urlStr); + if (self == m) { + switch (state) { + case DISCARD: state = CATCHUP; stall(); break; + case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map. + default: assert(0); + }; + } + else if (self == map.dumpRequest(m, url)) { + assert(state == READY); + QPID_LOG(info, self << " dumping to " << url); + // state = DUMPING; + // stall(); + // FIXME aconway 2008-09-15: need to stall map? + // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. + mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump. + } +} + +void Cluster::ready(const MemberId& m, const string& urlStr) { + Mutex::ScopedLock l(lock); + Url url(urlStr); + map.ready(m, url); +} + +broker::Broker& Cluster::getBroker(){ return broker; } + +void Cluster::stall() { + Mutex::ScopedLock l(lock); + // Stop processing connection events. We still process config changes + // and cluster controls in deliver() + connectionEventQueue.stop(); - QPID_LOG(info, "Cluster member " << m << " has URL " << url); - // My brain dump is up to this point, stall till it is complete. - if (m == self && state == DISCARD) - state = STALL; - urls.insert(UrlMap::value_type(m,Url(url))); + // FIXME aconway 2008-09-11: Flow control, we should slow down or + // stop reading from local connections while stalled to avoid an + // unbounded queue. } -void Cluster::ready(const MemberId& ) { - // FIXME aconway 2008-09-08: TODO +void Cluster::ready() { + // Called with lock held + QPID_LOG(info, self << " ready with URL " << url); + state = READY; + mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); + connectionEventQueue.start(poller); + // FIXME aconway 2008-09-15: stall/unstall map? } // Called from Broker::~Broker when broker is shut down. At this @@ -301,26 +325,51 @@ void Cluster::ready(const MemberId& ) { // callbacks will be invoked. // void Cluster::shutdown() { - QPID_LOG(notice, "Cluster member " << self << " shutting down."); + QPID_LOG(notice, self << " shutting down."); try { cpg.shutdown(); } catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); } delete this; } -broker::Broker& Cluster::getBroker(){ return broker; } +/** Received from cluster */ +void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) { + QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee); + Mutex::ScopedLock l(lock); + map.dumpError(dumpee); + if (state == DUMPING && map.dumps(self) == 0) + ready(); +} -void Cluster::stall() { - // Stop processing connection events. We still process config changes - // and cluster controls in deliver() +/** Called in local dump thread */ +void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) { + assert(state == DUMPING); + QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg); + mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0); + Mutex::ScopedLock l(lock); + map.dumpError(dumpee); + if (map.dumps(self) == 0) // Unstall immediately. + ready(); +} - // FIXME aconway 2008-09-11: Flow control, we should slow down or - // stop reading from local connections while stalled to avoid an - // unbounded queue. - connectionEventQueue.stop(); +void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { + Mutex::ScopedLock l(lock); + // FIXME aconway 2008-09-15: faking out dump here. + switch (state) { + case DISCARD: + map.init(members, dumpees, dumps); + state = HAVE_DUMP; + break; + case CATCHUP: + map.init(members, dumpees, dumps); + ready(); + break; + default: + break; + } } -void Cluster::unStall() { - connectionEventQueue.start(poller); +void Cluster::dumpTo(const Url& ) { + // FIXME aconway 2008-09-12: DumpClient } }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 5187cb08e7..24db07b32b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -19,19 +19,19 @@ * */ -#include "qpid/cluster/Cpg.h" -#include "qpid/cluster/Event.h" -#include "qpid/sys/PollableQueue.h" -#include "qpid/cluster/NoOpConnectionOutputHandler.h" +#include "Cpg.h" +#include "Event.h" +#include "NoOpConnectionOutputHandler.h" +#include "ClusterMap.h" #include "qpid/broker/Broker.h" +#include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/Url.h" #include <boost/intrusive_ptr.hpp> -#include <map> #include <vector> namespace qpid { @@ -68,33 +68,38 @@ class Cluster : private Cpg::Handler bool empty() const { return size() == 0; } /** Send to the cluster */ - void mcastFrame(const framing::AMQFrame&, const ConnectionId&); + void mcastControl(const framing::AMQBody& controlBody, Connection* cptr); void mcastBuffer(const char*, size_t, const ConnectionId&); void mcastEvent(const Event& e); /** Leave the cluster */ void leave(); - void urlNotice(const MemberId&, const std::string& url); - void ready(const MemberId&); + void dumpRequest(const MemberId&, const std::string& url); + void dumpError(const MemberId& dumper, const MemberId& dumpee); + void ready(const MemberId&, const std::string& url); + void mapInit(const framing::FieldTable& members, + const framing::FieldTable& dumpees, + const framing::FieldTable& dumps); MemberId getSelf() const { return self; } void stall(); - void unStall(); + void ready(); void shutdown(); broker::Broker& getBroker(); private: - typedef std::map<MemberId, Url> UrlMap; typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> EventQueue; enum State { - DISCARD, // Initially discard connection events up to my own join message. - READY, // Normal processing. - STALL // Stalled while a new member joins. + DISCARD, // Discard updates up to catchup point. + HAVE_DUMP, // Received state dump, waiting for catchup point. + CATCHUP, // Stalled at catchup point, waiting for dump. + DUMPING, // Stalled while sending a state dump. + READY // Normal processing. }; void connectionEvent(const Event&); @@ -126,23 +131,22 @@ class Cluster : private Cpg::Handler boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); + void dumpTo(const Url&); + void dumpError(const MemberId&, const Url&, const char* msg); + mutable sys::Monitor lock; // Protect access to members. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; Cpg cpg; Cpg::Name name; Url url; - UrlMap urls; + ClusterMap map; MemberId self; ConnectionMap connections; NoOpConnectionOutputHandler shadowOut; sys::DispatchHandle cpgDispatchHandle; EventQueue connectionEventQueue; State state; - - friend std::ostream& operator <<(std::ostream&, const Cluster&); - friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&); - friend std::ostream& operator <<(std::ostream&, const UrlMap&); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index b0c45ad625..24c3ed5552 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -24,36 +24,38 @@ #include <boost/bind.hpp> #include <algorithm> #include <functional> +#include <iterator> +#include <ostream> namespace qpid { using namespace framing; namespace cluster { -ClusterMap::ClusterMap() {} +ClusterMap::ClusterMap() : stalled(false) {} -MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) { - if (isMember(id)) return MemberId(); // Ignore notices from established members. - if (isDumpee(id)) { - // Dumpee caught up, graduate to member with new URL and remove dumper from list. - dumpees.erase(id); - members[id] = url; - } - else if (members.empty()) { - // First in cluster, congratulations! - members[id] = url; +MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url)); + return MemberId(); } - else { - // New member needs brain dump. - MemberId dumper = nextDumper(); - Dumpee& d = dumpees[id]; - d.url = url; - d.dumper = dumper; - return dumper; + MemberId dumper = nextDumper(); + Dumpee& d = dumpees[id]; + d.url = url; + d.dumper = dumper; + return dumper; +} + +void ClusterMap::ready(const MemberId& id, const Url& url) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url)); + return; } - return MemberId(); + dumpees.erase(id); + members[id] = url; } + MemberId ClusterMap::nextDumper() const { // Choose the first member in member-id order of the group that // has the least number of dumps-in-progress. @@ -73,6 +75,11 @@ MemberId ClusterMap::nextDumper() const { } void ClusterMap::leave(const MemberId& id) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::leave, this, id)); + return; + } + if (isDumpee(id)) dumpees.erase(id); if (isMember(id)) { @@ -95,7 +102,13 @@ int ClusterMap::dumps(const MemberId& id) const { return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id)); } -void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); } +void ClusterMap::dumpError(const MemberId& dumpee) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee)); + return; + } + dumpees.erase(dumpee); +} framing::ClusterMapBody ClusterMap::toControl() const { framing::ClusterMapBody b; @@ -108,15 +121,55 @@ framing::ClusterMapBody ClusterMap::toControl() const { return b; } -void ClusterMap::fromControl(const framing::ClusterMapBody& b) { +void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) { *this = ClusterMap(); // Reset any current contents. FieldTable::ValueMap::const_iterator i; - for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i) + for (i = ftMembers.begin(); i != ftMembers.end(); ++i) members[i->first] = Url(i->second->get<std::string>()); - for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i) + for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i) dumpees[i->first].url = Url(i->second->get<std::string>()); - for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i) + for (i = ftDumps.begin(); i != ftDumps.end(); ++i) dumpees[i->first].dumper = MemberId(i->second->get<std::string>()); } +void ClusterMap::fromControl(const framing::ClusterMapBody& b) { + init(b.getMembers(), b.getDumpees(), b.getDumps()); +} + +std::vector<Url> ClusterMap::memberUrls() const { + std::vector<Url> result(members.size()); + std::transform(members.begin(), members.end(), result.begin(), + boost::bind(&MemberMap::value_type::second, _1)); + return result; +} + +void ClusterMap::stall() { stalled = true; } + +namespace { +template <class F> void call(const F& f) { f(); } +} + +void ClusterMap::unstall() { + stalled = false; + std::for_each(stallq.begin(), stallq.end(), + boost::bind(&boost::function<void()>::operator(), _1)); + stallq.clear(); +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) { + return o << mv.first << "=" << mv.second; +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) { + return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url; +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { + std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n "); + std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n "); + std::copy(m.members.begin(), m.members.end(), im); + std::copy(m.dumpees.begin(), m.dumpees.end(), id); + return o; +} + }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 7695ebeabb..04323c5905 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -25,9 +25,11 @@ #include "types.h" #include "qpid/framing/ClusterMapBody.h" #include "qpid/Url.h" -#include <boost/optional.hpp> +#include <boost/function.hpp> #include <vector> +#include <deque> #include <map> +#include <iosfwd> namespace qpid { namespace cluster { @@ -41,21 +43,22 @@ class ClusterMap { public: ClusterMap(); + + MemberId dumpRequest(const MemberId& from, const Url& url); + + void dumpError(const MemberId&); + + void ready(const MemberId& from, const Url& url); - /** Update map for url-notice event. - *@param from Member that sent the notice. - *@param url URL for from. - *@return MemberId of member that should dump to URL, or a null - * MemberId() if no dump is needed. - */ - MemberId urlNotice(const MemberId& from, const Url& url); - - /** Dump failed notice */ - void dumpFailed(const MemberId&); - - /** Update map for leave event */ + /** Update map for cpg leave event */ void leave(const MemberId&); + /** Instead of updating the map, queue the updates for unstall */ + void stall(); + + /** Apply queued updates */ + void unstall(); + /** Number of unfinished dumps for member. */ int dumps(const MemberId&) const; @@ -63,13 +66,20 @@ class ClusterMap framing::ClusterMapBody toControl() const; /** Initialize map contents from a cluster control body. */ + void init(const framing::FieldTable& members, + const framing::FieldTable& dumpees, + const framing::FieldTable& dumps); + void fromControl(const framing::ClusterMapBody&); size_t memberCount() const { return members.size(); } size_t dumpeeCount() const { return dumpees.size(); } + bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); } + std::vector<Url> memberUrls() const; + private: struct Dumpee { Url url; MemberId dumper; }; typedef std::map<MemberId, Url> MemberMap; @@ -80,7 +90,14 @@ class ClusterMap MemberMap members; DumpeeMap dumpees; + bool stalled; + std::deque<boost::function<void()> > stallq; + + friend std::ostream& operator<<(std::ostream&, const ClusterMap&); + friend std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv); + friend std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv); }; + }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_CLUSTERMAP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/Connection.cpp b/qpid/cpp/src/qpid/cluster/Connection.cpp index 00d3901886..6cc21633d3 100644 --- a/qpid/cpp/src/qpid/cluster/Connection.cpp +++ b/qpid/cpp/src/qpid/cluster/Connection.cpp @@ -79,7 +79,7 @@ void Connection::closed() { // handler will be deleted. // connection.setOutputHandler(&discardHandler); - cluster.mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverCloseBody>()), self); + cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this); ++mcastSeq; } catch (const std::exception& e) { @@ -93,7 +93,6 @@ void Connection::deliverClose () { } size_t Connection::decode(const char* buffer, size_t size) { - QPID_LOG(trace, "mcastBuffer " << self << " " << mcastSeq << " " << size); ++mcastSeq; cluster.mcastBuffer(buffer, size, self); // FIXME aconway 2008-09-01: deserialize? @@ -101,7 +100,6 @@ size_t Connection::decode(const char* buffer, size_t size) { } void Connection::deliverBuffer(Buffer& buf) { - QPID_LOG(trace, "deliverBuffer " << self << " " << deliverSeq << " " << buf.available()); ++deliverSeq; while (decoder.decode(buf)) deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread. diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.cpp b/qpid/cpp/src/qpid/cluster/DumpClient.cpp index 5b92552209..f20ceb2ab6 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.cpp +++ b/qpid/cpp/src/qpid/cluster/DumpClient.cpp @@ -28,6 +28,7 @@ #include "qpid/broker/ExchangeRegistry.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/enum.h" +#include "qpid/log/Statement.h" #include "qpid/Url.h" #include <boost/bind.hpp> @@ -43,7 +44,9 @@ using namespace framing::message; using namespace client; -DumpClient::DumpClient(const Url& url) { +DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f) + : donor(b), failed(f) +{ connection.open(url); session = connection.newSession(); } @@ -57,8 +60,7 @@ DumpClient::~DumpClient() { static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange"; static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); -void DumpClient::dump(Broker& donor) { - // TODO aconway 2008-09-08: Caller must handle exceptions +void DumpClient::dump() { // FIXME aconway 2008-09-08: send cluster map frame first. donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1)); // Catch-up exchange is used to route messages to the proper queue without modifying routing key. @@ -67,6 +69,15 @@ void DumpClient::dump(Broker& donor) { session.sync(); } +void DumpClient::run() { + try { + dump(); + } catch (const Exception& e) { + failed(e.what()); + } + delete this; +} + void DumpClient::dumpExchange(const boost::shared_ptr<Exchange>& ex) { session.exchangeDeclare( ex->getName(), ex->getType(), diff --git a/qpid/cpp/src/qpid/cluster/DumpClient.h b/qpid/cpp/src/qpid/cluster/DumpClient.h index 447fd1abef..1c49b417d7 100644 --- a/qpid/cpp/src/qpid/cluster/DumpClient.h +++ b/qpid/cpp/src/qpid/cluster/DumpClient.h @@ -29,6 +29,7 @@ #include "qpid/broker/Exchange.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/ExchangeRegistry.h" +#include "qpid/sys/Runnable.h" #include <boost/shared_ptr.hpp> @@ -51,12 +52,12 @@ namespace cluster { /** * A client that dumps the contents of a local broker to a remote one using AMQP. */ -class DumpClient { +class DumpClient : public sys::Runnable { public: - DumpClient(const Url& receiver); + DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail); ~DumpClient(); - - void dump(broker::Broker& donor); + void dump(); + void run(); // Will delete this when finished. private: void dumpQueue(const boost::shared_ptr<broker::Queue>&); @@ -67,6 +68,8 @@ class DumpClient { private: client::Connection connection; client::AsyncSession session; + broker::Broker& donor; + boost::function<void(const char*)> failed; }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp index 82b0d3f077..3212d34775 100644 --- a/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp +++ b/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp @@ -98,8 +98,7 @@ void OutputInterceptor::sendDoOutput() { // Send it anyway to keep the doOutput chain going until we are sure there's no more output // (in deliverDoOutput) // - parent.getCluster().mcastFrame(AMQFrame(in_place<ClusterConnectionDeliverDoOutputBody>( - framing::ProtocolVersion(), request)), parent.getId()); + parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), request), &parent); QPID_LOG(trace, &parent << "Send doOutput request for " << request); } diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index d62ad62b49..f48ba2db30 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -62,13 +62,6 @@ struct ConnectionId : public std::pair<MemberId, Connection*> { Connection* getConnectionPtr() const { return second; } }; -/** State of a cluster member */ -enum State { - DISCARD, // Initially discard connection events up to my own join message. - STALL, // All members stall while a new member joins. - READY // Normal processing. -}; - std::ostream& operator<<(std::ostream&, const ConnectionId&); }} // namespace qpid::cluster diff --git a/qpid/cpp/src/tests/ClusterMapTest.cpp b/qpid/cpp/src/tests/ClusterMapTest.cpp index cce0efb69f..f8ac2e22e6 100644 --- a/qpid/cpp/src/tests/ClusterMapTest.cpp +++ b/qpid/cpp/src/tests/ClusterMapTest.cpp @@ -38,13 +38,13 @@ Url url(const char* host) { return Url(TcpAddress(host)); } QPID_AUTO_TEST_CASE(testNotice) { ClusterMap m; - BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); // id(0) member, no dump. + m.ready(id(0), url("0-ready")); // id(0) member, no dump. BOOST_CHECK(m.isMember(id(0))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); // Newbie, needs dump + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump"))); // Newbie, needs dump BOOST_CHECK(m.isMember(id(0))); BOOST_CHECK(m.isDumpee(id(1))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); @@ -52,7 +52,7 @@ QPID_AUTO_TEST_CASE(testNotice) { BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); // id(1) is ready. + m.ready(id(1), url("1-ready")); // id(1) is ready. BOOST_CHECK(m.isMember(id(0))); BOOST_CHECK(m.isMember(id(1))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); @@ -60,25 +60,25 @@ QPID_AUTO_TEST_CASE(testNotice) { BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); // id(2) needs dump + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump"))); // id(2) needs dump BOOST_CHECK(m.isDumpee(id(2))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - BOOST_CHECK_EQUAL(id(1), m.urlNotice(id(3), url("3-dump"))); // 0 busy, dump to id(1). + BOOST_CHECK_EQUAL(id(1), m.dumpRequest(id(3), url("3-dump"))); // 0 busy, dump to id(1). BOOST_CHECK(m.isDumpee(id(3))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)2); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); // Equally busy, 0 is first on list. + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump"))); // Equally busy, 0 is first on list. BOOST_CHECK_EQUAL(m.dumps(id(0)), 2); BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3); // My dumpees both complete - BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); - BOOST_CHECK(!m.urlNotice(id(4), url("4-ready"))); + m.ready(id(2), url("2-ready")); + m.ready(id(4), url("4-ready")); BOOST_CHECK(m.isMember(id(2))); BOOST_CHECK(m.isMember(id(4))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); @@ -86,7 +86,7 @@ QPID_AUTO_TEST_CASE(testNotice) { BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); // Final dumpee completes. - BOOST_CHECK(!m.urlNotice(id(3), url("3-ready"))); + m.ready(id(3), url("3-ready")); BOOST_CHECK(m.isMember(id(3))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); @@ -96,11 +96,11 @@ QPID_AUTO_TEST_CASE(testNotice) { QPID_AUTO_TEST_CASE(testLeave) { ClusterMap m; - BOOST_CHECK(!m.urlNotice(id(0), url("0-ready"))); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(1), url("1-dump"))); - BOOST_CHECK(!m.urlNotice(id(1), url("1-ready"))); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(2), url("2-dump"))); - BOOST_CHECK(!m.urlNotice(id(2), url("2-ready"))); + m.ready(id(0), url("0-ready")); + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump"))); + m.ready(id(1), url("1-ready")); + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump"))); + m.ready(id(2), url("2-ready")); BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)3); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); @@ -110,13 +110,13 @@ QPID_AUTO_TEST_CASE(testLeave) { BOOST_CHECK(m.isMember(id(0))); BOOST_CHECK(m.isMember(id(2))); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(4), url("4-dump"))); + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump"))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); BOOST_CHECK(m.isDumpee(id(4))); BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - m.dumpFailed(id(4)); // Dumper detected a failure. + m.dumpError(id(4)); // Dumper detected a failure. BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); BOOST_CHECK(!m.isDumpee(id(4))); BOOST_CHECK(!m.isMember(id(4))); @@ -127,7 +127,7 @@ QPID_AUTO_TEST_CASE(testLeave) { BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - BOOST_CHECK_EQUAL(id(0), m.urlNotice(id(5), url("5-dump"))); + BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(5), url("5-dump"))); BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); BOOST_CHECK(m.isDumpee(id(5))); BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); @@ -140,19 +140,19 @@ QPID_AUTO_TEST_CASE(testLeave) { BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - m.dumpFailed(id(5)); // Dumper reports failure - no op, we already know. + m.dumpError(id(5)); // Dumper reports failure - no op, we already know. BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); } QPID_AUTO_TEST_CASE(testToControl) { ClusterMap m; - m.urlNotice(id(0), url("0")); - m.urlNotice(id(1), url("1dump")); - m.urlNotice(id(1), url("1")); - m.urlNotice(id(2), url("2dump")); - m.urlNotice(id(3), url("3dump")); - m.urlNotice(id(4), url("4dump")); + m.ready(id(0), url("0")); + m.dumpRequest(id(1), url("1dump")); + m.ready(id(1), url("1")); + m.dumpRequest(id(2), url("2dump")); + m.dumpRequest(id(3), url("3dump")); + m.dumpRequest(id(4), url("4dump")); BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3); @@ -188,4 +188,30 @@ QPID_AUTO_TEST_CASE(testToControl) { BOOST_CHECK_EQUAL(s,s2); } +QPID_AUTO_TEST_CASE(testStall) { + ClusterMap m; + m.ready(id(0), url("0")); + BOOST_CHECK_EQUAL(m.memberCount(), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + m.stall(); + + m.dumpRequest(id(1), url("1dump")); + m.ready(id(1), url("1")); + m.leave(id(0)); + m.dumpRequest(id(2), url("2dump")); + m.ready(id(2), url("2")); + m.dumpRequest(id(3), url("3dump")); + BOOST_CHECK_EQUAL(m.memberCount(), 1); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); + + m.unstall(); + BOOST_CHECK_EQUAL(m.memberCount(), 2); + BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); + BOOST_CHECK(!m.isMember(id(0))); + BOOST_CHECK(m.isMember(id(1))); + BOOST_CHECK(m.isMember(id(2))); + BOOST_CHECK(m.isDumpee(id(3))); +} + + QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/DumpClientTest.cpp b/qpid/cpp/src/tests/DumpClientTest.cpp new file mode 100644 index 0000000000..27c4174ffe --- /dev/null +++ b/qpid/cpp/src/tests/DumpClientTest.cpp @@ -0,0 +1,122 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "BrokerFixture.h" +#include "qpid/cluster/DumpClient.h" +#include "qpid/framing/Buffer.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/Url.h" +#include <boost/assign.hpp> + +QPID_AUTO_TEST_SUITE(DumpClientTest) + +using namespace std; +using namespace qpid; +using namespace framing; +using namespace client; +using namespace cluster; +using namespace sys; + +// Verify we can copy shared state - wiring + messages - from one +// broker to another via the DumpClient. +// +QPID_AUTO_TEST_CASE(testDumpClientSharedState) { + BrokerFixture donor, receiver; + { + Client c(donor.getPort()); + FieldTable args; + args.setString("x", "y"); + c.session.queueDeclare("qa", arg::arguments=args); + c.session.queueDeclare("qb", arg::alternateExchange="amq.direct"); + + c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); + c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); + c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo")); + + c.session.exchangeDeclare("ext", arg::type="topic"); + c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); + c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar")); + c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar")); + + c.session.close(); + c.connection.close(); + } + Url url(Url::getIpAddressesUrl(receiver.getPort())); + qpid::cluster::DumpClient dump(url, *donor.broker, 0); + dump.dump(); + { + Client r(receiver.getPort()); + // Verify exchanges + ExchangeQueryResult ex=r.session.exchangeQuery("exd"); + BOOST_CHECK_EQUAL(ex.getType(), "direct"); + BOOST_CHECK_EQUAL(ex.getDurable(), false); + BOOST_CHECK_EQUAL(ex.getNotFound(), false); + BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y"); + + ex = r.session.exchangeQuery("ext"); + BOOST_CHECK_EQUAL(ex.getType(), "topic"); + BOOST_CHECK_EQUAL(ex.getNotFound(), false); + + // Verify queues + QueueQueryResult qq = r.session.queueQuery("qa"); + BOOST_CHECK_EQUAL(qq.getQueue(), "qa"); + BOOST_CHECK_EQUAL(qq.getAlternateExchange(), ""); + BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)1); + + qq = r.session.queueQuery("qb"); + BOOST_CHECK_EQUAL(qq.getQueue(), "qb"); + BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct"); + BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)2); + + // Verify messages + Message m; + BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "one"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo"); + + BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "one"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); + + BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); + BOOST_CHECK_EQUAL(m.getData(), "two"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); + BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); + + // Verify bindings + r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo")); + BOOST_CHECK(r.subs.get(m, "qa")); + BOOST_CHECK_EQUAL(m.getData(), "xxx"); + + r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar")); + BOOST_CHECK(r.subs.get(m, "qb")); + BOOST_CHECK_EQUAL(m.getData(), "yyy"); + + r.session.close(); + r.connection.close(); + } +} + +QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 40d1c0df3e..0fc95b3c7d 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -18,7 +18,7 @@ check_PROGRAMS+=cluster_test cluster_test_SOURCES=unit_test.cpp cluster_test.cpp cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework -unit_test_SOURCES+=ClusterMapTest.cpp unit_test_LDADD+=$(lib_cluster) +unit_test_SOURCES+=ClusterMapTest.cpp DumpClientTest.cpp endif diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index 28ee439a5d..af380c629d 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -140,94 +140,6 @@ ostream& operator<<(ostream& o, const pair<T*, int>& array) { return o; } -// FIXME aconway 2008-09-11: This test has to be first otherwise -// it picks up the cluster name from a previous test and runs the -// brokers as cluster nodes. Something wrong with option parsing... -// -QPID_AUTO_TEST_CASE(testDumpClientSharedState) { - // In this test we don't want the cluster plugin to initialize, so set --cluster-name="" - const char* argv[] = { "--cluster-name", "" }; - Broker::Options opts = parseOpts(sizeof(argv)/sizeof(*argv), argv); - - BrokerFixture donor(opts), receiver(opts); - { - Client c(donor.getPort()); - FieldTable args; - args.setString("x", "y"); - c.session.queueDeclare("qa", arg::arguments=args); - c.session.queueDeclare("qb", arg::alternateExchange="amq.direct"); - - c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", arg::arguments=args); - c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", arg::bindingKey="foo"); - c.session.messageTransfer(arg::destination="exd", arg::content=Message("one", "foo")); - - c.session.exchangeDeclare("ext", arg::type="topic"); - c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", arg::bindingKey="bar"); - c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0)); - c.session.messageTransfer(arg::destination="ext", arg::content=Message("one", "bar")); - c.session.messageTransfer(arg::destination="ext", arg::content=Message("two", "bar")); - - c.session.close(); - c.connection.close(); - } - qpid::cluster::DumpClient dump(Url::getIpAddressesUrl(receiver.getPort())); - dump.dump(*donor.broker); - { - Client r(receiver.getPort()); - // Verify exchanges - ExchangeQueryResult ex=r.session.exchangeQuery("exd"); - BOOST_CHECK_EQUAL(ex.getType(), "direct"); - BOOST_CHECK_EQUAL(ex.getDurable(), false); - BOOST_CHECK_EQUAL(ex.getNotFound(), false); - BOOST_CHECK_EQUAL(ex.getArguments().getString("x"), "y"); - - ex = r.session.exchangeQuery("ext"); - BOOST_CHECK_EQUAL(ex.getType(), "topic"); - BOOST_CHECK_EQUAL(ex.getNotFound(), false); - - // Verify queues - QueueQueryResult qq = r.session.queueQuery("qa"); - BOOST_CHECK_EQUAL(qq.getQueue(), "qa"); - BOOST_CHECK_EQUAL(qq.getAlternateExchange(), ""); - BOOST_CHECK_EQUAL(qq.getArguments().getString("x"), "y"); - BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)1); - - qq = r.session.queueQuery("qb"); - BOOST_CHECK_EQUAL(qq.getQueue(), "qb"); - BOOST_CHECK_EQUAL(qq.getAlternateExchange(), "amq.direct"); - BOOST_CHECK_EQUAL(qq.getMessageCount(), (unsigned)2); - - // Verify messages - Message m; - BOOST_CHECK(r.subs.get(m, "qa", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "one"); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "exd"); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "foo"); - - BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "one"); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); - - BOOST_CHECK(r.subs.get(m, "qb", TIME_SEC)); - BOOST_CHECK_EQUAL(m.getData(), "two"); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getExchange(), "ext"); - BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar"); - - // Verify bindings - r.session.messageTransfer(arg::destination="exd", arg::content=Message("xxx", "foo")); - BOOST_CHECK(r.subs.get(m, "qa")); - BOOST_CHECK_EQUAL(m.getData(), "xxx"); - - r.session.messageTransfer(arg::destination="ext", arg::content=Message("yyy", "bar")); - BOOST_CHECK(r.subs.get(m, "qb")); - BOOST_CHECK_EQUAL(m.getData(), "yyy"); - - r.session.close(); - r.connection.close(); - } -} - // FIXME aconway 2008-09-12: finish the new join protocol. QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) { @@ -278,7 +190,7 @@ QPID_AUTO_TEST_CASE(testStall) { BOOST_REQUIRE(q0); BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0); // Now unstall and we should get the message. - getGlobalCluster().unStall(); + getGlobalCluster().ready(); Message m; BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC)); BOOST_CHECK_EQUAL(m.getData(), "foo"); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index 596a00158b..cefe92c657 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -29,11 +29,19 @@ o<?xml version="1.0"?> <!-- Cluster membership --> - <control name = "url-notice" code="0x1" label="Url to use for a cluster member"> + <control name = "dump-request" code="0x1" label="Url to use for a cluster member"> <field name="url" type="str16" label="URL for brain dump to new member."/> </control> - <control name="map" code="0x3" label="Cluster map sent to new members."> + <control name = "dump-error" code="0x2" label="Error while dumping to new member"> + <field name="dumpee" type="uint64"/> + </control> + + <control name = "ready" code="0x3" label="Cluster member ready at URL"> + <field name="url" type="str16" label="URL for client connections."/> + </control> + + <control name="map" code="0x4" label="Cluster map sent to new members."> <field name="members" type="map"/> <!-- member-id -> URL --> <field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL --> <field name="dumps" type="map"/> <!-- dumpee-id -> donor-id --> |