diff options
author | Alan Conway <aconway@apache.org> | 2011-03-11 22:29:14 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-03-11 22:29:14 +0000 |
commit | 681b508b5d496a7a5b3093b6902050cabe8f68f8 (patch) | |
tree | 66d9a4162d6d7b088f33de53fbe78f1b9e544012 /cpp/src | |
parent | 7373fddecb13bcbc2017fac3f84ecd84ebd6cc34 (diff) | |
download | qpid-python-681b508b5d496a7a5b3093b6902050cabe8f68f8.tar.gz |
QPID-3129: cluster_tests.LongTests.test_failover hangs
- simplified brokertest.py using subprocess.Popen file redirection instead of threads.
- fixed the hang in test_failover
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1080786 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/tests/brokertest.py | 189 | ||||
-rwxr-xr-x | cpp/src/tests/cluster_tests.py | 22 |
2 files changed, 83 insertions, 128 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 19e97ce7aa..5a20dceae4 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -62,24 +62,6 @@ def is_running(pid): class BadProcessStatus(Exception): pass -class ExceptionWrapper: - """Proxy object that adds a message to exceptions raised""" - def __init__(self, obj, msg): - self.obj = obj - self.msg = msg - - def __getattr__(self, name): - func = getattr(self.obj, name) - if type(func) != callable: - return func - return lambda *args, **kwargs: self._wrap(func, args, kwargs) - - def _wrap(self, func, args, kwargs): - try: - return func(*args, **kwargs) - except Exception, e: - raise Exception("%s: %s" %(self.msg, str(e))) - def error_line(filename, n=1): """Get the last n line(s) of filename for error messages""" result = [] @@ -89,7 +71,8 @@ def error_line(filename, n=1): for l in f: if len(result) == n: result.pop(0) result.append(" "+l) - finally: f.close() + finally: + f.close() except: return "" return ":\n" + "".join(result) @@ -106,88 +89,63 @@ def retry(function, timeout=10, delay=.01): delay *= 2 return True +class AtomicCounter: + def __init__(self): + self.count = 0 + self.lock = Lock() + + def next(self): + self.lock.acquire(); + ret = self.count + self.count += 1 + self.lock.release(); + return ret + +_popen_id = AtomicCounter() # Popen identifier for use in output file names. + +# Constants for file descriptor arguments to Popen +FILE = "FILE" # Write to file named after process +PIPE = subprocess.PIPE + class Popen(subprocess.Popen): """ Can set and verify expectation of process status at end of test. Dumps command line, stdout, stderr to data dir for debugging. """ - class DrainThread(Thread): - """Thread to drain a file object and write the data to a file.""" - def __init__(self, infile, outname): - Thread.__init__(self) - self.infile, self.outname = infile, outname - self.outfile = None - - def run(self): - try: - for line in self.infile: - if self.outfile is None: - self.outfile = open(self.outname, "w") - self.outfile.write(line) - finally: - self.infile.close() - if self.outfile is not None: self.outfile.close() - - class OutStream(ExceptionWrapper): - """Wrapper for output streams, handles exceptions & draining output""" - def __init__(self, infile, outfile, msg): - ExceptionWrapper.__init__(self, infile, msg) - self.infile, self.outfile = infile, outfile - self.thread = None - - def drain(self): - if self.thread is None: - self.thread = Popen.DrainThread(self.infile, self.outfile) - self.thread.start() - - def outfile(self, ext): return "%s.%s" % (self.pname, ext) - - def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True): - """Run cmd (should be a list of arguments) + def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): + """Run cmd (should be a list of program and arguments) expect - if set verify expectation at end of test. - drain - if true (default) drain stdout/stderr to files. + stdout, stderr - can have the same values as for subprocess.Popen as well as + FILE (the default) which means write to a file named after the process. + stdin - like subprocess.Popen but defauts to PIPE """ self._clean = False self._clean_lock = Lock() assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] - self.returncode = None self.expect = expect + self.id = _popen_id.next() + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id) + if stdout == FILE: stdout = open(self.outfile("out"), "w") + if stderr == FILE: stderr = open(self.outfile("err"), "w") try: - subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True) - except ValueError: # Windows can't do close_fds - subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE) - self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid) - msg = "Process %s" % self.pname - self.stdin = ExceptionWrapper(self.stdin, msg) - self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg) - self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg) + subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, + stdin=stdin, stdout=stdout, stderr=stderr, + close_fds=True) + except ValueError: # Windows can't do close_fds + subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, + stdin=stdin, stdout=stdout, stderr=stderr) + f = open(self.outfile("cmd"), "w") - try: f.write(self.cmd_str()) + try: f.write("%s\n%d"%(self.cmd_str(), self.pid)) finally: f.close() log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) - if drain: self.drain() - def __str__(self): return "Popen<%s>"%(self.pname) + def __str__(self): return "Popen<%s>"%(self.pname) - def drain(self): - """Start threads to drain stdout/err""" - self.stdout.drain() - self.stderr.drain() - - def _cleanup(self): - """Close pipes to sub-process""" - self._clean_lock.acquire() - try: - if self._clean: return - self._clean = True - self.stdin.close() - self.drain() # Drain output pipes. - self.stdout.thread.join() # Drain thread closes pipe. - self.stderr.thread.join() - finally: self._clean_lock.release() + def outfile(self, ext): return "%s.%s" % (self.pname, ext) def unexpected(self,msg): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) @@ -199,10 +157,8 @@ class Popen(subprocess.Popen): try: self.kill() # Just make sure its dead except: pass elif self.expect == EXPECT_RUNNING: - try: - self.kill() - except: - self.unexpected("expected running, exit code %d" % self.wait()) + try: self.kill() + except: self.unexpected("expected running, exit code %d" % self.wait()) else: retry(lambda: self.poll() is not None) if self.returncode is None: # Still haven't stopped @@ -216,38 +172,19 @@ class Popen(subprocess.Popen): self.wait() # Clean up the process. def communicate(self, input=None): - if input: - self.stdin.write(input) - self.stdin.close() - outerr = (self.stdout.read(), self.stderr.read()) - self.wait() - return outerr + ret = subprocess.Popen.communicate(self, input) + self.cleanup() + return ret - def is_running(self): - return self.poll() is None + def is_running(self): return self.poll() is None def assert_running(self): if not self.is_running(): self.unexpected("Exit code %d" % self.returncode) - def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4 - if self.returncode is None: - # Pass _deadstate only if it has been set, there is no _deadstate - # parameter in Python 2.6 - if _deadstate is None: ret = subprocess.Popen.poll(self) - else: ret = subprocess.Popen.poll(self, _deadstate) - - if (ret != -1): - self.returncode = ret - self._cleanup() - return self.returncode - def wait(self): - if self.returncode is None: - self.drain() - try: self.returncode = subprocess.Popen.wait(self) - except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e)) - self._cleanup() - return self.returncode + ret = subprocess.Popen.wait(self) + self._cleanup() + return ret def terminate(self): try: subprocess.Popen.terminate(self) @@ -256,6 +193,7 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') + self._cleanup() def kill(self): try: subprocess.Popen.kill(self) @@ -264,6 +202,20 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGKILL) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') + self._cleanup() + + def _cleanup(self): + """Clean up after a dead process""" + self._clean_lock.acquire() + if not self._clean: + self._clean = True + try: self.stdin.close() + except: pass + try: self.stdout.close() + except: pass + try: self.stderr.close() + except: pass + self._clean_lock.release() def cmd_str(self): return " ".join([str(s) for s in self.cmd]) @@ -323,7 +275,7 @@ class Broker(Popen): cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] - Popen.__init__(self, cmd, expect, drain=False) + Popen.__init__(self, cmd, expect, stdout=PIPE) test.cleanup_stop(self) self._host = "127.0.0.1" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) @@ -427,7 +379,9 @@ class Broker(Popen): "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) def store_state(self): - uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines() + f = open(os.path.join(self.datadir, "cluster", "store.status")) + try: uuids = f.readlines() + finally: f.close() null_uuid="00000000-0000-0000-0000-000000000000\n" if len(uuids) < 2: return "unknown" # we looked while the file was being updated. if uuids[0] == null_uuid: return "empty" @@ -509,10 +463,10 @@ class BrokerTest(TestCase): """Call thing.stop at end of test""" self.stopem.append(stopable) - def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True): + def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) - p = Popen(cmd, expect, drain) + p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) self.cleanup_stop(p) return p @@ -582,7 +536,8 @@ class NumberedSender(Thread): "--failover-updates", "--content-stdin" ], - expect=EXPECT_RUNNING) + expect=EXPECT_RUNNING, + stdin=PIPE) self.condition = Condition() self.max = max_depth self.received = 0 @@ -642,7 +597,7 @@ class NumberedReceiver(Thread): "--forever" ], expect=EXPECT_RUNNING, - drain=False) + stdout=PIPE) self.lock = Lock() self.error = None self.sender = sender diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 3f19411d19..71df954427 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -526,24 +526,24 @@ class LongTests(BrokerTest): if self.stopped: break self.process = self.broker.test.popen( self.cmd, expect=EXPECT_UNKNOWN) - finally: self.lock.release() - try: exit = self.process.wait() + finally: + self.lock.release() + try: + exit = self.process.wait() except OSError, e: - # Seems to be a race in wait(), it throws - # "no such process" during test shutdown. - # Doesn't indicate a test error, ignore. - return + # Process may already have been killed by self.stop() + break except Exception, e: self.process.unexpected( "client of %s: %s"%(self.broker.name, e)) self.lock.acquire() try: - # Quit and ignore errors if stopped or expecting failure. if self.stopped: break if exit != 0: self.process.unexpected( "client of %s exit code %s"%(self.broker.name, exit)) - finally: self.lock.release() + finally: + self.lock.release() except Exception, e: self.error = RethrownException("Error in ClientLoop.run") @@ -588,7 +588,8 @@ class LongTests(BrokerTest): mclients.append(ClientLoop(broker, cmd)) endtime = time.time() + self.duration() - runtime = self.duration() / 4 # First run is longer, use quarter of duration. + # For long duration, first run is a quarter of the duration. + runtime = max(5, self.duration() / 4.0) alive = 0 # First live cluster member for i in range(len(cluster)): start_clients(cluster[i]) start_mclients(cluster[alive]) @@ -614,14 +615,13 @@ class LongTests(BrokerTest): start_mclients(cluster[alive]) for c in chain(mclients, *clients): c.stop() - # Verify that logs are consistent cluster_test_logs.verify_logs() def test_management_qmf2(self): self.test_management(args=["--mgmt-qmf2=yes"]) - def test_connect_consistent(self): # FIXME aconway 2011-01-18: + def test_connect_consistent(self): args=["--mgmt-pub-interval=1","--log-enable=trace+:management"] cluster = self.cluster(2, args=args) end = time.time() + self.duration() |