diff options
author | Alan Conway <aconway@apache.org> | 2010-03-03 17:11:09 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2010-03-03 17:11:09 +0000 |
commit | f0f898dfcae5815c31283ab476fa5252f6660bc1 (patch) | |
tree | f529bf4ea217f3b8fa9c8a633b003158c4828446 | |
parent | 6b76732a5390600afa8f0d89e817e4c2d91c019f (diff) | |
download | qpid-python-f0f898dfcae5815c31283ab476fa5252f6660bc1.tar.gz |
Minor improvements to brokertest framework.
- fixed bug in use of host()
- check for existence of executables
- more efficient error_line impl
- check both *.err and *.out for error line
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918578 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | qpid/python/qpid/brokertest.py | 109 |
2 files changed, 84 insertions, 28 deletions
diff --git a/.gitignore b/.gitignore index 14fc13b445..283441acef 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ *.lo *.vglog .dirstamp -Makefile* +Makefile.in config\.* configure .deps @@ -22,7 +22,6 @@ qpidc.spec qpid/cpp/src/gen/ *gen.mk *.timestamp -rgen.timestamp *.pcl qpid/cpp/managementgen/management-types.xml diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py index d53a5ae1e2..382f116ff6 100644 --- a/qpid/python/qpid/brokertest.py +++ b/qpid/python/qpid/brokertest.py @@ -37,7 +37,21 @@ EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test. EXPECT_RUNNING=3 # Expect to still be running at end of test EXPECT_UNKNOWN=4 # No expectation, don't check exit status. - + +def is_exe(fpath): + return os.path.exists(fpath) and os.access(fpath, os.X_OK) + +def find_exe(program): + """Find an executable in the system PATH""" + dir, name = os.path.split(program) + if dir: + if is_exe(program): return program + else: + for path in os.environ["PATH"].split(os.pathsep): + exe_file = os.path.join(path, program) + if is_exe(exe_file): return exe_file + return None + def is_running(pid): try: os.kill(pid, 0) @@ -64,15 +78,28 @@ class ExceptionWrapper: except Exception, e: raise Exception("%s: %s" %(self.msg, str(e))) -def error_line(f): +def error_line(filename): + """Get the last line of filename for error messages""" + result = "" try: - ff = file(f) + f = file(filename) try: - lines = ff.readlines() - if len(lines) > 0: return ": %s" % (lines[-1]) - else: return "" - finally: ff.close() + for l in f: result = ": " + l + finally: f.close() except: return "" + return result + +def retry(function, timeout=1, delay=.001): + """Call function until it returns True or timeout expires. + Double the delay for each retry. Return True if function + returns true, False if timeout expires.""" + elapsed = 0 + while not function(): + elapsed += delay + if elapsed > timeout: return False + delay *= 2 + time.sleep(delay) + return True class Popen(popen2.Popen3): """ @@ -95,7 +122,8 @@ class Popen(popen2.Popen3): self.outfile = file(self.outname, "w") self.outfile.write(line) finally: - if self.outfile is not None: self.outfile.close + self.infile.close() + if self.outfile is not None: self.outfile.close() class OutStream(ExceptionWrapper): """Wrapper for output streams, handles excpetions & draining output""" @@ -116,10 +144,12 @@ class Popen(popen2.Popen3): expect - if set verify expectation at end of test. drain - if true (default) drain stdout/stderr to files. """ + assert find_exe(cmd[0]) if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] popen2.Popen3.__init__(self, self.cmd, True) self.expect = expect + self.was_shutdown = False # Set if we deliberately kill/terminate the process self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid) msg = "Process %s" % self.pname self.stdin = ExceptionWrapper(self.tochild, msg) @@ -128,22 +158,24 @@ class Popen(popen2.Popen3): f = file(self.outfile("cmd"), "w") try: f.write(self.cmd_str()) finally: f.close() - log.debug("Started process %s" % self.pname) + log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) if drain: self.drain() def drain(self): + """Start threads to drain stdout/err""" self.stdout.drain() self.stderr.drain() def drain_join(self): + """Join the drain threads""" self.stdout.thread.join() self.stderr.thread.join() def unexpected(self,msg): self.drain() self.drain_join() - raise BadProcessStatus("%s %s%s" % (self.pname, msg, - error_line(self.outfile("err")))) + err = error_line(self.outfile("err")) or error_line(self.outfile("out")) + raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) def stop(self): # Clean up at end of test. self.drain() @@ -157,6 +189,7 @@ class Popen(popen2.Popen3): self.unexpected("expected running, exit code %d" % self.wait()) else: # Give the process some time to exit. + # FIXME aconway 2010-03-02: use retry delay = 0.1 while (self.poll() is None and delay < 1): time.sleep(delay) @@ -168,6 +201,7 @@ class Popen(popen2.Popen3): self.unexpected("exit code %d" % self.returncode) elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: self.unexpected("expected error") + self.stdin.close() def communicate(self, input=None): if input: @@ -195,6 +229,8 @@ class Popen(popen2.Popen3): return self.returncode def send_signal(self, sig): + log.debug("kill -%s %s"%(sig, self.pname)) + self.was_shutdown = True os.kill(self.pid,sig) self.wait() @@ -219,13 +255,13 @@ class Broker(Popen): self.log = "%s-%d.log" % (self.name, i) i += 1 - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING): + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0): """Start a broker daemon. name determines the data-dir and log file names.""" self.test = test - self._port = None - cmd = [BrokerTest.qpidd_exec, "--port=0", "--no-module-dir", "--auth=no"] + args + self._port=port + cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir", "--auth=no"] + args if name: self.name = name else: self.name = "broker%d" % Broker._broker_count @@ -237,12 +273,14 @@ class Broker(Popen): cmd += ["--data-dir", self.datadir] Popen.__init__(self, cmd, expect, drain=False) test.cleanup_stop(self) - self.host = "localhost" + self._host = "localhost" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) + def host(self): return self._host + def port(self): # Read port from broker process stdout if not already read. - if (self._port is None): + if (self._port == 0): try: self._port = int(self.stdout.readline()) except ValueError, e: raise Exception("Can't get port for broker %s (%s)%s" % @@ -254,11 +292,11 @@ class Broker(Popen): def connect(self): """New API connection to the broker.""" - return messaging.Connection.open(self.host, self.port()) + return messaging.Connection.open(self.host(), self.port()) def connect_old(self): """Old API connection to the broker.""" - socket = qpid.util.connect(self.host,self.port()) + socket = qpid.util.connect(self.host(),self.port()) connection = qpid.connection.Connection (sock=socket) connection.start() return connection; @@ -308,11 +346,28 @@ class Broker(Popen): s.connection.close() return m - def host_port(self): return "%s:%s" % (self.host, self.port()) + def host_port(self): return "%s:%s" % (self.host(), self.port()) + + def log_ready(self): + """Return true if the log file exists and contains a broker ready message""" + if not os.path.exists(self.log): return False + ready_msg = re.compile("notice Broker running") + f = file(self.log) + try: + for l in f: + if ready_msg.search(l): return True + return False + finally: f.close() + # FIXME aconway 2010-03-02: rename to wait_ready def ready(self): """Wait till broker is ready to serve clients""" - self.connect().close() + # First make sure the broker is listening by checking the log. + if not retry(lambda: self.log_ready()): + raise Exception("Timed out waiting for broker %s" % self.name) + # Make a connection, this will wait for extended cluster init to finish. + try: self.connect().close() + except: raise RethrownException("Broker %s failed ready test %s"%self.name) class Cluster: """A cluster of brokers in a test.""" @@ -332,11 +387,10 @@ class Cluster: self.args += [ "--load-module", BrokerTest.cluster_lib ] self.start_n(count, expect=expect, wait=wait) - def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) - log.debug("Cluster %s starting member %s" % (self.name, name)) - self._brokers.append(self.test.broker(self.args+args, name, expect, wait)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port)) return self._brokers[-1] def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): @@ -392,10 +446,13 @@ class BrokerTest(TestCase): self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True,port=0): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect) - if (wait): b.connect().close() + b = Broker(self, args=args, name=name, expect=expect, port=port) + if (wait): + try: b.ready() + except Exception, e: + raise Exception("Failed to start broker %s: %s" % ( b.name, e)) return b def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): |