diff options
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 15 | ||||
-rwxr-xr-x | cpp/src/tests/run_long_cluster_tests | 2 | ||||
-rw-r--r-- | python/qpid/brokertest.py | 80 |
3 files changed, 54 insertions, 43 deletions
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 4fefe26db3..55b36aba46 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -160,7 +160,7 @@ class LongTests(BrokerTest): for i in range(i, len(cluster)): cluster[i].kill() def test_management(self): - """Run management clients and other clients concurrently.""" + """Stress test: Run management clients and other clients concurrently.""" # TODO aconway 2010-03-03: move to brokertest framework class ClientLoop(StoppableThread): @@ -171,6 +171,7 @@ class LongTests(BrokerTest): self.cmd = cmd # Client command. self.lock = Lock() self.process = None # Client process. + self._expect_fail = False self.start() def run(self): @@ -195,7 +196,7 @@ class LongTests(BrokerTest): try: # Quit and ignore errors if stopped or expecting failure. if self.stopped: break - if exit != 0: + if not self._expect_fail and exit != 0: self.process.unexpected( "client of %s exit code %s"%(self.broker.name, exit)) finally: self.lock.release() @@ -205,12 +206,13 @@ class LongTests(BrokerTest): def expect_fail(self): """Ignore exit status of the currently running client.""" self.lock.acquire() - stopped = True + self._expect_fail = True self.lock.release() def stop(self): """Stop the running client and wait for it to exit""" self.lock.acquire() + if self.stopped: return try: self.stopped = True if self.process: @@ -232,7 +234,7 @@ class LongTests(BrokerTest): """Start ordinary clients for a broker""" batch = [] for cmd in [ - ["perftest", "--count", 1000, + ["perftest", "--count", 50000, "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()], ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())], ["testagent", "localhost", str(broker.port())] ]: @@ -257,11 +259,12 @@ class LongTests(BrokerTest): for b in cluster[alive:]: b.ready() # Check if a broker crashed. # Kill the first broker. Ignore errors on its clients and all the mclients for c in clients[alive] + mclients: c.expect_fail() - clients[alive] = [] - mclients = [] b = cluster[alive] b.expect = EXPECT_EXIT_FAIL b.kill() + for c in clients[alive] + mclients: c.stop() + clients[alive] = [] + mclients = [] # Start another broker and clients alive += 1 b = cluster.start() diff --git a/cpp/src/tests/run_long_cluster_tests b/cpp/src/tests/run_long_cluster_tests index cb9c6b219b..81d4019616 100755 --- a/cpp/src/tests/run_long_cluster_tests +++ b/cpp/src/tests/run_long_cluster_tests @@ -20,4 +20,4 @@ # srcdir=`dirname $0` -$srcdir/run_cluster_tests long_cluster_tests +$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=5 diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index 192228a74a..8771032bee 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -81,7 +81,7 @@ def error_line(filename): """Get the last line of filename for error messages""" result = "" try: - f = file(filename) + f = open(filename) try: for l in f: result = ": " + l finally: f.close() @@ -118,7 +118,7 @@ class Popen(popen2.Popen3): try: for line in self.infile: if self.outfile is None: - self.outfile = file(self.outname, "w") + self.outfile = open(self.outname, "w") self.outfile.write(line) finally: self.infile.close() @@ -146,57 +146,61 @@ class Popen(popen2.Popen3): assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] + self.returncode = None popen2.Popen3.__init__(self, self.cmd, True) self.expect = expect - self.was_shutdown = False # Set if we deliberately kill/terminate the process self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid) msg = "Process %s" % self.pname self.stdin = ExceptionWrapper(self.tochild, msg) self.stdout = Popen.OutStream(self.fromchild, self.outfile("out"), msg) self.stderr = Popen.OutStream(self.childerr, self.outfile("err"), msg) - f = file(self.outfile("cmd"), "w") + f = open(self.outfile("cmd"), "w") try: f.write(self.cmd_str()) finally: f.close() log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) if drain: self.drain() + self._clean = False def drain(self): """Start threads to drain stdout/err""" self.stdout.drain() self.stderr.drain() - def drain_join(self): - """Join the drain threads""" - self.stdout.thread.join() + def _cleanup(self): + """Close pipes to sub-process""" + if self._clean: return + self._clean = True + self.stdin.close() + self.drain() # Drain output pipes. + self.stdout.thread.join() # Drain thread closes pipe. self.stderr.thread.join() def unexpected(self,msg): - self.drain() - self.drain_join() + self._cleanup() err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) def stop(self): # Clean up at end of test. - self.drain() - self.stdin.close() - if self.expect == EXPECT_UNKNOWN: - try: self.kill() # Just make sure its dead - except: pass - elif self.expect == EXPECT_RUNNING: - try: - self.kill() - except: - self.unexpected("expected running, exit code %d" % self.wait()) - else: - retry(lambda: self.poll() is not None) - if self.returncode is None: # Still haven't stopped - self.kill() - self.unexpected("still running") - elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: - self.unexpected("exit code %d" % self.returncode) - elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: - self.unexpected("expected error") - self.stdin.close() + try: + if self.expect == EXPECT_UNKNOWN: + try: self.kill() # Just make sure its dead + except: pass + elif self.expect == EXPECT_RUNNING: + try: + self.kill() + except: + self.unexpected("expected running, exit code %d" % self.wait()) + else: + retry(lambda: self.poll() is not None) + if self.returncode is None: # Still haven't stopped + self.kill() + self.unexpected("still running") + elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: + self.unexpected("exit code %d" % self.returncode) + elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: + self.unexpected("expected error") + finally: + self._cleanup() def communicate(self, input=None): if input: @@ -213,20 +217,24 @@ class Popen(popen2.Popen3): if not self.is_running(): unexpected("Exit code %d" % self.returncode) def poll(self): + if self.returncode is not None: return self.returncode self.returncode = popen2.Popen3.poll(self) if (self.returncode == -1): self.returncode = None + if self.returncode is not None: self._cleanup() return self.returncode def wait(self): + if self.returncode is not None: return self.returncode self.drain() - self.returncode = popen2.Popen3.wait(self) - self.drain_join() + try: self.returncode = popen2.Popen3.wait(self) + except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e)) + self._cleanup() return self.returncode def send_signal(self, sig): - self.was_shutdown = True - os.kill(self.pid,sig) - self.wait() + try: os.kill(self.pid,sig) + except OSError,e: raise OSError("Kill failed %s: %s"%(self.pname, e)) + self._cleanup() def terminate(self): self.send_signal(signal.SIGTERM) def kill(self): self.send_signal(signal.SIGKILL) @@ -347,7 +355,7 @@ class Broker(Popen): """Return true if the log file exists and contains a broker ready message""" if self._log_ready: return True if not os.path.exists(self.log): return False - f = file(self.log) + f = open(self.log) try: for l in f: if "notice Broker running" in l: @@ -604,7 +612,7 @@ def import_script(path): Import executable script at path as a module. Requires some trickery as scripts are not in standard module format """ - f = file(path) + f = open(path) try: name=os.path.split(path)[1].replace("-","_") return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE)) |