diff options
Diffstat (limited to 'qpid/cpp/src/tests/brokertest.py')
-rw-r--r-- | qpid/cpp/src/tests/brokertest.py | 46 |
1 files changed, 24 insertions, 22 deletions
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 6e771bf5d6..19e97ce7aa 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -67,7 +67,7 @@ class ExceptionWrapper: def __init__(self, obj, msg): self.obj = obj self.msg = msg - + def __getattr__(self, name): func = getattr(self.obj, name) if type(func) != callable: @@ -97,11 +97,12 @@ def retry(function, timeout=10, delay=.01): """Call function until it returns True or timeout expires. Double the delay for each retry. Return True if function returns true, False if timeout expires.""" + deadline = time.time() + timeout while not function(): - if delay > timeout: delay = timeout + remaining = deadline - time.time() + if remaining <= 0: return False + delay = min(delay, remaining) time.sleep(delay) - timeout -= delay - if timeout <= 0: return False delay *= 2 return True @@ -191,7 +192,7 @@ class Popen(subprocess.Popen): def unexpected(self,msg): err = error_line(self.outfile("err")) or error_line(self.outfile("out")) raise BadProcessStatus("%s %s%s" % (self.pname, msg, err)) - + def stop(self): # Clean up at end of test. try: if self.expect == EXPECT_UNKNOWN: @@ -213,7 +214,7 @@ class Popen(subprocess.Popen): self.unexpected("expected error") finally: self.wait() # Clean up the process. - + def communicate(self, input=None): if input: self.stdin.write(input) @@ -231,7 +232,7 @@ class Popen(subprocess.Popen): def poll(self, _deadstate=None): # _deadstate required by base class in python 2.4 if self.returncode is None: # Pass _deadstate only if it has been set, there is no _deadstate - # parameter in Python 2.6 + # parameter in Python 2.6 if _deadstate is None: ret = subprocess.Popen.poll(self) else: ret = subprocess.Popen.poll(self, _deadstate) @@ -255,7 +256,7 @@ class Popen(subprocess.Popen): os.kill( self.pid , signal.SIGTERM) except AttributeError: # no os.kill, using taskkill.. (Windows only) os.popen('TASKKILL /PID ' +str(self.pid) + ' /F') - + def kill(self): try: subprocess.Popen.kill(self) except AttributeError: # No terminate method @@ -289,7 +290,7 @@ class Broker(Popen): while (os.path.exists(self.log)): self.log = "%s-%d.log" % (self.name, i) i += 1 - + def get_log(self): return os.path.abspath(self.log) @@ -319,7 +320,7 @@ class Broker(Popen): cmd += ["--log-to-file", self.log] cmd += ["--log-to-stderr=no"] if log_level != None: - cmd += ["--log-enable=%s" % log_level] + cmd += ["--log-enable=%s" % log_level] self.datadir = self.name cmd += ["--data-dir", self.datadir] Popen.__init__(self, cmd, expect, drain=False) @@ -362,7 +363,7 @@ class Broker(Popen): s = c.session(str(qpid.datatypes.uuid4())) s.queue_declare(queue=queue) c.close() - + def _prep_sender(self, queue, durable, xprops): s = queue + "; {create:always, node:{durable:" + str(durable) if xprops != None: s += ", x-declare:{" + xprops + "}" @@ -406,13 +407,14 @@ class Broker(Popen): def log_ready(self): """Return true if the log file exists and contains a broker ready message""" - if self._log_ready: return True - self._log_ready = find_in_file("notice Broker running", self.log) + if not self._log_ready: + self._log_ready = find_in_file("notice Broker running", self.log) + return self._log_ready def ready(self, **kwargs): """Wait till broker is ready to serve clients""" # First make sure the broker is listening by checking the log. - if not retry(self.log_ready, timeout=30): + if not retry(self.log_ready, timeout=60): raise Exception( "Timed out waiting for broker %s%s"%(self.name, error_line(self.log,5))) # Create a connection and a session. For a cluster broker this will @@ -421,8 +423,8 @@ class Broker(Popen): c = self.connect(**kwargs) try: c.session() finally: c.close() - except: raise RethrownException( - "Broker %s failed ready test%s"%(self.name,error_line(self.log, 5))) + except Exception,e: raise RethrownException( + "Broker %s not responding: (%s)%s"%(self.name,e,error_line(self.log, 5))) def store_state(self): uuids = open(os.path.join(self.datadir, "cluster", "store.status")).readlines() @@ -431,7 +433,7 @@ class Broker(Popen): if uuids[0] == null_uuid: return "empty" if uuids[1] == null_uuid: return "dirty" return "clean" - + class Cluster: """A cluster of brokers in a test.""" @@ -486,7 +488,7 @@ class BrokerTest(TestCase): rootdir = os.getcwd() def configure(self, config): self.config=config - + def setUp(self): outdir = self.config.defines.get("OUTDIR") or "brokertest.tmp" self.dir = os.path.join(self.rootdir, outdir, self.id()) @@ -561,7 +563,7 @@ class StoppableThread(Thread): self.stopped = True self.join() if self.error: raise self.error - + class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. @@ -620,7 +622,7 @@ class NumberedSender(Thread): self.join() self.write_message(-1) # end-of-messages marker. if self.error: raise self.error - + class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives @@ -647,7 +649,7 @@ class NumberedReceiver(Thread): def read_message(self): return int(self.receiver.stdout.readline()) - + def run(self): try: self.received = 0 @@ -679,7 +681,7 @@ class ErrorGenerator(StoppableThread): self.broker=broker broker.test.cleanup_stop(self) self.start() - + def run(self): c = self.broker.connect_old() try: |