1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "ClusterMap.h"
#include "qpid/Url.h"
#include "qpid/framing/FieldTable.h"
#include <boost/bind.hpp>
#include <algorithm>
#include <functional>
namespace qpid {
using namespace framing;
namespace cluster {
// Default constructor: performs no explicit initialisation of its own.
ClusterMap::ClusterMap()
{
}
// Handle a URL notice sent by `id`, advertising `url`.
//
// Returns the id of the member picked to dump to `id` when `id` is a
// newcomer joining a non-empty cluster; in every other case returns a
// default-constructed MemberId.
MemberId ClusterMap::urlNotice(const MemberId& id, const Url& url) {
    // Notices from established members are ignored.
    if (isMember(id))
        return MemberId();

    if (isDumpee(id)) {
        // The dumpee has caught up: drop its dump record so it can
        // graduate to a full member (with the new URL) below.
        dumpees.erase(id);
    }
    else if (!members.empty()) {
        // A newcomer to an existing cluster needs a brain dump.
        // Pick the dumper *before* recording the new dumpee entry.
        MemberId chosen = nextDumper();
        Dumpee& record = dumpees[id];
        record.url = url;
        record.dumper = chosen;
        return chosen;
    }

    // Either a graduated dumpee or the very first member of the cluster.
    members[id] = url;
    return MemberId();
}
// Pick the member that currently has the fewest dumps in progress.
// Ties go to whichever candidate comes first when iterating `members`,
// because a later member only replaces the current choice when its
// count is strictly smaller.
//
// Precondition (asserted): `members` is not empty.
MemberId ClusterMap::nextDumper() const {
    assert(!members.empty());

    MemberMap::const_iterator candidate = members.begin();
    MemberId best = candidate->first;
    int fewest = dumps(best);

    // Scan the remaining members, starting from the second one.
    for (++candidate; candidate != members.end(); ++candidate) {
        int inProgress = dumps(candidate->first);
        if (inProgress < fewest) {
            fewest = inProgress;
            best = candidate->first;
        }
    }
    return best;
}
void ClusterMap::leave(const MemberId& id) {
if (isDumpee(id))
dumpees.erase(id);
if (isMember(id)) {
members.erase(id);
DumpeeMap::iterator i = dumpees.begin();
while (i != dumpees.end()) {
if (i->second.dumper == id) dumpees.erase(i++);
else ++i;
}
}
}
// Predicate over DumpeeMap entries: true when the entry's dumper is the
// member id this object was constructed with.
struct ClusterMap::MatchDumper {
    MemberId d;     // The dumper id to compare against.

    MatchDumper(const MemberId& dumperId)
        : d(dumperId)
    {}

    bool operator()(const DumpeeMap::value_type& entry) const {
        return entry.second.dumper == d;
    }
};
int ClusterMap::dumps(const MemberId& id) const {
return std::count_if(dumpees.begin(), dumpees.end(), MatchDumper(id));
}
// A dump to `dumpee` failed: erase its entry from `dumpees`.
void ClusterMap::dumpFailed(const MemberId& dumpee)
{
    dumpees.erase(dumpee);
}
// Encode this map as a ClusterMapBody control.
//
// Three string tables are filled in, all keyed by the member id string:
//   members : member id -> member URL
//   dumpees : dumpee id -> dumpee URL
//   dumps   : dumpee id -> id of the member dumping to it
framing::ClusterMapBody ClusterMap::toControl() const {
    framing::ClusterMapBody body;

    MemberMap::const_iterator member = members.begin();
    while (member != members.end()) {
        body.getMembers().setString(member->first.str(), member->second.str());
        ++member;
    }

    DumpeeMap::const_iterator dumpee = dumpees.begin();
    while (dumpee != dumpees.end()) {
        body.getDumpees().setString(dumpee->first.str(), dumpee->second.url.str());
        body.getDumps().setString(dumpee->first.str(), dumpee->second.dumper.str());
        ++dumpee;
    }

    return body;
}
// Replace the contents of this map with those decoded from control `b`.
//
// Reads the members, dumpees and dumps tables of `b`, taking each
// table value as a std::string. A key that appears in the dumps table
// but not in the dumpees table still gets a dumpee entry, because the
// entry is created on access.
void ClusterMap::fromControl(const framing::ClusterMapBody& b) {
    typedef FieldTable::ValueMap::const_iterator EntryIter;

    // Start from a freshly constructed map so no stale entries survive.
    *this = ClusterMap();

    for (EntryIter m = b.getMembers().begin(); m != b.getMembers().end(); ++m) {
        members[m->first] = Url(m->second->get<std::string>());
    }

    for (EntryIter d = b.getDumpees().begin(); d != b.getDumpees().end(); ++d) {
        dumpees[d->first].url = Url(d->second->get<std::string>());
    }

    for (EntryIter p = b.getDumps().begin(); p != b.getDumps().end(); ++p) {
        dumpees[p->first].dumper = MemberId(p->second->get<std::string>());
    }
}
}} // namespace qpid::cluster
|