diff options
author | Rafael H. Schloming <rhs@apache.org> | 2010-08-02 12:10:52 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2010-08-02 12:10:52 +0000 |
commit | 7ae7231562aab29a31aaaf0c4741d32f9a1e6f01 (patch) | |
tree | fd8b6a4a25c5cf440672069f7669225105c0f07d | |
parent | 3105021bc9cc72152593c1bce615eabf6720995a (diff) | |
download | qpid-python-7ae7231562aab29a31aaaf0c4741d32f9a1e6f01.tar.gz |
fixed bug in flow control logic; added tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@981474 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | python/qpid/messaging/endpoints.py | 7 | ||||
-rw-r--r-- | python/qpid/tests/messaging/__init__.py | 19 | ||||
-rw-r--r-- | python/qpid/tests/messaging/endpoints.py | 46 |
3 files changed, 57 insertions, 15 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index f989d6c918..7d7d4249a3 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -1009,8 +1009,9 @@ class Receiver(Endpoint, object): if msg is None: raise Empty() elif self._capacity not in (0, UNLIMITED.value): - if self.received - self.returned <= int(ceil(self.threshold * self._capacity)): - self.granted = self.received + self._capacity + t = int(ceil(self.threshold * self._capacity)) + if self.received - self.returned <= t: + self.granted = self.returned + self._capacity self._wakeup() return msg @@ -1018,7 +1019,7 @@ class Receiver(Endpoint, object): if self._capacity == UNLIMITED.value: self.granted = UNLIMITED else: - self.granted = self.received + self._capacity + self.granted = self.returned + self._capacity @synchronized def close(self, timeout=None): diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py index a160f38531..ddacf77609 100644 --- a/python/qpid/tests/messaging/__init__.py +++ b/python/qpid/tests/messaging/__init__.py @@ -18,6 +18,7 @@ # import time +from math import ceil from qpid.harness import Skipped from qpid.messaging import * from qpid.tests import Test @@ -134,9 +135,23 @@ class Base(Test): contents = self.drain(rcv) assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents) - def assertAvailable(self, rcv, expected): + def assertAvailable(self, rcv, expected=None, lower=None, upper=None): + if expected is not None: + if lower is not None or upper is not None: + raise ValueError("cannot specify lower or upper when specifying expected") + lower = expected + upper = expected + else: + if lower is None: + lower = int(ceil(rcv.threshold*rcv.capacity)) + if upper is None: + upper = rcv.capacity + p = rcv.available() - assert p == expected, "expected %s, got %s" % (expected, p) + if upper == lower: + assert p == lower, "expected %s, got %s" % (lower, p) + else: + assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper) def sleep(self): time.sleep(self.delay()) diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py index bc1706806c..b360482747 100644 --- a/python/qpid/tests/messaging/endpoints.py +++ b/python/qpid/tests/messaging/endpoints.py @@ -659,9 +659,9 @@ class ReceiverTests(Base): def setup_receiver(self): return self.ssn.receiver(RECEIVER_Q) - def send(self, base, count = None): + def send(self, base, count = None, sync=True): content = self.content(base, count) - self.snd.send(content) + self.snd.send(content, sync=sync) return content def testFetch(self): @@ -762,25 +762,51 @@ class ReceiverTests(Base): self.ssn.acknowledge() - def testCapacity(self): - self.rcv.capacity = 5 + def capacityTest(self, capacity, threshold=None): + if threshold is not None: + self.rcv.threshold = threshold + self.rcv.capacity = capacity self.assertAvailable(self.rcv, 0) - for i in range(15): - self.send("testCapacity", i) + for i in range(2*capacity): + self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False) + self.snd.sync() self.sleep() - self.assertAvailable(self.rcv, 5) + self.assertAvailable(self.rcv) - self.drain(self.rcv, limit = 5) + first = capacity/2 + second = capacity - first + self.drain(self.rcv, limit = first) + self.sleep() + self.assertAvailable(self.rcv) + self.drain(self.rcv, limit = second) self.sleep() - self.assertAvailable(self.rcv, 5) + self.assertAvailable(self.rcv) drained = self.drain(self.rcv) - assert len(drained) == 10, "%s, %s" % (len(drained), drained) + assert len(drained) == capacity, "%s, %s" % (len(drained), drained) self.assertAvailable(self.rcv, 0) self.ssn.acknowledge() + def testCapacity5(self): + self.capacityTest(5) + + def testCapacity5Threshold1(self): + self.capacityTest(5, 1) + + def testCapacity10(self): + self.capacityTest(10) + + def testCapacity10Threshold1(self): + self.capacityTest(10, 1) + + def testCapacity100(self): + self.capacityTest(100) + + def testCapacity100Threshold1(self): + self.capacityTest(100, 1) + def testCapacityUNLIMITED(self): self.rcv.capacity = UNLIMITED self.assertAvailable(self.rcv, 0) |