summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/brokertest.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r--qpid/cpp/src/tests/brokertest.py46
1 files changed, 24 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 6e771bf5d6..19e97ce7aa 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -67,7 +67,7 @@ class ExceptionWrapper:
def __init__(self, obj, msg):
self.obj = obj
self.msg = msg
-
+
def __getattr__(self, name):
func = getattr(self.obj, name)
if type(func) != callable:
@@ -97,11 +97,12 @@ def retry(function, timeout=10, delay=.01):
"""Call function until it returns True or timeout expires.
Double the delay for each retry. Return True if function
returns true, False if timeout expires."""
+ deadline = time.time() + timeout
while not function():
- if delay > timeout: delay = timeout
+ remaining = deadline - time.time()
+ if remaining <= 0: return False
+ delay = min(delay, remaining)
time.sleep(delay)
- timeout -= delay
- if timeout <= 0: return False
delay *= 2
return True
@@ -191,7 +192,7 @@ class Popen(subprocess.Popen):
def unexpected(self,msg):
err = error_line(self.outfile("err")) or error_line(self.outfile("out"))
raise BadProcessStatus("%s %s%s" % (self.pname, msg, err))
-
+
def stop(self): # Clean up at end of test.
try:
if self.expect == EXPECT_UNKNOWN:
@@ -213,7 +214,7 @@ class Popen(subprocess.Popen):
self.unexpected("expected error")
finally:
self.wait() # Clean up the process.
-
+
def communicate(self, input=None):
if input:
self.stdin.write(input)
@@ -231,7 +232,7 @@ class Popen(subprocess.Popen):
def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4
if self.returncode is None:
# Pass _deadstate only if it has been set, there is no _deadstate
- # parameter in Python 2.6
+ # parameter in Python 2.6
if _deadstate is None: ret = subprocess.Popen.poll(self)
else: ret = subprocess.Popen.poll(self, _deadstate)
@@ -255,7 +256,7 @@ class Popen(subprocess.Popen):
os.kill( self.pid , signal.SIGTERM)
except AttributeError: # no os.kill, using taskkill.. (Windows only)
os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
-
+
def kill(self):
try: subprocess.Popen.kill(self)
except AttributeError: # No terminate method
@@ -289,7 +290,7 @@ class Broker(Popen):
while (os.path.exists(self.log)):
self.log = "%s-%d.log" % (self.name, i)
i += 1
-
+
def get_log(self):
return os.path.abspath(self.log)
@@ -319,7 +320,7 @@ class Broker(Popen):
cmd += ["--log-to-file", self.log]
cmd += ["--log-to-stderr=no"]
if log_level != None:
- cmd += ["--log-enable=%s" % log_level]
+ cmd += ["--log-enable=%s" % log_level]
self.datadir = self.name
cmd += ["--data-dir", self.datadir]
Popen.__init__(self, cmd, expect, drain=False)
@@ -362,7 +363,7 @@ class Broker(Popen):
s = c.session(str(qpid.datatypes.uuid4()))
s.queue_declare(queue=queue)
c.close()
-
+
def _prep_sender(self, queue, durable, xprops):
s = queue + "; {create:always, node:{durable:" + str(durable)
if xprops != None: s += ", x-declare:{" + xprops + "}"
@@ -406,13 +407,14 @@ class Broker(Popen):
def log_ready(self):
"""Return true if the log file exists and contains a broker ready message"""
- if self._log_ready: return True
- self._log_ready = find_in_file("notice Broker running", self.log)
+ if not self._log_ready:
+ self._log_ready = find_in_file("notice Broker running", self.log)
+ return self._log_ready
def ready(self, **kwargs):
"""Wait till broker is ready to serve clients"""
# First make sure the broker is listening by checking the log.
- if not retry(self.log_ready, timeout=30):
+ if not retry(self.log_ready, timeout=60):
raise Exception(
"Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5)))
# Create a connection and a session. For a cluster broker this will
@@ -421,8 +423,8 @@ class Broker(Popen):
c = self.connect(**kwargs)
try: c.session()
finally: c.close()
- except: raise RethrownException(
- "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5)))
+ except Exception,e: raise RethrownException(
+ "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5)))
def store_state(self):
uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines()
@@ -431,7 +433,7 @@ class Broker(Popen):
if uuids[0] == null_uuid: return "empty"
if uuids[1] == null_uuid: return "dirty"
return "clean"
-
+
class Cluster:
"""A cluster of brokers in a test."""
@@ -486,7 +488,7 @@ class BrokerTest(TestCase):
rootdir = os.getcwd()
def configure(self, config): self.config=config
-
+
def setUp(self):
outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp"
self.dir = os.path.join(self.rootdir, outdir, self.id())
@@ -561,7 +563,7 @@ class StoppableThread(Thread):
self.stopped = True
self.join()
if self.error: raise self.error
-
+
class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
@@ -620,7 +622,7 @@ class NumberedSender(Thread):
self.join()
self.write_message(-1) # end-of-messages marker.
if self.error: raise self.error
-
+
class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
@@ -647,7 +649,7 @@ class NumberedReceiver(Thread):
def read_message(self):
return int(self.receiver.stdout.readline())
-
+
def run(self):
try:
self.received = 0
@@ -679,7 +681,7 @@ class ErrorGenerator(StoppableThread):
self.broker=broker
broker.test.cleanup_stop(self)
self.start()
-
+
def run(self):
c = self.broker.connect_old()
try: