diff options
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 34 |
1 files changed, 27 insertions, 7 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 931784024e..84331134a9 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -90,6 +90,17 @@ def default(value, default): AMQP_PORT = 5672 AMQPS_PORT = 5671 +class Constant: + + def __init__(self, name, value=None): + self.name = name + self.value = value + + def __repr__(self): + return self.name + +UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL) + class Connection(Lockable): """ @@ -622,6 +633,7 @@ class Receiver(Lockable): self.source = source self.filter = filter self.started = started + self.capacity = UNLIMITED self.closed = False self.listener = None self._ssn = None @@ -658,6 +670,14 @@ class Receiver(Lockable): def pending(self): return self.session._count(self._pred) + def _capacity(self): + if not self.started: + return 0 + elif self.capacity is UNLIMITED: + return self.capacity.value + else: + return self.capacity + @synchronized def listen(self, listener=None): """ @@ -681,14 +701,14 @@ class Receiver(Lockable): @type timeout: float @param timeout: the time to wait for a message to be available """ - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - 0xFFFFFFFFL) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) + if self.capacity is not UNLIMITED or not self.started: + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, + UNLIMITED.value) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1) msg = self.session._get(self._pred, timeout=timeout) if msg is None: self._ssn.message_flush(self.destination) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, - 0xFFFFFFFFL, sync=True) + self._start() self._ssn.sync() msg = self.session._get(self._pred, timeout=0) if msg is None: @@ -696,8 +716,8 @@ class Receiver(Lockable): return msg def _start(self): - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity()) @synchronized def start(self): |