summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2009-11-24 22:40:53 +0000
committerAlan Conway <aconway@apache.org>2009-11-24 22:40:53 +0000
commit51d4bf1a1a07e53164c6e771f6ecf10e3ffca4ec (patch)
treefd5a110a8e22909a21c6c165fed801215ad87a96 /python
parent0fb7ff9cfbfd01e9093c2c6021a5915696d2a089 (diff)
downloadqpid-python-51d4bf1a1a07e53164c6e771f6ecf10e3ffca4ec.tar.gz
Added flow control to failover_test in cluster_tests.py.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@883909 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/brokertest.py34
1 files changed, 30 insertions, 4 deletions
diff --git a/python/qpid/brokertest.py b/python/qpid/brokertest.py
index c6252773a5..7458638cfd 100644
--- a/python/qpid/brokertest.py
+++ b/python/qpid/brokertest.py
@@ -324,26 +324,48 @@ class NumberedSender(StoppableThread):
Thread to run a sender client and send numbered messages until stopped.
"""
- def __init__(self, broker):
+ def __init__(self, broker, max_depth=None):
+ """
+ max_depth: enable flow control, ensure sent - received <= max_depth.
+ Requires self.received(n) to be called each time messages are received.
+ """
StoppableThread.__init__(self)
self.sender = broker.test.popen(
[broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING)
+ self.condition = Condition()
+ self.max = max_depth
+ self.received = 0
def run(self):
try:
self.sent = 0
while not self.stopped:
+ if self.max:
+ self.condition.acquire()
+ while self.sent - self.received > self.max:
+ self.condition.wait()
+ self.condition.release()
self.sender.stdin.write(str(self.sent)+"\n")
self.sender.stdin.flush()
self.sent += 1
except Exception, e: self.error = RethrownException(e, self.sender.pname)
+ def notify_received(self, count):
+ """Called by receiver to enable flow control. count = messages received so far."""
+ self.condition.acquire()
+ self.received = count
+ self.condition.notify()
+ self.condition.release()
+
class NumberedReceiver(Thread):
"""
Thread to run a receiver client and verify it receives
sequentially numbered messages.
"""
- def __init__(self, broker):
+ def __init__(self, broker, sender = None):
+ """
+ sender: enable flow control. Call sender.received(n) for each message received.
+ """
Thread.__init__(self)
self.test = broker.test
self.receiver = self.test.popen(
@@ -351,7 +373,8 @@ class NumberedReceiver(Thread):
self.stopat = None
self.lock = Lock()
self.error = None
-
+ self.sender = sender
+
def run(self):
try:
self.received = 0
@@ -360,7 +383,10 @@ class NumberedReceiver(Thread):
try:
m = int(self.receiver.stdout.readline())
assert(m <= self.received) # Allow for duplicates
- if (m == self.received): self.received += 1
+ if (m == self.received):
+ self.received += 1
+ if self.sender:
+ self.sender.notify_received(self.received)
finally:
self.lock.release()
except Exception, e: