#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Support library for tests that start multiple brokers, e.g. cluster
# or federation

# Standard library.
import imp
import os
import re
import signal
import socket
import string
import subprocess
import tempfile
import threading
import time
import traceback
from copy import copy
from logging import getLogger
from threading import Thread, Lock, Condition
from unittest import TestCase

# Qpid client libraries.
import qpid
import qmf.console
from qpid import connection, messaging, util
from qpid.compat import format_exc
from qpid.harness import Skipped

# Logger shared by everything in this module.
log = getLogger("qpid.brokertest")

# Values for expected outcome of process at end of test
EXPECT_EXIT_OK = 1    # Expect to exit with 0 status before end of test.
EXPECT_EXIT_FAIL = 2  # Expect to exit with non-0 status before end of test.
EXPECT_RUNNING = 3    # Expect to still be running at end of test
EXPECT_UNKNOWN = 4    # No expectation, don't check exit status.
def _current_exception():
    """Return the exception instance currently being handled.

    Used instead of binding the exception in the ``except`` clause: the
    ``except E, e`` spelling only parses on Python 2 and ``except E as e``
    needs 2.6+, whereas sys.exc_info() is available everywhere.
    """
    import sys
    return sys.exc_info()[1]


def find_exe(program):
    """Find an executable in the system PATH.

    If program contains a directory part it is checked as given, otherwise
    every directory in $PATH is searched.  Returns the path of the first
    executable file found, or None.
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    mydir, name = os.path.split(program)
    if mydir:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    return None


def is_running(pid):
    """Return True if signal 0 can be delivered to pid, False on any error."""
    try:
        os.kill(pid, 0)
        return True
    except Exception:
        return False


class BadProcessStatus(Exception):
    """Raised when a process does not end a test in its expected state."""
    pass


def error_line(filename, n=1):
    """Get the last n line(s) of filename for error messages.

    Returns "" if the file cannot be read, otherwise ":\\n" followed by the
    last n lines, each prefixed with a space.
    """
    result = []
    try:
        f = open(filename)
        try:
            for l in f:
                # Keep a sliding window of at most n lines.
                if len(result) == n:
                    result.pop(0)
                result.append(" "+l)
        finally:
            f.close()
    except Exception:
        return ""
    return ":\n" + "".join(result)


def retry(function, timeout=10, delay=.01):
    """Call function until it returns True or timeout expires.
    Double the delay for each retry. Return True if function
    returns true, False if timeout expires."""
    deadline = time.time() + timeout
    while not function():
        remaining = deadline - time.time()
        if remaining <= 0:
            return False
        # Never sleep past the deadline.
        delay = min(delay, remaining)
        time.sleep(delay)
        delay *= 2
    return True


class AtomicCounter:
    """Counter whose next() hands out consecutive integers under a lock."""

    def __init__(self):
        self.count = 0
        self.lock = Lock()

    def next(self):
        """Return the current count and increment it."""
        self.lock.acquire()
        try:
            ret = self.count
            self.count += 1
        finally:
            self.lock.release()
        return ret


_popen_id = AtomicCounter() # Popen identifier for use in output file names.

# Constants for file descriptor arguments to Popen
FILE = "FILE"                       # Write to file named after process
PIPE = subprocess.PIPE


class Popen(subprocess.Popen):
    """
    Can set and verify expectation of process status at end of test.
    Dumps command line, stdout, stderr to data dir for debugging.
    """

    def __init__(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
        """Run cmd (should be a list of program and arguments)
        expect - if set verify expectation at end of test.
        stdout, stderr - can have the same values as for subprocess.Popen as well as
          FILE (the default) which means write to a file named after the process.
        stdin - like subprocess.Popen, defaults to None.
        """
        self._clean = False
        self._clean_lock = Lock()
        # Normalize to a list of strings *before* looking at cmd[0]; indexing
        # a bare string would only yield its first character.
        if isinstance(cmd, str):
            cmd = [cmd]
        self.cmd = [str(x) for x in cmd]
        assert find_exe(self.cmd[0]), "executable not found: "+self.cmd[0]
        self.expect = expect
        self.id = _popen_id.next()
        self.pname = "%s-%d" % (os.path.split(self.cmd[0])[1], self.id)
        if stdout == FILE:
            stdout = open(self.outfile("out"), "w")
        if stderr == FILE:
            stderr = open(self.outfile("err"), "w")
        try:
            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
                                      stdin=stdin, stdout=stdout, stderr=stderr,
                                      close_fds=True)
        except ValueError:              # Windows can't do close_fds
            subprocess.Popen.__init__(self, self.cmd, bufsize=0, executable=None,
                                      stdin=stdin, stdout=stdout, stderr=stderr)

        # Record the command line and pid next to the output files.
        f = open(self.outfile("cmd"), "w")
        try:
            f.write("%s\n%d"%(self.cmd_str(), self.pid))
        finally:
            f.close()
        log.debug("Started process %s: %s" % (self.pname, " ".join(self.cmd)))

    def __str__(self):
        return "Popen<%s>"%(self.pname)

    def outfile(self, ext):
        """Name of this process's output file with the given extension."""
        return "%s.%s" % (self.pname, ext)

    def unexpected(self, msg):
        """Raise BadProcessStatus with msg plus the tail of the err (or out) file."""
        err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
        raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))

    def stop(self):
        """Clean up at end of test, verifying self.expect.

        Raises BadProcessStatus (via unexpected()) if the process is not in
        the expected state.  Always waits for the process before returning.
        """
        try:
            if self.expect == EXPECT_UNKNOWN:
                try:
                    self.kill()         # Just make sure its dead
                except Exception:
                    pass
            elif self.expect == EXPECT_RUNNING:
                try:
                    self.kill()
                except Exception:
                    # Kill failed, so report the exit status instead.
                    self.unexpected("expected running, exit code %d" % self.wait())
            else:
                retry(lambda: self.poll() is not None)
                if self.returncode is None: # Still haven't stopped
                    self.kill()
                    self.unexpected("still running")
                elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
                    self.unexpected("exit code %d" % self.returncode)
                elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
                    self.unexpected("expected error")
        finally:
            self.wait()                 # Clean up the process.

    def communicate(self, input=None):
        """Like subprocess.Popen.communicate, then close our pipe ends."""
        ret = subprocess.Popen.communicate(self, input)
        self._cleanup()
        return ret

    def is_running(self):
        return self.poll() is None

    def assert_running(self):
        """Raise BadProcessStatus if the process has exited."""
        if not self.is_running():
            self.unexpected("Exit code %d" % self.returncode)

    def wait(self):
        """Like subprocess.Popen.wait, then close our pipe ends."""
        ret = subprocess.Popen.wait(self)
        self._cleanup()
        return ret

    def terminate(self):
        """Terminate the process, with fallbacks when the methods are missing."""
        try:
            subprocess.Popen.terminate(self)
        except AttributeError:          # No terminate method
            try:
                os.kill(self.pid, signal.SIGTERM)
            except AttributeError:      # no os.kill, using taskkill.. (Windows only)
                os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
        self._cleanup()

    def kill(self):
        """Kill the process, with fallbacks when the methods are missing."""
        try:
            subprocess.Popen.kill(self)
        except AttributeError:          # No kill method
            try:
                os.kill(self.pid, signal.SIGKILL)
            except AttributeError:      # no os.kill, using taskkill.. (Windows only)
                os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
        self._cleanup()

    def _cleanup(self):
        """Clean up after a dead process: close stdin/stdout/stderr once."""
        self._clean_lock.acquire()
        try:
            if not self._clean:
                self._clean = True
                for stream_name in ("stdin", "stdout", "stderr"):
                    # Best effort: the attribute is None when that stream
                    # was not a pipe, and may already be closed.
                    try:
                        getattr(self, stream_name).close()
                    except Exception:
                        pass
        finally:
            self._clean_lock.release()

    def cmd_str(self):
        return " ".join([str(s) for s in self.cmd])


