summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-24 20:07:24 +0000
committerAlan Conway <aconway@apache.org>2009-11-24 20:07:24 +0000
commit0fb7ff9cfbfd01e9093c2c6021a5915696d2a089 (patch)
tree1d2db335592be80a9aa9f8f404d2c1682afeb485 /python
parent1ee447563d208b39e962537a47f14aea741777b0 (diff)
downloadqpid-python-0fb7ff9cfbfd01e9093c2c6021a5915696d2a089.tar.gz
Support for restarting a persistent cluster.
Option --cluster-size=N: members wait for N members before recovering store. Stores marked as clean/dirty. Automatically recover from clean store on restart. Stores marked with UUID to detect errors. Not yet implemented: consistency checks, manual recovery from all dirty stores. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@883842 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/brokertest.py61
1 files changed, 40 insertions, 21 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index a42e71c25b..c6252773a5 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -20,7 +20,7 @@
# Support library for tests that start multiple brokers, e.g. cluster
# or federation
-import os, signal, string, tempfile, popen2, socket, threading, time
+import os, signal, string, tempfile, popen2, socket, threading, time, imp
import qpid, traceback
from qpid import connection, messaging, util
from qpid.compat import format_exc
@@ -28,6 +28,9 @@ from qpid.harness import Skipped
from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
+from logging import getLogger
+
+log = getLogger("qpid.brokertest")
# Values for expected outcome of process at end of test
EXPECT_EXIT_OK=1 # Expect to exit with 0 status before end of test.
@@ -126,8 +129,6 @@ def checkenv(name):
class Broker(Popen):
"A broker process. Takes care of start, stop and logging."
- _store_lib = os.getenv("STORE_LIB")
- _qpidd = checkenv("QPIDD_EXEC")
_broker_count = 0
def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
@@ -135,7 +136,8 @@ class Broker(Popen):
file names."""
self.test = test
- cmd = [self._qpidd, "--port=0", "--no-module-dir", "--auth=no"] + args
+ self._port = None
+ cmd = [BrokerTest.qpidd_exec, "--port=0", "--no-module-dir", "--auth=no"] + args
if name: self.name = name
else:
self.name = "broker%d" % Broker._broker_count
@@ -145,25 +147,30 @@ class Broker(Popen):
cmd += ["--log-to-stderr=no"]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
- if self._store_lib: cmd += ["--load-module", self._store_lib]
-
Popen.__init__(self, cmd, expect)
- try: self.port = int(self.stdout.readline())
- except ValueError, e:
- raise Exception("Failed to start broker %s (%s)" % (self.name, self.pname))
test.cleanup_stop(self)
- self.host = "localhost" # Placeholder for remote brokers.
+ self.host = "localhost"
+ log.debug("Started broker %s (%s)" % (self.name, self.pname))
+
+ def port(self):
+ # Read port from broker process stdout if not already read.
+ if (self._port is None):
+ try: self._port = int(self.stdout.readline())
+ except ValueError, e:
+ raise Exception("Can't get port for broker %s (%s)" %
+ (self.name, self.pname))
+ return self._port
def unexpected(self,msg):
raise BadProcessStatus("%s: %s (%s)" % (msg, self.name, self.pname))
def connect(self):
"""New API connection to the broker."""
- return messaging.Connection.open(self.host, self.port)
+ return messaging.Connection.open(self.host, self.port())
def connect_old(self):
"""Old API connection to the broker."""
- socket = qpid.util.connect(self.host,self.port)
+ socket = qpid.util.connect(self.host,self.port())
connection = qpid.connection.Connection (sock=socket)
connection.start()
return connection;
@@ -199,11 +206,13 @@ class Broker(Popen):
s.acknowledge()
s.connection.close()
return m
+
+ def host_port(self): return "%s:%s" % (self.host, self.port())
+
class Cluster:
"""A cluster of brokers in a test."""
- _cluster_lib = checkenv("CLUSTER_LIB")
_cluster_count = 0
def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait_for_start=True):
@@ -214,12 +223,14 @@ class Cluster:
# Use unique cluster name
self.args = copy(args)
self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
- self.args += [ "--load-module", self._cluster_lib ]
+ assert BrokerTest.cluster_lib
+ self.args += [ "--load-module", BrokerTest.cluster_lib ]
self.start_n(count, expect=expect, wait_for_start=wait_for_start)
def start(self, name=None, expect=EXPECT_RUNNING, wait_for_start=True):
"""Add a broker to the cluster. Returns the index of the new broker."""
if not name: name="%s-%d" % (self.name, len(self._brokers))
+ log.debug("Cluster %s starting member %s" % (self.name, name))
self._brokers.append(self.test.broker(self.args, name, expect, wait_for_start))
return self._brokers[-1]
@@ -238,6 +249,7 @@ class BrokerTest(TestCase):
"""
# Environment settings.
+ qpidd_exec = checkenv("QPIDD_EXEC")
cluster_lib = os.getenv("CLUSTER_LIB")
xml_lib = os.getenv("XML_LIB")
qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -261,7 +273,7 @@ class BrokerTest(TestCase):
for p in self.stopem:
try: p.stop()
except Exception, e: err.append(str(e))
- if err: raise Exception("\n ".join(err))
+ if err: raise Exception("Unexpected process status:\n "+"\n ".join(err))
# FIXME aconway 2009-11-06: check for core files of exited processes.
@@ -289,8 +301,8 @@ class BrokerTest(TestCase):
class RethrownException(Exception):
"""Captures the original stack trace to be thrown later"""
- def __init__(self, e):
- Exception.__init__(self, format_exc())
+ def __init__(self, e, msg=""):
+ Exception.__init__(self, msg+"\n"+format_exc())
class StoppableThread(Thread):
"""
@@ -315,7 +327,7 @@ class NumberedSender(StoppableThread):
def __init__(self, broker):
StoppableThread.__init__(self)
self.sender = broker.test.popen(
- [broker.test.sender_exec, "--port", broker.port], expect=EXPECT_RUNNING)
+ [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
def run(self):
try:
@@ -324,7 +336,7 @@ class NumberedSender(StoppableThread):
self.sender.stdin.write(str(self.sent)+"\n")
self.sender.stdin.flush()
self.sent += 1
- except Exception, e: self.error = RethrownException(e)
+ except Exception, e: self.error = RethrownException(e, self.sender.pname)
class NumberedReceiver(Thread):
"""
@@ -335,7 +347,7 @@ class NumberedReceiver(Thread):
Thread.__init__(self)
self.test = broker.test
self.receiver = self.test.popen(
- [self.test.receiver_exec, "--port", broker.port], expect=EXPECT_RUNNING)
+ [self.test.receiver_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
self.stopat = None
self.lock = Lock()
self.error = None
@@ -352,7 +364,7 @@ class NumberedReceiver(Thread):
finally:
self.lock.release()
except Exception, e:
- self.error = RethrownException(e)
+ self.error = RethrownException(e, self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
@@ -386,3 +398,10 @@ class ErrorGenerator(StoppableThread):
except qpid.session.SessionException: pass
except: pass # Normal if broker is killed.
+def import_script(path):
+ """
+ Import executable script at path as a module.
+ Requires some trickery as scripts are not in standard module format
+ """
+ name=os.path.split(path)[1].replace("-","_")
+ return imp.load_module(name, file(path), path, ("", "r", imp.PY_SOURCE))