diff options
-rw-r--r-- | cpp/src/qpid/cluster/Cluster.cpp | 9 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterPlugin.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/ClusterSettings.h | 5 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/cluster/InitialStatusMap.h | 3 | ||||
-rw-r--r-- | cpp/src/tests/InitialStatusMap.cpp | 26 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 48 | ||||
-rwxr-xr-x | cpp/src/tests/run_cluster_tests | 6 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 26 |
9 files changed, 87 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index 9756ad0a62..5e962e9767 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -208,7 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : decoder(boost::bind(&Cluster::deliverFrame, this, _1)), discarding(true), state(INIT), - initMap(self), + initMap(self, settings.size), lastSize(0), lastBroker(false), updateRetracted(false), @@ -403,8 +403,7 @@ void Cluster::flagError( << ": " << msg); leave(l); } - else if (settings.checkErrors) - error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); + error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg); } // Handler for deliverFrameQueue. @@ -423,7 +422,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { deliverEventQueue.start(); } // Process each frame through the error checker. - if (settings.checkErrors && error.isUnresolved()) { + if (error.isUnresolved()) { error.delivered(e); while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); @@ -874,7 +873,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) { }; assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1); o << "cluster(" << cluster.self << " " << STATE[cluster.state]; - if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error"; + if (cluster.error.isUnresolved()) o << "/error"; return o << ")";; } diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp index 289ec672a8..4eec388866 100644 --- a/cpp/src/qpid/cluster/ClusterPlugin.cpp +++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp @@ -71,9 +71,8 @@ struct ClusterOptions : public Options { #if HAVE_LIBCMAN_H ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif + ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.") ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.") - // TODO aconway 2009-05-20: temporary, remove - ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.") ; } }; diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h index 8820e8a5ac..8e708aa139 100644 --- a/cpp/src/qpid/cluster/ClusterSettings.h +++ b/cpp/src/qpid/cluster/ClusterSettings.h @@ -34,10 +34,9 @@ struct ClusterSettings { bool quorum; size_t readMax; std::string username, password, mechanism; - bool checkErrors; + size_t size; - ClusterSettings() : quorum(false), readMax(10), - checkErrors(true) // TODO aconway 2009-05-20: remove this option. + ClusterSettings() : quorum(false), readMax(10), size(1) {} Url getUrl(uint16_t port) const { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp index 6d27b3ae72..f2251f4043 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.cpp +++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp @@ -28,8 +28,8 @@ using namespace boost; namespace qpid { namespace cluster { -InitialStatusMap::InitialStatusMap(const MemberId& self_) - : self(self_), completed(), resendNeeded() +InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_) + : self(self_), completed(), resendNeeded(), size(size_) {} void InitialStatusMap::configChange(const MemberSet& members) { @@ -83,7 +83,8 @@ bool InitialStatusMap::isActive(const Map::value_type& v) { } bool InitialStatusMap::isComplete() { - return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end(); + return !map.empty() && find_if(map.begin(), map.end(), ¬Initialized) == map.end() + && (map.size() >= size); } bool InitialStatusMap::transitionToComplete() { diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h index 4605a4c1fe..9e9b71e363 100644 --- a/cpp/src/qpid/cluster/InitialStatusMap.h +++ b/cpp/src/qpid/cluster/InitialStatusMap.h @@ -37,7 +37,7 @@ class InitialStatusMap public: typedef framing::ClusterInitialStatusBody Status; - InitialStatusMap(const MemberId& self); + InitialStatusMap(const MemberId& self, size_t size); /** Process a config change. @return true if we need to re-send our status */ void configChange(const MemberSet& newConfig); /** @return true if we need to re-send status */ @@ -71,6 +71,7 @@ class InitialStatusMap MemberSet firstConfig; MemberId self; bool completed, resendNeeded; + size_t size; }; }} // namespace qpid::cluster diff --git a/cpp/src/tests/InitialStatusMap.cpp b/cpp/src/tests/InitialStatusMap.cpp index 70b077b695..c3587965e5 100644 --- a/cpp/src/tests/InitialStatusMap.cpp +++ b/cpp/src/tests/InitialStatusMap.cpp @@ -40,7 +40,7 @@ Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(), QPID_AUTO_TEST_CASE(testFirstInCluster) { // Single member is first in cluster. - InitialStatusMap map(MemberId(0)); + InitialStatusMap map(MemberId(0), 1); Uuid id(true); BOOST_CHECK(!map.isComplete()); MemberSet members = list_of(MemberId(0)); @@ -56,7 +56,7 @@ QPID_AUTO_TEST_CASE(testFirstInCluster) { QPID_AUTO_TEST_CASE(testJoinExistingCluster) { // Single member 0 joins existing cluster 1,2 - InitialStatusMap map(MemberId(0)); + InitialStatusMap map(MemberId(0), 1); Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2)); map.configChange(members); @@ -79,7 +79,7 @@ QPID_AUTO_TEST_CASE(testJoinExistingCluster) { QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) { // Multiple members 0,1,2 join at same time. - InitialStatusMap map(MemberId(1)); // self is 1 + InitialStatusMap map(MemberId(1), 1); // self is 1 Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2)); map.configChange(members); @@ -99,7 +99,7 @@ QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) { QPID_AUTO_TEST_CASE(testMultipleJoinExisting) { // Multiple members 1,2,3 join existing cluster containing 0. - InitialStatusMap map(MemberId(2)); // self is 2 + InitialStatusMap map(MemberId(2), 1); // self is 2 Uuid id(true); MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3)); map.configChange(members); @@ -119,7 +119,7 @@ QPID_AUTO_TEST_CASE(testMultipleJoinExisting) { QPID_AUTO_TEST_CASE(testMembersLeave) { // Test that map completes if members leave rather than send status. - InitialStatusMap map(MemberId(0)); + InitialStatusMap map(MemberId(0), 1); Uuid id(true); map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2))); map.received(MemberId(0), newcomerStatus()); @@ -134,7 +134,7 @@ QPID_AUTO_TEST_CASE(testMembersLeave) { QPID_AUTO_TEST_CASE(testInteveningConfig) { // Multiple config changes arrives before we complete the map. - InitialStatusMap map(MemberId(0)); + InitialStatusMap map(MemberId(0), 1); Uuid id(true); map.configChange(list_of<MemberId>(0)(1)); @@ -159,6 +159,20 @@ QPID_AUTO_TEST_CASE(testInteveningConfig) { BOOST_CHECK_EQUAL(map.getClusterId(), id); } +QPID_AUTO_TEST_CASE(testInitialSize) { + InitialStatusMap map(MemberId(0), 3); + map.configChange(list_of<MemberId>(0)(1)); + map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(1), newcomerStatus()); + BOOST_CHECK(!map.isComplete()); + + map.configChange(list_of<MemberId>(0)(1)(2)); + map.received(MemberId(0), newcomerStatus()); + map.received(MemberId(1), newcomerStatus()); + map.received(MemberId(2), newcomerStatus()); + BOOST_CHECK(map.isComplete()); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 3ded6c103e..ed39277f77 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -26,13 +26,8 @@ from qpid.messaging import Message from threading import Thread -class ClusterTests(BrokerTest): - """Cluster tests with support for testing with a store plugin.""" - - def duration(self): - d = self.config.defines.get("DURATION") - if d: return float(d)*60 - else: return 3 +class ShortTests(BrokerTest): + """Short cluster functionality tests.""" def test_message_replication(self): """Test basic cluster message replication.""" @@ -57,6 +52,42 @@ class ClusterTests(BrokerTest): self.assertEqual("y", m.content) s2.connection.close() + def test_cluster_size(self): + """Verify cluster startup waits for N brokers if --cluster-size=N""" + class ConnectThread(Thread): + def __init__(self, broker): + Thread.__init__(self) + self.broker=broker + self.connected = False + self.error = None + + def run(self): + try: + self.broker.connect() + self.connected = True + except Exception, e: self.error = RethrownException(e) + + cluster = self.cluster(1, args=["--cluster-size=3"], wait_for_start=False) + c = ConnectThread(cluster[0]) + c.start() + time.sleep(.01) + assert not c.connected + cluster.start(wait_for_start=False) + time.sleep(.01) + assert not c.connected + cluster.start(wait_for_start=False) + c.join(1) + assert not c.isAlive() # Join didn't time out + assert c.connected + if c.error: raise c.error + +class LongTests(BrokerTest): + """Tests that can run for a long time if -DDURATION=<minutes> is set""" + def duration(self): + d = self.config.defines.get("DURATION") + if d: return float(d)*60 + else: return 3 # Default is to be quick + def test_failover(self): """Test fail-over during continuous send-receive with errors""" @@ -84,7 +115,8 @@ class ClusterTests(BrokerTest): receiver.stop(sender.sent) for i in range(i, len(cluster)): cluster[i].kill() -class ClusterStoreTests(BrokerTest): + +class StoreTests(BrokerTest): """ Cluster tests that can only be run if there is a store available. """ diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests index 014233d8d3..b090f71852 100755 --- a/cpp/src/tests/run_cluster_tests +++ b/cpp/src/tests/run_cluster_tests @@ -45,8 +45,10 @@ rm -rf $OUTDIR mkdir -p $OUTDIR # Ignore tests requiring a store by default. -TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail $*" +CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail} +CLUSTER_TESTS=${CLUSTER_TESTS:-$*} -with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $TESTS || exit 1 +set -x +with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1 rm -rf $OUTDIR #exit 0 diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index b6046682de..01372bb802 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -206,7 +206,7 @@ class Cluster: _cluster_lib = checkenv("CLUSTER_LIB") _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count @@ -215,21 +215,16 @@ class Cluster: self.args = copy(args) self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] self.args += [ "--load-module", self._cluster_lib ] - self.start_n(count, expect=expect) + self.start_n(count, expect=expect, wait_for_start=wait_for_start) - def start(self, name=None, expect=EXPECT_RUNNING): + def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) - self._brokers.append(self.test.broker(self.args, name, expect)) + self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start)) return self._brokers[-1] - def start_n(self, count, expect=EXPECT_RUNNING): - for i in range(count): self.start(expect=expect) - - def wait(self): - """Wait for all cluster members to be ready""" - for b in self._brokers: - b.connect().close() + def start_n(self, count, expect=EXPECT_RUNNING, wait_for_start=True): + for i in range(count): self.start(expect=expect, wait_for_start=wait_for_start) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -280,16 +275,15 @@ class BrokerTest(TestCase): self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait_for_start=True): """Create and return a broker ready for use""" b = Broker(self, args=args, name=name, expect=expect) - b.connect().close() + if (wait_for_start): b.connect().close() return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect) - cluster.wait() + cluster = Cluster(self, count, args, expect=expect, wait_for_start=wait_for_start) return cluster class RethrownException(Exception): |