summaryrefslogtreecommitdiff
path: root/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/tests/brokertest.py')
-rw-r--r--cpp/src/tests/brokertest.py102
1 files changed, 13 insertions, 89 deletions
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index aea4460e5a..70c145a51b 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -17,8 +17,7 @@
# under the License.
#
-# Support library for tests that start multiple brokers, e.g. cluster
-# or federation
+# Support library for tests that start multiple brokers, e.g. HA or federation
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
@@ -203,8 +202,10 @@ class Popen(subprocess.Popen):
self.wait()
def kill(self):
- try:
- subprocess.Popen.kill(self)
+ # Set to EXPECT_UNKNOWN, EXPECT_EXIT_FAIL creates a race condition
+ # if the process exits normally concurrent with the call to kill.
+ self.expect = EXPECT_UNKNOWN
+ try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
@@ -380,8 +381,7 @@ class Broker(Popen):
if not retry(self.log_ready, timeout=timeout):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
- # Create a connection and a session. For a cluster broker this will
- # return after cluster init has finished.
+ # Create a connection and a session.
try:
c = self.connect(**kwargs)
try: c.session()
@@ -389,54 +389,6 @@ class Broker(Popen):
except Exception,e: raise RethrownException(
"Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
- def store_state(self):
- f = open(os.path.join(self.datadir, "cluster", "store.status"))
- try: uuids = f.readlines()
- finally: f.close()
- null_uuid="00000000-0000-0000-0000-000000000000\n"
- if len(uuids) < 2: return "unknown" # we looked while the file was being updated.
- if uuids[0] == null_uuid: return "empty"
- if uuids[1] == null_uuid: return "dirty"
- return "clean"
-
-class Cluster:
- """A cluster of brokers in a test."""
- # Client connection options for use in failover tests.
- CONNECTION_OPTIONS = "reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
-
- _cluster_count = 0
-
- def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
- self.test = test
- self._brokers=[]
- self.name = "cluster%d" % Cluster._cluster_count
- Cluster._cluster_count += 1
- # Use unique cluster name
- self.args = copy(args)
- self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
- self.args += [ "--log-enable=info+", "--log-enable=debug+:cluster"]
- assert BrokerTest.cluster_lib, "Cannot locate cluster plug-in"
- self.args += [ "--load-module", BrokerTest.cluster_lib ]
- self.start_n(count, expect=expect, wait=wait, show_cmd=show_cmd)
-
- def start(self, name=None, expect=EXPECT_RUNNING, wait=True, args=[], port=0, show_cmd=False):
- """Add a broker to the cluster. Returns the index of the new broker."""
- if not name: name="%s-%d" % (self.name, len(self._brokers))
- self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port, show_cmd=show_cmd))
- return self._brokers[-1]
-
- def ready(self):
- for b in self: b.ready()
-
- def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
- for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
-
- # Behave like a list of brokers.
- def __len__(self): return len(self._brokers)
- def __getitem__(self,index): return self._brokers[index]
- def __iter__(self): return self._brokers.__iter__()
-
-
def browse(session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
@@ -473,7 +425,6 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
- cluster_lib = os.getenv("CLUSTER_LIB")
ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
@@ -525,11 +476,6 @@ class BrokerTest(TestCase):
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
- def cluster(self, count=0, args=[], expect=EXPECT_RUNNING, wait=True, show_cmd=False):
- """Create and return a cluster ready for use"""
- cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
- return cluster
-
def browse(self, *args, **kwargs): browse(*args, **kwargs)
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
@@ -558,14 +504,17 @@ class StoppableThread(Thread):
join(self)
if self.error: raise self.error
+# Options for a client that wants to reconnect automatically.
+RECONNECT_OPTIONS="reconnect:true,reconnect-timeout:10,reconnect-urls-replace:true"
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS,
- failover_updates=True, url=None):
+ connection_options=RECONNECT_OPTIONS,
+ failover_updates=True, url=None, args=[]):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
@@ -576,7 +525,7 @@ class NumberedSender(Thread):
"--address", "%s;{create:always}"%queue,
"--connection-options", "{%s}"%(connection_options),
"--content-stdin"
- ]
+ ] + args
if failover_updates: cmd += ["--failover-updates"]
self.sender = broker.test.popen(
cmd, expect=EXPECT_RUNNING, stdin=PIPE)
@@ -627,7 +576,7 @@ class NumberedReceiver(Thread):
sequentially numbered messages.
"""
def __init__(self, broker, sender=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS,
+ connection_options=RECONNECT_OPTIONS,
failover_updates=True, url=None):
"""
sender: enable flow control. Call sender.received(n) for each message received.
@@ -676,31 +625,6 @@ class NumberedReceiver(Thread):
join(self)
self.check()
-class ErrorGenerator(StoppableThread):
- """
- Thread that continuously generates errors by trying to consume from
- a non-existent queue. For cluster regression tests, error handling
- caused issues in the past.
- """
-
- def __init__(self, broker):
- StoppableThread.__init__(self)
- self.broker=broker
- broker.test.cleanup_stop(self)
- self.start()
-
- def run(self):
- c = self.broker.connect_old()
- try:
- while not self.stopped:
- try:
- c.session(str(qpid.datatypes.uuid4())).message_subscribe(
- queue="non-existent-queue")
- assert(False)
- except qpid.session.SessionException: pass
- time.sleep(0.01)
- except: pass # Normal if broker is killed.
-
def import_script(path):
"""
Import executable script at path as a module.