diff options
author | Kim van der Riet <kpvdr@apache.org> | 2009-05-26 15:30:47 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2009-05-26 15:30:47 +0000 |
commit | 03fdb8e9e281cc317099fdcf67d05098b9d38131 (patch) | |
tree | 2805fbfef193089797071c87f276b919414c2c77 | |
parent | fdba1a9ed5074286fe58ebf9be543bbebea0bb79 (diff) | |
download | qpid-python-03fdb8e9e281cc317099fdcf67d05098b9d38131.tar.gz |
Added installable python cluster tests that can be run from an external store build/test environment and can test persistent clusters.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@778751 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/src/Makefile.am | 10 | ||||
-rw-r--r-- | cpp/src/tests/Makefile.am | 33 | ||||
-rwxr-xr-x | cpp/src/tests/ais_check | 1 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 46 | ||||
-rw-r--r-- | cpp/src/tests/cluster.py | 419 | ||||
-rwxr-xr-x | cpp/src/tests/clustered_replication_test | 8 | ||||
-rwxr-xr-x | cpp/src/tests/federated_cluster_test | 8 | ||||
-rwxr-xr-x | cpp/src/tests/run_cluster_tests | 70 | ||||
-rw-r--r-- | cpp/src/tests/testlib.py | 486 | ||||
-rw-r--r-- | python/qpid/testlib.py | 195 |
10 files changed, 1062 insertions, 214 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 63ca7009d9..bba0b83509 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -107,6 +107,16 @@ AM_CXXFLAGS = $(WARNING_CFLAGS) AM_LDFLAGS = -version-info $(LIBTOOL_VERSION_INFO_ARG) INCLUDES = -Igen -I$(srcdir)/gen +# +# Destination for intalled programs and tests defined here +# +qpidexecdir = $(libexecdir)/qpid +qpidexec_PROGRAMS = +qpidexec_SCRIPTS = +qpidtestdir = $(qpidexecdir)/test +qpidtest_PROGRAMS = +qpidtest_SCRIPTS = + ## Automake macros to build libraries and executables. qpidd_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDD_MODULE_DIR=\"$(dmoduledir)\" -DQPIDD_CONF_FILE=\"$(sysconfdir)/qpidd.conf\" libqpidclient_la_CXXFLAGS = $(AM_CXXFLAGS) -DQPIDC_MODULE_DIR=\"$(cmoduledir)\" -DQPIDC_CONF_FILE=\"$(confdir)/qpidc.conf\" diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 8d66bc0828..64cd713f1a 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -39,6 +39,16 @@ CLEANFILES= LONG_TESTS= # +# Destination for intalled programs and tests defined here +# +qpidexecdir = $(libexecdir)/qpid +qpidexec_PROGRAMS = +qpidexec_SCRIPTS = +qpidtestdir = $(qpidexecdir)/test +qpidtest_PROGRAMS = +qpidtest_SCRIPTS = + +# # Unit test program # # Unit tests are built as a single program to reduce valgrind overhead @@ -128,6 +138,21 @@ if SSL include ssl.mk endif +# receiver, sender are installed and therefore built as part of make, not make check +qpidtest_PROGRAMS += receiver +receiver_SOURCES = \ + receiver.cpp \ + TestOptions.h \ + ConnectionOptions.h +receiver_LDADD = $(lib_client) + +qpidtest_PROGRAMS += sender +sender_SOURCES = \ + sender.cpp \ + TestOptions.h \ + ConnectionOptions.h +sender_LDADD = $(lib_client) + # # Other test programs # @@ -195,14 +220,6 @@ check_PROGRAMS+=txjob txjob_SOURCES=txjob.cpp TestOptions.h ConnectionOptions.h txjob_LDADD=$(lib_client) -check_PROGRAMS+=receiver -receiver_SOURCES=receiver.cpp TestOptions.h ConnectionOptions.h -receiver_LDADD=$(lib_client) - -check_PROGRAMS+=sender -sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h -sender_LDADD=$(lib_client) - check_PROGRAMS+=PollerTest PollerTest_SOURCES=PollerTest.cpp PollerTest_LDADD=$(lib_common) $(SOCKLIBS) diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check index 5687110165..79862d7439 100755 --- a/cpp/src/tests/ais_check +++ b/cpp/src/tests/ais_check @@ -33,7 +33,6 @@ if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then Tests that depend on the openais library (used for clustering) will not be run because: - $NOGROUP $NOAISEXEC diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index e5e803003a..3e75d0bf87 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -29,17 +29,45 @@ if HAVE_LIBCPG # ais_check checks pre-requisites for cluster tests and runs them if ok. -TESTS+=ais_check federated_cluster_test clustered_replication_test -EXTRA_DIST+=ais_check start_cluster stop_cluster restart_cluster cluster_python_tests cluster_python_tests_failing.txt \ - federated_cluster_test clustered_replication_test +TESTS += \ + ais_check \ + run_cluster_tests \ + federated_cluster_test \ + clustered_replication_test + +EXTRA_DIST += \ + ais_check \ + start_cluster \ + stop_cluster \ + restart_cluster \ + cluster_python_tests \ + cluster_python_tests_failing.txt \ + federated_cluster_test \ + clustered_replication_test \ + run_cluster_tests \ + testlib.py \ + cluster.py + -check_PROGRAMS+=cluster_test -cluster_test_SOURCES=unit_test.cpp ClusterFixture.cpp ClusterFixture.h ForkedBroker.h ForkedBroker.cpp \ - cluster_test.cpp PartialFailure.cpp ClusterFailover.cpp +unit_test_LDADD += ../cluster.la -cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework +LONG_TESTS += \ + start_cluster \ + cluster_python_tests \ + stop_cluster -unit_test_LDADD+=../cluster.la +qpidtest_PROGRAMS += cluster_test +cluster_test_SOURCES = \ + cluster_test.cpp \ + unit_test.cpp \ + ClusterFixture.cpp \ + ClusterFixture.h \ + ForkedBroker.h \ + ForkedBroker.cpp \ + PartialFailure.cpp \ + ClusterFailover.cpp +cluster_test_LDADD=$(lib_client) ../cluster.la test_store.la -lboost_unit_test_framework + +qpidtest_SCRIPTS += run_cluster_tests cluster.py testlib.py -LONG_TESTS+=start_cluster cluster_python_tests stop_cluster endif diff --git a/cpp/src/tests/cluster.py b/cpp/src/tests/cluster.py new file mode 100644 index 0000000000..fadcc5beaa --- /dev/null +++ b/cpp/src/tests/cluster.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, signal, sys, unittest +from testlib import TestBaseCluster + +class ClusterTests(TestBaseCluster): + """Basic cluster with async store tests""" + + def test_Cluster_01_Initialization(self): + """Start a single cluster containing several nodes, and stop it again""" + try: + clusterName = "cluster-01" + self.createCheckCluster(clusterName, 5) + self.checkNumBrokers(5) + self.stopCheckCluster(clusterName) + except: + self.killAllClusters() + raise + + def test_Cluster_02_MultipleClusterInitialization(self): + """Start several clusters each with several nodes and stop them again""" + try: + for i in range(0, 5): + clusterName = "cluster-02.%d" % i + self.createCluster(clusterName, 5) + self.checkNumBrokers(25) + self.killCluster("cluster-02.2") + self.checkNumBrokers(20) + self.stopCheckAll() + except: + self.killAllClusters() + raise + + def test_Cluster_03_AddRemoveNodes(self): + """Create a multi-node cluster, then kill some nodes and add some new ones (not those killed)""" + try: + clusterName = "cluster-03" + self.createCheckCluster(clusterName, 3) + for i in range(4,9): + self.createClusterNode(i, clusterName) + self.checkNumClusterBrokers(clusterName, 8) + self.killNode(2, clusterName) + self.killNode(5, clusterName) + self.killNode(6, clusterName) + self.checkNumClusterBrokers(clusterName, 5) + self.createClusterNode(9, clusterName) + self.createClusterNode(10, clusterName) + self.checkNumClusterBrokers(clusterName, 7) + self.stopCheckAll() + except: + self.killAllClusters() + raise + + def test_Cluster_04_RemoveRestoreNodes(self): + """Create a multi-node cluster, then kill some of the nodes and restart them""" + try: + clusterName = "cluster-04" + self.createCheckCluster(clusterName, 6) + self.checkNumBrokers(6) + self.killNode(1, clusterName) + self.killNode(3, clusterName) + self.killNode(4, clusterName) + self.checkNumBrokers(3) + self.createClusterNode(1, clusterName) + self.createClusterNode(3, clusterName) + self.createClusterNode(4, clusterName) + self.checkNumClusterBrokers(clusterName, 6) + self.killNode(2, clusterName) + self.killNode(3, clusterName) + self.killNode(4, clusterName) + self.checkNumBrokers(3) + self.createClusterNode(2, clusterName) + self.createClusterNode(3, clusterName) + self.createClusterNode(4, clusterName) + self.checkNumClusterBrokers(clusterName, 6) + self.stopCheckAll() + except: + self.killAllClusters() + raise + + def test_Cluster_05_KillAllNodesThenRecover(self): + """Create a multi-node cluster, then kill *all* nodes, then restart the cluster""" + try: + clusterName = "cluster-05" + self.createCheckCluster(clusterName, 6) + self.killClusterCheck(clusterName) + self.createCheckCluster(clusterName, 6) + self.stopCheckAll() + except: + self.killAllClusters() + raise + + def test_Cluster_06_PublishConsume(self): + """Publish then consume 100 messages from a single cluster""" + try: + clusterName = "cluster-06" + self.createCheckCluster(clusterName, 3) + self.sendReceiveMsgs(0, clusterName, "test-exchange-06", "test-queue-06", 100) + self.stopCheckAll() + except: + self.killAllClusters() + raise + + def test_Cluster_07_MultiplePublishConsume(self): + """Staggered publish and consume on a single cluster""" + try: + clusterName = "cluster-07" + exchangeName = "test-exchange-07" + queueName = "test-queue-07" + self.createCheckCluster(clusterName, 3) + self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 + rxMsgs = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10 + txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 20) # 30, 10 + rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30 + txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 + rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50 + txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50 + rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80 + self.stopCheckAll() + if txMsgs != rxMsgs: + print "txMsgs=%s" % txMsgs + print "rxMsgs=%s" % rxMsgs + self.fail("Send - receive message mismatch") + except: + self.killAllClusters() + raise + + def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self): + """Staggered publish and consume interleaved with adding and removing nodes on a single cluster""" + try: + clusterName = "cluster-08" + exchangeName = "test-exchange-08" + queueName = "test-queue-08" + self.createCheckCluster(clusterName, 3) + self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 + for i in range(3,6): + self.createClusterNode(i, clusterName) + self.checkNumClusterBrokers(clusterName, 6) + txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0 + self.killNode(0, clusterName) + self.checkNumClusterBrokers(clusterName, 5) + rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10 + self.killNode(2, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30 + self.createClusterNode(6, clusterName) + self.checkNumClusterBrokers(clusterName, 5) + txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 20) # 30, 30 + rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50 + self.createClusterNode(7, clusterName) + self.checkNumClusterBrokers(clusterName, 6) + txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 20) # 30, 50 + rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80 + self.stopCheckAll() + if txMsgs != rxMsgs: + print "txMsgs=%s" % txMsgs + print "rxMsgs=%s" % rxMsgs + self.fail("Send - receive message mismatch") + except: + self.killAllClusters() + raise + + def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self): + """Publish and consume messages interleaved with adding and restoring previous nodes on a single cluster""" + try: + clusterName = "cluster-09" + exchangeName = "test-exchange-09" + queueName = "test-queue-09" + self.createCheckCluster(clusterName, 6) + self.createBindDirectExchangeQueue(0, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 20, 0 + self.killNode(2, clusterName) + self.checkNumClusterBrokers(clusterName, 5) + txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 40, 0 + self.killNode(0, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10 + self.killNode(4, clusterName) + self.checkNumClusterBrokers(clusterName, 3) + rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30 + self.createClusterNode(2, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 20) # 30, 30 + self.createClusterNode(0, clusterName) + self.checkNumClusterBrokers(clusterName, 5) + rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50 + self.createClusterNode(4, clusterName) + self.checkNumClusterBrokers(clusterName, 6) + txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 20) # 30, 50 + rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80 + self.stopCheckAll() + if txMsgs != rxMsgs: + print "txMsgs=%s" % txMsgs + print "rxMsgs=%s" % rxMsgs + self.fail("Send - receive message mismatch") + except: + self.killAllClusters() + raise + + def test_Cluster_10_LinearNodeKillCreateProgression(self): + """Publish and consume messages while linearly killing all original nodes and replacing them with new ones""" + try: + clusterName = "cluster-10" + exchangeName = "test-exchange-10" + queueName = "test-queue-10" + self.createCheckCluster(clusterName, 4) + self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20) + rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) + for i in range(0, 16): + self.killNode(i, clusterName) + self.createClusterNode(i+4, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, queueName, 20) + rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20) + rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10) + self.stopCheckAll() + if txMsgs != rxMsgs: + print "txMsgs=%s" % txMsgs + print "rxMsgs=%s" % rxMsgs + self.fail("Send - receive message mismatch") + except: + self.killAllClusters() + raise + + def test_Cluster_11_CircularNodeKillRestoreProgression(self): + """Publish and consume messages while circularly killing all original nodes and restoring them again""" + try: + clusterName = "cluster-11" + exchangeName = "test-exchange-11" + queueName = "test-queue-11" + self.createCheckCluster(clusterName, 4) + self.createBindDirectExchangeQueue(2, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20) + rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10) + self.killNode(0, clusterName) + self.killNode(1, clusterName) + for i in range(0, 16): + self.killNode((i + 2) % 4, clusterName) + self.createClusterNode(i % 4, clusterName) + self.checkNumClusterBrokers(clusterName, 2) + txMsgs += self.sendMsgs((i + 3) % 4, clusterName, exchangeName, queueName, 20) + rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, queueName, 20) + rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10) + self.stopCheckAll() + if txMsgs != rxMsgs: + print "txMsgs=%s" % txMsgs + print "rxMsgs=%s" % rxMsgs + self.fail("Send - receive message mismatch") + except: + self.killAllClusters() + raise + + def test_Cluster_12_TopicExchange(self): + """Create topic exchange in a cluster and make sure it replicates correctly""" + try: + clusterName = "cluster-12" + self.createCheckCluster(clusterName, 4) + topicExchangeName = "test-exchange-12" + topicQueueNameKeyList = {"test-queue-12-A" : "#.A", "test-queue-12-B" : "#.B", "test-queue-12-C" : "C.#", "test-queue-12-D" : "D.#"} + self.createBindTopicExchangeQueues(2, clusterName, topicExchangeName, topicQueueNameKeyList) + + # Place initial messages + txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0) + self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # Should not go to any queue + txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, "D.hello.A", 10) # (20, 0, 10, 10) + txMsgsA += txMsgsD + txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, "hello.B", 20) # (20, 20, 10, 10) + # Kill and add some nodes + self.killNode(0, clusterName) + self.killNode(2, clusterName) + self.createClusterNode(4, clusterName) + self.createClusterNode(5, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + # Pull 10 messages from each queue + rxMsgsA = self.receiveMsgs(1, clusterName, "test-queue-12-A", 10) # (10, 20, 10, 10) + rxMsgsB = self.receiveMsgs(3, clusterName, "test-queue-12-B", 10) # (10, 10, 10, 10) + rxMsgsC = self.receiveMsgs(4, clusterName, "test-queue-12-C", 10) # (10, 10, 0, 10) + rxMsgsD = self.receiveMsgs(5, clusterName, "test-queue-12-D", 10) # (10, 10, 0, 0) + # Kill and add another node + self.killNode(4, clusterName) + self.createClusterNode(6, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + # Add two more queues + self.createBindTopicExchangeQueues(6, clusterName, topicExchangeName, {"test-queue-12-E" : "#.bye.A", "test-queue-12-F" : "#.bye.B"}) + # Place more messages + txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, "C.bye.A", 10) # (20, 10, 10, 0, 10, 0) + txMsgsA += txMsgs + txMsgsC += txMsgs + txMsgsE = txMsgs + self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # Should not go to any queue + txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, "D.bye.B", 20) # (20, 30, 10, 20, 10, 20) + txMsgsB += txMsgs + txMsgsD += txMsgs + txMsgsF = txMsgs + # Kill all nodes but one + self.killNode(1, clusterName) + self.killNode(3, clusterName) + self.killNode(6, clusterName) + self.checkNumClusterBrokers(clusterName, 1) + # Pull all remaining messages from each queue + rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-12-A", 20) + rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-12-B", 30) + rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-12-C", 10) + rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-12-D", 20) + rxMsgsE = self.receiveMsgs(5, clusterName, "test-queue-12-E", 10) + rxMsgsF = self.receiveMsgs(5, clusterName, "test-queue-12-F", 20) + # Check messages + self.stopCheckAll() + if txMsgsA != rxMsgsA: + self.fail("Send - receive message mismatch for queue A") + if txMsgsB != rxMsgsB: + self.fail("Send - receive message mismatch for queue B") + if txMsgsC != rxMsgsC: + self.fail("Send - receive message mismatch for queue C") + if txMsgsD != rxMsgsD: + self.fail("Send - receive message mismatch for queue D") + if txMsgsE != rxMsgsE: + self.fail("Send - receive message mismatch for queue E") + if txMsgsF != rxMsgsF: + self.fail("Send - receive message mismatch for queue F") + except: + self.killAllClusters() + raise + + def test_Cluster_13_FanoutExchange(self): + """Create fanout exchange in a cluster and make sure it replicates correctly""" + try: + clusterName = "cluster-13" + self.createCheckCluster(clusterName, 4) + fanoutExchangeName = "test-exchange-13" + fanoutQueueNameList = ["test-queue-13-A", "test-queue-13-B", "test-queue-13-C"] + self.createBindFanoutExchangeQueues(2, clusterName, fanoutExchangeName, fanoutQueueNameList) + + # Place initial 20 messages, retrieve 10 + txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) + rxMsgA = self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) + rxMsgB = self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) + rxMsgC = self.receiveMsgs(0, clusterName, "test-queue-13-C", 10) + # Kill and add some nodes + self.killNode(0, clusterName) + self.killNode(2, clusterName) + self.createClusterNode(4, clusterName) + self.createClusterNode(5, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + # Place another 20 messages, retrieve 20 + txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) + rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20) + rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20) + rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-13-C", 20) + # Kill and add another node + self.killNode(4, clusterName) + self.createClusterNode(6, clusterName) + self.checkNumClusterBrokers(clusterName, 4) + # Add another 2 queues + self.createBindFanoutExchangeQueues(6, clusterName, fanoutExchangeName, ["test-queue-13-D", "test-queue-13-E"]) + # Place another 20 messages, retrieve 20 + tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20) + txMsg += tmp + rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-13-A", 20) + rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-13-B", 20) + rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-13-C", 20) + rxMsgD = self.receiveMsgs(6, clusterName, "test-queue-13-D", 10) + rxMsgE = self.receiveMsgs(6, clusterName, "test-queue-13-E", 10) + # Kill all nodes but one + self.killNode(1, clusterName) + self.killNode(3, clusterName) + self.killNode(6, clusterName) + self.checkNumClusterBrokers(clusterName, 1) + # Pull all remaining messages from each queue + rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 10) + rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 10) + rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10) + rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) + rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-13-E", 10) + # Check messages + self.stopCheckAll() + if txMsg != rxMsgA: + self.fail("Send - receive message mismatch for queue A") + if txMsg != rxMsgB: + self.fail("Send - receive message mismatch for queue B") + if txMsg != rxMsgC: + self.fail("Send - receive message mismatch for queue C") + if tmp != rxMsgD: + self.fail("Send - receive message mismatch for queue D") + if tmp != rxMsgE: + self.fail("Send - receive message mismatch for queue E") + except: + self.killAllClusters() + raise + + +# Start the test here + +if __name__ == '__main__': + if os.getenv("STORE_ENABLE") != None: + print "NOTE: Store enabled for the following tests:" + if not unittest.main(): sys.exit(1) + diff --git a/cpp/src/tests/clustered_replication_test b/cpp/src/tests/clustered_replication_test index 7afda87733..cc331957ad 100755 --- a/cpp/src/tests/clustered_replication_test +++ b/cpp/src/tests/clustered_replication_test @@ -63,9 +63,15 @@ if test -d ${PYTHON_DIR}; then if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then cat <<EOF -Not running federation to cluster test because: + + =========== WARNING: NOT RUNNING AIS TESTS ============== + + Not running cluster replication test because: $NOGROUP $NOAISEXEC + + ========================================================== + EOF exit 0; fi diff --git a/cpp/src/tests/federated_cluster_test b/cpp/src/tests/federated_cluster_test index 575a5d9c9b..e42bf8cf7f 100755 --- a/cpp/src/tests/federated_cluster_test +++ b/cpp/src/tests/federated_cluster_test @@ -132,9 +132,15 @@ if test -d ${PYTHON_DIR}; then if test -n "$NOGROUP" -o -n "$NOAISEXEC"; then cat <<EOF -Not running federation to cluster test because: + + =========== WARNING: NOT RUNNING AIS TESTS ============== + + Not running federation to cluster test because: $NOGROUP $NOAISEXEC + + ========================================================== + EOF exit 0; fi diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests new file mode 100755 index 0000000000..f3fd3f2883 --- /dev/null +++ b/cpp/src/tests/run_cluster_tests @@ -0,0 +1,70 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run the cluster tests. +TEST_DIR=$srcdir + +# Check AIS requirements +id -nG | grep '\<ais\>' >/dev/null || NOGROUP="You are not a member of the ais group." +ps -u root | grep 'aisexec\|corosync' >/dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root" + +if test -n "${NOGROUP}" -o -n "${NOAISEXEC}"; then + cat <<EOF + + ========= WARNING: CLUSTERING TESTS DISABLED ============== + + Tests that depend on the openais library (used for clustering) + will not be run because: + + ${NOGROUP} + ${NOAISEXEC} + + =========================================================== + +EOF + exit 0 +fi + +export PYTHONPATH=$srcdir +export RUN_CLUSTER_TESTS=1 +export QPIDD=$srcdir/../qpidd +export LIBCLUSTER=$srcdir/../.libs/cluster.so +export RECEIVER=$srcdir/receiver +export SENDER=$srcdir/sender + +#Make sure temp dir exists if this is the first to use it +TMP_STORE_DIR=${TEST_DIR}/test_tmp +if ! test -d ${TMP_STORE_DIR} ; then + mkdir -p ${TMP_STORE_DIR} + mkdir -p ${TMP_STORE_DIR}/cluster +else + rm -rf "${TMP_STORE_DIR}/cluster" + mkdir -p "${TMP_STORE_DIR}/cluster" +fi +export TMP_STORE_DIR + + +AMQP_SPEC=${TEST_DIR}/../../../specs/amqp.0-10-qpid-errata.xml +sg ais -c "${TEST_DIR}/cluster.py -v" +RETCODE=$? +if test x$RETCODE != x0; then + echo "FAIL cluster tests"; exit 1; +fi diff --git a/cpp/src/tests/testlib.py b/cpp/src/tests/testlib.py new file mode 100644 index 0000000000..64b6396d5b --- /dev/null +++ b/cpp/src/tests/testlib.py @@ -0,0 +1,486 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Support library for qpid python tests. +# + +import os, signal, subprocess, unittest + +class TestBase(unittest.TestCase): + """ + Base class for qpid tests. Provides broker start/stop/kill methods + """ + + """ + The following environment vars control if and how the test is run, and determine where many of the helper + executables/libs are to be found. + """ + _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for durability to be enabled during the test + _storeLib = os.getenv("LIBSTORE") + _qpiddExec = os.getenv("QPIDD", "/usr/sbin/qpidd") + _tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid")) + + """Global message counter ensures unique messages""" + _msgCnt = 0 + + # --- Helper functions for parameter handling --- + + def _paramBool(self, key, val, keyOnly = False): + if val == None: + return "" + if keyOnly: + if val: + return " --%s" % key + else: + return "" + else: + if val: + return " --%s yes" % key + else: + return " --%s no" % key + + def _paramNum(self, key, val): + if val != None: + return " --%s %d" % (key, val) + return "" + + def _paramString(self, key, val): + if val != None: + return " --%s %s" % (key, val) + return "" + + def _paramStringList(self, key, valList, val): + if val in valList: + return " --%s %s" % (key, val) + return "" + + # --- Helper functions for message creation --- + + def _makeMessage(self, msgSize): + msg = "Message-%04d" % self._msgCnt + self._msgCnt = self._msgCnt + 1 + msgLen = len(msg) + if msgSize > msgLen: + for i in range(msgLen, msgSize): + if i == msgLen: + msg += "-" + else: + msg += chr(ord('a') + (i % 26)) + return msg + + def _makeMessageList(self, numMsgs, msgSize): + if msgSize == None: + msgSize = 12 + msgs = "" + for m in range(0, numMsgs): + msgs += "%s\n" % self._makeMessage(msgSize) + return msgs + + # --- Starting and stopping a broker --- + + def startBroker(self, qpiddArgs, logFile = None): + """Start a single broker daemon, returns tuple (pid, port)""" + if self._qpiddExec == None: + raise Exception("Environment variable QPIDD is not set") + cmd = "%s --daemon --port=0 %s" % (self._qpiddExec, qpiddArgs) + portStr = os.popen(cmd).read() + if len(portStr) == 0: + err = "Broker daemon startup failed." + if logFile != None: + err += " See log file %s" % logFile + raise Exception(err) + port = int(portStr) + pidStr = os.popen("%s -p %d -c" % (self._qpiddExec, port)).read() + try: + pid = int(pidStr) + except: + raise Exception("Unable to get pid: \"%s -p %d -c\" returned %s" % (self._qpiddExec, port, pidStr)) + #print "started broker: pid=%d, port=%d args: %s" % (pid, port, qpiddArgs) + return (pid, port) + + def killBroker(self, pid): + """Kill a broker using kill -9""" + os.kill(pid, signal.SIGTERM) + #print "killed broker: pid=%d" % pid + + def stopBroker(self, port): + """Stop a broker using qpidd -q""" + ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, "--port=%d" % port, "-q") + if ret != 0: + raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % (port, ret)) + #print "stopped broker: port=%d" % port + + + +class TestBaseCluster(TestBase): + """ + Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes. + """ + + """ + The following environment vars control if and how the test is run, and determine where many of the helper + executables/libs are to be found. + """ + _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True for these cluster tests to run + _clusterLib = os.getenv("LIBCLUSTER") + _qpidConfigExec = os.getenv("QPID_CONFIG", "/usr/bin/qpid-config") + _qpidRouteExec = os.getenv("QPID_ROUTE", "/usr/bin/qpid-route") + _receiverExec = os.getenv("RECEIVER", "/usr/libexec/qpid/test/receiver") + _senderExec = os.getenv("SENDER", "/usr/libexec/qpid/test/sender") + + + """ + _clusterDict is a dictionary of clusters: + key = cluster name (string) + val = dictionary of node numbers: + key = integer node number + val = tuple containing (pid, port) + For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows: + {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}} + where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively. + """ + _clusterDict = {} + + """Index for (pid, port) tuple""" + PID = 0 + PORT = 1 + + def run(self, res): + """ Skip cluster testing if env var RUN_CLUSTER_TESTS is not defined.""" + if not self._runClusterTests: + return + unittest.TestCase.run(self, res) + + + # --- Starting cluster node(s) --- + + def createClusterNode(self, nodeNumber, clusterName): + """Create a node and add it to the named cluster""" + if self._tempStoreDir == None: + raise Exception("Environment variable TMP_STORE_DIR is not set") + if self._clusterLib == None: + raise Exception("Environment variable LIBCLUSTER is not set") + name = "%s-%d" % (clusterName, nodeNumber) + dataDir = os.path.join(self._tempStoreDir, "cluster", name) + logFile = "%s.log" % dataDir + args = "--no-module-dir --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=notice+ --log-to-file=%s" % \ + (self._clusterLib, dataDir, clusterName, logFile) + if self._storeEnable: + if self._storeLib == None: + raise Exception("Environment variable LIBSTORE is not set") + args += " --load-module %s" % self._storeLib + self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile) + + def createCluster(self, clusterName, numberNodes): + """Create a cluster containing an initial number of nodes""" + self._clusterDict[clusterName] = {} + for n in range(0, numberNodes): + self.createClusterNode(n, clusterName) + + # --- Cluster and node status --- + + def getTupleList(self): + """Get list of (pid, port) tuples of all known cluster brokers""" + tList = [] + for l in self._clusterDict.itervalues(): + for t in l.itervalues(): + tList.append(t) + return tList + + def getNumBrokers(self): + """Get total number of brokers in all known clusters""" + return len(self.getTupleList()) + + def checkNumBrokers(self, expected): + """Check that the total number of brokers in all known clusters is the expected value""" + if self.getNumBrokers() != expected: + raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers())) + + def getClusterTupleList(self, clusterName): + """Get list of (pid, port) tuples of all nodes in named cluster""" + return self._clusterDict[clusterName].values() + + def getNumClusterBrokers(self, clusterName): + """Get total number of brokers in named cluster""" + return len(self.getClusterTupleList(clusterName)) + + def getNodeTuple(self, nodeNumber, clusterName): + """Get the (pid, port) tuple for the given cluster node""" + return self._clusterDict[clusterName][nodeNumber] + + def checkNumClusterBrokers(self, clusterName, expected): + """Check that the total number of brokers in the named cluster is the expected value""" + if self.getNumClusterBrokers(clusterName) != expected: + raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ + (clusterName, expected, self.getNumClusterBrokers(clusterName))) + + def clusterExists(self, clusterName): + """ Return True if clusterName exists, False otherwise""" + return clusterName in self._clusterDict.keys() + + def clusterNodeExists(self, clusterName, nodeNumber): + """ Return True if nodeNumber in clusterName exists, False otherwise""" + if clusterName in self._clusterDict.keys(): + return nodeNumber in self._clusterDict[nodeName] + return False + + def createCheckCluster(self, clusterName, size): + """Create a cluster using the given name and size, then check the number of brokers""" + self.createCluster(clusterName, size) + self.checkNumClusterBrokers(clusterName, size) + + # --- Kill cluster nodes using signal 9 --- + + def killNode(self, nodeNumber, clusterName, updateDict = True): + """Kill the given node in the named cluster using kill -9""" + self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID]) + if updateDict: + del(self._clusterDict[clusterName][nodeNumber]) + + def killCluster(self, clusterName, updateDict = True): + """Kill all nodes in the named cluster""" + for n in self._clusterDict[clusterName].iterkeys(): + self.killNode(n, clusterName, False) + if updateDict: + del(self._clusterDict[clusterName]) + + def killClusterCheck(self, clusterName): + """Kill the named cluster and check that the name is removed from the cluster dictionary""" + self.killCluster(clusterName) + if self.clusterExists(clusterName): + raise Exception("Unable to kill cluster %s; %d nodes still exist" % \ + (clusterName, self.getNumClusterBrokers(clusterName))) + + def killAllClusters(self): + """Kill all known clusters""" + for n in self._clusterDict.iterkeys(): + self.killCluster(n, False) + self._clusterDict.clear() + + def killAllClustersCheck(self): + """Kill all known clusters and check that the cluster dictionary is empty""" + self.killAllClusters() + self.checkNumBrokers(0) + + # --- Stop cluster nodes using qpidd -q --- + + def stopNode(self, nodeNumber, clusterName, updateDict = True): + """Stop the given node in the named cluster using qpidd -q""" + self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT]) + if updateDict: + del(self._clusterDict[clusterName][nodeNumber]) + + def stopAllClusters(self): + """Stop all known clusters""" + for n in self._clusterDict.iterkeys(): + self.stopCluster(n, False) + self._clusterDict.clear() + + + def stopCluster(self, clusterName, updateDict = True): + """Stop all nodes in the named cluster""" + for n in self._clusterDict[clusterName].iterkeys(): + self.stopNode(n, clusterName, False) + if updateDict: + del(self._clusterDict[clusterName]) + + def stopCheckCluster(self, clusterName): + """Stop the named cluster and check that the name is removed from the cluster dictionary""" + self.stopCluster(clusterName) + if self.clusterExists(clusterName): + raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) + + def stopCheckAll(self): + """Kill all known clusters and check that the cluster dictionary is empty""" + self.stopAllClusters() + self.checkNumBrokers(0) + + # --- qpid-config functions --- + + def _qpidConfig(self, nodeNumber, clusterName, action): + """Configure some aspect of a qpid broker using the qpid_config executable""" + port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] + #print "%s -a localhost:%d %s" % (self._qpidConfigExec, port, action) + ret = os.spawnl(os.P_WAIT, self._qpidConfigExec, self._qpidConfigExec, "-a", "localhost:%d" % port, *action.split()) + if ret != 0: + raise Exception("_qpidConfig(): cluster=\"%s\" nodeNumber=%d port=%d action=\"%s\" returned %d" % \ + (clusterName, nodeNumber, port, action, ret)) + + def addExchange(self, nodeNumber, clusterName, exchangeType, exchangeName, durable = False, sequence = False, \ + ive = False): + """Add a named exchange.""" + action = "add exchange %s %s" % (exchangeType, exchangeName) + action += self._paramBool("durable", durable, True) + action += self._paramBool("sequence", sequence, True) + action += self._paramBool("ive", ive, True) + self._qpidConfig(nodeNumber, clusterName, action) + + def deleteExchange(self, nodeNumber, clusterName, exchangeName): + """Delete a named exchange""" + self._qpidConfig(nodeNumber, clusterName, "del exchange %s" % exchangeName) + + def addQueue(self, nodeNumber, clusterName, queueName, configArgs = None): + """Add a queue using qpid-config.""" + action = "add queue %s" % queueName + if self._storeEnable: + action += " --durable" + if configArgs != None: + action += " %s" % configArgs + self._qpidConfig(nodeNumber, clusterName, action) + + def delQueue(self, nodeNumber, clusterName, queueName): + """Delete a named queue using qpid-config.""" + self._qpidConfig(nodeNumber, clusterName, "del queue %s" % queueName) + + def bind(self, nodeNumber, clusterName, exchangeName, queueName, key): + """Create an exchange-queue binding using qpid-config.""" + self._qpidConfig(nodeNumber, clusterName, "bind %s %s %s" % (exchangeName, queueName, key)) + + def unbind(self, nodeNumber, clusterName, exchangeName, queueName, key): + """Remove an exchange-queue binding using qpid-config.""" + self._qpidConfig(nodeNumber, clusterName, "unbind %s %s %s" % (exchangeName, queueName, key)) + + # --- qpid-route functions (federation) --- + + def brokerDict(self, nodeNumber, clusterName, host = "localhost", user = None, password = None): + """Returns a dictionary containing the broker info to be passed to route functions""" + port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] + return {"cluster": clusterName, "node":nodeNumber, "port":port, "host":host, "user":user, "password":password} + + def _brokerStr(self, brokerDict): + """Set up a broker string in the format [user/password@]host:port""" + str = "" + if brokerDict["user"] !=None and brokerDict["password"] != None: + str = "%s@%s" % (brokerDict["user"], brokerDict["password"]) + str += "%s:%d" % (brokerDict["host"], brokerDict["port"]) + return str + + def _qpidRoute(self, action): + """Set up a route using qpid-route""" + #print "%s %s" % (self._qpidRouteExec, action) + ret = os.spawnl(os.P_WAIT, self._qpidRouteExec, self._qpidRouteExec, *action.split()) + if ret != 0: + raise Exception("_qpidRoute(): action=\"%s\" returned %d" % (action, ret)) + + def routeDynamicAdd(self, destBrokerDict, srcBrokerDict, exchangeName): + self._qpidRoute("dynamic add %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName)) + + def routeDynamicDelete(self, destBrokerDict, srcBrokerDict, exchangeName): + self._qpidRoute("dynamic del %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName)) + + def routeAdd(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey): + self._qpidRoute("route add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey)) + + def routeDelete(self, destBrokerDict, srcBrokerDict, exchangeName, routingKey): + self._qpidRoute("route del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, routingKey)) + + def routeQueueAdd(self, destBrokerDict, srcBrokerDict, exchangeName, queueName): + self._qpidRoute("queue add %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName)) + + def routeQueueDelete(self, destBrokerDict, srcBrokerDict, exchangeName, queueName): + self._qpidRoute("queue del %s %s %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict), exchangeName, queueName)) + + def routeLinkAdd(self, destBrokerDict, srcBrokerDict): + self._qpidRoute("link add %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict))) + + def routeLinkDelete(self, destBrokerDict, srcBrokerDict): + self._qpidRoute("link del %s %s" % (self._brokerStr(destBrokerDict), self._brokerStr(srcBrokerDict))) + + # --- Message send and receive functions --- + + def _receiver(self, action): + if self._receiverExec == None: + raise Exception("Environment variable RECEIVER is not set") + cmd = "%s %s" % (self._receiverExec, action) + #print cmd + return subprocess.Popen(cmd.split(), stdout = subprocess.PIPE) + + def _sender(self, action): + if self._senderExec == None: + raise Exception("Environment variable SENDER is not set") + cmd = "%s %s" % (self._senderExec, action) + #print cmd + return subprocess.Popen(cmd.split(), stdin = subprocess.PIPE) + + def createReciever(self, nodeNumber, clusterName, queueName, numMsgs = None, receiverArgs = None): + port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] + action = "--port %d --queue %s" % (port, queueName) + if numMsgs != None: + action += " --messages %d" % numMsgs + if receiverArgs != None: + action += " %s" % receiverArgs + return self._receiver(action) + + def createSender(self, nodeNumber, clusterName, exchangeName, routingKey, senderArgs = None): + port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] + action = "--port %d --exchange %s" % (port, exchangeName) + if routingKey != None and len(routingKey) > 0: + action += " --routing-key %s" % routingKey + if self._storeEnable: + action += " --durable yes" + if senderArgs != None: + action += " %s" % senderArgs + return self._sender(action) + + def createBindDirectExchangeQueue(self, nodeNumber, clusterName, exchangeName, queueName): + self.addExchange(nodeNumber, clusterName, "direct", exchangeName) + self.addQueue(nodeNumber, clusterName, queueName) + self.bind(nodeNumber, clusterName, exchangeName, queueName, queueName) + + def createBindTopicExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameKeyList): + self.addExchange(nodeNumber, clusterName, "topic", exchangeName) + for queueName, key in queueNameKeyList.iteritems(): + self.addQueue(nodeNumber, clusterName, queueName) + self.bind(nodeNumber, clusterName, exchangeName, queueName, key) + + def createBindFanoutExchangeQueues(self, nodeNumber, clusterName, exchangeName, queueNameList): + self.addExchange(nodeNumber, clusterName, "fanout", exchangeName) + for queueName in queueNameList: + self.addQueue(nodeNumber, clusterName, queueName) + self.bind(nodeNumber, clusterName, exchangeName, queueName, "") + + def sendMsgs(self, nodeNumber, clusterName, exchangeName, routingKey, numMsgs, msgSize = None, wait = True): + msgs = self._makeMessageList(numMsgs, msgSize) + sender = self.createSender(nodeNumber, clusterName, exchangeName, routingKey) + sender.stdin.write(msgs) + sender.stdin.close() + if wait: + sender.wait() + return msgs + + def receiveMsgs(self, nodeNumber, clusterName, queueName, numMsgs, wait = True): + receiver = self.createReciever(nodeNumber, clusterName, queueName, numMsgs) + cnt = 0 + msgs = "" + while cnt < numMsgs: + rx = receiver.stdout.readline() + if rx == "" and receiver.poll() != None: break + msgs += rx + cnt = cnt + 1 + if wait: + receiver.wait() + return msgs + + def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, queueName, numMsgs, wait = True, msgSize = None): + self.createBindDirectExchangeQueue(nodeNumber, clusterName, exchangeName, queueName) + txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, queueName, numMsgs, msgSize, wait) + rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, wait) + if txMsgs != rxMsgs: + self.fail("Send - receive message mismatch") diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py index ac81b6d240..25137769bf 100644 --- a/python/qpid/testlib.py +++ b/python/qpid/testlib.py @@ -21,7 +21,7 @@ # Support library for qpid python tests. # -import sys, re, unittest, os, signal, random, logging, traceback +import sys, re, unittest, os, random, logging, traceback import qpid.client, qpid.spec, qmf.console import Queue from fnmatch import fnmatch @@ -429,196 +429,3 @@ class TestBase010(unittest.TestCase): session.message_subscribe(**keys) session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFFL) session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFFL) - - -class TestBaseCluster(unittest.TestCase): - """ - Base class for cluster tests. Provides methods for starting and stopping clusters and cluster nodes. - """ - _tempStoreDir = os.getenv("TMP_STORE_DIR") - _qpidd = os.getenv("QPIDD") - _storeLib = os.getenv("LIBSTORE") - _clusterLib = os.getenv("LIBCLUSTER") - - # --- Cluster helper functions --- - - """ - _clusterDict is a dictionary of clusters: - key = cluster name (string) - val = dictionary of node numbers: - key = node number (int) - val = tuple containing (pid, port) - For example, two clusters "TestCluster0" and "TestCluster1" containing several nodes would look as follows: - {"TestCluster0": {0: (pid0-0, port0-0), 1: (pid0-1, port0-1), ...}, "TestCluster1": {0: (pid1-0, port1-0), 1: (pid1-1, port1-1), ...}} - where pidm-n and portm-n are the int pid and port for TestCluster m node n respectively. - """ - _clusterDict = {} - - """Index for (pid, port) tuple""" - PID = 0 - PORT = 1 - - def startBroker(self, qpiddArgs, logFile = None): - """Start a single broker daemon, returns tuple (pid, port)""" - if self._qpidd == None: - raise Exception("Environment variable QPIDD is not set") - cmd = "%s --daemon --port=0 %s" % (self._qpidd, qpiddArgs) - portStr = os.popen(cmd).read() - if len(portStr) == 0: - err = "Broker daemon startup failed." - if logFile != None: - err += " See log file %s" % logFile - raise Exception(err) - port = int(portStr) - pid = int(os.popen("%s -p %d -c" % (self._qpidd, port)).read()) - #print "started broker: pid=%d, port=%d" % (pid, port) - return (pid, port) - - def createClusterNode(self, nodeNumber, clusterName): - """Create a node and add it to the named cluster""" - if self._tempStoreDir == None: - raise Exception("Environment variable TMP_STORE_DIR is not set") - if self._storeLib == None: - raise Exception("Environment variable LIBSTORE is not set") - if self._clusterLib == None: - raise Exception("Environment variable LIBCLUSTER is not set") - name = "%s-%d" % (clusterName, nodeNumber) - dataDir = os.path.join(self._tempStoreDir, "cluster", name) - logFile = "%s.log" % dataDir - args = "--no-module-dir --load-module=%s --load-module=%s --data-dir=%s --cluster-name=%s --auth=no --log-enable=error+ --log-to-file=%s" % \ - (self._storeLib, self._clusterLib, dataDir, clusterName, logFile) - self._clusterDict[clusterName][nodeNumber] = self.startBroker(args, logFile) - - def createCluster(self, clusterName, numberNodes): - """Create a cluster containing an initial number of nodes""" - self._clusterDict[clusterName] = {} - for n in range(0, numberNodes): - self.createClusterNode(n, clusterName) - - def getTupleList(self): - """Get list of (pid, port) tuples of all known cluster brokers""" - tList = [] - for l in self._clusterDict.itervalues(): - for t in l.itervalues(): - tList.append(t) - return tList - - def getNumBrokers(self): - """Get total number of brokers in all known clusters""" - return len(self.getTupleList()) - - def checkNumBrokers(self, expected): - """Check that the total number of brokers in all known clusters is the expected value""" - if self.getNumBrokers() != expected: - raise Exception("Unexpected number of brokers: expected %d, found %d" % (expected, self.getNumBrokers())) - - def getClusterTupleList(self, clusterName): - """Get list of (pid, port) tuples of all nodes in named cluster""" - return self._clusterDict[clusterName].values() - - def getNumClusterBrokers(self, clusterName): - """Get total number of brokers in named cluster""" - return len(self.getClusterTupleList(clusterName)) - - def getNodeTuple(self, nodeNumber, clusterName): - """Get the (pid, port) tuple for the given cluster node""" - return self._clusterDict[clusterName][nodeNumber] - - def checkNumClusterBrokers(self, clusterName, expected): - """Check that the total number of brokers in the named cluster is the expected value""" - if self.getNumClusterBrokers(clusterName) != expected: - raise Exception("Unexpected number of brokers in cluster %s: expected %d, found %d" % \ - (clusterName, expected, self.getNumClusterBrokers(clusterName))) - - def clusterExists(self, clusterName): - """ Return True if clusterName exists, False otherwise""" - return clusterName in self._clusterDict.keys() - - def clusterNodeExists(self, clusterName, nodeNumber): - """ Return True if nodeNumber in clusterName exists, False otherwise""" - if clusterName in self._clusterDict.keys(): - return nodeNumber in self._clusterDict[nodeName] - return False - - def createCheckCluster(self, clusterName, size): - """Create a cluster using the given name and size, then check the number of brokers""" - self.createCluster(clusterName, size) - self.checkNumClusterBrokers(clusterName, size) - - # Kill cluster nodes using signal 9 - - def killNode(self, nodeNumber, clusterName, updateDict = True): - """Kill the given node in the named cluster using kill -9""" - pid = self.getNodeTuple(nodeNumber, clusterName)[self.PID] - os.kill(pid, signal.SIGTERM) - #print "killed broker: pid=%d" % pid - if updateDict: - del(self._clusterDict[clusterName][nodeNumber]) - - def killCluster(self, clusterName, updateDict = True): - """Kill all nodes in the named cluster""" - for n in self._clusterDict[clusterName].iterkeys(): - self.killNode(n, clusterName, False) - if updateDict: - del(self._clusterDict[clusterName]) - - def killClusterCheck(self, clusterName): - """Kill the named cluster and check that the name is removed from the cluster dictionary""" - self.killCluster(clusterName) - if self.clusterExists(clusterName): - raise Exception("Unable to kill cluster %s; %d nodes still exist" % \ - (clusterName, self.getNumClusterBrokers(clusterName))) - - def killAllClusters(self): - """Kill all known clusters""" - for n in self._clusterDict.iterkeys(): - self.killCluster(n, False) - self._clusterDict.clear() - - def killAllClustersCheck(self): - """Kill all known clusters and check that the cluster dictionary is empty""" - self.killAllClusters() - self.checkNumBrokers(0) - - # Stop cluster nodes using qpidd -q - - def stopNode(self, nodeNumber, clusterName, updateDict = True): - """Stop the given node in the named cluster using qpidd -q""" - port = self.getNodeTuple(nodeNumber, clusterName)[self.PORT] - ret = os.spawnl(os.P_WAIT, self._qpidd, self._qpidd, "--port=%d" % port, "-q") - if ret != 0: - raise Exception("stop_node(): cluster=\"%s\" nodeNumber=%d pid=%d port=%d: qpidd -q returned %d" % \ - (clusterName, nodeNumber, self.getNodeTuple(nodeNumber, clusterName)[self.PID], port, ret)) - #print "stopped broker: port=%d" % port - if updateDict: - del(self._clusterDict[clusterName][nodeNumber]) - - def stopAllClusters(self): - """Stop all known clusters""" - for n in self._clusterDict.iterkeys(): - self.stopCluster(n, False) - self._clusterDict.clear() - - - def stopCluster(self, clusterName, updateDict = True): - """Stop all nodes in the named cluster""" - for n in self._clusterDict[clusterName].iterkeys(): - self.stopNode(n, clusterName, False) - if updateDict: - del(self._clusterDict[clusterName]) - - def stopCheckCluster(self, clusterName): - """Stop the named cluster and check that the name is removed from the cluster dictionary""" - self.stopCluster(clusterName) - if self.clusterExists(clusterName): - raise Exception("Unable to kill cluster %s; %d nodes still exist" % (clusterName, self.getNumClusterBrokers(clusterName))) - def stopCheckAll(self): - """Kill all known clusters and check that the cluster dictionary is empty""" - self.stopAllClusters() - self.checkNumBrokers(0) - - def setUp(self): - pass - - def tearDown(self): - pass |