diff options
| author | Alan Conway <aconway@apache.org> | 2009-11-24 20:07:24 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2009-11-24 20:07:24 +0000 |
| commit | 0fb7ff9cfbfd01e9093c2c6021a5915696d2a089 (patch) | |
| tree | 1d2db335592be80a9aa9f8f404d2c1682afeb485 /python | |
| parent | 1ee447563d208b39e962537a47f14aea741777b0 (diff) | |
| download | qpid-python-0fb7ff9cfbfd01e9093c2c6021a5915696d2a089.tar.gz | |
Support for restarting a persistent cluster.
Option --cluster-size=N: members wait for N members before recovering store.
Stores marked as clean/dirty. Automatically recover from clean store on restart.
Stores marked with UUID to detect errors.
Not yet implemented: consistency checks, manual recovery from all dirty stores.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@883842 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/brokertest.py | 61 |
1 files changed, 40 insertions, 21 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py index a42e71c25b..c6252773a5 100644 --- a/python/qpid/brokertest.py +++ b/python/qpid/brokertest.py @@ -20,7 +20,7 @@ # Support library for tests that start multiple brokers, e.g. cluster # or federation -import os, signal, string, tempfile, popen2, socket, threading, time +import os, signal, string, tempfile, popen2, socket, threading, time, imp import qpid, traceback from qpid import connection, messaging, util from qpid.compat import format_exc @@ -28,6 +28,9 @@ from qpid.harness import Skipped from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition +from logging import getLogger + +log = getLogger("qpid.brokertest") # Values for expected outcome of process at end of test EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test. @@ -126,8 +129,6 @@ def checkenv(name): class Broker(Popen): "A broker process. Takes care of start, stop and logging." - _store_lib = os.getenv("STORE_LIB") - _qpidd = checkenv("QPIDD_EXEC") _broker_count = 0 def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING): @@ -135,7 +136,8 @@ class Broker(Popen): file names.""" self.test = test - cmd = [self._qpidd, "--port=0", "--no-module-dir", "--auth=no"] + args + self._port = None + cmd = [BrokerTest.qpidd_exec, "--port=0", "--no-module-dir", "--auth=no"] + args if name: self.name = name else: self.name = "broker%d" % Broker._broker_count @@ -145,25 +147,30 @@ class Broker(Popen): cmd += ["--log-to-stderr=no"] self.datadir = self.name cmd += ["--data-dir", self.datadir] - if self._store_lib: cmd += ["--load-module", self._store_lib] - Popen.__init__(self, cmd, expect) - try: self.port = int(self.stdout.readline()) - except ValueError, e: - raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname)) test.cleanup_stop(self) - self.host = "localhost" # Placeholder for remote brokers. + self.host = "localhost" + log.debug("Started broker %s (%s)" % (self.name, self.pname)) + + def port(self): + # Read port from broker process stdout if not already read. + if (self._port is None): + try: self._port = int(self.stdout.readline()) + except ValueError, e: + raise Exception("Can't get port for broker %s (%s)" % + (self.name, self.pname)) + return self._port def unexpected(self,msg): raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname)) def connect(self): """New API connection to the broker.""" - return messaging.Connection.open(self.host, self.port) + return messaging.Connection.open(self.host, self.port()) def connect_old(self): """Old API connection to the broker.""" - socket = qpid.util.connect(self.host,self.port) + socket = qpid.util.connect(self.host,self.port()) connection = qpid.connection.Connection (sock=socket) connection.start() return connection; @@ -199,11 +206,13 @@ class Broker(Popen): s.acknowledge() s.connection.close() return m + + def host_port(self): return "%s:%s" % (self.host, self.port()) + class Cluster: """A cluster of brokers in a test.""" - _cluster_lib = checkenv("CLUSTER_LIB") _cluster_count = 0 def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True): @@ -214,12 +223,14 @@ class Cluster: # Use unique cluster name self.args = copy(args) self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ] - self.args += [ "--load-module", self._cluster_lib ] + assert BrokerTest.cluster_lib + self.args += [ "--load-module", BrokerTest.cluster_lib ] self.start_n(count, expect=expect, wait_for_start=wait_for_start) def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True): """Add a broker to the cluster. Returns the index of the new broker.""" if not name: name="%s-%d" % (self.name, len(self._brokers)) + log.debug("Cluster %s starting member %s" % (self.name, name)) self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start)) return self._brokers[-1] @@ -238,6 +249,7 @@ class BrokerTest(TestCase): """ # Environment settings. + qpidd_exec = checkenv("QPIDD_EXEC") cluster_lib = os.getenv("CLUSTER_LIB") xml_lib = os.getenv("XML_LIB") qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC") @@ -261,7 +273,7 @@ class BrokerTest(TestCase): for p in self.stopem: try: p.stop() except Exception, e: err.append(str(e)) - if err: raise Exception("\n ".join(err)) + if err: raise Exception("Unexpected process status:\n "+"\n ".join(err)) # FIXME aconway 2009-11-06: check for core files of exited processes. @@ -289,8 +301,8 @@ class BrokerTest(TestCase): class RethrownException(Exception): """Captures the original stack trace to be thrown later""" - def __init__(self, e): - Exception.__init__(self, format_exc()) + def __init__(self, e, msg=""): + Exception.__init__(self, msg+"\n"+format_exc()) class StoppableThread(Thread): """ @@ -315,7 +327,7 @@ class NumberedSender(StoppableThread): def __init__(self, broker): StoppableThread.__init__(self) self.sender = broker.test.popen( - [broker.test.sender_exec, "--port", broker.port], expect=EXPECT_RUNNING) + [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING) def run(self): try: @@ -324,7 +336,7 @@ class NumberedSender(StoppableThread): self.sender.stdin.write(str(self.sent)+"\n") self.sender.stdin.flush() self.sent += 1 - except Exception, e: self.error = RethrownException(e) + except Exception, e: self.error = RethrownException(e, self.sender.pname) class NumberedReceiver(Thread): """ @@ -335,7 +347,7 @@ class NumberedReceiver(Thread): Thread.__init__(self) self.test = broker.test self.receiver = self.test.popen( - [self.test.receiver_exec, "--port", broker.port], expect=EXPECT_RUNNING) + [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING) self.stopat = None self.lock = Lock() self.error = None @@ -352,7 +364,7 @@ class NumberedReceiver(Thread): finally: self.lock.release() except Exception, e: - self.error = RethrownException(e) + self.error = RethrownException(e, self.receiver.pname) def stop(self, count): """Returns when received >= count""" @@ -386,3 +398,10 @@ class ErrorGenerator(StoppableThread): except qpid.session.SessionException: pass except: pass # Normal if broker is killed. +def import_script(path): + """ + Import executable script at path as a module. + Requires some trickery as scripts are not in standard module format + """ + name=os.path.split(path)[1].replace("-","_") + return imp.load_module(name, file(path), path, ("", "r", imp.PY_SOURCE)) |