def checkenv(name):
    """Return the value of environment variable name; raise if unset or empty."""
    value = os.getenv(name)
    if not value:
        raise Exception("Environment variable %s is not set" % name)
    return value


def find_in_file(str, filename):
    """Return True if filename exists and its contents contain str."""
    if not os.path.exists(filename):
        return False
    f = open(filename)
    try:
        return str in f.read()
    finally:
        f.close()


class Broker(Popen):
    "A broker process. Takes care of start, stop and logging."
    _broker_count = 0

    def __str__(self):
        return "Broker<%s %s>"%(self.name, self.pname)

    def find_log(self):
        """Set self.log to the first unused "<name>[-N].log" file name."""
        self.log = "%s.log" % self.name
        i = 1
        while (os.path.exists(self.log)):
            self.log = "%s-%d.log" % (self.name, i)
            i += 1

    def get_log(self):
        return os.path.abspath(self.log)

    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING, port=0, log_level=None, wait=None):
        """Start a broker daemon. name determines the data-dir and log
        file names.

        test - the owning test; the broker registers itself with
          test.cleanup_stop so that it is stopped at the end of the test.
        args - extra broker command line arguments (not modified).
        port - port to listen on; with 0 the port is read from the broker's
          stdout on the first call to port().
        log_level - if set, passed as --log-enable.
        wait - if set, passed as --wait.
        """
        self.test = test
        self._port = port
        # NOTE(review): the nesting of the two SQL store checks under the
        # store_lib check is reconstructed from flattened source -- confirm.
        if BrokerTest.store_lib:
            args = args + ['--load-module', BrokerTest.store_lib]
            if BrokerTest.sql_store_lib:
                args = args + ['--load-module', BrokerTest.sql_store_lib]
                args = args + ['--catalog', BrokerTest.sql_catalog]
            if BrokerTest.sql_clfs_store_lib:
                args = args + ['--load-module', BrokerTest.sql_clfs_store_lib]
                args = args + ['--catalog', BrokerTest.sql_catalog]
        cmd = [BrokerTest.qpidd_exec, "--port", port, "--no-module-dir"] + args
        if not "--auth" in args:
            cmd.append("--auth=no")
        if wait != None:
            cmd += ["--wait", str(wait)]
        if name:
            self.name = name
        else:
            self.name = "broker%d" % Broker._broker_count
            Broker._broker_count += 1
        self.find_log()
        cmd += ["--log-to-file", self.log]
        cmd += ["--log-to-stderr=no"]
        if log_level != None:
            cmd += ["--log-enable=%s" % log_level]
        self.datadir = self.name
        cmd += ["--data-dir", self.datadir]
        # stdout is a pipe so that port() can read the port from it.
        Popen.__init__(self, cmd, expect, stdout=PIPE)
        test.cleanup_stop(self)
        self._host = "127.0.0.1"
        log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log))
        self._log_ready = False

    def startQmf(self, handler=None):
        """Open a QMF console session to this broker (qmf_session, qmf_broker)."""
        self.qmf_session = qmf.console.Session(handler)
        self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port()))

    def host(self):
        return self._host

    def port(self):
        """Return the broker port, raising Exception if it cannot be parsed."""
        # Read port from broker process stdout if not already read.
        if (self._port == 0):
            try:
                self._port = int(self.stdout.readline())
            except ValueError:
                raise Exception("Can't get port for broker %s (%s)%s" %
                                (self.name, self.pname, error_line(self.log,5)))
        return self._port

    def unexpected(self, msg):
        raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))

    def connect(self, **kwargs):
        """New API connection to the broker."""
        return messaging.Connection.establish(self.host_port(), **kwargs)

    def connect_old(self):
        """Old API connection to the broker."""
        sock = qpid.util.connect(self.host(), self.port())
        conn = qpid.connection.Connection(sock=sock)
        conn.start()
        return conn

    def declare_queue(self, queue):
        """Declare queue over a temporary old-API connection."""
        c = self.connect_old()
        s = c.session(str(qpid.datatypes.uuid4()))
        s.queue_declare(queue=queue)
        c.close()

    def _prep_sender(self, queue, durable, xprops):
        """Build the sender address string for queue."""
        s = queue + "; {create:always, node:{durable:" + str(durable)
        if xprops != None:
            s += ", x-declare:{" + xprops + "}"
        return s + "}}"

    def send_message(self, queue, message, durable=True, xprops=None, session=None):
        """Send message to queue.  Without a session, a temporary connection
        is opened and closed again afterwards."""
        if session == None:
            s = self.connect().session()
        else:
            s = session
        s.sender(self._prep_sender(queue, durable, xprops)).send(message)
        if session == None:
            s.connection.close()

    def send_messages(self, queue, messages, durable=True, xprops=None, session=None):
        """Send each of messages to queue.  Without a session, a temporary
        connection is opened and closed again afterwards."""
        if session == None:
            s = self.connect().session()
        else:
            s = session
        sender = s.sender(self._prep_sender(queue, durable, xprops))
        for m in messages:
            sender.send(m)
        if session == None:
            s.connection.close()

    def get_message(self, queue):
        """Fetch (timeout 1) and acknowledge one message from queue."""
        s = self.connect().session()
        m = s.receiver(queue+"; {create:always}", capacity=1).fetch(timeout=1)
        s.acknowledge()
        s.connection.close()
        return m

    def get_messages(self, queue, n):
        """Fetch (timeout 1 each) and acknowledge n messages from queue."""
        s = self.connect().session()
        receiver = s.receiver(queue+"; {create:always}", capacity=n)
        m = [receiver.fetch(timeout=1) for i in range(n)]
        s.acknowledge()
        s.connection.close()
        return m

    def host_port(self):
        return "%s:%s" % (self.host(), self.port())

    def log_ready(self):
        """Return true if the log file exists and contains a broker ready message"""
        if not self._log_ready:
            self._log_ready = find_in_file("notice Broker running", self.log)
        return self._log_ready

    def ready(self, **kwargs):
        """Wait till broker is ready to serve clients.

        kwargs are passed to connect().  Raises Exception if the ready
        message does not appear in the log in time, RethrownException if the
        broker cannot be connected to.
        """
        # First make sure the broker is listening by checking the log.
        if not retry(self.log_ready, timeout=60):
            raise Exception(
                "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
        # Create a connection and a session. For a cluster broker this will
        # return after cluster init has finished.
        try:
            c = self.connect(**kwargs)
            try:
                c.session()
            finally:
                c.close()
        except Exception:
            e = _current_exception()
            raise RethrownException(
                "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))

    def store_state(self):
        """Classify <datadir>/cluster/store.status as "unknown", "empty",
        "dirty" or "clean" from its first two lines."""
        f = open(os.path.join(self.datadir, "cluster", "store.status"))
        try:
            uuids = f.readlines()
        finally:
            f.close()
        null_uuid = "00000000-0000-0000-0000-000000000000\n"
        if len(uuids) < 2:
            return "unknown" # we looked while the file was being updated.
        if uuids[0] == null_uuid:
            return "empty"
        if uuids[1] == null_uuid:
            return "dirty"
        return "clean"


class Cluster:
    """A cluster of brokers in a test."""

    _cluster_count = 0

    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
        """Start count brokers sharing a unique cluster name.

        args are extra broker arguments (copied, not modified); expect and
        wait are passed on for every broker started here.
        """
        self.test = test
        self._brokers = []
        self.name = "cluster%d" % Cluster._cluster_count
        Cluster._cluster_count += 1
        # Use unique cluster name
        self.args = copy(args)
        self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
        self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
        assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
        self.args += [ "--load-module", BrokerTest.cluster_lib ]
        self.start_n(count, expect=expect, wait=wait)

    def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0):
        """Add a broker to the cluster. Returns the new broker."""
        if not name:
            name = "%s-%d" % (self.name, len(self._brokers))
        self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port))
        return self._brokers[-1]

    def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[]):
        """Add count brokers to the cluster."""
        for i in range(count):
            self.start(expect=expect, wait=wait, args=args)

    # Behave like a list of brokers.
    def __len__(self):
        return len(self._brokers)

    def __getitem__(self, index):
        return self._brokers[index]

    def __iter__(self):
        return self._brokers.__iter__()


