summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-08-02 12:10:52 +0000
committerRafael H. Schloming <rhs@apache.org>2010-08-02 12:10:52 +0000
commit7ae7231562aab29a31aaaf0c4741d32f9a1e6f01 (patch)
treefd8b6a4a25c5cf440672069f7669225105c0f07d
parent3105021bc9cc72152593c1bce615eabf6720995a (diff)
downloadqpid-python-7ae7231562aab29a31aaaf0c4741d32f9a1e6f01.tar.gz
fixed bug in flow control logic; added tests
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@981474 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--python/qpid/messaging/endpoints.py7
-rw-r--r--python/qpid/tests/messaging/__init__.py19
-rw-r--r--python/qpid/tests/messaging/endpoints.py46
3 files changed, 57 insertions, 15 deletions
diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py
index f989d6c918..7d7d4249a3 100644
--- a/python/qpid/messaging/endpoints.py
+++ b/python/qpid/messaging/endpoints.py
@@ -1009,8 +1009,9 @@ class Receiver(Endpoint, object):
if msg is None:
raise Empty()
elif self._capacity not in (0, UNLIMITED.value):
- if self.received - self.returned <= int(ceil(self.threshold * self._capacity)):
- self.granted = self.received + self._capacity
+ t = int(ceil(self.threshold * self._capacity))
+ if self.received - self.returned <= t:
+ self.granted = self.returned + self._capacity
self._wakeup()
return msg
@@ -1018,7 +1019,7 @@ class Receiver(Endpoint, object):
if self._capacity == UNLIMITED.value:
self.granted = UNLIMITED
else:
- self.granted = self.received + self._capacity
+ self.granted = self.returned + self._capacity
@synchronized
def close(self, timeout=None):
diff --git a/python/qpid/tests/messaging/__init__.py b/python/qpid/tests/messaging/__init__.py
index a160f38531..ddacf77609 100644
--- a/python/qpid/tests/messaging/__init__.py
+++ b/python/qpid/tests/messaging/__init__.py
@@ -18,6 +18,7 @@
#
import time
+from math import ceil
from qpid.harness import Skipped
from qpid.messaging import *
from qpid.tests import Test
@@ -134,9 +135,23 @@ class Base(Test):
contents = self.drain(rcv)
assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
- def assertAvailable(self, rcv, expected):
+ def assertAvailable(self, rcv, expected=None, lower=None, upper=None):
+ if expected is not None:
+ if lower is not None or upper is not None:
+ raise ValueError("cannot specify lower or upper when specifying expected")
+ lower = expected
+ upper = expected
+ else:
+ if lower is None:
+ lower = int(ceil(rcv.threshold*rcv.capacity))
+ if upper is None:
+ upper = rcv.capacity
+
p = rcv.available()
- assert p == expected, "expected %s, got %s" % (expected, p)
+ if upper == lower:
+ assert p == lower, "expected %s, got %s" % (lower, p)
+ else:
+ assert lower <= p <= upper, "expected %s to be in range [%s, %s]" % (p, lower, upper)
def sleep(self):
time.sleep(self.delay())
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index bc1706806c..b360482747 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -659,9 +659,9 @@ class ReceiverTests(Base):
def setup_receiver(self):
return self.ssn.receiver(RECEIVER_Q)
- def send(self, base, count = None):
+ def send(self, base, count = None, sync=True):
content = self.content(base, count)
- self.snd.send(content)
+ self.snd.send(content, sync=sync)
return content
def testFetch(self):
@@ -762,25 +762,51 @@ class ReceiverTests(Base):
self.ssn.acknowledge()
- def testCapacity(self):
- self.rcv.capacity = 5
+ def capacityTest(self, capacity, threshold=None):
+ if threshold is not None:
+ self.rcv.threshold = threshold
+ self.rcv.capacity = capacity
self.assertAvailable(self.rcv, 0)
- for i in range(15):
- self.send("testCapacity", i)
+ for i in range(2*capacity):
+ self.send("capacityTest(%s, %s)" % (capacity, threshold), i, sync=False)
+ self.snd.sync()
self.sleep()
- self.assertAvailable(self.rcv, 5)
+ self.assertAvailable(self.rcv)
- self.drain(self.rcv, limit = 5)
+ first = capacity/2
+ second = capacity - first
+ self.drain(self.rcv, limit = first)
+ self.sleep()
+ self.assertAvailable(self.rcv)
+ self.drain(self.rcv, limit = second)
self.sleep()
- self.assertAvailable(self.rcv, 5)
+ self.assertAvailable(self.rcv)
drained = self.drain(self.rcv)
- assert len(drained) == 10, "%s, %s" % (len(drained), drained)
+ assert len(drained) == capacity, "%s, %s" % (len(drained), drained)
self.assertAvailable(self.rcv, 0)
self.ssn.acknowledge()
+ def testCapacity5(self):
+ self.capacityTest(5)
+
+ def testCapacity5Threshold1(self):
+ self.capacityTest(5, 1)
+
+ def testCapacity10(self):
+ self.capacityTest(10)
+
+ def testCapacity10Threshold1(self):
+ self.capacityTest(10, 1)
+
+ def testCapacity100(self):
+ self.capacityTest(100)
+
+ def testCapacity100Threshold1(self):
+ self.capacityTest(100, 1)
+
def testCapacityUNLIMITED(self):
self.rcv.capacity = UNLIMITED
self.assertAvailable(self.rcv, 0)