diff options
Diffstat (limited to 'python')
-rw-r--r-- | python/qpid/messaging/driver.py | 19 | ||||
-rw-r--r-- | python/qpid/messaging/endpoints.py | 7 |
2 files changed, 21 insertions, 5 deletions
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py index 8463aead13..16f1b2902e 100644 --- a/python/qpid/messaging/driver.py +++ b/python/qpid/messaging/driver.py @@ -208,6 +208,7 @@ class LinkIn: _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv _rcv.draining = False + _rcv.bytes_open = False _rcv.on_unlink = [] def do_link(self, sst, rcv, _rcv, type, subtype, action): @@ -762,6 +763,7 @@ class Engine: sst.write_op(SessionCommandPoint(sst.sent, 0)) sst.outgoing_idx = 0 sst.acked = [] + sst.acked_idx = 0 if ssn.transactional: sst.write_cmd(TxSelect()) self._attachments[ssn] = sst @@ -965,7 +967,8 @@ class Engine: self.process_receiver(rcv) if ssn.acked: - messages = [m for m in ssn.acked if m not in sst.acked] + messages = ssn.acked[sst.acked_idx:] + delta = len(messages) if messages: ids = RangedSet() @@ -975,6 +978,7 @@ class Engine: # could we deal this via some message-id based purge? if m._transfer_id is None: ssn.acked.remove(m) + delta -= 1 continue ids.add(m._transfer_id) disp = m._disposition or DEFAULT_DISPOSITION @@ -992,6 +996,7 @@ class Engine: def ack_ack(): for m in msgs: ssn.acked.remove(m) + sst.acked_idx -= 1 if not ssn.transactional: sst.acked.remove(m) return ack_ack @@ -1011,7 +1016,9 @@ class Engine: for m in msgs: log.debug("SACK[%s]: %s, %s", ssn.log_id, m, m._disposition) + # XXX: could add messages with _transfer_id of None sst.acked.extend(messages) + sst.acked_idx += delta if ssn.committing and not sst.committing: def commit_ok(): @@ -1076,11 +1083,15 @@ class Engine: delta = max(rcv.granted, rcv.received) - rcv.impending if delta is UNLIMITED: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + if not _rcv.bytes_open: + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + _rcv.bytes_open = True sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, UNLIMITED.value)) rcv.impending = UNLIMITED elif delta > 0: - sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + if not _rcv.bytes_open: + sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.byte, UNLIMITED.value)) + _rcv.bytes_open = True sst.write_cmd(MessageFlow(_rcv.destination, credit_unit.message, delta)) rcv.impending += delta elif delta < 0 and not rcv.draining: @@ -1088,6 +1099,7 @@ class Engine: def do_stop(): rcv.impending = rcv.received _rcv.draining = False + _rcv.bytes_open = False self.grant(rcv) sst.write_cmd(MessageStop(_rcv.destination), do_stop) @@ -1097,6 +1109,7 @@ class Engine: rcv.impending = rcv.received rcv.granted = rcv.impending _rcv.draining = False + _rcv.bytes_open = False rcv.draining = False sst.write_cmd(MessageFlush(_rcv.destination), do_flush) diff --git a/python/qpid/messaging/endpoints.py b/python/qpid/messaging/endpoints.py index f5f957c821..707aee3ed6 100644 --- a/python/qpid/messaging/endpoints.py +++ b/python/qpid/messaging/endpoints.py @@ -29,6 +29,7 @@ Areas that still need work: """ from logging import getLogger +from math import ceil from qpid.codec010 import StringCodec from qpid.concurrency import synchronized, Waiter, Condition from qpid.datatypes import Serial, uuid4 @@ -843,6 +844,7 @@ class Receiver(object): self._lock = self.session._lock self._capacity = 0 self._set_capacity(options.get("capacity", 0), False) + self.threshold = 0.5 @synchronized def _set_capacity(self, c, wakeup=True): @@ -931,8 +933,9 @@ class Receiver(object): if msg is None: raise Empty() elif self._capacity not in (0, UNLIMITED.value): - self.granted += 1 - self._wakeup() + if self.received - self.returned <= int(ceil(self.threshold * self._capacity)): + self.granted = self.received + self._capacity + self._wakeup() return msg def _grant(self): |