summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-12-02 20:32:12 +0000
committerAlan Conway <aconway@apache.org>2009-12-02 20:32:12 +0000
commit6643c4398a7cc2f262bef960d564a6b753166fd5 (patch)
tree932be5c7028cdbb570b593c6218b06b5e67ff8e5
parent59ab4a4ecaf9b6a820a0cf2e60a70add8e0bf31b (diff)
downloadqpid-python-6643c4398a7cc2f262bef960d564a6b753166fd5.tar.gz
Fix test race condition that was causing the test to hang.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@886297 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/python/qpid/brokertest.py48
1 files changed, 33 insertions, 15 deletions
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py
index 39f1e1a410..c3145c06ea 100644
--- a/qpid/python/qpid/brokertest.py
+++ b/qpid/python/qpid/brokertest.py
@@ -352,7 +352,7 @@ class StoppableThread(Thread):
self.join()
if self.error: raise self.error
-class NumberedSender(StoppableThread):
+class NumberedSender(Thread):
"""
Thread to run a sender client and send numbered messages until stopped.
"""
@@ -362,12 +362,14 @@ class NumberedSender(StoppableThread):
max_depth: enable flow control, ensure sent - received <= max_depth.
Requires self.received(n) to be called each time messages are received.
"""
- StoppableThread.__init__(self)
+ Thread.__init__(self)
self.sender = broker.test.popen(
[broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
self.condition = Condition()
self.max = max_depth
self.received = 0
+ self.stopped = False
+ self.error = None
def run(self):
try:
@@ -375,7 +377,7 @@ class NumberedSender(StoppableThread):
while not self.stopped:
if self.max:
self.condition.acquire()
- while self.sent - self.received > self.max:
+ while not self.stopped and self.sent - self.received > self.max:
self.condition.wait()
self.condition.release()
self.sender.stdin.write(str(self.sent)+"\n")
@@ -389,6 +391,16 @@ class NumberedSender(StoppableThread):
self.received = count
self.condition.notify()
self.condition.release()
+
+ def stop(self):
+ log.debug("NumberedSender.stop")
+ self.condition.acquire()
+ self.stopped = True
+ self.condition.notify()
+ self.condition.release()
+ self.join()
+ log.debug("NumberedSender.stop - joined")
+ if self.error: raise self.error
class NumberedReceiver(Thread):
"""
@@ -407,30 +419,36 @@ class NumberedReceiver(Thread):
self.lock = Lock()
self.error = None
self.sender = sender
-
+
+ def continue_test(self):
+ self.lock.acquire()
+ ret = self.stopat is None or self.received < self.stopat
+ self.lock.release()
+ return ret
+
def run(self):
try:
self.received = 0
- while self.stopat is None or self.received < self.stopat:
- self.lock.acquire()
- try:
- m = int(self.receiver.stdout.readline())
- assert(m <= self.received) # Allow for duplicates
- if (m == self.received):
- self.received += 1
- if self.sender:
- self.sender.notify_received(self.received)
- finally:
- self.lock.release()
+ while self.continue_test():
+ m = int(self.receiver.stdout.readline())
+ assert(m <= self.received) # Allow for duplicates
+ if (m == self.received):
+ self.received += 1
+ if self.sender:
+ self.sender.notify_received(self.received)
except Exception, e:
+ log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02:
self.error = RethrownException(e, self.receiver.pname)
def stop(self, count):
"""Returns when received >= count"""
+ log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02:
self.lock.acquire()
+ log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received))
self.stopat = count
self.lock.release()
self.join()
+ log.debug("NumberedReceiver.stop - joined")
if self.error: raise self.error
class ErrorGenerator(StoppableThread):