summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/tests/brokertest.py134
-rw-r--r--cpp/src/tests/cluster_tests.fail2
-rwxr-xr-xcpp/src/tests/cluster_tests.py34
-rwxr-xr-xcpp/src/tests/run_cluster_tests2
4 files changed, 119 insertions, 53 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index ec25201505..d70990bfae 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -21,7 +21,7 @@
# or federation
import os, signal, string, tempfile, popen2, socket, threading, time
-import qpid
+import qpid, traceback
from qpid import connection, messaging, util
from qpid.compat import format_exc
from qpid.harness import Skipped
@@ -41,13 +41,14 @@ def is_running(pid):
except:
return False
-class Unexpected(Exception):
+class BadProcessStatus(Exception):
pass
class Popen(popen2.Popen3):
"""
Similar to subprocess.Popen but using popen2 classes for portability.
Can set and verify expectation of process status at end of test.
+ Dumps command line, stdout, stderr to data dir for debugging.
"""
def __init__(self, cmd, expect=EXPECT_EXIT_OK):
@@ -57,17 +58,25 @@ class Popen(popen2.Popen3):
self.stdin = self.tochild
self.stdout = self.fromchild
self.stderr = self.childerr
+ self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
+ self.dump(self.cmd_str(), "cmd")
+
+ def dump(self, str, ext):
+ f = file("%s.%s" % (self.pname, ext), "w")
+ f.write(str)
+ f.close()
def unexpected(self,msg):
- raise Unexpected("%s: %s\n--stdout:\n%s\n--stderr:\n%s" %
- (msg, self.cmd_str(), self.stdout.read(), self.stderr.read()))
+ self.dump(self.stdout.read(), "out")
+ self.dump(self.stderr.read(), "err")
+ raise BadProcessStatus("%s: %s" % (msg, self.pname))
- def testend(self): # Clean up at end of test.
+ def stop(self): # Clean up at end of test.
if self.expect == EXPECT_RUNNING:
try:
self.kill()
except:
- self.unexpected("Expected running but exited %d" % self.wait())
+ self.unexpected("Exit code %d" % self.wait())
else:
# Give the process some time to exit.
delay = 0.1
@@ -76,11 +85,11 @@ class Popen(popen2.Popen3):
delay *= 2
if self.returncode is None: # Still haven't stopped
self.kill()
- self.unexpected("Expected to exit but still running")
+ self.unexpected("Still running")
elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
- self.unexpected("Expected exit ok but exited %d" % self.returncode)
+ self.unexpected("Exit code %d" % self.returncode)
elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
- self.unexpected("Expected to fail but exited ok")
+ self.unexpected("Expected error")
def communicate(self, input=None):
if input:
@@ -107,8 +116,6 @@ class Popen(popen2.Popen3):
def terminate(self): self.send_signal(signal.SIGTERM)
def kill(self): self.send_signal(signal.SIGKILL)
-
-
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
@@ -133,19 +140,23 @@ class Broker(Popen):
else:
self.name = "broker%d" % Broker._broker_count
Broker._broker_count += 1
- self.log = os.path.join(test.dir, self.name+".log")
- cmd += ["--log-to-file", self.log, "--log-prefix", self.name,"--log-to-stderr=no"]
- self.datadir = os.path.join(test.dir, self.name)
+ self.log = self.name+".log"
+ cmd += ["--log-to-file", self.log, "--log-prefix", self.name]
+ cmd += ["--log-to-stderr=no"]
+ self.datadir = self.name
cmd += ["--data-dir", self.datadir]
if self._store_lib: cmd += ["--load-module", self._store_lib]
Popen.__init__(self, cmd, expect)
try: self.port = int(self.stdout.readline())
- except Exception:
- raise Exception("Failed to start broker, log: "+self.log)
- test.cleanup_popen(self)
+ except Exception, e:
+ raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname))
+ test.cleanup_stop(self)
self.host = "localhost" # Placeholder for remote brokers.
+ def unexpected(self,msg):
+ raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
+
def connect(self):
"""New API connection to the broker."""
return messaging.Connection.open(self.host, self.port)
@@ -168,6 +179,12 @@ class Broker(Popen):
s.sender(queue+" {create:always}").send(message)
s.connection.close()
+ def send_messages(self, queue, messages):
+ s = self.connect().session()
+ sender = s.sender(queue+" {create:always}")
+ for m in messages: sender.send(m)
+ s.connection.close()
+
def get_message(self, queue):
s = self.connect().session()
m = s.receiver(queue+" {create:always}", capacity=1).fetch(timeout=1)
@@ -175,6 +192,14 @@ class Broker(Popen):
s.connection.close()
return m
+ def get_messages(self, queue, n):
+ s = self.connect().session()
+ receiver = s.receiver(queue+" {create:always}", capacity=n)
+ m = [receiver.fetch(timeout=1) for i in range(n)]
+ s.acknowledge()
+ s.connection.close()
+ return m
+
class Cluster:
"""A cluster of brokers in a test."""
@@ -217,9 +242,7 @@ class BrokerTest(TestCase):
Provides a well-known working directory for each test.
"""
- # FIXME aconway 2009-11-05: too many env vars, need a simpler
- # scheme for locating exes and libs
-
+ # Environment settings.
cluster_lib = os.getenv("CLUSTER_LIB")
xml_lib = os.getenv("XML_LIB")
qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -227,36 +250,39 @@ class BrokerTest(TestCase):
receiver_exec = os.getenv("RECEIVER_EXEC")
sender_exec = os.getenv("SENDER_EXEC")
store_lib = os.getenv("STORE_LIB")
-
+
+ rootdir = os.getcwd()
def configure(self, config): self.config=config
def setUp(self):
- self.dir = os.path.join(self.config.defines["OUTDIR"], self.id())
+ self.dir = os.path.join(self.rootdir, self.config.defines["OUTDIR"], self.id())
os.makedirs(self.dir)
- self.popens = []
+ os.chdir(self.dir)
+ self.stopem = [] # things to stop at end of test
def tearDown(self):
err = []
- for p in self.popens:
- try: p.testend()
- except Unexpected, e: err.append(str(e))
- if err: raise Exception("\n".join(err))
+ for p in self.stopem:
+ try: p.stop()
+ except Exception, e: err.append(str(e))
+ if err: raise Exception("\n ".join(err))
# FIXME aconway 2009-11-06: check for core files of exited processes.
- def cleanup_popen(self, popen):
- """Add process to be killed at end of test"""
- self.popens.append(popen)
+ def cleanup_stop(self, stopable):
+ """Call thing.stop at end of test"""
+ self.stopem.append(stopable)
def popen(self, cmd, expect=EXPECT_EXIT_OK):
- """Start a process that will be killed at end of test"""
+ """Start a process that will be killed at end of test, in the test dir."""
+ os.chdir(self.dir)
p = Popen(cmd, expect)
- self.cleanup_popen(p)
+ self.cleanup_stop(p)
return p
def broker(self, args=[], name=None, expect=EXPECT_RUNNING):
"""Create and return a broker ready for use"""
- b = Broker(self, args, name, expect)
+ b = Broker(self, args=args, name=name, expect=expect)
b.connect().close()
return b
@@ -266,6 +292,11 @@ class BrokerTest(TestCase):
cluster.wait()
return cluster
+class RethrownException(Exception):
+ """Captures the original stack trace to be thrown later"""
+ def __init__(self, e):
+ Exception.__init__(self, format_exc())
+
class StoppableThread(Thread):
"""
Base class for threads that do something in a loop and periodically check
@@ -281,7 +312,7 @@ class StoppableThread(Thread):
self.join()
if self.error: raise self.error
-class Sender(StoppableThread):
+class NumberedSender(StoppableThread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
@@ -300,7 +331,7 @@ class Sender(StoppableThread):
self.sent += 1
except Exception, e: self.error = RethrownException(e)
-class Receiver(Thread):
+class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
sequentially numbered messages.
@@ -320,8 +351,9 @@ class Receiver(Thread):
while self.stopat is None or self.received < self.stopat:
self.lock.acquire()
try:
- self.test.assertEqual(self.receiver.stdout.readline(), str(self.received)+"\n")
- self.received += 1
+ m = int(self.receiver.stdout.readline())
+ assert(m <= self.received) # Allow for duplicates
+ if (m == self.received): self.received += 1
finally:
self.lock.release()
except Exception, e:
@@ -335,7 +367,27 @@ class Receiver(Thread):
self.join()
if self.error: raise self.error
-class RethrownException(Exception):
- """Captures the original stack trace to be thrown later"""
- def __init__(self, e):
- Exception.__init__(self, format_exc())
+class ErrorGenerator(StoppableThread):
+ """
+ Thread that continuously generates errors by trying to consume from
+ a non-existent queue. For cluster regression tests, error handling
+ caused issues in the past.
+ """
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.broker=broker
+ broker.test.cleanup_stop(self)
+ self.start()
+
+ def run(self):
+ c = self.broker.connect_old()
+ try:
+ while not self.stopped:
+ try:
+ c.session(str(qpid.datatypes.uuid4())).message_subscribe(
+ queue="non-existent-queue")
+ assert(False)
+ except qpid.session.SessionException: pass
+ except: pass # Normal if broker is killed.
+
diff --git a/cpp/src/tests/cluster_tests.fail b/cpp/src/tests/cluster_tests.fail
index ca445e3ee6..139597f9cb 100644
--- a/cpp/src/tests/cluster_tests.fail
+++ b/cpp/src/tests/cluster_tests.fail
@@ -1,2 +1,2 @@
-cluster_tests.ClusterTests.test_failover
+
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 682e25fbd3..07050c922f 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -53,28 +53,42 @@ class ClusterTests(BrokerTest):
s2.connection.close()
def test_failover(self):
- """Test fail-over during continuous send-receive"""
- # FIXME aconway 2009-11-09: this test is failing, showing lost messages.
- # Enable when fixed
+ """Test fail-over during continuous send-receive with errors"""
# Original cluster will all be killed so expect exit with failure
cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+
# Start sender and receiver threads
cluster[0].declare_queue("test-queue")
- self.receiver = Receiver(cluster[1])
- self.receiver.start()
- self.sender = Sender(cluster[2])
- self.sender.start()
+ receiver = NumberedReceiver(cluster[1])
+ receiver.start()
+ sender = NumberedSender(cluster[2])
+ sender.start()
# Kill original brokers, start new ones.
for i in range(3):
cluster[i].kill()
- cluster.start()
+ b = cluster.start()
time.sleep(1)
- self.sender.stop()
- self.receiver.stop(self.sender.sent)
+ sender.stop()
+ receiver.stop(sender.sent)
+
+ def send_receive_verify(self, b1, b2, queue, msgs):
+ b1.send_messages(queue, msgs)
+ self.assertEqual(msgs, [ m.content for m in b2.get_messages(queue,len(msgs))])
+
+ def test_error_storm(self):
+ """Verify cluster behaves with clients generating a lot of errors."""
+ cluster = self.cluster(3)
+ errgen = [ ErrorGenerator(b) for b in cluster ]
+ msgs = [ str(i) for i in range(10) ]
+ self.send_receive_verify(cluster[0], cluster[1], "q", msgs)
+ self.send_receive_verify(cluster[1], cluster[2], "q", msgs)
+ for i in range(3):
+ cluster.start()
+ self.send_receive_verify(cluster[1], cluster[2], "q", msgs)
class ClusterStoreTests(BrokerTest):
diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests
index 9f9b6735f6..014233d8d3 100755
--- a/cpp/src/tests/run_cluster_tests
+++ b/cpp/src/tests/run_cluster_tests
@@ -45,7 +45,7 @@ rm -rf $OUTDIR
mkdir -p $OUTDIR
# Ignore tests requiring a store by default.
-TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail"
+TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail $*"
with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $TESTS || exit 1
rm -rf $OUTDIR