summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2009-06-10 18:54:30 +0000
committerKim van der Riet <kpvdr@apache.org>2009-06-10 18:54:30 +0000
commited4b7b25c1c0b8c9a533af8685216bcf8090bcb9 (patch)
treeeb69c2782d98f34e17fdf24c24af018a0cade379
parent3a9ab627b39b4fe2ab515e27dc162d12d0744777 (diff)
downloadqpid-python-ed4b7b25c1c0b8c9a533af8685216bcf8090bcb9.tar.gz
Updates to python cluster tests and associated scripts
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@783451 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-xqpid/cpp/src/tests/cluster.py15
-rw-r--r--qpid/cpp/src/tests/cluster_test.cpp14
-rwxr-xr-xqpid/cpp/src/tests/federated_cluster_test4
-rwxr-xr-xqpid/cpp/src/tests/run_cluster_tests18
-rw-r--r--qpid/cpp/src/tests/testlib.py39
5 files changed, 61 insertions, 29 deletions
diff --git a/qpid/cpp/src/tests/cluster.py b/qpid/cpp/src/tests/cluster.py
index ae4cc7aa85..a9082dddd9 100755
--- a/qpid/cpp/src/tests/cluster.py
+++ b/qpid/cpp/src/tests/cluster.py
@@ -43,7 +43,7 @@ class ClusterTests(TestBaseCluster):
self.checkNumBrokers(25)
self.killCluster("cluster-02.2")
self.checkNumBrokers(20)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -53,17 +53,17 @@ class ClusterTests(TestBaseCluster):
try:
clusterName = "cluster-03"
self.createCheckCluster(clusterName, 3)
- for i in range(4,9):
+ for i in range(3,8):
self.createClusterNode(i, clusterName)
self.checkNumClusterBrokers(clusterName, 8)
self.killNode(2, clusterName)
self.killNode(5, clusterName)
self.killNode(6, clusterName)
self.checkNumClusterBrokers(clusterName, 5)
+ self.createClusterNode(8, clusterName)
self.createClusterNode(9, clusterName)
- self.createClusterNode(10, clusterName)
self.checkNumClusterBrokers(clusterName, 7)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -90,7 +90,7 @@ class ClusterTests(TestBaseCluster):
self.createClusterNode(3, clusterName)
self.createClusterNode(4, clusterName)
self.checkNumClusterBrokers(clusterName, 6)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -102,7 +102,7 @@ class ClusterTests(TestBaseCluster):
self.createCheckCluster(clusterName, 6)
self.killClusterCheck(clusterName)
self.createCheckCluster(clusterName, 6)
- self.stopCheckAll()
+ self.stopAllCheck()
except:
self.killAllClusters(True)
raise
@@ -245,6 +245,7 @@ class ClusterTests(TestBaseCluster):
dh.sendMsgs(20, 5) # 60 30 *
dh.receiveMsgs(20, 4) # 60 50 *
dh.killCluster() # cluster does not exist
+ self.checkNumClusterBrokers("cluster-12", 0)
dh.restoreCluster() # 60 50 . . . . . .
dh.restoreNodes() # 0 1 2 3 4 5
dh.finalizeTest()
@@ -325,7 +326,7 @@ class ClusterTests(TestBaseCluster):
# Start the test here
if __name__ == '__main__':
- if os.getenv("STORE_ENABLE") != None:
+ if os.getenv("STORE_LIB") != None:
print "NOTE: Store enabled for the following tests:"
if not unittest.main(): sys.exit(1)
diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp
index a903d192c2..cad782cea0 100644
--- a/qpid/cpp/src/tests/cluster_test.cpp
+++ b/qpid/cpp/src/tests/cluster_test.cpp
@@ -70,14 +70,14 @@ using namespace boost::assign;
using broker::Broker;
using boost::shared_ptr;
-bool durableFlag = std::getenv("STORE_ENABLE") != 0;
+bool durableFlag = std::getenv("STORE_LIB") != 0;
void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) {
ostringstream clusterLib;
clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so";
args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str();
if (durableFlag)
- args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR";
+ args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR";
else
args += "--no-data-dir";
}
@@ -697,7 +697,7 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) {
QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
{
/*
- Start with a single broker.
+ Start with a single broker.
Set up two queues: one durable, and one not.
Add a new broker to the cluster.
Make sure it has one durable and one non-durable queue.
@@ -714,7 +714,7 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie)
QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" );
BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue"));
BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue"));
-
+
BOOST_CHECK_EQUAL ( durable_query.getDurable(), true );
BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false );
}
@@ -869,9 +869,9 @@ Subscription lockMessages(Client& client, const std::string& queue, int count)
Subscription sub = client.subs.subscribe(q, queue, settings);
client.session.messageFlush(sub.getName());
return sub;
-}
+}
-/**
+/**
* check that the specified queue contains the expected set of
* messages (matched on content) for all nodes in the cluster
*/
@@ -884,7 +884,7 @@ void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::ve
}
}
-void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
+void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m")
{
for (int i = 0; i < count; i++) {
client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag));
diff --git a/qpid/cpp/src/tests/federated_cluster_test b/qpid/cpp/src/tests/federated_cluster_test
index e42bf8cf7f..a781e269d6 100755
--- a/qpid/cpp/src/tests/federated_cluster_test
+++ b/qpid/cpp/src/tests/federated_cluster_test
@@ -44,7 +44,9 @@ stop_brokers() {
../qpidd -q --port $NODE_2
unset NODE_2
fi
- rm cluster.ports
+ if [ -f cluster.ports ]; then
+ rm cluster.ports
+ fi
}
start_brokers() {
diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests
index 103896cd3d..bfe6ef855e 100755
--- a/qpid/cpp/src/tests/run_cluster_tests
+++ b/qpid/cpp/src/tests/run_cluster_tests
@@ -19,6 +19,15 @@
# under the License.
#
+
+# Check that top_builddir and srcdir are set
+# If not, assume local run from test dir
+if [ -z ${top_builddir} -o -z ${srcdir} ]; then
+ srcdir=`pwd`
+ top_builddir=${srcdir}/../../
+fi
+
+
# Run the cluster tests.
TEST_DIR=${top_builddir}/src/tests
@@ -64,8 +73,8 @@ if ! test -d ${TMP_STORE_DIR} ; then
mkdir -p ${TMP_STORE_DIR}/cluster
else
# Delete old cluster test dirs
- rm -rf "${TMP_STORE_DIR}/cluster"
- mkdir -p "${TMP_STORE_DIR}/cluster"
+ rm -rf ${TMP_STORE_DIR}/cluster
+ mkdir -p ${TMP_STORE_DIR}/cluster
fi
export TMP_STORE_DIR
@@ -75,3 +84,8 @@ RETCODE=$?
if test x${RETCODE} != x0; then
exit 1;
fi
+
+# Delete cluster store dir if test was successful.
+rm -rf ${TMP_STORE_DIR}
+
+exit 0 \ No newline at end of file
diff --git a/qpid/cpp/src/tests/testlib.py b/qpid/cpp/src/tests/testlib.py
index 07c4794767..398eaf96cc 100644
--- a/qpid/cpp/src/tests/testlib.py
+++ b/qpid/cpp/src/tests/testlib.py
@@ -21,7 +21,7 @@
# Support library for qpid python tests.
#
-import os, re, signal, subprocess, unittest
+import os, re, signal, subprocess, time, unittest
class TestBase(unittest.TestCase):
"""
@@ -210,6 +210,12 @@ class TestBaseCluster(TestBase):
for n in range(0, numberNodes):
self.createClusterNode(n, clusterName)
+ def waitForNodes(self, clusterName):
+ """Wait for all nodes to become active (ie finish cluster sync)"""
+ # TODO - connect to each known node in cluster
+ # Until this is done, wait a bit (hack)
+ time.sleep(1)
+
# --- Cluster and node status ---
def getTupleList(self, clusterName = None):
@@ -246,13 +252,15 @@ class TestBaseCluster(TestBase):
"""Get the (pid, port) tuple for the given cluster node"""
return self._clusterDict[clusterName][nodeNumber]
- def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True):
+ def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True, waitForNodes = True):
"""Check that the total number of brokers in the named cluster is the expected value"""
if expected != None and self.getNumClusterBrokers(clusterName) != expected:
raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \
(clusterName, expected, self.getNumClusterBrokers(clusterName)))
if checkPids:
self._checkPids(clusterName)
+ if waitForNodes:
+ self.waitForNodes(clusterName)
def clusterExists(self, clusterName):
""" Return True if clusterName exists, False otherwise"""
@@ -330,7 +338,7 @@ class TestBaseCluster(TestBase):
if self.clusterExists(clusterName):
raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName)))
- def stopCheckAll(self, ignoreFailures = False):
+ def stopAllCheck(self, ignoreFailures = False):
"""Kill all known clusters and check that the cluster dictionary is empty"""
self.stopAllClusters()
self.checkNumBrokers(0)
@@ -580,6 +588,7 @@ class TestBaseCluster(TestBase):
self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName)
self._nodes.append(nodeNumber)
self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes))
+ self._testBaseCluster.waitForNodes(self._clusterName)
def restoreNode(self, nodeNumber):
"""Restore a cluster node that has been previously killed"""
@@ -598,6 +607,7 @@ class TestBaseCluster(TestBase):
self.restoreNode(lastNode)
while len(self._deadNodes) > 0:
self.restoreNode(self._deadNodes[0])
+ self._testBaseCluster.waitForNodes(self._clusterName)
def killNode(self, nodeNumber):
"""Kill a cluster node (if it is in the _nodes list)."""
@@ -635,15 +645,20 @@ class TestBaseCluster(TestBase):
if nm == None:
nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages
if nm > 0:
- receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
- cnt = 0
- while cnt < nm:
- rx = receiver.stdout.readline().strip()
- if rx == "" and receiver.poll() != None: break
- self._rxMsgs[qn].append(rx)
- cnt = cnt + 1
+ while nm > 0:
+ receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
+ cnt = 0
+ while cnt < nm:
+ rx = receiver.stdout.readline().strip()
+ if rx == "":
+ if receiver.poll() != None: break
+ elif rx not in self._rxMsgs[qn]:
+ self._rxMsgs[qn].append(rx)
+ cnt = cnt + 1
+ nm = nm - cnt
if wait:
receiver.wait()
+ self._rxMsgs[qn].sort()
self._lastNode = nodeNumber
def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True):
@@ -670,10 +685,10 @@ class TestBaseCluster(TestBase):
def finalizeTest(self):
"""Recover all the remaining messages on all queues, then check that all expected messages were received."""
self.receiveRemainingMsgs()
- self._testBaseCluster.stopCheckAll()
+ self._testBaseCluster.stopAllCheck()
if not self.checkMsgs():
- self._testBaseCluster.fail("Send - receive message mismatch")
self.printMsgs()
+ self._testBaseCluster.fail("Send - receive message mismatch")
def printMsgs(self, txMsgs = True, rxMsgs = True):
"""Print all messages transmitted and received."""