summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/cluster/ClusterMap.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/cluster/ClusterMap.cpp')
-rw-r--r--cpp/src/qpid/cluster/ClusterMap.cpp122
1 files changed, 122 insertions, 0 deletions
diff --git a/cpp/src/qpid/cluster/ClusterMap.cpp b/cpp/src/qpid/cluster/ClusterMap.cpp
new file mode 100644
index 0000000000..b0c45ad625
--- /dev/null
+++ b/cpp/src/qpid/cluster/ClusterMap.cpp
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterMap.h"
+#include "qpid/Url.h"
+#include "qpid/framing/FieldTable.h"
+#include <boost/bind.hpp>
+#include <algorithm>
+#include <functional>
+
+namespace qpid {
+using namespace framing;
+
+namespace cluster {
+
+ClusterMap::ClusterMap() {}
+
+MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
+ if (isMember(id)) return MemberId(); // Ignore notices from established members.
+ if (isDumpee(id)) {
+ // Dumpee caught up, graduate to member with new URL and remove dumper from list.
+ dumpees.erase(id);
+ members[id] = url;
+ }
+ else if (members.empty()) {
+ // First in cluster, congratulations!
+ members[id] = url;
+ }
+ else {
+ // New member needs brain dump.
+ MemberId dumper = nextDumper();
+ Dumpee& d = dumpees[id];
+ d.url = url;
+ d.dumper = dumper;
+ return dumper;
+ }
+ return MemberId();
+}
+
+MemberId ClusterMap::nextDumper() const {
+ // Choose the first member in member-id order of the group that
+ // has the least number of dumps-in-progress.
+ assert(!members.empty());
+ MemberId dumper = members.begin()->first;
+ int minDumps = dumps(dumper);
+ MemberMap::const_iterator i = ++members.begin();
+ while (i != members.end()) {
+ int d = dumps(i->first);
+ if (d < minDumps) {
+ minDumps = d;
+ dumper = i->first;
+ }
+ ++i;
+ }
+ return dumper;
+}
+
+void ClusterMap::leave(const MemberId& id) {
+ if (isDumpee(id))
+ dumpees.erase(id);
+ if (isMember(id)) {
+ members.erase(id);
+ DumpeeMap::iterator i = dumpees.begin();
+ while (i != dumpees.end()) {
+ if (i->second.dumper == id) dumpees.erase(i++);
+ else ++i;
+ }
+ }
+}
+
+struct ClusterMap::MatchDumper {
+ MemberId d;
+ MatchDumper(const MemberId& i) : d(i) {}
+ bool operator()(const DumpeeMap::value_type& v) const { return v.second.dumper == d; }
+};
+
+int ClusterMap::dumps(const MemberId& id) const {
+ return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
+}
+
+void ClusterMap::dumpFailed(const MemberId& dumpee) { dumpees.erase(dumpee); }
+
+framing::ClusterMapBody ClusterMap::toControl() const {
+ framing::ClusterMapBody b;
+ for (MemberMap::const_iterator i = members.begin(); i != members.end(); ++i)
+ b.getMembers().setString(i->first.str(), i->second.str());
+ for (DumpeeMap::const_iterator i = dumpees.begin(); i != dumpees.end(); ++i) {
+ b.getDumpees().setString(i->first.str(), i->second.url.str());
+ b.getDumps().setString(i->first.str(), i->second.dumper.str());
+ }
+ return b;
+}
+
+void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
+ *this = ClusterMap(); // Reset any current contents.
+ FieldTable::ValueMap::const_iterator i;
+ for (i = b.getMembers().begin(); i != b.getMembers().end(); ++i)
+ members[i->first] = Url(i->second->get<std::string>());
+ for (i = b.getDumpees().begin(); i != b.getDumpees().end(); ++i)
+ dumpees[i->first].url = Url(i->second->get<std::string>());
+ for (i = b.getDumps().begin(); i != b.getDumps().end(); ++i)
+ dumpees[i->first].dumper = MemberId(i->second->get<std::string>());
+}
+
+}} // namespace qpid::cluster