class BrokerTest(TestCase):
    """
    Tracks processes started by test and kills at end of test.
    Provides a well-known working directory for each test.
    """

    # Environment settings.  Only QPIDD_EXEC is mandatory (checkenv raises
    # when this class is defined if it is unset); the rest may be None.
    qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
    cluster_lib = os.getenv("CLUSTER_LIB")
    xml_lib = os.getenv("XML_LIB")
    qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
    qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
    receiver_exec = os.getenv("RECEIVER_EXEC")
    sender_exec = os.getenv("SENDER_EXEC")
    sql_store_lib = os.getenv("STORE_SQL_LIB")
    sql_clfs_store_lib = os.getenv("STORE_SQL_CLFS_LIB")
    sql_catalog = os.getenv("STORE_CATALOG")
    store_lib = os.getenv("STORE_LIB")
    test_store_lib = os.getenv("TEST_STORE_LIB")
    rootdir = os.getcwd()

    def configure(self, config):
        self.config = config

    def setUp(self):
        """Create the per-test directory and make it the working directory."""
        outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
        self.dir = os.path.join(self.rootdir, outdir, self.id())
        os.makedirs(self.dir)
        os.chdir(self.dir)
        self.stopem = []                # things to stop at end of test

    def tearDown(self):
        """Stop everything registered with cleanup_stop.

        All stoppables are stopped even if some fail; the failures are
        collected and raised together as one Exception.
        """
        err = []
        for p in self.stopem:
            try:
                p.stop()
            except Exception:
                err.append(str(_current_exception()))
        self.stopem = []                # reset in case more processes start
        os.chdir(self.rootdir)
        if err:
            raise Exception("Unexpected process status:\n "+"\n ".join(err))

    def cleanup_stop(self, stopable):
        """Call thing.stop at end of test"""
        self.stopem.append(stopable)

    def popen(self, cmd, expect=EXPECT_EXIT_OK, stdin=None, stdout=FILE, stderr=FILE):
        """Start a process that will be killed at end of test, in the test dir."""
        os.chdir(self.dir)
        p = Popen(cmd, expect, stdin=stdin, stdout=stdout, stderr=stderr)
        self.cleanup_stop(p)
        return p

    def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, log_level=None):
        """Create and return a broker ready for use.

        With wait true, blocks until the broker is ready() and raises
        RethrownException if that fails.
        """
        b = Broker(self, args=args, name=name, expect=expect, port=port, log_level=log_level)
        if (wait):
            try:
                b.ready()
            except Exception:
                e = _current_exception()
                raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
        return b

    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True):
        """Create and return a cluster ready for use"""
        cluster = Cluster(self, count, args, expect=expect, wait=wait)
        return cluster

    def browse(self, session, queue, timeout=0):
        """Return the contents of the messages on queue, retrieved by
        browsing with session until a fetch with timeout comes back empty."""
        r = session.receiver("%s;{mode:browse}"%(queue))
        contents = []
        try:
            while True:
                contents.append(r.fetch(timeout=timeout).content)
        except messaging.Empty:
            pass
        # The receiver is deliberately left open, see:
        # FIXME aconway 2011-04-14: r.close()
        return contents

    def assert_browse(self, session, queue, expect_contents, timeout=0):
        """Assert that the contents of messages on queue (as retrieved
        using session and timeout) exactly match the strings in
        expect_contents"""
        actual_contents = self.browse(session, queue, timeout)
        self.assertEqual(expect_contents, actual_contents)


