diff options
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 671 |
1 files changed, 671 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py new file mode 100644 index 0000000000..a19dd305e5 --- /dev/null +++ b/qpid/cpp/src/tests/brokertest.py @@ -0,0 +1,671 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Support library for tests that start multiple brokers, e.g. cluster +# or federation + +import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re +import qpid, traceback, signal +from qpid import connection, messaging, util +from qpid.compat import format_exc +from qpid.harness import Skipped +from unittest import TestCase +from copy import copy +from threading import Thread, Lock, Condition +from logging import getLogger +import qmf.console + +log = getLogger("qpid.brokertest") + +# Values for expected outcome of process at end of test +EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. +EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test. +EXPECT_RUNNING=3 # Expect to still be running at end of test +EXPECT_UNKNOWN=4 # No expectation, don't check exit status. + +def find_exe(program): + """Find an executable in the system PATH""" + def is_exe(fpath): + return os.path.isfile(fpath) and os.access(fpath, os.X_OK) + mydir, name = os.path.split(program) + if mydir: + if is_exe(program): return program + else: + for path in os.environ["PATH"].split(os.pathsep): + exe_file = os.path.join(path, program) + if is_exe(exe_file): return exe_file + return None + +def is_running(pid): + try: + os.kill(pid, 0) + return True + except: + return False + +class BadProcessStatus(Exception): + pass + +def error_line(filename, n=1): + """Get the last n line(s) of filename for error messages""" + result = [] + try: + f = open(filename) + try: + for l in f: + if len(result) == n: result.pop(0) + result.append(" "+l) + finally: + f.close() + except: return "" + return ":\n" + "".join(result) + +def retry(function, timeout=10, delay=.01): + """Call function until it returns True or timeout expires. + Double the delay for each retry. Return True if function + returns true, False if timeout expires.""" + deadline = time.time() + timeout + while not function(): + remaining = deadline - time.time() + if remaining <= 0: return False + delay = min(delay, remaining) + time.sleep(delay) + delay *= 2 + return True + +class AtomicCounter: + def __init__(self): + self.count = 0 + self.lock = Lock() + + def next(self): + self.lock.acquire(); + ret = self.count + self.count += 1 + self.lock.release(); + return ret + +_popen_id = AtomicCounter() # Popen identifier for use in output file names. + +# Constants for file descriptor arguments to Popen +FILE = "FILE" # Write to file named after process +PIPE = subprocess.PIPE + +class Popen(subprocess.Popen): + """ + Can set and verify expectation of process status at end of test. + Dumps command line, stdout, stderr to data dir for debugging. + """ + + def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): + """Run cmd (should be a list of program and arguments) + expect - if set verify expectation at end of test. + stdout, stderr - can have the same values as for subprocess.Popen as well as + FILE (the default) which means write to a file named after the process. + stdin - like subprocess.Popen but defauts to PIPE + """ + self._clean = False + self._clean_lock = Lock() + assert find_exe(cmd[0]), "executable not found: "+cmd[0] + if type(cmd) is type(""): cmd = [cmd] # Make it a list. + self.cmd = [ str(x) for x in cmd ] + self.expect = expect + self.id = _popen_id.next() + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id) + if stdout == FILE: stdout = open(self.outfile("out"), "w") + if stderr == FILE: stderr = open(self.outfile("err"), "w") + try: + subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, + stdin=stdin, stdout=stdout, stderr=stderr, + close_fds=True) + except ValueError: # Windows can't do close_fds + subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, + stdin=stdin, stdout=stdout, stderr=stderr) + + f = open(self.outfile("cmd"), "w") + try: f.write("%s\n%d"%(self.cmd_str(), self.pid)) + finally: f.close() + log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) + + def __str__(self): return "Popen<%s>"%(self.pname) + + def outfile(self, ext): return "%s.%s" % (self.pname, ext) + + def unexpected(self,msg): + err = error_line(self.outfile("err")) or error_line(self.outfile("out")) + raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) + + def stop(self): # Clean up at end of test. + try: + if self.expect == EXPECT_UNKNOWN: + try: self.kill() # Just make sure its dead + except: pass + elif self.expect == EXPECT_RUNNING: + try: self.kill() + except: self.unexpected("expected running, exit code %d" % self.wait()) + else: + retry(lambda: self.poll() is not None) + if self.returncode is None: # Still haven't stopped + self.kill() + self.unexpected("still running") + elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: + self.unexpected("exit code %d" % self.returncode) + elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: + self.unexpected("expected error") + finally: + self.wait() # Clean up the process. + + def communicate(self, input=None): + ret = subprocess.Popen.communicate(self, input) + self.cleanup() + return ret + + def is_running(self): return self.poll() is None + + def assert_running(self): + if not self.is_running(): self.unexpected("Exit code %d" % self.returncode) + + def wait(self): + ret = subprocess.Popen.wait(self) + self._cleanup() + return ret + + def terminate(self): + try: subprocess.Popen.terminate(self) + except AttributeError: # No terminate method + try: + os.kill( self.pid , signal.SIGTERM) + except AttributeError: # no os.kill, using taskkill.. (Windows only) + os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') + self._cleanup() + + def kill(self): + try: subprocess.Popen.kill(self) + except AttributeError: # No terminate method + try: + os.kill( self.pid , signal.SIGKILL) + except AttributeError: # no os.kill, using taskkill.. (Windows only) + os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') + self._cleanup() + + def _cleanup(self): + """Clean up after a dead process""" + self._clean_lock.acquire() + if not self._clean: + self._clean = True + try: self.stdin.close() + except: pass + try: self.stdout.close() + except: pass + try: self.stderr.close() + except: pass + self._clean_lock.release() + + def cmd_str(self): return " ".join([str(s) for s in self.cmd]) + +def checkenv(name): + value = os.getenv(name) + if not value: raise Exception("Environment variable %s is not set" % name) + return value + +def find_in_file(str, filename): + if not os.path.exists(filename): return False + f = open(filename) + try: return str in f.read() + finally: f.close() + +class Broker(Popen): + "A broker process. Takes care of start, stop and logging." + _broker_count = 0 + + def __str__(self): return "Broker<%s %s>"%(self.name, self.pname) + + def find_log(self): + self.log = "%s.log" % self.name + i = 1 + while (os.path.exists(self.log)): + self.log = "%s-%d.log" % (self.name, i) + i += 1 + + def get_log(self): + return os.path.abspath(self.log) + + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None): + """Start a broker daemon. name determines the data-dir and log + file names.""" + + self.test = test + self._port=port + if BrokerTest.store_lib: + args = args + ['--load-module', BrokerTest.store_lib] + if BrokerTest.sql_store_lib: + args = args + ['--load-module', BrokerTest.sql_store_lib] + args = args + ['--catalog', BrokerTest.sql_catalog] + if BrokerTest.sql_clfs_store_lib: + args = args + ['--load-module', BrokerTest.sql_clfs_store_lib] + args = args + ['--catalog', BrokerTest.sql_catalog] + cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir"] + args + if not "--auth" in args: cmd.append("--auth=no") + if wait != None: + cmd += ["--wait", str(wait)] + if name: self.name = name + else: + self.name = "broker%d" % Broker._broker_count + Broker._broker_count += 1 + self.find_log() + cmd += ["--log-to-file", self.log] + cmd += ["--log-to-stderr=no"] + if log_level != None: + cmd += ["--log-enable=%s" % log_level] + self.datadir = self.name + cmd += ["--data-dir", self.datadir] + Popen.__init__(self, cmd, expect, stdout=PIPE) + test.cleanup_stop(self) + self._host = "127.0.0.1" + log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) + self._log_ready = False + + def startQmf(self, handler=None): + self.qmf_session = qmf.console.Session(handler) + self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) + + def host(self): return self._host + + def port(self): + # Read port from broker process stdout if not already read. + if (self._port == 0): + try: self._port = int(self.stdout.readline()) + except ValueError: + raise Exception("Can't get port for broker %s (%s)%s" % + (self.name, self.pname, error_line(self.log,5))) + return self._port + + def unexpected(self,msg): + raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) + + def connect(self, **kwargs): + """New API connection to the broker.""" + return messaging.Connection.establish(self.host_port(), **kwargs) + + def connect_old(self): + """Old API connection to the broker.""" + socket = qpid.util.connect(self.host(),self.port()) + connection = qpid.connection.Connection (sock=socket) + connection.start() + return connection; + + def declare_queue(self, queue): + c = self.connect_old() + s = c.session(str(qpid.datatypes.uuid4())) + s.queue_declare(queue=queue) + c.close() + + def _prep_sender(self, queue, durable, xprops): + s = queue + "; {create:always, node:{durable:" + str(durable) + if xprops != None: s += ", x-declare:{" + xprops + "}" + return s + "}}" + + def send_message(self, queue, message, durable=True, xprops=None, session=None): + if session == None: + s = self.connect().session() + else: + s = session + s.sender(self._prep_sender(queue, durable, xprops)).send(message) + if session == None: + s.connection.close() + + def send_messages(self, queue, messages, durable=True, xprops=None, session=None): + if session == None: + s = self.connect().session() + else: + s = session + sender = s.sender(self._prep_sender(queue, durable, xprops)) + for m in messages: sender.send(m) + if session == None: + s.connection.close() + + def get_message(self, queue): + s = self.connect().session() + m = s.receiver(queue+"; {create:always}", capacity=1).fetch(timeout=1) + s.acknowledge() + s.connection.close() + return m + + def get_messages(self, queue, n): + s = self.connect().session() + receiver = s.receiver(queue+"; {create:always}", capacity=n) + m = [receiver.fetch(timeout=1) for i in range(n)] + s.acknowledge() + s.connection.close() + return m + + def host_port(self): return "%s:%s" % (self.host(), self.port()) + + def log_ready(self): + """Return true if the log file exists and contains a broker ready message""" + if not self._log_ready: + self._log_ready = find_in_file("notice Broker running", self.log) + return self._log_ready + + def ready(self, **kwargs): + """Wait till broker is ready to serve clients""" + # First make sure the broker is listening by checking the log. + if not retry(self.log_ready, timeout=60): + raise Exception( + "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) + # Create a connection and a session. For a cluster broker this will + # return after cluster init has finished. + try: + c = self.connect(**kwargs) + try: c.session() + finally: c.close() + except Exception,e: raise RethrownException( + "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) + + def store_state(self): + f = open(os.path.join(self.datadir, "cluster", "store.status")) + try: uuids = f.readlines() + finally: f.close() + null_uuid="00000000-0000-0000-0000-000000000000\n" + if len(uuids) < 2: return "unknown" # we looked while the file was being updated. + if uuids[0] == null_uuid: return "empty" + if uuids[1] == null_uuid: return "dirty" + return "clean" + +class Cluster: + """A cluster of brokers in a test.""" + + _cluster_count = 0 + + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + self.test = test + self._brokers=[] + self.name = "cluster%d" % Cluster._cluster_count + Cluster._cluster_count += 1 + # Use unique cluster name + self.args = copy(args) + self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] + self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] + assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" + self.args += [ "--load-module", BrokerTest.cluster_lib ] + self.start_n(count, expect=expect, wait=wait) + + def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0): + """Add a broker to the cluster. Returns the index of the new broker.""" + if not name: name="%s-%d" % (self.name, len(self._brokers)) + self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port)) + return self._brokers[-1] + + def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]): + for i in range(count): self.start(expect=expect, wait=wait, args=args) + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + +class BrokerTest(TestCase): + """ + Tracks processes started by test and kills at end of test. + Provides a well-known working directory for each test. + """ + + # Environment settings. + qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) + cluster_lib = os.getenv("CLUSTER_LIB") + xml_lib = os.getenv("XML_LIB") + qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") + qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") + receiver_exec = os.getenv("RECEIVER_EXEC") + sender_exec = os.getenv("SENDER_EXEC") + sql_store_lib = os.getenv("STORE_SQL_LIB") + sql_clfs_store_lib = os.getenv("STORE_SQL_CLFS_LIB") + sql_catalog = os.getenv("STORE_CATALOG") + store_lib = os.getenv("STORE_LIB") + test_store_lib = os.getenv("TEST_STORE_LIB") + rootdir = os.getcwd() + + def configure(self, config): self.config=config + + def setUp(self): + outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" + self.dir = os.path.join(self.rootdir, outdir, self.id()) + os.makedirs(self.dir) + os.chdir(self.dir) + self.stopem = [] # things to stop at end of test + + def tearDown(self): + err = [] + for p in self.stopem: + try: p.stop() + except Exception, e: err.append(str(e)) + self.stopem = [] # reset in case more processes start + os.chdir(self.rootdir) + if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) + + def cleanup_stop(self, stopable): + """Call thing.stop at end of test""" + self.stopem.append(stopable) + + def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): + """Start a process that will be killed at end of test, in the test dir.""" + os.chdir(self.dir) + p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) + self.cleanup_stop(p) + return p + + def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None): + """Create and return a broker ready for use""" + b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level) + if (wait): + try: b.ready() + except Exception, e: + raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) + return b + + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True): + """Create and return a cluster ready for use""" + cluster = Cluster(self, count, args, expect=expect, wait=wait) + return cluster + + def browse(self, session, queue, timeout=0): + """Assert that the contents of messages on queue (as retrieved + using session and timeout) exactly match the strings in + expect_contents""" + r = session.receiver("%s;{mode:browse}"%(queue)) + try: + contents = [] + try: + while True: contents.append(r.fetch(timeout=timeout).content) + except messaging.Empty: pass + finally: pass #FIXME aconway 2011-04-14: r.close() + return contents + + def assert_browse(self, session, queue, expect_contents, timeout=0): + """Assert that the contents of messages on queue (as retrieved + using session and timeout) exactly match the strings in + expect_contents""" + actual_contents = self.browse(session, queue, timeout) + self.assertEqual(expect_contents, actual_contents) + +def join(thread, timeout=10): + thread.join(timeout) + if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) + +class RethrownException(Exception): + """Captures the stack trace of the current exception to be thrown later""" + def __init__(self, msg=""): + Exception.__init__(self, msg+"\n"+format_exc()) + +class StoppableThread(Thread): + """ + Base class for threads that do something in a loop and periodically check + to see if they have been stopped. + """ + def __init__(self): + self.stopped = False + self.error = None + Thread.__init__(self) + + def stop(self): + self.stopped = True + join(self) + if self.error: raise self.error + +class NumberedSender(Thread): + """ + Thread to run a sender client and send numbered messages until stopped. + """ + + def __init__(self, broker, max_depth=None, queue="test-queue"): + """ + max_depth: enable flow control, ensure sent - received <= max_depth. + Requires self.notify_received(n) to be called each time messages are received. + """ + Thread.__init__(self) + self.sender = broker.test.popen( + ["qpid-send", + "--broker", "localhost:%s"%broker.port(), + "--address", "%s;{create:always}"%queue, + "--failover-updates", + "--content-stdin" + ], + expect=EXPECT_RUNNING, + stdin=PIPE) + self.condition = Condition() + self.max = max_depth + self.received = 0 + self.stopped = False + self.error = None + + def write_message(self, n): + self.sender.stdin.write(str(n)+"\n") + self.sender.stdin.flush() + + def run(self): + try: + self.sent = 0 + while not self.stopped: + if self.max: + self.condition.acquire() + while not self.stopped and self.sent - self.received > self.max: + self.condition.wait() + self.condition.release() + self.write_message(self.sent) + self.sent += 1 + except Exception: self.error = RethrownException(self.sender.pname) + + def notify_received(self, count): + """Called by receiver to enable flow control. count = messages received so far.""" + self.condition.acquire() + self.received = count + self.condition.notify() + self.condition.release() + + def stop(self): + self.condition.acquire() + try: + self.stopped = True + self.condition.notify() + finally: self.condition.release() + join(self) + self.write_message(-1) # end-of-messages marker. + if self.error: raise self.error + +class NumberedReceiver(Thread): + """ + Thread to run a receiver client and verify it receives + sequentially numbered messages. + """ + def __init__(self, broker, sender = None, queue="test-queue"): + """ + sender: enable flow control. Call sender.received(n) for each message received. + """ + Thread.__init__(self) + self.test = broker.test + self.receiver = self.test.popen( + ["qpid-receive", + "--broker", "localhost:%s"%broker.port(), + "--address", "%s;{create:always}"%queue, + "--failover-updates", + "--forever" + ], + expect=EXPECT_RUNNING, + stdout=PIPE) + self.lock = Lock() + self.error = None + self.sender = sender + + def read_message(self): + return int(self.receiver.stdout.readline()) + + def run(self): + try: + self.received = 0 + m = self.read_message() + while m != -1: + assert(m <= self.received) # Check for missing messages + if (m == self.received): # Ignore duplicates + self.received += 1 + if self.sender: + self.sender.notify_received(self.received) + m = self.read_message() + except Exception: + self.error = RethrownException(self.receiver.pname) + + def stop(self): + """Returns when termination message is received""" + join(self) + if self.error: raise self.error + +class ErrorGenerator(StoppableThread): + """ + Thread that continuously generates errors by trying to consume from + a non-existent queue. For cluster regression tests, error handling + caused issues in the past. + """ + + def __init__(self, broker): + StoppableThread.__init__(self) + self.broker=broker + broker.test.cleanup_stop(self) + self.start() + + def run(self): + c = self.broker.connect_old() + try: + while not self.stopped: + try: + c.session(str(qpid.datatypes.uuid4())).message_subscribe( + queue="non-existent-queue") + assert(False) + except qpid.session.SessionException: pass + time.sleep(0.01) + except: pass # Normal if broker is killed. + +def import_script(path): + """ + Import executable script at path as a module. + Requires some trickery as scripts are not in standard module format + """ + f = open(path) + try: + name=os.path.split(path)[1].replace("-","_") + return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE)) + finally: f.close() |