diff options
| author | Alan Conway <aconway@apache.org> | 2009-11-10 21:09:56 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-11-10 21:09:56 +0000 |
| commit | 0caf9647078392d464386d030c6a014b642ef35f (patch) | |
| tree | 962e5db4f3dc6dab0db2b717c9ad08b39990f979 /python/qpid/brokertest.py | |
| parent | 03e7b94f1e12cf90e08196327d56aeabbee6cbdf (diff) | |
| download | qpid-python-0caf9647078392d464386d030c6a014b642ef35f.tar.gz | |
Moved brokertest.py to python directory.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@834668 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/brokertest.py')
| -rw-r--r-- | python/qpid/brokertest.py | 393 |
1 files changed, 393 insertions, 0 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py new file mode 100644 index 0000000000..d70990bfae --- /dev/null +++ b/python/qpid/brokertest.py @@ -0,0 +1,393 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Support library for tests that start multiple brokers, e.g. cluster +# or federation + +import os, signal, string, tempfile, popen2, socket, threading, time +import qpid, traceback +from qpid import connection, messaging, util +from qpid.compat import format_exc +from qpid.harness import Skipped +from unittest import TestCase +from copy import copy +from threading import Thread, Lock, Condition + +# Values for expected outcome of process at end of test +EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. +EXPECT_EXIT_FAIL=2 # Expect to exit with non-0 status before end of test. +EXPECT_RUNNING=3 # Expect to still be running at end of test + +def is_running(pid): + try: + os.kill(pid, 0) + return True + except: + return False + +class BadProcessStatus(Exception): + pass + +class Popen(popen2.Popen3): + """ + Similar to subprocess.Popen but using popen2 classes for portability. + Can set and verify expectation of process status at end of test. + Dumps command line, stdout, stderr to data dir for debugging. + """ + + def __init__(self, cmd, expect=EXPECT_EXIT_OK): + self.cmd = [ str(x) for x in cmd ] + popen2.Popen3.__init__(self, self.cmd, True) + self.expect = expect + self.stdin = self.tochild + self.stdout = self.fromchild + self.stderr = self.childerr + self.pname = "%s-%d" % (os.path.split(self.cmd[0])[-1], self.pid) + self.dump(self.cmd_str(), "cmd") + + def dump(self, str, ext): + f = file("%s.%s" % (self.pname, ext), "w") + f.write(str) + f.close() + + def unexpected(self,msg): + self.dump(self.stdout.read(), "out") + self.dump(self.stderr.read(), "err") + raise BadProcessStatus("%s: %s" % (msg, self.pname)) + + def stop(self): # Clean up at end of test. + if self.expect == EXPECT_RUNNING: + try: + self.kill() + except: + self.unexpected("Exit code %d" % self.wait()) + else: + # Give the process some time to exit. + delay = 0.1 + while (self.poll() is None and delay < 1): + time.sleep(delay) + delay *= 2 + if self.returncode is None: # Still haven't stopped + self.kill() + self.unexpected("Still running") + elif self.expect == EXPECT_EXIT_OK and self.returncode != 0: + self.unexpected("Exit code %d" % self.returncode) + elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0: + self.unexpected("Expected error") + + def communicate(self, input=None): + if input: + self.stdin.write(input) + self.stdin.close() + outerr = (self.stdout.read(), self.stderr.read()) + self.wait() + return outerr + + def is_running(self): return is_running(self.pid) + + def poll(self): + self.returncode = popen2.Popen3.poll(self) + if (self.returncode == -1): self.returncode = None + return self.returncode + + def wait(self): + self.returncode = popen2.Popen3.wait(self) + return self.returncode + + def send_signal(self, sig): + os.kill(self.pid,sig) + self.wait() + + def terminate(self): self.send_signal(signal.SIGTERM) + def kill(self): self.send_signal(signal.SIGKILL) + + def cmd_str(self): return " ".join([str(s) for s in self.cmd]) + +def checkenv(name): + value = os.getenv(name) + if not value: raise Exception("Environment variable %s is not set" % name) + return value + +class Broker(Popen): + "A broker process. Takes care of start, stop and logging." + _store_lib = os.getenv("STORE_LIB") + _qpidd = checkenv("QPIDD_EXEC") + _broker_count = 0 + + def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING): + """Start a broker daemon. name determines the data-dir and log + file names.""" + + self.test = test + cmd = [self._qpidd, "--port=0", "--no-module-dir", "--auth=no"] + args + if name: self.name = name + else: + self.name = "broker%d" % Broker._broker_count + Broker._broker_count += 1 + self.log = self.name+".log" + cmd += ["--log-to-file", self.log, "--log-prefix", self.name] + cmd += ["--log-to-stderr=no"] + self.datadir = self.name + cmd += ["--data-dir", self.datadir] + if self._store_lib: cmd += ["--load-module", self._store_lib] + + Popen.__init__(self, cmd, expect) + try: self.port = int(self.stdout.readline()) + except Exception, e: + raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname)) + test.cleanup_stop(self) + self.host = "localhost" # Placeholder for remote brokers. + + def unexpected(self,msg): + raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) + + def connect(self): + """New API connection to the broker.""" + return messaging.Connection.open(self.host, self.port) + + def connect_old(self): + """Old API connection to the broker.""" + socket = qpid.util.connect(self.host,self.port) + connection = qpid.connection.Connection (sock=socket) + connection.start() + return connection; + + def declare_queue(self, queue): + c = self.connect_old() + s = c.session(str(qpid.datatypes.uuid4())) + s.queue_declare(queue=queue) + c.close() + + def send_message(self, queue, message): + s = self.connect().session() + s.sender(queue+" {create:always}").send(message) + s.connection.close() + + def send_messages(self, queue, messages): + s = self.connect().session() + sender = s.sender(queue+" {create:always}") + for m in messages: sender.send(m) + s.connection.close() + + def get_message(self, queue): + s = self.connect().session() + m = s.receiver(queue+" {create:always}", capacity=1).fetch(timeout=1) + s.acknowledge() + s.connection.close() + return m + + def get_messages(self, queue, n): + s = self.connect().session() + receiver = s.receiver(queue+" {create:always}", capacity=n) + m = [receiver.fetch(timeout=1) for i in range(n)] + s.acknowledge() + s.connection.close() + return m + +class Cluster: + """A cluster of brokers in a test.""" + + _cluster_lib = checkenv("CLUSTER_LIB") + _cluster_count = 0 + + def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING): + self.test = test + self._brokers=[] + self.name = "cluster%d" % Cluster._cluster_count + Cluster._cluster_count += 1 + # Use unique cluster name + self.args = copy(args) + self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] + self.args += [ "--load-module", self._cluster_lib ] + self.start_n(count, expect=expect) + + def start(self, name=None, expect=EXPECT_RUNNING): + """Add a broker to the cluster. Returns the index of the new broker.""" + if not name: name="%s-%d" % (self.name, len(self._brokers)) + self._brokers.append(self.test.broker(self.args, name, expect)) + return self._brokers[-1] + + def start_n(self, count, expect=EXPECT_RUNNING): + for i in range(count): self.start(expect=expect) + + def wait(self): + """Wait for all cluster members to be ready""" + for b in self._brokers: + b.connect().close() + + # Behave like a list of brokers. + def __len__(self): return len(self._brokers) + def __getitem__(self,index): return self._brokers[index] + def __iter__(self): return self._brokers.__iter__() + +class BrokerTest(TestCase): + """ + Tracks processes started by test and kills at end of test. + Provides a well-known working directory for each test. + """ + + # Environment settings. + cluster_lib = os.getenv("CLUSTER_LIB") + xml_lib = os.getenv("XML_LIB") + qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC") + qpidRoute_exec = os.getenv("QPID_ROUTE_EXEC") + receiver_exec = os.getenv("RECEIVER_EXEC") + sender_exec = os.getenv("SENDER_EXEC") + store_lib = os.getenv("STORE_LIB") + + rootdir = os.getcwd() + def configure(self, config): self.config=config + + def setUp(self): + self.dir = os.path.join(self.rootdir, self.config.defines["OUTDIR"], self.id()) + os.makedirs(self.dir) + os.chdir(self.dir) + self.stopem = [] # things to stop at end of test + + def tearDown(self): + err = [] + for p in self.stopem: + try: p.stop() + except Exception, e: err.append(str(e)) + if err: raise Exception("\n ".join(err)) + + # FIXME aconway 2009-11-06: check for core files of exited processes. + + def cleanup_stop(self, stopable): + """Call thing.stop at end of test""" + self.stopem.append(stopable) + + def popen(self, cmd, expect=EXPECT_EXIT_OK): + """Start a process that will be killed at end of test, in the test dir.""" + os.chdir(self.dir) + p = Popen(cmd, expect) + self.cleanup_stop(p) + return p + + def broker(self, args=[], name=None, expect=EXPECT_RUNNING): + """Create and return a broker ready for use""" + b = Broker(self, args=args, name=name, expect=expect) + b.connect().close() + return b + + def cluster(self, count=0, args=[], expect=EXPECT_RUNNING): + """Create and return a cluster ready for use""" + cluster = Cluster(self, count, args, expect=expect) + cluster.wait() + return cluster + +class RethrownException(Exception): + """Captures the original stack trace to be thrown later""" + def __init__(self, e): + Exception.__init__(self, format_exc()) + +class StoppableThread(Thread): + """ + Base class for threads that do something in a loop and periodically check + to see if they have been stopped. + """ + def __init__(self): + self.stopped = False + self.error = None + Thread.__init__(self) + + def stop(self): + self.stopped = True + self.join() + if self.error: raise self.error + +class NumberedSender(StoppableThread): + """ + Thread to run a sender client and send numbered messages until stopped. + """ + + def __init__(self, broker): + StoppableThread.__init__(self) + self.sender = broker.test.popen( + [broker.test.sender_exec, "--port", broker.port], expect=EXPECT_RUNNING) + + def run(self): + try: + self.sent = 0 + while not self.stopped: + self.sender.stdin.write(str(self.sent)+"\n") + self.sender.stdin.flush() + self.sent += 1 + except Exception, e: self.error = RethrownException(e) + +class NumberedReceiver(Thread): + """ + Thread to run a receiver client and verify it receives + sequentially numbered messages. + """ + def __init__(self, broker): + Thread.__init__(self) + self.test = broker.test + self.receiver = self.test.popen( + [self.test.receiver_exec, "--port", broker.port], expect=EXPECT_RUNNING) + self.stopat = None + self.lock = Lock() + self.error = None + + def run(self): + try: + self.received = 0 + while self.stopat is None or self.received < self.stopat: + self.lock.acquire() + try: + m = int(self.receiver.stdout.readline()) + assert(m <= self.received) # Allow for duplicates + if (m == self.received): self.received += 1 + finally: + self.lock.release() + except Exception, e: + self.error = RethrownException(e) + + def stop(self, count): + """Returns when received >= count""" + self.lock.acquire() + self.stopat = count + self.lock.release() + self.join() + if self.error: raise self.error + +class ErrorGenerator(StoppableThread): + """ + Thread that continuously generates errors by trying to consume from + a non-existent queue. For cluster regression tests, error handling + caused issues in the past. + """ + + def __init__(self, broker): + StoppableThread.__init__(self) + self.broker=broker + broker.test.cleanup_stop(self) + self.start() + + def run(self): + c = self.broker.connect_old() + try: + while not self.stopped: + try: + c.session(str(qpid.datatypes.uuid4())).message_subscribe( + queue="non-existent-queue") + assert(False) + except qpid.session.SessionException: pass + except: pass # Normal if broker is killed. + |
