diff options
author | Rafael H. Schloming <rhs@apache.org> | 2009-08-24 13:38:21 +0000 |
---|---|---|
committer | Rafael H. Schloming <rhs@apache.org> | 2009-08-24 13:38:21 +0000 |
commit | 024580d818193979f6c3e0bae10939fe45ea4e68 (patch) | |
tree | e7c34cbdc50f66933bcc5a5ded9d7f33c2029832 /python/qpid/messaging.py | |
parent | 9ee0122456b1ae31cf6c3aaea213c81df0fcb3ec (diff) | |
download | qpid-python-024580d818193979f6c3e0bae10939fe45ea4e68.tar.gz |
added some test assertions; modified driver/client interaction for drain; and fixed handling of unlimited capacity
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@807210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r-- | python/qpid/messaging.py | 69 |
1 files changed, 39 insertions, 30 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 8a7a27d275..d2769dde4c 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -368,6 +368,10 @@ class Session(Lockable): sender = Sender(self, len(self.senders), target) self.senders.append(sender) self.wakeup() + # XXX: because of the lack of waiting here we can end up getting + # into the driver loop with messages sent for senders that haven't + # been linked yet, something similar can probably happen for + # receivers return sender @synchronized @@ -636,7 +640,7 @@ class Receiver(Lockable): self.started = started self.capacity = UNLIMITED self.granted = Serial(0) - self.draining = False + self.drain = False self.impending = Serial(0) self.received = Serial(0) self.returned = Serial(0) @@ -697,34 +701,41 @@ class Receiver(Lockable): if self._capacity() == 0: self.granted = self.returned + 1 self.wakeup() - self.ewait(lambda: self.impending == self.granted) + self.ewait(lambda: self.impending == self.granted) msg = self.session._get(self._pred, timeout=timeout) if msg is None: - self.draining = True + self.drain = True + self.granted = self.received + self.wakeup() + self.ewait(lambda: self.impending == self.received) + self.drain = False + self._grant() self.wakeup() - self.ewait(lambda: not self.draining) - assert self.granted == self.received - if self.capacity is not UNLIMITED: - self.granted += self._capacity() - self.wakeup() msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() - if self._capacity() not in (0, UNLIMITED.value): + elif self._capacity() not in (0, UNLIMITED.value): self.granted += 1 self.wakeup() return msg + def _grant(self): + if self.started: + if self.capacity is UNLIMITED: + self.granted = UNLIMITED + else: + self.granted = self.received + self._capacity() + else: + self.granted = self.received + + @synchronized def start(self): """ Start incoming message delivery for this receiver. """ self.started = True - if self.capacity is UNLIMITED: - self.granted = UNLIMITED - else: - self.granted = self.received + self._capacity() + self._grant() self.wakeup() @synchronized @@ -732,8 +743,8 @@ class Receiver(Lockable): """ Stop incoming message delivery for this receiver. """ - self.granted = self.received self.started = False + self._grant() self.wakeup() self.ewait(lambda: self.impending == self.received) @@ -890,6 +901,7 @@ class Driver(Lockable): for ssn in self.connection.sessions.values(): self.attach(ssn) self.process(ssn) + exi = None except: exi = sys.exc_info() @@ -1107,37 +1119,34 @@ class Driver(Lockable): if rcv.impending is UNLIMITED: delta = 0 else: - delta = UNLIMITED.value + delta = UNLIMITED + elif rcv.impending is UNLIMITED: + delta = -1 else: - delta = rcv.granted - rcv.impending + delta = max(rcv.granted, rcv.received) - rcv.impending - if delta > 0: + if delta is UNLIMITED: + _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) + _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value) + rcv.impending = UNLIMITED + elif delta > 0: _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value) _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta) rcv.impending += delta elif delta < 0: - _ssn.message_stop(rcv.destination, sync=True) + if rcv.drain: + _ssn.message_flush(rcv.destination, sync=True) + else: + _ssn.message_stop(rcv.destination, sync=True) # XXX: need to kill syncs _ssn.sync() rcv.impending = rcv.received - # XXX: this can recurse infinitely if granted drops below received self.grant(rcv) def process_receiver(self, rcv): if rcv.closed: return - _ssn = self._attachments[rcv.session] - _rcv = self._attachments[rcv] - self.grant(rcv) - if rcv.draining: - _ssn.message_flush(rcv.destination, sync=True) - # XXX: really need to make this async so that we don't give up the lock - _ssn.sync() - rcv.granted = rcv.received - rcv.impending = rcv.received - rcv.draining = False - def send(self, snd, msg): _ssn = self._attachments[snd.session] _snd = self._attachments[snd] |