diff options
-rw-r--r-- | cpp/src/tests/brokertest.py | 134 | ||||
-rw-r--r-- | cpp/src/tests/cluster_tests.fail | 2 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 34 | ||||
-rwxr-xr-x | cpp/src/tests/run_cluster_tests | 2 |
4 files changed, 119 insertions, 53 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index ec25201505..d70990bfae 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -21,7 +21,7 @@ # or federation import os, signal, string, tempfile, popen2, socket, threading, time -import qpid +import qpid, traceback from qpid import connection, messaging, util from qpid.compat import format_exc from qpid.harness import Skipped @@ -41,13 +41,14 @@ def is_running(pid): except: return False -class Unexpected(Exception): +class BadProcessStatus(Exception): pass class Popen(popen2.Popen3): """ Similar to subprocess.Popen but using popen2 classes for portability. Can set and verify expectation of process status at end of test. + Dumps command line, stdout, stderr to data dir for debugging. """ def __init__(self, cmd, expect=EXPECT_EXIT_OK): @@ -57,17 +58,25 @@ class Popen(popen2.Popen3): self.stdin = self.tochild self.stdout = self.fromchild self.stderr = self.childerr + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid) + self.dump(self.cmd_str(), "cmd") + + def dump(self, str, ext): + f = file("%s.%s" % (self.pname, ext), "w") + f.write(str) + f.close() def unexpected(self,msg): - raise Unexpected("%s: %s\n--stdout:\n%s\n--stderr:\n%s" % - (msg, self.cmd_str(), self.stdout.read(), self.stderr.read())) + self.dump(self.stdout.read(), "out") + self.dump(self.stderr.read(), "err") + raise BadProcessStatus("%s: %s" % (msg, self.pname)) - def testend(self): # Clean up at end of test. + def stop(self): # Clean up at end of test. if self.expect == EXPECT_RUNNING: try: self.kill() except: - self.unexpected("Expected running but exited %d" % self.wait()) + self.unexpected("Exit code %d" % self.wait()) else: # Give the process some time to exit. delay = 0.1 @@ -76,11 +85,11 @@ class Popen(popen2.Popen3): delay *= 2 if self.returncode is None: # Still haven't stopped self.kill() - self.unexpected("Expected to exit but still running") + self.unexpected("Still running") elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: - self.unexpected("Expected exit ok but exited %d" % self.returncode) + self.unexpected("Exit code %d" % self.returncode) elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: - self.unexpected("Expected to fail but exited ok") + self.unexpected("Expected error") def communicate(self, input=None): if input: @@ -107,8 +116,6 @@ class Popen(popen2.Popen3): def terminate(self): self.send_signal(signal.SIGTERM) def kill(self): self.send_signal(signal.SIGKILL) - - def cmd_str(self): return " ".join([str(s) for s in self.cmd]) @@ -133,19 +140,23 @@ class Broker(Popen): else: self.name = "broker%d" % Broker._broker_count Broker._broker_count += 1 - self.log = os.path.join(test.dir, self.name+".log") - cmd += ["--log-to-file", self.log, "--log-prefix", self.name,"--log-to-stderr=no"] - self.datadir = os.path.join(test.dir, self.name) + self.log = self.name+".log" + cmd += ["--log-to-file", self.log, "--log-prefix", self.name] + cmd += ["--log-to-stderr=no"] + self.datadir = self.name cmd += ["--data-dir", self.datadir] if self._store_lib: cmd += ["--load-module", self._store_lib] Popen.__init__(self, cmd, expect) try: self.port = int(self.stdout.readline()) - except Exception: - raise Exception("Failed to start broker, log: "+self.log) - test.cleanup_popen(self) + except Exception, e: + raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname)) + test.cleanup_stop(self) self.host = "localhost" # Placeholder for remote brokers. + def unexpected(self,msg): + raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) + def connect(self): """New API connection to the broker.""" return messaging.Connection.open(self.host, self.port) @@ -168,6 +179,12 @@ class Broker(Popen): s.sender(queue+" {create:always}").send(message) s.connection.close() + def send_messages(self, queue, messages): + s = self.connect().session() + sender = s.sender(queue+" {create:always}") + for m in messages: sender.send(m) + s.connection.close() + def get_message(self, queue): s = self.connect().session() m = s.receiver(queue+" {create:always}", capacity=1).fetch(timeout=1) @@ -175,6 +192,14 @@ class Broker(Popen): s.connection.close() return m + def get_messages(self, queue, n): + s = self.connect().session() + receiver = s.receiver(queue+" {create:always}", capacity=n) + m = [receiver.fetch(timeout=1) for i in range(n)] + s.acknowledge() + s.connection.close() + return m + class Cluster: """A cluster of brokers in a test.""" @@ -217,9 +242,7 @@ class BrokerTest(TestCase): Provides a well-known working directory for each test. """ - # FIXME aconway 2009-11-05: too many env vars, need a simpler - # scheme for locating exes and libs - + # Environment settings. cluster_lib = os.getenv("CLUSTER_LIB") xml_lib = os.getenv("XML_LIB") qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC") @@ -227,36 +250,39 @@ class BrokerTest(TestCase): receiver_exec = os.getenv("RECEIVER_EXEC") sender_exec = os.getenv("SENDER_EXEC") store_lib = os.getenv("STORE_LIB") - + + rootdir = os.getcwd() def configure(self, config): self.config=config def setUp(self): - self.dir = os.path.join(self.config.defines["OUTDIR"], self.id()) + self.dir = os.path.join(self.rootdir, self.config.defines["OUTDIR"], self.id()) os.makedirs(self.dir) - self.popens = [] + os.chdir(self.dir) + self.stopem = [] # things to stop at end of test def tearDown(self): err = [] - for p in self.popens: - try: p.testend() - except Unexpected, e: err.append(str(e)) - if err: raise Exception("\n".join(err)) + for p in self.stopem: + try: p.stop() + except Exception, e: err.append(str(e)) + if err: raise Exception("\n ".join(err)) # FIXME aconway 2009-11-06: check for core files of exited processes. - def cleanup_popen(self, popen): - """Add process to be killed at end of test""" - self.popens.append(popen) + def cleanup_stop(self, stopable): + """Call thing.stop at end of test""" + self.stopem.append(stopable) def popen(self, cmd, expect=EXPECT_EXIT_OK): - """Start a process that will be killed at end of test""" + """Start a process that will be killed at end of test, in the test dir.""" + os.chdir(self.dir) p = Popen(cmd, expect) - self.cleanup_popen(p) + self.cleanup_stop(p) return p def broker(self, args=[], name=None, expect=EXPECT_RUNNING): """Create and return a broker ready for use""" - b = Broker(self, args, name, expect) + b = Broker(self, args=args, name=name, expect=expect) b.connect().close() return b @@ -266,6 +292,11 @@ class BrokerTest(TestCase): cluster.wait() return cluster +class RethrownException(Exception): + """Captures the original stack trace to be thrown later""" + def __init__(self, e): + Exception.__init__(self, format_exc()) + class StoppableThread(Thread): """ Base class for threads that do something in a loop and periodically check @@ -281,7 +312,7 @@ class StoppableThread(Thread): self.join() if self.error: raise self.error -class Sender(StoppableThread): +class NumberedSender(StoppableThread): """ Thread to run a sender client and send numbered messages until stopped. """ @@ -300,7 +331,7 @@ class Sender(StoppableThread): self.sent += 1 except Exception, e: self.error = RethrownException(e) -class Receiver(Thread): +class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives sequentially numbered messages. @@ -320,8 +351,9 @@ class Receiver(Thread): while self.stopat is None or self.received < self.stopat: self.lock.acquire() try: - self.test.assertEqual(self.receiver.stdout.readline(), str(self.received)+"\n") - self.received += 1 + m = int(self.receiver.stdout.readline()) + assert(m <= self.received) # Allow for duplicates + if (m == self.received): self.received += 1 finally: self.lock.release() except Exception, e: @@ -335,7 +367,27 @@ class Receiver(Thread): self.join() if self.error: raise self.error -class RethrownException(Exception): - """Captures the original stack trace to be thrown later""" - def __init__(self, e): - Exception.__init__(self, format_exc()) +class ErrorGenerator(StoppableThread): + """ + Thread that continuously generates errors by trying to consume from + a non-existent queue. For cluster regression tests, error handling + caused issues in the past. + """ + + def __init__(self, broker): + StoppableThread.__init__(self) + self.broker=broker + broker.test.cleanup_stop(self) + self.start() + + def run(self): + c = self.broker.connect_old() + try: + while not self.stopped: + try: + c.session(str(qpid.datatypes.uuid4())).message_subscribe( + queue="non-existent-queue") + assert(False) + except qpid.session.SessionException: pass + except: pass # Normal if broker is killed. + diff --git a/cpp/src/tests/cluster_tests.fail b/cpp/src/tests/cluster_tests.fail index ca445e3ee6..139597f9cb 100644 --- a/cpp/src/tests/cluster_tests.fail +++ b/cpp/src/tests/cluster_tests.fail @@ -1,2 +1,2 @@ -cluster_tests.ClusterTests.test_failover + diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 682e25fbd3..07050c922f 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -53,28 +53,42 @@ class ClusterTests(BrokerTest): s2.connection.close() def test_failover(self): - """Test fail-over during continuous send-receive""" - # FIXME aconway 2009-11-09: this test is failing, showing lost messages. - # Enable when fixed + """Test fail-over during continuous send-receive with errors""" # Original cluster will all be killed so expect exit with failure cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL) + # Start sender and receiver threads cluster[0].declare_queue("test-queue") - self.receiver = Receiver(cluster[1]) - self.receiver.start() - self.sender = Sender(cluster[2]) - self.sender.start() + receiver = NumberedReceiver(cluster[1]) + receiver.start() + sender = NumberedSender(cluster[2]) + sender.start() # Kill original brokers, start new ones. for i in range(3): cluster[i].kill() - cluster.start() + b = cluster.start() time.sleep(1) - self.sender.stop() - self.receiver.stop(self.sender.sent) + sender.stop() + receiver.stop(sender.sent) + + def send_receive_verify(self, b1, b2, queue, msgs): + b1.send_messages(queue, msgs) + self.assertEqual(msgs, [ m.content for m in b2.get_messages(queue,len(msgs))]) + + def test_error_storm(self): + """Verify cluster behaves with clients generating a lot of errors.""" + cluster = self.cluster(3) + errgen = [ ErrorGenerator(b) for b in cluster ] + msgs = [ str(i) for i in range(10) ] + self.send_receive_verify(cluster[0], cluster[1], "q", msgs) + self.send_receive_verify(cluster[1], cluster[2], "q", msgs) + for i in range(3): + cluster.start() + self.send_receive_verify(cluster[1], cluster[2], "q", msgs) class ClusterStoreTests(BrokerTest): diff --git a/cpp/src/tests/run_cluster_tests b/cpp/src/tests/run_cluster_tests index 9f9b6735f6..014233d8d3 100755 --- a/cpp/src/tests/run_cluster_tests +++ b/cpp/src/tests/run_cluster_tests @@ -45,7 +45,7 @@ rm -rf $OUTDIR mkdir -p $OUTDIR # Ignore tests requiring a store by default. -TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail" +TESTS="-i cluster_tests.ClusterStoreTests.* -I $srcdir/cluster_tests.fail $*" with_ais_group $PYTHON_COMMANDS/qpid-python-test -DOUTDIR=$OUTDIR -m cluster_tests $TESTS || exit 1 rm -rf $OUTDIR |