def join(thread, timeout=10):
    """Join thread, raising Exception if it is still alive after timeout."""
    thread.join(timeout)
    if thread.isAlive():
        raise Exception("Timed out joining thread %s"%thread)


class RethrownException(Exception):
    """Captures the stack trace of the current exception to be thrown later"""
    def __init__(self, msg=""):
        Exception.__init__(self, msg+"\n"+format_exc())


class StoppableThread(Thread):
    """
    Base class for threads that do something in a loop and periodically
    check to see if they have been stopped.
    """
    def __init__(self):
        self.stopped = False
        self.error = None
        Thread.__init__(self)

    def stop(self):
        """Ask the thread to stop, join it and re-raise any stored error."""
        self.stopped = True
        join(self)
        if self.error:
            raise self.error


class NumberedSender(Thread):
    """
    Thread to run a sender client and send numbered messages until stopped.
    """

    def __init__(self, broker, max_depth=None, queue="test-queue"):
        """
        max_depth: enable flow control, ensure sent - received <= max_depth.
        Requires self.notify_received(n) to be called each time messages are received.
        """
        Thread.__init__(self)
        self.sender = broker.test.popen(
            ["qpid-send",
             "--broker", "localhost:%s"%broker.port(),
             "--address", "%s;{create:always}"%queue,
             "--failover-updates",
             "--content-stdin"
             ],
            expect=EXPECT_RUNNING, stdin=PIPE)
        self.condition = Condition()
        self.max = max_depth
        self.received = 0
        self.stopped = False
        self.error = None

    def write_message(self, n):
        """Write n as one line to the sender process's stdin."""
        self.sender.stdin.write(str(n)+"\n")
        self.sender.stdin.flush()

    def run(self):
        try:
            self.sent = 0
            while not self.stopped:
                if self.max:
                    # Flow control: block while too far ahead of the receiver.
                    self.condition.acquire()
                    try:
                        while not self.stopped and self.sent - self.received > self.max:
                            self.condition.wait()
                    finally:
                        self.condition.release()
                self.write_message(self.sent)
                self.sent += 1
        except Exception:
            # Stored here and re-raised from stop().
            self.error = RethrownException(self.sender.pname)

    def notify_received(self, count):
        """Called by receiver to enable flow control. count = messages received so far."""
        self.condition.acquire()
        try:
            self.received = count
            self.condition.notify()
        finally:
            self.condition.release()

    def stop(self):
        """Stop sending, join the thread and write the end-of-messages marker."""
        self.condition.acquire()
        try:
            self.stopped = True
            self.condition.notify()
        finally:
            self.condition.release()
        join(self)
        self.write_message(-1)          # end-of-messages marker.
        if self.error:
            raise self.error


