diff options
author | Alan Conway <aconway@apache.org> | 2008-09-16 00:22:00 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-09-16 00:22:00 +0000 |
commit | de70df69b4b086ac6bf9ce09d3741d527c290f19 (patch) | |
tree | a9e30e22b19bf86fca0eb136defd2290bc1f869b /qpid | |
parent | e307d4138fff7b2635ce808eea50f01f4d542a85 (diff) | |
download | qpid-python-de70df69b4b086ac6bf9ce09d3741d527c290f19.tar.gz |
Simplified cluster updates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@695696 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rwxr-xr-x | qpid/cpp/rubygen/cppgen.rb | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 132 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 138 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.h | 62 | ||||
-rw-r--r-- | qpid/cpp/src/tests/ClusterMapTest.cpp | 217 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/xml/cluster.xml | 23 |
9 files changed, 103 insertions, 488 deletions
diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb index 3a4228567a..13f6f3744d 100755 --- a/qpid/cpp/rubygen/cppgen.rb +++ b/qpid/cpp/rubygen/cppgen.rb @@ -118,6 +118,7 @@ class AmqpRoot # preview; map 0-10 types to preview code generator types @@typemap = { "bit"=> CppType.new("bool").code("Octet").defval("false"), + "boolean"=> CppType.new("bool").code("Octet").defval("false"), "uint8"=>CppType.new("uint8_t").code("Octet").defval("0"), "uint16"=>CppType.new("uint16_t").code("Short").defval("0"), "uint32"=>CppType.new("uint32_t").code("Long").defval("0"), @@ -189,7 +190,7 @@ class AmqpField c=containing_class c.struct(type_) end - def cpptype() lookup_cpptype(type_) or raise "no cpptype #{self}" end + def cpptype() lookup_cpptype(type_) or raise "no cpptype #{type_} for field #{self}" end def cppname() name.lcaps.cppsafe; end def bit?() type_ == "bit"; end def signature() cpptype.param+" "+cppname; end diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index c441686def..7fb2e5ad58 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -26,9 +26,7 @@ #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" #include "qpid/framing/ClusterDumpRequestBody.h" -#include "qpid/framing/ClusterReadyBody.h" -#include "qpid/framing/ClusterDumpErrorBody.h" -#include "qpid/framing/ClusterMapBody.h" +#include "qpid/framing/ClusterUpdateBody.h" #include "qpid/framing/ClusterConnectionDeliverCloseBody.h" #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h" #include "qpid/log/Statement.h" @@ -56,11 +54,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler { bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); } void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); } - void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); } - void ready(const std::string& u) { cluster.ready(member, u); } - virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { - cluster.mapInit(members, dumpees, dumps); - } + void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); } }; Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : @@ -76,7 +70,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) : boost::bind(&Cluster::disconnect, this, _1) // disconnect ), connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))), - state(DISCARD) + state(START) { QPID_LOG(notice, self << " joining cluster " << name.str()); broker.addFinalizer(boost::bind(&Cluster::shutdown, this)); @@ -127,7 +121,7 @@ void Cluster::mcastEvent(const Event& e) { size_t Cluster::size() const { Mutex::ScopedLock l(lock); - return map.memberCount(); + return map.size(); } std::vector<Url> Cluster::getUrls() const { @@ -229,7 +223,7 @@ void Cluster::configChange( cpg_name */*group*/, cpg_address *current, int nCurrent, cpg_address *left, int nLeft, - cpg_address *joined, int nJoined) + cpg_address */*joined*/, int nJoined) { // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node. QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent)); @@ -240,19 +234,17 @@ void Cluster::configChange( broker.shutdown(); } Mutex::ScopedLock l(lock); - if (state == DISCARD) { - if (nCurrent == 1 && *current == self) { - QPID_LOG(notice, self << " first in cluster."); - map.ready(self, url); - ready(); // First in cluster. - } - else if (find(joined, joined+nJoined, self) != joined+nJoined) { - QPID_LOG(notice, self << " requesting state dump."); // Just joined - mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); - } + map.configChange(current, nCurrent); + if (state == START && nCurrent == 1) { // First in cluster + assert(*current == self); + assert(map.empty()); + QPID_LOG(notice, self << " first in cluster."); + map.insert(self, url); + ready(); + } + else if (nJoined && self == map.first()) { // Send an update to new members. + mcastControl(map.toControl(), 0); } - for (int i = 0; i < nLeft; ++i) - map.leave(left[i]); } void Cluster::dispatch(sys::DispatchHandle& h) { @@ -268,33 +260,42 @@ void Cluster::disconnect(sys::DispatchHandle& ) { broker.shutdown(); } -// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place. -// Only one at a time to simplify things? -void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { +void Cluster::update(const FieldTable& members, bool dumping) { Mutex::ScopedLock l(lock); - Url url(urlStr); - if (self == m) { - switch (state) { - case DISCARD: state = CATCHUP; stall(); break; - case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map. - default: assert(0); - }; - } - else if (self == map.dumpRequest(m, url)) { - assert(state == READY); - QPID_LOG(info, self << " dumping to " << url); - // state = DUMPING; - // stall(); - // FIXME aconway 2008-09-15: need to stall map? - // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. - mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump. + map.update(members, dumping); + QPID_LOG(info, "Cluster update:\n " << map); + if (state == START && dumping == false) { + state = DISCARD; + mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0); } } -void Cluster::ready(const MemberId& m, const string& urlStr) { +void Cluster::dumpRequest(const MemberId& m, const string& urlStr) { Mutex::ScopedLock l(lock); - Url url(urlStr); - map.ready(m, url); + bool wasDumping = map.isDumping(); + map.setDumping(true); + if (!wasDumping) { + if (self == m) { // My turn + assert(state == DISCARD); + // FIXME aconway 2008-09-15: RECEIVE DUMP + // state = CATCHUP; + // stall(); + // When received + map.insert(self, url); + mcastControl(map.toControl(), 0); + ready(); + } + else if (state == READY && self == map.first()) { // Give the dump. + QPID_LOG(info, self << " dumping to " << url); + // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient. + // state = DUMPING; + // stall(); + (void)urlStr; + // When dump complete: + map.setDumping(false); + mcastControl(map.toControl(), 0); + } + } } broker::Broker& Cluster::getBroker(){ return broker; } @@ -314,7 +315,6 @@ void Cluster::ready() { // Called with lock held QPID_LOG(info, self << " ready with URL " << url); state = READY; - mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0); connectionEventQueue.start(poller); // FIXME aconway 2008-09-15: stall/unstall map? } @@ -331,45 +331,5 @@ void Cluster::shutdown() { delete this; } -/** Received from cluster */ -void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) { - QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee); - Mutex::ScopedLock l(lock); - map.dumpError(dumpee); - if (state == DUMPING && map.dumps(self) == 0) - ready(); -} - -/** Called in local dump thread */ -void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) { - assert(state == DUMPING); - QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg); - mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0); - Mutex::ScopedLock l(lock); - map.dumpError(dumpee); - if (map.dumps(self) == 0) // Unstall immediately. - ready(); -} - -void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) { - Mutex::ScopedLock l(lock); - // FIXME aconway 2008-09-15: faking out dump here. - switch (state) { - case DISCARD: - map.init(members, dumpees, dumps); - state = HAVE_DUMP; - break; - case CATCHUP: - map.init(members, dumpees, dumps); - ready(); - break; - default: - break; - } -} - -void Cluster::dumpTo(const Url& ) { - // FIXME aconway 2008-09-12: DumpClient -} }} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 24db07b32b..b8527ae66b 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -76,11 +76,7 @@ class Cluster : private Cpg::Handler void leave(); void dumpRequest(const MemberId&, const std::string& url); - void dumpError(const MemberId& dumper, const MemberId& dumpee); - void ready(const MemberId&, const std::string& url); - void mapInit(const framing::FieldTable& members, - const framing::FieldTable& dumpees, - const framing::FieldTable& dumps); + void update(const framing::FieldTable& members, bool dumping); MemberId getSelf() const { return self; } @@ -95,7 +91,8 @@ class Cluster : private Cpg::Handler typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap; typedef sys::PollableQueue<Event> EventQueue; enum State { - DISCARD, // Discard updates up to catchup point. + START, // Have not yet received first cluster update. + DISCARD, // Discard updates up to dump start point. HAVE_DUMP, // Received state dump, waiting for catchup point. CATCHUP, // Stalled at catchup point, waiting for dump. DUMPING, // Stalled while sending a state dump. @@ -131,9 +128,6 @@ class Cluster : private Cpg::Handler boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&); - void dumpTo(const Url&); - void dumpError(const MemberId&, const Url&, const char* msg); - mutable sys::Monitor lock; // Protect access to members. broker::Broker& broker; boost::shared_ptr<sys::Poller> poller; diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp index 24c3ed5552..63d0c786d2 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -32,143 +32,55 @@ using namespace framing; namespace cluster { -ClusterMap::ClusterMap() : stalled(false) {} +ClusterMap::ClusterMap() : dumping(false) {} -MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) { - if (stalled) { - stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url)); - return MemberId(); - } - MemberId dumper = nextDumper(); - Dumpee& d = dumpees[id]; - d.url = url; - d.dumper = dumper; - return dumper; -} - -void ClusterMap::ready(const MemberId& id, const Url& url) { - if (stalled) { - stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url)); - return; - } - dumpees.erase(id); - members[id] = url; -} - - -MemberId ClusterMap::nextDumper() const { - // Choose the first member in member-id order of the group that - // has the least number of dumps-in-progress. - assert(!members.empty()); - MemberId dumper = members.begin()->first; - int minDumps = dumps(dumper); - MemberMap::const_iterator i = ++members.begin(); - while (i != members.end()) { - int d = dumps(i->first); - if (d < minDumps) { - minDumps = d; - dumper = i->first; - } - ++i; - } - return dumper; -} - -void ClusterMap::leave(const MemberId& id) { - if (stalled) { - stallq.push_back(boost::bind(&ClusterMap::leave, this, id)); - return; - } - - if (isDumpee(id)) - dumpees.erase(id); - if (isMember(id)) { - members.erase(id); - DumpeeMap::iterator i = dumpees.begin(); - while (i != dumpees.end()) { - if (i->second.dumper == id) dumpees.erase(i++); - else ++i; - } - } -} - -struct ClusterMap::MatchDumper { - MemberId d; - MatchDumper(const MemberId& i) : d(i) {} - bool operator()(const DumpeeMap::value_type& v) const { return v.second.dumper == d; } -}; - -int ClusterMap::dumps(const MemberId& id) const { - return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id)); +MemberId ClusterMap::first() { + return (empty()) ? MemberId() : begin()->first; } -void ClusterMap::dumpError(const MemberId& dumpee) { - if (stalled) { - stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee)); - return; +void ClusterMap::configChange(const cpg_address* addrs, size_t size) { + iterator i = begin(); + while (i != end()) { // Remove members that are no longer in addrs. + if (std::find(addrs, addrs+size, i->first) == addrs+size) + erase(i++); + else + ++i; } - dumpees.erase(dumpee); } -framing::ClusterMapBody ClusterMap::toControl() const { - framing::ClusterMapBody b; - for (MemberMap::const_iterator i = members.begin(); i != members.end(); ++i) +framing::ClusterUpdateBody ClusterMap::toControl() const { + framing::ClusterUpdateBody b; + for (const_iterator i = begin(); i != end(); ++i) b.getMembers().setString(i->first.str(), i->second.str()); - for (DumpeeMap::const_iterator i = dumpees.begin(); i != dumpees.end(); ++i) { - b.getDumpees().setString(i->first.str(), i->second.url.str()); - b.getDumps().setString(i->first.str(), i->second.dumper.str()); - } + b.setDumping(dumping); return b; } -void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) { - *this = ClusterMap(); // Reset any current contents. +void ClusterMap::update(const FieldTable& ftMembers, bool dump) { + dumping = dump; FieldTable::ValueMap::const_iterator i; for (i = ftMembers.begin(); i != ftMembers.end(); ++i) - members[i->first] = Url(i->second->get<std::string>()); - for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i) - dumpees[i->first].url = Url(i->second->get<std::string>()); - for (i = ftDumps.begin(); i != ftDumps.end(); ++i) - dumpees[i->first].dumper = MemberId(i->second->get<std::string>()); + (*this)[i->first] = Url(i->second->get<std::string>()); } -void ClusterMap::fromControl(const framing::ClusterMapBody& b) { - init(b.getMembers(), b.getDumpees(), b.getDumps()); +void ClusterMap::fromControl(const framing::ClusterUpdateBody& b) { + update(b.getMembers(), b.getDumping()); } std::vector<Url> ClusterMap::memberUrls() const { - std::vector<Url> result(members.size()); - std::transform(members.begin(), members.end(), result.begin(), - boost::bind(&MemberMap::value_type::second, _1)); + std::vector<Url> result(size()); + std::transform(begin(), end(), result.begin(), + boost::bind(&value_type::second, _1)); return result; } -void ClusterMap::stall() { stalled = true; } - -namespace { -template <class F> void call(const F& f) { f(); } -} - -void ClusterMap::unstall() { - stalled = false; - std::for_each(stallq.begin(), stallq.end(), - boost::bind(&boost::function<void()>::operator(), _1)); - stallq.clear(); -} - -std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) { +std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv) { return o << mv.first << "=" << mv.second; } -std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) { - return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url; -} - std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { - std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n "); - std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n "); - std::copy(m.members.begin(), m.members.end(), im); - std::copy(m.dumpees.begin(), m.dumpees.end(), id); + std::ostream_iterator<ClusterMap::value_type> im(o, "\n "); + std::copy(m.begin(), m.end(), im); return o; } diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 04323c5905..fce65f083d 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -23,7 +23,7 @@ */ #include "types.h" -#include "qpid/framing/ClusterMapBody.h" +#include "qpid/framing/ClusterUpdateBody.h" #include "qpid/Url.h" #include <boost/function.hpp> #include <vector> @@ -34,68 +34,46 @@ namespace qpid { namespace cluster { +// FIXME aconway 2008-09-15: rename cluster status? + /** * Map of established cluster members and brain-dumps in progress. * A dumper is an established member that is sending catch-up data. * A dumpee is an aspiring member that is receiving catch-up data. */ -class ClusterMap -{ +class ClusterMap : public std::map<MemberId, Url> { public: ClusterMap(); - - MemberId dumpRequest(const MemberId& from, const Url& url); - - void dumpError(const MemberId&); - - void ready(const MemberId& from, const Url& url); - /** Update map for cpg leave event */ - void leave(const MemberId&); + /** First member of the cluster in ID order, gets to perform one-off tasks. */ + MemberId first(); - /** Instead of updating the map, queue the updates for unstall */ - void stall(); + /** Update for CPG config change. */ + void configChange(const cpg_address* addrs, size_t size); - /** Apply queued updates */ - void unstall(); - - /** Number of unfinished dumps for member. */ - int dumps(const MemberId&) const; /** Convert map contents to a cluster control body. */ - framing::ClusterMapBody toControl() const; + framing::ClusterUpdateBody toControl() const; - /** Initialize map contents from a cluster control body. */ - void init(const framing::FieldTable& members, - const framing::FieldTable& dumpees, - const framing::FieldTable& dumps); - - void fromControl(const framing::ClusterMapBody&); + /** Update with first member. */ + using std::map<MemberId, Url>::insert; + void insert(const MemberId& id, const Url& url) { insert(value_type(id,url)); } + void setDumping(bool d) { dumping = d; } - size_t memberCount() const { return members.size(); } - size_t dumpeeCount() const { return dumpees.size(); } + /** Apply update delivered from clsuter. */ + void update(const framing::FieldTable& members, bool dumping); + void fromControl(const framing::ClusterUpdateBody&); - bool isMember(const MemberId& id) const { return members.find(id) != members.end(); } - bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); } + bool isMember(const MemberId& id) const { return find(id) != end(); } + bool isDumping() const { return dumping; } std::vector<Url> memberUrls() const; private: - struct Dumpee { Url url; MemberId dumper; }; - typedef std::map<MemberId, Url> MemberMap; - typedef std::map<MemberId, Dumpee> DumpeeMap; - struct MatchDumper; - - MemberId nextDumper() const; - - MemberMap members; - DumpeeMap dumpees; - bool stalled; - std::deque<boost::function<void()> > stallq; + bool dumping; friend std::ostream& operator<<(std::ostream&, const ClusterMap&); - friend std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv); - friend std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv); + friend std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv); }; }} // namespace qpid::cluster diff --git a/qpid/cpp/src/tests/ClusterMapTest.cpp b/qpid/cpp/src/tests/ClusterMapTest.cpp deleted file mode 100644 index f8ac2e22e6..0000000000 --- a/qpid/cpp/src/tests/ClusterMapTest.cpp +++ /dev/null @@ -1,217 +0,0 @@ -/* - * - * Copyright (c) 2006 The Apache Software Foundation - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - - -#include "unit_test.h" -#include "test_tools.h" -#include "qpid/cluster/ClusterMap.h" -#include "qpid/framing/ClusterMapBody.h" -#include "qpid/framing/Buffer.h" -#include "qpid/Url.h" -#include <boost/assign.hpp> - -QPID_AUTO_TEST_SUITE(CluterMapTest) - -using namespace std; -using namespace qpid; -using namespace cluster; -using namespace framing; - -MemberId id(int i) { return MemberId(i,i); } - -Url url(const char* host) { return Url(TcpAddress(host)); } - -QPID_AUTO_TEST_CASE(testNotice) { - ClusterMap m; - m.ready(id(0), url("0-ready")); // id(0) member, no dump. - BOOST_CHECK(m.isMember(id(0))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump"))); // Newbie, needs dump - BOOST_CHECK(m.isMember(id(0))); - BOOST_CHECK(m.isDumpee(id(1))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); - BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - - m.ready(id(1), url("1-ready")); // id(1) is ready. - BOOST_CHECK(m.isMember(id(0))); - BOOST_CHECK(m.isMember(id(1))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); - BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump"))); // id(2) needs dump - BOOST_CHECK(m.isDumpee(id(2))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - - BOOST_CHECK_EQUAL(id(1), m.dumpRequest(id(3), url("3-dump"))); // 0 busy, dump to id(1). - BOOST_CHECK(m.isDumpee(id(3))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); - BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)2); - - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump"))); // Equally busy, 0 is first on list. - BOOST_CHECK_EQUAL(m.dumps(id(0)), 2); - BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3); - - // My dumpees both complete - m.ready(id(2), url("2-ready")); - m.ready(id(4), url("4-ready")); - BOOST_CHECK(m.isMember(id(2))); - BOOST_CHECK(m.isMember(id(4))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); - BOOST_CHECK_EQUAL(m.dumps(id(1)), 1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - - // Final dumpee completes. - m.ready(id(3), url("3-ready")); - BOOST_CHECK(m.isMember(id(3))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); - BOOST_CHECK_EQUAL(m.dumps(id(1)), 0); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - -} - -QPID_AUTO_TEST_CASE(testLeave) { - ClusterMap m; - m.ready(id(0), url("0-ready")); - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump"))); - m.ready(id(1), url("1-ready")); - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump"))); - m.ready(id(2), url("2-ready")); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)3); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - - m.leave(id(1)); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - BOOST_CHECK(m.isMember(id(0))); - BOOST_CHECK(m.isMember(id(2))); - - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump"))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); - BOOST_CHECK(m.isDumpee(id(4))); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - - m.dumpError(id(4)); // Dumper detected a failure. - BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); - BOOST_CHECK(!m.isDumpee(id(4))); - BOOST_CHECK(!m.isMember(id(4))); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - - m.leave(id(4)); // Dumpee leaves, no-op since we already know it failed. - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - - BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(5), url("5-dump"))); - BOOST_CHECK_EQUAL(m.dumps(id(0)), 1); - BOOST_CHECK(m.isDumpee(id(5))); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1); - - m.leave(id(5)); // Dumpee detects failure and leaves cluster. - BOOST_CHECK_EQUAL(m.dumps(id(0)), 0); - BOOST_CHECK(!m.isDumpee(id(5))); - BOOST_CHECK(!m.isMember(id(5))); - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); - - m.dumpError(id(5)); // Dumper reports failure - no op, we already know. - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0); -} - -QPID_AUTO_TEST_CASE(testToControl) { - ClusterMap m; - m.ready(id(0), url("0")); - m.dumpRequest(id(1), url("1dump")); - m.ready(id(1), url("1")); - m.dumpRequest(id(2), url("2dump")); - m.dumpRequest(id(3), url("3dump")); - m.dumpRequest(id(4), url("4dump")); - - BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3); - - ClusterMapBody b = m.toControl(); - - BOOST_CHECK_EQUAL(b.getMembers().count(), 2); - BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str()); - BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str()); - - BOOST_CHECK_EQUAL(b.getDumpees().count(), 3); - BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str()); - BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str()); - BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str()); - - BOOST_CHECK_EQUAL(b.getDumps().count(), 3); - BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str()); - BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str()); - BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str()); - - std::string s(b.size(), '\0'); - Buffer buf(&s[0], s.size()); - b.encode(buf); - - ClusterMap m2; - m2.fromControl(b); - ClusterMapBody b2 = m2.toControl(); - std::string s2(b2.size(), '\0'); - Buffer buf2(&s2[0], s2.size()); - b2.encode(buf2); - - // Verify a round-trip encoding produces identical results. - BOOST_CHECK_EQUAL(s,s2); -} - -QPID_AUTO_TEST_CASE(testStall) { - ClusterMap m; - m.ready(id(0), url("0")); - BOOST_CHECK_EQUAL(m.memberCount(), 1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); - m.stall(); - - m.dumpRequest(id(1), url("1dump")); - m.ready(id(1), url("1")); - m.leave(id(0)); - m.dumpRequest(id(2), url("2dump")); - m.ready(id(2), url("2")); - m.dumpRequest(id(3), url("3dump")); - BOOST_CHECK_EQUAL(m.memberCount(), 1); - BOOST_CHECK_EQUAL(m.dumpeeCount(), 0); - - m.unstall(); - BOOST_CHECK_EQUAL(m.memberCount(), 2); - BOOST_CHECK_EQUAL(m.dumpeeCount(), 1); - BOOST_CHECK(!m.isMember(id(0))); - BOOST_CHECK(m.isMember(id(1))); - BOOST_CHECK(m.isMember(id(2))); - BOOST_CHECK(m.isDumpee(id(3))); -} - - -QPID_AUTO_TEST_SUITE_END() diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 0fc95b3c7d..55fa71b5e6 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -19,6 +19,6 @@ cluster_test_SOURCES=unit_test.cpp cluster_test.cpp cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework unit_test_LDADD+=$(lib_cluster) -unit_test_SOURCES+=ClusterMapTest.cpp DumpClientTest.cpp +unit_test_SOURCES+=DumpClientTest.cpp endif diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index af380c629d..c17dc99901 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -280,7 +280,7 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) { QPID_AUTO_TEST_CASE(testMessageDequeue) { // Enqueue on one broker, dequeue on two others. - ClusterFixture cluster (3); + ClusterFixture cluster(3); Client c0(cluster[0], "c0"); c0.session.queueDeclare("q"); c0.session.messageTransfer(arg::content=Message("foo", "q")); diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml index cefe92c657..6dbfee109d 100644 --- a/qpid/cpp/xml/cluster.xml +++ b/qpid/cpp/xml/cluster.xml @@ -27,27 +27,14 @@ o<?xml version="1.0"?> <class name = "cluster" code = "0x80" label="Qpid clustering extensions."> <doc>Qpid extension class to allow clustered brokers to communicate.</doc> - <!-- Cluster membership --> - - <control name = "dump-request" code="0x1" label="Url to use for a cluster member"> - <field name="url" type="str16" label="URL for brain dump to new member."/> - </control> - - <control name = "dump-error" code="0x2" label="Error while dumping to new member"> - <field name="dumpee" type="uint64"/> - </control> - - <control name = "ready" code="0x3" label="Cluster member ready at URL"> - <field name="url" type="str16" label="URL for client connections."/> - </control> - - <control name="map" code="0x4" label="Cluster map sent to new members."> + <control name="update" code="0x4" label="Cluster status update."> <field name="members" type="map"/> <!-- member-id -> URL --> - <field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL --> - <field name="dumps" type="map"/> <!-- dumpee-id -> donor-id --> + <field name="dumping" type="boolean"/> <!-- currently dumping state to new member. --> </control> - <!-- Transferring broker state --> + <control name = "dump-request" code="0x1" label="New meber requests brain dump"> + <field name="url" type="str16" label="Url for brain dump."/> + </control> </class> |