# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Support library for tests that start multiple brokers, e.g. cluster # or federation import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re import qpid, traceback, signal from qpid import connection, messaging, util from qpid.compat import format_exc from qpid.harness import Skipped from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger import qmf.console log = getLogger("qpid.brokertest") # Values for expected outcome of process at end of test EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test. EXPECT_RUNNING=3 # Expect to still be running at end of test EXPECT_UNKNOWN=4 # No expectation, don't check exit status. def find_exe(program): """Find an executable in the system PATH""" def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) mydir, name = os.path.split(program) if mydir: if is_exe(program): return program else: for path in os.environ["PATH"].split(os.pathsep): exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None def is_running(pid): try: os.kill(pid, 0) return True except: return False class BadProcessStatus(Exception): pass def error_line(filename, n=1): """Get the last n line(s) of filename for error messages""" result = [] try: f = open(filename) try: for l in f: if len(result) == n: result.pop(0) result.append(" "+l) finally: f.close() except: return "" return ":\n" + "".join(result) def retry(function, timeout=10, delay=.01, max_delay=1): """Call function until it returns a true value or timeout expires. Double the delay for each retry up to max_delay. Returns what function returns if true, None if timeout expires.""" deadline = time.time() + timeout ret = None while True: ret = function() if ret: return ret remaining = deadline - time.time() if remaining <= 0: return False delay = min(delay, remaining) time.sleep(delay) delay = min(delay*2, max_delay) class AtomicCounter: def __init__(self): self.count = 0 self.lock = Lock() def next(self): self.lock.acquire(); ret = self.count self.count += 1 self.lock.release(); return ret _popen_id = AtomicCounter() # Popen identifier for use in output file names. # Constants for file descriptor arguments to Popen FILE = "FILE" # Write to file named after process PIPE = subprocess.PIPE class Popen(subprocess.Popen): """ Can set and verify expectation of process status at end of test. Dumps command line, stdout, stderr to data dir for debugging. """ def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): """Run cmd (should be a list of program and arguments) expect - if set verify expectation at end of test. stdout, stderr - can have the same values as for subprocess.Popen as well as FILE (the default) which means write to a file named after the process. stdin - like subprocess.Popen but defauts to PIPE """ self._clean = False self._clean_lock = Lock() assert find_exe(cmd[0]), "executable not found: "+cmd[0] if type(cmd) is type(""): cmd = [cmd] # Make it a list. self.cmd = [ str(x) for x in cmd ] self.expect = expect self.id = _popen_id.next() self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id) if stdout == FILE: stdout = open(self.outfile("out"), "w") if stderr == FILE: stderr = open(self.outfile("err"), "w") try: subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, stdin=stdin, stdout=stdout, stderr=stderr, close_fds=True) except ValueError: # Windows can't do close_fds subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None, stdin=stdin, stdout=stdout, stderr=stderr) f = open(self.outfile("cmd"), "w") try: f.write("%s\n%d"%(self.cmd_str(), self.pid)) finally: f.close() log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd))) def __str__(self): return "Popen<%s>"%(self.pname) def outfile(self, ext): return "%s.%s" % (self.pname, ext) def unexpected(self,msg): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) def stop(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: try: self.kill() # Just make sure its dead except: pass elif self.expect == EXPECT_RUNNING: if self.poll() != None: self.unexpected("expected running, exit code %d" % self.returncode) else: try: self.kill() except Exception,e: self.unexpected("exception from kill: %s" % str(e)) else: retry(lambda: self.poll() is not None) if self.returncode is None: # Still haven't stopped self.kill() self.unexpected("still running") elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: self.unexpected("exit code %d" % self.returncode) elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: self.unexpected("expected error") finally: self.wait() # Clean up the process. def communicate(self, input=None): ret = subprocess.Popen.communicate(self, input) self.cleanup() return ret def is_running(self): return self.poll() is None def assert_running(self): if not self.is_running(): self.unexpected("Exit code %d" % self.returncode) def wait(self): ret = subprocess.Popen.wait(self) self._cleanup() return ret def terminate(self): try: subprocess.Popen.terminate(self) except AttributeError: # No terminate method try: os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') self.wait() def kill(self): try: subprocess.Popen.kill(self) except AttributeError: # No terminate method try: os.kill( self.pid , signal.SIGKILL) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') self.wait() def _cleanup(self): """Clean up after a dead process""" self._clean_lock.acquire() if not self._clean: self._clean = True try: self.stdin.close() except: pass try: self.stdout.close() except: pass try: self.stderr.close() except: pass self._clean_lock.release() def cmd_str(self): return " ".join([str(s) for s in self.cmd]) def checkenv(name): value = os.getenv(name) if not value: raise Exception("Environment variable %s is not set" % name) return value def find_in_file(str, filename): if not os.path.exists(filename): return False f = open(filename) try: return str in f.read() finally: f.close() class Broker(Popen): "A broker process. Takes care of start, stop and logging." _broker_count = 0 _log_count = 0 def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port()) def find_log(self): self.log = "%03d:%s.log" % (Broker._log_count, self.name) Broker._log_count += 1 def get_log(self): return os.path.abspath(self.log) def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None, show_cmd=False): """Start a broker daemon. name determines the data-dir and log file names.""" self.test = test self._port=port if BrokerTest.store_lib: args = args + ['--load-module', BrokerTest.store_lib] if BrokerTest.sql_store_lib: args = args + ['--load-module', BrokerTest.sql_store_lib] args = args + ['--catalog', BrokerTest.sql_catalog] if BrokerTest.sql_clfs_store_lib: args = args + ['--load-module', BrokerTest.sql_clfs_store_lib] args = args + ['--catalog', BrokerTest.sql_catalog] cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir"] + args if not "--auth" in args: cmd.append("--auth=no") if wait != None: cmd += ["--wait", str(wait)] if name: self.name = name else: self.name = "broker%d" % Broker._broker_count Broker._broker_count += 1 self.find_log() cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] cmd += ["--log-enable=%s"%(log_level or "info+") ] self.datadir = self.name cmd += ["--data-dir", self.datadir] if show_cmd: print cmd Popen.__init__(self, cmd, expect, stdout=PIPE) test.cleanup_stop(self) self._host = "127.0.0.1" log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) self._log_ready = False def startQmf(self, handler=None): self.qmf_session = qmf.console.Session(handler) self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) def host(self): return self._host def port(self): # Read port from broker process stdout if not already read. if (self._port == 0): try: self._port = int(self.stdout.readline()) except ValueError, e: raise Exception("Can't get port for broker %s (%s)%s: %s" % (self.name, self.pname, error_line(self.log,5), e)) return self._port def unexpected(self,msg): raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) def connect(self, **kwargs): """New API connection to the broker.""" return messaging.Connection.establish(self.host_port(), **kwargs) def connect_old(self): """Old API connection to the broker.""" socket = qpid.util.connect(self.host(),self.port()) connection = qpid.connection.Connection (sock=socket) connection.start() return connection; def declare_queue(self, queue): c = self.connect_old() s = c.session(str(qpid.datatypes.uuid4())) s.queue_declare(queue=queue) c.close() def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) if xprops != None: s += ", x-declare:{" + xprops + "}" return s + "}}" def send_message(self, queue, message, durable=True, xprops=None, session=None): if session == None: s = self.connect().session() else: s = session s.sender(self._prep_sender(queue, durable, xprops)).send(message) if session == None: s.connection.close() def send_messages(self, queue, messages, durable=True, xprops=None, session=None): if session == None: s = self.connect().session() else: s = session sender = s.sender(self._prep_sender(queue, durable, xprops)) for m in messages: sender.send(m) if session == None: s.connection.close() def get_message(self, queue): s = self.connect().session() m = s.receiver(queue+"; {create:always}", capacity=1).fetch(timeout=1) s.acknowledge() s.connection.close() return m def get_messages(self, queue, n): s = self.connect().session() receiver = s.receiver(queue+"; {create:always}", capacity=n) m = [receiver.fetch(timeout=1) for i in range(n)] s.acknowledge() s.connection.close() return m def host_port(self): return "%s:%s" % (self.host(), self.port()) def log_contains(self, str, timeout=1): """Wait for str to appear in the log file up to timeout. Return true if found""" return retry(lambda: find_in_file(str, self.log), timeout) def log_ready(self): """Return true if the log file exists and contains a broker ready message""" if not self._log_ready: self._log_ready = find_in_file("notice Broker running", self.log) return self._log_ready def ready(self, timeout=30, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. if not retry(self.log_ready, timeout=timeout): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. For a cluster broker this will # return after cluster init has finished. try: c = self.connect(**kwargs) try: c.session() finally: c.close() except Exception,e: raise RethrownException( "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) def store_state(self): f = open(os.path.join(self.datadir, "cluster", "store.status")) try: uuids = f.readlines() finally: f.close() null_uuid="00000000-0000-0000-0000-000000000000\n" if len(uuids) < 2: return "unknown" # we looked while the file was being updated. if uuids[0] == null_uuid: return "empty" if uuids[1] == null_uuid: return "dirty" return "clean" class Cluster: """A cluster of brokers in a test.""" # Client connection options for use in failover tests. CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true" _cluster_count = 0 def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): self.test = test self._brokers=[] self.name = "cluster%d" % Cluster._cluster_count Cluster._cluster_count += 1 # Use unique cluster name self.args = copy(args) self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"] assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in" self.args += [ "--load-module", BrokerTest.cluster_lib ] self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd) def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd)) return self._brokers[-1] def ready(self): for b in self: b.ready() def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False): for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd) # Behave like a list of brokers. def __len__(self): return len(self._brokers) def __getitem__(self,index): return self._brokers[index] def __iter__(self): return self._brokers.__iter__() def browse(session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) r.capacity = 100 try: contents = [] try: while True: contents.append(transform(r.fetch(timeout=timeout))) except messaging.Empty: pass finally: r.close() return contents def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" actual_contents = browse(session, queue, timeout, transform=transform) if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg def assert_browse_retry(session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content, msg="browse failed"): """Wait up to timeout for contents of queue to match expect_contents""" test = lambda: browse(session, queue, 0, transform=transform) == expect_contents retry(test, timeout, delay) actual_contents = browse(session, queue, 0, transform=transform) if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents) assert expect_contents == actual_contents, msg class BrokerTest(TestCase): """ Tracks processes started by test and kills at end of test. Provides a well-known working directory for each test. """ # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) cluster_lib = os.getenv("CLUSTER_LIB") ha_lib = os.getenv("HA_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") receiver_exec = os.getenv("RECEIVER_EXEC") sender_exec = os.getenv("SENDER_EXEC") sql_store_lib = os.getenv("STORE_SQL_LIB") sql_clfs_store_lib = os.getenv("STORE_SQL_CLFS_LIB") sql_catalog = os.getenv("STORE_CATALOG") store_lib = os.getenv("STORE_LIB") test_store_lib = os.getenv("TEST_STORE_LIB") rootdir = os.getcwd() def configure(self, config): self.config=config def setUp(self): outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) os.makedirs(self.dir) os.chdir(self.dir) self.stopem = [] # things to stop at end of test def tearDown(self): err = [] for p in self.stopem: try: p.stop() except Exception, e: err.append(str(e)) self.stopem = [] # reset in case more processes start os.chdir(self.rootdir) if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) def cleanup_stop(self, stopable): """Call thing.stop at end of test""" self.stopem.append(stopable) def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE): """Start a process that will be killed at end of test, in the test dir.""" os.chdir(self.dir) p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr) self.cleanup_stop(p) return p def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None, show_cmd=False): """Create and return a broker ready for use""" b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level, show_cmd=show_cmd) if (wait): try: b.ready() except Exception, e: raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e)) return b def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False): """Create and return a cluster ready for use""" cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster def browse(self, *args, **kwargs): browse(*args, **kwargs) def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs) def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs) def join(thread, timeout=10): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) class RethrownException(Exception): """Captures the stack trace of the current exception to be thrown later""" def __init__(self, msg=""): Exception.__init__(self, msg+"\n"+format_exc()) class StoppableThread(Thread): """ Base class for threads that do something in a loop and periodically check to see if they have been stopped. """ def __init__(self): self.stopped = False self.error = None Thread.__init__(self) def stop(self): self.stopped = True join(self) if self.error: raise self.error class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ def __init__(self, broker, max_depth=None, queue="test-queue", connection_options=Cluster.CONNECTION_OPTIONS, failover_updates=True, url=None): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. """ Thread.__init__(self) cmd = ["qpid-send", "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, "--connection-options", "{%s}"%(connection_options), "--content-stdin" ] if failover_updates: cmd += ["--failover-updates"] self.sender = broker.test.popen( cmd, expect=EXPECT_RUNNING, stdin=PIPE) self.condition = Condition() self.max = max_depth self.received = 0 self.stopped = False self.error = None def write_message(self, n): self.sender.stdin.write(str(n)+"\n") self.sender.stdin.flush() def run(self): try: self.sent = 0 while not self.stopped: self.sender.assert_running() if self.max: self.condition.acquire() while not self.stopped and self.sent - self.received > self.max: self.condition.wait() self.condition.release() self.write_message(self.sent) self.sent += 1 except Exception: self.error = RethrownException(self.sender.pname) def notify_received(self, count): """Called by receiver to enable flow control. count = messages received so far.""" self.condition.acquire() self.received = count self.condition.notify() self.condition.release() def stop(self): self.condition.acquire() try: self.stopped = True self.condition.notify() finally: self.condition.release() join(self) self.write_message(-1) # end-of-messages marker. if self.error: raise self.error class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives sequentially numbered messages. """ def __init__(self, broker, sender=None, queue="test-queue", connection_options=Cluster.CONNECTION_OPTIONS, failover_updates=True, url=None): """ sender: enable flow control. Call sender.received(n) for each message received. """ Thread.__init__(self) self.test = broker.test cmd = ["qpid-receive", "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, "--connection-options", "{%s}"%(connection_options), "--forever" ] if failover_updates: cmd += [ "--failover-updates" ] self.receiver = self.test.popen( cmd, expect=EXPECT_RUNNING, stdout=PIPE) self.lock = Lock() self.error = None self.sender = sender self.received = 0 self.queue = queue def read_message(self): n = int(self.receiver.stdout.readline()) return n def run(self): try: m = self.read_message() while m != -1: self.receiver.assert_running() assert m <= self.received, "%s missing message %s>%s"%(self.queue, m, self.received) if (m == self.received): # Ignore duplicates self.received += 1 if self.sender: self.sender.notify_received(self.received) m = self.read_message() except Exception: self.error = RethrownException(self.receiver.pname) def check(self): """Raise an exception if there has been an error""" if self.error: raise self.error def stop(self): """Returns when termination message is received""" join(self) self.check() class ErrorGenerator(StoppableThread): """ Thread that continuously generates errors by trying to consume from a non-existent queue. For cluster regression tests, error handling caused issues in the past. """ def __init__(self, broker): StoppableThread.__init__(self) self.broker=broker broker.test.cleanup_stop(self) self.start() def run(self): c = self.broker.connect_old() try: while not self.stopped: try: c.session(str(qpid.datatypes.uuid4())).message_subscribe( queue="non-existent-queue") assert(False) except qpid.session.SessionException: pass time.sleep(0.01) except: pass # Normal if broker is killed. def import_script(path): """ Import executable script at path as a module. Requires some trickery as scripts are not in standard module format """ f = open(path) try: name=os.path.split(path)[1].replace("-","_") return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE)) finally: f.close()