diff options
Diffstat (limited to 'qpid/cpp/src/qpid/cluster/ClusterMap.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.cpp | 176 |
1 files changed, 176 insertions, 0 deletions
diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.cpp b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp new file mode 100644 index 0000000000..a8389095c9 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.cpp @@ -0,0 +1,176 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/cluster/ClusterMap.h" +#include "qpid/Url.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/log/Statement.h" +#include <boost/bind.hpp> +#include <algorithm> +#include <functional> +#include <iterator> +#include <ostream> + +using namespace std; +using namespace boost; + +namespace qpid { +using namespace framing; + +namespace cluster { + +namespace { + +void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& map, ClusterMap::Set& set) { + MemberId id(vt.first); + set.insert(id); + string url = vt.second->get<string>(); + if (!url.empty()) + map.insert(ClusterMap::Map::value_type(id, Url(url))); +} + +void insertFieldTableFromMapValue(FieldTable& ft, const ClusterMap::Map::value_type& vt) { + ft.setString(vt.first.str(), vt.second.str()); +} + +} + +ClusterMap::ClusterMap() : frameSeq(0) {} + +ClusterMap::ClusterMap(const Map& map) : frameSeq(0) { + transform(map.begin(), map.end(), inserter(alive, alive.begin()), bind(&Map::value_type::first, _1)); + members = map; +} + +ClusterMap::ClusterMap(const FieldTable& joinersFt, const FieldTable& membersFt, + framing::SequenceNumber frameSeq_) + : frameSeq(frameSeq_) +{ + for_each(joinersFt.begin(), joinersFt.end(), bind(&addFieldTableValue, _1, ref(joiners), ref(alive))); + for_each(membersFt.begin(), membersFt.end(), bind(&addFieldTableValue, _1, ref(members), ref(alive))); +} + +void ClusterMap::toMethodBody(framing::ClusterConnectionMembershipBody& b) const { + b.getJoiners().clear(); + for_each(joiners.begin(), joiners.end(), bind(&insertFieldTableFromMapValue, ref(b.getJoiners()), _1)); + for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) { + if (!isMember(*i) && !isJoiner(*i)) + b.getJoiners().setString(i->str(), string()); + } + b.getMembers().clear(); + for_each(members.begin(), members.end(), bind(&insertFieldTableFromMapValue, ref(b.getMembers()), _1)); + b.setFrameSeq(frameSeq); +} + +Url ClusterMap::getUrl(const Map& map, const MemberId& id) { + Map::const_iterator i = map.find(id); + return i == map.end() ? Url() : i->second; +} + +MemberId ClusterMap::firstJoiner() const { + return joiners.empty() ? MemberId() : joiners.begin()->first; +} + +vector<string> ClusterMap::memberIds() const { + vector<string> ids; + for (Map::const_iterator iter = members.begin(); + iter != members.end(); iter++) { + stringstream stream; + stream << iter->first; + ids.push_back(stream.str()); + } + return ids; +} + +vector<Url> ClusterMap::memberUrls() const { + vector<Url> urls(members.size()); + transform(members.begin(), members.end(), urls.begin(), + bind(&Map::value_type::second, _1)); + return urls; +} + +ClusterMap::Set ClusterMap::getAlive() const { return alive; } + +ClusterMap::Set ClusterMap::getMembers() const { + Set s; + transform(members.begin(), members.end(), inserter(s, s.begin()), + bind(&Map::value_type::first, _1)); + return s; +} + +ostream& operator<<(ostream& o, const ClusterMap::Map& m) { + ostream_iterator<MemberId> oi(o); + transform(m.begin(), m.end(), oi, bind(&ClusterMap::Map::value_type::first, _1)); + return o; +} + +ostream& operator<<(ostream& o, const ClusterMap& m) { + for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != m.alive.end(); ++i) { + o << *i; + if (m.isMember(*i)) o << "(member)"; + else if (m.isJoiner(*i)) o << "(joiner)"; + else o << "(unknown)"; + o << " "; + } + o << "frameSeq=" << m.getFrameSeq(); + return o; +} + +bool ClusterMap::updateRequest(const MemberId& id, const string& url) { + try { + if (isAlive(id)) { + joiners[id] = Url(url); + return true; + } + } catch (const Url::Invalid&) { + QPID_LOG(error, "Invalid URL in cluster update request: " << url); + } + return false; +} + +bool ClusterMap::ready(const MemberId& id, const Url& url) { + return isAlive(id) && members.insert(Map::value_type(id,url)).second; +} + +bool ClusterMap::configChange(const Set& update) { + bool memberChange = false; + Set removed; + set_difference(alive.begin(), alive.end(), + update.begin(), update.end(), + inserter(removed, removed.begin())); + alive = update; + for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) { + memberChange = memberChange || members.erase(*i); + joiners.erase(*i); + } + return memberChange; +} + +optional<Url> ClusterMap::updateOffer(const MemberId& from, const MemberId& to) { + Map::iterator i = joiners.find(to); + if (isAlive(from) && i != joiners.end()) { + Url url= i->second; + joiners.erase(i); // No longer a potential updatee. + return url; + } + return optional<Url>(); +} + +}} // namespace qpid::cluster |