Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r--  cpp/src/tests/brokertest.py | 312
1 file changed, 162 insertions(+), 150 deletions(-)
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index 16d7fb0b78..98f58ebfdd 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -29,7 +29,6 @@ from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
from logging import getLogger
-import qmf.console
log = getLogger("qpid.brokertest")
@@ -62,6 +61,24 @@ def is_running(pid):
class BadProcessStatus(Exception):
pass
+class ExceptionWrapper:
+ """Proxy object that adds a message to exceptions raised"""
+ def __init__(self, obj, msg):
+ self.obj = obj
+ self.msg = msg
+
+ def __getattr__(self, name):
+ func = getattr(self.obj, name)
+ if not callable(func):
+ return func
+ return lambda *args, **kwargs: self._wrap(func, args, kwargs)
+
+ def _wrap(self, func, args, kwargs):
+ try:
+ return func(*args, **kwargs)
+ except Exception, e:
+ raise Exception("%s: %s" %(self.msg, str(e)))
+
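
A minimal usage sketch for the new ExceptionWrapper (the file and message here are
hypothetical, not part of the patch): every callable attribute is wrapped so that any
exception it raises is re-raised with the supplied message prepended, which is how the
reworked Popen below tags pipe errors with the process name.

    f = open("data.txt", "w")                  # hypothetical stream
    wrapped = ExceptionWrapper(f, "Process broker-0")
    wrapped.write("hello\n")                   # delegates to f.write
    wrapped.close()
    try:
        wrapped.write("too late")              # f.write raises; the wrapper re-raises as
    except Exception, e:                       # "Process broker-0: I/O operation on closed file"
        print e
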
def error_line(filename, n=1):
"""Get the last n line(s) of filename for error messages"""
result = []
@@ -71,8 +88,7 @@ def error_line(filename, n=1):
for l in f:
if len(result) == n: result.pop(0)
result.append(" "+l)
- finally:
- f.close()
+ finally: f.close()
except: return ""
return ":\n" + "".join(result)
@@ -80,90 +96,111 @@ def retry(function, timeout=10, delay=.01):
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
- deadline = time.time() + timeout
while not function():
- remaining = deadline - time.time()
- if remaining <= 0: return False
- delay = min(delay, remaining)
+ if delay > timeout: delay = timeout
time.sleep(delay)
+ timeout -= delay
+ if timeout <= 0: return False
delay *= 2
return True
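
The reworked retry() tracks a remaining-time budget instead of a wall-clock deadline,
still doubling the sleep between attempts; note it only counts time spent sleeping, so a
slow function() can overrun the nominal timeout. A small sketch (the condition function
is illustrative):

    def eventually_true(state={"calls": 0}):   # hypothetical condition
        state["calls"] += 1
        return state["calls"] >= 6             # true on the 6th poll
    # Sleeps roughly 0.01, 0.02, 0.04, 0.08, 0.16s between calls, then returns True.
    assert retry(eventually_true, timeout=10, delay=.01)
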
-class AtomicCounter:
- def __init__(self):
- self.count = 0
- self.lock = Lock()
-
- def next(self):
- self.lock.acquire();
- ret = self.count
- self.count += 1
- self.lock.release();
- return ret
-
-_popen_id = AtomicCounter() # Popen identifier for use in output file names.
-
-# Constants for file descriptor arguments to Popen
-FILE = "FILE" # Write to file named after process
-PIPE = subprocess.PIPE
-
class Popen(subprocess.Popen):
"""
Can set and verify expectation of process status at end of test.
Dumps command line, stdout, stderr to data dir for debugging.
"""
- def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
- """Run cmd (should be a list of program and arguments)
+ class DrainThread(Thread):
+ """Thread to drain a file object and write the data to a file."""
+ def __init__(self, infile, outname):
+ Thread.__init__(self)
+ self.infile, self.outname = infile, outname
+ self.outfile = None
+
+ def run(self):
+ try:
+ for line in self.infile:
+ if self.outfile is None:
+ self.outfile = open(self.outname, "w")
+ self.outfile.write(line)
+ finally:
+ self.infile.close()
+ if self.outfile is not None: self.outfile.close()
+
+ class OutStream(ExceptionWrapper):
+ """Wrapper for output streams, handles exceptions & draining output"""
+ def __init__(self, infile, outfile, msg):
+ ExceptionWrapper.__init__(self, infile, msg)
+ self.infile, self.outfile = infile, outfile
+ self.thread = None
+
+ def drain(self):
+ if self.thread is None:
+ self.thread = Popen.DrainThread(self.infile, self.outfile)
+ self.thread.start()
+
+ def outfile(self, ext): return "%s.%s" % (self.pname, ext)
+
+ def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
+ """Run cmd (should be a list of arguments)
expect - if set verify expectation at end of test.
- stdout, stderr - can have the same values as for subprocess.Popen as well as
- FILE (the default) which means write to a file named after the process.
- stdin - like subprocess.Popen but defaults to PIPE
+ drain - if true (default) drain stdout/stderr to files.
"""
self._clean = False
self._clean_lock = Lock()
assert find_exe(cmd[0]), "executable not found: "+cmd[0]
if type(cmd) is type(""): cmd = [cmd] # Make it a list.
self.cmd = [ str(x) for x in cmd ]
+ self.returncode = None
self.expect = expect
- self.id = _popen_id.next()
- self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
- if stdout == FILE: stdout = open(self.outfile("out"), "w")
- if stderr == FILE: stderr = open(self.outfile("err"), "w")
try:
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr,
- close_fds=True)
- except ValueError: # Windows can't do close_fds
- subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
- stdin=stdin, stdout=stdout, stderr=stderr)
-
+ subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True)
+ except ValueError: # Windows can't do close_fds
+ subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE)
+ self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid)
+ msg = "Process %s" % self.pname
+ self.stdin = ExceptionWrapper(self.stdin, msg)
+ self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg)
+ self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg)
f = open(self.outfile("cmd"), "w")
- try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
+ try: f.write(self.cmd_str())
finally: f.close()
log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
+ if drain: self.drain()
- def __str__(self): return "Popen<%s>"%(self.pname)
+ def __str__(self): return "Popen<%s>"%(self.pname)
- def outfile(self, ext): return "%s.%s" % (self.pname, ext)
+ def drain(self):
+ """Start threads to drain stdout/err"""
+ self.stdout.drain()
+ self.stderr.drain()
+
+ def _cleanup(self):
+ """Close pipes to sub-process"""
+ self._clean_lock.acquire()
+ try:
+ if self._clean: return
+ self._clean = True
+ self.stdin.close()
+ self.drain() # Drain output pipes.
+ self.stdout.thread.join() # Drain thread closes pipe.
+ self.stderr.thread.join()
+ finally: self._clean_lock.release()
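
With draining now built into Popen, a typical use looks roughly like this (a sketch; the
command is arbitrary, while EXPECT_EXIT_OK and the <pname>.out/.err/.cmd naming come from
this module):

    p = Popen(["echo", "hello"], expect=EXPECT_EXIT_OK)   # drain=True by default
    p.wait()                       # joins the drain threads via _cleanup()
    # Any stdout/stderr is now in "<pname>.out" / "<pname>.err" in the current
    # directory (created lazily, only if the process produced output), and the
    # command line is recorded in "<pname>.cmd".
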
def unexpected(self,msg):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
-
+
def stop(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
try: self.kill() # Just make sure it's dead
except: pass
elif self.expect == EXPECT_RUNNING:
- if self.poll() != None:
- self.unexpected("expected running, exit code %d" % self.returncode)
- else:
- try:
- self.kill()
- except Exception,e:
- self.unexpected("exception from kill: %s" % str(e))
+ try:
+ self.kill()
+ except:
+ self.unexpected("expected running, exit code %d" % self.wait())
else:
retry(lambda: self.poll() is not None)
if self.returncode is None: # Still haven't stopped
@@ -175,21 +212,40 @@ class Popen(subprocess.Popen):
self.unexpected("expected error")
finally:
self.wait() # Clean up the process.
-
+
def communicate(self, input=None):
- ret = subprocess.Popen.communicate(self, input)
- self.cleanup()
- return ret
+ if input:
+ self.stdin.write(input)
+ self.stdin.close()
+ outerr = (self.stdout.read(), self.stderr.read())
+ self.wait()
+ return outerr
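
The new communicate() reads the wrapped pipes directly, so it is presumably intended for
processes started with drain=False; with draining enabled the DrainThreads would already
be consuming stdout/stderr. A hedged sketch (the command is arbitrary):

    p = Popen(["cat"], expect=EXPECT_EXIT_OK, drain=False)
    out, err = p.communicate("ping\n")   # write stdin, close it, read both pipes, wait
    assert out == "ping\n" and err == ""
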
- def is_running(self): return self.poll() is None
+ def is_running(self):
+ return self.poll() is None
def assert_running(self):
if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
+ def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
+ if self.returncode is None:
+ # Pass _deadstate only if it has been set, there is no _deadstate
+ # parameter in Python 2.6
+ if _deadstate is None: ret = subprocess.Popen.poll(self)
+ else: ret = subprocess.Popen.poll(self, _deadstate)
+
+ if (ret != -1):
+ self.returncode = ret
+ self._cleanup()
+ return self.returncode
+
def wait(self):
- ret = subprocess.Popen.wait(self)
- self._cleanup()
- return ret
+ if self.returncode is None:
+ self.drain()
+ try: self.returncode = subprocess.Popen.wait(self)
+ except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e))
+ self._cleanup()
+ return self.returncode
def terminate(self):
try: subprocess.Popen.terminate(self)
@@ -198,8 +254,7 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
-
+
def kill(self):
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
@@ -207,20 +262,6 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGKILL)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
-
- def _cleanup(self):
- """Clean up after a dead process"""
- self._clean_lock.acquire()
- if not self._clean:
- self._clean = True
- try: self.stdin.close()
- except: pass
- try: self.stdout.close()
- except: pass
- try: self.stderr.close()
- except: pass
- self._clean_lock.release()
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
@@ -247,11 +288,11 @@ class Broker(Popen):
while (os.path.exists(self.log)):
self.log = "%s-%d.log" % (self.name, i)
i += 1
-
+
def get_log(self):
return os.path.abspath(self.log)
- def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False):
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
"""Start a broker daemon. name determines the data-dir and log
file names."""
@@ -277,20 +318,15 @@ class Broker(Popen):
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
+ cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
- if show_cmd: print cmd
- Popen.__init__(self, cmd, expect, stdout=PIPE)
+ Popen.__init__(self, cmd, expect, drain=False)
test.cleanup_stop(self)
self._host = "127.0.0.1"
log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
self._log_ready = False
- def startQmf(self, handler=None):
- self.qmf_session = qmf.console.Session(handler)
- self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
-
def host(self): return self._host
def port(self):
@@ -321,7 +357,7 @@ class Broker(Popen):
s = c.session(str(qpid.datatypes.uuid4()))
s.queue_declare(queue=queue)
c.close()
-
+
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
if xprops != None: s += ", x-declare:{" + xprops + "}"
@@ -365,14 +401,13 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
- if not self._log_ready:
- self._log_ready = find_in_file("notice Broker running", self.log)
- return self._log_ready
+ if self._log_ready: return True
+ self._log_ready = find_in_file("notice Broker running", self.log)
def ready(self, **kwargs):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=60):
+ if not retry(self.log_ready, timeout=30):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
# Create a connection and a session. For a cluster broker this will
@@ -381,27 +416,23 @@ class Broker(Popen):
c = self.connect(**kwargs)
try: c.session()
finally: c.close()
- except Exception,e: raise RethrownException(
- "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
+ except: raise RethrownException(
+ "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
def store_state(self):
- f = open(os.path.join(self.datadir, "cluster", "store.status"))
- try: uuids = f.readlines()
- finally: f.close()
+ uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
null_uuid="00000000-0000-0000-0000-000000000000\n"
if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
if uuids[0] == null_uuid: return "empty"
if uuids[1] == null_uuid: return "dirty"
return "clean"
-
+
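
store_state() inspects <datadir>/cluster/store.status, which holds two UUIDs, one per
line (presumably a store identifier and a clean-shutdown marker). A sketch of the
mapping implemented above (UUID values are illustrative):

    null = "00000000-0000-0000-0000-000000000000\n"
    some = "8f2d1c3a-5b6e-4d7f-9a0b-1c2d3e4f5a6b\n"
    # [null, some]        -> "empty"    store never initialized
    # [some, null]        -> "dirty"    broker did not shut down cleanly
    # [some, some]        -> "clean"
    # fewer than 2 lines  -> "unknown"  file caught mid-update
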
class Cluster:
"""A cluster of brokers in a test."""
- # Client connection options for use in failover tests.
- CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
_cluster_count = 0
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
self.test = test
self._brokers=[]
self.name = "cluster%d" % Cluster._cluster_count
@@ -412,19 +443,16 @@ class Cluster:
self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
+ self.start_n(count, expect=expect, wait=wait)
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
+ self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
return self._brokers[-1]
- def ready(self):
- for b in self: b.ready()
-
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
- for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -453,7 +481,7 @@ class BrokerTest(TestCase):
rootdir = os.getcwd()
def configure(self, config): self.config=config
-
+
def setUp(self):
outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
@@ -474,49 +502,40 @@ class BrokerTest(TestCase):
"""Call thing.stop at end of test"""
self.stopem.append(stopable)
- def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
+ def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True):
"""Start a process that will be killed at end of test, in the test dir."""
os.chdir(self.dir)
- p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
+ p = Popen(cmd, expect, drain)
self.cleanup_stop(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
"""Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
+ cluster = Cluster(self, count, args, expect=expect, wait=wait)
return cluster
- def browse(self, session, queue, timeout=0):
- """Return a list with the contents of each message on queue."""
- r = session.receiver("%s;{mode:browse}"%(queue))
- r.capacity = 100
- try:
- contents = []
- try:
- while True: contents.append(r.fetch(timeout=timeout).content)
- except messaging.Empty: pass
- finally: r.close()
- return contents
-
def assert_browse(self, session, queue, expect_contents, timeout=0):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
- actual_contents = self.browse(session, queue, timeout)
- self.assertEqual(expect_contents, actual_contents)
-def join(thread, timeout=10):
- thread.join(timeout)
- if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
+ r = session.receiver("%s;{mode:browse}"%(queue))
+ actual_contents = []
+ try:
+ for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content)
+ while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages.
+ except messaging.Empty: pass
+ r.close()
+ self.assertEqual(expect_contents, actual_contents)
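
A typical call from a test, for reference (a sketch; the queue name and message content
are illustrative, and it assumes the qpid.messaging sender API used elsewhere in this
module):

    b = self.broker()
    s = b.connect().session()
    s.sender("q;{create:always}").send("m1")
    self.assert_browse(s, "q", ["m1"])   # fails on missing, reordered or extra messages
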
class RethrownException(Exception):
"""Captures the stack trace of the current exception to be thrown later"""
@@ -535,16 +554,15 @@ class StoppableThread(Thread):
def stop(self):
self.stopped = True
- join(self)
+ self.join()
if self.error: raise self.error
-
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
- def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ def __init__(self, broker, max_depth=None, queue="test-queue"):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -555,11 +573,9 @@ class NumberedSender(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
"--content-stdin"
],
- expect=EXPECT_RUNNING,
- stdin=PIPE)
+ expect=EXPECT_RUNNING)
self.condition = Condition()
self.max = max_depth
self.received = 0
@@ -574,7 +590,6 @@ class NumberedSender(Thread):
try:
self.sent = 0
while not self.stopped:
- self.sender.assert_running()
if self.max:
self.condition.acquire()
while not self.stopped and self.sent - self.received > self.max:
@@ -597,17 +612,16 @@ class NumberedSender(Thread):
self.stopped = True
self.condition.notify()
finally: self.condition.release()
- join(self)
+ self.join()
self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
-
+
class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker, sender = None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ def __init__(self, broker, sender = None, queue="test-queue"):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
@@ -618,24 +632,22 @@ class NumberedReceiver(Thread):
"--broker", "localhost:%s"%broker.port(),
"--address", "%s;{create:always}"%queue,
"--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
"--forever"
],
expect=EXPECT_RUNNING,
- stdout=PIPE)
+ drain=False)
self.lock = Lock()
self.error = None
self.sender = sender
- self.received = 0
def read_message(self):
return int(self.receiver.stdout.readline())
-
+
def run(self):
try:
+ self.received = 0
m = self.read_message()
while m != -1:
- self.receiver.assert_running()
assert(m <= self.received) # Check for missing messages
if (m == self.received): # Ignore duplicates
self.received += 1
@@ -647,7 +659,7 @@ class NumberedReceiver(Thread):
def stop(self):
"""Returns when termination message is received"""
- join(self)
+ self.join()
if self.error: raise self.error
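
Together the two helpers are typically driven like this (a sketch; the broker comes from
BrokerTest.broker() and the flow-control depth is illustrative):

    sender = NumberedSender(broker, max_depth=100)
    receiver = NumberedReceiver(broker, sender=sender)
    receiver.start()
    sender.start()
    # ... exercise the broker (kill, restart, fail over, ...) ...
    sender.stop()       # joins the sender thread, then writes the -1 end-of-messages marker
    receiver.stop()     # returns once the receiver thread has read the -1 marker
    assert receiver.received > 0
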
class ErrorGenerator(StoppableThread):
@@ -662,7 +674,7 @@ class ErrorGenerator(StoppableThread):
self.broker=broker
broker.test.cleanup_stop(self)
self.start()
-
+
def run(self):
c = self.broker.connect_old()
try: