Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--  qpid/cpp/src/tests/brokertest.py | 671
1 files changed, 671 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
new file mode 100644
index 0000000000..a19dd305e5
--- /dev/null
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -0,0 +1,671 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Support library for tests that start multiple brokers, e.g. cluster
+# or federation tests.
+
+import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
+import qpid, traceback
+from qpid import connection, messaging, util
+from qpid.compat import format_exc
+from qpid.harness import Skipped
+from unittest import TestCase
+from copy import copy
+from threading import Thread, Lock, Condition
+from logging import getLogger
+import qmf.console
+
+log = getLogger("qpid.brokertest")
+
+# Values for expected outcome of process at end of test
+EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
+EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test.
+EXPECT_RUNNING=3 # Expect to still be running at end of test
+EXPECT_UNKNOWN=4 # No expectation, don't check exit status.
+
+def find_exe(program):
+ """Find an executable in the system PATH"""
+ def is_exe(fpath):
+ return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
+ mydir, name = os.path.split(program)
+ if mydir:
+ if is_exe(program): return program
+ else:
+ for path in os.environ["PATH"].split(os.pathsep):
+ exe_file = os.path.join(path, program)
+ if is_exe(exe_file): return exe_file
+ return None
+
+def is_running(pid):
+ try:
+ os.kill(pid, 0)
+ return True
+ except:
+ return False
+
+class BadProcessStatus(Exception):
+ pass
+
+def error_line(filename, n=1):
+ """Get the last n line(s) of filename for error messages"""
+ result = []
+ try:
+ f = open(filename)
+ try:
+ for l in f:
+ if len(result) == n: result.pop(0)
+ result.append(" "+l)
+ finally:
+ f.close()
+ except: return ""
+ return ":\n" + "".join(result)
+
+def retry(function, timeout=10, delay=.01):
+ """Call function until it returns True or timeout expires.
+ Double the delay for each retry. Return True if function
+ returns true, False if timeout expires."""
+ deadline = time.time() + timeout
+ while not function():
+ remaining = deadline - time.time()
+ if remaining <= 0: return False
+ delay = min(delay, remaining)
+ time.sleep(delay)
+ delay *= 2
+ return True
+
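+# Example sketch: retry is used below to poll for a condition with a bounded
+# wait, e.g. (the log file name here is only illustrative):
+#
+#   if not retry(lambda: os.path.exists("broker0.log"), timeout=5):
+#       raise Exception("timed out waiting for broker0.log")
+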
+class AtomicCounter:
+ def __init__(self):
+ self.count = 0
+ self.lock = Lock()
+
+    def next(self):
+        self.lock.acquire()
+        ret = self.count
+        self.count += 1
+        self.lock.release()
+        return ret
+
+_popen_id = AtomicCounter() # Popen identifier for use in output file names.
+
+# Constants for file descriptor arguments to Popen
+FILE = "FILE" # Write to file named after process
+PIPE = subprocess.PIPE
+
+class Popen(subprocess.Popen):
+ """
+ Can set and verify expectation of process status at end of test.
+ Dumps command line, stdout, stderr to data dir for debugging.
+ """
+
+ def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
+ """Run cmd (should be a list of program and arguments)
+ expect - if set verify expectation at end of test.
+ stdout, stderr - can have the same values as for subprocess.Popen as well as
+ FILE (the default) which means write to a file named after the process.
+        stdin - like subprocess.Popen but defaults to PIPE
+ """
+ self._clean = False
+ self._clean_lock = Lock()
+        if type(cmd) is type(""): cmd = [cmd] # Make it a list.
+        self.cmd = [ str(x) for x in cmd ]
+        assert find_exe(self.cmd[0]), "executable not found: "+self.cmd[0]
+ self.expect = expect
+ self.id = _popen_id.next()
+ self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
+ if stdout == FILE: stdout = open(self.outfile("out"), "w")
+ if stderr == FILE: stderr = open(self.outfile("err"), "w")
+ try:
+ subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+ stdin=stdin, stdout=stdout, stderr=stderr,
+ close_fds=True)
+ except ValueError: # Windows can't do close_fds
+ subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
+ stdin=stdin, stdout=stdout, stderr=stderr)
+
+ f = open(self.outfile("cmd"), "w")
+ try: f.write("%s\n%d"%(self.cmd_str(), self.pid))
+ finally: f.close()
+ log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))
+
+ def __str__(self): return "Popen<%s>"%(self.pname)
+
+ def outfile(self, ext): return "%s.%s" % (self.pname, ext)
+
+ def unexpected(self,msg):
+ err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
+ raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
+
+ def stop(self): # Clean up at end of test.
+ try:
+ if self.expect == EXPECT_UNKNOWN:
+                try: self.kill() # Just make sure it's dead
+ except: pass
+ elif self.expect == EXPECT_RUNNING:
+ try: self.kill()
+ except: self.unexpected("expected running, exit code %d" % self.wait())
+ else:
+ retry(lambda: self.poll() is not None)
+ if self.returncode is None: # Still haven't stopped
+ self.kill()
+ self.unexpected("still running")
+ elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
+ self.unexpected("exit code %d" % self.returncode)
+ elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
+ self.unexpected("expected error")
+ finally:
+ self.wait() # Clean up the process.
+
+ def communicate(self, input=None):
+ ret = subprocess.Popen.communicate(self, input)
+        self._cleanup()
+ return ret
+
+ def is_running(self): return self.poll() is None
+
+ def assert_running(self):
+ if not self.is_running(): self.unexpected("Exit code %d" % self.returncode)
+
+ def wait(self):
+ ret = subprocess.Popen.wait(self)
+ self._cleanup()
+ return ret
+
+    def terminate(self):
+        try: subprocess.Popen.terminate(self)
+        except AttributeError:          # No terminate method (Python < 2.6)
+            try:
+                os.kill(self.pid, signal.SIGTERM)
+            except AttributeError:      # No os.kill, use taskkill (Windows only)
+                os.popen('TASKKILL /PID ' + str(self.pid) + ' /F')
+        self._cleanup()
+
+    def kill(self):
+        try: subprocess.Popen.kill(self)
+        except AttributeError:          # No kill method (Python < 2.6)
+            try:
+                os.kill(self.pid, signal.SIGKILL)
+            except AttributeError:      # No os.kill, use taskkill (Windows only)
+                os.popen('TASKKILL /PID ' + str(self.pid) + ' /F')
+        self._cleanup()
+
+ def _cleanup(self):
+ """Clean up after a dead process"""
+ self._clean_lock.acquire()
+ if not self._clean:
+ self._clean = True
+ try: self.stdin.close()
+ except: pass
+ try: self.stdout.close()
+ except: pass
+ try: self.stderr.close()
+ except: pass
+ self._clean_lock.release()
+
+ def cmd_str(self): return " ".join([str(s) for s in self.cmd])
+
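+# Example sketch of how this Popen wrapper is used (the command shown is
+# hypothetical); the recorded expectation is verified when stop() is called
+# at the end of the test:
+#
+#   p = Popen(["echo", "hello"], expect=EXPECT_EXIT_OK)
+#   p.wait()
+#   p.stop()    # raises BadProcessStatus if the exit status was unexpected
+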
+def checkenv(name):
+ value = os.getenv(name)
+ if not value: raise Exception("Environment variable %s is not set" % name)
+ return value
+
+def find_in_file(text, filename):
+    if not os.path.exists(filename): return False
+    f = open(filename)
+    try: return text in f.read()
+ finally: f.close()
+
+class Broker(Popen):
+ "A broker process. Takes care of start, stop and logging."
+ _broker_count = 0
+
+ def __str__(self): return "Broker<%s %s>"%(self.name, self.pname)
+
+ def find_log(self):
+ self.log = "%s.log" % self.name
+ i = 1
+ while (os.path.exists(self.log)):
+ self.log = "%s-%d.log" % (self.name, i)
+ i += 1
+
+ def get_log(self):
+ return os.path.abspath(self.log)
+
+ def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
+ """Start a broker daemon. name determines the data-dir and log
+ file names."""
+
+ self.test = test
+ self._port=port
+ if BrokerTest.store_lib:
+ args = args + ['--load-module', BrokerTest.store_lib]
+ if BrokerTest.sql_store_lib:
+ args = args + ['--load-module', BrokerTest.sql_store_lib]
+ args = args + ['--catalog', BrokerTest.sql_catalog]
+ if BrokerTest.sql_clfs_store_lib:
+ args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
+ args = args + ['--catalog', BrokerTest.sql_catalog]
+ cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir"] + args
+ if not "--auth" in args: cmd.append("--auth=no")
+ if wait != None:
+ cmd += ["--wait", str(wait)]
+ if name: self.name = name
+ else:
+ self.name = "broker%d" % Broker._broker_count
+ Broker._broker_count += 1
+ self.find_log()
+ cmd += ["--log-to-file", self.log]
+ cmd += ["--log-to-stderr=no"]
+ if log_level != None:
+ cmd += ["--log-enable=%s" % log_level]
+ self.datadir = self.name
+ cmd += ["--data-dir", self.datadir]
+ Popen.__init__(self, cmd, expect, stdout=PIPE)
+ test.cleanup_stop(self)
+ self._host = "127.0.0.1"
+ log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
+ self._log_ready = False
+
+ def startQmf(self, handler=None):
+ self.qmf_session = qmf.console.Session(handler)
+ self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))
+
+ def host(self): return self._host
+
+ def port(self):
+ # Read port from broker process stdout if not already read.
+ if (self._port == 0):
+ try: self._port = int(self.stdout.readline())
+ except ValueError:
+ raise Exception("Can't get port for broker %s (%s)%s" %
+ (self.name, self.pname, error_line(self.log,5)))
+ return self._port
+
+ def unexpected(self,msg):
+ raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
+
+ def connect(self, **kwargs):
+ """New API connection to the broker."""
+ return messaging.Connection.establish(self.host_port(), **kwargs)
+
+    def connect_old(self):
+        """Old API connection to the broker."""
+        sock = qpid.util.connect(self.host(), self.port())
+        connection = qpid.connection.Connection(sock=sock)
+        connection.start()
+        return connection
+
+ def declare_queue(self, queue):
+ c = self.connect_old()
+ s = c.session(str(qpid.datatypes.uuid4()))
+ s.queue_declare(queue=queue)
+ c.close()
+
+ def _prep_sender(self, queue, durable, xprops):
+ s = queue + "; {create:always, node:{durable:" + str(durable)
+ if xprops != None: s += ", x-declare:{" + xprops + "}"
+ return s + "}}"
+
+ def send_message(self, queue, message, durable=True, xprops=None, session=None):
+ if session == None:
+ s = self.connect().session()
+ else:
+ s = session
+ s.sender(self._prep_sender(queue, durable, xprops)).send(message)
+ if session == None:
+ s.connection.close()
+
+ def send_messages(self, queue, messages, durable=True, xprops=None, session=None):
+ if session == None:
+ s = self.connect().session()
+ else:
+ s = session
+ sender = s.sender(self._prep_sender(queue, durable, xprops))
+ for m in messages: sender.send(m)
+ if session == None:
+ s.connection.close()
+
+ def get_message(self, queue):
+ s = self.connect().session()
+ m = s.receiver(queue+"; {create:always}", capacity=1).fetch(timeout=1)
+ s.acknowledge()
+ s.connection.close()
+ return m
+
+ def get_messages(self, queue, n):
+ s = self.connect().session()
+ receiver = s.receiver(queue+"; {create:always}", capacity=n)
+ m = [receiver.fetch(timeout=1) for i in range(n)]
+ s.acknowledge()
+ s.connection.close()
+ return m
+
+ def host_port(self): return "%s:%s" % (self.host(), self.port())
+
+ def log_ready(self):
+ """Return true if the log file exists and contains a broker ready message"""
+ if not self._log_ready:
+ self._log_ready = find_in_file("notice Broker running", self.log)
+ return self._log_ready
+
+ def ready(self, **kwargs):
+ """Wait till broker is ready to serve clients"""
+ # First make sure the broker is listening by checking the log.
+ if not retry(self.log_ready, timeout=60):
+ raise Exception(
+ "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
+ # Create a connection and a session. For a cluster broker this will
+ # return after cluster init has finished.
+ try:
+ c = self.connect(**kwargs)
+ try: c.session()
+ finally: c.close()
+ except Exception,e: raise RethrownException(
+ "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
+
+ def store_state(self):
+ f = open(os.path.join(self.datadir, "cluster", "store.status"))
+ try: uuids = f.readlines()
+ finally: f.close()
+ null_uuid="00000000-0000-0000-0000-000000000000\n"
+ if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
+ if uuids[0] == null_uuid: return "empty"
+ if uuids[1] == null_uuid: return "dirty"
+ return "clean"
+
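+# Example sketch (assuming "test" is a BrokerTest instance): start a broker
+# and exchange a message over the new messaging API; the queue name is
+# illustrative:
+#
+#   b = test.broker()                       # waits for the broker to be ready
+#   b.send_message("q", messaging.Message("hello"))
+#   print b.get_message("q").content        # prints "hello"
+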
+class Cluster:
+ """A cluster of brokers in a test."""
+
+ _cluster_count = 0
+
+ def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ self.test = test
+ self._brokers=[]
+ self.name = "cluster%d" % Cluster._cluster_count
+ Cluster._cluster_count += 1
+ # Use unique cluster name
+ self.args = copy(args)
+ self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+ self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
+ assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
+ self.args += [ "--load-module", BrokerTest.cluster_lib ]
+ self.start_n(count, expect=expect, wait=wait)
+
+ def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
+ """Add a broker to the cluster. Returns the index of the new broker."""
+ if not name: name="%s-%d" % (self.name, len(self._brokers))
+ self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
+ return self._brokers[-1]
+
+ def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
+ for i in range(count): self.start(expect=expect, wait=wait, args=args)
+
+ # Behave like a list of brokers.
+ def __len__(self): return len(self._brokers)
+ def __getitem__(self,index): return self._brokers[index]
+ def __iter__(self): return self._brokers.__iter__()
+
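+# Example sketch: a Cluster behaves like a list of Broker objects (the count
+# and queue name below are illustrative):
+#
+#   cluster = test.cluster(3)               # three clustered brokers
+#   cluster[0].send_message("q", messaging.Message("hello"))
+#   for b in cluster: print b.host_port()
+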
+class BrokerTest(TestCase):
+ """
+    Tracks processes started by the test and stops them at the end of the test.
+    Provides a well-known working directory for each test.
+ """
+
+ # Environment settings.
+ qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
+ cluster_lib = os.getenv("CLUSTER_LIB")
+ xml_lib = os.getenv("XML_LIB")
+ qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
+ qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
+ receiver_exec = os.getenv("RECEIVER_EXEC")
+ sender_exec = os.getenv("SENDER_EXEC")
+ sql_store_lib = os.getenv("STORE_SQL_LIB")
+ sql_clfs_store_lib = os.getenv("STORE_SQL_CLFS_LIB")
+ sql_catalog = os.getenv("STORE_CATALOG")
+ store_lib = os.getenv("STORE_LIB")
+ test_store_lib = os.getenv("TEST_STORE_LIB")
+ rootdir = os.getcwd()
+
+ def configure(self, config): self.config=config
+
+ def setUp(self):
+ outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
+ self.dir = os.path.join(self.rootdir, outdir, self.id())
+ os.makedirs(self.dir)
+ os.chdir(self.dir)
+ self.stopem = [] # things to stop at end of test
+
+ def tearDown(self):
+ err = []
+ for p in self.stopem:
+ try: p.stop()
+ except Exception, e: err.append(str(e))
+ self.stopem = [] # reset in case more processes start
+ os.chdir(self.rootdir)
+ if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
+
+    def cleanup_stop(self, stoppable):
+        """Arrange for stoppable.stop() to be called at the end of the test."""
+        self.stopem.append(stoppable)
+
+ def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
+ """Start a process that will be killed at end of test, in the test dir."""
+ os.chdir(self.dir)
+ p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
+ self.cleanup_stop(p)
+ return p
+
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
+ """Create and return a broker ready for use"""
+ b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
+ if (wait):
+ try: b.ready()
+ except Exception, e:
+ raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
+ return b
+
+ def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
+ """Create and return a cluster ready for use"""
+ cluster = Cluster(self, count, args, expect=expect, wait=wait)
+ return cluster
+
+ def browse(self, session, queue, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+ r = session.receiver("%s;{mode:browse}"%(queue))
+ try:
+ contents = []
+ try:
+ while True: contents.append(r.fetch(timeout=timeout).content)
+ except messaging.Empty: pass
+ finally: pass #FIXME aconway 2011-04-14: r.close()
+ return contents
+
+ def assert_browse(self, session, queue, expect_contents, timeout=0):
+ """Assert that the contents of messages on queue (as retrieved
+ using session and timeout) exactly match the strings in
+ expect_contents"""
+ actual_contents = self.browse(session, queue, timeout)
+ self.assertEqual(expect_contents, actual_contents)
+
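+# Example sketch of a test written against BrokerTest (class, method and
+# queue names are hypothetical):
+#
+#   class ExampleTests(BrokerTest):
+#       def test_browse(self):
+#           broker = self.broker()
+#           broker.send_messages("q", ["a", "b"])
+#           session = broker.connect().session()
+#           self.assert_browse(session, "q", ["a", "b"])
+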
+def join(thread, timeout=10):
+ thread.join(timeout)
+ if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
+
+class RethrownException(Exception):
+ """Captures the stack trace of the current exception to be thrown later"""
+ def __init__(self, msg=""):
+ Exception.__init__(self, msg+"\n"+format_exc())
+
+class StoppableThread(Thread):
+ """
+ Base class for threads that do something in a loop and periodically check
+ to see if they have been stopped.
+ """
+ def __init__(self):
+ self.stopped = False
+ self.error = None
+ Thread.__init__(self)
+
+ def stop(self):
+ self.stopped = True
+ join(self)
+ if self.error: raise self.error
+
+class NumberedSender(Thread):
+ """
+ Thread to run a sender client and send numbered messages until stopped.
+ """
+
+ def __init__(self, broker, max_depth=None, queue="test-queue"):
+ """
+ max_depth: enable flow control, ensure sent - received <= max_depth.
+ Requires self.notify_received(n) to be called each time messages are received.
+ """
+ Thread.__init__(self)
+ self.sender = broker.test.popen(
+ ["qpid-send",
+ "--broker", "localhost:%s"%broker.port(),
+ "--address", "%s;{create:always}"%queue,
+ "--failover-updates",
+ "--content-stdin"
+ ],
+ expect=EXPECT_RUNNING,
+ stdin=PIPE)
+ self.condition = Condition()
+ self.max = max_depth
+ self.received = 0
+ self.stopped = False
+ self.error = None
+
+ def write_message(self, n):
+ self.sender.stdin.write(str(n)+"\n")
+ self.sender.stdin.flush()
+
+ def run(self):
+ try:
+ self.sent = 0
+ while not self.stopped:
+ if self.max:
+ self.condition.acquire()
+ while not self.stopped and self.sent - self.received > self.max:
+ self.condition.wait()
+ self.condition.release()
+ self.write_message(self.sent)
+ self.sent += 1
+ except Exception: self.error = RethrownException(self.sender.pname)
+
+ def notify_received(self, count):
+ """Called by receiver to enable flow control. count = messages received so far."""
+ self.condition.acquire()
+ self.received = count
+ self.condition.notify()
+ self.condition.release()
+
+ def stop(self):
+ self.condition.acquire()
+ try:
+ self.stopped = True
+ self.condition.notify()
+ finally: self.condition.release()
+ join(self)
+ self.write_message(-1) # end-of-messages marker.
+ if self.error: raise self.error
+
+class NumberedReceiver(Thread):
+ """
+ Thread to run a receiver client and verify it receives
+ sequentially numbered messages.
+ """
+ def __init__(self, broker, sender = None, queue="test-queue"):
+ """
+ sender: enable flow control. Call sender.received(n) for each message received.
+ """
+ Thread.__init__(self)
+ self.test = broker.test
+ self.receiver = self.test.popen(
+ ["qpid-receive",
+ "--broker", "localhost:%s"%broker.port(),
+ "--address", "%s;{create:always}"%queue,
+ "--failover-updates",
+ "--forever"
+ ],
+ expect=EXPECT_RUNNING,
+ stdout=PIPE)
+ self.lock = Lock()
+ self.error = None
+ self.sender = sender
+
+ def read_message(self):
+ return int(self.receiver.stdout.readline())
+
+ def run(self):
+ try:
+ self.received = 0
+ m = self.read_message()
+ while m != -1:
+ assert(m <= self.received) # Check for missing messages
+ if (m == self.received): # Ignore duplicates
+ self.received += 1
+ if self.sender:
+ self.sender.notify_received(self.received)
+ m = self.read_message()
+ except Exception:
+ self.error = RethrownException(self.receiver.pname)
+
+ def stop(self):
+ """Returns when termination message is received"""
+ join(self)
+ if self.error: raise self.error
+
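+# Example sketch: NumberedSender and NumberedReceiver are normally used as a
+# pair, with the receiver driving the sender's flow control through
+# notify_received() ("broker" is assumed to come from BrokerTest.broker()):
+#
+#   sender = NumberedSender(broker, max_depth=100)
+#   receiver = NumberedReceiver(broker, sender=sender)
+#   sender.start(); receiver.start()
+#   # ... disturb the broker/cluster under test ...
+#   sender.stop()                           # writes the -1 end-of-messages marker
+#   receiver.stop()                         # returns once -1 has been read
+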
+class ErrorGenerator(StoppableThread):
+ """
+    Thread that continuously generates errors by trying to consume from a
+    non-existent queue. Used in cluster regression tests: error handling has
+    caused issues in the cluster in the past.
+ """
+
+ def __init__(self, broker):
+ StoppableThread.__init__(self)
+ self.broker=broker
+ broker.test.cleanup_stop(self)
+ self.start()
+
+ def run(self):
+ c = self.broker.connect_old()
+ try:
+ while not self.stopped:
+ try:
+ c.session(str(qpid.datatypes.uuid4())).message_subscribe(
+ queue="non-existent-queue")
+ assert(False)
+ except qpid.session.SessionException: pass
+ time.sleep(0.01)
+ except: pass # Normal if broker is killed.
+
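+# Example sketch: an ErrorGenerator registers itself for cleanup and starts
+# its thread in the constructor, so creating one is sufficient:
+#
+#   ErrorGenerator(broker)    # generates errors until the test tears it down
+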
+def import_script(path):
+ """
+ Import executable script at path as a module.
+ Requires some trickery as scripts are not in standard module format
+ """
+ f = open(path)
+ try:
+ name=os.path.split(path)[1].replace("-","_")
+ return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
+ finally: f.close()
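+
+# Example sketch: import_script lets a test reuse an executable script as a
+# module (the path and the main() function are assumptions about the script):
+#
+#   qpid_config = import_script("/path/to/qpid-config")
+#   qpid_config.main(["--help"])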