diff options
| -rwxr-xr-x | cpp/src/tests/cluster_tests.py | 72 | ||||
| -rw-r--r-- | cpp/src/tests/test_env.sh.in | 3 | ||||
| -rw-r--r-- | python/qpid/brokertest.py | 117 |
3 files changed, 155 insertions, 37 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index a6c096b067..b3274b1b1e 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -31,6 +31,12 @@ log = getLogger("qpid.cluster_tests") # Import scripts as modules qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC")) +def readfile(filename): + """Returns te content of file named filename as a string""" + f = file(filename) + try: return f.read() + finally: f.close() + class ShortTests(BrokerTest): """Short cluster functionality tests.""" @@ -81,7 +87,7 @@ class ShortTests(BrokerTest): # Now update a new member and compare their dumps. cluster.start(args=["--test-store-dump", "updatee.dump"]) - assert file("direct.dump").read() == file("updatee.dump").read() + assert readfile("direct.dump") == readfile("updatee.dump") os.remove("direct.dump") os.remove("updatee.dump") @@ -94,14 +100,14 @@ class ShortTests(BrokerTest): cluster.start() update_re = re.compile(r"member update: (.*) frameSeq=[0-9]+ configSeq=([0-9]+)") - matches = [ update_re.search(file(b.log).read()) for b in cluster ] + matches = [ update_re.search(readfile(b.log)) for b in cluster ] sequences = [ m.group(2) for m in matches] self.assertEqual(sequences, ["0", "1", "3"]) # Check that configurations with same seq. number match configs={} for b in cluster: - matches = update_re.findall(file(b.log).read()) + matches = update_re.findall(readfile(b.log)) for m in matches: seq=m[1] config=re.sub("\((member|unknown)\)", "", m[0]) @@ -142,6 +148,62 @@ class LongTests(BrokerTest): receiver.stop(sender.sent) for i in range(i, len(cluster)): cluster[i].kill() + def test_management(self): + """Run management in conjunction with other traffic.""" + # Publish often to provoke errors + args=["--mgmt-pub-interval", 1] + # Use store if present + if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib] + + class ClientLoop(StoppableThread): + """Run an infinite client loop.""" + def __init__(self, broker, cmd): + StoppableThread.__init__(self) + self.broker=broker + self.cmd = cmd + self.lock = Lock() + self.process = None + self.stopped = False + self.start() + + def run(self): + try: + while True: + self.lock.acquire() + try: + if self.stopped: break + self.process = self.broker.test.popen(self.cmd, + expect=EXPECT_UNKNOWN) + finally: self.lock.release() + try: exit = self.process.wait() + except: exit = 1 + self.lock.acquire() + try: + if exit != 0 and not self.stopped: + self.process.unexpected("bad exit status in client loop") + finally: self.lock.release() + except Exception, e: + error=e + + def stop(self): + self.lock.acquire() + try: + self.stopped = True + try: self.process.terminate() + except: pass + finally: self.lock.release() + StoppableThread.stop(self) + cluster = self.cluster(3, args) + clients = [] + for b in cluster: + clients.append(ClientLoop(b, ["perftest", "--count", "100", "--port", b.port()])) + clients.append(ClientLoop(b, ["qpid-queue-stats", "-a", "localhost:%s" %(b.port())])) + endtime = time.time() + self.duration() + while time.time() < endtime: + for b in cluster: b.ready() # Will raise if broker crashed. + time.sleep(1) + for c in clients: + c.stop() class StoreTests(BrokerTest): """ @@ -255,8 +317,8 @@ class StoreTests(BrokerTest): self.assertRaises(Exception, lambda: a.ready()) self.assertRaises(Exception, lambda: b.ready()) msg = re.compile("critical.*no clean store") - assert msg.search(file(a.log).read()) - assert msg.search(file(b.log).read()) + assert msg.search(readfile(a.log)) + assert msg.search(readfile(b.log)) # FIXME aconway 2009-12-03: verify manual restore procedure diff --git a/cpp/src/tests/test_env.sh.in b/cpp/src/tests/test_env.sh.in index 32c010b9be..f7e527c118 100644 --- a/cpp/src/tests/test_env.sh.in +++ b/cpp/src/tests/test_env.sh.in @@ -49,6 +49,9 @@ export QPID_TEST_EXEC_DIR=$builddir export RECEIVER_EXEC=$QPID_TEST_EXEC_DIR/receiver export SENDER_EXEC=$QPID_TEST_EXEC_DIR/sender +# Path +export PATH=$top_builddir/src:$PYTHON_COMMANDS:$QPID_TEST_EXEC_DIR:$PATH + # Modules export TEST_STORE_LIB=$testmoduledir/test_store.so diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 9f07ffe388..d53a5ae1e2 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -36,6 +36,7 @@ log = getLogger("qpid.brokertest") EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test. EXPECT_RUNNING=3 # Expect to still be running at end of test +EXPECT_UNKNOWN=4 # No expectation, don't check exit status. def is_running(pid): try: @@ -62,14 +63,16 @@ class ExceptionWrapper: return func(*args, **kwargs) except Exception, e: raise Exception("%s: %s" %(self.msg, str(e))) - + def error_line(f): try: - lines = file(f).readlines() - if len(lines) > 0: return ": %s" % (lines[-1]) - except: pass - return "" - + ff = file(f) + try: + lines = ff.readlines() + if len(lines) > 0: return ": %s" % (lines[-1]) + else: return "" + finally: ff.close() + except: return "" class Popen(popen2.Popen3): """ @@ -78,7 +81,41 @@ class Popen(popen2.Popen3): Dumps command line, stdout, stderr to data dir for debugging. """ - def __init__(self, cmd, expect=EXPECT_EXIT_OK): + class DrainThread(Thread): + """Thread to drain a file object and write the data to a file.""" + def __init__(self, infile, outname): + Thread.__init__(self) + self.infile, self.outname = infile, outname + self.outfile = None + + def run(self): + try: + for line in self.infile: + if self.outfile is None: + self.outfile = file(self.outname, "w") + self.outfile.write(line) + finally: + if self.outfile is not None: self.outfile.close + + class OutStream(ExceptionWrapper): + """Wrapper for output streams, handles excpetions & draining output""" + def __init__(self, infile, outfile, msg): + ExceptionWrapper.__init__(self, infile, msg) + self.infile, self.outfile = infile, outfile + self.thread = None + + def drain(self): + if self.thread is None: + self.thread = Popen.DrainThread(self.infile, self.outfile) + self.thread.start() + + def outfile(self, ext): return "%s.%s" % (self.pname, ext) + + def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True): + """Run cmd (should be a list of arguments) + expect - if set verify expectation at end of test. + drain - if true (default) drain stdout/stderr to files. + """ if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] popen2.Popen3.__init__(self, self.cmd, True) @@ -86,25 +123,34 @@ class Popen(popen2.Popen3): self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid) msg = "Process %s" % self.pname self.stdin = ExceptionWrapper(self.tochild, msg) - self.stdout = ExceptionWrapper(self.fromchild, msg) - self.stderr = ExceptionWrapper(self.childerr, msg) - self.dump(self.cmd_str(), "cmd") + self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg) + self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg) + f = file(self.outfile("cmd"), "w") + try: f.write(self.cmd_str()) + finally: f.close() log.debug("Started process %s" % self.pname) + if drain: self.drain() + + def drain(self): + self.stdout.drain() + self.stderr.drain() - def dump(self, str, ext): - name = "%s.%s" % (self.pname, ext) - f = file(name, "w") - f.write(str) - f.close() - return name + def drain_join(self): + self.stdout.thread.join() + self.stderr.thread.join() def unexpected(self,msg): - self.dump(self.stdout.read(), "out") - err = self.dump(self.stderr.read(), "err") - raise BadProcessStatus("%s %s%s" % (self.pname, msg, error_line(err))) + self.drain() + self.drain_join() + raise BadProcessStatus("%s %s%s" % (self.pname, msg, + error_line(self.outfile("err")))) def stop(self): # Clean up at end of test. - if self.expect == EXPECT_RUNNING: + self.drain() + if self.expect == EXPECT_UNKNOWN: + try: self.kill() # Just make sure its dead + except: pass + elif self.expect == EXPECT_RUNNING: try: self.kill() except: @@ -131,7 +177,8 @@ class Popen(popen2.Popen3): self.wait() return outerr - def is_running(self): return self.poll() is None + def is_running(self): + return self.poll() is None def assert_running(self): if not self.is_running(): unexpected("Exit code %d" % self.returncode) @@ -142,7 +189,9 @@ class Popen(popen2.Popen3): return self.returncode def wait(self): + self.drain() self.returncode = popen2.Popen3.wait(self) + self.drain_join() return self.returncode def send_signal(self, sig): @@ -186,7 +235,7 @@ class Broker(Popen): cmd += ["--log-to-stderr=no"] self.datadir = self.name cmd += ["--data-dir", self.datadir] - Popen.__init__(self, cmd, expect) + Popen.__init__(self, cmd, expect, drain=False) test.cleanup_stop(self) self.host = "localhost" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) @@ -336,10 +385,10 @@ class BrokerTest(TestCase): """Call thing.stop at end of test""" self.stopem.append(stopable) - def popen(self, cmd, expect=EXPECT_EXIT_OK): + def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) - p = Popen(cmd, expect) + p = Popen(cmd, expect, drain) self.cleanup_stop(p) return p @@ -359,8 +408,8 @@ class BrokerTest(TestCase): for b in _brokers: b.connect().close() class RethrownException(Exception): - """Captures the original stack trace to be thrown later""" - def __init__(self, e, msg=""): + """Captures the stack trace of the current exception to be thrown later""" + def __init__(self, msg=""): Exception.__init__(self, msg+"\n"+format_exc()) class StoppableThread(Thread): @@ -409,7 +458,7 @@ class NumberedSender(Thread): self.sender.stdin.write(str(self.sent)+"\n") self.sender.stdin.flush() self.sent += 1 - except Exception, e: self.error = RethrownException(e, self.sender.pname) + except Exception: self.error = RethrownException(self.sender.pname) def notify_received(self, count): """Called by receiver to enable flow control. count = messages received so far.""" @@ -438,7 +487,8 @@ class NumberedReceiver(Thread): Thread.__init__(self) self.test = broker.test self.receiver = self.test.popen( - [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING) + [self.test.receiver_exec, "--port", broker.port()], + expect=EXPECT_RUNNING, drain=False) self.stopat = None self.lock = Lock() self.error = None @@ -460,8 +510,8 @@ class NumberedReceiver(Thread): self.received += 1 if self.sender: self.sender.notify_received(self.received) - except Exception, e: - self.error = RethrownException(e, self.receiver.pname) + except Exception: + self.error = RethrownException(self.receiver.pname) def stop(self, count): """Returns when received >= count""" @@ -500,5 +550,8 @@ def import_script(path): Import executable script at path as a module. Requires some trickery as scripts are not in standard module format """ - name=os.path.split(path)[1].replace("-","_") - return imp.load_module(name, file(path), path, ("", "r", imp.PY_SOURCE)) + f = file(path) + try: + name=os.path.split(path)[1].replace("-","_") + return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE)) + finally: f.close() |
