summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-03-03 17:11:09 +0000
committerAlan Conway <aconway@apache.org>2010-03-03 17:11:09 +0000
commitf0f898dfcae5815c31283ab476fa5252f6660bc1 (patch)
treef529bf4ea217f3b8fa9c8a633b003158c4828446
parent6b76732a5390600afa8f0d89e817e4c2d91c019f (diff)
downloadqpid-python-f0f898dfcae5815c31283ab476fa5252f6660bc1.tar.gz
Minor improvements to brokertest framework.
- fixed bug in use of host() - check for existence of executables - more efficient error_line impl - check both *.err and *.out for error line git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@918578 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--.gitignore3
-rw-r--r--qpid/python/qpid/brokertest.py109
2 files changed, 84 insertions, 28 deletions
diff --git a/.gitignore b/.gitignore
index 14fc13b445..283441acef 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,7 +7,7 @@
*.lo
*.vglog
.dirstamp
-Makefile*
+Makefile.in
config\.*
configure
.deps
@@ -22,7 +22,6 @@ qpidc.spec
qpid/cpp/src/gen/
*gen.mk
*.timestamp
-rgen.timestamp
*.pcl
qpid/cpp/managementgen/management-types.xml
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index d53a5ae1e2..382f116ff6 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -37,7 +37,21 @@ EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test.
EXPECT_RUNNING=3 # Expect to still be running at end of test
EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
-
+
+def is_exe(fpath):
+ return os.path.exists(fpath) and os.access(fpath, os.X_OK)
+
+def find_exe(program):
+ """Find an executable in the system PATH"""
+ dir, name = os.path.split(program)
+ if dir:
+ if is_exe(program): return program
+ else:
+ for path in os.environ["PATH"].split(os.pathsep):
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file): return exe_file
+ return None
+
def is_running(pid):
try:
os.kill(pid, 0)
@@ -64,15 +78,28 @@ class ExceptionWrapper:
except Exception, e:
raise Exception("%s: %s" %(self.msg, str(e)))
-def error_line(f):
+def error_line(filename):
+ """Get the last line of filename for error messages"""
+ result = ""
try:
- ff = file(f)
+ f = file(filename)
try:
- lines = ff.readlines()
- if len(lines) > 0: return ": %s" % (lines[-1])
- else: return ""
- finally: ff.close()
+ for l in f: result = ": " + l
+ finally: f.close()
except: return ""
+ return result
+
+def retry(function, timeout=1, delay=.001):
+ """Call function until it returns True or timeout expires.
+ Double the delay for each retry. Return True if function
+ returns true, False if timeout expires."""
+ elapsed = 0
+ while not function():
+ elapsed += delay
+ if elapsed > timeout: return False
+ delay *= 2
+ time.sleep(delay)
+ return True
class Popen(popen2.Popen3):
"""
@@ -95,7 +122,8 @@ class Popen(popen2.Popen3):
self.outfile = file(self.outname, "w")
self.outfile.write(line)
finally:
- if self.outfile is not None: self.outfile.close
+ self.infile.close()
+ if self.outfile is not None: self.outfile.close()
class OutStream(ExceptionWrapper):
"""Wrapper for output streams, handles excpetions & draining output"""
@@ -116,10 +144,12 @@ class Popen(popen2.Popen3):
expect - if set verify expectation at end of test.
drain - if true (default) drain stdout/stderr to files.
"""
+ assert find_exe(cmd[0])
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
popen2.Popen3.__init__(self, self.cmd, True)
self.expect = expect
+ self.was_shutdown = False # Set if we deliberately kill/terminate the process
self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid)
msg = "Process %s" % self.pname
self.stdin = ExceptionWrapper(self.tochild, msg)
@@ -128,22 +158,24 @@ class Popen(popen2.Popen3):
f = file(self.outfile("cmd"), "w")
try: f.write(self.cmd_str())
finally: f.close()
- log.debug("Started process %s" % self.pname)
+ log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
if drain: self.drain()
def drain(self):
+ """Start threads to drain stdout/err"""
self.stdout.drain()
self.stderr.drain()
def drain_join(self):
+ """Join the drain threads"""
self.stdout.thread.join()
self.stderr.thread.join()
def unexpected(self,msg):
self.drain()
self.drain_join()
- raise BadProcessStatus("%s %s%s" % (self.pname, msg,
- error_line(self.outfile("err"))))
+ err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
+ raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
def stop(self): # Clean up at end of test.
self.drain()
@@ -157,6 +189,7 @@ class Popen(popen2.Popen3):
self.unexpected("expected running, exit code %d" % self.wait())
else:
# Give the process some time to exit.
+ # FIXME aconway 2010-03-02: use retry
delay = 0.1
while (self.poll() is None and delay < 1):
time.sleep(delay)
@@ -168,6 +201,7 @@ class Popen(popen2.Popen3):
self.unexpected("exit code %d" % self.returncode)
elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
self.unexpected("expected error")
+ self.stdin.close()
def communicate(self, input=None):
if input:
@@ -195,6 +229,8 @@ class Popen(popen2.Popen3):
return self.returncode
def send_signal(self, sig):
+ log.debug("kill -%s %s"%(sig, self.pname))
+ self.was_shutdown = True
os.kill(self.pid,sig)
self.wait()
@@ -219,13 +255,13 @@ class Broker(Popen):
self.log = "%s-%d.log" % (self.name, i)
i += 1
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0):
"""Start a broker daemon. name determines the data-dir and log
file names."""
self.test = test
- self._port = None
- cmd = [BrokerTest.qpidd_exec, "--port=0", "--no-module-dir", "--auth=no"] + args
+ self._port=port
+ cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir", "--auth=no"] + args
if name: self.name = name
else:
self.name = "broker%d" % Broker._broker_count
@@ -237,12 +273,14 @@ class Broker(Popen):
cmd += ["--data-dir", self.datadir]
Popen.__init__(self, cmd, expect, drain=False)
test.cleanup_stop(self)
- self.host = "localhost"
+ self._host = "localhost"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ def host(self): return self._host
+
def port(self):
# Read port from broker process stdout if not already read.
- if (self._port is None):
+ if (self._port == 0):
try: self._port = int(self.stdout.readline())
except ValueError, e:
raise Exception("Can't get port for broker %s (%s)%s" %
@@ -254,11 +292,11 @@ class Broker(Popen):
def connect(self):
"""New API connection to the broker."""
- return messaging.Connection.open(self.host, self.port())
+ return messaging.Connection.open(self.host(), self.port())
def connect_old(self):
"""Old API connection to the broker."""
- socket = qpid.util.connect(self.host,self.port())
+ socket = qpid.util.connect(self.host(),self.port())
connection = qpid.connection.Connection (sock=socket)
connection.start()
return connection;
@@ -308,11 +346,28 @@ class Broker(Popen):
s.connection.close()
return m
- def host_port(self): return "%s:%s" % (self.host, self.port())
+ def host_port(self): return "%s:%s" % (self.host(), self.port())
+
+ def log_ready(self):
+ """Return true if the log file exists and contains a broker ready message"""
+ if not os.path.exists(self.log): return False
+ ready_msg = re.compile("notice Broker running")
+ f = file(self.log)
+ try:
+ for l in f:
+ if ready_msg.search(l): return True
+ return False
+ finally: f.close()
+ # FIXME aconway 2010-03-02: rename to wait_ready
def ready(self):
"""Wait till broker is ready to serve clients"""
- self.connect().close()
+ # First make sure the broker is listening by checking the log.
+ if not retry(lambda: self.log_ready()):
+ raise Exception("Timed out waiting for broker %s" % self.name)
+ # Make a connection, this will wait for extended cluster init to finish.
+ try: self.connect().close()
+ except: raise RethrownException("Broker %s failed ready test %s"%self.name)
class Cluster:
"""A cluster of brokers in a test."""
@@ -332,11 +387,10 @@ class Cluster:
self.args += [ "--load-module", BrokerTest.cluster_lib ]
self.start_n(count, expect=expect, wait=wait)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[]):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
- log.debug("Cluster %s starting member %s" % (self.name, name))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait))
+ self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
return self._brokers[-1]
def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
@@ -392,10 +446,13 @@ class BrokerTest(TestCase):
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING,wait=True,port=0):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect)
- if (wait): b.connect().close()
+ b = Broker(self, args=args, name=name, expect=expect, port=port)
+ if (wait):
+ try: b.ready()
+ except Exception, e:
+ raise Exception("Failed to start broker %s: %s" % ( b.name, e))
return b
def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):