summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-09-16 00:22:00 +0000
committerAlan Conway <aconway@apache.org>2008-09-16 00:22:00 +0000
commitde70df69b4b086ac6bf9ce09d3741d527c290f19 (patch)
treea9e30e22b19bf86fca0eb136defd2290bc1f869b /qpid
parente307d4138fff7b2635ce808eea50f01f4d542a85 (diff)
downloadqpid-python-de70df69b4b086ac6bf9ce09d3741d527c290f19.tar.gz
Simplified cluster updates.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@695696 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rwxr-xr-xqpid/cpp/rubygen/cppgen.rb3
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.cpp132
-rw-r--r--qpid/cpp/src/qpid/cluster/Cluster.h12
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.cpp138
-rw-r--r--qpid/cpp/src/qpid/cluster/ClusterMap.h62
-rw-r--r--qpid/cpp/src/tests/ClusterMapTest.cpp217
-rw-r--r--qpid/cpp/src/tests/cluster.mk2
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp2
-rw-r--r--qpid/cpp/xml/cluster.xml23
9 files changed, 103 insertions, 488 deletions
diff --git a/qpid/cpp/rubygen/cppgen.rb b/qpid/cpp/rubygen/cppgen.rb
index 3a4228567a..13f6f3744d 100755
--- a/qpid/cpp/rubygen/cppgen.rb
+++ b/qpid/cpp/rubygen/cppgen.rb
@@ -118,6 +118,7 @@ class AmqpRoot
# preview; map 0-10 types to preview code generator types
@@typemap = {
"bit"=> CppType.new("bool").code("Octet").defval("false"),
+ "boolean"=> CppType.new("bool").code("Octet").defval("false"),
"uint8"=>CppType.new("uint8_t").code("Octet").defval("0"),
"uint16"=>CppType.new("uint16_t").code("Short").defval("0"),
"uint32"=>CppType.new("uint32_t").code("Long").defval("0"),
@@ -189,7 +190,7 @@ class AmqpField
c=containing_class
c.struct(type_)
end
- def cpptype() lookup_cpptype(type_) or raise "no cpptype #{self}" end
+ def cpptype() lookup_cpptype(type_) or raise "no cpptype #{type_} for field #{self}" end
def cppname() name.lcaps.cppsafe; end
def bit?() type_ == "bit"; end
def signature() cpptype.param+" "+cppname; end
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp
index c441686def..7fb2e5ad58 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.cpp
+++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp
@@ -26,9 +26,7 @@
#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
-#include "qpid/framing/ClusterReadyBody.h"
-#include "qpid/framing/ClusterDumpErrorBody.h"
-#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -56,11 +54,7 @@ struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
- void dumpError(uint64_t dumpee) { cluster.dumpError(member, MemberId(dumpee)); }
- void ready(const std::string& u) { cluster.ready(member, u); }
- virtual void map(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
- cluster.mapInit(members, dumpees, dumps);
- }
+ void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); }
};
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -76,7 +70,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
- state(DISCARD)
+ state(START)
{
QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
@@ -127,7 +121,7 @@ void Cluster::mcastEvent(const Event& e) {
size_t Cluster::size() const {
Mutex::ScopedLock l(lock);
- return map.memberCount();
+ return map.size();
}
std::vector<Url> Cluster::getUrls() const {
@@ -229,7 +223,7 @@ void Cluster::configChange(
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address *joined, int nJoined)
+ cpg_address */*joined*/, int nJoined)
{
// FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
@@ -240,19 +234,17 @@ void Cluster::configChange(
broker.shutdown();
}
Mutex::ScopedLock l(lock);
- if (state == DISCARD) {
- if (nCurrent == 1 && *current == self) {
- QPID_LOG(notice, self << " first in cluster.");
- map.ready(self, url);
- ready(); // First in cluster.
- }
- else if (find(joined, joined+nJoined, self) != joined+nJoined) {
- QPID_LOG(notice, self << " requesting state dump."); // Just joined
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
- }
+ map.configChange(current, nCurrent);
+ if (state == START && nCurrent == 1) { // First in cluster
+ assert(*current == self);
+ assert(map.empty());
+ QPID_LOG(notice, self << " first in cluster.");
+ map.insert(self, url);
+ ready();
+ }
+ else if (nJoined && self == map.first()) { // Send an update to new members.
+ mcastControl(map.toControl(), 0);
}
- for (int i = 0; i < nLeft; ++i)
- map.leave(left[i]);
}
void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -268,33 +260,42 @@ void Cluster::disconnect(sys::DispatchHandle& ) {
broker.shutdown();
}
-// FIXME aconway 2008-09-15: can't serve multiple dump requests, stall in wrong place.
-// Only one at a time to simplify things?
-void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+void Cluster::update(const FieldTable& members, bool dumping) {
Mutex::ScopedLock l(lock);
- Url url(urlStr);
- if (self == m) {
- switch (state) {
- case DISCARD: state = CATCHUP; stall(); break;
- case HAVE_DUMP: ready(); break; // FIXME aconway 2008-09-15: apply dump to map.
- default: assert(0);
- };
- }
- else if (self == map.dumpRequest(m, url)) {
- assert(state == READY);
- QPID_LOG(info, self << " dumping to " << url);
- // state = DUMPING;
- // stall();
- // FIXME aconway 2008-09-15: need to stall map?
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- mcastControl(map.toControl(), 0); // FIXME aconway 2008-09-15: stand-in for dump.
+ map.update(members, dumping);
+ QPID_LOG(info, "Cluster update:\n " << map);
+ if (state == START && dumping == false) {
+ state = DISCARD;
+ mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
}
}
-void Cluster::ready(const MemberId& m, const string& urlStr) {
+void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
Mutex::ScopedLock l(lock);
- Url url(urlStr);
- map.ready(m, url);
+ bool wasDumping = map.isDumping();
+ map.setDumping(true);
+ if (!wasDumping) {
+ if (self == m) { // My turn
+ assert(state == DISCARD);
+ // FIXME aconway 2008-09-15: RECEIVE DUMP
+ // state = CATCHUP;
+ // stall();
+ // When received
+ map.insert(self, url);
+ mcastControl(map.toControl(), 0);
+ ready();
+ }
+ else if (state == READY && self == map.first()) { // Give the dump.
+ QPID_LOG(info, self << " dumping to " << url);
+ // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+ // state = DUMPING;
+ // stall();
+ (void)urlStr;
+ // When dump complete:
+ map.setDumping(false);
+ mcastControl(map.toControl(), 0);
+ }
+ }
}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -314,7 +315,6 @@ void Cluster::ready() {
// Called with lock held
QPID_LOG(info, self << " ready with URL " << url);
state = READY;
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
connectionEventQueue.start(poller);
// FIXME aconway 2008-09-15: stall/unstall map?
}
@@ -331,45 +331,5 @@ void Cluster::shutdown() {
delete this;
}
-/** Received from cluster */
-void Cluster::dumpError(const MemberId& dumper, const MemberId& dumpee) {
- QPID_LOG(error, "Error in dump from " << dumper << " to " << dumpee);
- Mutex::ScopedLock l(lock);
- map.dumpError(dumpee);
- if (state == DUMPING && map.dumps(self) == 0)
- ready();
-}
-
-/** Called in local dump thread */
-void Cluster::dumpError(const MemberId& dumpee, const Url& url, const char* msg) {
- assert(state == DUMPING);
- QPID_LOG(error, "Error in local dump to " << dumpee << " at " << url << ": " << msg);
- mcastControl(ClusterDumpErrorBody(ProtocolVersion(), dumpee), 0);
- Mutex::ScopedLock l(lock);
- map.dumpError(dumpee);
- if (map.dumps(self) == 0) // Unstall immediately.
- ready();
-}
-
-void Cluster::mapInit(const FieldTable& members,const FieldTable& dumpees, const FieldTable& dumps) {
- Mutex::ScopedLock l(lock);
- // FIXME aconway 2008-09-15: faking out dump here.
- switch (state) {
- case DISCARD:
- map.init(members, dumpees, dumps);
- state = HAVE_DUMP;
- break;
- case CATCHUP:
- map.init(members, dumpees, dumps);
- ready();
- break;
- default:
- break;
- }
-}
-
-void Cluster::dumpTo(const Url& ) {
- // FIXME aconway 2008-09-12: DumpClient
-}
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h
index 24db07b32b..b8527ae66b 100644
--- a/qpid/cpp/src/qpid/cluster/Cluster.h
+++ b/qpid/cpp/src/qpid/cluster/Cluster.h
@@ -76,11 +76,7 @@ class Cluster : private Cpg::Handler
void leave();
void dumpRequest(const MemberId&, const std::string& url);
- void dumpError(const MemberId& dumper, const MemberId& dumpee);
- void ready(const MemberId&, const std::string& url);
- void mapInit(const framing::FieldTable& members,
- const framing::FieldTable& dumpees,
- const framing::FieldTable& dumps);
+ void update(const framing::FieldTable& members, bool dumping);
MemberId getSelf() const { return self; }
@@ -95,7 +91,8 @@ class Cluster : private Cpg::Handler
typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
typedef sys::PollableQueue<Event> EventQueue;
enum State {
- DISCARD, // Discard updates up to catchup point.
+ START, // Have not yet received first cluster update.
+ DISCARD, // Discard updates up to dump start point.
HAVE_DUMP, // Received state dump, waiting for catchup point.
CATCHUP, // Stalled at catchup point, waiting for dump.
DUMPING, // Stalled while sending a state dump.
@@ -131,9 +128,6 @@ class Cluster : private Cpg::Handler
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
- void dumpTo(const Url&);
- void dumpError(const MemberId&, const Url&, const char* msg);
-
mutable sys::Monitor lock; // Protect access to members.
broker::Broker& broker;
boost::shared_ptr<sys::Poller> poller;
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
index 24c3ed5552..63d0c786d2 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -32,143 +32,55 @@ using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() : stalled(false) {}
+ClusterMap::ClusterMap() : dumping(false) {}
-MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) {
- if (stalled) {
- stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url));
- return MemberId();
- }
- MemberId dumper = nextDumper();
- Dumpee& d = dumpees[id];
- d.url = url;
- d.dumper = dumper;
- return dumper;
-}
-
-void ClusterMap::ready(const MemberId& id, const Url& url) {
- if (stalled) {
- stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url));
- return;
- }
- dumpees.erase(id);
- members[id] = url;
-}
-
-
-MemberId ClusterMap::nextDumper() const {
- // Choose the first member in member-id order of the group that
- // has the least number of dumps-in-progress.
- assert(!members.empty());
- MemberId dumper = members.begin()->first;
- int minDumps = dumps(dumper);
- MemberMap::const_iterator i = ++members.begin();
- while (i != members.end()) {
- int d = dumps(i->first);
- if (d < minDumps) {
- minDumps = d;
- dumper = i->first;
- }
- ++i;
- }
- return dumper;
-}
-
-void ClusterMap::leave(const MemberId& id) {
- if (stalled) {
- stallq.push_back(boost::bind(&ClusterMap::leave, this, id));
- return;
- }
-
- if (isDumpee(id))
- dumpees.erase(id);
- if (isMember(id)) {
- members.erase(id);
- DumpeeMap::iterator i = dumpees.begin();
- while (i != dumpees.end()) {
- if (i->second.dumper == id) dumpees.erase(i++);
- else ++i;
- }
- }
-}
-
-struct ClusterMap::MatchDumper {
- MemberId d;
- MatchDumper(const MemberId& i) : d(i) {}
- bool operator()(const DumpeeMap::value_type& v) const { return v.second.dumper == d; }
-};
-
-int ClusterMap::dumps(const MemberId& id) const {
- return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
+MemberId ClusterMap::first() {
+ return (empty()) ? MemberId() : begin()->first;
}
-void ClusterMap::dumpError(const MemberId& dumpee) {
- if (stalled) {
- stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee));
- return;
+void ClusterMap::configChange(const cpg_address* addrs, size_t size) {
+ iterator i = begin();
+ while (i != end()) { // Remove members that are no longer in addrs.
+ if (std::find(addrs, addrs+size, i->first) == addrs+size)
+ erase(i++);
+ else
+ ++i;
}
- dumpees.erase(dumpee);
}
-framing::ClusterMapBody ClusterMap::toControl() const {
- framing::ClusterMapBody b;
- for (MemberMap::const_iterator i = members.begin(); i != members.end(); ++i)
+framing::ClusterUpdateBody ClusterMap::toControl() const {
+ framing::ClusterUpdateBody b;
+ for (const_iterator i = begin(); i != end(); ++i)
b.getMembers().setString(i->first.str(), i->second.str());
- for (DumpeeMap::const_iterator i = dumpees.begin(); i != dumpees.end(); ++i) {
- b.getDumpees().setString(i->first.str(), i->second.url.str());
- b.getDumps().setString(i->first.str(), i->second.dumper.str());
- }
+ b.setDumping(dumping);
return b;
}
-void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) {
- *this = ClusterMap(); // Reset any current contents.
+void ClusterMap::update(const FieldTable& ftMembers, bool dump) {
+ dumping = dump;
FieldTable::ValueMap::const_iterator i;
for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
- members[i->first] = Url(i->second->get<std::string>());
- for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i)
- dumpees[i->first].url = Url(i->second->get<std::string>());
- for (i = ftDumps.begin(); i != ftDumps.end(); ++i)
- dumpees[i->first].dumper = MemberId(i->second->get<std::string>());
+ (*this)[i->first] = Url(i->second->get<std::string>());
}
-void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
- init(b.getMembers(), b.getDumpees(), b.getDumps());
+void ClusterMap::fromControl(const framing::ClusterUpdateBody& b) {
+ update(b.getMembers(), b.getDumping());
}
std::vector<Url> ClusterMap::memberUrls() const {
- std::vector<Url> result(members.size());
- std::transform(members.begin(), members.end(), result.begin(),
- boost::bind(&MemberMap::value_type::second, _1));
+ std::vector<Url> result(size());
+ std::transform(begin(), end(), result.begin(),
+ boost::bind(&value_type::second, _1));
return result;
}
-void ClusterMap::stall() { stalled = true; }
-
-namespace {
-template <class F> void call(const F& f) { f(); }
-}
-
-void ClusterMap::unstall() {
- stalled = false;
- std::for_each(stallq.begin(), stallq.end(),
- boost::bind(&boost::function<void()>::operator(), _1));
- stallq.clear();
-}
-
-std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) {
+std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv) {
return o << mv.first << "=" << mv.second;
}
-std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) {
- return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url;
-}
-
std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
- std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n ");
- std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n ");
- std::copy(m.members.begin(), m.members.end(), im);
- std::copy(m.dumpees.begin(), m.dumpees.end(), id);
+ std::ostream_iterator<ClusterMap::value_type> im(o, "\n ");
+ std::copy(m.begin(), m.end(), im);
return o;
}
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h
index 04323c5905..fce65f083d 100644
--- a/qpid/cpp/src/qpid/cluster/ClusterMap.h
+++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h
@@ -23,7 +23,7 @@
*/
#include "types.h"
-#include "qpid/framing/ClusterMapBody.h"
+#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/Url.h"
#include <boost/function.hpp>
#include <vector>
@@ -34,68 +34,46 @@
namespace qpid {
namespace cluster {
+// FIXME aconway 2008-09-15: rename cluster status?
+
/**
* Map of established cluster members and brain-dumps in progress.
* A dumper is an established member that is sending catch-up data.
* A dumpee is an aspiring member that is receiving catch-up data.
*/
-class ClusterMap
-{
+class ClusterMap : public std::map<MemberId, Url> {
public:
ClusterMap();
-
- MemberId dumpRequest(const MemberId& from, const Url& url);
-
- void dumpError(const MemberId&);
-
- void ready(const MemberId& from, const Url& url);
- /** Update map for cpg leave event */
- void leave(const MemberId&);
+ /** First member of the cluster in ID order, gets to perform one-off tasks. */
+ MemberId first();
- /** Instead of updating the map, queue the updates for unstall */
- void stall();
+ /** Update for CPG config change. */
+ void configChange(const cpg_address* addrs, size_t size);
- /** Apply queued updates */
- void unstall();
-
- /** Number of unfinished dumps for member. */
- int dumps(const MemberId&) const;
/** Convert map contents to a cluster control body. */
- framing::ClusterMapBody toControl() const;
+ framing::ClusterUpdateBody toControl() const;
- /** Initialize map contents from a cluster control body. */
- void init(const framing::FieldTable& members,
- const framing::FieldTable& dumpees,
- const framing::FieldTable& dumps);
-
- void fromControl(const framing::ClusterMapBody&);
+ /** Update with first member. */
+ using std::map<MemberId, Url>::insert;
+ void insert(const MemberId& id, const Url& url) { insert(value_type(id,url)); }
+ void setDumping(bool d) { dumping = d; }
- size_t memberCount() const { return members.size(); }
- size_t dumpeeCount() const { return dumpees.size(); }
+ /** Apply update delivered from clsuter. */
+ void update(const framing::FieldTable& members, bool dumping);
+ void fromControl(const framing::ClusterUpdateBody&);
- bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
- bool isDumpee(const MemberId& id) const { return dumpees.find(id) != dumpees.end(); }
+ bool isMember(const MemberId& id) const { return find(id) != end(); }
+ bool isDumping() const { return dumping; }
std::vector<Url> memberUrls() const;
private:
- struct Dumpee { Url url; MemberId dumper; };
- typedef std::map<MemberId, Url> MemberMap;
- typedef std::map<MemberId, Dumpee> DumpeeMap;
- struct MatchDumper;
-
- MemberId nextDumper() const;
-
- MemberMap members;
- DumpeeMap dumpees;
- bool stalled;
- std::deque<boost::function<void()> > stallq;
+ bool dumping;
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
- friend std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv);
- friend std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv);
+ friend std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv);
};
}} // namespace qpid::cluster
diff --git a/qpid/cpp/src/tests/ClusterMapTest.cpp b/qpid/cpp/src/tests/ClusterMapTest.cpp
deleted file mode 100644
index f8ac2e22e6..0000000000
--- a/qpid/cpp/src/tests/ClusterMapTest.cpp
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-#include "unit_test.h"
-#include "test_tools.h"
-#include "qpid/cluster/ClusterMap.h"
-#include "qpid/framing/ClusterMapBody.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/Url.h"
-#include <boost/assign.hpp>
-
-QPID_AUTO_TEST_SUITE(CluterMapTest)
-
-using namespace std;
-using namespace qpid;
-using namespace cluster;
-using namespace framing;
-
-MemberId id(int i) { return MemberId(i,i); }
-
-Url url(const char* host) { return Url(TcpAddress(host)); }
-
-QPID_AUTO_TEST_CASE(testNotice) {
- ClusterMap m;
- m.ready(id(0), url("0-ready")); // id(0) member, no dump.
- BOOST_CHECK(m.isMember(id(0)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump"))); // Newbie, needs dump
- BOOST_CHECK(m.isMember(id(0)));
- BOOST_CHECK(m.isDumpee(id(1)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
- BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
-
- m.ready(id(1), url("1-ready")); // id(1) is ready.
- BOOST_CHECK(m.isMember(id(0)));
- BOOST_CHECK(m.isMember(id(1)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
- BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump"))); // id(2) needs dump
- BOOST_CHECK(m.isDumpee(id(2)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
-
- BOOST_CHECK_EQUAL(id(1), m.dumpRequest(id(3), url("3-dump"))); // 0 busy, dump to id(1).
- BOOST_CHECK(m.isDumpee(id(3)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
- BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)2);
-
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump"))); // Equally busy, 0 is first on list.
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 2);
- BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3);
-
- // My dumpees both complete
- m.ready(id(2), url("2-ready"));
- m.ready(id(4), url("4-ready"));
- BOOST_CHECK(m.isMember(id(2)));
- BOOST_CHECK(m.isMember(id(4)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
- BOOST_CHECK_EQUAL(m.dumps(id(1)), 1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
-
- // Final dumpee completes.
- m.ready(id(3), url("3-ready"));
- BOOST_CHECK(m.isMember(id(3)));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
- BOOST_CHECK_EQUAL(m.dumps(id(1)), 0);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
-}
-
-QPID_AUTO_TEST_CASE(testLeave) {
- ClusterMap m;
- m.ready(id(0), url("0-ready"));
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(1), url("1-dump")));
- m.ready(id(1), url("1-ready"));
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(2), url("2-dump")));
- m.ready(id(2), url("2-ready"));
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)3);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
- m.leave(id(1));
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
- BOOST_CHECK(m.isMember(id(0)));
- BOOST_CHECK(m.isMember(id(2)));
-
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(4), url("4-dump")));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
- BOOST_CHECK(m.isDumpee(id(4)));
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
-
- m.dumpError(id(4)); // Dumper detected a failure.
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
- BOOST_CHECK(!m.isDumpee(id(4)));
- BOOST_CHECK(!m.isMember(id(4)));
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
- m.leave(id(4)); // Dumpee leaves, no-op since we already know it failed.
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
- BOOST_CHECK_EQUAL(id(0), m.dumpRequest(id(5), url("5-dump")));
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 1);
- BOOST_CHECK(m.isDumpee(id(5)));
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)1);
-
- m.leave(id(5)); // Dumpee detects failure and leaves cluster.
- BOOST_CHECK_EQUAL(m.dumps(id(0)), 0);
- BOOST_CHECK(!m.isDumpee(id(5)));
- BOOST_CHECK(!m.isMember(id(5)));
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-
- m.dumpError(id(5)); // Dumper reports failure - no op, we already know.
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)0);
-}
-
-QPID_AUTO_TEST_CASE(testToControl) {
- ClusterMap m;
- m.ready(id(0), url("0"));
- m.dumpRequest(id(1), url("1dump"));
- m.ready(id(1), url("1"));
- m.dumpRequest(id(2), url("2dump"));
- m.dumpRequest(id(3), url("3dump"));
- m.dumpRequest(id(4), url("4dump"));
-
- BOOST_CHECK_EQUAL(m.memberCount(), (unsigned)2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), (unsigned)3);
-
- ClusterMapBody b = m.toControl();
-
- BOOST_CHECK_EQUAL(b.getMembers().count(), 2);
- BOOST_CHECK_EQUAL(b.getMembers().getString(id(0).str()), url("0").str());
- BOOST_CHECK_EQUAL(b.getMembers().getString(id(1).str()), url("1").str());
-
- BOOST_CHECK_EQUAL(b.getDumpees().count(), 3);
- BOOST_CHECK_EQUAL(b.getDumpees().getString(id(2).str()), url("2dump").str());
- BOOST_CHECK_EQUAL(b.getDumpees().getString(id(3).str()), url("3dump").str());
- BOOST_CHECK_EQUAL(b.getDumpees().getString(id(4).str()), url("4dump").str());
-
- BOOST_CHECK_EQUAL(b.getDumps().count(), 3);
- BOOST_CHECK_EQUAL(b.getDumps().getString(id(2).str()), id(0).str());
- BOOST_CHECK_EQUAL(b.getDumps().getString(id(3).str()), id(1).str());
- BOOST_CHECK_EQUAL(b.getDumps().getString(id(4).str()), id(0).str());
-
- std::string s(b.size(), '\0');
- Buffer buf(&s[0], s.size());
- b.encode(buf);
-
- ClusterMap m2;
- m2.fromControl(b);
- ClusterMapBody b2 = m2.toControl();
- std::string s2(b2.size(), '\0');
- Buffer buf2(&s2[0], s2.size());
- b2.encode(buf2);
-
- // Verify a round-trip encoding produces identical results.
- BOOST_CHECK_EQUAL(s,s2);
-}
-
-QPID_AUTO_TEST_CASE(testStall) {
- ClusterMap m;
- m.ready(id(0), url("0"));
- BOOST_CHECK_EQUAL(m.memberCount(), 1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
- m.stall();
-
- m.dumpRequest(id(1), url("1dump"));
- m.ready(id(1), url("1"));
- m.leave(id(0));
- m.dumpRequest(id(2), url("2dump"));
- m.ready(id(2), url("2"));
- m.dumpRequest(id(3), url("3dump"));
- BOOST_CHECK_EQUAL(m.memberCount(), 1);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), 0);
-
- m.unstall();
- BOOST_CHECK_EQUAL(m.memberCount(), 2);
- BOOST_CHECK_EQUAL(m.dumpeeCount(), 1);
- BOOST_CHECK(!m.isMember(id(0)));
- BOOST_CHECK(m.isMember(id(1)));
- BOOST_CHECK(m.isMember(id(2)));
- BOOST_CHECK(m.isDumpee(id(3)));
-}
-
-
-QPID_AUTO_TEST_SUITE_END()
diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk
index 0fc95b3c7d..55fa71b5e6 100644
--- a/qpid/cpp/src/tests/cluster.mk
+++ b/qpid/cpp/src/tests/cluster.mk
@@ -19,6 +19,6 @@ cluster_test_SOURCES=unit_test.cpp cluster_test.cpp
cluster_test_LDADD=$(lib_client) $(lib_cluster) -lboost_unit_test_framework
unit_test_LDADD+=$(lib_cluster)
-unit_test_SOURCES+=ClusterMapTest.cpp DumpClientTest.cpp
+unit_test_SOURCES+=DumpClientTest.cpp
endif
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index af380c629d..c17dc99901 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -280,7 +280,7 @@ QPID_AUTO_TEST_CASE(testMessageEnqueue) {
QPID_AUTO_TEST_CASE(testMessageDequeue) {
// Enqueue on one broker, dequeue on two others.
- ClusterFixture cluster (3);
+ ClusterFixture cluster(3);
Client c0(cluster[0], "c0");
c0.session.queueDeclare("q");
c0.session.messageTransfer(arg::content=Message("foo", "q"));
diff --git a/qpid/cpp/xml/cluster.xml b/qpid/cpp/xml/cluster.xml
index cefe92c657..6dbfee109d 100644
--- a/qpid/cpp/xml/cluster.xml
+++ b/qpid/cpp/xml/cluster.xml
@@ -27,27 +27,14 @@ o<?xml version="1.0"?>
<class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
<doc>Qpid extension class to allow clustered brokers to communicate.</doc>
- <!-- Cluster membership -->
-
- <control name = "dump-request" code="0x1" label="Url to use for a cluster member">
- <field name="url" type="str16" label="URL for brain dump to new member."/>
- </control>
-
- <control name = "dump-error" code="0x2" label="Error while dumping to new member">
- <field name="dumpee" type="uint64"/>
- </control>
-
- <control name = "ready" code="0x3" label="Cluster member ready at URL">
- <field name="url" type="str16" label="URL for client connections."/>
- </control>
-
- <control name="map" code="0x4" label="Cluster map sent to new members.">
+ <control name="update" code="0x4" label="Cluster status update.">
<field name="members" type="map"/> <!-- member-id -> URL -->
- <field name="dumpees" type="map"/> <!-- dumpee-id -> braindump URL -->
- <field name="dumps" type="map"/> <!-- dumpee-id -> donor-id -->
+ <field name="dumping" type="boolean"/> <!-- currently dumping state to new member. -->
</control>
- <!-- Transferring broker state -->
+ <control name = "dump-request" code="0x1" label="New meber requests brain dump">
+ <field name="url" type="str16" label="Url for brain dump."/>
+ </control>
</class>