diff options
Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r-- | cpp/src/tests/brokertest.py | 312 |
1 files changed, 162 insertions, 150 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 16d7fb0b78..98f58ebfdd 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -29,7 +29,6 @@ from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger -import qmf.console log = getLogger("qpid.brokertest") @@ -62,6 +61,24 @@ def is_running(pid): class BadProcessStatus(Exception): pass +class ExceptionWrapper: + """Proxy object that adds a message to exceptions raised""" + def __init__(self, obj, msg): + self.obj = obj + self.msg = msg + + def __getattr__(self, name): + func = getattr(self.obj, name) + if type(func) != callable: + return func + return lambda *args, **kwargs: self._wrap(func, args, kwargs) + + def _wrap(self, func, args, kwargs): + try: + return func(*args, **kwargs) + except Exception, e: + raise Exception("%s: %s" %(self.msg, str(e))) + def error_line(filename, n=1): """Get the last n line(s) of filename for error messages""" result = [] @@ -71,8 +88,7 @@ def error_line(filename, n=1): for l in f: if len(result) == n: result.pop(0) result.append(" "+l) - finally: - f.close() + finally: f.close() except: return "" return ":\n" + "".join(result) @@ -80,90 +96,111 @@ def retry(function, timeout=10, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" - deadline = time.time() + timeout while not function(): - remaining = deadline - time.time() - if remaining <= 0: return False - delay = min(delay, remaining) + if delay > timeout: delay = timeout time.sleep(delay) + timeout -= delay + if timeout <= 0: return False delay *= 2 return True -class AtomicCounter: - def __init__(self): - self.count = 0 - self.lock = Lock() - - def next(self): - self.lock.acquire(); - ret = self.count - self.count += 1 - self.lock.release(); - return ret - -_popen_id = AtomicCounter() # Popen identifier for use in output file names. - -# Constants for file descriptor arguments to Popen -FILE = "FILE" # Write to file named after process -PIPE = subprocess.PIPE - class Popen(subprocess.Popen): """ Can set and verify expectation of process status at end of test. Dumps command line, stdout, stderr to data dir for debugging. """ - def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): - """Run cmd (should be a list of program and arguments) + class DrainThread(Thread): + """Thread to drain a file object and write the data to a file.""" + def __init__(self, infile, outname): + Thread.__init__(self) + self.infile, self.outname = infile, outname + self.outfile = None + + def run(self): + try: + for line in self.infile: + if self.outfile is None: + self.outfile = open(self.outname, "w") + self.outfile.write(line) + finally: + self.infile.close() + if self.outfile is not None: self.outfile.close() + + class OutStream(ExceptionWrapper): + """Wrapper for output streams, handles exceptions & draining output""" + def __init__(self, infile, outfile, msg): + ExceptionWrapper.__init__(self, infile, msg) + self.infile, self.outfile = infile, outfile + self.thread = None + + def drain(self): + if self.thread is None: + self.thread = Popen.DrainThread(self.infile, self.outfile) + self.thread.start() + + def outfile(self, ext): return "%s.%s" % (self.pname, ext) + + def __init__(self, cmd, expect=EXPECT_EXIT_OK, drain=True): + """Run cmd (should be a list of arguments) expect - if set verify expectation at end of test. - stdout, stderr - can have the same values as for subprocess.Popen as well as - FILE (the default) which means write to a file named after the process. - stdin - like subprocess.Popen but defauts to PIPE + drain - if true (default) drain stdout/stderr to files. """ self._clean = False self._clean_lock = Lock() assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] + self.returncode = None self.expect = expect - self.id = _popen_id.next() - self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id) - if stdout == FILE: stdout = open(self.outfile("out"), "w") - if stderr == FILE: stderr = open(self.outfile("err"), "w") try: - subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, - stdin=stdin, stdout=stdout, stderr=stderr, - close_fds=True) - except ValueError: # Windows can't do close_fds - subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, - stdin=stdin, stdout=stdout, stderr=stderr) - + subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE, close_fds=True) + except ValueError: # Windows can't do close_fds + subprocess.Popen.__init__(self, self.cmd, 0, None, subprocess.PIPE, subprocess.PIPE, subprocess.PIPE) + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.pid) + msg = "Process %s" % self.pname + self.stdin = ExceptionWrapper(self.stdin, msg) + self.stdout = Popen.OutStream(self.stdout, self.outfile("out"), msg) + self.stderr = Popen.OutStream(self.stderr, self.outfile("err"), msg) f = open(self.outfile("cmd"), "w") - try: f.write("%s\n%d"%(self.cmd_str(), self.pid)) + try: f.write(self.cmd_str()) finally: f.close() log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) + if drain: self.drain() - def __str__(self): return "Popen<%s>"%(self.pname) + def __str__(self): return "Popen<%s>"%(self.pname) - def outfile(self, ext): return "%s.%s" % (self.pname, ext) + def drain(self): + """Start threads to drain stdout/err""" + self.stdout.drain() + self.stderr.drain() + + def _cleanup(self): + """Close pipes to sub-process""" + self._clean_lock.acquire() + try: + if self._clean: return + self._clean = True + self.stdin.close() + self.drain() # Drain output pipes. + self.stdout.thread.join() # Drain thread closes pipe. + self.stderr.thread.join() + finally: self._clean_lock.release() def unexpected(self,msg): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) - + def stop(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: try: self.kill() # Just make sure its dead except: pass elif self.expect == EXPECT_RUNNING: - if self.poll() != None: - self.unexpected("expected running, exit code %d" % self.returncode) - else: - try: - self.kill() - except Exception,e: - self.unexpected("exception from kill: %s" % str(e)) + try: + self.kill() + except: + self.unexpected("expected running, exit code %d" % self.wait()) else: retry(lambda: self.poll() is not None) if self.returncode is None: # Still haven't stopped @@ -175,21 +212,40 @@ class Popen(subprocess.Popen): self.unexpected("expected error") finally: self.wait() # Clean up the process. - + def communicate(self, input=None): - ret = subprocess.Popen.communicate(self, input) - self.cleanup() - return ret + if input: + self.stdin.write(input) + self.stdin.close() + outerr = (self.stdout.read(), self.stderr.read()) + self.wait() + return outerr - def is_running(self): return self.poll() is None + def is_running(self): + return self.poll() is None def assert_running(self): if not self.is_running(): self.unexpected("Exit code %d" % self.returncode) + def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4 + if self.returncode is None: + # Pass _deadstate only if it has been set, there is no _deadstate + # parameter in Python 2.6 + if _deadstate is None: ret = subprocess.Popen.poll(self) + else: ret = subprocess.Popen.poll(self, _deadstate) + + if (ret != -1): + self.returncode = ret + self._cleanup() + return self.returncode + def wait(self): - ret = subprocess.Popen.wait(self) - self._cleanup() - return ret + if self.returncode is None: + self.drain() + try: self.returncode = subprocess.Popen.wait(self) + except OSError,e: raise OSError("Wait failed %s: %s"%(self.pname, e)) + self._cleanup() + return self.returncode def terminate(self): try: subprocess.Popen.terminate(self) @@ -198,8 +254,7 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - self._cleanup() - + def kill(self): try: subprocess.Popen.kill(self) except AttributeError: # No terminate method @@ -207,20 +262,6 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGKILL) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - self._cleanup() - - def _cleanup(self): - """Clean up after a dead process""" - self._clean_lock.acquire() - if not self._clean: - self._clean = True - try: self.stdin.close() - except: pass - try: self.stdout.close() - except: pass - try: self.stderr.close() - except: pass - self._clean_lock.release() def cmd_str(self): return " ".join([str(s) for s in self.cmd]) @@ -247,11 +288,11 @@ class Broker(Popen): while (os.path.exists(self.log)): self.log = "%s-%d.log" % (self.name, i) i += 1 - + def get_log(self): return os.path.abspath(self.log) - def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None): """Start a broker daemon. name determines the data-dir and log file names.""" @@ -277,20 +318,15 @@ class Broker(Popen): cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] if log_level != None: - cmd += ["--log-enable=%s" % log_level] + cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] - if show_cmd: print cmd - Popen.__init__(self, cmd, expect, stdout=PIPE) + Popen.__init__(self, cmd, expect, drain=False) test.cleanup_stop(self) self._host = "127.0.0.1" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) self._log_ready = False - def startQmf(self, handler=None): - self.qmf_session = qmf.console.Session(handler) - self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) - def host(self): return self._host def port(self): @@ -321,7 +357,7 @@ class Broker(Popen): s = c.session(str(qpid.datatypes.uuid4())) s.queue_declare(queue=queue) c.close() - + def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) if xprops != None: s += ", x-declare:{" + xprops + "}" @@ -365,14 +401,13 @@ class Broker(Popen): def log_ready(self): """Return true if the log file exists and contains a broker ready message""" - if not self._log_ready: - self._log_ready = find_in_file("notice Broker running", self.log) - return self._log_ready + if self._log_ready: return True + self._log_ready = find_in_file("notice Broker running", self.log) def ready(self, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=60): + if not retry(self.log_ready, timeout=30): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. For a cluster broker this will @@ -381,27 +416,23 @@ class Broker(Popen): c = self.connect(**kwargs) try: c.session() finally: c.close() - except Exception,e: raise RethrownException( - "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) + except: raise RethrownException( + "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5))) def store_state(self): - f = open(os.path.join(self.datadir, "cluster", "store.status")) - try: uuids = f.readlines() - finally: f.close() + uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines() null_uuid="00000000-0000-0000-0000-000000000000\n" if len(uuids) < 2: return "unknown" # we looked while the file was being updated. if uuids[0] == null_uuid: return "empty" if uuids[1] == null_uuid: return "dirty" return "clean" - + class Cluster: """A cluster of brokers in a test.""" - # Client connection options for use in failover tests. - CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true" _cluster_count = 0 - def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count @@ -412,19 +443,16 @@ class Cluster: self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", BrokerTest.cluster_lib ] - self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) + self.start_n(count, expect=expect, wait=wait) - def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) - self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port)) return self._brokers[-1] - def ready(self): - for b in self: b.ready() - - def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): - for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): + for i in range(count): self.start(expect=expect, wait=wait, args=args) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -453,7 +481,7 @@ class BrokerTest(TestCase): rootdir = os.getcwd() def configure(self, config): self.config=config - + def setUp(self): outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) @@ -474,49 +502,40 @@ class BrokerTest(TestCase): """Call thing.stop at end of test""" self.stopem.append(stopable) - def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): + def popen(self, cmd, expect=EXPECT_EXIT_OK, drain=True): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) - p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) + p = Popen(cmd, expect, drain) self.cleanup_stop(p) return p - def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False): + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None): """Create and return a broker ready for use""" - b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd) + b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b - def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): """Create and return a cluster ready for use""" - cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) + cluster = Cluster(self, count, args, expect=expect, wait=wait) return cluster - def browse(self, session, queue, timeout=0): - """Return a list with the contents of each message on queue.""" - r = session.receiver("%s;{mode:browse}"%(queue)) - r.capacity = 100 - try: - contents = [] - try: - while True: contents.append(r.fetch(timeout=timeout).content) - except messaging.Empty: pass - finally: r.close() - return contents - def assert_browse(self, session, queue, expect_contents, timeout=0): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" - actual_contents = self.browse(session, queue, timeout) - self.assertEqual(expect_contents, actual_contents) -def join(thread, timeout=10): - thread.join(timeout) - if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) + r = session.receiver("%s;{mode:browse}"%(queue)) + actual_contents = [] + try: + for c in expect_contents: actual_contents.append(r.fetch(timeout=timeout).content) + while True: actual_contents.append(r.fetch(timeout=0).content) # Check for extra messages. + except messaging.Empty: pass + r.close() + self.assertEqual(expect_contents, actual_contents) class RethrownException(Exception): """Captures the stack trace of the current exception to be thrown later""" @@ -535,16 +554,15 @@ class StoppableThread(Thread): def stop(self): self.stopped = True - join(self) + self.join() if self.error: raise self.error - + class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ - def __init__(self, broker, max_depth=None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS): + def __init__(self, broker, max_depth=None, queue="test-queue"): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. @@ -555,11 +573,9 @@ class NumberedSender(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", - "--connection-options", "{%s}"%(connection_options), "--content-stdin" ], - expect=EXPECT_RUNNING, - stdin=PIPE) + expect=EXPECT_RUNNING) self.condition = Condition() self.max = max_depth self.received = 0 @@ -574,7 +590,6 @@ class NumberedSender(Thread): try: self.sent = 0 while not self.stopped: - self.sender.assert_running() if self.max: self.condition.acquire() while not self.stopped and self.sent - self.received > self.max: @@ -597,17 +612,16 @@ class NumberedSender(Thread): self.stopped = True self.condition.notify() finally: self.condition.release() - join(self) + self.join() self.write_message(-1) # end-of-messages marker. if self.error: raise self.error - + class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives sequentially numbered messages. """ - def __init__(self, broker, sender = None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS): + def __init__(self, broker, sender = None, queue="test-queue"): """ sender: enable flow control. Call sender.received(n) for each message received. """ @@ -618,24 +632,22 @@ class NumberedReceiver(Thread): "--broker", "localhost:%s"%broker.port(), "--address", "%s;{create:always}"%queue, "--failover-updates", - "--connection-options", "{%s}"%(connection_options), "--forever" ], expect=EXPECT_RUNNING, - stdout=PIPE) + drain=False) self.lock = Lock() self.error = None self.sender = sender - self.received = 0 def read_message(self): return int(self.receiver.stdout.readline()) - + def run(self): try: + self.received = 0 m = self.read_message() while m != -1: - self.receiver.assert_running() assert(m <= self.received) # Check for missing messages if (m == self.received): # Ignore duplicates self.received += 1 @@ -647,7 +659,7 @@ class NumberedReceiver(Thread): def stop(self): """Returns when termination message is received""" - join(self) + self.join() if self.error: raise self.error class ErrorGenerator(StoppableThread): @@ -662,7 +674,7 @@ class ErrorGenerator(StoppableThread): self.broker=broker broker.test.cleanup_stop(self) self.start() - + def run(self): c = self.broker.connect_old() try: |