diff options
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterMap.cpp')
-rw-r--r-- | cpp/src/qpid/cluster/ClusterMap.cpp | 101 |
1 files changed, 77 insertions, 24 deletions
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp index b0c45ad625..24c3ed5552 100644 --- a/cpp/src/qpid/cluster/ClusterMap.cpp +++ b/cpp/src/qpid/cluster/ClusterMap.cpp @@ -24,36 +24,38 @@ #include <boost/bind.hpp> #include <algorithm> #include <functional> +#include <iterator> +#include <ostream> namespace qpid { using namespace framing; namespace cluster { -ClusterMap::ClusterMap() {} +ClusterMap::ClusterMap() : stalled(false) {} -MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) { - if (isMember(id)) return MemberId(); // Ignore notices from established members. - if (isDumpee(id)) { - // Dumpee caught up, graduate to member with new URL and remove dumper from list. - dumpees.erase(id); - members[id] = url; - } - else if (members.empty()) { - // First in cluster, congratulations! - members[id] = url; +MemberId ClusterMap::dumpRequest(const MemberId& id, const Url& url) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::dumpRequest, this, id, url)); + return MemberId(); } - else { - // New member needs brain dump. - MemberId dumper = nextDumper(); - Dumpee& d = dumpees[id]; - d.url = url; - d.dumper = dumper; - return dumper; + MemberId dumper = nextDumper(); + Dumpee& d = dumpees[id]; + d.url = url; + d.dumper = dumper; + return dumper; +} + +void ClusterMap::ready(const MemberId& id, const Url& url) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::ready, this, id, url)); + return; } - return MemberId(); + dumpees.erase(id); + members[id] = url; } + MemberId ClusterMap::nextDumper() const { // Choose the first member in member-id order of the group that // has the least number of dumps-in-progress. @@ -73,6 +75,11 @@ MemberId ClusterMap::nextDumper() const { } void ClusterMap::leave(const MemberId& id) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::leave, this, id)); + return; + } + if (isDumpee(id)) dumpees.erase(id); if (isMember(id)) { @@ -95,7 +102,13 @@ int ClusterMap::dumps(const MemberId& id) const { return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id)); } -void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); } +void ClusterMap::dumpError(const MemberId& dumpee) { + if (stalled) { + stallq.push_back(boost::bind(&ClusterMap::dumpError, this, dumpee)); + return; + } + dumpees.erase(dumpee); +} framing::ClusterMapBody ClusterMap::toControl() const { framing::ClusterMapBody b; @@ -108,15 +121,55 @@ framing::ClusterMapBody ClusterMap::toControl() const { return b; } -void ClusterMap::fromControl(const framing::ClusterMapBody& b) { +void ClusterMap::init(const FieldTable& ftMembers,const FieldTable& ftDumpees, const FieldTable& ftDumps) { *this = ClusterMap(); // Reset any current contents. FieldTable::ValueMap::const_iterator i; - for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i) + for (i = ftMembers.begin(); i != ftMembers.end(); ++i) members[i->first] = Url(i->second->get<std::string>()); - for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i) + for (i = ftDumpees.begin(); i != ftDumpees.end(); ++i) dumpees[i->first].url = Url(i->second->get<std::string>()); - for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i) + for (i = ftDumps.begin(); i != ftDumps.end(); ++i) dumpees[i->first].dumper = MemberId(i->second->get<std::string>()); } +void ClusterMap::fromControl(const framing::ClusterMapBody& b) { + init(b.getMembers(), b.getDumpees(), b.getDumps()); +} + +std::vector<Url> ClusterMap::memberUrls() const { + std::vector<Url> result(members.size()); + std::transform(members.begin(), members.end(), result.begin(), + boost::bind(&MemberMap::value_type::second, _1)); + return result; +} + +void ClusterMap::stall() { stalled = true; } + +namespace { +template <class F> void call(const F& f) { f(); } +} + +void ClusterMap::unstall() { + stalled = false; + std::for_each(stallq.begin(), stallq.end(), + boost::bind(&boost::function<void()>::operator(), _1)); + stallq.clear(); +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap::MemberMap::value_type& mv) { + return o << mv.first << "=" << mv.second; +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap::DumpeeMap::value_type& dv) { + return o << "dump: " << dv.second.dumper << " to " << dv.first << "=" << dv.second.url; +} + +std::ostream& operator<<(std::ostream& o, const ClusterMap& m) { + std::ostream_iterator<ClusterMap::MemberMap::value_type> im(o, "\n "); + std::ostream_iterator<ClusterMap::DumpeeMap::value_type> id(o, "\n "); + std::copy(m.members.begin(), m.members.end(), im); + std::copy(m.dumpees.begin(), m.dumpees.end(), id); + return o; +} + }} // namespace qpid::cluster |