summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/tests/brokertest.py215
-rwxr-xr-xcpp/src/tests/cluster_tests.py40
-rwxr-xr-xcpp/src/tests/run_cluster_tests2
3 files changed, 252 insertions, 5 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
new file mode 100644
index 0000000000..511a86edda
--- /dev/null
+++ b/cpp/src/tests/brokertest.py
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Support library for tests that start multiple brokers, e.g. cluster
+# or federation
+
+#
+# FIXME aconway 2009-10-30: Missing features:
+# - support for calling qpid-tool/qpid-config directly from a test.
+# (Not by starting a separate process)
+# - helper functions to run executable clients e.g. sender/receiver.
+#
+
+import os, signal, string, tempfile, popen2, socket
+from qpid import connection, messaging
+from shutil import rmtree
+from unittest import TestCase
+
+# Values for expected outcome of process at end of test
+EXPECT_NONE=0 # No expectation
+EXPECT_EXIT_OK=1 # Expect to exit with 0 before end of test
+EXPECT_RUNNING=2 # Expect to still be running at end of test
+
+def is_running(pid):
+ try:
+ os.kill(pid, 0)
+ return True
+ except:
+ return False
+
+class Popen:
+ """Similar to subprocess.Popen but using popen2 classes for portability.
+ Can set expectation that process exits with 0 or is still running at end of test.
+ """
+
+ def __init__(self, cmd, expect=EXPECT_EXIT_OK):
+ self._cmd = cmd
+ self.expect = expect
+ self._popen = popen2.Popen3(cmd, True)
+ self.stdin = self._popen.tochild
+ self.stdout = self._popen.fromchild
+ self.stderr = self._popen.childerr
+ self.pid = self._popen.pid
+
+ def _addoutput(self, msg, name, output):
+ if output: msg += [name, output]
+
+ def _check(self, retcode):
+ self.returncode = retcode
+ if self.expect == EXPECT_EXIT_OK and self.returncode != 0:
+ msg = [ "Unexpected error %d: %s" %(retcode, string.join(self._cmd)) ]
+ self._addoutput(msg, "stdout:", self.stdout.read())
+ self._addoutput(msg, "stderr:", self.stderr.read())
+ raise Exception(string.join(msg, "\n"))
+
+ def poll(self):
+ retcode = self._popen.poll()
+ if retcode != -1: self._check(retcode)
+ return retcode
+
+ def wait(self):
+ self._check(self._popen.wait())
+ return self.returncode
+
+ def communicate(self, input=None):
+ if input: self.stdin.write(input)
+ outerr = (self.stdout.read(), self.stderr.read())
+ wait()
+ return outerr
+
+ def is_running(self): return is_running(pid)
+ def send_signal(self, sig): os.kill(self.pid,sig)
+ def terminate(self): self.send_signal(signal.SIGTERM)
+ def kill(self): self.send_signal(signal.SIGKILL)
+
+def checkenv(name):
+ value = os.getenv(name)
+ if not value: raise Exception("Environment variable %s is not set" % name)
+ return value
+
+class Broker(Popen):
+ "A broker process. Takes care of start, stop and logging."
+ _store_lib = os.getenv("STORE_LIB")
+ _qpidd = checkenv("QPIDD_EXEC")
+ _broker_count = 0
+
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
+ """Start a broker daemon. name determines the data-dir and log
+ file names."""
+
+ cmd = [self._qpidd, "--port=0", "--no-module-dir", "--auth=no"] + args
+ if name: self.name = name
+ else:
+ self.name = "broker%d" % Broker._broker_count
+ Broker._broker_count += 1
+ self.log = os.path.join(test.dir, self.name+".log")
+ cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
+ self.datadir = os.path.join(test.dir, self.name)
+ cmd += ["--data-dir", self.datadir]
+ if self._store_lib: cmd += ["--load-module", self._store_lib]
+
+ Popen.__init__(self, cmd, expect)
+ self.port = int(self.stdout.readline())
+ test.willkill(self)
+ self.host = "localhost" # Placeholder for remote brokers.
+
+ def connect(self):
+ """New API connection to the broker."""
+ return messaging.Connection.open(self.host, self.port)
+
+ def connect_old(self):
+ """Old API connection to the broker."""
+ socket = connect(self.host,self.port)
+ connection = connection.Connection (sock=socket)
+ connection.start()
+ return connection;
+
+
+class Cluster:
+ """A cluster of brokers in a test."""
+
+ _cluster_lib = checkenv("CLUSTER_LIB")
+ _cluster_count = 0
+
+ # FIXME aconway 2009-10-30: missing args
+ def __init__(self, test, count=0):
+ self.test = test
+ self._brokers=[]
+ self.name = "cluster%d" % Cluster._cluster_count
+ Cluster._cluster_count += 1
+ # Use unique cluster name
+ self.args = []
+ self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+ self.args += [ "--load-module", self._cluster_lib ]
+ self.start_n(count)
+
+ def start(self, name=None, expect=EXPECT_RUNNING):
+ """Add a broker to the cluster. Returns the index of the new broker."""
+ if not name: name="%s-%d" % (self.name, len(self._brokers))
+ self._brokers.append(self.test.broker(self.args, name, expect))
+ return self._brokers[-1]
+
+ def start_n(self, count):
+ for i in range(count): self.start()
+
+ def wait(self):
+ """Wait for all cluster members to be ready"""
+ for b in brokers:
+ b.connect().close()
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
+class BrokerTest(TestCase):
+ """Provides working dir that is cleaned up only if test passes.
+ Tracks processes started by test and kills at end of test.
+ Note that subclasses need to call selfpassed() at the end of
+ a successful test."""
+
+ def setUp(self):
+ self.dir = tempfile.mkdtemp()
+ self.popens = []
+
+ def willkill(self, popen):
+ """Add process to be killed at end of test"""
+ self.popens.append(popen)
+
+ def popen(self, cmd, expect=EXPECT_EXIT_OK):
+ """Start a process that will be killed at end of test"""
+ p = Popen(cmd, expect)
+ willkill(p)
+ return p
+
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING):
+ return Broker(self, args, name, expect)
+
+ def cluster(self, count=0): return Cluster(self)
+
+ def passed(self):
+ """On pass, kill processes and clean up work directory"""
+ rmtree(self.dir)
+ self.passed = True
+
+ def tearDown(self):
+ """On failure print working dir, kill processes"""
+ if not self.passed: print "TEST DIRECTORY: ", self.dir
+ err=[]
+ for p in self.popens:
+ if p.is_running:
+ p.kill()
+ else:
+ if p.expect == EXPECT_RUNNING:
+ err.append("NOT running: %s" % (cmd))
+ if len(err) != 0:
+ raise Exception(string.join(err, "\n"))
+
+
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index a0b9c420d9..3613f564bd 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -18,12 +18,44 @@
# under the License.
#
-import os, signal, sys, unittest
-from testlib import TestBaseCluster
+import os, signal, sys
+from brokertest import *
+from qpid import datatypes, messaging, tests
+from testlib import TestBaseCluster # Old framework
+
+# New framework tests
+class NewTests(BrokerTest):
+
+ def test_basic(self):
+ """Test basic cluster message replication."""
+ # Start a cluster, send some messages to member 0.
+ cluster = Cluster(self, 2)
+ s0 = cluster[0].connect().session()
+ s0.sender("q {create:always}").send(messaging.Message("x"))
+ s0.sender("q {create:always}").send(messaging.Message("y"))
+ s0.connection.close()
+
+ # Verify messages available on member 1.
+ s1 = cluster[1].connect().session()
+ m = s1.receiver("q", capacity=1).fetch(timeout=1)
+ s1.acknowledge()
+ self.assertEqual("x", m.content)
+ s1.connection.close()
+
+ # Start member 2 and verify messages available.
+ s2 = cluster.start().connect().session()
+ m = s2.receiver("q", capacity=1).fetch(timeout=1)
+ s1.acknowledge()
+ self.assertEqual("y", m.content)
+ s2.connection.close()
+
+ self.passed()
+
+# Old framework tests
class ShortTests(TestBaseCluster):
"""Basic cluster with async store tests"""
-
+
def test_01_Initialization(self):
"""Start a single cluster containing several nodes, and stop it again"""
try:
@@ -401,5 +433,5 @@ class LongTests(TestBaseCluster):
if __name__ == '__main__':
if os.getenv("STORE_LIB") != None:
print "NOTE: Store enabled for the following tests:"
- if not unittest.main(): sys.exit(1)
+ if not test.main(): sys.exit(1)
diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests
index d1a58f9f6a..d6eeed33ea 100755
--- a/cpp/src/tests/run_cluster_tests
+++ b/cpp/src/tests/run_cluster_tests
@@ -29,7 +29,7 @@ TEST_DIR=${top_builddir}/src/tests
. $srcdir/python_env.sh
if test -z $1; then
- CLUSTER_TEST="$PYTHON_COMMANDS/qpid-python-test -m cluster_tests cluster_tests.ShortTests.\*"
+ CLUSTER_TEST="$PYTHON_COMMANDS/qpid-python-test -m cluster_tests cluster_tests.NewTests.*"
else
CLUSTER_TEST="$PYTHON_COMMANDS/qpid-python-test -m cluster_tests cluster_tests.LongTests.\*"
echo "Running $1..."