diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-06-10 18:54:30 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-06-10 18:54:30 +0000 |
commit | ed4b7b25c1c0b8c9a533af8685216bcf8090bcb9 (patch) | |
tree | eb69c2782d98f34e17fdf24c24af018a0cade379 | |
parent | 3a9ab627b39b4fe2ab515e27dc162d12d0744777 (diff) | |
download | qpid-python-ed4b7b25c1c0b8c9a533af8685216bcf8090bcb9.tar.gz |
Updates to python cluster tests and associated scripts
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@783451 13f79535-47bb-0310-9956-ffa450edef68
-rwxr-xr-x | qpid/cpp/src/tests/cluster.py | 15 | ||||
-rw-r--r-- | qpid/cpp/src/tests/cluster_test.cpp | 14 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/federated_cluster_test | 4 | ||||
-rwxr-xr-x | qpid/cpp/src/tests/run_cluster_tests | 18 | ||||
-rw-r--r-- | qpid/cpp/src/tests/testlib.py | 39 |
5 files changed, 61 insertions, 29 deletions
diff --git a/qpid/cpp/src/tests/cluster.py b/qpid/cpp/src/tests/cluster.py index ae4cc7aa85..a9082dddd9 100755 --- a/qpid/cpp/src/tests/cluster.py +++ b/qpid/cpp/src/tests/cluster.py @@ -43,7 +43,7 @@ class ClusterTests(TestBaseCluster): self.checkNumBrokers(25) self.killCluster("cluster-02.2") self.checkNumBrokers(20) - self.stopCheckAll() + self.stopAllCheck() except: self.killAllClusters(True) raise @@ -53,17 +53,17 @@ class ClusterTests(TestBaseCluster): try: clusterName = "cluster-03" self.createCheckCluster(clusterName, 3) - for i in range(4,9): + for i in range(3,8): self.createClusterNode(i, clusterName) self.checkNumClusterBrokers(clusterName, 8) self.killNode(2, clusterName) self.killNode(5, clusterName) self.killNode(6, clusterName) self.checkNumClusterBrokers(clusterName, 5) + self.createClusterNode(8, clusterName) self.createClusterNode(9, clusterName) - self.createClusterNode(10, clusterName) self.checkNumClusterBrokers(clusterName, 7) - self.stopCheckAll() + self.stopAllCheck() except: self.killAllClusters(True) raise @@ -90,7 +90,7 @@ class ClusterTests(TestBaseCluster): self.createClusterNode(3, clusterName) self.createClusterNode(4, clusterName) self.checkNumClusterBrokers(clusterName, 6) - self.stopCheckAll() + self.stopAllCheck() except: self.killAllClusters(True) raise @@ -102,7 +102,7 @@ class ClusterTests(TestBaseCluster): self.createCheckCluster(clusterName, 6) self.killClusterCheck(clusterName) self.createCheckCluster(clusterName, 6) - self.stopCheckAll() + self.stopAllCheck() except: self.killAllClusters(True) raise @@ -245,6 +245,7 @@ class ClusterTests(TestBaseCluster): dh.sendMsgs(20, 5) # 60 30 * dh.receiveMsgs(20, 4) # 60 50 * dh.killCluster() # cluster does not exist + self.checkNumClusterBrokers("cluster-12", 0) dh.restoreCluster() # 60 50 . . . . . . dh.restoreNodes() # 0 1 2 3 4 5 dh.finalizeTest() @@ -325,7 +326,7 @@ class ClusterTests(TestBaseCluster): # Start the test here if __name__ == '__main__': - if os.getenv("STORE_ENABLE") != None: + if os.getenv("STORE_LIB") != None: print "NOTE: Store enabled for the following tests:" if not unittest.main(): sys.exit(1) diff --git a/qpid/cpp/src/tests/cluster_test.cpp b/qpid/cpp/src/tests/cluster_test.cpp index a903d192c2..cad782cea0 100644 --- a/qpid/cpp/src/tests/cluster_test.cpp +++ b/qpid/cpp/src/tests/cluster_test.cpp @@ -70,14 +70,14 @@ using namespace boost::assign; using broker::Broker; using boost::shared_ptr; -bool durableFlag = std::getenv("STORE_ENABLE") != 0; +bool durableFlag = std::getenv("STORE_LIB") != 0; void prepareArgs(ClusterFixture::Args& args, const bool durableFlag = false) { ostringstream clusterLib; clusterLib << getLibPath("QPID_LIB_DIR", "../.libs") << "/cluster.so"; args += "--auth", "no", "--no-module-dir", "--load-module", clusterLib.str(); if (durableFlag) - args += "--load-module", getLibPath("LIBSTORE"), "TMP_DATA_DIR"; + args += "--load-module", getLibPath("STORE_LIB"), "TMP_DATA_DIR"; else args += "--no-data-dir"; } @@ -697,7 +697,7 @@ QPID_AUTO_TEST_CASE(testDequeueWaitingSubscription) { QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) { /* - Start with a single broker. + Start with a single broker. Set up two queues: one durable, and one not. Add a new broker to the cluster. Make sure it has one durable and one non-durable queue. @@ -714,7 +714,7 @@ QPID_AUTO_TEST_CASE(queueDurabilityPropagationToNewbie) QueueQueryResult non_durable_query = c1.session.queueQuery ( "non_durable_queue" ); BOOST_CHECK_EQUAL(durable_query.getQueue(), std::string("durable_queue")); BOOST_CHECK_EQUAL(non_durable_query.getQueue(), std::string("non_durable_queue")); - + BOOST_CHECK_EQUAL ( durable_query.getDurable(), true ); BOOST_CHECK_EQUAL ( non_durable_query.getDurable(), false ); } @@ -869,9 +869,9 @@ Subscription lockMessages(Client& client, const std::string& queue, int count) Subscription sub = client.subs.subscribe(q, queue, settings); client.session.messageFlush(sub.getName()); return sub; -} +} -/** +/** * check that the specified queue contains the expected set of * messages (matched on content) for all nodes in the cluster */ @@ -884,7 +884,7 @@ void checkQueue(ClusterFixture& cluster, const std::string& queue, const std::ve } } -void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m") +void send(Client& client, const std::string& queue, int count, int start=1, const std::string& base="m") { for (int i = 0; i < count; i++) { client.session.messageTransfer(arg::content=makeMessage((boost::format("%1%_%2%") % base % (i+start)).str(), queue, durableFlag)); diff --git a/qpid/cpp/src/tests/federated_cluster_test b/qpid/cpp/src/tests/federated_cluster_test index e42bf8cf7f..a781e269d6 100755 --- a/qpid/cpp/src/tests/federated_cluster_test +++ b/qpid/cpp/src/tests/federated_cluster_test @@ -44,7 +44,9 @@ stop_brokers() { ../qpidd -q --port $NODE_2 unset NODE_2 fi - rm cluster.ports + if [ -f cluster.ports ]; then + rm cluster.ports + fi } start_brokers() { diff --git a/qpid/cpp/src/tests/run_cluster_tests b/qpid/cpp/src/tests/run_cluster_tests index 103896cd3d..bfe6ef855e 100755 --- a/qpid/cpp/src/tests/run_cluster_tests +++ b/qpid/cpp/src/tests/run_cluster_tests @@ -19,6 +19,15 @@ # under the License. # + +# Check that top_builddir and srcdir are set +# If not, assume local run from test dir +if [ -z ${top_builddir} -o -z ${srcdir} ]; then + srcdir=`pwd` + top_builddir=${srcdir}/../../ +fi + + # Run the cluster tests. TEST_DIR=${top_builddir}/src/tests @@ -64,8 +73,8 @@ if ! test -d ${TMP_STORE_DIR} ; then mkdir -p ${TMP_STORE_DIR}/cluster else # Delete old cluster test dirs - rm -rf "${TMP_STORE_DIR}/cluster" - mkdir -p "${TMP_STORE_DIR}/cluster" + rm -rf ${TMP_STORE_DIR}/cluster + mkdir -p ${TMP_STORE_DIR}/cluster fi export TMP_STORE_DIR @@ -75,3 +84,8 @@ RETCODE=$? if test x${RETCODE} != x0; then exit 1; fi + +# Delete cluster store dir if test was successful. +rm -rf ${TMP_STORE_DIR} + +exit 0
\ No newline at end of file diff --git a/qpid/cpp/src/tests/testlib.py b/qpid/cpp/src/tests/testlib.py index 07c4794767..398eaf96cc 100644 --- a/qpid/cpp/src/tests/testlib.py +++ b/qpid/cpp/src/tests/testlib.py @@ -21,7 +21,7 @@ # Support library for qpid python tests. # -import os, re, signal, subprocess, unittest +import os, re, signal, subprocess, time, unittest class TestBase(unittest.TestCase): """ @@ -210,6 +210,12 @@ class TestBaseCluster(TestBase): for n in range(0, numberNodes): self.createClusterNode(n, clusterName) + def waitForNodes(self, clusterName): + """Wait for all nodes to become active (ie finish cluster sync)""" + # TODO - connect to each known node in cluster + # Until this is done, wait a bit (hack) + time.sleep(1) + # --- Cluster and node status --- def getTupleList(self, clusterName = None): @@ -246,13 +252,15 @@ class TestBaseCluster(TestBase): """Get the (pid, port) tuple for the given cluster node""" return self._clusterDict[clusterName][nodeNumber] - def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True): + def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = True, waitForNodes = True): """Check that the total number of brokers in the named cluster is the expected value""" if expected != None and self.getNumClusterBrokers(clusterName) != expected: raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ (clusterName, expected, self.getNumClusterBrokers(clusterName))) if checkPids: self._checkPids(clusterName) + if waitForNodes: + self.waitForNodes(clusterName) def clusterExists(self, clusterName): """ Return True if clusterName exists, False otherwise""" @@ -330,7 +338,7 @@ class TestBaseCluster(TestBase): if self.clusterExists(clusterName): raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) - def stopCheckAll(self, ignoreFailures = False): + def stopAllCheck(self, ignoreFailures = False): """Kill all known clusters and check that the cluster dictionary is empty""" self.stopAllClusters() self.checkNumBrokers(0) @@ -580,6 +588,7 @@ class TestBaseCluster(TestBase): self._testBaseCluster.createClusterNode(nodeNumber, self._clusterName) self._nodes.append(nodeNumber) self._testBaseCluster.checkNumClusterBrokers(self._clusterName, len(self._nodes)) + self._testBaseCluster.waitForNodes(self._clusterName) def restoreNode(self, nodeNumber): """Restore a cluster node that has been previously killed""" @@ -598,6 +607,7 @@ class TestBaseCluster(TestBase): self.restoreNode(lastNode) while len(self._deadNodes) > 0: self.restoreNode(self._deadNodes[0]) + self._testBaseCluster.waitForNodes(self._clusterName) def killNode(self, nodeNumber): """Kill a cluster node (if it is in the _nodes list).""" @@ -635,15 +645,20 @@ class TestBaseCluster(TestBase): if nm == None: nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get all remaining messages if nm > 0: - receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm) - cnt = 0 - while cnt < nm: - rx = receiver.stdout.readline().strip() - if rx == "" and receiver.poll() != None: break - self._rxMsgs[qn].append(rx) - cnt = cnt + 1 + while nm > 0: + receiver = self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm) + cnt = 0 + while cnt < nm: + rx = receiver.stdout.readline().strip() + if rx == "": + if receiver.poll() != None: break + elif rx not in self._rxMsgs[qn]: + self._rxMsgs[qn].append(rx) + cnt = cnt + 1 + nm = nm - cnt if wait: receiver.wait() + self._rxMsgs[qn].sort() self._lastNode = nodeNumber def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = None, wait = True): @@ -670,10 +685,10 @@ class TestBaseCluster(TestBase): def finalizeTest(self): """Recover all the remaining messages on all queues, then check that all expected messages were received.""" self.receiveRemainingMsgs() - self._testBaseCluster.stopCheckAll() + self._testBaseCluster.stopAllCheck() if not self.checkMsgs(): - self._testBaseCluster.fail("Send - receive message mismatch") self.printMsgs() + self._testBaseCluster.fail("Send - receive message mismatch") def printMsgs(self, txMsgs = True, rxMsgs = True): """Print all messages transmitted and received.""" |