summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-02 20:32:12 +0000
committerAlan Conway <aconway@apache.org>2009-12-02 20:32:12 +0000
commitc742af513e2afe49de32d90300b0c83ef340569d (patch)
tree5294e2d0e98da35a2b9076c1f34824004057f25f /python
parent9bd37c771305c1744470bb24f5bf515ce8af5520 (diff)
downloadqpid-python-c742af513e2afe49de32d90300b0c83ef340569d.tar.gz
Fix test race condition that was causing the test to hang.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@886297 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/brokertest.py48
1 files changed, 33 insertions, 15 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index 39f1e1a410..c3145c06ea 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -352,7 +352,7 @@ class StoppableThread(Thread):
self.join()
if self.error: raise self.error
-class NumberedSender(StoppableThread):
+class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
@@ -362,12 +362,14 @@ class NumberedSender(StoppableThread):
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.received(n) to be called each time messages are received.
"""
- StoppableThread.__init__(self)
+ Thread.__init__(self)
self.sender = broker.test.popen(
[broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
self.condition = Condition()
self.max = max_depth
self.received = 0
+ self.stopped = False
+ self.error = None
def run(self):
try:
@@ -375,7 +377,7 @@ class NumberedSender(StoppableThread):
while not self.stopped:
if self.max:
self.condition.acquire()
- while self.sent - self.received > self.max:
+ while not self.stopped and self.sent - self.received > self.max:
self.condition.wait()
self.condition.release()
self.sender.stdin.write(str(self.sent)+"\n")
@@ -389,6 +391,16 @@ class NumberedSender(StoppableThread):
self.received = count
self.condition.notify()
self.condition.release()
+
+ def stop(self):
+ log.debug("NumberedSender.stop")
+ self.condition.acquire()
+ self.stopped = True
+ self.condition.notify()
+ self.condition.release()
+ self.join()
+ log.debug("NumberedSender.stop - joined")
+ if self.error: raise self.error
class NumberedReceiver(Thread):
"""
@@ -407,30 +419,36 @@ class NumberedReceiver(Thread):
self.lock = Lock()
self.error = None
self.sender = sender
-
+
+ def continue_test(self):
+ self.lock.acquire()
+ ret = self.stopat is None or self.received < self.stopat
+ self.lock.release()
+ return ret
+
def run(self):
try:
self.received = 0
- while self.stopat is None or self.received < self.stopat:
- self.lock.acquire()
- try:
- m = int(self.receiver.stdout.readline())
- assert(m <= self.received) # Allow for duplicates
- if (m == self.received):
- self.received += 1
- if self.sender:
- self.sender.notify_received(self.received)
- finally:
- self.lock.release()
+ while self.continue_test():
+ m = int(self.receiver.stdout.readline())
+ assert(m <= self.received) # Allow for duplicates
+ if (m == self.received):
+ self.received += 1
+ if self.sender:
+ self.sender.notify_received(self.received)
except Exception, e:
+ log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02:
self.error = RethrownException(e, self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
+ log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02:
self.lock.acquire()
+ log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received))
self.stopat = count
self.lock.release()
self.join()
+ log.debug("NumberedReceiver.stop - joined")
if self.error: raise self.error
class ErrorGenerator(StoppableThread):