summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py34
1 files changed, 27 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 931784024e..84331134a9 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -90,6 +90,17 @@ def default(value, default):
AMQP_PORT = 5672
AMQPS_PORT = 5671
+class Constant:
+
+ def __init__(self, name, value=None):
+ self.name = name
+ self.value = value
+
+ def __repr__(self):
+ return self.name
+
+UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
+
class Connection(Lockable):
"""
@@ -622,6 +633,7 @@ class Receiver(Lockable):
self.source = source
self.filter = filter
self.started = started
+ self.capacity = UNLIMITED
self.closed = False
self.listener = None
self._ssn = None
@@ -658,6 +670,14 @@ class Receiver(Lockable):
def pending(self):
return self.session._count(self._pred)
+ def _capacity(self):
+ if not self.started:
+ return 0
+ elif self.capacity is UNLIMITED:
+ return self.capacity.value
+ else:
+ return self.capacity
+
@synchronized
def listen(self, listener=None):
"""
@@ -681,14 +701,14 @@ class Receiver(Lockable):
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
- 0xFFFFFFFFL)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+ if self.capacity is not UNLIMITED or not self.started:
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
+ UNLIMITED.value)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
self._ssn.message_flush(self.destination)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
- 0xFFFFFFFFL, sync=True)
+ self._start()
self._ssn.sync()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
@@ -696,8 +716,8 @@ class Receiver(Lockable):
return msg
def _start(self):
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value)
+ self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity())
@synchronized
def start(self):