summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ClusterMap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterMap.cpp')
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp101
1 files changed, 77 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
index b0c45ad625..24c3ed5552 100644
--- a/cpp/src/qpid/cluster/ClusterMap.cpp
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -24,36 +24,38 @@
#include <boost/bind.hpp>
#include <algorithm>
#include <functional>
+#include <iterator>
+#include <ostream>
namespace qpid {
using namespace framing;
namespace cluster {
-ClusterMap::ClusterMap() {}
+ClusterMap::ClusterMap() : stalled(false) {}
-MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
- if (isMember(id)) return MemberId(); // Ignore notices from established members.
- if (isDumpee(id)) {
- // Dumpee caught up, graduate to member with new URL and remove dumper from list.
- dumpees.erase(id);
- members[id] = url;
- }
- else if (members.empty()) {
- // First in cluster, congratulations!
- members[id] = url;
+MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url));
+ return MemberId();
}
- else {
- // New member needs brain dump.
- MemberId dumper = nextDumper();
- Dumpee& d = dumpees[id];
- d.url = url;
- d.dumper = dumper;
- return dumper;
+ MemberId dumper = nextDumper();
+ Dumpee& d = dumpees[id];
+ d.url = url;
+ d.dumper = dumper;
+ return dumper;
+}
+
+void ClusterMap::ready(const MemberId& id, const Url& url) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url));
+ return;
}
- return MemberId();
+ dumpees.erase(id);
+ members[id] = url;
}
+
MemberId ClusterMap::nextDumper() const {
// Choose the first member in member-id order of the group that
// has the least number of dumps-in-progress.
@@ -73,6 +75,11 @@ MemberId ClusterMap::nextDumper() const {
}
void ClusterMap::leave(const MemberId& id) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::leave, this, id));
+ return;
+ }
+
if (isDumpee(id))
dumpees.erase(id);
if (isMember(id)) {
@@ -95,7 +102,13 @@ int ClusterMap::dumps(const MemberId& id) const {
return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
}
-void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); }
+void ClusterMap::dumpError(const MemberId& dumpee) {
+ if (stalled) {
+ stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee));
+ return;
+ }
+ dumpees.erase(dumpee);
+}
framing::ClusterMapBody ClusterMap::toControl() const {
framing::ClusterMapBody b;
@@ -108,15 +121,55 @@ framing::ClusterMapBody ClusterMap::toControl() const {
return b;
}
-void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) {
*this = ClusterMap(); // Reset any current contents.
FieldTable::ValueMap::const_iterator i;
- for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i)
+ for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
members[i->first] = Url(i->second->get<std::string>());
- for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i)
+ for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i)
dumpees[i->first].url = Url(i->second->get<std::string>());
- for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i)
+ for (i = ftDumps.begin(); i != ftDumps.end(); ++i)
dumpees[i->first].dumper = MemberId(i->second->get<std::string>());
}
+void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+ init(b.getMembers(), b.getDumpees(), b.getDumps());
+}
+
+std::vector<Url> ClusterMap::memberUrls() const {
+ std::vector<Url> result(members.size());
+ std::transform(members.begin(), members.end(), result.begin(),
+ boost::bind(&MemberMap::value_type::second, _1));
+ return result;
+}
+
+void ClusterMap::stall() { stalled = true; }
+
+namespace {
+template <class F> void call(const F& f) { f(); }
+}
+
+void ClusterMap::unstall() {
+ stalled = false;
+ std::for_each(stallq.begin(), stallq.end(),
+ boost::bind(&boost::function<void()>::operator(), _1));
+ stallq.clear();
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) {
+ return o << mv.first << "=" << mv.second;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) {
+ return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url;
+}
+
+std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
+ std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n ");
+ std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n ");
+ std::copy(m.members.begin(), m.members.end(), im);
+ std::copy(m.dumpees.begin(), m.dumpees.end(), id);
+ return o;
+}
+
}} // namespace qpid::cluster