diff options
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 81 |
1 files changed, 47 insertions, 34 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 752e5603c8..5f235e4451 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -76,7 +76,7 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=10, delay=.01): +def retry(function, timeout=1, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" @@ -198,16 +198,17 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - self._cleanup() + self.wait() def kill(self): - try: subprocess.Popen.kill(self) + try: + subprocess.Popen.kill(self) except AttributeError: # No terminate method try: os.kill( self.pid , signal.SIGKILL) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - self._cleanup() + self.wait() def _cleanup(self): """Clean up after a dead process""" @@ -276,8 +277,8 @@ class Broker(Popen): self.find_log() cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] - if log_level != None: - cmd += ["--log-enable=%s" % log_level] + cmd += ["--log-enable=%s"%(log_level or "info+") ] + self.datadir = self.name cmd += ["--data-dir", self.datadir] if show_cmd: print cmd @@ -444,6 +445,7 @@ class BrokerTest(TestCase): # Environment settings. qpidd_exec = os.path.abspath(checkenv("QPIDD_EXEC")) cluster_lib = os.getenv("CLUSTER_LIB") + ha_lib = os.getenv("HA_LIB") xml_lib = os.getenv("XML_LIB") qpid_config_exec = os.getenv("QPID_CONFIG_EXEC") qpid_route_exec = os.getenv("QPID_ROUTE_EXEC") @@ -499,26 +501,32 @@ class BrokerTest(TestCase): cluster = Cluster(self, count, args, expect=expect, wait=wait, show_cmd=show_cmd) return cluster - def browse(self, session, queue, timeout=0): + def browse(self, session, queue, timeout=0, transform=lambda m: m.content): """Return a list with the contents of each message on queue.""" r = session.receiver("%s;{mode:browse}"%(queue)) r.capacity = 100 try: contents = [] try: - while True: contents.append(r.fetch(timeout=timeout).content) + while True: contents.append(transform(r.fetch(timeout=timeout))) except messaging.Empty: pass finally: r.close() return contents - def assert_browse(self, session, queue, expect_contents, timeout=0): + def assert_browse(self, session, queue, expect_contents, timeout=0, transform=lambda m: m.content): """Assert that the contents of messages on queue (as retrieved using session and timeout) exactly match the strings in expect_contents""" - actual_contents = self.browse(session, queue, timeout) + actual_contents = self.browse(session, queue, timeout, transform=transform) self.assertEqual(expect_contents, actual_contents) -def join(thread, timeout=10): + def assert_browse_retry(self, session, queue, expect_contents, timeout=1, delay=.01, transform=lambda m:m.content): + """Wait up to timeout for contents of queue to match expect_contents""" + test = lambda: self.browse(session, queue, 0, transform=transform) == expect_contents + retry(test, timeout, delay) + self.assertEqual(expect_contents, self.browse(session, queue, 0, transform=transform)) + +def join(thread, timeout=1): thread.join(timeout) if thread.isAlive(): raise Exception("Timed out joining thread %s"%thread) @@ -548,22 +556,22 @@ class NumberedSender(Thread): """ def __init__(self, broker, max_depth=None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS): + connection_options=Cluster.CONNECTION_OPTIONS, + failover_updates=True, url=None): """ max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.notify_received(n) to be called each time messages are received. """ Thread.__init__(self) + cmd = ["qpid-send", + "--broker", url or broker.host_port(), + "--address", "%s;{create:always}"%queue, + "--connection-options", "{%s}"%(connection_options), + "--content-stdin" + ] + if failover_updates: cmd += ["--failover-updates"] self.sender = broker.test.popen( - ["qpid-send", - "--broker", "localhost:%s"%broker.port(), - "--address", "%s;{create:always}"%queue, - "--failover-updates", - "--connection-options", "{%s}"%(connection_options), - "--content-stdin" - ], - expect=EXPECT_RUNNING, - stdin=PIPE) + cmd, expect=EXPECT_RUNNING, stdin=PIPE) self.condition = Condition() self.max = max_depth self.received = 0 @@ -610,30 +618,31 @@ class NumberedReceiver(Thread): Thread to run a receiver client and verify it receives sequentially numbered messages. """ - def __init__(self, broker, sender = None, queue="test-queue", - connection_options=Cluster.CONNECTION_OPTIONS): + def __init__(self, broker, sender=None, queue="test-queue", + connection_options=Cluster.CONNECTION_OPTIONS, + failover_updates=True, url=None): """ sender: enable flow control. Call sender.received(n) for each message received. """ Thread.__init__(self) self.test = broker.test + cmd = ["qpid-receive", + "--broker", url or broker.host_port(), + "--address", "%s;{create:always}"%queue, + "--connection-options", "{%s}"%(connection_options), + "--forever" + ] + if failover_updates: cmd += [ "--failover-updates" ] self.receiver = self.test.popen( - ["qpid-receive", - "--broker", "localhost:%s"%broker.port(), - "--address", "%s;{create:always}"%queue, - "--failover-updates", - "--connection-options", "{%s}"%(connection_options), - "--forever" - ], - expect=EXPECT_RUNNING, - stdout=PIPE) + cmd, expect=EXPECT_RUNNING, stdout=PIPE) self.lock = Lock() self.error = None self.sender = sender self.received = 0 def read_message(self): - return int(self.receiver.stdout.readline()) + n = int(self.receiver.stdout.readline()) + return n def run(self): try: @@ -649,10 +658,14 @@ class NumberedReceiver(Thread): except Exception: self.error = RethrownException(self.receiver.pname) + def check(self): + """Raise an exception if there has been an error""" + if self.error: raise self.error + def stop(self): """Returns when termination message is received""" join(self) - if self.error: raise self.error + self.check() class ErrorGenerator(StoppableThread): """ |