summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/qpid/cluster/Cluster.cpp9
-rw-r--r--cpp/src/qpid/cluster/ClusterPlugin.cpp3
-rw-r--r--cpp/src/qpid/cluster/ClusterSettings.h5
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.cpp7
-rw-r--r--cpp/src/qpid/cluster/InitialStatusMap.h3
-rw-r--r--cpp/src/tests/InitialStatusMap.cpp26
-rwxr-xr-xcpp/src/tests/cluster_tests.py48
-rwxr-xr-xcpp/src/tests/run_cluster_tests6
-rw-r--r--python/qpid/brokertest.py26
9 files changed, 87 insertions, 46 deletions
diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp
index 9756ad0a62..5e962e9767 100644
--- a/cpp/src/qpid/cluster/Cluster.cpp
+++ b/cpp/src/qpid/cluster/Cluster.cpp
@@ -208,7 +208,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) :
decoder(boost::bind(&Cluster::deliverFrame, this, _1)),
discarding(true),
state(INIT),
- initMap(self),
+ initMap(self, settings.size),
lastSize(0),
lastBroker(false),
updateRetracted(false),
@@ -403,8 +403,7 @@ void Cluster::flagError(
<< ": " << msg);
leave(l);
}
- else if (settings.checkErrors)
- error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
+ error.error(connection, type, map.getFrameSeq(), map.getMembers(), msg);
}
// Handler for deliverFrameQueue.
@@ -423,7 +422,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) {
deliverEventQueue.start();
}
// Process each frame through the error checker.
- if (settings.checkErrors && error.isUnresolved()) {
+ if (error.isUnresolved()) {
error.delivered(e);
while (error.canProcess()) // There is a frame ready to process.
processFrame(error.getNext(), l);
@@ -874,7 +873,7 @@ std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
};
assert(sizeof(STATE)/sizeof(*STATE) == Cluster::LEFT+1);
o << "cluster(" << cluster.self << " " << STATE[cluster.state];
- if (cluster.settings.checkErrors && cluster.error.isUnresolved()) o << "/error";
+ if (cluster.error.isUnresolved()) o << "/error";
return o << ")";;
}
diff --git a/cpp/src/qpid/cluster/ClusterPlugin.cpp b/cpp/src/qpid/cluster/ClusterPlugin.cpp
index 289ec672a8..4eec388866 100644
--- a/cpp/src/qpid/cluster/ClusterPlugin.cpp
+++ b/cpp/src/qpid/cluster/ClusterPlugin.cpp
@@ -71,9 +71,8 @@ struct ClusterOptions : public Options {
#if HAVE_LIBCMAN_H
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
+ ("cluster-size", optValue(settings.size, "N"), "Wait for N cluster members before allowing clients to connect.")
("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
- // TODO aconway 2009-05-20: temporary, remove
- ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
;
}
};
diff --git a/cpp/src/qpid/cluster/ClusterSettings.h b/cpp/src/qpid/cluster/ClusterSettings.h
index 8820e8a5ac..8e708aa139 100644
--- a/cpp/src/qpid/cluster/ClusterSettings.h
+++ b/cpp/src/qpid/cluster/ClusterSettings.h
@@ -34,10 +34,9 @@ struct ClusterSettings {
bool quorum;
size_t readMax;
std::string username, password, mechanism;
- bool checkErrors;
+ size_t size;
- ClusterSettings() : quorum(false), readMax(10),
- checkErrors(true) // TODO aconway 2009-05-20: remove this option.
+ ClusterSettings() : quorum(false), readMax(10), size(1)
{}
Url getUrl(uint16_t port) const {
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.cpp b/cpp/src/qpid/cluster/InitialStatusMap.cpp
index 6d27b3ae72..f2251f4043 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.cpp
+++ b/cpp/src/qpid/cluster/InitialStatusMap.cpp
@@ -28,8 +28,8 @@ using namespace boost;
namespace qpid {
namespace cluster {
-InitialStatusMap::InitialStatusMap(const MemberId& self_)
- : self(self_), completed(), resendNeeded()
+InitialStatusMap::InitialStatusMap(const MemberId& self_, size_t size_)
+ : self(self_), completed(), resendNeeded(), size(size_)
{}
void InitialStatusMap::configChange(const MemberSet& members) {
@@ -83,7 +83,8 @@ bool InitialStatusMap::isActive(const Map::value_type& v) {
}
bool InitialStatusMap::isComplete() {
- return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end();
+ return !map.empty() && find_if(map.begin(), map.end(), &notInitialized) == map.end()
+ && (map.size() >= size);
}
bool InitialStatusMap::transitionToComplete() {
diff --git a/cpp/src/qpid/cluster/InitialStatusMap.h b/cpp/src/qpid/cluster/InitialStatusMap.h
index 4605a4c1fe..9e9b71e363 100644
--- a/cpp/src/qpid/cluster/InitialStatusMap.h
+++ b/cpp/src/qpid/cluster/InitialStatusMap.h
@@ -37,7 +37,7 @@ class InitialStatusMap
public:
typedef framing::ClusterInitialStatusBody Status;
- InitialStatusMap(const MemberId& self);
+ InitialStatusMap(const MemberId& self, size_t size);
/** Process a config change. @return true if we need to re-send our status */
void configChange(const MemberSet& newConfig);
/** @return true if we need to re-send status */
@@ -71,6 +71,7 @@ class InitialStatusMap
MemberSet firstConfig;
MemberId self;
bool completed, resendNeeded;
+ size_t size;
};
}} // namespace qpid::cluster
diff --git a/cpp/src/tests/InitialStatusMap.cpp b/cpp/src/tests/InitialStatusMap.cpp
index 70b077b695..c3587965e5 100644
--- a/cpp/src/tests/InitialStatusMap.cpp
+++ b/cpp/src/tests/InitialStatusMap.cpp
@@ -40,7 +40,7 @@ Status newcomerStatus(const Uuid& id=Uuid()) { return Status(ProtocolVersion(),
QPID_AUTO_TEST_CASE(testFirstInCluster) {
// Single member is first in cluster.
- InitialStatusMap map(MemberId(0));
+ InitialStatusMap map(MemberId(0), 1);
Uuid id(true);
BOOST_CHECK(!map.isComplete());
MemberSet members = list_of(MemberId(0));
@@ -56,7 +56,7 @@ QPID_AUTO_TEST_CASE(testFirstInCluster) {
QPID_AUTO_TEST_CASE(testJoinExistingCluster) {
// Single member 0 joins existing cluster 1,2
- InitialStatusMap map(MemberId(0));
+ InitialStatusMap map(MemberId(0), 1);
Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
map.configChange(members);
@@ -79,7 +79,7 @@ QPID_AUTO_TEST_CASE(testJoinExistingCluster) {
QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
// Multiple members 0,1,2 join at same time.
- InitialStatusMap map(MemberId(1)); // self is 1
+ InitialStatusMap map(MemberId(1), 1); // self is 1
Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2));
map.configChange(members);
@@ -99,7 +99,7 @@ QPID_AUTO_TEST_CASE(testMultipleFirstInCluster) {
QPID_AUTO_TEST_CASE(testMultipleJoinExisting) {
// Multiple members 1,2,3 join existing cluster containing 0.
- InitialStatusMap map(MemberId(2)); // self is 2
+ InitialStatusMap map(MemberId(2), 1); // self is 2
Uuid id(true);
MemberSet members = list_of(MemberId(0))(MemberId(1))(MemberId(2))(MemberId(3));
map.configChange(members);
@@ -119,7 +119,7 @@ QPID_AUTO_TEST_CASE(testMultipleJoinExisting) {
QPID_AUTO_TEST_CASE(testMembersLeave) {
// Test that map completes if members leave rather than send status.
- InitialStatusMap map(MemberId(0));
+ InitialStatusMap map(MemberId(0), 1);
Uuid id(true);
map.configChange(list_of(MemberId(0))(MemberId(1))(MemberId(2)));
map.received(MemberId(0), newcomerStatus());
@@ -134,7 +134,7 @@ QPID_AUTO_TEST_CASE(testMembersLeave) {
QPID_AUTO_TEST_CASE(testInteveningConfig) {
// Multiple config changes arrives before we complete the map.
- InitialStatusMap map(MemberId(0));
+ InitialStatusMap map(MemberId(0), 1);
Uuid id(true);
map.configChange(list_of<MemberId>(0)(1));
@@ -159,6 +159,20 @@ QPID_AUTO_TEST_CASE(testInteveningConfig) {
BOOST_CHECK_EQUAL(map.getClusterId(), id);
}
+QPID_AUTO_TEST_CASE(testInitialSize) {
+ InitialStatusMap map(MemberId(0), 3);
+ map.configChange(list_of<MemberId>(0)(1));
+ map.received(MemberId(0), newcomerStatus());
+ map.received(MemberId(1), newcomerStatus());
+ BOOST_CHECK(!map.isComplete());
+
+ map.configChange(list_of<MemberId>(0)(1)(2));
+ map.received(MemberId(0), newcomerStatus());
+ map.received(MemberId(1), newcomerStatus());
+ map.received(MemberId(2), newcomerStatus());
+ BOOST_CHECK(map.isComplete());
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 3ded6c103e..ed39277f77 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -26,13 +26,8 @@ from qpid.messaging import Message
from threading import Thread
-class ClusterTests(BrokerTest):
- """Cluster tests with support for testing with a store plugin."""
-
- def duration(self):
- d = self.config.defines.get("DURATION")
- if d: return float(d)*60
- else: return 3
+class ShortTests(BrokerTest):
+ """Short cluster functionality tests."""
def test_message_replication(self):
"""Test basic cluster message replication."""
@@ -57,6 +52,42 @@ class ClusterTests(BrokerTest):
self.assertEqual("y", m.content)
s2.connection.close()
+ def test_cluster_size(self):
+ """Verify cluster startup waits for N brokers if --cluster-size=N"""
+ class ConnectThread(Thread):
+ def __init__(self, broker):
+ Thread.__init__(self)
+ self.broker=broker
+ self.connected = False
+ self.error = None
+
+ def run(self):
+ try:
+ self.broker.connect()
+ self.connected = True
+ except Exception, e: self.error = RethrownException(e)
+
+ cluster = self.cluster(1, args=["--cluster-size=3"], wait_for_start=False)
+ c = ConnectThread(cluster[0])
+ c.start()
+ time.sleep(.01)
+ assert not c.connected
+ cluster.start(wait_for_start=False)
+ time.sleep(.01)
+ assert not c.connected
+ cluster.start(wait_for_start=False)
+ c.join(1)
+ assert not c.isAlive() # Join didn't time out
+ assert c.connected
+ if c.error: raise c.error
+
+class LongTests(BrokerTest):
+ """Tests that can run for a long time if -DDURATION=<minutes> is set"""
+ def duration(self):
+ d = self.config.defines.get("DURATION")
+ if d: return float(d)*60
+ else: return 3 # Default is to be quick
+
def test_failover(self):
"""Test fail-over during continuous send-receive with errors"""
@@ -84,7 +115,8 @@ class ClusterTests(BrokerTest):
receiver.stop(sender.sent)
for i in range(i, len(cluster)): cluster[i].kill()
-class ClusterStoreTests(BrokerTest):
+
+class StoreTests(BrokerTest):
"""
Cluster tests that can only be run if there is a store available.
"""
diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests
index 014233d8d3..b090f71852 100755
--- a/cpp/src/tests/run_cluster_tests
+++ b/cpp/src/tests/run_cluster_tests
@@ -45,8 +45,10 @@ rm -rf $OUTDIR
mkdir -p $OUTDIR
# Ignore tests requiring a store by default.
-TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail $*"
+CLUSTER_TESTS_IGNORE=${CLUSTER_TESTS_IGNORE:--i cluster_tests.StoreTests.* -I $srcdir/cluster_tests.fail}
+CLUSTER_TESTS=${CLUSTER_TESTS:-$*}
-with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $TESTS || exit 1
+set -x
+with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $CLUSTER_TESTS_IGNORE $CLUSTER_TESTS || exit 1
rm -rf $OUTDIR
#exit 0
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index b6046682de..01372bb802 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -206,7 +206,7 @@ class Cluster:
_cluster_lib = checkenv("CLUSTER_LIB")
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
@@ -215,21 +215,16 @@ class Cluster:
self.args = copy(args)
self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
self.args += [ "--load-module", self._cluster_lib ]
- self.start_n(count, expect=expect)
+ self.start_n(count, expect=expect, wait_for_start=wait_for_start)
- def start(self, name=None, expect=EXPECT_RUNNING):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args, name, expect))
+ self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start))
return self._brokers[-1]
- def start_n(self, count, expect=EXPECT_RUNNING):
- for i in range(count): self.start(expect=expect)
-
- def wait(self):
- """Wait for all cluster members to be ready"""
- for b in self._brokers:
- b.connect().close()
+ def start_n(self, count, expect=EXPECT_RUNNING, wait_for_start=True):
+ for i in range(count): self.start(expect=expect, wait_for_start=wait_for_start)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -280,16 +275,15 @@ class BrokerTest(TestCase):
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait_for_start=True):
"""Create and return a broker ready for use"""
b = Broker(self, args=args, name=name, expect=expect)
- b.connect().close()
+ if (wait_for_start): b.connect().close()
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect)
- cluster.wait()
+ cluster = Cluster(self, count, args, expect=expect, wait_for_start=wait_for_start)
return cluster
class RethrownException(Exception):