diff options
author | Alan Conway <aconway@apache.org> | 2009-11-17 18:09:01 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-11-17 18:09:01 +0000 |
commit | 20e76d9894129f94f58a6b4794f64dbb4ddf8820 (patch) | |
tree | e2990b23d6955ed992050f7f8a55df16585560d4 /qpid/cpp/src | |
parent | df29240dd9452a3972ed65693d00d970bd841533 (diff) | |
download | qpid-python-20e76d9894129f94f58a6b4794f64dbb4ddf8820.tar.gz |
cluster::InitialStatusMap and unit tests, support for improved cluster join protocol.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@881420 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r-- | qpid/cpp/src/cluster.mk | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cluster.h | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/ClusterMap.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/Cpg.cpp | 15 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp | 110 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/InitialStatusMap.h | 68 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/cluster/types.h | 7 | ||||
-rw-r--r-- | qpid/cpp/src/tests/InitialStatusMap.cpp | 141 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster.mk | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/test_tools.h | 14 |
12 files changed, 365 insertions, 23 deletions
diff --git a/qpid/cpp/src/cluster.mk b/qpid/cpp/src/cluster.mk index 1a8812d169..6e9bd27698 100644 --- a/qpid/cpp/src/cluster.mk +++ b/qpid/cpp/src/cluster.mk @@ -82,6 +82,8 @@ cluster_la_SOURCES = \ qpid/cluster/PollerDispatch.h \ qpid/cluster/ProxyInputHandler.h \ qpid/cluster/Quorum.h \ + qpid/cluster/InitialStatusMap.h \ + qpid/cluster/InitialStatusMap.cpp \ qpid/cluster/types.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la diff --git a/qpid/cpp/src/qpid/cluster/Cluster.cpp b/qpid/cpp/src/qpid/cluster/Cluster.cpp index 3ced6263df..d6312e7b93 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.cpp +++ b/qpid/cpp/src/qpid/cluster/Cluster.cpp @@ -153,7 +153,7 @@ namespace _qmf = ::qmf::org::apache::qpid::cluster; * Currently use SVN revision to avoid clashes with versions from * different branches. */ -const uint32_t Cluster::CLUSTER_VERSION = 834052; +const uint32_t Cluster::CLUSTER_VERSION = 835547; struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { qpid::cluster::Cluster& cluster; @@ -162,12 +162,19 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { ClusterDispatcher(Cluster& c, const MemberId& id, Cluster::Lock& l_) : cluster(c), member(id), l(l_) {} void updateRequest(const std::string& url) { cluster.updateRequest(member, url, l); } + void initialStatus(bool active, bool persistent, const framing::FieldTable& props) { + cluster.initialStatus(member, active, persistent, props); + } void ready(const std::string& url) { cluster.ready(member, url, l); } void configChange(const std::string& current) { cluster.configChange(member, current, l); } - void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { cluster.updateOffer(member, updatee, id, version, l); } + void updateOffer(uint64_t updatee, const Uuid& id, uint32_t version) { + cluster.updateOffer(member, updatee, id, version, l); + } void retractOffer(uint64_t updatee) { cluster.retractOffer(member, updatee, l); } void messageExpired(uint64_t id) { cluster.messageExpired(member, id, l); } - void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { cluster.errorCheck(member, type, frameSeq, l); } + void errorCheck(uint8_t type, const framing::SequenceNumber& frameSeq) { + cluster.errorCheck(member, type, frameSeq, l); + } void shutdown() { cluster.shutdown(member, l); } @@ -603,6 +610,10 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) makeOffer(id, l); } +void Cluster::initialStatus(const MemberId&, bool /*active*/, bool /*persistent*/, + const framing::FieldTable&) { + // FIXME aconway 2009-11-12: fill in. +} void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) { if (map.ready(id, Url(url))) memberUpdate(l); diff --git a/qpid/cpp/src/qpid/cluster/Cluster.h b/qpid/cpp/src/qpid/cluster/Cluster.h index 286d7867c9..751a71867d 100644 --- a/qpid/cpp/src/qpid/cluster/Cluster.h +++ b/qpid/cpp/src/qpid/cluster/Cluster.h @@ -147,6 +147,7 @@ class Cluster : private Cpg::Handler, public management::Manageable { void updateOffer(const MemberId& updater, uint64_t updatee, const framing::Uuid&, uint32_t version, Lock&); void retractOffer(const MemberId& updater, uint64_t updatee, Lock&); + void initialStatus(const MemberId&, bool active, bool persistent, const framing::FieldTable& props); void ready(const MemberId&, const std::string&, Lock&); void configChange(const MemberId&, const std::string& current, Lock& l); void messageExpired(const MemberId&, uint64_t, Lock& l); diff --git a/qpid/cpp/src/qpid/cluster/ClusterMap.h b/qpid/cpp/src/qpid/cluster/ClusterMap.h index 5735a6335d..2e682a6f4a 100644 --- a/qpid/cpp/src/qpid/cluster/ClusterMap.h +++ b/qpid/cpp/src/qpid/cluster/ClusterMap.h @@ -33,14 +33,11 @@ #include <vector> #include <deque> #include <map> -#include <set> #include <iosfwd> namespace qpid { namespace cluster { -typedef std::set<MemberId> MemberSet; - /** * Map of established cluster members and joiners waiting for an update, * along with other cluster state that must be updated. diff --git a/qpid/cpp/src/qpid/cluster/Cpg.cpp b/qpid/cpp/src/qpid/cluster/Cpg.cpp index 316b5386f1..49a814b848 100644 --- a/qpid/cpp/src/qpid/cluster/Cpg.cpp +++ b/qpid/cpp/src/qpid/cluster/Cpg.cpp @@ -217,12 +217,15 @@ MemberId Cpg::self() const { namespace { int byte(uint32_t value, int i) { return (value >> (i*8)) & 0xff; } } -ostream& operator <<(ostream& out, const MemberId& id) { - out << byte(id.first, 0) << "." - << byte(id.first, 1) << "." - << byte(id.first, 2) << "." - << byte(id.first, 3); - return out << ":" << id.second; +ostream& operator<<(ostream& out, const MemberId& id) { + if (id.first) { + out << byte(id.first, 0) << "." + << byte(id.first, 1) << "." + << byte(id.first, 2) << "." + << byte(id.first, 3) + << ":"; + } + return out << id.second; } ostream& operator<<(ostream& o, const ConnectionId& c) { diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp new file mode 100644 index 0000000000..8c55a66ed7 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "InitialStatusMap.h" +#include <algorithm> +#include <boost/bind.hpp> + +using namespace std; +using namespace boost; + +namespace qpid { +namespace cluster { + +InitialStatusMap::InitialStatusMap(const MemberId& self_) + : self(self_), complete(), updateNeeded(), resendNeeded() +{ + map[self] = optional<Status>(); +} + +void InitialStatusMap::configChange(const MemberSet& members) { + resendNeeded = false; + if (firstConfig.empty()) firstConfig = members; + MemberSet::const_iterator i = members.begin(); + Map::iterator j = map.begin(); + while (i != members.end() || j != map.end()) { + if (i == members.end()) { // j not in members, member left + Map::iterator k = j++; + map.erase(k); + } + else if (j == map.end()) { // i not in map, member joined + resendNeeded = true; + map[*i] = optional<Status>(); + ++i; + } + else if (*i < j->first) { // i not in map, member joined + resendNeeded = true; + map[*i] = optional<Status>(); + ++i; + } + else if (*i > j->first) { // j not in members, member left + Map::iterator k = j++; + map.erase(k); + } + else { + i++; j++; + } + } + if (resendNeeded) { // Clear all status + for (Map::iterator i = map.begin(); i != map.end(); ++i) + i->second = optional<Status>(); + } +} + +void InitialStatusMap::received(const MemberId& m, const Status& s){ + map[m] = s; +} + +bool InitialStatusMap::notInitialized(const Map::value_type& v) { + return !v.second; +} + +bool InitialStatusMap::isActive(const Map::value_type& v) { + return v.second && v.second->getActive(); +} + +bool InitialStatusMap::isComplete() { + return find_if(map.begin(), map.end(), ¬Initialized) == map.end(); +} + +bool InitialStatusMap::isResendNeeded() { + bool ret = resendNeeded; + resendNeeded = false; + return ret; +} + +bool InitialStatusMap::isUpdateNeeded() { + assert(isComplete()); + // If there are any active members we need an update. + return find_if(map.begin(), map.end(), &isActive) != map.end(); +} + +MemberSet InitialStatusMap::getElders() { + assert(isComplete()); + MemberSet elders; + // Elders are from first config change, active or higher node-id. + for (MemberSet::iterator i = firstConfig.begin(); i != firstConfig.end(); ++i) { + if (map.find(*i) != map.end() && (map[*i]->getActive() || *i > self)) + elders.insert(*i); + } + return elders; +} + +}} // namespace qpid::cluster diff --git a/qpid/cpp/src/qpid/cluster/InitialStatusMap.h b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h new file mode 100644 index 0000000000..d139722623 --- /dev/null +++ b/qpid/cpp/src/qpid/cluster/InitialStatusMap.h @@ -0,0 +1,68 @@ +#ifndef QPID_CLUSTER_INITIALSTATUSMAP_H +#define QPID_CLUSTER_INITIALSTATUSMAP_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "types.h" +#include <qpid/framing/ClusterInitialStatusBody.h> +#include <boost/optional.hpp> + +namespace qpid { +namespace cluster { + +/** + * Track status of cluster members during initialization. + */ +class InitialStatusMap +{ + public: + typedef framing::ClusterInitialStatusBody Status; + + InitialStatusMap(const MemberId& self); + /** Process a config change. @return true if we need to re-send our status */ + void configChange(const MemberSet& newConfig); + /** @return true if we need to re-send status */ + bool isResendNeeded(); + + /** Process received status */ + void received(const MemberId&, const Status& is); + + /**@return true if the map is complete. */ + bool isComplete(); + /**@pre isComplete. @return this node's elders */ + MemberSet getElders(); + /**@pre isComplete. @return True if we need an update. */ + bool isUpdateNeeded(); + + private: + typedef std::map<MemberId, boost::optional<Status> > Map; + static bool notInitialized(const Map::value_type&); + static bool isActive(const Map::value_type&); + void check(); + Map map; + MemberSet firstConfig; + MemberId self; + bool complete, updateNeeded, resendNeeded; +}; +}} // namespace qpid::cluster + +#endif /*!QPID_CLUSTER_INITIALSTATUSMAP_H*/ diff --git a/qpid/cpp/src/qpid/cluster/types.h b/qpid/cpp/src/qpid/cluster/types.h index c19152e4d8..6777c9674c 100644 --- a/qpid/cpp/src/qpid/cluster/types.h +++ b/qpid/cpp/src/qpid/cluster/types.h @@ -29,6 +29,7 @@ #include <utility> #include <iosfwd> #include <string> +#include <set> extern "C" { @@ -52,8 +53,8 @@ enum EventType { DATA, CONTROL }; /** first=node-id, second=pid */ struct MemberId : std::pair<uint32_t, uint32_t> { - explicit MemberId(uint64_t n) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {} - explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {} + MemberId(uint64_t n=0) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {} + MemberId(uint32_t node, uint32_t pid) : std::pair<uint32_t,uint32_t>(node, pid) {} MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {} MemberId(const std::string&); // Decode from string. uint32_t getNode() const { return first; } @@ -75,6 +76,8 @@ struct ConnectionId : public std::pair<MemberId, uint64_t> { uint64_t getNumber() const { return second; } }; +typedef std::set<MemberId> MemberSet; + std::ostream& operator<<(std::ostream&, const ConnectionId&); std::ostream& operator<<(std::ostream&, EventType); diff --git a/qpid/cpp/src/tests/InitialStatusMap.cpp b/qpid/cpp/src/tests/InitialStatusMap.cpp new file mode 100644 index 0000000000..7709b1fbfc --- /dev/null +++ b/qpid/cpp/src/tests/InitialStatusMap.cpp @@ -0,0 +1,141 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/cluster/InitialStatusMap.h" +#include <boost/assign.hpp> + +using namespace std; +using namespace qpid::cluster; +using namespace qpid::framing; +using namespace boost::assign; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(InitialStatusMapTestSuite) + +typedef InitialStatusMap::Status Status; + +Status activeStatus() { return Status(ProtocolVersion(), true, false, FieldTable()); } +Status newcomerStatus() { return Status(ProtocolVersion(), false, false, FieldTable()); } + +QPID_AUTO_TEST_CASE(testFirstInCluster) { + // Single member is first in cluster. + InitialStatusMap map(MemberId(0)); + BOOST_CHECK(!map.isComplete()); + MemberSet members = list_of(MemberId(0)); + map.configChange(members); + BOOST_CHECK(!map.isComplete()); + map.received(MemberId(0), newcomerStatus()); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK(map.getElders().empty()); + BOOST_CHECK(!map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testJoinExistingCluster) { + // Single member 0 joins existing cluster 1,2 + InitialStatusMap map(MemberId(0)); + MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2)); + map.configChange(members); + BOOST_CHECK(map.isResendNeeded()); + BOOST_CHECK(!map.isComplete()); + map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(1), activeStatus()); + BOOST_CHECK(!map.isComplete()); + map.received(MemberId(2), activeStatus()); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1)(2)); + BOOST_CHECK(map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) { + // Multiple members 0,1,2 join at same time. + InitialStatusMap map(MemberId(1)); // self is 1 + MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2)); + map.configChange(members); + BOOST_CHECK(map.isResendNeeded()); + + // All new members + map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(1), newcomerStatus()); + map.received(MemberId(2), newcomerStatus()); + BOOST_CHECK(!map.isResendNeeded()); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(2))); + BOOST_CHECK(!map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testMultipleJoinExisting) { + // Multiple members 1,2,3 join existing cluster containing 0. + InitialStatusMap map(MemberId(2)); // self is 2 + MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3)); + map.configChange(members); + BOOST_CHECK(map.isResendNeeded()); + + map.received(MemberId(1), newcomerStatus()); + map.received(MemberId(2), newcomerStatus()); + map.received(MemberId(3), newcomerStatus()); + map.received(MemberId(0), activeStatus()); + BOOST_CHECK(!map.isResendNeeded()); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(0))(MemberId(3))); + BOOST_CHECK(map.isUpdateNeeded()); +} + +QPID_AUTO_TEST_CASE(testMembersLeave) { + // Test that map completes if members leave rather than send status. + InitialStatusMap map(MemberId(0)); + map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2))); + map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(1), activeStatus()); + BOOST_CHECK(!map.isComplete()); + map.configChange(list_of(MemberId(0))(MemberId(1))); // 2 left + BOOST_CHECK(map.isComplete()); + BOOST_CHECK_EQUAL(map.getElders(), list_of(MemberId(1))); +} + +QPID_AUTO_TEST_CASE(testInteveningConfig) { + // Multiple config changes arrives before we complete the map. + InitialStatusMap map(MemberId(0)); + + map.configChange(list_of<MemberId>(0)(1)); + BOOST_CHECK(map.isResendNeeded()); + map.received(MemberId(0), newcomerStatus()); + BOOST_CHECK(!map.isComplete()); + BOOST_CHECK(!map.isResendNeeded()); + // New member 2 joins before we receive 1 + map.configChange(list_of<MemberId>(0)(1)(2)); + BOOST_CHECK(!map.isComplete()); + BOOST_CHECK(map.isResendNeeded()); + map.received(1, activeStatus()); + map.received(2, newcomerStatus()); + // We should not be complete as we haven't received 0 since new member joined + BOOST_CHECK(!map.isComplete()); + BOOST_CHECK(!map.isResendNeeded()); + + map.received(0, newcomerStatus()); + BOOST_CHECK(map.isComplete()); + BOOST_CHECK_EQUAL(map.getElders(), list_of<MemberId>(1)); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/qpid/cpp/src/tests/cluster.mk b/qpid/cpp/src/tests/cluster.mk index 961d65f319..f33f87ee62 100644 --- a/qpid/cpp/src/tests/cluster.mk +++ b/qpid/cpp/src/tests/cluster.mk @@ -75,9 +75,10 @@ cluster_test_SOURCES = \ ForkedBroker.h \ ForkedBroker.cpp \ PartialFailure.cpp \ - ClusterFailover.cpp + ClusterFailover.cpp \ + InitialStatusMap.cpp -cluster_test_LDADD=$(lib_client) $(lib_broker) -lboost_unit_test_framework +cluster_test_LDADD=$(lib_client) $(lib_broker) ../cluster.la -lboost_unit_test_framework qpidtest_SCRIPTS += run_cluster_tests cluster_tests.py run_long_cluster_tests long_cluster_tests.py testlib.py cluster_tests.fail diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index eb6d98eced..c2683727cf 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -54,11 +54,6 @@ #include <algorithm> #include <iterator> -namespace std { // ostream operators in std:: namespace -template <class T> -ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s); } -} - using namespace std; using namespace qpid; using namespace qpid::cluster; diff --git a/qpid/cpp/src/tests/test_tools.h b/qpid/cpp/src/tests/test_tools.h index 832c04af04..4174751173 100644 --- a/qpid/cpp/src/tests/test_tools.h +++ b/qpid/cpp/src/tests/test_tools.h @@ -26,6 +26,7 @@ #include <boost/regex.hpp> #include <boost/assign/list_of.hpp> #include <vector> +#include <set> #include <ostream> // Print a sequence @@ -43,14 +44,17 @@ bool seqEqual(const T& a, const U& b) { return (i == a.end()) && (j == b.end()); } -// ostream and == operators so we can compare vectors and boost::assign::list_of -// with BOOST_CHECK_EQUALS +// ostream and == operators so we can compare vectors and sets with +// boost::assign::list_of with BOOST_CHECK_EQUALS namespace std { // In namespace std so boost can find them. template <class T> ostream& operator<<(ostream& o, const vector<T>& v) { return seqPrint(o, v); } template <class T> +ostream& operator<<(ostream& o, const set<T>& v) { return seqPrint(o, v); } + +template <class T> ostream& operator<<(ostream& o, const boost::assign_detail::generic_list<T>& l) { return seqPrint(o, l); } template <class T> @@ -58,6 +62,12 @@ bool operator == (const vector<T>& a, const boost::assign_detail::generic_list<T template <class T> bool operator == (const boost::assign_detail::generic_list<T>& b, const vector<T>& a) { return seqEqual(a, b); } + +template <class T> +bool operator == (const set<T>& a, const boost::assign_detail::generic_list<T>& b) { return seqEqual(a, b); } + +template <class T> +bool operator == (const boost::assign_detail::generic_list<T>& b, const set<T>& a) { return seqEqual(a, b); } } namespace qpid { |