summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--qpid/cpp/src/tests/brokertest.py81
1 files changed, 47 insertions, 34 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 752e5603c8..5f235e4451 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -76,7 +76,7 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01):
+def retry(function, timeout=1, delay=.01):
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
@@ -198,16 +198,17 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
+ self.wait()
def kill(self):
- try: subprocess.Popen.kill(self)
+ try:
+ subprocess.Popen.kill(self)
except AttributeError: # No terminate method
try:
os.kill( self.pid , signal.SIGKILL)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
- self._cleanup()
+ self.wait()
def _cleanup(self):
"""Clean up after a dead process"""
@@ -276,8 +277,8 @@ class Broker(Popen):
self.find_log()
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
- if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
+ cmd += ["--log-enable=%s"%(log_level or "info+") ]
+
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
if show_cmd: print cmd
@@ -444,6 +445,7 @@ class BrokerTest(TestCase):
# Environment settings.
qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC"))
cluster_lib = os.getenv("CLUSTER_LIB")
+ ha_lib = os.getenv("HA_LIB")
xml_lib = os.getenv("XML_LIB")
qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
qpid_route_exec = os.getenv("QPID_ROUTE_EXEC")
@@ -499,26 +501,32 @@ class BrokerTest(TestCase):
cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd)
return cluster
- def browse(self, session, queue, timeout=0):
+ def browse(self, session, queue, timeout=0, transform=lambda m: m.content):
"""Return a list with the contents of each message on queue."""
r = session.receiver("%s;{mode:browse}"%(queue))
r.capacity = 100
try:
contents = []
try:
- while True: contents.append(r.fetch(timeout=timeout).content)
+ while True: contents.append(transform(r.fetch(timeout=timeout)))
except messaging.Empty: pass
finally: r.close()
return contents
- def assert_browse(self, session, queue, expect_contents, timeout=0):
+ def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
- actual_contents = self.browse(session, queue, timeout)
+ actual_contents = self.browse(session, queue, timeout, transform=transform)
self.assertEqual(expect_contents, actual_contents)
-def join(thread, timeout=10):
+ def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content):
+ """Wait up to timeout for contents of queue to match expect_contents"""
+ test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents
+ retry(test, timeout, delay)
+ self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform))
+
+def join(thread, timeout=1):
thread.join(timeout)
if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread)
@@ -548,22 +556,22 @@ class NumberedSender(Thread):
"""
def __init__(self, broker, max_depth=None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ connection_options=Cluster.CONNECTION_OPTIONS,
+ failover_updates=True, url=None):
"""
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.notify_received(n) to be called each time messages are received.
"""
Thread.__init__(self)
+ cmd = ["qpid-send",
+ "--broker", url or broker.host_port(),
+ "--address", "%s;{create:always}"%queue,
+ "--connection-options", "{%s}"%(connection_options),
+ "--content-stdin"
+ ]
+ if failover_updates: cmd += ["--failover-updates"]
self.sender = broker.test.popen(
- ["qpid-send",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
- "--content-stdin"
- ],
- expect=EXPECT_RUNNING,
- stdin=PIPE)
+ cmd, expect=EXPECT_RUNNING, stdin=PIPE)
self.condition = Condition()
self.max = max_depth
self.received = 0
@@ -610,30 +618,31 @@ class NumberedReceiver(Thread):
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker, sender = None, queue="test-queue",
- connection_options=Cluster.CONNECTION_OPTIONS):
+ def __init__(self, broker, sender=None, queue="test-queue",
+ connection_options=Cluster.CONNECTION_OPTIONS,
+ failover_updates=True, url=None):
"""
sender: enable flow control. Call sender.received(n) for each message received.
"""
Thread.__init__(self)
self.test = broker.test
+ cmd = ["qpid-receive",
+ "--broker", url or broker.host_port(),
+ "--address", "%s;{create:always}"%queue,
+ "--connection-options", "{%s}"%(connection_options),
+ "--forever"
+ ]
+ if failover_updates: cmd += [ "--failover-updates" ]
self.receiver = self.test.popen(
- ["qpid-receive",
- "--broker", "localhost:%s"%broker.port(),
- "--address", "%s;{create:always}"%queue,
- "--failover-updates",
- "--connection-options", "{%s}"%(connection_options),
- "--forever"
- ],
- expect=EXPECT_RUNNING,
- stdout=PIPE)
+ cmd, expect=EXPECT_RUNNING, stdout=PIPE)
self.lock = Lock()
self.error = None
self.sender = sender
self.received = 0
def read_message(self):
- return int(self.receiver.stdout.readline())
+ n = int(self.receiver.stdout.readline())
+ return n
def run(self):
try:
@@ -649,10 +658,14 @@ class NumberedReceiver(Thread):
except Exception:
self.error = RethrownException(self.receiver.pname)
+ def check(self):
+ """Raise an exception if there has been an error"""
+ if self.error: raise self.error
+
def stop(self):
"""Returns when termination message is received"""
join(self)
- if self.error: raise self.error
+ self.check()
class ErrorGenerator(StoppableThread):
"""