diff options
author | Alan Conway <aconway@apache.org> | 2009-12-02 20:32:12 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2009-12-02 20:32:12 +0000 |
commit | 6643c4398a7cc2f262bef960d564a6b753166fd5 (patch) | |
tree | 932be5c7028cdbb570b593c6218b06b5e67ff8e5 | |
parent | 59ab4a4ecaf9b6a820a0cf2e60a70add8e0bf31b (diff) | |
download | qpid-python-6643c4398a7cc2f262bef960d564a6b753166fd5.tar.gz |
Fix test race condition that was causing the test to hang.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@886297 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/python/qpid/brokertest.py | 48 |
1 files changed, 33 insertions, 15 deletions
diff --git a/qpid/python/qpid/brokertest.py b/qpid/python/qpid/brokertest.py index 39f1e1a410..c3145c06ea 100644 --- a/qpid/python/qpid/brokertest.py +++ b/qpid/python/qpid/brokertest.py @@ -352,7 +352,7 @@ class StoppableThread(Thread): self.join() if self.error: raise self.error -class NumberedSender(StoppableThread): +class NumberedSender(Thread): """ Thread to run a sender client and send numbered messages until stopped. """ @@ -362,12 +362,14 @@ class NumberedSender(StoppableThread): max_depth: enable flow control, ensure sent - received <= max_depth. Requires self.received(n) to be called each time messages are received. """ - StoppableThread.__init__(self) + Thread.__init__(self) self.sender = broker.test.popen( [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING) self.condition = Condition() self.max = max_depth self.received = 0 + self.stopped = False + self.error = None def run(self): try: @@ -375,7 +377,7 @@ class NumberedSender(StoppableThread): while not self.stopped: if self.max: self.condition.acquire() - while self.sent - self.received > self.max: + while not self.stopped and self.sent - self.received > self.max: self.condition.wait() self.condition.release() self.sender.stdin.write(str(self.sent)+"\n") @@ -389,6 +391,16 @@ class NumberedSender(StoppableThread): self.received = count self.condition.notify() self.condition.release() + + def stop(self): + log.debug("NumberedSender.stop") + self.condition.acquire() + self.stopped = True + self.condition.notify() + self.condition.release() + self.join() + log.debug("NumberedSender.stop - joined") + if self.error: raise self.error class NumberedReceiver(Thread): """ @@ -407,30 +419,36 @@ class NumberedReceiver(Thread): self.lock = Lock() self.error = None self.sender = sender - + + def continue_test(self): + self.lock.acquire() + ret = self.stopat is None or self.received < self.stopat + self.lock.release() + return ret + def run(self): try: self.received = 0 - while self.stopat is None or self.received < self.stopat: - self.lock.acquire() - try: - m = int(self.receiver.stdout.readline()) - assert(m <= self.received) # Allow for duplicates - if (m == self.received): - self.received += 1 - if self.sender: - self.sender.notify_received(self.received) - finally: - self.lock.release() + while self.continue_test(): + m = int(self.receiver.stdout.readline()) + assert(m <= self.received) # Allow for duplicates + if (m == self.received): + self.received += 1 + if self.sender: + self.sender.notify_received(self.received) except Exception, e: + log.debug("NumberedReceiver.run exception %s" % (e)) # FIXME aconway 2009-12-02: self.error = RethrownException(e, self.receiver.pname) def stop(self, count): """Returns when received >= count""" + log.debug("NumberedReceiver.stop") # FIXME aconway 2009-12-02: self.lock.acquire() + log.debug("NumberedReceiver.stop at %d, received=%d" % (count, self.received)) self.stopat = count self.lock.release() self.join() + log.debug("NumberedReceiver.stop - joined") if self.error: raise self.error class ErrorGenerator(StoppableThread): |