summaryrefslogtreecommitdiff
path: root/python/qpid/messaging.py
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2009-08-24 13:38:21 +0000
committerRafael H. Schloming <rhs@apache.org>2009-08-24 13:38:21 +0000
commit024580d818193979f6c3e0bae10939fe45ea4e68 (patch)
treee7c34cbdc50f66933bcc5a5ded9d7f33c2029832 /python/qpid/messaging.py
parent9ee0122456b1ae31cf6c3aaea213c81df0fcb3ec (diff)
downloadqpid-python-024580d818193979f6c3e0bae10939fe45ea4e68.tar.gz
added some test assertions; modified driver/client interaction for drain; and fixed handling of unlimited capacity
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@807210 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/messaging.py')
-rw-r--r--python/qpid/messaging.py69
1 files changed, 39 insertions, 30 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py
index 8a7a27d275..d2769dde4c 100644
--- a/python/qpid/messaging.py
+++ b/python/qpid/messaging.py
@@ -368,6 +368,10 @@ class Session(Lockable):
sender = Sender(self, len(self.senders), target)
self.senders.append(sender)
self.wakeup()
+ # XXX: because of the lack of waiting here we can end up getting
+ # into the driver loop with messages sent for senders that haven't
+ # been linked yet, something similar can probably happen for
+ # receivers
return sender
@synchronized
@@ -636,7 +640,7 @@ class Receiver(Lockable):
self.started = started
self.capacity = UNLIMITED
self.granted = Serial(0)
- self.draining = False
+ self.drain = False
self.impending = Serial(0)
self.received = Serial(0)
self.returned = Serial(0)
@@ -697,34 +701,41 @@ class Receiver(Lockable):
if self._capacity() == 0:
self.granted = self.returned + 1
self.wakeup()
- self.ewait(lambda: self.impending == self.granted)
+ self.ewait(lambda: self.impending == self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self.draining = True
+ self.drain = True
+ self.granted = self.received
+ self.wakeup()
+ self.ewait(lambda: self.impending == self.received)
+ self.drain = False
+ self._grant()
self.wakeup()
- self.ewait(lambda: not self.draining)
- assert self.granted == self.received
- if self.capacity is not UNLIMITED:
- self.granted += self._capacity()
- self.wakeup()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
- if self._capacity() not in (0, UNLIMITED.value):
+ elif self._capacity() not in (0, UNLIMITED.value):
self.granted += 1
self.wakeup()
return msg
+ def _grant(self):
+ if self.started:
+ if self.capacity is UNLIMITED:
+ self.granted = UNLIMITED
+ else:
+ self.granted = self.received + self._capacity()
+ else:
+ self.granted = self.received
+
+
@synchronized
def start(self):
"""
Start incoming message delivery for this receiver.
"""
self.started = True
- if self.capacity is UNLIMITED:
- self.granted = UNLIMITED
- else:
- self.granted = self.received + self._capacity()
+ self._grant()
self.wakeup()
@synchronized
@@ -732,8 +743,8 @@ class Receiver(Lockable):
"""
Stop incoming message delivery for this receiver.
"""
- self.granted = self.received
self.started = False
+ self._grant()
self.wakeup()
self.ewait(lambda: self.impending == self.received)
@@ -890,6 +901,7 @@ class Driver(Lockable):
for ssn in self.connection.sessions.values():
self.attach(ssn)
self.process(ssn)
+
exi = None
except:
exi = sys.exc_info()
@@ -1107,37 +1119,34 @@ class Driver(Lockable):
if rcv.impending is UNLIMITED:
delta = 0
else:
- delta = UNLIMITED.value
+ delta = UNLIMITED
+ elif rcv.impending is UNLIMITED:
+ delta = -1
else:
- delta = rcv.granted - rcv.impending
+ delta = max(rcv.granted, rcv.received) - rcv.impending
- if delta > 0:
+ if delta is UNLIMITED:
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+ rcv.impending = UNLIMITED
+ elif delta > 0:
_ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
_ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
rcv.impending += delta
elif delta < 0:
- _ssn.message_stop(rcv.destination, sync=True)
+ if rcv.drain:
+ _ssn.message_flush(rcv.destination, sync=True)
+ else:
+ _ssn.message_stop(rcv.destination, sync=True)
# XXX: need to kill syncs
_ssn.sync()
rcv.impending = rcv.received
- # XXX: this can recurse infinitely if granted drops below received
self.grant(rcv)
def process_receiver(self, rcv):
if rcv.closed: return
- _ssn = self._attachments[rcv.session]
- _rcv = self._attachments[rcv]
-
self.grant(rcv)
- if rcv.draining:
- _ssn.message_flush(rcv.destination, sync=True)
- # XXX: really need to make this async so that we don't give up the lock
- _ssn.sync()
- rcv.granted = rcv.received
- rcv.impending = rcv.received
- rcv.draining = False
-
def send(self, snd, msg):
_ssn = self._attachments[snd.session]
_snd = self._attachments[snd]