diff options
author | Alan Conway <aconway@apache.org> | 2009-11-10 20:57:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-11-10 20:57:30 +0000 |
commit | 03e7b94f1e12cf90e08196327d56aeabbee6cbdf (patch) | |
tree | ab8fd04812a0c7d9a063c02db3eb52176238ea7b /cpp/src/tests/brokertest.py | |
parent | e048998526e4beceda8ba1d732197bf945114ef3 (diff) | |
download | qpid-python-03e7b94f1e12cf90e08196327d56aeabbee6cbdf.tar.gz |
Fixed test_failover, added ErrorGenerator with test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834662 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r-- | cpp/src/tests/brokertest.py | 134 |
1 files changed, 93 insertions, 41 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index ec25201505..d70990bfae 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -21,7 +21,7 @@ # or federation import os, signal, string, tempfile, popen2, socket, threading, time -import qpid +import qpid, traceback from qpid import connection, messaging, util from qpid.compat import format_exc from qpid.harness import Skipped @@ -41,13 +41,14 @@ def is_running(pid): except: return False -class Unexpected(Exception): +class BadProcessStatus(Exception): pass class Popen(popen2.Popen3): """ Similar to subprocess.Popen but using popen2 classes for portability. Can set and verify expectation of process status at end of test. + Dumps command line, stdout, stderr to data dir for debugging. """ def __init__(self, cmd, expect=EXPECT_EXIT_OK): @@ -57,17 +58,25 @@ class Popen(popen2.Popen3): self.stdin = self.tochild self.stdout = self.fromchild self.stderr = self.childerr + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid) + self.dump(self.cmd_str(), "cmd") + + def dump(self, str, ext): + f = file("%s.%s" % (self.pname, ext), "w") + f.write(str) + f.close() def unexpected(self,msg): - raise Unexpected("%s: %s\n--stdout:\n%s\n--stderr:\n%s" % - (msg, self.cmd_str(), self.stdout.read(), self.stderr.read())) + self.dump(self.stdout.read(), "out") + self.dump(self.stderr.read(), "err") + raise BadProcessStatus("%s: %s" % (msg, self.pname)) - def testend(self): # Clean up at end of test. + def stop(self): # Clean up at end of test. if self.expect == EXPECT_RUNNING: try: self.kill() except: - self.unexpected("Expected running but exited %d" % self.wait()) + self.unexpected("Exit code %d" % self.wait()) else: # Give the process some time to exit. delay = 0.1 @@ -76,11 +85,11 @@ class Popen(popen2.Popen3): delay *= 2 if self.returncode is None: # Still haven't stopped self.kill() - self.unexpected("Expected to exit but still running") + self.unexpected("Still running") elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: - self.unexpected("Expected exit ok but exited %d" % self.returncode) + self.unexpected("Exit code %d" % self.returncode) elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: - self.unexpected("Expected to fail but exited ok") + self.unexpected("Expected error") def communicate(self, input=None): if input: @@ -107,8 +116,6 @@ class Popen(popen2.Popen3): def terminate(self): self.send_signal(signal.SIGTERM) def kill(self): self.send_signal(signal.SIGKILL) - - def cmd_str(self): return " ".join([str(s) for s in self.cmd]) @@ -133,19 +140,23 @@ class Broker(Popen): else: self.name = "broker%d" % Broker._broker_count Broker._broker_count += 1 - self.log = os.path.join(test.dir, self.name+".log") - cmd += ["--log-to-file", self.log, "--log-prefix", self.name,"--log-to-stderr=no"] - self.datadir = os.path.join(test.dir, self.name) + self.log = self.name+".log" + cmd += ["--log-to-file", self.log, "--log-prefix", self.name] + cmd += ["--log-to-stderr=no"] + self.datadir = self.name cmd += ["--data-dir", self.datadir] if self._store_lib: cmd += ["--load-module", self._store_lib] Popen.__init__(self, cmd, expect) try: self.port = int(self.stdout.readline()) - except Exception: - raise Exception("Failed to start broker, log: "+self.log) - test.cleanup_popen(self) + except Exception, e: + raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname)) + test.cleanup_stop(self) self.host = "localhost" # Placeholder for remote brokers. + def unexpected(self,msg): + raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) + def connect(self): """New API connection to the broker.""" return messaging.Connection.open(self.host, self.port) @@ -168,6 +179,12 @@ class Broker(Popen): s.sender(queue+" {create:always}").send(message) s.connection.close() + def send_messages(self, queue, messages): + s = self.connect().session() + sender = s.sender(queue+" {create:always}") + for m in messages: sender.send(m) + s.connection.close() + def get_message(self, queue): s = self.connect().session() m = s.receiver(queue+" {create:always}", capacity=1).fetch(timeout=1) @@ -175,6 +192,14 @@ class Broker(Popen): s.connection.close() return m + def get_messages(self, queue, n): + s = self.connect().session() + receiver = s.receiver(queue+" {create:always}", capacity=n) + m = [receiver.fetch(timeout=1) for i in range(n)] + s.acknowledge() + s.connection.close() + return m + class Cluster: """A cluster of brokers in a test.""" @@ -217,9 +242,7 @@ class BrokerTest(TestCase): Provides a well-known working directory for each test. """ - # FIXME aconway 2009-11-05: too many env vars, need a simpler - # scheme for locating exes and libs - + # Environment settings. cluster_lib = os.getenv("CLUSTER_LIB") xml_lib = os.getenv("XML_LIB") qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC") @@ -227,36 +250,39 @@ class BrokerTest(TestCase): receiver_exec = os.getenv("RECEIVER_EXEC") sender_exec = os.getenv("SENDER_EXEC") store_lib = os.getenv("STORE_LIB") - + + rootdir = os.getcwd() def configure(self, config): self.config=config def setUp(self): - self.dir = os.path.join(self.config.defines["OUTDIR"], self.id()) + self.dir = os.path.join(self.rootdir, self.config.defines["OUTDIR"], self.id()) os.makedirs(self.dir) - self.popens = [] + os.chdir(self.dir) + self.stopem = [] # things to stop at end of test def tearDown(self): err = [] - for p in self.popens: - try: p.testend() - except Unexpected, e: err.append(str(e)) - if err: raise Exception("\n".join(err)) + for p in self.stopem: + try: p.stop() + except Exception, e: err.append(str(e)) + if err: raise Exception("\n ".join(err)) # FIXME aconway 2009-11-06: check for core files of exited processes. - def cleanup_popen(self, popen): - """Add process to be killed at end of test""" - self.popens.append(popen) + def cleanup_stop(self, stopable): + """Call thing.stop at end of test""" + self.stopem.append(stopable) def popen(self, cmd, expect=EXPECT_EXIT_OK): - """Start a process that will be killed at end of test""" + """Start a process that will be killed at end of test, in the test dir.""" + os.chdir(self.dir) p = Popen(cmd, expect) - self.cleanup_popen(p) + self.cleanup_stop(p) return p def broker(self, args=[], name=None, expect=EXPECT_RUNNING): """Create and return a broker ready for use""" - b = Broker(self, args, name, expect) + b = Broker(self, args=args, name=name, expect=expect) b.connect().close() return b @@ -266,6 +292,11 @@ class BrokerTest(TestCase): cluster.wait() return cluster +class RethrownException(Exception): + """Captures the original stack trace to be thrown later""" + def __init__(self, e): + Exception.__init__(self, format_exc()) + class StoppableThread(Thread): """ Base class for threads that do something in a loop and periodically check @@ -281,7 +312,7 @@ class StoppableThread(Thread): self.join() if self.error: raise self.error -class Sender(StoppableThread): +class NumberedSender(StoppableThread): """ Thread to run a sender client and send numbered messages until stopped. """ @@ -300,7 +331,7 @@ class Sender(StoppableThread): self.sent += 1 except Exception, e: self.error = RethrownException(e) -class Receiver(Thread): +class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives sequentially numbered messages. @@ -320,8 +351,9 @@ class Receiver(Thread): while self.stopat is None or self.received < self.stopat: self.lock.acquire() try: - self.test.assertEqual(self.receiver.stdout.readline(), str(self.received)+"\n") - self.received += 1 + m = int(self.receiver.stdout.readline()) + assert(m <= self.received) # Allow for duplicates + if (m == self.received): self.received += 1 finally: self.lock.release() except Exception, e: @@ -335,7 +367,27 @@ class Receiver(Thread): self.join() if self.error: raise self.error -class RethrownException(Exception): - """Captures the original stack trace to be thrown later""" - def __init__(self, e): - Exception.__init__(self, format_exc()) +class ErrorGenerator(StoppableThread): + """ + Thread that continuously generates errors by trying to consume from + a non-existent queue. For cluster regression tests, error handling + caused issues in the past. + """ + + def __init__(self, broker): + StoppableThread.__init__(self) + self.broker=broker + broker.test.cleanup_stop(self) + self.start() + + def run(self): + c = self.broker.connect_old() + try: + while not self.stopped: + try: + c.session(str(qpid.datatypes.uuid4())).message_subscribe( + queue="non-existent-queue") + assert(False) + except qpid.session.SessionException: pass + except: pass # Normal if broker is killed. + |