class NumberedReceiver(Thread):
    """
    Thread to run a receiver client and verify it receives
    sequentially numbered messages.
    """
    def __init__(self, broker, sender = None, queue="test-queue"):
        """
        sender: enable flow control. Call sender.notify_received(n) for each message received.
        """
        Thread.__init__(self)
        self.test = broker.test
        self.receiver = self.test.popen(
            ["qpid-receive",
             "--broker", "localhost:%s"%broker.port(),
             "--address", "%s;{create:always}"%queue,
             "--failover-updates",
             "--forever"
             ],
            expect=EXPECT_RUNNING,
            stdout=PIPE)
        self.lock = Lock()
        self.error = None
        self.sender = sender

    def read_message(self):
        """Read one line from the receiver process's stdout as an int."""
        return int(self.receiver.stdout.readline())

    def run(self):
        try:
            self.received = 0
            m = self.read_message()
            while m != -1:
                assert(m <= self.received) # Check for missing messages
                if (m == self.received): # Ignore duplicates
                    self.received += 1
                    # NOTE(review): nesting reconstructed from flattened
                    # source; notifying with an unchanged count has no
                    # additional effect either way.
                    if self.sender:
                        self.sender.notify_received(self.received)
                m = self.read_message()
        except Exception:
            # Stored here and re-raised from stop().
            self.error = RethrownException(self.receiver.pname)

    def stop(self):
        """Returns when termination message is received"""
        join(self)
        if self.error:
            raise self.error


class ErrorGenerator(StoppableThread):
    """
    Thread that continuously generates errors by trying to consume from
    a non-existent queue. For cluster regression tests, error handling
    caused issues in the past.
    """

    def __init__(self, broker):
        """Register with broker.test for cleanup and start immediately."""
        StoppableThread.__init__(self)
        self.broker = broker
        broker.test.cleanup_stop(self)
        self.start()

    def run(self):
        c = self.broker.connect_old()
        try:
            while not self.stopped:
                try:
                    c.session(str(qpid.datatypes.uuid4())).message_subscribe(
                        queue="non-existent-queue")
                    assert(False)
                except qpid.session.SessionException:
                    pass
                time.sleep(0.01)
        except Exception:
            pass                        # Normal if broker is killed.


def import_script(path):
    """
    Import executable script at path as a module.
    Requires some trickery as scripts are not in standard module format
    """
    f = open(path)
    try:
        # Module name is the script's file name with "-" replaced by "_".
        name = os.path.split(path)[1].replace("-","_")
        return imp.load_module(name, f, path, ("", "r", imp.PY_SOURCE))
    finally:
        f.close()